Mengonsumsi data secara langsung dari Simple Log Service menggunakan SDK sering kali tidak mencukupi untuk tugas kompleks seperti load balancing dan failover, terutama saat mengintegrasikan dengan perangkat lunak pihak ketiga, aplikasi multi-bahasa, layanan cloud, atau framework komputasi aliran. Dalam kasus tersebut, gunakan consumer groups untuk mengonsumsi data. Consumer groups menyediakan konsumsi hampir real-time dengan latensi biasanya dalam hitungan detik. Topik ini menjelaskan cara mengonsumsi data menggunakan consumer groups.
Overview
Sebuah Logstore berisi beberapa shard. Saat Anda menggunakan consumer group untuk mengonsumsi data, shard diberikan kepada konsumen dalam grup tersebut sebagai berikut:
-
Dalam satu consumer group, setiap shard hanya diberikan kepada satu konsumen.
-
Dalam satu consumer group, satu konsumen dapat diberikan beberapa shard.
Saat konsumen baru bergabung ke dalam consumer group, shard akan ditugaskan ulang untuk menyeimbangkan beban. Penugasan ulang ini mengikuti prinsip yang dijelaskan di atas.
Key concepts
|
Term
|
Description
|
|
consumer group
|
Consumer group terdiri dari beberapa konsumen yang bekerja sama untuk memproses data dari sebuah Logstore. Konsumen dalam grup yang sama berbagi workload, memastikan bahwa setiap catatan data diproses hanya oleh satu konsumen dalam grup tersebut.
Penting
Anda dapat membuat hingga 30 consumer group untuk setiap Logstore.
|
|
consumer
|
Unit dalam consumer group yang mengonsumsi data.
Penting
Konsumen dalam consumer group yang sama harus memiliki nama unik.
|
|
Logstore
|
Unit dasar untuk pengumpulan data, penyimpanan, dan kueri. Untuk informasi lebih lanjut, lihat Logstore.
|
|
shard
|
Unit dasar Logstore yang menyediakan kapasitas baca dan tulis tetap. Semua data dalam Logstore disimpan dalam shard. Untuk informasi lebih lanjut, lihat shard.
|
|
checkpoint
|
Penanda yang menunjukkan posisi terakhir yang telah dikonsumsi oleh konsumen. Setelah restart, konsumen menggunakan checkpoint untuk melanjutkan konsumsi.
Catatan
Saat menggunakan consumer group, checkpoint disimpan secara otomatis jika proses konsumen gagal. Setelah pemulihan, proses melanjutkan konsumsi dari checkpoint yang tersimpan untuk mencegah konsumsi duplikat.
|
Step 1: Create a consumer group
Bagian ini menunjukkan cara membuat consumer group menggunakan SDK, API, atau CLI.
SDK
Kode berikut menunjukkan cara membuat consumer group:
CreateConsumerGroup.java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
// Contoh ini mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Masukkan nama proyek.
String projectName = "ali-test-project";
// Masukkan nama Logstore.
String logstoreName = "ali-test-logstore";
// Tetapkan titik akhir untuk Simple Log Service. Contoh ini menggunakan titik akhir wilayah China (Hangzhou). Ganti dengan titik akhir yang sebenarnya.
String host = "https://cn-hangzhou.log.aliyuncs.com";
// Buat klien Simple Log Service.
Client client = new Client(host, accessId, accessKey);
try {
// Tetapkan nama consumer group.
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("ready to create consumergroup");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("create consumergroup %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
Untuk contoh kode lainnya, lihat Use the Java SDK to manage consumer groups dan Use the Log Service SDK for Python to manage consumer groups.
Step 2: Consume logs
How it works
Saat konsumen pertama kali dijalankan, SDK akan membuat consumer group jika belum ada. Pengaturan start consumption checkpoint menentukan posisi awal dan hanya berlaku saat consumer group pertama kali dibuat. Pada restart berikutnya, konsumen melanjutkan dari checkpoint konsumsi terakhir yang disimpan di server. Contohnya:
-
LogHubConfig.ConsumePosition.BEGIN_CURSOR: Consumer group memulai konsumsi dari log paling awal yang tersedia di Logstore.
-
LogHubConfig.ConsumePosition.END_CURSOR: Consumer group hanya memproses log baru yang tiba setelah konsumen dijalankan.
Consumption examples
Anda dapat menggunakan SDK untuk Java, C++, Python, dan Go untuk mengonsumsi data dari consumer group. Bagian ini memberikan contoh menggunakan Java SDK.
SDK
-
Tambahkan dependensi Maven.
Dalam file pom.xml Anda, tambahkan dependensi berikut:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
-
Buat logika konsumsi. Kode berikut merupakan contohnya:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
// Melacak stempel waktu penyimpanan checkpoint terakhir.
private long mLastSaveTime = 0;
// Dipanggil saat prosesor diinisialisasi.
public void initialize(int shardId) {
this.shardId = shardId;
}
// Logika utama konsumsi data. Tangani semua pengecualian dalam metode ini; jangan lemparkan kembali.
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// Cetak data yang diambil.
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// Simpan checkpoint ke server setiap 30 detik. Jika worker berhenti secara tak terduga, worker baru melanjutkan konsumsi dari checkpoint terakhir, yang dapat menyebabkan sejumlah kecil data dikonsumsi ulang.
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// <code>true
false
Untuk contoh kode lainnya, lihat aliyun-log-consumer-java dan Aliyun LOG Go Consumer.
-
Buat factory instance konsumen. Kode berikut merupakan contohnya:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// Menghasilkan instance konsumen. Catatan: Metode ini harus mengembalikan objek SampleLogHubProcessor baru pada setiap pemanggilan.
return new SampleLogHubProcessor();
}
}
-
Buat konsumen dan mulai thread konsumen untuk mengonsumsi data dari Logstore yang ditentukan. Kode berikut merupakan contohnya:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Titik akhir untuk Simple Log Service. Ganti ini dengan titik akhir aktual Anda. Contoh ini menggunakan titik akhir untuk Wilayah Tiongkok (Hangzhou).
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// Nama proyek Anda.
private static String Project = "ali-test-project";
// Nama Logstore Anda.
private static String Logstore = "ali-test-logstore";
// Nama kelompok konsumen Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Program akan membuatnya secara otomatis saat waktu proses.
private static String ConsumerGroup = "ali-test-consumergroup2";
// Mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// 'consumer_1' adalah nama konsumen, yang harus unik dalam satu kelompok konsumen. Untuk menyeimbangkan konsumsi dari Logstore di beberapa mesin, gunakan pengidentifikasi unik seperti alamat IP mesin untuk setiap konsumen.
// maxFetchLogGroupSize: Jumlah maksimum kelompok log yang diambil per permintaan. Nilai default direkomendasikan. Untuk mengubahnya, gunakan config.setMaxFetchLogGroupSize(100). Nilai harus berada dalam rentang (0, 1000].
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Thread memulai ClientWorker, yang mengimplementasikan antarmuka Runnable dan berjalan secara otomatis.
thread.start();
Thread.sleep(60 * 60 * 1000);
// Menghentikan instans konsumen dengan memanggil <code>worker.shutdown()
-
Jalankan Main.java.
Sebagai contoh, saat mengonsumsi log NGINX, outputnya menyerupai berikut:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
SDK and SPL
-
Tambahkan dependensi Maven.
Dalam file pom.xml Anda, tambahkan dependensi berikut:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
-
Buat logika konsumsi. Kode berikut merupakan contohnya:
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SPLLogHubProcessor implements ILogHubProcessor {
private int shardId;
// Melacak stempel waktu penyimpanan checkpoint terakhir.
private long mLastSaveTime = 0;
// Dipanggil saat prosesor diinisialisasi.
public void initialize(int shardId) {
this.shardId = shardId;
}
// Logika utama konsumsi data. Tangani semua pengecualian dalam metode ini; jangan lemparkan kembali.
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// Cetak data yang diambil.
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// Simpan checkpoint ke server setiap 30 detik. Jika worker berhenti secara tak terduga, worker baru melanjutkan konsumsi dari checkpoint terakhir, yang dapat menyebabkan sejumlah kecil data dikonsumsi ulang.
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// <code>true
false
-
Buat factory instance konsumen. Kode berikut merupakan contohnya:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// Menghasilkan instance konsumen. Catatan: Metode ini harus mengembalikan objek SPLLogHubProcessor baru pada setiap pemanggilan.
return new SPLLogHubProcessor();
}
}
-
Buat konsumen dan mulai thread konsumen untuk mengonsumsi data dari Logstore yang ditentukan. Kode berikut merupakan contohnya:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Titik akhir untuk Simple Log Service. Ganti dengan titik akhir Anda yang sebenarnya. Contoh ini menggunakan titik akhir wilayah China (Hangzhou).
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// Nama proyek Anda.
private static String Project = "ali-test-project";
// Nama Logstore Anda.
private static String Logstore = "ali-test-logstore";
// Nama consumer group Anda. Anda tidak perlu membuat consumer group terlebih dahulu. Program akan membuatnya secara otomatis saat runtime.
private static String ConsumerGroup = "ali-test-consumergroup2";
// Mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// 'consumer_1' adalah nama konsumen, yang harus unik dalam satu consumer group. Untuk menyeimbangkan konsumsi dari Logstore di beberapa mesin, gunakan pengenal unik seperti alamat IP mesin untuk setiap konsumen.
// maxFetchLogGroupSize: Jumlah maksimum grup log yang diambil per permintaan. Nilai default direkomendasikan. Untuk mengubahnya, gunakan config.setMaxFetchLogGroupSize(100). Nilainya harus dalam rentang (0, 1000].
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
// Menentukan kueri Simple Log Service Processing Language (SPL) untuk konsumsi.
config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Thread memulai ClientWorker, yang mengimplementasikan antarmuka Runnable dan berjalan secara otomatis.
thread.start();
Thread.sleep(60 * 60 * 1000);
// Hentikan instance konsumen dengan memanggil <code>worker.shutdown()
-
Jalankan Main.java.
Sebagai contoh, saat mengonsumsi log NGINX, outputnya menyerupai berikut:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
Step 3: View consumer group status
Bagian ini menjelaskan dua cara untuk melihat status consumer group.
Java SDK
-
Contoh kode berikut menunjukkan cara melihat checkpoint konsumsi untuk setiap shard:
ConsumerGroupTest.java
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "cn-hangzhou.log.aliyuncs.com";
static String project = "ali-test-project";
static String logstore = "ali-test-logstore";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// Dapatkan semua consumer group dalam Logstore. Mengembalikan daftar kosong jika tidak ada consumer group.
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// Cetak properti consumer group: nama, timeout heartbeat, dan status konsumsi terurut.
System.out.println("Name: " + c.getConsumerGroupName());
System.out.println("Heartbeat timeout: " + c.getTimeout());
System.out.println("Ordered consumption: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// Stempel waktu dalam mikrodetik (long).
System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
System.out.println("Consumer name: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "Consumption has not started";
else{
// Stempel waktu UNIX dalam detik. Catatan: Format nilai ini untuk output.
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode().equals("InvalidCursor"))
// Checkpoint tidak valid karena lebih lama dari periode retensi Logstore.
consumerPrg = "Invalid. The last consumption checkpoint is older than the Logstore's retention period.";
else{
// internal server error
throw e;
}
}
}
System.out.println("Consumption checkpoint: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// do nothing
}
// Stempel waktu UNIX dalam detik. Catatan: Format nilai ini untuk output.
System.out.println("Latest data arrival time: " + endPrg);
}
}
}
}
-
Output berikut dikembalikan:
Name: ali-test-consumergroup2
Heartbeat timeout: 60
Ordered consumption: false
shard: 0
Last checkpoint update time: 0
Consumer name: consumer_1
Consumption checkpoint: Consumption has not started
Latest data arrival time: 1729583617
shard: 1
Last checkpoint update time: 0
Consumer name: consumer_1
Consumption checkpoint: Consumption has not started
Latest data arrival time: 1729583738
Process finished with exit code 0
Console
-
Login ke Simple Log Service console.
Pada bagian Projects, klik proyek yang diinginkan.

