Fitur change tracking dari Data Transmission Service (DTS) memungkinkan Anda mengonsumsi data menggunakan client Kafka (versi 0.11 hingga 2.7). Topik ini menjelaskan cara memulai dengan demo client Kafka yang disediakan.
Catatan
-
Saat menggunakan demo dalam topik ini, auto commit dapat menyebabkan kehilangan data karena offset mungkin dikomit sebelum pemrosesan data selesai. Untuk mencegah hal ini, kami merekomendasikan manual commit.
CatatanJika commit gagal, client akan memulai ulang konsumsi dari titik pemeriksaan konsumsi terakhir yang terekam. Hal ini dapat menyebabkan duplikasi data, yang harus Anda tangani dalam logika aplikasi Anda.
-
Data disimpan menggunakan serialisasi Avro. Untuk detail format, lihat file Record.avsc.
PeringatanJika Anda menggunakan client Kafka selain yang disediakan dalam topik ini, data mungkin diurai secara salah selama deserialisasi (Contoh deserialisasi Avro DTS). Anda harus memverifikasi data tersebut.
-
Di DTS, API
offsetForTimesmenggunakan satuan detik, sedangkan API Kafka asli menggunakan milidetik. -
Server Data Subscription mungkin mengalami gangguan jaringan sementara akibat peristiwa seperti disaster recovery. Jika Anda tidak menggunakan client Kafka yang disediakan dalam topik ini, pastikan client Anda mendukung retry jaringan.
-
Jika Anda menggunakan client Kafka asli dalam mode subscribe, titik pemeriksaan konsumsi yang disimpan di server mungkin dihapus saat DTS mengganti modul pengumpulan data inkrementalnya. Dalam kasus ini, Anda harus menyesuaikan titik pemeriksaan konsumsi secara manual. Jika Anda perlu menggunakan mode subscribe, kami merekomendasikan menggunakan SDK yang disediakan DTS untuk mengonsumsi data Data Subscription dan mengelola titik pemeriksaan konsumsi sendiri. Untuk informasi lebih lanjut, lihat Konsumsi data Data Subscription menggunakan SDK dan Mengelola titik pemeriksaan konsumsi.
Waktu proses client Kafka
Unduh demo client Kafka. Untuk instruksi penggunaan, lihat file Readme pada demo tersebut.
-
Klik dan pilih Download ZIP.
-
Untuk menggunakan client Kafka 2.0, ubah versi client menjadi 2.0.0 dalam file subscribe_example-master/javaimpl/pom.xml.
Tabel 1. Cara kerja
|
Langkah |
Direktori atau file |
|
1. Gunakan consumer Kafka asli untuk mengambil data perubahan dari saluran langganan. |
subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
|
2. Deserialisasi data perubahan untuk mengekstrak before image , after image , dan atribut lainnya. Peringatan
|
subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
|
3. Konversi field dataTypeNumber ke tipe field database yang sesuai. Catatan
Untuk detail pemetaan, lihat Pemetaan antara tipe field dan nilai dataTypeNumber. |
subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
Prosedur
Topik ini menjelaskan cara menjalankan demo client Kafka di IntelliJ IDEA untuk mengonsumsi data dari saluran langganan data.
-
Buat saluran langganan data. Untuk informasi lebih lanjut, lihat Ikhtisar solusi langganan.
-
Buat satu atau beberapa kelompok konsumen. Untuk informasi lebih lanjut, lihat Buat kelompok konsumen.
-
Unduh demo client Kafka dan ekstrak file tersebut.
CatatanKlik , lalu pilih Download ZIP untuk mengunduh file.
-
Buka IntelliJ IDEA dan klik Open.
-
Pada kotak dialog, arahkan ke direktori tempat Anda mengekstrak demo client Kafka. Buka folder seperti yang ditunjukkan pada gambar berikut untuk menemukan file Project Object Model (POM): pom.xml.
-
Pada kotak dialog yang muncul, klik Open as Project.
-
Di jendela IntelliJ IDEA, buka folder, lalu temukan dan klik ganda file demo client Kafka: NotifyDemoDB.java.
-
Atur nilai parameter dalam file NotifyDemoDB.java.
Parameter
Deskripsi
Cara memperoleh
USER_NAME
Username untuk kelompok konsumen.
PeringatanJika Anda menggunakan client berbeda, Anda harus mengatur username dalam format
<akun kelompok konsumen>-<ID kelompok konsumen>. Contoh:dtstest-dtsae******bpv.Di Konsol DTS, klik ID instans langganan target. Di panel navigasi kiri, klik Consume Data. Pada halaman yang muncul, Anda dapat menemukan informasi Consumer Group ID/Name dan Account untuk kelompok konsumen.
CatatanPassword untuk akun kelompok konsumen ditentukan saat Anda membuat kelompok konsumen.
PASSWORD_NAME
Password untuk akun kelompok konsumen.
SID_NAME
ID kelompok konsumen.
GROUP_NAME
Nilai ini harus sesuai dengan ID kelompok konsumen.
KAFKA_TOPIC
Topik langganan untuk saluran langganan data.
Di Konsol DTS, klik ID instans langganan target. Pada halaman Basic Information, Anda dapat menemukan informasi Topic dan Network.
KAFKA_BROKER_URL_NAME
Alamat jaringan untuk saluran langganan data.
Catatan-
Jika client Kafka dan saluran langganan data Anda berada dalam jaringan klasik atau VPC yang sama, gunakan alamat IP internal untuk meminimalkan latensi jaringan.
-
Kami tidak merekomendasikan penggunaan titik akhir publik karena potensi ketidakstabilan jaringan.
INITIAL_CHECKPOINT_NAME
Waktu untuk memulai konsumsi data, ditentukan sebagai Stempel waktu UNIX. Contoh: 1592269238.
Catatan-
Menyimpan stempel waktu memungkinkan Anda:
-
Melanjutkan konsumsi dari stempel waktu terakhir yang disimpan setelah gangguan untuk mencegah kehilangan data.
-
Memulai konsumsi data dari titik waktu tertentu.
-
-
Jika SUBSCRIBE_MODE_NAME diatur ke subscribe, nilai INITIAL_CHECKPOINT_NAME hanya berlaku saat client langganan pertama kali dijalankan.
Waktu mulai konsumsi harus berada dalam rentang data instans langganan dan harus dikonversi ke Stempel waktu UNIX.
Catatan-
Anda dapat melihat rentang data instans langganan di kolom Data Range pada daftar tugas langganan.
-
Anda dapat menggunakan mesin pencari untuk menemukan konverter Stempel waktu UNIX.
USE_CONFIG_CHECKPOINT_NAME
Nilai default adalah true, yang memaksa client untuk memulai dari waktu yang ditentukan. Hal ini mencegah kehilangan data yang telah diterima tetapi belum diproses.
Tidak ada
SUBSCRIBE_MODE_NAME
Untuk menjalankan dua atau lebih client Kafka dalam kelompok konsumen yang sama, atur parameter ini ke subscribe pada semua client.
Nilai default adalah assign. Dalam mode ini, fitur ini dinonaktifkan, dan kami merekomendasikan hanya menerapkan satu client.
Tidak ada
-
-
Dari bilah menu atas, pilih untuk menjalankan client.
CatatanSaat menjalankan aplikasi untuk pertama kalinya, IntelliJ IDEA mungkin memerlukan waktu untuk mengunduh dan menginstal paket dependensi yang diperlukan secara otomatis.
Hasil
Client berhasil berlangganan data perubahan dari database sumber.
Untuk melihat data perubahan secara detail, hapus // dari //log.info(ret); pada baris 25 dalam file NotifyDemoDB.java dan jalankan kembali client tersebut.
FAQ
-
T: Mengapa saya perlu melacak titik pemeriksaan konsumsi client saya secara manual?
J: Titik pemeriksaan konsumsi yang direkam oleh DTS menunjukkan kapan DTS menerima commit dari client Kafka, bukan kapan aplikasi Anda selesai memproses data. Jika aplikasi atau client Kafka Anda berhenti secara tak terduga, menyediakan titik pemeriksaan yang Anda rekam sendiri memungkinkan Anda melanjutkan konsumsi dari titik gangguan yang tepat, sehingga mencegah duplikasi data maupun kehilangan data.
Kelola checkpoint
-
Konfigurasikan client langganan untuk mendengarkan pergantian kluster dalam modul pengumpulan data DTS.
Untuk melakukan ini, daftarkan
ClusterSwitchListenersebagai interceptor konsumen dengan mengatur properti berikut untuk consumer:properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());Kode berikut menunjukkan contoh implementasi
ClusterSwitchListener:public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } } -
Tangani event pergantian kluster dari modul pengumpulan data DTS.
Saat
ClusterSwitchExceptiontertangkap, atur ulang checkpoint awal untuk langganan berikutnya ke stempel waktu dari catatan terakhir yang berhasil dikonsumsi. Potongan kode berikut memberikan contoh:try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } // Reset the checkpoint. public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }CatatanUntuk contoh implementasi, lihat KafkaRecordFetcher.
Pemetaan tipe data dan dataTypeNumber
Pemetaan tipe data MySQL dan dataTypeNumber
|
Tipe data MySQL |
Nilai dataTypeNumber |
|
MYSQL_TYPE_DECIMAL |
0 |
|
MYSQL_TYPE_INT8 |
1 |
|
MYSQL_TYPE_INT16 |
2 |
|
MYSQL_TYPE_INT32 |
3 |
|
MYSQL_TYPE_FLOAT |
4 |
|
MYSQL_TYPE_DOUBLE |
5 |
|
MYSQL_TYPE_NULL |
6 |
|
MYSQL_TYPE_TIMESTAMP |
7 |
|
MYSQL_TYPE_INT64 |
8 |
|
MYSQL_TYPE_INT24 |
9 |
|
MYSQL_TYPE_DATE |
10 |
|
MYSQL_TYPE_TIME |
11 |
|
MYSQL_TYPE_DATETIME |
12 |
|
MYSQL_TYPE_YEAR |
13 |
|
MYSQL_TYPE_DATE_NEW |
14 |
|
MYSQL_TYPE_VARCHAR |
15 |
|
MYSQL_TYPE_BIT |
16 |
|
MYSQL_TYPE_TIMESTAMP_NEW |
17 |
|
MYSQL_TYPE_DATETIME_NEW |
18 |
|
MYSQL_TYPE_TIME_NEW |
19 |
|
MYSQL_TYPE_JSON |
245 |
|
MYSQL_TYPE_DECIMAL_NEW |
246 |
|
MYSQL_TYPE_ENUM |
247 |
|
MYSQL_TYPE_SET |
248 |
|
MYSQL_TYPE_TINY_BLOB |
249 |
|
MYSQL_TYPE_MEDIUM_BLOB |
250 |
|
MYSQL_TYPE_LONG_BLOB |
251 |
|
MYSQL_TYPE_BLOB |
252 |
|
MYSQL_TYPE_VAR_STRING |
253 |
|
MYSQL_TYPE_STRING |
254 |
|
MYSQL_TYPE_GEOMETRY |
255 |
Pemetaan tipe data Oracle dan dataTypeNumber
|
Tipe data Oracle |
dataTypeNumber value |
|
VARCHAR2/NVARCHAR2 |
1 |
|
NUMBER/FLOAT |
2 |
|
LONG |
8 |
|
DATE |
12 |
|
RAW |
23 |
|
LONG_RAW |
24 |
|
UNDEFINED |
29 |
|
XMLTYPE |
58 |
|
ROWID |
69 |
|
CHAR, NCHAR |
96 |
|
BINARY_FLOAT |
100 |
|
BINARY_DOUBLE |
101 |
|
CLOB/NCLOB |
112 |
|
BLOB |
113 |
|
BFILE |
114 |
|
TIMESTAMP |
180 |
|
TIMESTAMP_WITH_TIME_ZONE |
181 |
|
INTERVAL_YEAR_TO_MONTH |
182 |
|
INTERVAL_DAY_TO_SECOND |
183 |
|
UROWID |
208 |
|
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
231 |
Pemetaan tipe data PostgreSQL dan dataTypeNumber
|
Tipe data PostgreSQL |
dataTypeNumber value |
|
INT2/SMALLINT |
21 |
|
INT4/INTEGER/SERIAL |
23 |
|
INT8/BIGINT |
20 |
|
CHARACTER |
18 |
|
CHARACTER VARYING |
1043 |
|
REAL |
700 |
|
DOUBLE PRECISION |
701 |
|
NUMERIC |
1700 |
|
MONEY |
790 |
|
DATE |
1082 |
|
TIME/TIME WITHOUT TIME ZONE |
1083 |
|
TIME WITH TIME ZONE |
1266 |
|
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE |
1114 |
|
TIMESTAMP WITH TIME ZONE |
1184 |
|
BYTEA |
17 |
|
TEXT |
25 |
|
JSON |
114 |
|
JSONB |
3082 |
|
XML |
142 |
|
UUID |
2950 |
|
POINT |
600 |
|
LSEG |
601 |
|
PATH |
602 |
|
BOX |
603 |
|
POLYGON |
604 |
|
LINE |
628 |
|
CIDR |
650 |
|
CIRCLE |
718 |
|
MACADDR |
829 |
|
INET |
869 |
|
INTERVAL |
1186 |
|
TXID_SNAPSHOT |
2970 |
|
PG_LSN |
3220 |
|
TSVECTOR |
3614 |
|
TSQUERY |
3615 |