Jika data Anda sudah mengalir melalui pipeline Apache Kafka, Anda dapat mengarahkannya langsung ke AnalyticDB for MySQL Data Warehouse Edition (V3.0) menggunakan Logstash. Karena AnalyticDB for MySQL sepenuhnya kompatibel dengan MySQL, plug-in output JDBC Logstash standar dapat terhubung tanpa memerlukan driver atau adaptor tambahan.
Cara Logstash memproses data
Logstash memindahkan data melalui tiga tahap plug-in secara berurutan:
| Tahap | Jenis plug-in | Peran |
|---|---|---|
| Collect | Input | Membaca event dari sumber (Kafka, file, HTTP, dan lainnya) |
| Transform | Filter | Mengurai, memperkaya, dan membentuk ulang event (penguraian Grok, geolokasi IP, anonimisasi PII) |
| Write | Output | Mengirimkan event yang telah ditransformasikan ke destinasi (AnalyticDB for MySQL melalui JDBC) |
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Logstash versi 1.5 atau lebih baru (versi 1.5 memperkenalkan integrasi bawaan dengan Apache Kafka)
Kluster Apache Kafka dengan setidaknya satu topik yang berisi data untuk diimpor
Titik akhir kluster AnalyticDB for MySQL Data Warehouse Edition (V3.0), nama database, username, dan password
Driver JDBC MySQL (
com.mysql.jdbc.Driver) pada classpath Logstash
Impor data Kafka ke AnalyticDB for MySQL
Langkah 1: Instal dan perbarui plug-in
Di direktori root server Apache Kafka, jalankan:
bin/plugin install
bin/plugin updateUntuk daftar lengkap plug-in Logstash yang tersedia, lihat repositori plug-in Logstash di GitHub.
Langkah 2: Konfigurasikan pipeline
Buat file konfigurasi Logstash, misalnya config/kafka-to-adb.conf, dengan blok input dan blok output.
Konfigurasi input
Contoh berikut mengonfigurasi konsumen Kafka:
input {
kafka {
zk_connect => "localhost:2181"
group_id => "Logstash"
topic_id => "test"
codec => plain
reset_beginning => false # boolean (opsional), default: false
consumer_threads => 5 # number (opsional), default: 1
decorate_events => true # boolean (opsional), default: false
}
}Parameter input
| Parameter | Tipe | Wajib | Default | Deskripsi |
|---|---|---|---|---|
zk_connect | string | Ya | — | String koneksi ZooKeeper, contohnya localhost:2181. |
group_id | string | Ya | — | ID kelompok konsumen. Konsumsi antar kelompok konsumen berbeda saling terisolasi satu sama lain. |
topic_id | string | Ya | — | Topik Kafka yang akan di-subscribe dan dikonsumsi. |
reset_beginning | boolean | Tidak | false | Posisi offset saat Logstash dimulai: false melanjutkan dari offset terakhir yang dikomit (atau offset paling awal jika belum ada offset sebelumnya); true dimulai dari offset paling awal dan beralih ke mode mengikuti (tail -F) setelah mengonsumsi pesan terakhir. |
consumer_threads | number | Tidak | 1 | Jumlah thread konsumen paralel. |
decorate_events | boolean | Tidak | false | Jika true, menyambungkan metadata ke setiap event: ukuran pesan, sumber topik, dan kelompok konsumen. |
rebalance_max_retries | number | Tidak | — | Jumlah percobaan ulang untuk mendaftarkan node pemilik partisi di ZooKeeper selama rebalance kelompok konsumen. |
consumer_timeout_ms | number | Tidak | — | Periode timeout untuk menerima pesan. Ubah nilai ini hanya jika nilai default menyebabkan masalah di lingkungan Anda. |
Untuk referensi parameter lengkap, lihat README logstash-kafka di GitHub.
Untuk mengonsumsi pesan dari topik yang sama secara paralel, bagi topik tersebut menjadi beberapa partisi dan tetapkan group_id dan topic_id yang sama ke setiap konsumen. Hal ini memastikan pesan dikonsumsi secara berurutan.
Konfigurasi output
Contoh berikut menulis event ke AnalyticDB for MySQL menggunakan plug-in output JDBC:
output {
jdbc {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
}Ganti placeholder dalam connection_string:
| Placeholder | Deskripsi |
|---|---|
HOSTNAME | Titik akhir kluster AnalyticDB for MySQL |
DATABASE | Nama database tujuan |
USER | Username database |
PASSWORD | Kata sandi database |
Parameter output
| Parameter | Tipe | Wajib | Deskripsi |
|---|---|---|---|
driver_class | string | Ya | Nama kelas driver JDBC. Gunakan com.mysql.jdbc.Driver untuk AnalyticDB for MySQL. |
connection_string | string | Ya | URL koneksi JDBC untuk kluster AnalyticDB for MySQL. |
statement | array | Ya | Array di mana elemen pertama adalah pernyataan INSERT dengan placeholder ?, diikuti oleh nama bidang Logstash yang dipetakan ke setiap placeholder secara berurutan. |
Untuk referensi parameter lengkap, lihat README logstash-kafka di GitHub.
Langkah 3: Jalankan pipeline
Di direktori instalasi Logstash, jalankan:
bin/Logstash -f config/kafka-to-adb.confLogstash mulai mengonsumsi pesan dari topik Kafka yang dikonfigurasi dan menuliskannya ke AnalyticDB for MySQL.