All Products
Search
Document Center

Simple Log Service:Mengonsumsi log dengan kelompok konsumen

Last Updated:Mar 26, 2026

Mengonsumsi data secara langsung dari Simple Log Service menggunakan SDK sering kali tidak mencukupi untuk tugas kompleks seperti load balancing dan failover, terutama saat mengintegrasikan dengan perangkat lunak pihak ketiga, aplikasi multi-bahasa, layanan cloud, atau framework komputasi aliran. Dalam kasus tersebut, gunakan consumer groups untuk mengonsumsi data. Consumer groups menyediakan konsumsi hampir real-time dengan latensi biasanya dalam hitungan detik. Topik ini menjelaskan cara mengonsumsi data menggunakan consumer groups.

Overview

Sebuah Logstore berisi beberapa shard. Saat Anda menggunakan consumer group untuk mengonsumsi data, shard diberikan kepada konsumen dalam grup tersebut sebagai berikut:

  • Dalam satu consumer group, setiap shard hanya diberikan kepada satu konsumen.

  • Dalam satu consumer group, satu konsumen dapat diberikan beberapa shard.

Saat konsumen baru bergabung ke dalam consumer group, shard akan ditugaskan ulang untuk menyeimbangkan beban. Penugasan ulang ini mengikuti prinsip yang dijelaskan di atas.

image

Key concepts

Term

Description

consumer group

Consumer group terdiri dari beberapa konsumen yang bekerja sama untuk memproses data dari sebuah Logstore. Konsumen dalam grup yang sama berbagi workload, memastikan bahwa setiap catatan data diproses hanya oleh satu konsumen dalam grup tersebut.

Penting

Anda dapat membuat hingga 30 consumer group untuk setiap Logstore.

consumer

Unit dalam consumer group yang mengonsumsi data.

Penting

Konsumen dalam consumer group yang sama harus memiliki nama unik.

Logstore

Unit dasar untuk pengumpulan data, penyimpanan, dan kueri. Untuk informasi lebih lanjut, lihat Logstore.

shard

Unit dasar Logstore yang menyediakan kapasitas baca dan tulis tetap. Semua data dalam Logstore disimpan dalam shard. Untuk informasi lebih lanjut, lihat shard.

checkpoint

Penanda yang menunjukkan posisi terakhir yang telah dikonsumsi oleh konsumen. Setelah restart, konsumen menggunakan checkpoint untuk melanjutkan konsumsi.

Catatan

Saat menggunakan consumer group, checkpoint disimpan secara otomatis jika proses konsumen gagal. Setelah pemulihan, proses melanjutkan konsumsi dari checkpoint yang tersimpan untuk mencegah konsumsi duplikat.

Prerequisites

Step 1: Create a consumer group

Bagian ini menunjukkan cara membuat consumer group menggunakan SDK, API, atau CLI.

SDK

Kode berikut menunjukkan cara membuat consumer group:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // Contoh ini mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Masukkan nama proyek.
        String projectName = "ali-test-project";
        // Masukkan nama Logstore.
        String logstoreName = "ali-test-logstore";
        // Tetapkan titik akhir untuk Simple Log Service. Contoh ini menggunakan titik akhir wilayah China (Hangzhou). Ganti dengan titik akhir yang sebenarnya.
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // Buat klien Simple Log Service.
        Client client = new Client(host, accessId, accessKey);

        try {
            // Tetapkan nama consumer group.
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");

            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

Untuk contoh kode lainnya, lihat Use the Java SDK to manage consumer groups dan Use the Log Service SDK for Python to manage consumer groups.

API

Untuk membuat consumer group, panggil operasi CreateConsumerGroup.

Untuk memeriksa apakah consumer group telah dibuat, panggil operasi ListConsumerGroup.

CLI

Untuk membuat consumer group, jalankan perintah create_consumer_group.

Untuk memeriksa apakah consumer group telah dibuat, jalankan perintah list_consumer_group.

Step 2: Consume logs

How it works

Saat konsumen pertama kali dijalankan, SDK akan membuat consumer group jika belum ada. Pengaturan start consumption checkpoint menentukan posisi awal dan hanya berlaku saat consumer group pertama kali dibuat. Pada restart berikutnya, konsumen melanjutkan dari checkpoint konsumsi terakhir yang disimpan di server. Contohnya:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: Consumer group memulai konsumsi dari log paling awal yang tersedia di Logstore.

  • LogHubConfig.ConsumePosition.END_CURSOR: Consumer group hanya memproses log baru yang tiba setelah konsumen dijalankan.

Consumption examples

Anda dapat menggunakan SDK untuk Java, C++, Python, dan Go untuk mengonsumsi data dari consumer group. Bagian ini memberikan contoh menggunakan Java SDK.

SDK

  1. Tambahkan dependensi Maven.

    Dalam file pom.xml Anda, tambahkan dependensi berikut:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. Buat logika konsumsi. Kode berikut merupakan contohnya:

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Melacak stempel waktu penyimpanan checkpoint terakhir.
        private long mLastSaveTime = 0;
    
        // Dipanggil saat prosesor diinisialisasi.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // Logika utama konsumsi data. Tangani semua pengecualian dalam metode ini; jangan lemparkan kembali.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Cetak data yang diambil.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Simpan checkpoint ke server setiap 30 detik. Jika worker berhenti secara tak terduga, worker baru melanjutkan konsumsi dari checkpoint terakhir, yang dapat menyebabkan sejumlah kecil data dikonsumsi ulang.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // <code>true
    false

    Untuk contoh kode lainnya, lihat aliyun-log-consumer-java dan Aliyun LOG Go Consumer.

  3. Buat factory instance konsumen. Kode berikut merupakan contohnya:

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Menghasilkan instance konsumen. Catatan: Metode ini harus mengembalikan objek SampleLogHubProcessor baru pada setiap pemanggilan.
            return new SampleLogHubProcessor();
        }
    }
  4. Buat konsumen dan mulai thread konsumen untuk mengonsumsi data dari Logstore yang ditentukan. Kode berikut merupakan contohnya:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Titik akhir untuk Simple Log Service. Ganti ini dengan titik akhir aktual Anda. Contoh ini menggunakan titik akhir untuk Wilayah Tiongkok (Hangzhou).
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Nama proyek Anda.
        private static String Project = "ali-test-project";
        // Nama Logstore Anda.
        private static String Logstore = "ali-test-logstore";
        // Nama kelompok konsumen Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Program akan membuatnya secara otomatis saat waktu proses.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // Mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // 'consumer_1' adalah nama konsumen, yang harus unik dalam satu kelompok konsumen. Untuk menyeimbangkan konsumsi dari Logstore di beberapa mesin, gunakan pengidentifikasi unik seperti alamat IP mesin untuk setiap konsumen.
            // maxFetchLogGroupSize: Jumlah maksimum kelompok log yang diambil per permintaan. Nilai default direkomendasikan. Untuk mengubahnya, gunakan config.setMaxFetchLogGroupSize(100). Nilai harus berada dalam rentang (0, 1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread memulai ClientWorker, yang mengimplementasikan antarmuka Runnable dan berjalan secara otomatis.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Menghentikan instans konsumen dengan memanggil <code>worker.shutdown()
  5. Jalankan Main.java.

    Sebagai contoh, saat mengonsumsi log NGINX, outputnya menyerupai berikut:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

