全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan Doris Kafka Connector untuk mengimpor data

更新时间:Jul 30, 2025

ApsaraDB for SelectDB dapat secara otomatis berlangganan data di Kafka dan menyinkronkan data dari Kafka menggunakan Doris Kafka Connector. Topik ini menjelaskan cara menggunakan Doris Kafka Connector untuk menyinkronkan data dari sumber data Kafka ke ApsaraDB for SelectDB.

Informasi latar belakang

Kafka Connect adalah alat yang andal untuk mentransfer data antara Apache Kafka dan sistem lainnya. Anda dapat mendefinisikan konektor untuk mengimpor atau mengekspor data dari sumber data Kafka.

Doris Kafka Connector yang disediakan oleh Apache Doris berjalan di kluster Kafka Connect. Doris Kafka Connector dapat membaca data dari topik Kafka dan menulis data tersebut ke ApsaraDB for SelectDB.

Dalam skenario bisnis, Anda mungkin mendorong data terbaru dalam database ke Kafka menggunakan Debezium Connector atau memanggil operasi API untuk menulis data berformat JSON ke Kafka secara real-time. Kemudian, Anda dapat menggunakan Doris Kafka Connector untuk secara otomatis berlangganan data di Kafka dan menyinkronkan data tersebut ke ApsaraDB for SelectDB.

Mode operasi Kafka Connect

Kafka Connect menyediakan mode standalone dan mode terdistribusi. Anda dapat memilih mode operasi sesuai dengan kebutuhan bisnis Anda.

Mode standalone

Peringatan

Kami merekomendasikan agar Anda tidak menggunakan mode standalone di lingkungan produksi.

Konfigurasikan mode standalone

Konfigurasikan file connect-standalone.properties.

# Ubah alamat IP broker.
bootstrap.servers=127.0.0.1:9092

Buat file connect-selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Aktifkan mode standalone

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties

Mode terdistribusi

Konfigurasikan mode terdistribusi

Konfigurasikan file connect-distributed.properties.

# Ubah alamat IP broker.
bootstrap.servers=127.0.0.1:9092
 
# Ubah ID grup. ID grup harus konsisten di kluster yang sama.
group.id=connect-cluster

Aktifkan mode terdistribusi

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

Konfigurasikan konektor

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'

Parameter

Parameter

Deskripsi

name

Nama konektor. Dalam sebagian besar kasus, nama tersebut adalah string yang tidak mengandung karakter kontrol ISO dan harus unik di kluster Kafka Connect.

connector.class

Nama kelas atau alias konektor. Atur nilainya menjadi com.selectdb.kafka.connector.SelectdbSinkConnector.

topics

Nama topik yang berfungsi sebagai sumber data. Pisahkan beberapa nama topik dengan koma (,).

doris.topic2table.map

Pemetaan antara topik dan tabel. Pisahkan beberapa hubungan pemetaan dengan koma (,). Contoh: topic1:tb1,topic2:tb2. Secara default, parameter ini dibiarkan kosong, yang menunjukkan bahwa topik dan tabel memiliki nama yang sama.

buffer.count.records

Jumlah catatan data yang dibuffer di memori untuk setiap partisi Kafka sebelum data disiram ke ApsaraDB for SelectDB. Nilai default: 10000.

buffer.flush.time

Interval waktu buffer diperbarui di memori. Unit: detik. Nilai default: 120.

buffer.size.bytes

Ukuran kumulatif catatan data yang dibuffer di memori untuk setiap partisi Kafka. Unit: byte. Nilai default: 5000000.

doris.urls

Titik akhir ApsaraDB for SelectDB.

Untuk mendapatkan titik akhir instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB. Pergi ke halaman Instance Details instans dan lihat titik akhir di bagian Network Information.

Contoh: selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

doris.http.port

Nomor port HTTP dari ApsaraDB for SelectDB. Nilai default: 8080.

doris.query.port

Nomor port MySQL dari ApsaraDB for SelectDB. Nilai default: 9030.

doris.user

