Topik ini menjelaskan cara menggunakan demo klien Kafka untuk mengonsumsi data yang dilacak. Fitur pelacakan perubahan versi baru memungkinkan Anda mengonsumsi data yang dilacak dengan menggunakan klien Kafka V0.11 hingga V2.7.
Catatan penggunaan
Jika Anda mengaktifkan komit otomatis saat menggunakan fitur pelacakan perubahan, beberapa data mungkin dikomit sebelum dikonsumsi, yang dapat menyebabkan kehilangan data. Kami menyarankan Anda untuk secara manual mengkomit data.
CatatanJika data gagal dikomit, Anda dapat memulai ulang klien untuk melanjutkan konsumsi data dari titik pemeriksaan terakhir yang direkam. Namun, data duplikat mungkin dihasilkan selama periode ini. Anda harus secara manual menyaring data duplikat tersebut.
Data diserialisasi dan disimpan dalam format Avro. Untuk informasi lebih lanjut, lihat Record.avsc.
PeringatanJika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus mengurai data yang dilacak berdasarkan skema Avro dan memeriksa data yang telah diurai.
Satuan pencarian adalah detik ketika Data Transmission Service (DTS) memanggil operasi
offsetForTimes. Satuan pencarian adalah milidetik ketika klien Kafka asli memanggil operasi ini.Koneksi transien mungkin terjadi antara klien Kafka dan server pelacakan perubahan karena beberapa alasan, seperti pemulihan bencana. Jika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, klien Kafka Anda harus memiliki kemampuan penyambungan ulang jaringan.
Jika Anda menggunakan klien Kafka asli untuk mengonsumsi data yang dilacak, modul pengumpulan data inkremental mungkin berubah di DTS. Dalam mode langganan, titik pemeriksaan konsumsi yang disimpan oleh klien Kafka ke server DTS dihapus. Anda perlu menentukan titik pemeriksaan konsumsi untuk mengonsumsi data yang dilacak sesuai dengan kebutuhan bisnis Anda. Jika Anda ingin mengonsumsi data dalam mode langganan, kami menyarankan Anda menggunakan demo SDK yang disediakan oleh DTS untuk melacak dan mengonsumsi data, atau secara manual mengelola titik pemeriksaan konsumsi. Untuk informasi lebih lanjut, lihat Gunakan Demo SDK untuk Mengonsumsi Data yang Dilacak dan bagian Kelola Titik Pemeriksaan Konsumsi dari topik ini.
Jalankan klien Kafka
Unduh demo klien Kafka. Untuk informasi lebih lanjut tentang cara menggunakan demo, lihat Readme.
Klik
dan pilih Download ZIP untuk mengunduh paket.Jika Anda menggunakan klien Kafka versi 2.0, Anda harus mengubah nomor versi dalam file subscribe_example-master/javaimpl/pom.xml menjadi 2.0.0.

Tabel 1 Deskripsi Proses
Langkah | Direktori atau file terkait |
1. Gunakan konsumen Kafka asli untuk mendapatkan data inkremental dari instance pelacakan perubahan. | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2. Deserialize gambar data inkremental, dan dapatkan pre-image , post-image , dan atribut lainnya. Peringatan
| subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3. Konversikan nilai dataTypeNumber dalam data yang telah dideserialisasi menjadi tipe data dari database yang sesuai. Catatan Untuk informasi lebih lanjut, lihat Pemetaan antara tipe data dan nilai dataTypeNumber dalam topik ini. | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
Prosedur
Langkah-langkah berikut menunjukkan cara menjalankan klien Kafka untuk mengonsumsi data yang dilacak. Dalam contoh ini, IntelliJ IDEA Community Edition 2018.1.4 untuk Windows digunakan.
Buat instance pelacakan perubahan. Untuk informasi lebih lanjut, lihat Ikhtisar Skenario Pelacakan Perubahan.
Buat satu atau lebih grup konsumen. Untuk informasi lebih lanjut, lihat Buat Grup Konsumen.
Unduh paket demo klien Kafka dan dekompresi paket tersebut.
CatatanKlik
dan pilih Download ZIP untuk mengunduh paket.Buka IntelliJ IDEA. Di jendela yang muncul, klik Open.

Di kotak dialog yang muncul, pergi ke direktori tempat demo yang diunduh berada. Temukan file pom.xml.

Di kotak dialog yang muncul, pilih Open as Project.
Di jendela Alat Proyek IntelliJ IDEA, klik folder untuk menemukan file demo klien Kafka, dan klik dua kali file tersebut. Nama file adalah NotifyDemoDB.java.
Tentukan parameter dalam file NotifyDemoDB.java.

