全部产品
Search
文档中心

Simple Log Service:Gunakan kelompok konsumen untuk mengonsumsi log

更新时间:Jul 06, 2025

Layanan Log Sederhana memungkinkan perangkat lunak pihak ketiga, aplikasi dalam berbagai bahasa pemrograman, layanan cloud, dan kerangka komputasi aliran untuk mengonsumsi data secara real-time dengan memanggil SDK Layanan Log Sederhana. Namun, konsumsi berbasis SDK tidak dapat memenuhi persyaratan untuk detail implementasi tertentu, seperti load balancing dan failover di antara konsumen. Dalam kasus ini, Anda dapat membuat kelompok konsumen untuk mengonsumsi data dalam hitungan detik. Topik ini menjelaskan cara menggunakan kelompok konsumen untuk mengonsumsi data.

Ikhtisar

Penyimpanan log terdiri dari beberapa shard. Layanan Log Sederhana mengalokasikan shard ke konsumen dalam kelompok konsumen berdasarkan aturan berikut:

  • Setiap shard hanya dapat dialokasikan ke satu konsumen.

  • Seorang konsumen dapat mengonsumsi data dari beberapa shard.

Ketika konsumen baru ditambahkan ke kelompok konsumen, shard yang telah dialokasikan ke konsumen dalam kelompok akan dialokasikan ulang untuk load balancing. Shard dialokasikan ulang sesuai aturan sebelumnya.

Istilah

Istilah

Deskripsi

kelompok konsumen

Anda dapat menggunakan kelompok konsumen untuk mengonsumsi data di Layanan Log Sederhana. Kelompok konsumen terdiri dari beberapa konsumen. Semua konsumen dalam kelompok konsumen mengonsumsi data di penyimpanan log yang sama. Konsumen tidak mengonsumsi data secara berulang.

Penting

Anda dapat membuat hingga 30 kelompok konsumen untuk sebuah penyimpanan log.

konsumen

Konsumen dalam kelompok konsumen mengonsumsi data.

Penting

Nama konsumen dalam kelompok konsumen harus unik.

Logstore

Penyimpanan log digunakan untuk mengumpulkan, menyimpan, dan menanyakan data. Untuk informasi lebih lanjut, lihat Logstore.

shard

Shard digunakan untuk mengontrol kapasitas baca dan tulis penyimpanan log. Di Layanan Log Sederhana, data disimpan dalam shard. Untuk informasi lebih lanjut, lihat Shard.

checkpoint

Checkpoint konsumsi adalah posisi di mana program berhenti mengonsumsi data. Jika program dimulai ulang, program akan mengonsumsi data dari checkpoint konsumsi terakhir.

Catatan

Jika Anda menggunakan kelompok konsumen untuk mengonsumsi data, Layanan Log Sederhana secara otomatis menyimpan checkpoint konsumsi saat terjadi kesalahan dalam program Anda. Konsumen dapat melanjutkan konsumsi data dari checkpoint konsumsi tanpa mengonsumsi data secara berulang setelah program pulih.

Prasyarat

Langkah 1: Buat kelompok konsumen

Bagian ini menjelaskan cara menggunakan SDK Layanan Log Sederhana, API Layanan Log Sederhana, dan CLI Layanan Log Sederhana untuk membuat kelompok konsumen.

Gunakan SDK Layanan Log Sederhana

Contoh kode:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Nama proyek.
        String projectName = "ali-test-project";
        // Nama penyimpanan log.
        String logstoreName = "ali-test-logstore";
        // Titik akhir Layanan Log Sederhana. Dalam contoh ini, titik akhir Layanan Log Sederhana untuk wilayah China (Hangzhou) digunakan. Ganti nilai parameter dengan titik akhir yang sebenarnya.
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // Buat klien Layanan Log Sederhana.
        Client client = new Client(host, accessId, accessKey);

        try {
            // Nama kelompok konsumen.
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("siap untuk membuat consumergroup");

            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("buat consumergroup %s berhasil", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("kode kesalahan :" + e.GetErrorCode());
            System.out.println("pesan kesalahan :" + e.GetErrorMessage());
            throw e;
        }
    }
}

Untuk informasi lebih lanjut tentang contoh kode yang digunakan untuk mengelola kelompok konsumen, lihat Gunakan SDK Layanan Log Sederhana untuk Java untuk mengelola kelompok konsumen dan Gunakan SDK Layanan Log Sederhana untuk Python untuk mengelola kelompok konsumen.

