Data Transmission Service (DTS) memungkinkan Anda memilih format penyimpanan saat menyinkronkan atau memigrasikan data ke message queue, seperti Kafka atau RocketMQ. Topik ini menjelaskan format data tersebut untuk membantu Anda menguraikannya.
Format penyimpanan data
DTS mendukung tiga format penyimpanan berikut untuk data yang ditulis ke message queue:
-
DTS Avro: Format serialisasi data yang mengonversi struktur data atau objek menjadi format yang mudah disimpan atau ditransmisikan.
-
Shareplex Json: Saat perangkat lunak replikasi data SharePlex membaca data dari database sumber dan menulisnya ke message queue, data tersebut disimpan dalam format Shareplex Json.
-
Canal Json: Canal mengurai log inkremental dari database dan mengirimkan data inkremental tersebut ke message queue. Data tersebut disimpan dalam format Canal Json.
DTS Avro
Anda harus mengurai data berdasarkan definisi skema DTS Avro. Untuk informasi selengkapnya, lihat definisi skema DTS Avro dan contoh deserialisasi DTS Avro.
Dalam format DTS Avro, pernyataan DDL bertipe String.
Shareplex Json
Deskripsi Parameter
|
Parameter |
Deskripsi |
|
|
Waktu transaksi dikomit di database. Formatnya adalah yyyy-MM-ddTHH:mm:ssZ (UTC). |
|
|
ID pengguna yang melakukan komit transaksi. |
|
|
Jenis operasi data. Nilai yang valid meliputi INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, dan UPDATE AFTER. |
|
|
System Change Number (SCN). SCN mengidentifikasi versi transaksi yang dikomit pada waktu tertentu di database. Setiap transaksi yang dikomit diberi SCN unik. |
|
|
Nilai alamat yang relatif unik yang digunakan untuk menemukan catatan di database. |
|
|
ID transaksi. |
|
|
Nomor urut operasi dalam transaksi. Nilainya dimulai dari 1. |
|
|
Jumlah total operasi dalam transaksi. |
|
|
Nama tabel. |
|
|
Indeks operasi dalam transaksi. Formatnya adalah |
|
|
Waktu transaksi dikomit ke database tujuan. |
Contoh
Masukkan data
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}
Perbarui data
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}
Hapus data
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}
Canal Json
Jika Anda mengaktifkan Split message delivery after partition key update, Kafka mengirimkan pesan DELETE dan pesan INSERT saat kunci partisi diubah. Kafka memilih partisi untuk setiap pesan berdasarkan nilai kunci partisinya masing-masing.
Contoh: Kunci partisi id memiliki nilai 1. Pesan dikirim ke partition-1. Daftar berikut menjelaskan perbedaan sebelum dan sesudah Anda mengaktifkan Split message delivery after partition key update.
-
Diaktifkan: Saat Anda menjalankan perintah
UPDATE SET id = 2 WHERE id = 1di database sumber, pesanDELETEdenganid=1dikirim kepartition-1, dan pesanINSERTdenganid=2dikirim kepartition-2. -
Nonaktif: Hanya satu pesan
UPDATEyang dikirim kepartition-1. OperasiUPDATEmemilih partisi untuk pengiriman berdasarkan nilai sebelum perubahan.
Deskripsi metrik
|
Parameter |
Deskripsi |
|
|
Nama database. |
|
|
Waktu operasi dieksekusi di database sumber. Ini adalah stempel waktu UNIX 13 digit dalam milidetik. Catatan
|
|
|
Nomor seri operasi. Catatan
Ini dihasilkan dari stempel waktu dan offset internal DTS. Ini dapat membantu Anda menentukan urutan catatan. |
|
|
Menentukan apakah operasi tersebut merupakan operasi DDL.
|
|
|
Tipe data bidang. Catatan
Parameter untuk tipe data, seperti presisi, tidak didukung. |
|
|
Data sebelum atau setelah perubahan. Catatan
Untuk instans sinkronisasi atau migrasi yang dibuat sebelum 20 Maret 2022, nilai old adalah data setelah perubahan, dan nilai |
|
|
Nama primary key. |
|
|
Pernyataan SQL. |
|
|
Tipe bidang yang ditransformasi. Nilainya sama dengan nilai dataTypeNumber. Untuk informasi selengkapnya, lihat Pemetaan antara tipe bidang dan nilai dataTypeNumber. |
|
|
Nama tabel. |
|
|
Waktu operasi mulai menulis data ke database tujuan. Ini adalah stempel waktu UNIX 13 digit dalam milidetik. Catatan
Anda dapat menggunakan mesin pencari untuk menemukan tool konversi stempel waktu UNIX. |
|
|
Jenis operasi, seperti DELETE, UPDATE, atau INSERT. Catatan
Untuk tugas sinkronisasi atau migrasi data penuh, nilainya tetap INIT. |
|
|
Global Transaction Identifier (GTID). GTID bersifat unik secara global. Setiap transaksi berkorespondensi dengan satu GTID. Catatan
DTS tidak mendukung sinkronisasi bidang GTID. Nilai bidang ini NULL secara default. |
Contoh
Perbarui data
Untuk instans sinkronisasi atau migrasi yang dibuat sebelum 20 Maret 2022, saat pernyataan DELETE dari tabel sumber disinkronkan atau dimigrasikan ke Kafka, bidang old berisi data dan bidang data bernilai null. Untuk menyelaraskan dengan komunitas open source, untuk instans yang dibuat atau dijalankan ulang pada atau setelah 20 Maret 2022, bidang data berisi data dan bidang old bernilai null.
Instans sinkronisasi atau migrasi yang dibuat sebelum 20 Maret 2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Instans sinkronisasi atau migrasi yang dibuat atau dijalankan ulang pada atau setelah 20 Maret 2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Operasi DDL
{
"database":"dbname", // Nama database untuk sinkronisasi atau migrasi.
"es":1600161894000, // Waktu saat data sumber ditulis ke log biner.
"id":58, // Offset dalam cache DTS.
"isDdl":true, // Menentukan apakah pernyataan DDL disinkronkan atau dimigrasikan.
"sql":"eg:createxxx", // Pernyataan DDL dari log biner.
"table":"tablename", // Nama tabel untuk sinkronisasi atau migrasi.
"ts":1600161894771, // Waktu saat DTS menulis data ke tujuan.
"type":"DDL"
}