Layanan Log Sederhana memungkinkan perangkat lunak pihak ketiga, aplikasi dalam berbagai bahasa pemrograman, layanan cloud, dan kerangka komputasi aliran untuk mengonsumsi data secara real-time dengan memanggil SDK Layanan Log Sederhana. Namun, konsumsi berbasis SDK tidak dapat memenuhi persyaratan untuk detail implementasi tertentu, seperti load balancing dan failover di antara konsumen. Dalam kasus ini, Anda dapat membuat kelompok konsumen untuk mengonsumsi data dalam hitungan detik. Topik ini menjelaskan cara menggunakan kelompok konsumen untuk mengonsumsi data.
Ikhtisar
Penyimpanan log terdiri dari beberapa shard. Layanan Log Sederhana mengalokasikan shard ke konsumen dalam kelompok konsumen berdasarkan aturan berikut:
Ketika konsumen baru ditambahkan ke kelompok konsumen, shard yang telah dialokasikan ke konsumen dalam kelompok akan dialokasikan ulang untuk load balancing. Shard dialokasikan ulang sesuai aturan sebelumnya.
Istilah
Istilah | Deskripsi |
kelompok konsumen | Anda dapat menggunakan kelompok konsumen untuk mengonsumsi data di Layanan Log Sederhana. Kelompok konsumen terdiri dari beberapa konsumen. Semua konsumen dalam kelompok konsumen mengonsumsi data di penyimpanan log yang sama. Konsumen tidak mengonsumsi data secara berulang.
Penting Anda dapat membuat hingga 30 kelompok konsumen untuk sebuah penyimpanan log. |
konsumen | Konsumen dalam kelompok konsumen mengonsumsi data.
Penting Nama konsumen dalam kelompok konsumen harus unik. |
Logstore | Penyimpanan log digunakan untuk mengumpulkan, menyimpan, dan menanyakan data. Untuk informasi lebih lanjut, lihat Logstore. |
shard | Shard digunakan untuk mengontrol kapasitas baca dan tulis penyimpanan log. Di Layanan Log Sederhana, data disimpan dalam shard. Untuk informasi lebih lanjut, lihat Shard. |
checkpoint | Checkpoint konsumsi adalah posisi di mana program berhenti mengonsumsi data. Jika program dimulai ulang, program akan mengonsumsi data dari checkpoint konsumsi terakhir.
Catatan Jika Anda menggunakan kelompok konsumen untuk mengonsumsi data, Layanan Log Sederhana secara otomatis menyimpan checkpoint konsumsi saat terjadi kesalahan dalam program Anda. Konsumen dapat melanjutkan konsumsi data dari checkpoint konsumsi tanpa mengonsumsi data secara berulang setelah program pulih. |
Langkah 1: Buat kelompok konsumen
Bagian ini menjelaskan cara menggunakan SDK Layanan Log Sederhana, API Layanan Log Sederhana, dan CLI Layanan Log Sederhana untuk membuat kelompok konsumen.
Gunakan SDK Layanan Log Sederhana
Contoh kode:
CreateConsumerGroup.java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
// Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Nama proyek.
String projectName = "ali-test-project";
// Nama penyimpanan log.
String logstoreName = "ali-test-logstore";
// Titik akhir Layanan Log Sederhana. Dalam contoh ini, titik akhir Layanan Log Sederhana untuk wilayah China (Hangzhou) digunakan. Ganti nilai parameter dengan titik akhir yang sebenarnya.
String host = "https://cn-hangzhou.log.aliyuncs.com";
// Buat klien Layanan Log Sederhana.
Client client = new Client(host, accessId, accessKey);
try {
// Nama kelompok konsumen.
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("siap untuk membuat consumergroup");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("buat consumergroup %s berhasil", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("kode kesalahan :" + e.GetErrorCode());
System.out.println("pesan kesalahan :" + e.GetErrorMessage());
throw e;
}
}
}
Untuk informasi lebih lanjut tentang contoh kode yang digunakan untuk mengelola kelompok konsumen, lihat Gunakan SDK Layanan Log Sederhana untuk Java untuk mengelola kelompok konsumen dan Gunakan SDK Layanan Log Sederhana untuk Python untuk mengelola kelompok konsumen.
Gunakan API Layanan Log Sederhana
Untuk informasi lebih lanjut tentang cara menggunakan API Layanan Log Sederhana untuk membuat kelompok konsumen, lihat CreateConsumerGroup.
Untuk informasi lebih lanjut tentang cara memeriksa apakah kelompok konsumen telah dibuat, lihat ListConsumerGroup.
Gunakan CLI Layanan Log Sederhana
Untuk informasi lebih lanjut tentang cara menggunakan CLI Layanan Log Sederhana untuk membuat kelompok konsumen, lihat create_consumer_group.
Untuk informasi lebih lanjut tentang cara memeriksa apakah kelompok konsumen telah dibuat, lihat list_consumer_group.
Langkah 2: Konsumsi data log
Cara kerjanya
Pertama kali Anda memanggil SDK Layanan Log Sederhana untuk Java untuk memulai konsumen, SDK akan membuat kelompok konsumen jika tidak menemukan kelompok konsumen tempat konsumen tersebut milik. Setelah kelompok konsumen dibuat, SDK mencatat checkpoint konsumsi awal dan mulai mengonsumsi data dari checkpoint tersebut. Checkpoint konsumsi awal menjadi tidak valid setelah konsumsi pertama kali. Saat konsumen dimulai ulang, konsumen melanjutkan konsumsi data dari checkpoint konsumsi terakhir yang disimpan oleh Layanan Log Sederhana. Contoh checkpoint konsumsi:
LogHubConfig.ConsumePosition.BEGIN_CURSOR: checkpoint konsumsi awal, yang menentukan log pertama dalam penyimpanan log. Konsumen memulai konsumsi dari data paling awal.
LogHubConfig.ConsumePosition.END_CURSOR: checkpoint konsumsi akhir, yang menentukan log terakhir dalam penyimpanan log.
Contoh
Anda dapat menggunakan SDK Layanan Log Sederhana untuk Java, C++, Python, atau Go untuk membuat kelompok konsumen dan mengonsumsi data. Dalam contoh ini, SDK Layanan Log Sederhana untuk Java digunakan.
Contoh 1: Gunakan SDK
Tambahkan dependensi Maven.
Buka file pom.xml dan tambahkan kode berikut:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
Tulis logika implementasi konsumsi data. Contoh kode:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
// Waktu checkpoint konsumsi terakhir disimpan.
private long mLastSaveTime = 0;
// Metode initialize dipanggil sekali saat objek prosesor diinisialisasi.
public void initialize(int shardId) {
this.shardId = shardId;
}
// Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data.
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// Tampilkan data yang diperoleh.
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", waktu: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// Checkpoint konsumsi ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari checkpoint konsumsi terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali.
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// Nilai true menunjukkan bahwa checkpoint konsumsi segera diperbarui ke Layanan Log Sederhana. Secara default, checkpoint konsumsi yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik.
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
// Nilai false menunjukkan bahwa checkpoint konsumsi disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan checkpoint konsumsi otomatis.
checkPointTracker.saveCheckPoint(false);
}
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
return null;
}
// Fungsi shutdown dari instance ClientWorker dipanggil. Anda dapat mengelola checkpoint konsumsi.
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
// Segera simpan checkpoint konsumsi ke Layanan Log Sederhana.
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
Untuk informasi lebih lanjut tentang contoh kode, lihat aliyun-log-consumer-java dan Aliyun LOG Go Consumer.
Definisikan entitas konsumen. Contoh kode:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SampleLogHubProcessor baru dikembalikan sesuai harapan.
return new SampleLogHubProcessor();
}
}
Buat kelompok konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam kelompok konsumen mengonsumsi data dalam penyimpanan log yang ditentukan. Contoh kode:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda.
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada.
private static String Project = "ali-test-project";
// Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada.
private static String Logstore = "ali-test-logstore";
// Nama kelompok konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Kelompok konsumen dibuat secara otomatis saat program berjalan.
private static String ConsumerGroup = "ali-test-consumergroup2";
// Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam kelompok konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen.
// maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000].
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable.
thread.start();
Thread.sleep(60 * 60 * 1000);
// Fungsi shutdown dari instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti.
worker.shutdown();
// Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik.
Thread.sleep(30 * 1000);
}
}
Jalankan file Main.java.
Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, waktu: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, waktu: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
Contoh 2: Gunakan SDK dan SPL
Tambahkan dependensi Maven.
Buka file pom.xml dan tambahkan kode berikut:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.47</version>
</dependency>
Tulis logika implementasi konsumsi data. Contoh kode:
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SPLLogHubProcessor implements ILogHubProcessor {
private int shardId;
// Waktu checkpoint konsumsi terakhir disimpan.
private long mLastSaveTime = 0;
// Metode initialize dipanggil sekali saat objek prosesor diinisialisasi.
public void initialize(int shardId) {
this.shardId = shardId;
}
// Logika utama konsumsi data. Anda harus menyertakan kode untuk menangani semua pengecualian yang mungkin terjadi selama konsumsi data.
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// Tampilkan data yang diperoleh.
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
}
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", waktu: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// Checkpoint konsumsi ditulis ke Layanan Log Sederhana pada interval 30 detik. Jika instance ClientWorker berhenti secara tak terduga dalam 30 detik, instance ClientWorker yang baru dimulai mengonsumsi data dari checkpoint konsumsi terakhir. Sejumlah kecil data mungkin dikonsumsi berulang kali.
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// Nilai true menunjukkan bahwa checkpoint konsumsi segera diperbarui ke Layanan Log Sederhana. Secara default, checkpoint konsumsi yang disimpan di memori secara otomatis diperbarui ke Layanan Log Sederhana pada interval 60 detik.
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
// Nilai false menunjukkan bahwa checkpoint konsumsi disimpan secara lokal dan dapat diperbarui ke Layanan Log Sederhana menggunakan mekanisme pembaruan checkpoint konsumsi otomatis.
checkPointTracker.saveCheckPoint(false);
}
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
return null;
}
// Fungsi shutdown dari instance ClientWorker dipanggil. Anda dapat mengelola checkpoint konsumsi.
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
// Segera simpan checkpoint konsumsi ke Layanan Log Sederhana.
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
Definisikan entitas konsumen. Contoh kode:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// Hasilkan konsumen. Setiap kali metode generatorProcessor dipanggil, objek SPLLogHubProcessor baru dikembalikan sesuai harapan.
return new SPLLogHubProcessor();
}
}
Buat kelompok konsumen dan mulai thread konsumen untuk memungkinkan konsumen dalam kelompok konsumen mengonsumsi data dalam penyimpanan log yang ditentukan. Contoh kode:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// Titik akhir Layanan Log Sederhana. Masukkan titik akhir berdasarkan kebutuhan bisnis Anda.
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// Nama proyek Layanan Log Sederhana. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama proyek yang ada.
private static String Project = "ali-test-project";
// Nama penyimpanan log. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda harus memasukkan nama penyimpanan log yang ada.
private static String Logstore = "ali-test-logstore";
// Nama kelompok konsumen. Masukkan nama berdasarkan kebutuhan bisnis Anda. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Kelompok konsumen dibuat secara otomatis saat program berjalan.
private static String ConsumerGroup = "ali-test-consumergroup2";
// Dalam contoh ini, ID AccessKey dan rahasia AccessKey diperoleh dari variabel lingkungan.
private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// consumer_1 menentukan nama konsumen. Nama setiap konsumen dalam kelompok konsumen harus unik. Jika konsumen berbeda memulai proses pada mesin berbeda untuk mengonsumsi data dalam penyimpanan log, Anda dapat menggunakan alamat IP mesin untuk mengidentifikasi setiap konsumen.
// maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diperoleh dari Layanan Log Sederhana sekaligus. Pertahankan nilai default. Anda dapat menggunakan config.setMaxFetchLogGroupSize(100); untuk mengubah jumlah maksimum. Nilai valid: (0,1000].
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
// Anda dapat menggunakan setQuery untuk menentukan pernyataan Bahasa Pemrosesan Log Sederhana (SPL) untuk konsumsi data.
config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Setelah instance Thread berjalan, instance ClientWorker secara otomatis berjalan dan memperluas antarmuka Runnable.
thread.start();
Thread.sleep(60 * 60 * 1000);
// Fungsi shutdown dari instance ClientWorker dipanggil untuk keluar dari konsumen. Instance Thread secara otomatis berhenti.
worker.shutdown();
// Beberapa tugas asinkron dihasilkan saat instance ClientWorker berjalan. Untuk memastikan bahwa semua tugas yang sedang berjalan berhenti dengan aman setelah shutdown, kami sarankan Anda mengatur Thread.sleep selama 30 detik.
Thread.sleep(30 * 1000);
}
}
Jalankan file Main.java.
Dalam contoh ini, log NGINX dikonsumsi dan hasil konsumsi ditampilkan.
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, waktu: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, waktu: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......
Langkah 3: Lihat status kelompok konsumen
Bagian ini menjelaskan metode yang dapat Anda gunakan untuk melihat status kelompok konsumen.
Gunakan SDK Layanan Log Sederhana untuk Java
Lihat checkpoint konsumsi setiap shard. Contoh kode:
ConsumerGroupTest.java
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "cn-hangzhou.log.aliyuncs.com";
static String project = "ali-test-project";
static String logstore = "ali-test-logstore";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// Dapatkan semua kelompok konsumen yang dibuat untuk penyimpanan log. Jika tidak ada kelompok konsumen yang ada, string kosong dikembalikan.
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// Tampilkan atribut setiap kelompok konsumen, termasuk nama, periode timeout heartbeat, dan apakah data dikonsumsi secara berurutan.
System.out.println("Nama: " + c.getConsumerGroupName());
System.out.println("Periode timeout heartbeat " + c.getTimeout());
System.out.println("Konsumsi berurutan: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// Waktu adalah bilangan bulat panjang dan akurat hingga mikrodetik.
System.out.println("Waktu checkpoint konsumsi terakhir diperbarui: " + cp.getUpdateTime());
System.out.println("Nama konsumen: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "Konsumsi belum dimulai";
else{
// Timestamp UNIX. Unit: detik. Format nilai output timestamp.
try{
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
}
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "Tidak valid. Waktu checkpoint konsumsi terakhir diperbarui melebihi periode retensi data";
else{
// kesalahan server internal
throw e;
}
}
}
System.out.println("Checkpoint konsumsi: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
try{
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
}
catch(LogException e){
// abaikan
}
// Timestamp UNIX. Unit: detik. Format nilai output timestamp.
System.out.println("Waktu data terakhir diterima: " + endPrg);
}
}
}
}
Lihat output. Contoh:
Nama: ali-test-consumergroup2
Periode timeout heartbeat: 60
Konsumsi berurutan: false
shard: 0
Waktu checkpoint konsumsi terakhir diperbarui: 0
Nama konsumen: consumer_1
Checkpoint konsumsi: Konsumsi belum dimulai
Waktu data terakhir diterima: 1729583617
shard: 1
Waktu checkpoint konsumsi terakhir diperbarui: 0
Nama konsumen: consumer_1
Checkpoint konsumsi: Konsumsi belum dimulai
Waktu data terakhir diterima: 1729583738
Proses selesai dengan kode keluar 0
Gunakan konsol Layanan Log Sederhana
Masuk ke konsol Layanan Log Sederhana.
Di bagian Proyek, klik yang ingin Anda kelola.

