Topik ini menjelaskan cara menggunakan demo klien Kafka untuk mengonsumsi data yang dilacak. Fitur pelacakan perubahan versi baru memungkinkan Anda mengonsumsi data yang dilacak menggunakan klien Kafka versi V0.11 hingga V2.7.
Catatan penggunaan
Jika Anda mengaktifkan komit otomatis saat menggunakan fitur pelacakan perubahan, beberapa data mungkin dikomit sebelum dikonsumsi, sehingga menyebabkan kehilangan data. Kami menyarankan Anda untuk secara manual mengkomit data.
CatatanJika data gagal dikomit, Anda dapat memulai ulang klien untuk melanjutkan konsumsi data dari titik pemeriksaan terakhir yang tercatat. Namun, data duplikat mungkin dihasilkan selama periode ini. Anda harus secara manual menyaring data duplikat tersebut.
Data diserialisasi dan disimpan dalam format Avro. Untuk informasi lebih lanjut, lihat Record.avsc.
PeringatanJika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus mengurai data yang dilacak berdasarkan skema Avro.
Satuan pencarian adalah detik ketika Data Transmission Service (DTS) memanggil operasi
offsetForTimes. Satuan pencarian adalah milidetik ketika klien Kafka asli memanggil operasi ini.Koneksi transien mungkin terjadi antara klien Kafka dan server pelacakan perubahan karena beberapa alasan, seperti pemulihan bencana. Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, klien Kafka Anda harus memiliki kemampuan penyambungan ulang jaringan.
Jalankan klien Kafka
Unduh demo klien Kafka. Untuk informasi lebih lanjut tentang cara menggunakan demo, lihat Readme.
Klik ikon
dan pilih Download ZIP untuk mengunduh paket.Jika Anda menggunakan klien Kafka versi 2.0, Anda harus mengubah nomor versi dalam file subscribe_example-master/javaimpl/pom.xml menjadi 2.0.0.

Tabel 1. Langkah-langkah untuk Menjalankan Klien Kafka
Langkah | File atau direktori |
1. Gunakan konsumen Kafka asli untuk mendapatkan data inkremental dari instance pelacakan perubahan. | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2. Deserialize gambar data inkremental, dan dapatkan pre-image (Nilai setiap bidang sebelum entri data diperbarui), post-image (Nilai setiap bidang setelah entri data diperbarui), dan atribut lainnya. Peringatan
| subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3. Konversikan nilai dataTypeNumber dalam data yang telah dideserialize menjadi tipe data dari database yang sesuai. Catatan Untuk informasi lebih lanjut, lihat bagian-bagian berikut dari topik ini: | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
Prosedur
Langkah-langkah berikut menunjukkan cara menjalankan klien Kafka untuk mengonsumsi data yang dilacak. Dalam contoh ini, IntelliJ IDEA Community Edition 2018.1.4 untuk Windows digunakan.
Buat tugas pelacakan perubahan. Untuk informasi lebih lanjut, lihat Lacak Perubahan Data dari Instance ApsaraDB RDS for MySQL, Lacak Perubahan Data dari Kluster PolarDB for MySQL, atau Lacak Perubahan Data dari Database Oracle yang Dikelola Sendiri.
Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
Unduh demo klien Kafka dan ekstrak paket tersebut.
CatatanKlik ikon
dan pilih Download ZIP untuk mengunduh paket.Buka IntelliJ IDEA. Di jendela yang muncul, klik Open.

Di kotak dialog yang muncul, navigasikan ke direktori tempat demo yang diunduh berada. Temukan file pom.xml.

Di kotak dialog yang muncul, pilih Open as Project.
Di jendela Alat Proyek IntelliJ IDEA, klik folder untuk menemukan file demo klien Kafka, lalu klik dua kali file tersebut. Nama file adalah NotifyDemoDB.java.
Konfigurasikan parameter dalam file NotifyDemoDB.java.

Parameter
Deskripsi
Cara mendapatkan nilai parameter
USER_NAME
Akun grup konsumen.
PeringatanJika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus menentukan akun dalam format berikut:
<Username>-<Consumer group ID>. Contoh:dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Di halaman yang muncul, Anda dapat memperoleh ID grup konsumen dan nama pengguna yang sesuai.
CatatanKata sandi akun grup konsumen ditentukan secara otomatis saat Anda membuat grup konsumen.

