All Products
Search
Document Center

ApsaraMQ for Kafka:Sinkronisasi data dari SQL Server ke ApsaraMQ for Kafka menggunakan Kafka Connect

Last Updated:Mar 11, 2026

Tutorial ini menjelaskan cara membangun pipeline change data capture (CDC) untuk mengalirkan perubahan tingkat baris dari database SQL Server ke ApsaraMQ for Kafka. Pipeline ini menggunakan Kafka Connect dalam mode terdistribusi dengan konektor sumber Debezium SQL Server.

Cara kerja

Konektor sumber Debezium SQL Server membaca log transaksi SQL Server melalui CDC dan mengonversi setiap operasi insert, update, atau delete menjadi pesan Kafka. Kafka Connect menjalankan konektor sebagai proses worker terdistribusi, mendorong event perubahan ke topik ApsaraMQ for Kafka, serta melacak riwayat skema dalam topik khusus.

Alur data:

SQL Server (CDC diaktifkan) → Konektor sumber Debezium → Kafka Connect → Topik ApsaraMQ for Kafka

Setiap tabel yang dipantau dipetakan ke topik terpisah bernama <server-name>.<database>.<table> — contohnya, server1.testDB.products.

Prasyarat

Sebelum memulai, siapkan komponen-komponen berikut:

  • Konektor sumber Debezium SQL Server — Unduh dari Repositori Maven. Pilih versi yang kompatibel dengan versi Kafka Connect Anda.

  • Kafka Connect 2.1.0 atau lebih baru — Unduh dari Unduhan Apache Kafka.

    Catatan Konektor sumber Debezium SQL Server memerlukan Kafka Connect 2.1.0 atau lebih baru. Versi sebelumnya tidak didukung.
  • Docker — Unduh dari Docker Desktop.

Langkah 1: Konfigurasikan Kafka Connect

  1. Ekstrak paket konektor sumber Debezium SQL Server ke direktori lokal.

  2. Buka file konfigurasi Kafka Connect connect-distributed.properties dan atur plugin.path ke direktori yang berisi konektor yang telah diekstrak:

    Penting

    Pada versi Kafka Connect yang lebih lama, properti plugin.path tidak didukung. Sebagai gantinya, atur variabel lingkungan CLASSPATH:

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
    # Path ke direktori yang berisi JAR konektor yang telah diekstrak
    plugin.path=/kafka/connect/plugins

Langkah 2: Jalankan Kafka Connect

  1. Akses Internet saja — Jika Anda mengakses ApsaraMQ for Kafka melalui Internet, konfigurasikan file otentikasi JAAS terlebih dahulu. Lewati langkah ini jika mengakses melalui virtual private cloud (VPC).

    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
  2. Jalankan Kafka Connect dalam mode terdistribusi:

    bin/connect-distributed.sh config/connect-distributed.properties

Langkah 3: Siapkan SQL Server dengan Docker

Penting

CDC memerlukan SQL Server 2016 SP1 atau lebih baru. Untuk detail selengkapnya, lihat Catatan rilis SQL Server 2016 SP1 dan Tentang change data capture.

  1. Unduh docker-compose-sqlserver.yaml.

  2. Jalankan kontainer SQL Server:

    docker-compose -f docker-compose-sqlserver.yaml up
  3. Unduh inventory.sql dan muat data uji ke dalam database:

    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

Langkah 4: Aktifkan CDC untuk tabel yang sudah ada (opsional)

Jika Anda ingin menangkap perubahan dari tabel yang sudah ada di database, aktifkan CDC di tingkat database dan tingkat tabel.

Aktifkan CDC di tingkat database

USE testDB
GO
EXEC sys.sp_cdc_enable_db
GO

Aktifkan CDC untuk tabel tertentu

USE testDB
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'MyTable',
@role_name     = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GO

Parameter-parameter tersebut adalah:

ParameterDeskripsi
@source_schemaSkema dari tabel sumber, seperti dbo
@source_nameNama tabel yang akan dipantau
@role_namePeran database yang mengontrol akses ke data perubahan
@filegroup_nameFilegroup yang digunakan untuk menyimpan tabel perubahan
@supports_net_changesAtur ke 1 untuk mengaktifkan kueri perubahan bersih (net change)

Verifikasi status CDC

Jalankan perintah berikut untuk memastikan CDC aktif dan akun Anda memiliki izin yang diperlukan:

EXEC sys.sp_cdc_help_change_data_capture
GO

Jika hasilnya kosong, akun Anda tidak memiliki akses ke tabel yang telah diaktifkan CDC-nya. Periksa penugasan peran.

Verifikasi bahwa SQL Server Agent sedang berjalan

CDC bergantung pada SQL Server Agent. Jalankan perintah berikut untuk memeriksa statusnya:

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

Jika output menampilkan Running, SQL Server Agent aktif.

Langkah 5: Konfigurasikan dan jalankan konektor sumber

Buat topik di ApsaraMQ for Kafka

Sebelum menjalankan konektor, buat topik yang diperlukan di Konsol ApsaraMQ for Kafka. Konektor menulis event perubahan ke topik bernama <server-name>.<database>.<table>.

Untuk database uji dalam tutorial ini (testDB dengan empat tabel), buat topik-topik berikut:

TopikTujuan
server1Topik tingkat server untuk konektor
server1.testDB.customersEvent perubahan dari tabel customers
server1.testDB.ordersEvent perubahan dari tabel orders
server1.testDB.productsEvent perubahan dari tabel products
server1.testDB.products_on_handEvent perubahan dari tabel products_on_hand
schema-changes-inventoryRiwayat perubahan skema untuk konektor

Untuk petunjuk membuat topik, lihat Buat topik. Atau, panggil API CreateTopic.

Konfigurasikan konektor

  1. Unduh register-sqlserver.json.

  2. Buka register-sqlserver.json dan perbarui properti-properti berikut sesuai metode akses Anda. Akses VPC Akses Internet Properti utamanya adalah: Untuk akses Internet, properti SSL dan SASL tambahan mengaktifkan koneksi terenkripsi dan terotentikasi ke ApsaraMQ for Kafka.

    PropertiDeskripsi
    database.history.kafka.bootstrap.serversTitik akhir instans ApsaraMQ for Kafka Anda. Gunakan titik akhir default untuk akses VPC atau titik akhir SSL untuk akses Internet. Anda dapat menemukan titik akhir tersebut di Konsol ApsaraMQ for Kafka.
    database.server.nameNama server logis yang digunakan sebagai awalan untuk semua topik event perubahan. Mengatur nilai ini ke server1 menghasilkan topik seperti server1.testDB.products.
    database.history.kafka.topicTopik tempat konektor menyimpan riwayat perubahan skema. Buat topik ini di konsol sebelum menjalankan konektor.
    "database.history.kafka.bootstrap.servers" : "<your-default-endpoint>",
    "database.server.name": "server1",
    "database.history.kafka.topic": "schema-changes-inventory"
    "database.history.kafka.bootstrap.servers" : "<your-ssl-endpoint>",
    "database.server.name": "server1",
    "database.history.kafka.topic": "schema-changes-inventory",
    "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
    "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
    "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN"

Jalankan konektor

Daftarkan konektor ke Kafka Connect dengan mengirim permintaan POST:

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @register-sqlserver.json

Verifikasi hasil

  1. Lakukan operasi insert, update, atau delete pada baris di tabel SQL Server yang dipantau.

  2. Di Konsol ApsaraMQ for Kafka, buka halaman Message Query dan kueri topik yang sesuai. Jika event perubahan muncul sebagai pesan, pipeline berfungsi dengan baik. Untuk informasi lebih lanjut, lihat Kueri pesan.

Langkah selanjutnya

  • Untuk memantau lebih banyak tabel, aktifkan CDC untuk setiap tabel dan buat topik yang sesuai di ApsaraMQ for Kafka.

  • Untuk daftar lengkap opsi konfigurasi konektor, lihat dokumentasi konektor Debezium SQL Server.