Versi terbaru pelacakan perubahan mendukung client Kafka versi 0.11 hingga 2.7 untuk mengonsumsi data langganan. DTS menyediakan demo client Kafka. Topik ini menjelaskan cara menggunakan client tersebut.
Catatan penting
-
Jika Anda menggunakan demo yang disediakan dalam topik ini dan mengaktifkan auto commit, beberapa data mungkin hilang karena operasi commit dapat dijalankan sebelum semua data dikonsumsi. Gunakan manual commit untuk menghindari masalah ini.
CatatanJika kegagalan mencegah commit berhasil, client akan melanjutkan konsumsi data dari offset terakhir yang terekam setelah restart. Hal ini dapat mengakibatkan data duplikat. Filter duplikat secara manual.
-
Data disimpan dalam format serialisasi Avro. Untuk detail tentang format tersebut, lihat dokumen Record.avsc.
PeringatanJika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, deserialisasi (lihat contoh deserialisasi Avro DTS) mungkin menghasilkan data yang tidak akurat. Verifikasi kebenaran data secara mandiri.
-
Untuk API
offsetForTimes, DTS menggunakan detik sebagai satuan pencarian, sedangkan Kafka asli menggunakan milidetik. -
Pemutusan jaringan sementara dapat terjadi pada server pelacakan perubahan karena disaster recovery atau alasan lain. Jika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, pastikan client Kafka Anda mendukung retry jaringan.
-
Jika Anda menggunakan client Kafka asli untuk mengonsumsi data langganan, DTS mungkin mengganti modul ingestion data inkrementalnya. Hal ini akan menghapus offset konsumen yang tersimpan di server dalam mode subscribe. Sesuaikan offset konsumen secara manual sesuai kebutuhan untuk mengonsumsi data.
Alur kerja client Kafka
Unduh kode demo client Kafka. Untuk detail lebih lanjut tentang penggunaan kode tersebut, lihat file Readme dalam demo.
-
Klik
, lalu pilih Download ZIP untuk mengunduh file. -
Untuk menggunakan client Kafka versi 2.0, ubah file subscribe_example-master/javaimpl/pom.xml dan ganti versi client Kafka menjadi 2.0.0.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
Tabel 1. Deskripsi alur kerja
|
Langkah |
Direktori atau file terkait |
|
1. Gunakan consumer Kafka asli untuk mengambil data inkremental dari saluran pelacakan perubahan. |
subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
|
2. Deserialisasi data inkremental yang diambil untuk mendapatkan before image (nilai bidang sebelum perubahan), after image (nilai bidang setelah perubahan), dan properti lainnya. Peringatan
|
subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
|
3. Konversi field dataTypeNumber dalam data yang telah dideserialisasi ke tipe bidang database yang sesuai. |
subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
Prosedur
Topik ini menggunakan IntelliJ IDEA (Community Edition 2018.1.4 untuk Windows) sebagai contoh untuk menjelaskan cara menjalankan client guna mengonsumsi data dari saluran pelacakan perubahan.
-
Buat saluran pelacakan perubahan baru. Untuk detailnya, lihat Buat saluran pelacakan perubahan RDS MySQL, Buat saluran pelacakan perubahan PolarDB MySQL, atau Buat saluran pelacakan perubahan Oracle.
-
Buat satu atau beberapa kelompok konsumen. Untuk detailnya, lihat Tambahkan kelompok konsumen.
-
Unduh kode demo client Kafka, lalu ekstrak file tersebut.
CatatanKlik
, lalu pilih Download ZIP untuk mengunduh file. -
Buka IntelliJ IDEA, lalu klik Open.
-
Pada kotak dialog yang muncul, buka direktori tempat Anda mengunduh kode demo client Kafka. Navigasi melalui folder untuk menemukan file model objek proyek: pom.xml.
Navigasi ke
kafkademo>subscribe_example-master>javaimpl, pilihpom.xml, lalu klik OK. -
Pada kotak dialog yang muncul, pilih Open as Project.
-
Di antarmuka IntelliJ IDEA, navigasi melalui folder untuk menemukan dan klik ganda file demo client Kafka: NotifyDemoDB.java.
-
Atur nilai parameter dalam file NotifyDemoDB.java.
public static Properties getConfigs() { Properties properties = new Properties(); // user password and sid for auth properties.setProperty(USER_NAME, "dtstest"); properties.setProperty(PASSWORD_NAME, "xxx"); properties.setProperty(SID_NAME, "dtsxxx"); // kafka consumer group general same with sid properties.setProperty(GROUP_NAME, "dtsxxx"); // topic to consume, partition is 0 properties.setProperty(KAFKA_TOPIC, "cn_hangzhou_xxx"); // kafka broker url properties.setProperty(KAFKA_BROKER_URL_NAME, "dts-cn-xxx.com:18001"); // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) properties.setProperty(INITIAL_CHECKPOINT_NAME, "1583307907"); // if force use config checkpoint when start. for checkpoint reset properties.setProperty(USE_CONFIG_CHECKPOINT_NAME, "true"); // use consumer assign or subscribe interface // when use subscribe mode, group config is required. kafka consumer group is enabled properties.setProperty(SUBSCRIBE_MODE_NAME, "assign"); return properties; }Parameter
Deskripsi
Cara memperoleh
USER_NAME
Akun untuk kelompok konsumen.
PeringatanJika Anda tidak menggunakan client yang disediakan dalam topik ini, atur username dalam format
<akun kelompok konsumen>-<ID kelompok konsumen>(misalnya,dtstest-dtsae******bpv). Jika tidak, koneksi akan gagal.Di Konsol DTS, klik ID instansi subscription target, lalu klik Consume Data. Anda dapat memperoleh informasi Consumer Group ID dan Account.
CatatanAnda menentukan password untuk akun kelompok konsumen saat membuat kelompok konsumen.
PASSWORD_NAME
Password untuk akun tersebut.
SID_NAME
ID kelompok konsumen.
GROUP_NAME
Nama kelompok konsumen. Atur parameter ini dengan nilai yang sama seperti ID kelompok konsumen.
KAFKA_TOPIC
Topik subscription dari saluran pelacakan perubahan.
Di Konsol DTS, klik ID instansi subscription target. Pada halaman Task Management, Anda dapat memperoleh informasi Topic dan alamat jaringan. Dapatkan nilai Subscription Topic dari bagian Basic Information dan VPC endpoint (format contoh:
xxx.aliyuncs.com:18003) dari bagian Network.KAFKA_BROKER_URL_NAME
Informasi alamat jaringan untuk saluran subscription data.
Catatan-
Jika instance ECS tempat Anda men-deploy client Kafka dan saluran pelacakan perubahan berada di jaringan klasik atau VPC yang sama, gunakan titik akhir internal untuk meminimalkan latensi jaringan.
-
Kami tidak menyarankan menggunakan titik akhir publik karena potensi ketidakstabilan jaringan.
INITIAL_CHECKPOINT_NAME
Waktu data untuk memulai konsumsi, dalam format stempel waktu UNIX (misalnya, 1592269238).
Catatan-
Simpan stempel waktu tersebut sendiri untuk:
-
Melanjutkan konsumsi data dari stempel waktu terakhir yang dikonsumsi setelah program terganggu guna mencegah kehilangan data.
-
Menentukan offset konsumen yang diinginkan saat memulai client subscription agar dapat mengonsumsi data sesuai kebutuhan.
-
-
Jika SUBSCRIBE_MODE_NAME diatur ke subscribe, nilai INITIAL_CHECKPOINT_NAME hanya berlaku selama startup pertama client subscription.
Stempel waktu data harus berada dalam rentang waktu instansi subscription dan harus dikonversi ke stempel waktu UNIX. Di daftar tugas subscription DTS, periksa bidang Timestamp Range untuk tugas subscription guna menentukan rentang valid untuk INITIAL_CHECKPOINT_NAME.
CatatanGunakan mesin pencari untuk menemukan konverter stempel waktu UNIX.
USE_CONFIG_CHECKPOINT_NAME
Nilai default adalah true. Ini memaksa konsumsi dari stempel waktu data yang ditentukan untuk menghindari kehilangan data yang telah diterima tetapi belum diproses.
Tidak ada
SUBSCRIBE_MODE_NAME
Untuk menjalankan dua atau lebih client Kafka dalam satu kelompok konsumen, atur parameter ini ke subscribe untuk semua client.
Nilai default adalah assign, yang menonaktifkan fitur ini. Deploy hanya satu client dalam mode ini.
Tidak ada
-
-
Di menu atas antarmuka IntelliJ IDEA, pilih untuk menjalankan client.
CatatanSaat menjalankan pertama kali, perangkat lunak memerlukan waktu untuk memuat dan menginstal dependensi secara otomatis.
Hasil eksekusi
Saat Anda menjalankan client, client tersebut berhasil berlangganan perubahan data dari database sumber.
[2020-03-09 10:41:52,408] INFO [Consumer clientId=consumer-1, groupId=dts_xxx] Discovered coordinator xxx (id: xxx rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-09 10:41:57,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721711, offset: 1732521, info: 1583721711] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:41:57,571] INFO EtlRecordProcessor: haven't receive records from generator for 5s (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:02,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721721, offset: 1732539, info: 1583721721] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:07,204] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721726, offset: 1732544, info: 1583721726] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:12,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721731, offset: 1732548, info: 1583721731] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:17,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721736, offset: 1732554, info: 1583721736] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:22,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721741, offset: 1732559, info: 1583721741] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:27,206] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721746, offset: 1732569, info: 1583721746] (recordprocessor.EtlRecordProcessor)
Anda juga dapat mengaktifkan kembali baris pencetakan log dalam file NotifyDemoDB.java (hapus // dari baris 25: //log.info(ret);), lalu jalankan kembali client untuk melihat informasi perubahan data secara rinci.
FAQ
-
Q: Mengapa saya perlu mencatat offset konsumen sendiri?
A: DTS mencatat offset konsumen berdasarkan waktu penerimaan operasi commit dari client consumer Kafka, yang mungkin berbeda dari waktu konsumsi aktual. Setelah aplikasi atau client consumer Kafka Anda mengalami gangguan abnormal, Anda dapat melanjutkan konsumsi dari offset yang telah Anda catat untuk menghindari data duplikat atau hilang.
Pemetaan antara tipe bidang MySQL dan nilai dataTypeNumber
|
Tipe bidang MySQL |
Nilai dataTypeNumber yang sesuai |
|
MYSQL_TYPE_DECIMAL |
0 |
|
MYSQL_TYPE_INT8 |
1 |
|
MYSQL_TYPE_INT16 |
2 |
|
MYSQL_TYPE_INT32 |
3 |
|
MYSQL_TYPE_FLOAT |
4 |
|
MYSQL_TYPE_DOUBLE |
5 |
|
MYSQL_TYPE_NULL |
6 |
|
MYSQL_TYPE_TIMESTAMP |
7 |
|
MYSQL_TYPE_INT64 |
8 |
|
MYSQL_TYPE_INT24 |
9 |
|
MYSQL_TYPE_DATE |
10 |
|
MYSQL_TYPE_TIME |
11 |
|
MYSQL_TYPE_DATETIME |
12 |
|
MYSQL_TYPE_YEAR |
13 |
|
MYSQL_TYPE_DATE_NEW |
14 |
|
MYSQL_TYPE_VARCHAR |
15 |
|
MYSQL_TYPE_BIT |
16 |
|
MYSQL_TYPE_TIMESTAMP_NEW |
17 |
|
MYSQL_TYPE_DATETIME_NEW |
18 |
|
MYSQL_TYPE_TIME_NEW |
19 |
|
MYSQL_TYPE_JSON |
245 |
|
MYSQL_TYPE_DECIMAL_NEW |
246 |
|
MYSQL_TYPE_ENUM |
247 |
|
MYSQL_TYPE_SET |
248 |
|
MYSQL_TYPE_TINY_BLOB |
249 |
|
MYSQL_TYPE_MEDIUM_BLOB |
250 |
|
MYSQL_TYPE_LONG_BLOB |
251 |
|
MYSQL_TYPE_BLOB |
252 |
|
MYSQL_TYPE_VAR_STRING |
253 |
|
MYSQL_TYPE_STRING |
254 |
|
MYSQL_TYPE_GEOMETRY |
255 |
Pemetaan antara tipe bidang Oracle dan nilai dataTypeNumber
|
Tipe bidang Oracle |
Nilai dataTypeNumber yang sesuai |
|
VARCHAR2/NVARCHAR2 |
1 |
|
NUMBER/FLOAT |
2 |
|
LONG |
8 |
|
DATE |
12 |
|
RAW |
23 |
|
LONG_RAW |
24 |
|
UNDEFINED |
29 |
|
XMLTYPE |
58 |
|
ROWID |
69 |
|
CHAR、NCHAR |
96 |
|
BINARY_FLOAT |
100 |
|
BINARY_DOUBLE |
101 |
|
CLOB/NCLOB |
112 |
|
BLOB |
113 |
|
BFILE |
114 |
|
TIMESTAMP |
180 |
|
TIMESTAMP_WITH_TIME_ZONE |
181 |
|
INTERVAL_YEAR_TO_MONTH |
182 |
|
INTERVAL_DAY_TO_SECOND |
183 |
|
UROWID |
208 |
|
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
231 |
Pemetaan antara tipe bidang PostgreSQL dan nilai dataTypeNumber
|
Tipe bidang PostgreSQL |
Nilai dataTypeNumber yang sesuai |
|
INT2/SMALLINT |
21 |
|
INT4/INTEGER/SERIAL |
23 |
|
INT8/BIGINT |
20 |
|
CHARACTER |
18 |
|
CHARACTER VARYING |
1043 |
|
REAL |
700 |
|
DOUBLE PRECISION |
701 |
|
NUMERIC |
1700 |
|
MONEY |
790 |
|
DATE |
1082 |
|
TIME/TIME WITHOUT TIME ZONE |
1083 |
|
TIME WITH TIME ZONE |
1266 |
|
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE |
1114 |
|
TIMESTAMP WITH TIME ZONE |
1184 |
|
BYTEA |
17 |
|
TEXT |
25 |
|
JSON |
114 |
|
JSONB |
3082 |
|
XML |
142 |
|
UUID |
2950 |
|
POINT |
600 |
|
LSEG |
601 |
|
PATH |
602 |
|
BOX |
603 |
|
POLYGON |
604 |
|
LINE |
628 |
|
CIDR |
650 |
|
CIRCLE |
718 |
|
MACADDR |
829 |
|
INET |
869 |
|
INTERVAL |
1186 |
|
TXID_SNAPSHOT |
2970 |
|
PG_LSN |
3220 |
|
TSVECTOR |
3614 |
|
TSQUERY |
3615 |