-
Pada tab , klik ikon
di samping Logstore target, lalu klik ikon
di samping data consumption.
-
Dalam daftar consumer group, klik consumer group target.
-
Pada halaman Consumer Group Status, Anda dapat melihat checkpoint konsumsi untuk setiap shard.
Related operations
-
Authorize a RAM user
Untuk mengelola consumer group dengan RAM user, berikan izin yang diperlukan kepada pengguna tersebut. Untuk informasi lebih lanjut, lihat Create a RAM user and grant permissions.
Tabel berikut mencantumkan tindakan yang diperlukan.
|
Action
|
Description
|
Resource
|
|
log:GetCursorOrData (GetCursor)
|
Mendapatkan cursor berdasarkan waktu tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}
|
|
log:CreateConsumerGroup (CreateConsumerGroup)
|
Membuat consumer group untuk Logstore tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:ListConsumerGroup (ListConsumerGroup)
|
Menampilkan semua consumer group untuk Logstore tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/*
|
|
log:ConsumerGroupUpdateCheckPoint (ConsumerGroupUpdateCheckPoint)
|
Memperbarui checkpoint shard untuk consumer group tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:ConsumerGroupHeartBeat (ConsumerGroupHeartBeat)
|
Mengirim heartbeat dari konsumen ke server.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:UpdateConsumerGroup (UpdateConsumerGroup)
|
Memodifikasi properti consumer group tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
|
log:GetConsumerGroupCheckPoint (GetCheckPoint)
|
Mendapatkan checkpoint satu atau semua shard untuk consumer group tertentu.
|
acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}
|
Sebagai contoh, untuk memberikan izin kepada RAM user untuk consumer group dengan detail berikut, gunakan kebijakan di bawah ini.
-
Akun Alibaba Cloud: 174649****602745.
-
ID wilayah: cn-hangzhou.
-
Nama proyek: project-test.
-
Nama Logstore: logstore-test.
-
Nama consumer group: consumergroup-test.
Contoh kebijakan:
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
-
Troubleshooting
Konfigurasikan Log4j untuk aplikasi konsumen Anda agar mencatat pengecualian dari consumer group. Hal ini membantu Anda menemukan dan mendiagnosis masalah. Kode berikut menunjukkan contoh konfigurasi log4j.properties:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
Setelah mengonfigurasi Log4j, Anda akan melihat pesan pengecualian seperti contoh berikut saat menjalankan aplikasi konsumen:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
-
Consume data from a specific time
// Parameter consumerStartTimeInSeconds menentukan waktu mulai konsumsi.
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
int consumerStartTimeInSeconds);
// Parameter position adalah enumerasi. LogHubConfig.ConsumePosition.BEGIN_CURSOR memulai konsumsi dari data paling awal, dan LogHubConfig.ConsumePosition.END_CURSOR memulai dari data terbaru.
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
ConsumePosition position);
Catatan
-
Gunakan konstruktor yang sesuai dengan kebutuhan konsumsi Anda.
-
Jika checkpoint sudah disimpan di server, konsumsi dilanjutkan dari checkpoint yang tersimpan, terlepas dari pengaturan lainnya.
-
Secara default, Simple Log Service memprioritaskan checkpoint untuk konsumsi data. Jika Anda menentukan waktu mulai, pastikan nilai consumerStartTimeInSeconds berada dalam periode waktu hidup (TTL). Jika tidak, pengaturan tersebut tidak berpengaruh.
-
Reset a checkpoint
public static void updateCheckpoint() throws Exception {
Client client = new Client(host, accessId, accessKey);
// Stempel waktu harus berupa stempel waktu UNIX dalam detik. Jika stempel waktu Anda dalam milidetik, bagi dengan 1000 seperti pada contoh.
long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
for (Shard shard : response.GetShards()) {
int shardId = shard.GetShardId();
String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
}
}
References
-
API
-
SDK
|
Language
|
Documentation
|
|
Java
|
|
|
Python
|
|
-
CLI