Pada tab , klik ikon
di sebelah penyimpanan log yang ingin Anda kelola. Kemudian, klik ikon
di sebelah Data Consumption.
Dalam daftar kelompok konsumen, klik kelompok konsumen yang ingin Anda kelola.
Di halaman Consumer Group Status, lihat checkpoint konsumsi setiap shard.
Apa yang harus dilakukan selanjutnya
Otorisasi pengguna RAM untuk melakukan operasi pada kelompok konsumen
Sebelum menggunakan pengguna Resource Access Management (RAM) untuk mengelola kelompok konsumen, Anda harus memberikan izin yang diperlukan kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Buat pengguna RAM dan otorisasi pengguna RAM untuk mengakses Layanan Log Sederhana.
Tabel berikut menjelaskan tindakan yang dapat Anda otorisasi kepada pengguna RAM.
Tindakan | Deskripsi | Sumber daya |
log:GetCursorOrData(GetCursor) | Kueri kursor berdasarkan waktu pembuatan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName} |
log:CreateConsumerGroup(CreateConsumerGroup) | Membuat kelompok konsumen untuk penyimpanan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ListConsumerGroup(ListConsumerGroup) | Kueri semua kelompok konsumen dari penyimpanan log. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/* |
log:ConsumerGroupUpdateCheckPoint(ConsumerGroupUpdateCheckPoint) | Memperbarui checkpoint konsumsi untuk shard yang dialokasikan ke kelompok konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:ConsumerGroupHeartBeat(ConsumerGroupHeartBeat) | Mengirim pesan heartbeat untuk konsumen ke Layanan Log Sederhana. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:UpdateConsumerGroup(UpdateConsumerGroup) | Memodifikasi atribut kelompok konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
log:GetConsumerGroupCheckPoint(GetCheckPoint) | Kueri checkpoint konsumsi untuk satu atau semua shard yang dialokasikan ke kelompok konsumen. | acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName} |
Berikut adalah informasi sumber daya terkait kelompok konsumen. Untuk memberikan izin kepada pengguna RAM agar dapat melakukan operasi pada kelompok konsumen, gunakan kode berikut sebagai referensi:
ID akun Alibaba Cloud tempat proyek berada: 174649****602745
ID wilayah tempat proyek berada: cn-hangzhou
Nama proyek: project-test
Nama penyimpanan log: logstore-test
Nama kelompok konsumen: consumergroup-test
Contoh kode:
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
},
{
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
},
{
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
],
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
]
}
Konfigurasikan Log4j untuk pemecahan masalah.
Kami sarankan Anda mengonfigurasi Log4j untuk program konsumen agar dapat menampilkan pesan kesalahan saat terjadi pengecualian dalam kelompok konsumen. Ini membantu Anda memecahkan masalah kesalahan tersebut. Berikut adalah contoh file konfigurasi log4j.properties umum:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
Setelah mengonfigurasi Log4j, Anda dapat menerima pesan kesalahan saat menjalankan program konsumen. Contoh berikut menunjukkan pesan kesalahan:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
Gunakan kelompok konsumen untuk mengonsumsi data yang dihasilkan setelah titik waktu tertentu
// consumerStartTimeInSeconds menentukan titik waktu. Data yang dihasilkan setelah titik waktu tersebut dikonsumsi.
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
int consumerStartTimeInSeconds);
// position adalah variabel enumerasi. LogHubConfig.ConsumePosition.BEGIN_CURSOR menentukan bahwa konsumsi dimulai dari data paling awal. LogHubConfig.ConsumePosition.END_CURSOR menentukan bahwa konsumsi dimulai dari data paling baru.
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
ConsumePosition position);
Catatan Anda dapat menggunakan konstruktor yang berbeda sesuai dengan kebutuhan bisnis Anda.
Jika checkpoint konsumsi disimpan di Layanan Log Sederhana, konsumsi data dimulai dari checkpoint tersebut.
Saat Layanan Log Sederhana mengonsumsi data, checkpoint konsumsi diprioritaskan untuk memulai konsumsi data. Jika Anda ingin menentukan titik waktu dari mana Layanan Log Sederhana mulai mengonsumsi data, pastikan nilai consumerStartTimeInSeconds berada dalam periode time-to-live (TTL). Jika tidak, Layanan Log Sederhana tidak dapat mengonsumsi data sesuai konfigurasi Anda.
Reset checkpoint konsumsi
public static void updateCheckpoint() throws Exception {
Client client = new Client(host, accessId, accessKey);
// Tentukan timestamp UNIX yang akurat hingga detik. Jika Anda menentukan timestamp dalam milidetik, bagi timestamp tersebut dengan 1000. Contoh:
long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
for (Shard shard : response.GetShards()) {
int shardId = shard.GetShardId();
String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
}
}
Referensi
API
SDK
Bahasa pemrograman | Referensi |
Java | |
Python | |
CLI