All Products
Search
Document Center

AnalyticDB:Impor\ data\ ke\ AnalyticDB\ for\ MySQL\ menggunakan\ Logstash

Last Updated:Mar 29, 2026

Jika data Anda sudah mengalir melalui pipeline Apache Kafka, Anda dapat mengarahkannya langsung ke AnalyticDB for MySQL Data Warehouse Edition (V3.0) menggunakan Logstash. Karena AnalyticDB for MySQL sepenuhnya kompatibel dengan MySQL, plug-in output JDBC Logstash standar dapat terhubung tanpa memerlukan driver atau adaptor tambahan.

Cara Logstash memproses data

Logstash memindahkan data melalui tiga tahap plug-in secara berurutan:

TahapJenis plug-inPeran
CollectInputMembaca event dari sumber (Kafka, file, HTTP, dan lainnya)
TransformFilterMengurai, memperkaya, dan membentuk ulang event (penguraian Grok, geolokasi IP, anonimisasi PII)
WriteOutputMengirimkan event yang telah ditransformasikan ke destinasi (AnalyticDB for MySQL melalui JDBC)

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Logstash versi 1.5 atau lebih baru (versi 1.5 memperkenalkan integrasi bawaan dengan Apache Kafka)

  • Kluster Apache Kafka dengan setidaknya satu topik yang berisi data untuk diimpor

  • Titik akhir kluster AnalyticDB for MySQL Data Warehouse Edition (V3.0), nama database, username, dan password

  • Driver JDBC MySQL (com.mysql.jdbc.Driver) pada classpath Logstash

Impor data Kafka ke AnalyticDB for MySQL

Langkah 1: Instal dan perbarui plug-in

Di direktori root server Apache Kafka, jalankan:

bin/plugin install
bin/plugin update

Untuk daftar lengkap plug-in Logstash yang tersedia, lihat repositori plug-in Logstash di GitHub.

Langkah 2: Konfigurasikan pipeline

Buat file konfigurasi Logstash, misalnya config/kafka-to-adb.conf, dengan blok input dan blok output.

Konfigurasi input

Contoh berikut mengonfigurasi konsumen Kafka:

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "Logstash"
        topic_id => "test"
        codec => plain
        reset_beginning => false # boolean (opsional), default: false
        consumer_threads => 5  # number (opsional), default: 1
        decorate_events => true # boolean (opsional), default: false
    }
}

Parameter input

ParameterTipeWajibDefaultDeskripsi
zk_connectstringYaString koneksi ZooKeeper, contohnya localhost:2181.
group_idstringYaID kelompok konsumen. Konsumsi antar kelompok konsumen berbeda saling terisolasi satu sama lain.
topic_idstringYaTopik Kafka yang akan di-subscribe dan dikonsumsi.
reset_beginningbooleanTidakfalsePosisi offset saat Logstash dimulai: false melanjutkan dari offset terakhir yang dikomit (atau offset paling awal jika belum ada offset sebelumnya); true dimulai dari offset paling awal dan beralih ke mode mengikuti (tail -F) setelah mengonsumsi pesan terakhir.
consumer_threadsnumberTidak1Jumlah thread konsumen paralel.
decorate_eventsbooleanTidakfalseJika true, menyambungkan metadata ke setiap event: ukuran pesan, sumber topik, dan kelompok konsumen.
rebalance_max_retriesnumberTidakJumlah percobaan ulang untuk mendaftarkan node pemilik partisi di ZooKeeper selama rebalance kelompok konsumen.
consumer_timeout_msnumberTidakPeriode timeout untuk menerima pesan. Ubah nilai ini hanya jika nilai default menyebabkan masalah di lingkungan Anda.

Untuk referensi parameter lengkap, lihat README logstash-kafka di GitHub.

Catatan

Untuk mengonsumsi pesan dari topik yang sama secara paralel, bagi topik tersebut menjadi beberapa partisi dan tetapkan group_id dan topic_id yang sama ke setiap konsumen. Hal ini memastikan pesan dikonsumsi secara berurutan.

Konfigurasi output

Contoh berikut menulis event ke AnalyticDB for MySQL menggunakan plug-in output JDBC:

output {
    jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
        statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
    }
}

Ganti placeholder dalam connection_string:

PlaceholderDeskripsi
HOSTNAMETitik akhir kluster AnalyticDB for MySQL
DATABASENama database tujuan
USERUsername database
PASSWORDKata sandi database

Parameter output

ParameterTipeWajibDeskripsi
driver_classstringYaNama kelas driver JDBC. Gunakan com.mysql.jdbc.Driver untuk AnalyticDB for MySQL.
connection_stringstringYaURL koneksi JDBC untuk kluster AnalyticDB for MySQL.
statementarrayYaArray di mana elemen pertama adalah pernyataan INSERT dengan placeholder ?, diikuti oleh nama bidang Logstash yang dipetakan ke setiap placeholder secara berurutan.

Untuk referensi parameter lengkap, lihat README logstash-kafka di GitHub.

Langkah 3: Jalankan pipeline

Di direktori instalasi Logstash, jalankan:

bin/Logstash -f config/kafka-to-adb.conf

Logstash mulai mengonsumsi pesan dari topik Kafka yang dikonfigurasi dan menuliskannya ke AnalyticDB for MySQL.