Topik ini menjelaskan format pesan yang ditulis ke Kafka dan arti setiap bidang dalam pesan tersebut.
Informasi latar belakang
Sebuah node yang menyinkronkan semua data dari sumber data ke Kafka menulis data yang dibaca DataWorks dari sumber data ke topik Kafka dalam format JSON. Pesan yang ditulis ke Kafka mencakup informasi perubahan kolom serta status data sebelum dan sesudah perubahan. Untuk memastikan konsumen mengetahui kemajuan node sinkronisasi saat mengonsumsi pesan Kafka, node sinkronisasi secara berkala menghasilkan pesan denyut jantung dengan bidang op bernilai MHEARTBEAT dan menulisnya ke topik Kafka. Informasi lebih lanjut tentang format pesan yang ditulis ke Kafka dapat ditemukan di Format Pesan Kafka, Format Pesan Denyut Jantung yang Dihasilkan oleh Node Sinkronisasi, dan Format Pesan Kafka untuk Perubahan Data dalam Sumber. Informasi lebih lanjut tentang tipe dan arti setiap bidang dalam pesan dapat ditemukan di Tipe Bidang dan Bidang.
Format pesan Kafka
Format pesan yang ditulis ke Kafka:
{
"schema": { // Informasi perubahan metadata. Hanya nama kolom dan tipe kolom yang ditentukan.
"dataColumn": [// Informasi perubahan kolom. Data dalam topik tujuan diperbarui berdasarkan informasi ini.
{
"name": "id",
"type": "LONG"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "binData",
"type": "BYTES"
},
{
"name": "ts",
"type": "DATE"
},
{
"name":"rowid",// Jika sumber data Oracle digunakan, rowid ditambahkan sebagai kolom.
"type":"STRING"
}
],
"primaryKey": [
"pkName1",
"pkName2"
],
"source": {
"dbType": "mysql",
"dbVersion": "1.0.0",
"dbName": "myDatabase",
"schemaName": "mySchema",
"tableName": "tableName"
}
},
"payload": {
"before": {
"dataColumn":{
"id": 111,
"name":"scooter",
"binData": "[string base64]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// ID baris dalam sumber data Oracle. Nilainya bertipe STRING.
}
},
"after": {
"dataColumn":{
"id": 222,
"name":"donald",
"binData": "[string base64]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// ID baris dalam sumber data Oracle. Nilainya bertipe STRING.
}
},
"sequenceId":"XXX",// ID urutan unik setiap catatan data yang dihasilkan setelah data tambahan dan semua data digabungkan. Nilainya bertipe STRING.
"scn":"xxxx",// Nomor perubahan sistem (SCN) dari sumber data Oracle. Nilainya bertipe STRING.
"op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// Operasi yang dilakukan. Nilai parameter ini peka huruf besar-kecil.
"timestamp": {
"eventTime": 1,// Diperlukan. Waktu ketika data dalam database sumber berubah. Nilainya adalah timestamp 13-bit dalam milidetik.
"systemTime": 2,// Opsional. Waktu ketika node sinkronisasi membaca pesan perubahan. Nilainya adalah timestamp 13-bit dalam milidetik.
"checkpointTime": 3// Opsional. Waktu tertentu ketika offset sinkronisasi diatur ulang. Nilainya adalah timestamp 13-bit dalam milidetik dan sama dengan nilai bidang eventTime dalam banyak kasus.
},
"ddl": {
"text": "ADD COLUMN ...",
"ddlMeta": "[Pernyataan SQL diserialisasi biner, dinyatakan dalam string base64]"
}
},
"version":"1.0.0"
}Untuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.
Format pesan denyut jantung yang dihasilkan oleh node sinkronisasi
{
"schema": {
"dataColumn": null,
"primaryKey": null,
"source": null
},
"payload": {
"before": null,
"after": null,
"sequenceId": null,
"timestamp": {
"eventTime": 1620457659000,
"checkpointTime": 1620457659000
},
"op": "MHEARTBEAT",
"ddl": null
},
"version": "0.0.1"
}Untuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.
Format pesan Kafka untuk perubahan data dalam sumber
Format pesan Kafka untuk penyisipan data ke dalam sumber:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }Format pesan Kafka untuk pembaruan data dalam sumber:
Jika When one record in the source is updated, one Kafka record is generated tidak dipilih, dua pesan Kafka dihasilkan untuk pembaruan data dalam sumber. Satu pesan menjelaskan status data sebelum pembaruan, dan pesan lainnya menjelaskan status data setelah pembaruan. Contoh berikut menunjukkan format:
Format pesan Kafka yang menjelaskan status data sebelum perubahan:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }Format pesan Kafka yang menjelaskan status data setelah perubahan:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }Jika When one record in the source is updated, one Kafka record is generated dipilih, hanya satu pesan Kafka yang dihasilkan untuk pembaruan data dalam sumber. Pesan tersebut menjelaskan status data sebelum dan sesudah pembaruan. Contoh berikut menunjukkan format:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
Format pesan Kafka untuk penghapusan data dari sumber:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }CatatanUntuk detail tentang tipe bidang dan deskripsi, lihat Tipe Bidang dan Bidang.
Format pesan untuk keluaran waktu nyata dari satu tabel
Saat mengonfigurasi Kafka sebagai tujuan untuk tugas sinkronisasi tabel tunggal waktu nyata, Anda harus mengonfirmasi format nilai yang ditulis ke Kafka. Format yang valid adalah Canal CDC dan JSON. Untuk detailnya, lihat Lampiran: Deskripsi Format Keluaran.
Tipe bidang
Data yang dibaca dari sumber dipetakan ke tipe BOOLEAN, DOUBLE, DATE, BYTES, LONG, dan STRING, lalu ditulis ke topik Kafka dalam format JSON yang berbeda.
Tipe bidang | Deskripsi |
BOOLEAN | Berkorespondensi dengan tipe BOOLEAN dalam JSON. Nilai valid: true dan false. |
DATE | Berkorespondensi dengan tipe NUMBER dalam JSON. Nilainya adalah timestamp 13-digit dalam milidetik. |
BYTES | Berkorespondensi dengan tipe STRING dalam JSON. Sebelum data ditulis ke Kafka, larik byte dikodekan dalam Base64 dan diubah menjadi string. Konsumen perlu mendekode string yang dikodekan Base64 sebelum mengonsumsi string tersebut. Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) digunakan untuk pengkodean dan Base64.getDecoder().decode(encodedText)) digunakan untuk dekode. |
STRING | Berkorespondensi dengan tipe STRING dalam JSON. |
LONG | Berkorespondensi dengan tipe NUMBER dalam JSON. |
DOUBLE | Berkorespondensi dengan tipe NUMBER dalam JSON. |
Bidang
Tabel berikut menjelaskan arti setiap bidang dalam pesan yang ditulis ke Kafka.
Bidang tingkat-1 | Bidang tingkat-2 | Deskripsi |
schema | dataColumn | Nama dan tipe kolom. Nilainya bertipe JSONArray. dataColumn mencatat nama dan tipe kolom yang diperbarui dalam sumber. Operasi perubahan dapat berupa penambahan, penghapusan, atau modifikasi data, atau perubahan skema tabel dalam sumber.
|
primaryKey | Informasi kunci utama. Nilainya bertipe List. pk: nama kunci utama. | |
source | Informasi tentang database sumber atau tabel sumber. Nilainya bertipe Object.
| |
payload | before | Data sebelum perubahan. Nilainya bertipe JSONObject. Misalnya, jika database MySQL adalah sumber dan data dalam sumber tersebut diperbarui, bidang before mencatat data sebelum pembaruan.
|
after | Data setelah perubahan. Bidang after mencatat data setelah perubahan dalam format data yang sama dengan bidang before. | |
sequenceId | ID urutan unik setiap catatan data yang dihasilkan oleh StreamX setelah data tambahan dan semua data digabungkan. Nilainya bertipe STRING. Catatan Setelah pesan pembaruan data dibaca dari sumber, dua catatan tulis dihasilkan: update before dan update after. Kedua catatan tulis memiliki ID urutan yang sama. | |
scn | SCN dari sumber. Bidang ini valid ketika sumber adalah database Oracle. | |
op | Tipe operasi yang dilakukan pada data dalam sumber. Nilai valid:
| |
timestamp | Timestamp dari catatan data. Nilainya bertipe JSONObject.
| |
ddl | Bidang ini ditentukan hanya jika skema tabel dalam sumber diubah. Nilainya NULL ketika operasi DDL, seperti penambahan, penghapusan, atau modifikasi data dilakukan dalam sumber.
| |
version | N/A | Versi data dalam format JSON. |