SDK and SPL

  1. Tambahkan dependensi Maven.

    Dalam file pom.xml Anda, tambahkan dependensi berikut:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. Buat logika konsumsi. Kode berikut merupakan contohnya:

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Melacak stempel waktu penyimpanan checkpoint terakhir.
        private long mLastSaveTime = 0;
    
        // Dipanggil saat prosesor diinisialisasi.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // Logika utama konsumsi data. Tangani semua pengecualian dalam metode ini; jangan lemparkan kembali.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Cetak data yang diambil.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Simpan checkpoint ke server setiap 30 detik. Jika worker berhenti secara tak terduga, worker baru melanjutkan konsumsi dari checkpoint terakhir, yang dapat menyebabkan sejumlah kecil data dikonsumsi ulang.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // <code>true
    false
  3. Buat factory instance konsumen. Kode berikut merupakan contohnya:

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Menghasilkan instance konsumen. Catatan: Metode ini harus mengembalikan objek SPLLogHubProcessor baru pada setiap pemanggilan.
            return new SPLLogHubProcessor();
        }
    }
  4. Buat konsumen dan mulai thread konsumen untuk mengonsumsi data dari Logstore yang ditentukan. Kode berikut merupakan contohnya:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Titik akhir untuk Simple Log Service. Ganti dengan titik akhir Anda yang sebenarnya. Contoh ini menggunakan titik akhir wilayah China (Hangzhou).
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Nama proyek Anda.
        private static String Project = "ali-test-project";
        // Nama Logstore Anda.
        private static String Logstore = "ali-test-logstore";
        // Nama consumer group Anda. Anda tidak perlu membuat consumer group terlebih dahulu. Program akan membuatnya secara otomatis saat runtime.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // Mengambil ID AccessKey dan Rahasia AccessKey dari variabel lingkungan.
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // 'consumer_1' adalah nama konsumen, yang harus unik dalam satu consumer group. Untuk menyeimbangkan konsumsi dari Logstore di beberapa mesin, gunakan pengenal unik seperti alamat IP mesin untuk setiap konsumen.
            // maxFetchLogGroupSize: Jumlah maksimum grup log yang diambil per permintaan. Nilai default direkomendasikan. Untuk mengubahnya, gunakan config.setMaxFetchLogGroupSize(100). Nilainya harus dalam rentang (0, 1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // Menentukan kueri Simple Log Service Processing Language (SPL) untuk konsumsi.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread memulai ClientWorker, yang mengimplementasikan antarmuka Runnable dan berjalan secara otomatis.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Hentikan instance konsumen dengan memanggil <code>worker.shutdown()
  5. Jalankan Main.java.

    Sebagai contoh, saat mengonsumsi log NGINX, outputnya menyerupai berikut:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

Step 3: View consumer group status

Bagian ini menjelaskan dua cara untuk melihat status consumer group.

Java SDK

  1. Contoh kode berikut menunjukkan cara melihat checkpoint konsumsi untuk setiap shard:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // Dapatkan semua consumer group dalam Logstore. Mengembalikan daftar kosong jika tidak ada consumer group.
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // Cetak properti consumer group: nama, timeout heartbeat, dan status konsumsi terurut.
                System.out.println("Name: " + c.getConsumerGroupName());
                System.out.println("Heartbeat timeout: " + c.getTimeout());
                System.out.println("Ordered consumption: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // Stempel waktu dalam mikrodetik (long).
                    System.out.println("Last checkpoint update time: " + cp.getUpdateTime());
                    System.out.println("Consumer name: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "Consumption has not started";
                    else{
                        // Stempel waktu UNIX dalam detik. Catatan: Format nilai ini untuk output.
                        try{
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        }
                        catch(LogException e){
                            if(e.GetErrorCode().equals("InvalidCursor"))
                                // Checkpoint tidak valid karena lebih lama dari periode retensi Logstore.
                                consumerPrg = "Invalid. The last consumption checkpoint is older than the Logstore's retention period.";
                            else{
                                // internal server error
                                throw e;
                            }
                        }
                    }
                    System.out.println("Consumption checkpoint: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                    try{
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    }
                    catch(LogException e){
                        // do nothing
                    }
                    // Stempel waktu UNIX dalam detik. Catatan: Format nilai ini untuk output.
                    System.out.println("Latest data arrival time: " + endPrg);
                }
            }
        }
    }
  2. Output berikut dikembalikan:

    Name: ali-test-consumergroup2
    Heartbeat timeout: 60
    Ordered consumption: false
    shard: 0
    Last checkpoint update time: 0
    Consumer name: consumer_1
    Consumption checkpoint: Consumption has not started
    Latest data arrival time: 1729583617
    shard: 1
    Last checkpoint update time: 0
    Consumer name: consumer_1
    Consumption checkpoint: Consumption has not started
    Latest data arrival time: 1729583738
    
    Process finished with exit code 0