Nama pengguna yang digunakan untuk terhubung ke ApsaraDB for SelectDB.

doris.password

Kata sandi nama pengguna yang digunakan untuk terhubung ke ApsaraDB for SelectDB.

doris.database

Nama database tempat data ditulis di ApsaraDB for SelectDB.

key.converter

Kelas konverter untuk mengonversi kunci tertentu dalam data berformat JSON.

value.converter

Kelas konverter untuk mengonversi nilai tertentu dalam data berformat JSON.

jmx

Menentukan apakah akan menggunakan Java Management Extensions (JMX) untuk memantau Doris Kafka Connector. Untuk informasi lebih lanjut, lihat Gunakan JMX untuk memantau Doris Kafka Connector. Nilai default: TRUE.

enable.delete

Menentukan apakah akan menghapus rekaman secara bersamaan. Nilai default: false.

label.prefix

Awalan label yang digunakan saat Anda mengimpor data menggunakan Stream Load. Nilai defaultnya adalah nama konektor.

auto.redirect

Menentukan apakah akan mengarahkan ulang permintaan Stream Load. Jika Anda mengaktifkan fitur pengalihan otomatis, permintaan Stream Load dialihkan ke backend (BE) tempat data akan ditulis melalui frontend (FE). Informasi BE tidak lagi ditampilkan.

load.model

Mode impor data. Nilai valid:

  • stream_load: Data langsung diimpor ke ApsaraDB for SelectDB.

  • copy_into: Data diimpor ke Layanan Penyimpanan Objek (OSS) dan kemudian dimuat ke ApsaraDB for SelectDB.

Nilai default: stream_load.

sink.properties.*

Parameter untuk mengimpor data menggunakan Stream Load.

Sebagai contoh, parameter sink.properties.column_separator menentukan pemisah kolom.

Untuk informasi lebih lanjut, lihat Gunakan Stream Load untuk mengimpor data.

delivery.guarantee

Metode yang digunakan untuk memastikan konsistensi data ketika data Kafka yang dikonsumsi diimpor ke ApsaraDB for SelectDB. Nilai valid: at_least_once dan exactly_once. Nilai default: at_least_once.

Anda dapat mengatur parameter ini ke exactly_once hanya jika Anda mengatur parameter load.model ke copy_into.ApsaraDB for SelectDB

enable.2pc

Menentukan apakah akan mengaktifkan mode komit dua fase. Anda dapat mengaktifkan mode komit dua fase untuk memastikan semantik tepat-sekali.

Catatan

Untuk informasi lebih lanjut tentang item konfigurasi umum lainnya dari Kafka Connect Sink, lihat bagian Mengonfigurasi Konektor di Dokumentasi Kafka 3.7.

Contoh

Siapkan lingkungan

  1. Instal kluster Apache Kafka versi 2.4.0 atau lebih baru atau Confluent Cloud. Dalam contoh ini, kluster Kafka mandiri digunakan.

    # Unduh dan ekstrak paket. 
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    cd kafka_2.12-2.4.0/
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
    bin/kafka-server-start.sh -daemon config/server.properties
  2. Unduh paket doris-kafka-connector-1.0.0.jar dan simpan paket JAR di direktori KAKFA_HOME/libs.

  3. Buat ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat Instans.

  4. Terhubung ke ApsaraDB for SelectDB melalui protokol MySQL. Untuk informasi lebih lanjut, lihat Terhubung ke Instans.

  5. Buat database uji dan tabel uji.

    1. Jalankan pernyataan berikut untuk membuat database uji:

      CREATE DATABASE test_db;
    2. Jalankan pernyataan berikut untuk membuat tabel uji:

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Contoh 1: Sinkronkan data berformat JSON

  1. Konfigurasikan ApsaraDB for SelectDB Sink.

    Dalam contoh ini, mode standalone digunakan. Buat file selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:example_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Opsional. Konfigurasikan antrian pesan gagal.
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable = true
    errors.deadletterqueue.topic.replication.factor=1
  2. Mulai Kafka Connect.

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

