Dengan menggunakan kelompok konsumen untuk mengonsumsi data, Anda dapat fokus pada logika bisnis tanpa harus menangani detail implementasi seperti penyeimbangan beban dan failover. Topik ini menjelaskan cara menggunakan Simple Log Service SDK untuk Java, Python, atau Go dalam mengonsumsi log dari penyimpanan log dengan kelompok konsumen yang dikonfigurasi menggunakan prosesor konsumen.
Prasyarat
Pengguna Resource Access Management (RAM) telah dibuat dan diberikan izin yang diperlukan. Untuk informasi lebih lanjut, lihat Buat Pengguna RAM dan Berikan Izin.
Variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Variabel Lingkungan di Linux, macOS, dan Windows.
PentingPasangan Kunci Akses akun Alibaba Cloud memiliki izin untuk semua Operasi API. Kami sarankan Anda menggunakan Pasangan Kunci Akses Pengguna RAM untuk memanggil Operasi API atau melakukan tugas O&M rutin.
Jangan sertakan ID AccessKey atau Rahasia AccessKey Anda dalam kode proyek Anda. Jika salah satunya bocor, keamanan semua sumber daya dalam akun Anda mungkin terganggu.
Kode contoh
Java
Tambahkan dependensi Maven.
Paket com.aliyun.openservices harus versi 0.6.51 atau lebih baru.
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>loghub-client-lib</artifactId> <version>0.6.51</version> </dependency>Buat logika konsumsi data. Kode berikut adalah contoh dari
SPLLogHubProcessor.java.import com.aliyun.openservices.log.common.FastLog; import com.aliyun.openservices.log.common.FastLogContent; import com.aliyun.openservices.log.common.FastLogGroup; import com.aliyun.openservices.log.common.FastLogTag; import com.aliyun.openservices.log.common.LogGroupData; import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker; import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import java.util.List; public class SPLLogHubProcessor implements ILogHubProcessor { private int shardId; // Waktu checkpoint terakhir disimpan. private long mLastSaveTime = 0; // Metode initialize dipanggil sekali saat objek prosesor diinisialisasi. public void initialize(int shardId) { this.shardId = shardId; } // Logika utama untuk konsumsi data. Tangani semua pengecualian yang terjadi selama konsumsi. Jangan langsung melempar pengecualian. public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // Cetak data yang diambil. for (LogGroupData logGroup : logGroups) { FastLogGroup fastLogGroup = logGroup.GetFastLogGroup(); System.out.println("Tags"); for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) { FastLogTag logTag = fastLogGroup.getLogTags(i); System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue()); } for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) { FastLog log = fastLogGroup.getLogs(i); System.out.println("--------\nLog: " + i + ", waktu: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int j = 0; j < log.getContentsCount(); ++j) { FastLogContent content = log.getContents(j); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // Tulis checkpoint ke server setiap 30 detik. Jika pekerja berhenti secara tidak terduga dalam 30 detik, pekerja baru mulai mengonsumsi data dari checkpoint terakhir. Sejumlah kecil data mungkin dikonsumsi ulang. try { if (curTime - mLastSaveTime > 30 * 1000) { // Parameter true menunjukkan bahwa checkpoint segera diperbarui ke server. Secara default, checkpoint yang disimpan di memori secara otomatis diperbarui ke server setiap 60 detik. checkPointTracker.saveCheckPoint(true); mLastSaveTime = curTime; } else { // Parameter false menunjukkan bahwa checkpoint disimpan secara lokal. Checkpoint dapat diperbarui ke server oleh mekanisme pembaruan otomatis. checkPointTracker.saveCheckPoint(false); } } catch (LogHubCheckPointException e) { e.printStackTrace(); } return null; } // Fungsi ini dipanggil saat pekerja keluar. Anda dapat melakukan tugas pembersihan di sini. public void shutdown(ILogHubCheckPointTracker checkPointTracker) { // Segera simpan checkpoint ke server. try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } }Buat entitas konsumen. Kode berikut adalah contoh dari
SPLLogHubProcessorFactory.java.import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor; import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory; class SPLLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // Hasilkan instance konsumen. Catatan: Setiap kali metode generatorProcessor dipanggil, objek SPLLogHubProcessor baru harus dikembalikan. return new SPLLogHubProcessor(); } }Buat konsumen dan mulai thread konsumen. Konsumen mengonsumsi data dari penyimpanan log yang ditentukan.
import com.aliyun.openservices.loghub.client.ClientWorker; import com.aliyun.openservices.loghub.client.config.LogHubConfig; import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException; public class Main { // Titik akhir Layanan Log Sederhana. Ganti nilainya dengan titik akhir aktual. private static String Endpoint = "cn-hangzhou.log.aliyuncs.com"; // Nama Proyek Layanan Log Sederhana. Ganti nilainya dengan nama proyek aktual. Dapatkan nama proyek dari proyek yang ada. private static String Project = "ali-test-project"; // Nama penyimpanan log. Ganti nilainya dengan nama penyimpanan log aktual. Dapatkan nama penyimpanan log dari penyimpanan log yang ada. private static String Logstore = "ali-test-logstore"; // Nama kelompok konsumen. Ganti nilainya dengan nama kelompok konsumen aktual. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. Program secara otomatis membuatnya saat runtime. private static String ConsumerGroup = "ali-test-consumergroup2"; // Konten baru // Nama prosesor konsumen. Buat prosesor di Konsol atau dengan memanggil API prosesor konsumen. private static String ConsumeProcessor = "test-consumer-processor"; // Akhir konten baru // Contoh ini mendapatkan ID AccessKey dan Rahasia AccessKey dari variabel lingkungan. private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException { // consumer_1 adalah nama konsumen. Nama konsumen dalam kelompok konsumen yang sama harus unik. Jika konsumen berbeda memulai beberapa proses di mesin berbeda untuk mengonsumsi data dari Logstore secara seimbang, Anda dapat menggunakan alamat IP mesin sebagai nama konsumen. // maxFetchLogGroupSize menentukan jumlah maksimum grup log yang dapat diambil dari server sekaligus. Anda dapat menggunakan nilai default. Untuk mengubah nilai, gunakan config.setMaxFetchLogGroupSize(100);. Nilai harus dalam rentang (0, 1000]. LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000); // setProcessor menentukan prosesor konsumen untuk konsumsi data. config.setProcessor(ConsumeProcessor); ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config); Thread thread = new Thread(worker); // Setelah thread berjalan, ClientWorker secara otomatis berjalan. ClientWorker memperluas antarmuka Runnable. thread.start(); Thread.sleep(60 * 60 * 1000); // Panggil fungsi shutdown pekerja untuk keluar dari instance konsumen. Thread terkait juga berhenti secara otomatis. worker.shutdown(); // Beberapa tugas asinkron dihasilkan saat ClientWorker berjalan. Setelah shutdown selesai, tunggu tugas yang sedang berjalan untuk keluar dengan aman. Kami sarankan Anda mengatur waktu tidur menjadi 30 detik. Thread.sleep(30 * 1000); } }Jalankan
Main.java. Contoh ini mengonsumsi log NGINX. Log berikut dicetak.: GET request_uri : /request/path-3/file-7 status : 200 body_bytes_sent : 3820 host : www.example.com request_time : 43 request_length : 1987 http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36 http_referer : www.example.com http_x_forwarded_for : 192.168.xx.xxx upstream_response_time : 0.02 -------- Log: 158, waktu: 1635629778, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log machineUUID : null Tags __receive_time__ : 1635629815 -------- Log: 0, waktu: 1635629788, GetContentCount: 14 ...... category : null source : 127.0.0.1 topic : nginx_access_log
Python
Instal Simple Log Service SDK untuk Python. Buat folder proyek bernama spl_consumer_demo dan jalankan perintah berikut di folder tersebut. Untuk informasi lebih lanjut, lihat Instal Simple Log Service SDK untuk Python.
Simple Log Service SDK untuk Python harus versi 0.9.28 atau lebih baru.
pip install -U aliyun-log-python-sdkDi folder spl_consumer_demo, buat file main.py. Buat kelompok konsumen dan mulai thread konsumen. Konsumen mengonsumsi data dari penyimpanan log yang ditentukan.
import os import time from aliyun.log.consumer import * from aliyun.log import * class SPLConsumer(ConsumerProcessorBase): shard_id = -1 last_check_time = 0 def initialize(self, shard): self.shard_id = shard def process(self, log_groups, check_point_tracker): for log_group in log_groups.LogGroups: items = [] for log in log_group.Logs: item = dict() item['time'] = log.Time for content in log.Contents: item[content.Key] = content.Value items.append(item) log_items = dict() log_items['topic'] = log_group.Topic log_items['source'] = log_group.Source log_items['logs'] = items print(log_items) current_time = time.time() if current_time - self.last_check_time > 3: try: self.last_check_time = current_time check_point_tracker.save_check_point(True) except Exception: import traceback traceback.print_exc() else: try: check_point_tracker.save_check_point(False) except Exception: import traceback traceback.print_exc() # None berarti proses berhasil # jika perlu kembali ke checkpoint sebelumnya,return check_point_tracker.get_check_point() return None def shutdown(self, check_point_tracker): try: check_point_tracker.save_check_point(True) except Exception: import traceback traceback.print_exc() def sleep_until(seconds, exit_condition=None, expect_error=False): if not exit_condition: time.sleep(seconds) return s = time.time() while time.time() - s < seconds: try: if exit_condition(): break except Exception: if expect_error: continue time.sleep(1) def spl_consumer_group(): # Titik akhir Layanan Log Sederhana. Contoh ini menggunakan titik akhir wilayah China (Hangzhou). Ganti nilainya dengan titik akhir aktual. endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com') # Contoh ini mendapatkan ID AccessKey dan Rahasia AccessKey dari variabel lingkungan. access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') project = 'your_project' logstore = 'your_logstore' # Nama kelompok konsumen. Anda tidak perlu membuat kelompok konsumen terlebih dahulu. SDK secara otomatis membuatnya. consumer_group = 'consumer-group' consumer_name = "consumer-group-name" # Buat dua konsumen dalam kelompok konsumen untuk mengonsumsi data. option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore, consumer_group, consumer_name, processor="test-consume-processor", cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6, data_fetch_interval=1) print("*** mulai mengonsumsi data...") client_worker = ConsumerWorker(SPLConsumer, consumer_option=option) client_worker.start() time.sleep(10000) if __name__ == '__main__': spl_consumer_group()Jalankan main.py di folder spl_consumer_demo dan lihat hasilnya.
python main.py
Go
Instal Go SDK. Buat folder proyek bernama spl_demo dan jalankan perintah berikut di folder tersebut. Untuk informasi lebih lanjut, lihat Instal Go SDK.
Simple Log Service SDK untuk Go harus versi v0.1.107 atau lebih baru.
go get -u github.com/aliyun/aliyun-log-go-sdkDi folder spl_demo, buat file main.go. Buat kelompok konsumen dan mulai thread konsumen. Konsumen mengonsumsi data dari penyimpanan log yang ditentukan.
package main import ( "fmt" "os" "os/signal" "syscall" sls "github.com/aliyun/aliyun-log-go-sdk" consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer" "github.com/go-kit/kit/log/level" ) // README : // Ini adalah contoh yang sangat sederhana untuk menarik data dari penyimpanan log Anda dan mencetaknya untuk dikonsumsi, termasuk pra-penanganan untuk log. func main() { // Titik akhir Layanan Log Sederhana. Contoh ini menggunakan titik akhir wilayah China (Hangzhou). Ganti nilainya dengan titik akhir aktual. option := consumerLibrary.LogHubConfig{ Endpoint: "cn-hangzhou.log.aliyuncs.com", AccessKeyID: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), Project: "your_project", Logstore: "your_logstore", ConsumerGroupName: "test-spl-cg", ConsumerName: "test-spl-consumer", // Opsi ini digunakan untuk inisialisasi, akan diabaikan setelah kelompok konsumen dibuat dan setiap shard mulai dikonsumsi. // Bisa "begin", "end", "format waktu spesifik dalam timestamp", yaitu waktu penerimaan log. CursorPosition: consumerLibrary.END_CURSOR, // Processor digunakan untuk pra-penanganan log sebelum log dikembalikan ke klien. Untuk informasi lebih lanjut, lihat https://www.alibabacloud.com/help/en/sls/user-guide/rule-based-consumption Processor: "test-consume-processor", } consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) consumerWorker.Start() if _, ok := <-ch; ok { level.Info(consumerWorker.Logger).Log("msg", "dapatkan sinyal stop, mulai menghentikan pekerja konsumen", "nama pekerja konsumen", option.ConsumerName) consumerWorker.StopAndWait() } } // Isi logika konsumsi Anda di sini, dan berhati-hatilah untuk tidak mengubah parameter fungsi dan nilai kembali, // jika tidak, akan muncul kesalahan. func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups)) checkpointTracker.SaveCheckPoint(false) return "", nil }Jalankan perintah berikut di folder spl_demo untuk menginstal dependensi.
go mod tidy go mod vendorJalankan fungsi utama dan lihat output.
go run main.go