Console

  1. Login ke Simple Log Service console.

  2. Pada bagian Projects, klik proyek yang diinginkan.

    image

  3. Pada tab Log Storage > Logstores, klik ikon 展开节点 di samping Logstore target, lalu klik ikon 展开节点 di samping data consumption.

  4. Dalam daftar consumer group, klik consumer group target.

  5. Pada halaman Consumer Group Status, Anda dapat melihat checkpoint konsumsi untuk setiap shard.image

Related operations

  • Authorize a RAM user

    Untuk mengelola consumer group dengan RAM user, berikan izin yang diperlukan kepada pengguna tersebut. Untuk informasi lebih lanjut, lihat Create a RAM user and grant permissions.

    Tabel berikut mencantumkan tindakan yang diperlukan.

    Action

    Description

    Resource

    log:GetCursorOrData (GetCursor)

    Mendapatkan cursor berdasarkan waktu tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}

    log:CreateConsumerGroup (CreateConsumerGroup)

    Membuat consumer group untuk Logstore tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}

    log:ListConsumerGroup (ListConsumerGroup)

    Menampilkan semua consumer group untuk Logstore tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/*

    log:ConsumerGroupUpdateCheckPoint (ConsumerGroupUpdateCheckPoint)

    Memperbarui checkpoint shard untuk consumer group tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}

    log:ConsumerGroupHeartBeat (ConsumerGroupHeartBeat)

    Mengirim heartbeat dari konsumen ke server.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}

    log:UpdateConsumerGroup (UpdateConsumerGroup)

    Memodifikasi properti consumer group tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}

    log:GetConsumerGroupCheckPoint (GetCheckPoint)

    Mendapatkan checkpoint satu atau semua shard untuk consumer group tertentu.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${LogStoreName}/consumergroup/${consumerGroupName}

    Sebagai contoh, untuk memberikan izin kepada RAM user untuk consumer group dengan detail berikut, gunakan kebijakan di bawah ini.

    • Akun Alibaba Cloud: 174649****602745.

    • ID wilayah: cn-hangzhou.

    • Nama proyek: project-test.

    • Nama Logstore: logstore-test.

    • Nama consumer group: consumergroup-test.

    Contoh kebijakan:

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
      ]
    }
  • Troubleshooting

    Konfigurasikan Log4j untuk aplikasi konsumen Anda agar mencatat pengecualian dari consumer group. Hal ini membantu Anda menemukan dan mendiagnosis masalah. Kode berikut menunjukkan contoh konfigurasi log4j.properties:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    Setelah mengonfigurasi Log4j, Anda akan melihat pesan pengecualian seperti contoh berikut saat menjalankan aplikasi konsumen:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • Consume data from a specific time

    // Parameter consumerStartTimeInSeconds menentukan waktu mulai konsumsi.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // Parameter position adalah enumerasi. LogHubConfig.ConsumePosition.BEGIN_CURSOR memulai konsumsi dari data paling awal, dan LogHubConfig.ConsumePosition.END_CURSOR memulai dari data terbaru.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    Catatan
    • Gunakan konstruktor yang sesuai dengan kebutuhan konsumsi Anda.

    • Jika checkpoint sudah disimpan di server, konsumsi dilanjutkan dari checkpoint yang tersimpan, terlepas dari pengaturan lainnya.

    • Secara default, Simple Log Service memprioritaskan checkpoint untuk konsumsi data. Jika Anda menentukan waktu mulai, pastikan nilai consumerStartTimeInSeconds berada dalam periode waktu hidup (TTL). Jika tidak, pengaturan tersebut tidak berpengaruh.

  • Reset a checkpoint

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // Stempel waktu harus berupa stempel waktu UNIX dalam detik. Jika stempel waktu Anda dalam milidetik, bagi dengan 1000 seperti pada contoh.
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

References