Gunakan API Layanan Log Sederhana

Untuk informasi lebih lanjut tentang cara menggunakan API Layanan Log Sederhana untuk membuat kelompok konsumen, lihat CreateConsumerGroup.

Untuk informasi lebih lanjut tentang cara memeriksa apakah kelompok konsumen telah dibuat, lihat ListConsumerGroup.

Gunakan CLI Layanan Log Sederhana

Untuk informasi lebih lanjut tentang cara menggunakan CLI Layanan Log Sederhana untuk membuat kelompok konsumen, lihat create_consumer_group.

Untuk informasi lebih lanjut tentang cara memeriksa apakah kelompok konsumen telah dibuat, lihat list_consumer_group.

Langkah 2: Konsumsi data log

Cara kerjanya

Pertama kali Anda memanggil SDK Layanan Log Sederhana untuk Java untuk memulai konsumen, SDK akan membuat kelompok konsumen jika tidak menemukan kelompok konsumen tempat konsumen tersebut milik. Setelah kelompok konsumen dibuat, SDK mencatat checkpoint konsumsi awal dan mulai mengonsumsi data dari checkpoint tersebut. Checkpoint konsumsi awal menjadi tidak valid setelah konsumsi pertama kali. Saat konsumen dimulai ulang, konsumen melanjutkan konsumsi data dari checkpoint konsumsi terakhir yang disimpan oleh Layanan Log Sederhana. Contoh checkpoint konsumsi:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR: checkpoint konsumsi awal, yang menentukan log pertama dalam penyimpanan log. Konsumen memulai konsumsi dari data paling awal.

  • LogHubConfig.ConsumePosition.END_CURSOR: checkpoint konsumsi akhir, yang menentukan log terakhir dalam penyimpanan log.

Contoh

Anda dapat menggunakan SDK Layanan Log Sederhana untuk Java, C++, Python, atau Go untuk membuat kelompok konsumen dan mengonsumsi data. Dalam contoh ini, SDK Layanan Log Sederhana untuk Java digunakan.

Contoh 1: Gunakan SDK

  1. Tambahkan dependensi Maven.

    Buka file pom.xml dan tambahkan kode berikut:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. Tulis logika implementasi konsumsi data. Contoh kode:

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Waktu checkpoint konsumsi terakhir disimpan.
        private long mLastSaveTime = 0;
    
        // Metode initialize dipanggil sekali saat objek prosesor diinisialisasi.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Tampilkan data yang diperoleh.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", waktu: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Checkpoint konsumsi ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari checkpoint konsumsi terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // Nilai true menunjukkan bahwa checkpoint konsumsi segera diperbarui ke Layanan Log Sederhana. Secara default, checkpoint konsumsi yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // Nilai false menunjukkan bahwa checkpoint konsumsi disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan checkpoint konsumsi otomatis.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // Fungsi shutdown dari instance ClientWorker dipanggil. Anda dapat mengelola checkpoint konsumsi.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Segera simpan checkpoint konsumsi ke Layanan Log Sederhana.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }

    Untuk informasi lebih lanjut tentang contoh kode, lihat aliyun-log-consumer-java dan Aliyun LOG Go Consumer.

  3. Definisikan entitas konsumen. Contoh kode:

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SampleLogHubProcessor baru dikembalikan sesuai harapan.
            return new SampleLogHubProcessor();
        }
    }
  4. Buat kelompok konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam kelompok konsumen mengonsumsi data dalam penyimpanan log yang ditentukan. Contoh kode:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda.
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada.
        private static String Project = "ali-test-project";
        // Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada.
        private static String Logstore = "ali-test-logstore";
        // Nama kelompok konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Kelompok konsumen dibuat secara otomatis saat program berjalan.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam kelompok konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen.
            // maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Fungsi shutdown dari instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti.
            worker.shutdown();
            // Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik.
            Thread.sleep(30 * 1000);
        }
    }
  5. Jalankan file Main.java.

    Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, waktu: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, waktu: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

Contoh 2: Gunakan SDK dan SPL

  1. Tambahkan dependensi Maven.

    Buka file pom.xml dan tambahkan kode berikut:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.47</version>
    </dependency>
  2. Tulis logika implementasi konsumsi data. Contoh kode:

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // Waktu checkpoint konsumsi terakhir disimpan.
        private long mLastSaveTime = 0;
    
        // Metode initialize dipanggil sekali saat objek prosesor diinisialisasi.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Tampilkan data yang diperoleh.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", waktu: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Checkpoint konsumsi ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari checkpoint konsumsi terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // Nilai true menunjukkan bahwa checkpoint konsumsi segera diperbarui ke Layanan Log Sederhana. Secara default, checkpoint konsumsi yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // Nilai false menunjukkan bahwa checkpoint konsumsi disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan checkpoint konsumsi otomatis.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // Fungsi shutdown dari instance ClientWorker dipanggil. Anda dapat mengelola checkpoint konsumsi.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Segera simpan checkpoint konsumsi ke Layanan Log Sederhana.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. Definisikan entitas konsumen. Contoh kode:

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SPLLogHubProcessor baru dikembalikan sesuai harapan.
            return new SPLLogHubProcessor();
        }
    }
  4. Buat kelompok konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam kelompok konsumen mengonsumsi data dalam penyimpanan log yang ditentukan. Contoh kode:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda.
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada.
        private static String Project = "ali-test-project";
        // Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada.
        private static String Logstore = "ali-test-logstore";
        // Nama kelompok konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Kelompok konsumen dibuat secara otomatis saat program berjalan.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam kelompok konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen.
            // maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // Anda dapat menggunakan setQuery untuk menentukan pernyataan Bahasa Pemrosesan Log Sederhana (SPL) untuk konsumsi data.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Fungsi shutdown dari instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti.
            worker.shutdown();
            // Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik.
            Thread.sleep(30 * 1000);
        }
    }
  5. Jalankan file Main.java.

    Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, waktu: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, waktu: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......

Langkah 3: Lihat status kelompok konsumen

Bagian ini menjelaskan metode yang dapat Anda gunakan untuk melihat status kelompok konsumen.

Gunakan SDK Layanan Log Sederhana untuk Java

  1. Lihat checkpoint konsumsi setiap shard. Contoh kode:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // Dapatkan semua kelompok konsumen yang dibuat untuk penyimpanan log. Jika tidak ada kelompok konsumen yang ada, string kosong dikembalikan.
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // Tampilkan atribut setiap kelompok konsumen, termasuk nama, periode timeout heartbeat, dan apakah data dikonsumsi secara berurutan.
                System.out.println("Nama: " + c.getConsumerGroupName());
                System.out.println("Periode timeout heartbeat " + c.getTimeout());
                System.out.println("Konsumsi berurutan: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // Waktu adalah bilangan bulat panjang dan akurat hingga mikrodetik.
                    System.out.println("Waktu checkpoint konsumsi terakhir diperbarui: " + cp.getUpdateTime());
                    System.out.println("Nama konsumen: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "Konsumsi belum dimulai";
                    else{
                        // Timestamp UNIX. Unit: detik. Format nilai output timestamp.
                        try{
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        }
                        catch(LogException e){
                            if(e.GetErrorCode() == "InvalidCursor")
                                consumerPrg = "Tidak valid. Waktu checkpoint konsumsi terakhir diperbarui melebihi periode retensi data";
                            else{
                                // kesalahan server internal
                                throw e;
                            }
                        }
                    }
                    System.out.println("Checkpoint konsumsi: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                    try{
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    }
                    catch(LogException e){
                        // abaikan
                    }
                    // Timestamp UNIX. Unit: detik. Format nilai output timestamp.
                    System.out.println("Waktu data terakhir diterima: " + endPrg);
                }
            }
        }
    }
  2. Lihat output. Contoh:

    Nama: ali-test-consumergroup2
    Periode timeout heartbeat: 60
    Konsumsi berurutan: false
    shard: 0
    Waktu checkpoint konsumsi terakhir diperbarui: 0
    Nama konsumen: consumer_1
    Checkpoint konsumsi: Konsumsi belum dimulai
    Waktu data terakhir diterima: 1729583617
    shard: 1
    Waktu checkpoint konsumsi terakhir diperbarui: 0
    Nama konsumen: consumer_1
    Checkpoint konsumsi: Konsumsi belum dimulai
    Waktu data terakhir diterima: 1729583738
    
    Proses selesai dengan kode keluar 0

