すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:メッセージキューのデータ形式

最終更新日:Dec 14, 2025

Data Transmission Service (DTS) を使用すると、Kafka や RocketMQ などのメッセージキューに同期または移行するデータのストレージ形式を選択できます。このトピックでは、データの解析に役立つ利用可能なデータ形式について説明します。

データ形式

DTS では、メッセージキューに書き込まれるデータを次の形式で保存できます。

  • DTS Avro:データ構造やオブジェクトをストレージや伝送用に変換するために使用されるデータシリアル化形式です。

  • Shareplex Json:データレプリケーションソフトウェアの SharePlex がソースデータベースからデータを読み取り、メッセージキューに書き込む際に使用されるデータ形式です。

  • Canal JsonCanal が増分データベースログを解析し、増分データをメッセージキューに送信する際に使用されるデータ形式です。

DTS Avro

DTS Avro スキーマ定義に基づいてデータを解析できます。詳細については、「DTS Avro スキーマ定義」および「DTS Avro デシリアライゼーションの例」をご参照ください。

説明

DTS Avro 形式では、データ定義言語 (DDL) 文は文字列型です。

Shareplex Json

パラメータ

パラメータ

説明

time

トランザクションがデータベースにコミットされた時間。形式は yyyy-MM-ddTHH:mm:ssZ (UTC) です。

userid

トランザクションをコミットしたユーザーの ID。

op

データ操作タイプ。有効な値:INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。

scn

システム変更番号 (SCN)。データベースが特定の時点でコミットしたトランザクションのバージョンを識別します。コミットされた各トランザクションには、一意の SCN が割り当てられます。

rowid

データベース内のレコードを特定するために使用される、比較的一意なアドレス値。

trans

トランザクション ID。

seq

トランザクション内の操作のシーケンス番号。シーケンスは 1 から始まります。

size

トランザクションにおける操作の総数。

table

テーブル名。

idx

トランザクション内の操作のインデックス。形式は seq/size です。たとえば、1/11 は、11 個の操作を含むトランザクションの最初の操作であることを示します。

posttime

トランザクションがターゲットデータベースにコミットされた時刻。

データの挿入

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

データの更新

{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

データの削除

{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Canal Json

パラメータ

パラメータ

説明

database

データベース名。

es

ソースデータベースで操作が実行された時間。ミリ秒単位の 13 桁の UNIX タイムスタンプです。

説明
  • 実際の実行時間は秒単位です。値にはミリ秒を表すために 3 つのゼロが追加されます。

  • 検索エンジンを使用して UNIX タイムスタンプコンバーターを見つけることができます。

id

操作のシリアル番号。

説明

この値は、タイムスタンプと内部 DTS オフセットから生成されます。操作の順序を決定するのに役立ちます。

isDdl

操作が DDL 操作であるかどうかを指定します。

  • true: 真。

  • false:いいえ

mysqlType

フィールドのデータ型。

説明

精度などのパラメーター情報はサポートされていません。

old および data

変更前または変更後のデータ。

説明

2022 年 3 月 20 日より前に作成された同期または移行タスクの場合、old は変更後のデータを格納し、data は変更前のデータを格納します。デフォルトでは、old には変更された列だけでなく、すべての列のデータが含まれます。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動されたタスクでは、data が変更後のデータを格納し、old が変更前のデータを格納します。

pkNames

プライマリキー名。

sql

SQL 文。

sqlType

変換されたフィールドタイプ。値は dataTypeNumber の値と同じです。詳細については、「フィールドタイプと dataTypeNumber 値のマッピング」をご参照ください。

table

テーブル名。

ts

操作がターゲットデータベースに書き込まれ始める時間。ミリ秒単位の 13 桁の UNIX タイムスタンプです。

説明

検索エンジンを使用して UNIX タイムスタンプコンバーターを見つけることができます。

type

DELETE、UPDATE、INSERT などの操作タイプ。

説明

完全なタスクフェーズでは、タイプは INIT に固定されます。

gtid

グローバル トランザクション識別子 (GTID)。GTID はグローバルに一意です。各トランザクションは 1 つの GTID に対応します。

データの更新

説明

2022 年 3 月 20 日より前に作成された同期または移行タスクの場合、ソーステーブルからの DELETE 文が Kafka に同期または移行されると、old にはデータが含まれ、data は null になります。オープンソースコミュニティとの整合性を保つため、この動作は 2022 年 3 月 20 日以降に作成または再起動されたタスクで変更されました。これらのタスクでは、data にデータが含まれ、old は null になります。

2022 年 3 月 20 日以前に作成された同期または移行タスク

{
    "old": [ /* 更新後のデータ */
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", /* データベース名 */
    "es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
    "id": 58, /* 操作のシリアル番号 */
    "isDdl": false, /* DDL操作かどうか */
    "mysqlType": { /* データ型 */
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [ /* プライマリキー名 */
        "id"
    ], 
    "sql": "", /* SQL文 */
    "sqlType": { /* 変換されたフィールド型 */
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", /* テーブル名 */
    "ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
    "type": "DELETE" /* 操作タイプ */
}

2022 年 3 月 20 日以降に作成または再起動された同期または移行タスク

{
    "data": [ /* 更新後のデータ */
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", /* データベース名 */
    "es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
    "id": 58, /* 操作のシリアル番号 */
    "isDdl": false, /* DDL操作かどうか */
    "mysqlType": { /* データ型 */
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [ /* プライマリキー名 */
        "id"
    ], 
    "sql": "", /* SQL文 */
    "sqlType": { /* 変換されたフィールド型 */
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", /* テーブル名 */
    "ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
    "type": "DELETE" /* 操作タイプ */
}
            

DDL 操作

{
    "database":"dbname", // 同期または移行対象のデータベース名。
    "es":1600161894000, // ソースデータベースのデータがバイナリログに書き込まれた時間。
    "id":58, // DTS キャッシュ内のオフセット。
    "isDdl":true, // DDL 操作を同期または移行するかどうかを指定します。
    "sql":"eg:createxxx", // バイナリログからの DDL 文。
    "table":"tablename", // 同期または移行対象のテーブル名。
    "ts":1600161894771, // DTS がターゲットにデータを書き込む時間。
    "type":"DDL"
}