Parameter
Deskripsi
Cara mendapatkan nilai parameter
USER_NAME
Nama pengguna akun grup konsumen.
PeringatanJika Anda tidak menggunakan klien Kafka yang dijelaskan dalam topik ini, Anda harus menentukan parameter ini dalam format berikut:
<Username>-<ID Grup Konsumen>. Contoh:dtstest-dtsae******bpv. Jika tidak, koneksi akan gagal.Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di panel navigasi di sebelah kiri, klik Consume Data. Di halaman yang muncul, Anda dapat melihat informasi tentang grup konsumen, seperti ID atau nama dan akun grup konsumen.
CatatanKata sandi akun grup konsumen ditentukan saat Anda membuat grup konsumen.
PASSWORD_NAME
Kata sandi akun.
SID_NAME
ID grup konsumen.
GROUP_NAME
Nama grup konsumen. Tetapkan parameter ini ke ID grup konsumen.
KAFKA_TOPIC
Nama topik yang dilacak dari instance pelacakan perubahan.
Di konsol DTS, temukan instance pelacakan perubahan yang ingin Anda kelola dan klik ID instance. Di halaman Basic Information, Anda dapat melihat informasi tentang topik dan jaringan.
KAFKA_BROKER_URL_NAME
Titik akhir instance pelacakan perubahan.
CatatanJika Anda melacak perubahan data melalui jaringan internal, latensi jaringan minimal. Ini berlaku jika instance Elastic Compute Service (ECS) tempat Anda menerapkan klien Kafka berada di jaringan klasik atau di virtual private cloud (VPC) yang sama dengan instance pelacakan perubahan.
Untuk memastikan stabilitas jaringan, kami menyarankan Anda untuk tidak menggunakan titik akhir publik.
INITIAL_CHECKPOINT_NAME
Titik pemeriksaan konsumsi data yang dikonsumsi. Nilainya adalah timestamp UNIX. Contoh: 1592269238.
CatatanAnda harus menyimpan titik pemeriksaan konsumsi untuk alasan berikut:
Jika proses konsumsi terganggu, Anda dapat menentukan titik pemeriksaan konsumsi pada klien Kafka untuk melanjutkan konsumsi data. Ini mencegah kehilangan data.
Saat Anda memulai klien Kafka, Anda dapat menentukan titik pemeriksaan konsumsi untuk mengonsumsi data berdasarkan kebutuhan bisnis Anda.
Jika parameter SUBSCRIBE_MODE_NAME diatur ke subscribe, parameter INITIAL_CHECKPOINT_NAME yang Anda tentukan hanya berlaku saat Anda memulai klien Kafka untuk pertama kalinya.
Titik pemeriksaan konsumsi data yang dikonsumsi harus berada dalam rentang data instance pelacakan perubahan. Titik pemeriksaan konsumsi harus dikonversi menjadi timestamp UNIX.
CatatanAnda dapat melihat rentang data instance pelacakan perubahan di kolom Data Range pada halaman Tugas Pelacakan Perubahan.
Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.
USE_CONFIG_CHECKPOINT_NAME
Menentukan apakah klien dipaksa untuk mengonsumsi data dari titik pemeriksaan konsumsi yang ditentukan. Nilai default: true. Anda dapat mengatur parameter ini ke true untuk mencegah data yang diterima tetapi belum diproses hilang.
Tidak ada
SUBSCRIBE_MODE_NAME
Menentukan apakah menjalankan dua atau lebih klien Kafka untuk grup konsumen. Jika Anda ingin menggunakan fitur ini, atur parameter ini ke subscribe untuk klien Kafka ini.
Nilai default adalah assign, yang menunjukkan bahwa fitur ini tidak digunakan. Kami menyarankan Anda hanya menerapkan satu klien Kafka untuk grup konsumen.
Tidak ada
Di bilah menu atas IntelliJ IDEA, pilih untuk menjalankan klien.
CatatanJika Anda menjalankan IntelliJ IDEA untuk pertama kali, waktu tertentu diperlukan untuk memuat dan menginstal dependensi yang relevan.
Hasil pada klien Kafka
Gambar berikut menunjukkan bahwa klien Kafka dapat melacak perubahan data dari database sumber.

Anda dapat menghapus dua garis miring ke depan (//) dari string //log.info(ret) di baris 25 file NotifyDemoDB.java. Kemudian, jalankan klien lagi untuk melihat informasi perubahan data.
FAQ
T: Mengapa saya perlu mencatat titik pemeriksaan konsumsi klien Kafka?
J: Titik pemeriksaan konsumsi yang dicatat oleh DTS adalah titik waktu ketika DTS menerima operasi commit dari klien Kafka. Titik pemeriksaan konsumsi yang dicatat mungkin berbeda dari waktu konsumsi aktual. Jika aplikasi bisnis atau klien Kafka terganggu secara tak terduga, Anda dapat menentukan titik pemeriksaan konsumsi yang akurat untuk melanjutkan konsumsi data. Ini mencegah kehilangan data atau konsumsi data duplikat.
Kelola titik pemeriksaan konsumsi
Konfigurasikan klien Kafka untuk mendengarkan pergantian modul pengumpulan data di DTS.
Anda dapat mengonfigurasi properti konsumen klien Kafka untuk mendengarkan pergantian modul pengumpulan data di DTS. Kode berikut memberikan contoh tentang cara mengonfigurasi properti konsumen:
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());Kode berikut memberikan contoh tentang cara mengimplementasikan ClusterSwitchListener:
public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } }Tentukan titik pemeriksaan konsumsi berdasarkan pergantian modul pengumpulan data yang ditangkap di DTS.
Atur titik pemeriksaan konsumsi awal pelacakan data berikutnya ke timestamp entri data terlacak terbaru yang dikonsumsi oleh klien. Kode berikut memberikan contoh tentang cara menentukan titik pemeriksaan konsumsi:
try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } // Reset the consumption checkpoint. public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }CatatanUntuk informasi lebih lanjut tentang contoh-contoh, lihat KafkaRecordFetcher.
Pemetaan antara tipe data dan nilai dataTypeNumber
Pemetaan antara tipe data MySQL dan nilai dataTypeNumber
Tipe data MySQL | Nilai dataTypeNumber |
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
Pemetaan antara tipe data Oracle dan nilai dataTypeNumber
Tipe data Oracle | Nilai dataTypeNumber |
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR and NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
Pemetaan antara tipe data PostgreSQL dan nilai dataTypeNumber
Tipe data PostgreSQL | Nilai dataTypeNumber |
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |