Tutorial ini menjelaskan cara membangun pipeline change data capture (CDC) untuk mengalirkan perubahan tingkat baris dari database SQL Server ke ApsaraMQ for Kafka. Pipeline ini menggunakan Kafka Connect dalam mode terdistribusi dengan konektor sumber Debezium SQL Server.
Cara kerja
Konektor sumber Debezium SQL Server membaca log transaksi SQL Server melalui CDC dan mengonversi setiap operasi insert, update, atau delete menjadi pesan Kafka. Kafka Connect menjalankan konektor sebagai proses worker terdistribusi, mendorong event perubahan ke topik ApsaraMQ for Kafka, serta melacak riwayat skema dalam topik khusus.
Alur data:
SQL Server (CDC diaktifkan) → Konektor sumber Debezium → Kafka Connect → Topik ApsaraMQ for KafkaSetiap tabel yang dipantau dipetakan ke topik terpisah bernama <server-name>.<database>.<table> — contohnya, server1.testDB.products.
Prasyarat
Sebelum memulai, siapkan komponen-komponen berikut:
Konektor sumber Debezium SQL Server — Unduh dari Repositori Maven. Pilih versi yang kompatibel dengan versi Kafka Connect Anda.
Kafka Connect 2.1.0 atau lebih baru — Unduh dari Unduhan Apache Kafka.
Catatan Konektor sumber Debezium SQL Server memerlukan Kafka Connect 2.1.0 atau lebih baru. Versi sebelumnya tidak didukung.Docker — Unduh dari Docker Desktop.
Langkah 1: Konfigurasikan Kafka Connect
Ekstrak paket konektor sumber Debezium SQL Server ke direktori lokal.
Buka file konfigurasi Kafka Connect
connect-distributed.propertiesdan aturplugin.pathke direktori yang berisi konektor yang telah diekstrak:PentingPada versi Kafka Connect yang lebih lama, properti
plugin.pathtidak didukung. Sebagai gantinya, atur variabel lingkunganCLASSPATH:export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*# Path ke direktori yang berisi JAR konektor yang telah diekstrak plugin.path=/kafka/connect/plugins
Langkah 2: Jalankan Kafka Connect
Akses Internet saja — Jika Anda mengakses ApsaraMQ for Kafka melalui Internet, konfigurasikan file otentikasi JAAS terlebih dahulu. Lewati langkah ini jika mengakses melalui virtual private cloud (VPC).
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"Jalankan Kafka Connect dalam mode terdistribusi:
bin/connect-distributed.sh config/connect-distributed.properties
Langkah 3: Siapkan SQL Server dengan Docker
CDC memerlukan SQL Server 2016 SP1 atau lebih baru. Untuk detail selengkapnya, lihat Catatan rilis SQL Server 2016 SP1 dan Tentang change data capture.
Jalankan kontainer SQL Server:
docker-compose -f docker-compose-sqlserver.yaml upUnduh inventory.sql dan muat data uji ke dalam database:
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Langkah 4: Aktifkan CDC untuk tabel yang sudah ada (opsional)
Jika Anda ingin menangkap perubahan dari tabel yang sudah ada di database, aktifkan CDC di tingkat database dan tingkat tabel.
Aktifkan CDC di tingkat database
USE testDB
GO
EXEC sys.sp_cdc_enable_db
GOAktifkan CDC untuk tabel tertentu
USE testDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 1
GOParameter-parameter tersebut adalah:
| Parameter | Deskripsi |
|---|---|
@source_schema | Skema dari tabel sumber, seperti dbo |
@source_name | Nama tabel yang akan dipantau |
@role_name | Peran database yang mengontrol akses ke data perubahan |
@filegroup_name | Filegroup yang digunakan untuk menyimpan tabel perubahan |
@supports_net_changes | Atur ke 1 untuk mengaktifkan kueri perubahan bersih (net change) |
Verifikasi status CDC
Jalankan perintah berikut untuk memastikan CDC aktif dan akun Anda memiliki izin yang diperlukan:
EXEC sys.sp_cdc_help_change_data_capture
GOJika hasilnya kosong, akun Anda tidak memiliki akses ke tabel yang telah diaktifkan CDC-nya. Periksa penugasan peran.
Verifikasi bahwa SQL Server Agent sedang berjalan
CDC bergantung pada SQL Server Agent. Jalankan perintah berikut untuk memeriksa statusnya:
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'Jika output menampilkan Running, SQL Server Agent aktif.
Langkah 5: Konfigurasikan dan jalankan konektor sumber
Buat topik di ApsaraMQ for Kafka
Sebelum menjalankan konektor, buat topik yang diperlukan di Konsol ApsaraMQ for Kafka. Konektor menulis event perubahan ke topik bernama <server-name>.<database>.<table>.
Untuk database uji dalam tutorial ini (testDB dengan empat tabel), buat topik-topik berikut:
| Topik | Tujuan |
|---|---|
server1 | Topik tingkat server untuk konektor |
server1.testDB.customers | Event perubahan dari tabel customers |
server1.testDB.orders | Event perubahan dari tabel orders |
server1.testDB.products | Event perubahan dari tabel products |
server1.testDB.products_on_hand | Event perubahan dari tabel products_on_hand |
schema-changes-inventory | Riwayat perubahan skema untuk konektor |
Untuk petunjuk membuat topik, lihat Buat topik. Atau, panggil API CreateTopic.
Konfigurasikan konektor
Unduh register-sqlserver.json.
Buka
register-sqlserver.jsondan perbarui properti-properti berikut sesuai metode akses Anda. Akses VPC Akses Internet Properti utamanya adalah: Untuk akses Internet, properti SSL dan SASL tambahan mengaktifkan koneksi terenkripsi dan terotentikasi ke ApsaraMQ for Kafka.Properti Deskripsi database.history.kafka.bootstrap.serversTitik akhir instans ApsaraMQ for Kafka Anda. Gunakan titik akhir default untuk akses VPC atau titik akhir SSL untuk akses Internet. Anda dapat menemukan titik akhir tersebut di Konsol ApsaraMQ for Kafka. database.server.nameNama server logis yang digunakan sebagai awalan untuk semua topik event perubahan. Mengatur nilai ini ke server1menghasilkan topik sepertiserver1.testDB.products.database.history.kafka.topicTopik tempat konektor menyimpan riwayat perubahan skema. Buat topik ini di konsol sebelum menjalankan konektor. "database.history.kafka.bootstrap.servers" : "<your-default-endpoint>", "database.server.name": "server1", "database.history.kafka.topic": "schema-changes-inventory""database.history.kafka.bootstrap.servers" : "<your-ssl-endpoint>", "database.server.name": "server1", "database.history.kafka.topic": "schema-changes-inventory", "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN"
Jalankan konektor
Daftarkan konektor ke Kafka Connect dengan mengirim permintaan POST:
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @register-sqlserver.jsonVerifikasi hasil
Lakukan operasi insert, update, atau delete pada baris di tabel SQL Server yang dipantau.
Di Konsol ApsaraMQ for Kafka, buka halaman Message Query dan kueri topik yang sesuai. Jika event perubahan muncul sebagai pesan, pipeline berfungsi dengan baik. Untuk informasi lebih lanjut, lihat Kueri pesan.
Langkah selanjutnya
Untuk memantau lebih banyak tabel, aktifkan CDC untuk setiap tabel dan buat topik yang sesuai di ApsaraMQ for Kafka.
Untuk daftar lengkap opsi konfigurasi konektor, lihat dokumentasi konektor Debezium SQL Server.