全部產品
Search
文件中心

Data Transmission Service:訊息佇列中的資料存放區格式

更新時間:Apr 22, 2025

Data Transmission Service支援選擇資料同步或遷移到訊息佇列(例如Kafka或RocketMQ)時所採用的儲存格式,本文為您介紹資料格式的定義說明,方便您根據定義解析資料。

資料存放區格式

DTS支援將寫入至訊息佇列的資料存放區為如下三種格式:

  • DTS Avro:一種資料序列化格式,可以將資料結構或對象轉化成便於儲存或傳輸的格式。

  • Shareplex Json:資料複製軟體SharePlex讀取源庫中的資料,將資料寫入至訊息佇列時,資料存放區格式為Shareplex Json。

  • Canal JsonCanal解析資料庫增量日誌,並將增量資料轉送至訊息佇列,資料存放區格式為Canal Json。

DTS Avro

您需要根據DTS Avro的Schema定義進行資料解析,詳情請參見DTS Avro的Schema定義DTS Avro的還原序列化樣本

說明

DTS Avro格式中的DDL語句為String類型。

Shareplex Json

參數說明

參數

說明

time

資料庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。

userid

提交事務的使用者ID。

op

資料操作類型,包括INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。

scn

系統變化編號SCN(System Change Number),用以標識資料庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。

rowid

用於定位元據庫中一條記錄的一個相對唯一地址值。

trans

事務ID。

seq

事務內部的操作序號,從1開始記錄。

size

事務內部的操作總數。

table

表名。

idx

事務內部操作的索引,格式為seq/size,例如1/11表示,在操作總數為11的事務內部,該操作的序號為1。

posttime

事務提交至目標庫的時間。

樣本

插入資料

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

更新資料

{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

刪除資料

{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Canal Json

參數說明

參數

說明

database

資料庫名稱。

es

操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可通過搜尋引擎擷取。

id

操作的序號。

isDdl

是否是DDL操作。

  • true:是。

  • false:否。

mysqlType

欄位的資料類型。

說明

不支援精度等資料類型的參數資訊。

olddata

變更前或變更後的資料。

說明

2022年3月20日之前建立的同步或遷移執行個體,old的值是變更後的資料(預設包含所有列的資料,而不是只包含被修改列的資料),data的值是變更前的資料。為了和開源社區保持一致,2022年3月20日起建立或重啟的同步或遷移執行個體,data的值是變更後的資料,old的值是變更前的資料。

pkNames

主鍵名稱。

sql

SQL語句。

sqlType

經轉換處理後的欄位類型,取值與dataTypeNumber的數值相同。更多資訊,請參見欄位類型與dataTypeNumber數值的對應關係

table

表名。

ts

操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可通過搜尋引擎擷取。

type

操作的類型,比如DELETE、UPDATE、INSERT。

說明

全量任務階段固定為INIT

gtid

全域事務標識GTID(Global Transaction Identifier),具有全域唯一性,一個事務對應一個GTID。

樣本

更新資料

說明

2022年3月20日之前建立的同步或遷移執行個體,源表的DELETE語句同步或遷移到Kafka,其中old的值是資料,data的值是null。為了和開源社區保持一致,2022年3月20日起建立或重啟的同步或遷移執行個體,data的值是資料,old的值是null。

2022年3月20日之前建立的同步或遷移執行個體

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022年3月20日起建立或重啟的同步或遷移執行個體

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作

{
    "database":"dbname",表示同步或遷移的資料庫名稱
    "es":1600161894000,表示源庫資料寫入到binlog的時間
    "id":58,DTS緩衝的位移量
    "isDdl":true,是否同步或遷移DDL
    "sql":"eg:createxxx",Binlog的DDL語句
    "table":"tablename",同步或遷移的表名
    "ts":1600161894771,DTS將資料寫入目標的時間
    "type":"DDL"
}