Contoh 2: Gunakan Debezium Connector untuk menyinkronkan data dari database MySQL ke ApsaraDB for SelectDB

Dalam beberapa skenario bisnis, Anda perlu menyinkronkan data dari database bisnis secara real-time. Dalam hal ini, mekanisme capture data change (CDC) diperlukan.

Debezium Connector adalah alat CDC yang dikembangkan berdasarkan Kafka Connect. Debezium Connector dapat terhubung ke berbagai database, seperti MySQL, PostgreSQL, SQL Server, Oracle, dan MongoDB, dan terus mengirimkan data dari database tersebut ke topik Kafka dalam format seragam untuk konsumsi oleh sink downstream secara real-time. Dalam contoh ini, sumber data MySQL digunakan.

  1. Unduh paket Debezium Connector.

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. Ekstrak paket yang diunduh.

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. Simpan semua paket JAR yang diekstraksi di direktori KAKFA_HOME/libs.

  4. Konfigurasikan sumber data MySQL.

    Buat file mysql-source.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # Pengenal unik klien di Kafka.
    database.server.name=test123
    # Database dan tabel tempat data disinkronkan. Secara default, data disinkronkan dari semua database dan tabel.
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # Topik Kafka yang digunakan untuk menyimpan perubahan skema pada database dan tabel.
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # Untuk informasi lebih lanjut, kunjungi https://debezium.io/documentation/reference/stable/transformations/event-flattening.html.
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # Catat perubahan acara untuk operasi DELETE.
    transforms.unwrap.delete.handling.mode=rewrite

    Setelah konfigurasi selesai, topik Kafka secara default dinamai dalam format SERVER_NAME.DATABASE_NAME.TABLE_NAME.

    Catatan

    Untuk informasi lebih lanjut tentang cara mengonfigurasi Debezium Connector, lihat Debezium connector for MySQL.

  5. Konfigurasikan ApsaraDB for SelectDB Sink.

    Buat file selectdb-sink.properties di direktori konfigurasi Kafka dan konfigurasikan item berikut di dalam file:

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Opsional. Konfigurasikan antrian pesan gagal.
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    Catatan

    Sebelum Anda menyinkronkan data ke ApsaraDB for SelectDB, Anda harus membuat database dan tabel di instans tersebut.

  6. Mulai Kafka Connect.

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    Catatan

    Setelah Anda memulai Kafka Connect, Anda dapat melihat file logs/connect.log untuk memeriksa apakah Kafka Connect telah dimulai.

Penggunaan lanjutan

Kelola konektor

# Periksa status konektor.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# Hapus konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# Jeda konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# Mulai ulang konektor saat ini.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# Mulai ulang tugas dalam konektor.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

Untuk informasi lebih lanjut, lihat Kafka Connect REST Interface for Confluent Platform.

Konfigurasikan antrian pesan gagal

Secara default, proses konversi atau kesalahan yang terjadi selama proses konversi dapat menyebabkan konektor gagal. Anda juga dapat mentolerir kesalahan tersebut dengan mengonfigurasi konektor untuk melewati kesalahan. Anda dapat menulis detail kesalahan, operasi yang gagal, dan catatan data abnormal dengan berbagai tingkat detail ke antrian pesan gagal.

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

Untuk informasi lebih lanjut, lihat Pelaporan Kesalahan di Connect.

Akses kluster Kafka dengan otentikasi SSL

Untuk mengakses kluster Kafka dengan otentikasi SSL menggunakan Kafka Connect, Anda harus menyediakan file sertifikat client.truststore.jks yang digunakan untuk mengotentikasi kunci publik broker Kafka. Anda dapat menambahkan konfigurasi berikut ke file connect-distributed.properties:

# Worker Connect
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Konsumen tertanam untuk konektor sink
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

Untuk informasi lebih lanjut tentang konfigurasi untuk mengakses kluster Kafka dengan otentikasi SSL menggunakan Kafka Connect, lihat Konfigurasi Kafka Connect.