Gunakan konsol Layanan Log Sederhana

  1. Masuk ke konsol Layanan Log Sederhana.

  2. Di bagian Proyek, klik yang ingin Anda kelola.

    image

  3. Pada tab Log Storage > Logstores, klik ikon 展开节点 di sebelah penyimpanan log yang ingin Anda kelola. Kemudian, klik ikon 展开节点 di sebelah Data Consumption.

  4. Dalam daftar kelompok konsumen, klik kelompok konsumen yang ingin Anda kelola.

  5. Di halaman Consumer Group Status, lihat checkpoint konsumsi setiap shard.image

Apa yang harus dilakukan selanjutnya

  • Otorisasi pengguna RAM untuk melakukan operasi pada kelompok konsumen

    Sebelum menggunakan pengguna Resource Access Management (RAM) untuk mengelola kelompok konsumen, Anda harus memberikan izin yang diperlukan kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Buat pengguna RAM dan otorisasi pengguna RAM untuk mengakses Layanan Log Sederhana.

    Tabel berikut menjelaskan tindakan yang dapat Anda otorisasi kepada pengguna RAM.

    Tindakan

    Deskripsi

    Sumber daya

    log:GetCursorOrData(GetCursor)

    Kueri kursor berdasarkan waktu pembuatan log.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}

    log:CreateConsumerGroup(CreateConsumerGroup)

    Membuat kelompok konsumen untuk penyimpanan log.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:ListConsumerGroup(ListConsumerGroup)

    Kueri semua kelompok konsumen dari penyimpanan log.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*

    log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint)

    Memperbarui checkpoint konsumsi untuk shard yang dialokasikan ke kelompok konsumen.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat)

    Mengirim pesan heartbeat untuk konsumen ke Layanan Log Sederhana.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:UpdateConsumerGroup(UpdateConsumerGroup)

    Memodifikasi atribut kelompok konsumen.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    log:GetConsumerGroupCheckPoint(GetCheckPoint)

    Kueri checkpoint konsumsi untuk satu atau semua shard yang dialokasikan ke kelompok konsumen.

    acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}

    Berikut adalah informasi sumber daya terkait kelompok konsumen. Untuk memberikan izin kepada pengguna RAM agar dapat melakukan operasi pada kelompok konsumen, gunakan kode berikut sebagai referensi:

    • ID akun Alibaba Cloud tempat proyek berada: 174649****602745

    • ID wilayah tempat proyek berada: cn-hangzhou

    • Nama proyek: project-test

    • Nama penyimpanan log: logstore-test

    • Nama kelompok konsumen: consumergroup-test

    Contoh kode:

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
        }
      ]
    }
  • Konfigurasikan Log4j untuk pemecahan masalah.

    Kami sarankan Anda mengonfigurasi Log4j untuk program konsumen agar dapat menampilkan pesan kesalahan saat terjadi pengecualian dalam kelompok konsumen. Ini membantu Anda memecahkan masalah kesalahan tersebut. Berikut adalah contoh file konfigurasi log4j.properties umum:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    Setelah mengonfigurasi Log4j, Anda dapat menerima pesan kesalahan saat menjalankan program konsumen. Contoh berikut menunjukkan pesan kesalahan:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • Gunakan kelompok konsumen untuk mengonsumsi data yang dihasilkan setelah titik waktu tertentu

    // consumerStartTimeInSeconds menentukan titik waktu. Data yang dihasilkan setelah titik waktu tersebut dikonsumsi.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position adalah variabel enumerasi. LogHubConfig.ConsumePosition.BEGIN_CURSOR menentukan bahwa konsumsi dimulai dari data paling awal. LogHubConfig.ConsumePosition.END_CURSOR menentukan bahwa konsumsi dimulai dari data paling baru.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    Catatan
    • Anda dapat menggunakan konstruktor yang berbeda sesuai dengan kebutuhan bisnis Anda.

    • Jika checkpoint konsumsi disimpan di Layanan Log Sederhana, konsumsi data dimulai dari checkpoint tersebut.

    • Saat Layanan Log Sederhana mengonsumsi data, checkpoint konsumsi diprioritaskan untuk memulai konsumsi data. Jika Anda ingin menentukan titik waktu dari mana Layanan Log Sederhana mulai mengonsumsi data, pastikan nilai consumerStartTimeInSeconds berada dalam periode time-to-live (TTL). Jika tidak, Layanan Log Sederhana tidak dapat mengonsumsi data sesuai konfigurasi Anda.

  • Reset checkpoint konsumsi

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // Tentukan timestamp UNIX yang akurat hingga detik. Jika Anda menentukan timestamp dalam milidetik, bagi timestamp tersebut dengan 1000. Contoh:
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }

Referensi