PASSWORD_NAME
Kata sandi akun.
SID_NAME
ID grup konsumen.
GROUP_NAME
Nama grup konsumen. Atur parameter ini ke ID grup konsumen.
KAFKA_TOPIC
Topik instance pelacakan perubahan.
Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Pada halaman Task Management, Anda dapat memperoleh topik dan informasi jaringan.

KAFKA_BROKER_URL_NAME
Titik akhir instance pelacakan perubahan.
CatatanJika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menerapkan klien Kafka berada pada jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.
INITIAL_CHECKPOINT_NAME
Titik pemeriksaan konsumsi data yang dikonsumsi. Nilainya adalah timestamp UNIX. Contoh: 1592269238.
CatatanAnda harus menyimpan titik pemeriksaan konsumsi untuk alasan berikut:
Jika proses konsumsi terganggu, Anda dapat menentukan titik pemeriksaan konsumsi pada klien Kafka untuk melanjutkan konsumsi data. Ini mencegah kehilangan data.
Saat Anda memulai klien Kafka, Anda dapat menentukan titik pemeriksaan konsumsi untuk mengonsumsi data sesuai permintaan.
Jika parameter SUBSCRIBE_MODE_NAME diatur ke subscribe, parameter INITIAL_CHECKPOINT_NAME yang Anda tentukan hanya berlaku saat Anda memulai klien Kafka untuk pertama kalinya.
Titik pemeriksaan konsumsi data yang dikonsumsi harus berada dalam rentang data instance pelacakan perubahan, seperti yang ditunjukkan pada gambar berikut. Titik pemeriksaan konsumsi harus dikonversi menjadi timestamp UNIX.
CatatanAnda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.
USE_CONFIG_CHECKPOINT_NAME
Menentukan apakah akan memaksa klien mengonsumsi data dari titik pemeriksaan konsumsi yang ditentukan. Nilai default: true. Anda dapat mengatur parameter ini ke true untuk mencegah data yang diterima tetapi belum diproses hilang.
Tidak tersedia
SUBSCRIBE_MODE_NAME
Menentukan apakah akan menjalankan dua atau lebih klien Kafka untuk grup konsumen. Jika Anda ingin menggunakan fitur ini, atur parameter ini ke subscribe untuk klien Kafka ini.
Nilai default adalah assign, yang menunjukkan bahwa fitur ini tidak digunakan. Kami menyarankan Anda hanya menerapkan satu klien Kafka untuk grup konsumen.
Tidak tersedia
Di bilah menu atas IntelliJ IDEA, pilih untuk menjalankan klien.
CatatanJika Anda menjalankan IntelliJ IDEA untuk pertama kali, waktu tertentu diperlukan untuk memuat dan menginstal dependensi yang relevan.
Hasil pada klien Kafka
Gambar berikut menunjukkan bahwa klien Kafka dapat melacak perubahan data dari database sumber.

Anda dapat menghapus dua garis miring ke depan (//) dari string //log.info(ret); di baris 25 file NotifyDemoDB.java. Kemudian, jalankan klien lagi untuk melihat informasi perubahan data.
FAQ
T: Mengapa saya perlu mencatat titik pemeriksaan konsumsi klien Kafka?
J: Titik pemeriksaan konsumsi yang dicatat oleh DTS adalah titik waktu ketika DTS menerima operasi commit dari klien Kafka. Titik pemeriksaan konsumsi yang dicatat mungkin berbeda dari waktu konsumsi aktual. Jika aplikasi bisnis atau klien Kafka terganggu secara tak terduga, Anda dapat menentukan titik pemeriksaan konsumsi yang akurat untuk melanjutkan konsumsi data. Ini mencegah kehilangan data atau konsumsi data duplikat.
Pemetaan antara tipe data MySQL dan nilai dataTypeNumber
Untuk informasi lebih lanjut, lihat SQL Type field.
Pemetaan antara tipe data Oracle dan nilai dataTypeNumber
Tipe data Oracle | Nilai dataTypeNumber |
VARCHAR2 dan NVARCHAR2 | 1 |
NUMBER dan FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR dan NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB dan NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
Pemetaan antara tipe data PostgreSQL dan nilai dataTypeNumber
Tipe data PostgreSQL | Nilai dataTypeNumber |
INT2 dan SMALLINT | 21 |
INT4, INTEGER, dan SERIAL | 23 |
INT8 dan BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME dan TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP dan TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |