ApsaraDB for SelectDB dapat secara otomatis berlangganan data di Kafka dan menyinkronkan data dari Kafka menggunakan Doris Kafka Connector. Topik ini menjelaskan cara menggunakan Doris Kafka Connector untuk menyinkronkan data dari sumber data Kafka ke ApsaraDB for SelectDB.
Informasi latar belakang
Kafka Connect adalah alat yang andal untuk mentransfer data antara Apache Kafka dan sistem lainnya. Anda dapat mendefinisikan konektor untuk mengimpor atau mengekspor data dari sumber data Kafka.
Doris Kafka Connector yang disediakan oleh Apache Doris berjalan di kluster Kafka Connect. Doris Kafka Connector dapat membaca data dari topik Kafka dan menulis data tersebut ke ApsaraDB for SelectDB.
Dalam skenario bisnis, Anda mungkin mendorong data terbaru dalam database ke Kafka menggunakan Debezium Connector atau memanggil operasi API untuk menulis data berformat JSON ke Kafka secara real-time. Kemudian, Anda dapat menggunakan Doris Kafka Connector untuk secara otomatis berlangganan data di Kafka dan menyinkronkan data tersebut ke ApsaraDB for SelectDB.
Mode operasi Kafka Connect
Kafka Connect menyediakan mode standalone dan mode terdistribusi. Anda dapat memilih mode operasi sesuai dengan kebutuhan bisnis Anda.
Mode standalone
Kami merekomendasikan agar Anda tidak menggunakan mode standalone di lingkungan produksi.
Konfigurasikan mode standalone
Konfigurasikan file connect-standalone.properties.
# Ubah alamat IP broker.
bootstrap.servers=127.0.0.1:9092Buat file connect-selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverterAktifkan mode standalone
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.propertiesMode terdistribusi
Konfigurasikan mode terdistribusi
Konfigurasikan file connect-distributed.properties.
# Ubah alamat IP broker.
bootstrap.servers=127.0.0.1:9092
# Ubah ID grup. ID grup harus konsisten di kluster yang sama.
group.id=connect-clusterAktifkan mode terdistribusi
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.propertiesKonfigurasikan konektor
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-selectdb-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
"doris.user":"admin",
"doris.password":"***",
"doris.database":"test_db",
"doris.http.port":"8080",
"doris.query.port":"9030",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'Parameter
Parameter | Deskripsi |
name | Nama konektor. Dalam sebagian besar kasus, nama tersebut adalah string yang tidak mengandung karakter kontrol ISO dan harus unik di kluster Kafka Connect. |
connector.class | Nama kelas atau alias konektor. Atur nilainya menjadi |
topics | Nama topik yang berfungsi sebagai sumber data. Pisahkan beberapa nama topik dengan koma (,). |
doris.topic2table.map | Pemetaan antara topik dan tabel. Pisahkan beberapa hubungan pemetaan dengan koma (,). Contoh: |
buffer.count.records | Jumlah catatan data yang dibuffer di memori untuk setiap partisi Kafka sebelum data disiram ke ApsaraDB for SelectDB. Nilai default: 10000. |
buffer.flush.time | Interval waktu buffer diperbarui di memori. Unit: detik. Nilai default: 120. |
buffer.size.bytes | Ukuran kumulatif catatan data yang dibuffer di memori untuk setiap partisi Kafka. Unit: byte. Nilai default: 5000000. |
doris.urls | Titik akhir ApsaraDB for SelectDB. Untuk mendapatkan titik akhir instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB. Pergi ke halaman Instance Details instans dan lihat titik akhir di bagian Network Information. Contoh: selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
doris.http.port | Nomor port HTTP dari ApsaraDB for SelectDB. Nilai default: 8080. |
doris.query.port | Nomor port MySQL dari ApsaraDB for SelectDB. Nilai default: 9030. |
doris.user | Nama pengguna yang digunakan untuk terhubung ke ApsaraDB for SelectDB. |
doris.password | Kata sandi nama pengguna yang digunakan untuk terhubung ke ApsaraDB for SelectDB. |
doris.database | Nama database tempat data ditulis di ApsaraDB for SelectDB. |
key.converter | Kelas konverter untuk mengonversi kunci tertentu dalam data berformat JSON. |
value.converter | Kelas konverter untuk mengonversi nilai tertentu dalam data berformat JSON. |
jmx | Menentukan apakah akan menggunakan Java Management Extensions (JMX) untuk memantau Doris Kafka Connector. Untuk informasi lebih lanjut, lihat Gunakan JMX untuk memantau Doris Kafka Connector. Nilai default: TRUE. |
enable.delete | Menentukan apakah akan menghapus rekaman secara bersamaan. Nilai default: false. |
label.prefix | Awalan label yang digunakan saat Anda mengimpor data menggunakan Stream Load. Nilai defaultnya adalah nama konektor. |
auto.redirect | Menentukan apakah akan mengarahkan ulang permintaan Stream Load. Jika Anda mengaktifkan fitur pengalihan otomatis, permintaan Stream Load dialihkan ke backend (BE) tempat data akan ditulis melalui frontend (FE). Informasi BE tidak lagi ditampilkan. |
load.model | Mode impor data. Nilai valid:
Nilai default: |
sink.properties.* | Parameter untuk mengimpor data menggunakan Stream Load. Sebagai contoh, parameter Untuk informasi lebih lanjut, lihat Gunakan Stream Load untuk mengimpor data. |
delivery.guarantee | Metode yang digunakan untuk memastikan konsistensi data ketika data Kafka yang dikonsumsi diimpor ke ApsaraDB for SelectDB. Nilai valid: Anda dapat mengatur parameter ini ke |
enable.2pc | Menentukan apakah akan mengaktifkan mode komit dua fase. Anda dapat mengaktifkan mode komit dua fase untuk memastikan semantik tepat-sekali. |
Untuk informasi lebih lanjut tentang item konfigurasi umum lainnya dari Kafka Connect Sink, lihat bagian Mengonfigurasi Konektor di Dokumentasi Kafka 3.7.
Contoh
Siapkan lingkungan
Instal kluster Apache Kafka versi 2.4.0 atau lebih baru atau Confluent Cloud. Dalam contoh ini, kluster Kafka mandiri digunakan.
# Unduh dan ekstrak paket. wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz tar -zxvf kafka_2.12-2.4.0.tgz cd kafka_2.12-2.4.0/ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.propertiesUnduh paket doris-kafka-connector-1.0.0.jar dan simpan paket JAR di direktori KAKFA_HOME/libs.
Buat ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat Instans.
Terhubung ke ApsaraDB for SelectDB melalui protokol MySQL. Untuk informasi lebih lanjut, lihat Terhubung ke Instans.
Buat database uji dan tabel uji.
Jalankan pernyataan berikut untuk membuat database uji:
CREATE DATABASE test_db;Jalankan pernyataan berikut untuk membuat tabel uji:
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Contoh 1: Sinkronkan data berformat JSON
Konfigurasikan ApsaraDB for SelectDB Sink.
Dalam contoh ini, mode standalone digunakan. Buat file selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:
name=selectdb_sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test_topic doris.topic2table.map=test_topic:example_tbl buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=*** doris.database=test_db key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Opsional. Konfigurasikan antrian pesan gagal. errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1Mulai Kafka Connect.
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
Contoh 2: Gunakan Debezium Connector untuk menyinkronkan data dari database MySQL ke ApsaraDB for SelectDB
Dalam beberapa skenario bisnis, Anda perlu menyinkronkan data dari database bisnis secara real-time. Dalam hal ini, mekanisme capture data change (CDC) diperlukan.
Debezium Connector adalah alat CDC yang dikembangkan berdasarkan Kafka Connect. Debezium Connector dapat terhubung ke berbagai database, seperti MySQL, PostgreSQL, SQL Server, Oracle, dan MongoDB, dan terus mengirimkan data dari database tersebut ke topik Kafka dalam format seragam untuk konsumsi oleh sink downstream secara real-time. Dalam contoh ini, sumber data MySQL digunakan.
Unduh paket Debezium Connector.
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gzEkstrak paket yang diunduh.
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gzSimpan semua paket JAR yang diekstraksi di direktori KAKFA_HOME/libs.
Konfigurasikan sumber data MySQL.
Buat file mysql-source.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:
name=mysql-source connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com database.port=3306 database.user=testuser database.password=**** database.server.id=1 # Pengenal unik klien di Kafka. database.server.name=test123 # Database dan tabel tempat data disinkronkan. Secara default, data disinkronkan dari semua database dan tabel. database.include.list=test table.include.list=test.test_table database.history.kafka.bootstrap.servers=localhost:9092 # Topik Kafka yang digunakan untuk menyimpan perubahan skema pada database dan tabel. database.history.kafka.topic=dbhistory transforms=unwrap # Untuk informasi lebih lanjut, kunjungi https://debezium.io/documentation/reference/stable/transformations/event-flattening.html. transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # Catat perubahan acara untuk operasi DELETE. transforms.unwrap.delete.handling.mode=rewriteSetelah konfigurasi selesai, topik Kafka secara default dinamai dalam format
SERVER_NAME.DATABASE_NAME.TABLE_NAME.CatatanUntuk informasi lebih lanjut tentang cara mengonfigurasi Debezium Connector, lihat Debezium connector for MySQL.
Konfigurasikan ApsaraDB for SelectDB Sink.
Buat file selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:
name=selectdb-sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test123.test.test_table doris.topic2table.map=test123.test.test_table:test_table buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=**** doris.database=test key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Opsional. Konfigurasikan antrian pesan gagal. #errors.tolerance=all #errors.deadletterqueue.topic.name=test_error #errors.deadletterqueue.context.headers.enable = true #errors.deadletterqueue.topic.replication.factor=1CatatanSebelum Anda menyinkronkan data ke ApsaraDB for SelectDB, Anda harus membuat database dan tabel di instans tersebut.
Mulai Kafka Connect.
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.propertiesCatatanSetelah Anda memulai Kafka Connect, Anda dapat melihat file logs/connect.log untuk memeriksa apakah Kafka Connect telah dimulai.
Penggunaan lanjutan
Kelola konektor
# Periksa status konektor.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# Hapus konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# Jeda konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# Mulai ulang konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# Mulai ulang tugas dalam konektor.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POSTUntuk informasi lebih lanjut, lihat Kafka Connect REST Interface for Confluent Platform.
Konfigurasikan antrian pesan gagal
Secara default, proses konversi atau kesalahan yang terjadi selama proses konversi dapat menyebabkan konektor gagal. Anda juga dapat mentolerir kesalahan tersebut dengan mengonfigurasi konektor untuk melewati kesalahan. Anda dapat menulis detail kesalahan, operasi yang gagal, dan catatan data abnormal dengan berbagai tingkat detail ke antrian pesan gagal.
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1Untuk informasi lebih lanjut, lihat Pelaporan Kesalahan di Connect.
Akses kluster Kafka dengan otentikasi SSL
Untuk mengakses kluster Kafka dengan otentikasi SSL menggunakan Kafka Connect, Anda harus menyediakan file sertifikat client.truststore.jks yang digunakan untuk mengotentikasi kunci publik broker Kafka. Anda dapat menambahkan konfigurasi berikut ke file connect-distributed.properties:
# Worker Connect
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Konsumen tertanam untuk konektor sink
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234Untuk informasi lebih lanjut tentang konfigurasi untuk mengakses kluster Kafka dengan otentikasi SSL menggunakan Kafka Connect, lihat Konfigurasi Kafka Connect.