Ketika menggunakan Data Transmission Service (DTS) untuk migrasi atau sinkronisasi data ke antrian pesan seperti kluster Kafka atau ApsaraMQ for RocketMQ, Anda dapat menentukan format penyimpanan data dalam antrian pesan. Topik ini menjelaskan format yang tersedia untuk menyimpan data dalam antrian pesan dan cara mengurai data berdasarkan definisi format tersebut.
Format data
DTS mendukung penyimpanan data ke antrian pesan dalam format berikut:
DTS Avro: Format serialisasi data yang memungkinkan konversi struktur data atau objek untuk penyimpanan dan transmisi yang lebih efisien.
Shareplex Json: Format penyimpanan data yang dibaca dari database sumber menggunakan perangkat lunak replikasi data SharePlex.
Canal Json: Format penyimpanan data setelah Canal mengurai log tambahan dari database sumber dan mentransmisikan data tersebut ke antrian pesan.
DTS Avro
Data harus diurai berdasarkan definisi skema DTS Avro. Untuk informasi lebih lanjut, lihat subscribe_example di GitHub.
Dalam format DTS Avro, pernyataan Data Definition Language (DDL) bertipe STRING.
Shareplex Json
Parameter
Parameter | Deskripsi |
| Waktu UTC ketika transaksi dalam database dikomit. Nilai parameter ini dalam format yyyy-MM-ddTHH:mm:ssZ. |
| ID pengguna yang mengkomit transaksi. |
| Tipe operasi. Nilai valid: INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, dan UPDATE AFTER. |
| Nomor perubahan sistem (SCN) yang mengidentifikasi versi transaksi yang dikomit oleh database pada waktu tertentu. Setiap transaksi yang dikomit diberi SCN unik. |
| Alamat nilai yang relatif unik yang digunakan untuk mengidentifikasi rekaman dalam database. |
| ID transaksi. |
| Nomor urutan operasi dalam transaksi. Nomor dimulai dari 1. |
| Jumlah total operasi dalam transaksi. |
| Nama tabel. |
| Indeks operasi dalam transaksi, dalam format |
| Waktu ketika transaksi dikomit ke database tujuan. |
Contoh:
Data dimasukkan
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}Data diperbarui
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}Data dihapus
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}Canal Json
Parameter
Parameter | Deskripsi |
| Nama database. |
| Waktu ketika operasi dilakukan dalam database. Nilainya adalah timestamp UNIX 13-bit. Satuan: milidetik. Catatan Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX. |
| Nomor seri operasi. |
| Menunjukkan apakah operasi adalah operasi DDL.
|
| Tipe data dari bidang. Catatan Parameter operasi seperti tipe presisi tidak didukung. |
| Data sebelum dan sesudah pembaruan. Catatan Untuk instance sinkronisasi atau migrasi data yang dibuat sebelum 20 Maret 2022, nilai |
| Nama kunci utama. |
| Pernyataan SQL. |
| Tipe bidang yang dikonversi. Nilai validnya sama dengan nilai valid parameter dataTypeNumber. Untuk informasi lebih lanjut, lihat bagian Pemetaan antara tipe data MySQL dan nilai dataTypeNumber dalam topik "Gunakan klien Kafka untuk mengonsumsi data terlacak". |
| Nama tabel. |
| Waktu ketika operasi mulai dilakukan dalam database tujuan. Nilainya adalah timestamp UNIX 13-bit. Satuan: milidetik. Catatan Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX. |
| Tipe operasi. Nilai valid: DELETE, UPDATE, dan INSERT. Catatan Selama sinkronisasi atau migrasi data penuh, tipe operasi ditetapkan ke INIT. |
| Pengenal transaksi global (GTID) yang mengidentifikasi transaksi. Setiap transaksi diberi GTID unik secara global. |
Contoh
Data diperbarui
Untuk instance pelacakan perubahan yang dibuat sebelum 20 Maret 2022 dan disinkronkan ke kluster Kafka menggunakan pernyataan DELETE dari tabel sumber, nilai old adalah data dan nilai data adalah NULL. Untuk tetap konsisten dengan komunitas open source, nilai data adalah data dan nilai old adalah NULL untuk instance pelacakan perubahan yang dibuat atau dimulai ulang mulai 20 Maret 2022.
Instance sinkronisasi atau migrasi data yang dibuat sebelum 20 Maret 2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}Instance sinkronisasi atau migrasi data yang dibuat atau dimulai ulang mulai 20 Maret 2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Operasi DDL
{
"database":"dbname", nama database sumber
"es":1600161894000, waktu ketika data dalam database sumber ditulis ke log biner.
"id":58, offset cache DTS.
"isDdl":true, menentukan apakah akan menyinkronkan atau memigrasikan operasi DDL.
"sql":"eg:createxxx", pernyataan DDL yang direkam dalam log biner.
"table":"tablename", nama tabel sumber.
"ts":1600161894771, waktu ketika DTS menulis data ke database tujuan.
"type":"DDL"
}