Layanan Log Sederhana memungkinkan perangkat lunak pihak ketiga, aplikasi dalam berbagai bahasa pemrograman, layanan cloud, dan kerangka komputasi aliran untuk mengonsumsi data secara real-time dengan memanggil SDK Layanan Log Sederhana. Selama pemanggilan SDK, Anda dapat membuat grup konsumen untuk mengonsumsi data dalam hitungan detik. Topik ini menjelaskan cara menggunakan grup konsumen untuk mengonsumsi data.
Alur Kerja
Penyimpanan log terdiri dari beberapa shard. Layanan Log Sederhana mengalokasikan shard ke konsumen dalam grup konsumen berdasarkan aturan berikut:
Setiap shard hanya dapat dialokasikan ke satu konsumen.
Seorang konsumen dapat mengonsumsi data dari beberapa shard.
Setelah konsumen baru ditambahkan ke grup konsumen, shard yang telah dialokasikan akan dialokasikan ulang untuk menyeimbangkan beban di antara konsumen. Proses alokasi ulang mengikuti aturan sebelumnya.
Jika Anda menggunakan grup konsumen untuk mengonsumsi data, Layanan Log Sederhana secara otomatis menyimpan titik pemeriksaan saat terjadi kesalahan dalam program Anda. Setelah program pulih, konsumen dapat melanjutkan konsumsi data dari titik pemeriksaan tanpa mengonsumsi data berulang kali.
Prasyarat
Pengguna Resource Access Management (RAM) telah dibuat, dan izin yang diperlukan telah diberikan kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Buat Pengguna RAM dan Otorisasi Pengguna RAM untuk Mengakses Layanan Log Sederhana.
Jika Anda menggunakan pengguna RAM untuk mengelola grup konsumen, pastikan bahwa pengguna RAM memiliki izin yang diperlukan. Untuk informasi lebih lanjut, lihat Otorisasi Pengguna RAM.
Variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Variabel Lingkungan di Linux, macOS, dan Windows.
PentingPasangan AccessKey akun Alibaba Cloud memiliki izin untuk semua operasi API. Kami menyarankan Anda menggunakan pasangan AccessKey pengguna RAM untuk memanggil operasi API atau melakukan pemeliharaan rutin O&M.
Hindari menyertakan ID AccessKey atau rahasia AccessKey dalam kode proyek Anda. Jika tidak, pasangan AccessKey mungkin bocor, dan keamanan semua sumber daya dalam akun Anda dapat terganggu.
Lingkungan pengembangan SDK telah disiapkan. Untuk informasi lebih lanjut, lihat Ikhtisar SDK Layanan Log Sederhana.
Istilah
Istilah | Deskripsi |
grup konsumen | Anda dapat menggunakan grup konsumen untuk mengonsumsi data di Layanan Log Sederhana. Grup konsumen terdiri dari beberapa konsumen. Semua konsumen dalam grup konsumen mengonsumsi data di penyimpanan log yang sama. Konsumen tidak mengonsumsi data secara berulang. Penting Anda dapat membuat hingga 30 grup konsumen untuk sebuah penyimpanan log. |
konsumen | Konsumen dalam grup konsumen mengonsumsi data. Penting Nama konsumen dalam grup konsumen harus unik. |
penyimpanan log | Penyimpanan log digunakan untuk mengumpulkan, menyimpan, dan menanyakan data. Untuk informasi lebih lanjut, lihat Penyimpanan log. |
shard | Shard digunakan untuk mengontrol kapasitas baca dan tulis penyimpanan log. Di Layanan Log Sederhana, data disimpan dalam shard. Untuk informasi lebih lanjut, lihat Shard. |
titik pemeriksaan | Titik pemeriksaan adalah posisi di mana program berhenti mengonsumsi data. Jika program dimulai ulang, program mengonsumsi data dari titik pemeriksaan terakhir. |
Langkah 1: Buat grup konsumen
Panggil operasi API untuk membuat grup konsumen
Untuk informasi lebih lanjut tentang cara memanggil operasi API untuk membuat grup konsumen, lihat CreateConsumerGroup.
Untuk informasi lebih lanjut tentang cara memeriksa apakah grup konsumen telah dibuat, lihat ListConsumerGroup.
Gunakan SDK Layanan Log Sederhana untuk membuat grup konsumen
Untuk informasi lebih lanjut tentang kode contoh yang digunakan untuk mengelola grup konsumen, lihat Gunakan SDK Layanan Log Sederhana untuk Java untuk Mengelola Grup Konsumen dan Gunakan SDK Layanan Log Sederhana untuk Python untuk Mengelola Grup Konsumen.
Gunakan CLI Layanan Log Sederhana untuk membuat grup konsumen
Untuk informasi lebih lanjut tentang cara menggunakan CLI Layanan Log Sederhana untuk membuat grup konsumen, lihat create_consumer_group.
Untuk informasi lebih lanjut tentang cara memeriksa apakah grup konsumen telah dibuat, lihat list_consumer_group.
Langkah 2: Konsumsi data
Konsumsi data
Anda dapat menggunakan SDK Layanan Log Sederhana untuk Java, C++, Python, atau Go untuk membuat grup konsumen dan mengonsumsi data. Dalam contoh ini, SDK Layanan Log Sederhana untuk Java digunakan.
Cara kerjanya
Pertama kali Anda memanggil SDK Layanan Log Sederhana untuk Java untuk memulai konsumen, SDK akan membuat grup konsumen jika tidak menemukan grup konsumen tempat konsumen tersebut milik. Setelah grup konsumen dibuat, SDK mencatat titik pemeriksaan awal dan mulai mengonsumsi data dari titik pemeriksaan tersebut. Titik pemeriksaan awal menjadi tidak valid setelah konsumsi pertama kali. Saat konsumen dimulai ulang, konsumen melanjutkan konsumsi data dari titik pemeriksaan terakhir yang disimpan oleh Layanan Log Sederhana. Contoh titik pemeriksaan:
LogHubConfig.ConsumePosition.BEGIN_CURSOR: titik pemeriksaan awal, yang menentukan log pertama dalam penyimpanan log. Seorang konsumen memulai konsumsi dari data paling awal.LogHubConfig.ConsumePosition.END_CURSOR: titik pemeriksaan akhir, yang menentukan log terakhir dalam penyimpanan log.
Tambahkan dependensi Maven.
Buka file pom.xml di direktori root proyek Java Anda dan tambahkan kode berikut:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.47</version> </dependency>Buat file bernama
SampleLogHubProcessor.javauntuk menulis logika implementasi konsumen.import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SampleLogHubProcessor implements ILogHubProcessor { private int shardId; // Waktu terakhir titik pemeriksaan disimpan. private long mLastSaveTime = 0; // Metode initialize dipanggil sekali saat objek prosesor diinisialisasi. public void initialize(int shardId) { this.shardId = shardId; } // Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data. public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // Tampilkan data yang diperoleh. for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // Titik pemeriksaan ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari titik pemeriksaan terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali. try { if (curTime - mLastSaveTime > 30 * 1000) { // Nilai true menunjukkan bahwa titik pemeriksaan segera diperbarui ke Layanan Log Sederhana. Secara default, titik pemeriksaan yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik. checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // Nilai false menunjukkan bahwa titik pemeriksaan disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan titik pemeriksaan otomatis. checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // Fungsi shutdown instance ClientWorker dipanggil. Anda dapat mengelola titik pemeriksaan. public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // Segera simpan titik pemeriksaan ke Layanan Log Sederhana. try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }Untuk informasi lebih lanjut, lihat Java, C++, Python, dan Go.
Buat file bernama
SampleLogHubProcessorFactory.javauntuk mendefinisikan entitas konsumen.class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SampleLogHubProcessor baru dikembalikan sesuai harapan. return new SampleLogHubProcessor(); } }Buat file Main.java. Buat grup konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam grup konsumen mengonsumsi data di penyimpanan log yang ditentukan.
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class Main { // Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda. private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada. private static String Project = "ali-cn-hangzhou-sls-admin"; // Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada. private static String Logstore = "sls_operation_log"; // Nama grup konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat grup konsumen terlebih dahulu. Grup konsumen dibuat secara otomatis saat program berjalan. private static String ConsumerGroup = "consumerGroupX"; // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan. private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam grup konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen. // maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000]. LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable. thread.start(); Thread.sleep(60 * 60 * 1000); // Fungsi shutdown instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti. worker.shutdown(); // Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik. Thread.sleep(30 * 1000); } }Jalankan file Main.java.
Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
Konsumsi data berdasarkan SPL
Anda dapat menggunakan SDK Layanan Log Sederhana untuk Java, C++, Python, atau Go untuk membuat grup konsumen dan mengonsumsi data. Dalam contoh ini, SDK Layanan Log Sederhana untuk Java digunakan.
Cara kerjanya
Pertama kali Anda memanggil SDK Layanan Log Sederhana untuk Java untuk memulai konsumen, SDK akan membuat grup konsumen jika tidak menemukan grup konsumen tempat konsumen tersebut milik. Setelah grup konsumen dibuat, SDK mencatat titik pemeriksaan awal dan mulai mengonsumsi data dari titik pemeriksaan tersebut. Titik pemeriksaan awal menjadi tidak valid setelah konsumsi pertama kali. Saat konsumen dimulai ulang, konsumen melanjutkan konsumsi data dari titik pemeriksaan terakhir yang disimpan oleh Layanan Log Sederhana. Contoh titik pemeriksaan:
LogHubConfig.ConsumePosition.BEGIN_CURSOR: titik pemeriksaan awal, yang menentukan log pertama dalam penyimpanan log. Seorang konsumen memulai konsumsi dari data paling awal.LogHubConfig.ConsumePosition.END_CURSOR: titik pemeriksaan akhir, yang menentukan log terakhir dalam penyimpanan log.
Tambahkan dependensi Maven.
Buka file pom.xml di direktori root proyek Java Anda dan tambahkan kode berikut:
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.99</version> </dependency>Buat file bernama SPLLogHubProcessor.java.
import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SPLLogHubProcessor implements ILogHubProcessor { private int shardId; // Waktu terakhir titik pemeriksaan disimpan. private long mLastSaveTime = 0; // Metode initialize dipanggil sekali saat objek prosesor diinisialisasi. public void initialize(int shardId) { this.shardId = shardId; } // Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data. public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // Tampilkan data yang diperoleh. for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // Titik pemeriksaan ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari titik pemeriksaan terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali. try { if (curTime - mLastSaveTime > 30 * 1000) { // Nilai true menunjukkan bahwa titik pemeriksaan segera diperbarui ke Layanan Log Sederhana. Secara default, titik pemeriksaan yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik. checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // Nilai false menunjukkan bahwa titik pemeriksaan disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan titik pemeriksaan otomatis. checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // Fungsi shutdown instance ClientWorker dipanggil. Anda dapat mengelola titik pemeriksaan. public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // Segera simpan titik pemeriksaan ke Layanan Log Sederhana. try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }Buat file bernama SPLLogHubProcessorFactory.java.
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory; class SPLLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SPLLogHubProcessor baru dikembalikan sesuai harapan. return new SPLLogHubProcessor(); } }Buat file Main.java. Buat grup konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam grup konsumen mengonsumsi data di penyimpanan log yang ditentukan. Untuk informasi lebih lanjut tentang kode contoh yang digunakan untuk mengelola grup konsumen, lihat Gunakan SDK Layanan Log Sederhana untuk Java untuk Mengelola Grup Konsumen dan Gunakan SDK Layanan Log Sederhana untuk Python untuk Mengelola Grup Konsumen.
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class SPLConsumer { // Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda. private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada. private static String Project = "your_project"; // Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada. private static String Logstore = "your_logstore"; // Nama grup konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat grup konsumen terlebih dahulu. Grup konsumen dibuat secara otomatis saat program berjalan. private static String ConsumerGroup = "consumerGroupX"; // Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan. private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam grup konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen. // maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000]. LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000); // Anda dapat menggunakan setQuery untuk menentukan pernyataan Bahasa Pemrosesan Layanan Log Sederhana (SPL) untuk konsumsi data. config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable. thread.start(); Thread.sleep(60 * 60 * 1000); // Fungsi shutdown instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti. worker.shutdown(); // Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik. Thread.sleep(30 * 1000); } }Jalankan file Main.java.
Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.
: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.10.196 upstream_response_time : 0.02 -------- Log: 158, time: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, time: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629877 -------- ......
Langkah 3: Lihat status grup konsumen
Gunakan konsol Layanan Log Sederhana
Masuk ke konsol Layanan Log Sederhana.
Di bagian Proyek, klik yang ingin Anda kelola.

Pada tab , klik ikon
di sebelah penyimpanan log yang ingin Anda kelola. Lalu, klik ikon
di sebelah Data Consumption.Dalam daftar grup konsumen, klik grup konsumen yang ingin Anda kelola.
Di halaman Consumer Group Status, lihat titik pemeriksaan konsumsi setiap shard.
Gunakan SDK Layanan Log Sederhana
Dalam contoh ini, SDK Layanan Log Sederhana untuk Java digunakan. Jalankan file ConsumerGroupTest.java untuk melihat titik pemeriksaan konsumsi setiap shard.
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "";
static String project = "";
static String logstore = "";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// Dapatkan semua grup konsumen yang dibuat untuk penyimpanan log. Jika tidak ada grup konsumen yang ada, string kosong dikembalikan.
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// Tampilkan atribut setiap grup konsumen, termasuk nama, periode timeout heartbeat, dan apakah data dikonsumsi secara berurutan.
System.out.println("Nama: " + c.getConsumerGroupName());
System.out.println("Periode timeout heartbeat " + c.getTimeout());
System.out.println("Konsumsi berurutan: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// Waktu adalah bilangan bulat panjang dan akurat hingga mikrodetik.
System.out.println("Waktu terakhir pembaruan kemajuan konsumsi data: " + cp.getUpdateTime());
System.out.println("Nama konsumen: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "Konsumsi belum dimulai";
else{
// Timestamp UNIX. Unit: detik. Format nilai output timestamp.
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "Tidak valid. Waktu terakhir pembaruan kemajuan konsumsi data melebihi periode retensi data";
else{
// kesalahan server internal
throw e;
}
}
}
System.out.println("Kemajuan konsumsi: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// abaikan
}
// Timestamp UNIX. Unit: detik. Format nilai output timestamp.
System.out.println("Waktu penerimaan data terakhir: " + endPrg);
}
}
}
}Informasi berikut dikembalikan:
Nama: etl-6cac01c571d5a4b933649c04a7ba215b
Periode timeout heartbeat: 60
Konsumsi berurutan: false
shard: 0
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555453575211
Nama konsumen: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Kemajuan konsumsi: 1639555453
Waktu penerimaan data terakhir: 1639555453
shard: 1
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555392071328
Nama konsumen: etl-356464787983a3d17086a9797e3d5f0e6959b066-256521
Kemajuan konsumsi: 1639555391
Waktu penerimaan data terakhir: 1639555391
Nama: etl-2bd3fdfdd63595d56b1ac24393bf5991
Periode timeout heartbeat: 60
Konsumsi berurutan: false
shard: 0
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555453256773
Nama konsumen: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Kemajuan konsumsi: 1639555453
Waktu penerimaan data terakhir: 1639555453
shard: 1
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555392066234
Nama konsumen: etl-532719dd2f198b3878ee3c6dfc80aeffb39ee48b-061390
Kemajuan konsumsi: 1639555391
Waktu penerimaan data terakhir: 1639555391
Nama: consumerGroupX
Periode timeout heartbeat: 60
Konsumsi berurutan: false
shard: 0
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555434142879
Nama konsumen: consumer_1
Kemajuan konsumsi: 1635615029
Waktu penerimaan data terakhir: 1639555453
shard: 1
Waktu terakhir pembaruan kemajuan konsumsi data: 1639555437976929
Nama konsumen: consumer_1
Kemajuan konsumsi: 1635616802
Waktu penerimaan data terakhir: 1639555391Otorisasi pengguna RAM untuk melakukan operasi pada grup konsumen
Sebelum Anda dapat menggunakan pengguna RAM untuk mengelola grup konsumen, Anda harus memberikan izin yang diperlukan kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Langkah 2: Berikan Izin kepada Pengguna RAM.
Tabel berikut menjelaskan tindakan yang dapat Anda otorisasi kepada pengguna RAM untuk dilakukan.
Tindakan | Deskripsi | Sumber daya |
Mengkueri kursor berdasarkan waktu pembuatan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} | |
Membuat grup konsumen untuk penyimpanan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} | |
Mengkueri semua grup konsumen dari penyimpanan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* | |
Memperbarui titik pemeriksaan untuk shard yang dialokasikan ke grup konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} | |
Mengirim pesan heartbeat untuk konsumen ke Layanan Log Sederhana. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} | |
Memodifikasi atribut grup konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} | |
Mengkueri titik pemeriksaan untuk satu atau semua shard yang dialokasikan ke grup konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
Daftar berikut menyediakan informasi sumber daya tentang grup konsumen. Untuk mengizinkan pengguna RAM melakukan operasi pada grup konsumen, Anda dapat merujuk pada kode berikut untuk memberikan izin yang diperlukan kepada pengguna RAM:
ID akun Alibaba Cloud tempat proyek milik: 174649****602745
ID wilayah tempat proyek berada: cn-hangzhou
Nama proyek: project-test
Nama penyimpanan log: logstore-test
Nama grup konsumen: consumergroup-test
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}Apa yang harus dilakukan selanjutnya
Konfigurasikan Log4j untuk pemecahan masalah.
Kami menyarankan Anda mengonfigurasi Log4j untuk program konsumen Anda guna menampilkan pesan kesalahan saat terjadi pengecualian dalam grup konsumen. Hal ini membantu Anda memecahkan masalah kesalahan tersebut. Kode berikut menunjukkan file konfigurasi log4j.properties umum:
log4j.rootLogger = info,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%nSetelah mengonfigurasi Log4j, Anda dapat menerima pesan kesalahan saat menjalankan program konsumen. Contoh berikut menunjukkan pesan kesalahan:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159) com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]Gunakan grup konsumen untuk mengonsumsi data yang dihasilkan setelah suatu titik waktu tertentu.
// consumerStartTimeInSeconds menentukan suatu titik waktu. Data yang dihasilkan setelah titik waktu tersebut dikonsumsi. public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, int consumerStartTimeInSeconds); // position adalah variabel enumerasi. LogHubConfig.ConsumePosition.BEGIN_CURSOR menentukan bahwa konsumsi dimulai dari data paling awal. LogHubConfig.ConsumePosition.END_CURSOR menentukan bahwa konsumsi dimulai dari data terbaru. public LogHubConfig(String consumerGroupName, String consumerName, String loghubEndPoint, String project, String logStore, String accessId, String accessKey, ConsumePosition position);CatatanAnda dapat menggunakan konstruktor yang berbeda sesuai dengan kebutuhan bisnis Anda.
Jika titik pemeriksaan disimpan di Layanan Log Sederhana, konsumsi data dimulai dari titik pemeriksaan tersebut.
Saat Layanan Log Sederhana mengonsumsi data, titik pemeriksaan diprioritaskan untuk memulai konsumsi data. Jika Anda ingin menentukan titik waktu dari mana Layanan Log Sederhana mulai mengonsumsi data, pastikan nilai consumerStartTimeInSeconds berada dalam periode time-to-live (TTL). Jika tidak, Layanan Log Sederhana tidak dapat mengonsumsi data berdasarkan konfigurasi Anda.
Atur ulang titik pemeriksaan.
public static void updateCheckpoint() throws Exception { Client client = new Client(host, accessId, accessKey); // Tentukan timestamp UNIX yang akurat hingga detik. Jika Anda menentukan timestamp dalam milidetik, bagi timestamp tersebut dengan 1000. Contoh: long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000; ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore)); for (Shard shard : response.GetShards()) { int shardId = shard.GetShardId(); String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor(); client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor); } }
Referensi
API
Tindakan
Operasi API
Buat grup konsumen
Kueri grup konsumen
Hapus grup konsumen
Perbarui grup konsumen
Kirim pesan heartbeat untuk konsumen
Kueri titik pemeriksaan grup konsumen
Perbarui titik pemeriksaan grup konsumen
SDK
Bahasa pemrograman
Referensi
Java
Python
CLI
Tindakan
Perintah CLI
Buat grup konsumen
Kueri grup konsumen
Perbarui grup konsumen
Hapus grup konsumen
Kueri titik pemeriksaan grup konsumen
Perbarui titik pemeriksaan grup konsumen