全部产品
Search
文档中心

Data Transmission Service:Format data dalam antrian pesan

更新时间:Jul 02, 2025

Ketika menggunakan Data Transmission Service (DTS) untuk migrasi atau sinkronisasi data ke antrian pesan seperti kluster Kafka atau ApsaraMQ for RocketMQ, Anda dapat menentukan format penyimpanan data dalam antrian pesan. Topik ini menjelaskan format yang tersedia untuk menyimpan data dalam antrian pesan dan cara mengurai data berdasarkan definisi format tersebut.

Format data

DTS mendukung penyimpanan data ke antrian pesan dalam format berikut:

  • DTS Avro: Format serialisasi data yang memungkinkan konversi struktur data atau objek untuk penyimpanan dan transmisi yang lebih efisien.

  • Shareplex Json: Format penyimpanan data yang dibaca dari database sumber menggunakan perangkat lunak replikasi data SharePlex.

  • Canal Json: Format penyimpanan data setelah Canal mengurai log tambahan dari database sumber dan mentransmisikan data tersebut ke antrian pesan.

DTS Avro

Data harus diurai berdasarkan definisi skema DTS Avro. Untuk informasi lebih lanjut, lihat subscribe_example di GitHub.

Catatan

Dalam format DTS Avro, pernyataan Data Definition Language (DDL) bertipe STRING.

Shareplex Json

Parameter

Parameter

Deskripsi

time

Waktu UTC ketika transaksi dalam database dikomit. Nilai parameter ini dalam format yyyy-MM-ddTHH:mm:ssZ.

userid

ID pengguna yang mengkomit transaksi.

op

Tipe operasi. Nilai valid: INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, dan UPDATE AFTER.

scn

Nomor perubahan sistem (SCN) yang mengidentifikasi versi transaksi yang dikomit oleh database pada waktu tertentu. Setiap transaksi yang dikomit diberi SCN unik.

rowid

Alamat nilai yang relatif unik yang digunakan untuk mengidentifikasi rekaman dalam database.

trans

ID transaksi.

seq

Nomor urutan operasi dalam transaksi. Nomor dimulai dari 1.

size

Jumlah total operasi dalam transaksi.

table

Nama tabel.

idx

Indeks operasi dalam transaksi, dalam format seq/size. Contohnya, 1/11 menunjukkan bahwa nomor urutan operasi adalah 1 dalam transaksi yang berisi 11 operasi.

posttime

Waktu ketika transaksi dikomit ke database tujuan.

Contoh:

Data dimasukkan

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

Data diperbarui

{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

Data dihapus

{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Canal Json

Parameter

Parameter

Deskripsi

database

Nama database.

es

Waktu ketika operasi dilakukan dalam database. Nilainya adalah timestamp UNIX 13-bit. Satuan: milidetik.

Catatan

Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

id

Nomor seri operasi.

isDdl

Menunjukkan apakah operasi adalah operasi DDL.

  • true: Operasi adalah operasi DDL.

  • false: Operasi bukan operasi DDL.

mysqlType

Tipe data dari bidang.

Catatan

Parameter operasi seperti tipe presisi tidak didukung.

old dan data

Data sebelum dan sesudah pembaruan.

Catatan

Untuk instance sinkronisasi atau migrasi data yang dibuat sebelum 20 Maret 2022, nilai old adalah data setelah pembaruan dan nilai data adalah data sebelum pembaruan. Secara default, data dari semua kolom termasuk. Untuk tetap konsisten dengan komunitas open source, nilai data adalah data setelah pembaruan dan nilai old adalah data sebelum pembaruan untuk instance sinkronisasi atau migrasi data yang dibuat atau dimulai ulang mulai 20 Maret 2022.

pkNames

Nama kunci utama.

sql

Pernyataan SQL.

sqlType

Tipe bidang yang dikonversi. Nilai validnya sama dengan nilai valid parameter dataTypeNumber. Untuk informasi lebih lanjut, lihat bagian Pemetaan antara tipe data MySQL dan nilai dataTypeNumber dalam topik "Gunakan klien Kafka untuk mengonsumsi data terlacak".

table

Nama tabel.

ts

Waktu ketika operasi mulai dilakukan dalam database tujuan. Nilainya adalah timestamp UNIX 13-bit. Satuan: milidetik.

Catatan

Anda dapat menggunakan mesin pencari untuk mendapatkan konverter timestamp UNIX.

type

Tipe operasi. Nilai valid: DELETE, UPDATE, dan INSERT.

Catatan

Selama sinkronisasi atau migrasi data penuh, tipe operasi ditetapkan ke INIT.

gtid

Pengenal transaksi global (GTID) yang mengidentifikasi transaksi. Setiap transaksi diberi GTID unik secara global.

Contoh

Data diperbarui

Catatan

Untuk instance pelacakan perubahan yang dibuat sebelum 20 Maret 2022 dan disinkronkan ke kluster Kafka menggunakan pernyataan DELETE dari tabel sumber, nilai old adalah data dan nilai data adalah NULL. Untuk tetap konsisten dengan komunitas open source, nilai data adalah data dan nilai old adalah NULL untuk instance pelacakan perubahan yang dibuat atau dimulai ulang mulai 20 Maret 2022.

Instance sinkronisasi atau migrasi data yang dibuat sebelum 20 Maret 2022

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

Instance sinkronisasi atau migrasi data yang dibuat atau dimulai ulang mulai 20 Maret 2022

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

Operasi DDL

{
    "database":"dbname", nama database sumber
    "es":1600161894000, waktu ketika data dalam database sumber ditulis ke log biner.
    "id":58, offset cache DTS.
    "isDdl":true, menentukan apakah akan menyinkronkan atau memigrasikan operasi DDL.
    "sql":"eg:createxxx", pernyataan DDL yang direkam dalam log biner.
    "table":"tablename", nama tabel sumber.
    "ts":1600161894771, waktu ketika DTS menulis data ke database tujuan.
    "type":"DDL"
}