Data Transmission Service支援選擇資料同步或遷移到訊息佇列(例如Kafka或RocketMQ)時所採用的儲存格式,本文為您介紹資料格式的定義說明,方便您根據定義解析資料。
資料存放區格式
DTS支援將寫入至訊息佇列的資料存放區為如下三種格式:
DTS Avro:一種資料序列化格式,可以將資料結構或對象轉化成便於儲存或傳輸的格式。
Shareplex Json:資料複製軟體SharePlex讀取源庫中的資料,將資料寫入至訊息佇列時,資料存放區格式為Shareplex Json。
Canal Json:Canal解析資料庫增量日誌,並將增量資料轉送至訊息佇列,資料存放區格式為Canal Json。
DTS Avro
您需要根據DTS Avro的Schema定義進行資料解析,詳情請參見DTS Avro的Schema定義和DTS Avro的還原序列化樣本。
DTS Avro格式中的DDL語句為String類型。
Shareplex Json
參數說明
參數 | 說明 |
| 資料庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。 |
| 提交事務的使用者ID。 |
| 資料操作類型,包括INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。 |
| 系統變化編號SCN(System Change Number),用以標識資料庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。 |
| 用於定位元據庫中一條記錄的一個相對唯一地址值。 |
| 事務ID。 |
| 事務內部的操作序號,從1開始記錄。 |
| 事務內部的操作總數。 |
| 表名。 |
| 事務內部操作的索引,格式為 |
| 事務提交至目標庫的時間。 |
樣本
插入資料
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}更新資料
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}刪除資料
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}Canal Json
參數說明
參數 | 說明 |
| 資料庫名稱。 |
| 操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可通過搜尋引擎擷取。 |
| 操作的序號。 |
| 是否是DDL操作。
|
| 欄位的資料類型。 說明 不支援精度等資料類型的參數資訊。 |
| 變更前或變更後的資料。 說明 2022年3月20日之前建立的同步或遷移執行個體, |
| 主鍵名稱。 |
| SQL語句。 |
| 經轉換處理後的欄位類型,取值與dataTypeNumber的數值相同。更多資訊,請參見欄位類型與dataTypeNumber數值的對應關係。 |
| 表名。 |
| 操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可通過搜尋引擎擷取。 |
| 操作的類型,比如DELETE、UPDATE、INSERT。 說明 全量任務階段固定為INIT。 |
| 全域事務標識GTID(Global Transaction Identifier),具有全域唯一性,一個事務對應一個GTID。 |
樣本
更新資料
2022年3月20日之前建立的同步或遷移執行個體,源表的DELETE語句同步或遷移到Kafka,其中old的值是資料,data的值是null。為了和開源社區保持一致,2022年3月20日起建立或重啟的同步或遷移執行個體,data的值是資料,old的值是null。
2022年3月20日之前建立的同步或遷移執行個體
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}2022年3月20日起建立或重啟的同步或遷移執行個體
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作
{
"database":"dbname",表示同步或遷移的資料庫名稱
"es":1600161894000,表示源庫資料寫入到binlog的時間
"id":58,DTS緩衝的位移量
"isDdl":true,是否同步或遷移DDL
"sql":"eg:createxxx",Binlog的DDL語句
"table":"tablename",同步或遷移的表名
"ts":1600161894771,DTS將資料寫入目標的時間
"type":"DDL"
}