すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:メッセージキューのデータ形式

最終更新日:May 07, 2025

Data Transmission Service (DTS) を使用して Kafka クラスタや ApsaraMQ for RocketMQ などのメッセージキューにデータを移行または同期する場合、メッセージキューにデータを格納する形式を指定できます。このトピックでは、メッセージキューにデータを格納するために使用できる形式について説明します。これらのデータ形式の定義に基づいてデータを解析できます。

データ形式

DTS では、以下のいずれかの形式でメッセージキューにデータを格納できます。

  • DTS Avro: データ構造またはオブジェクトを変換して格納と転送を容易にするためのデータシリアル化形式です。

  • Shareplex Json: データレプリケーションソフトウェア SharePlex を使用してソースデータベースから読み取られたデータがメッセージキューに格納される形式です。

  • Canal Json: Canal がソースデータベースの増分データに関するログを解析し、増分データをメッセージキューに送信した後に、データがメッセージキューに格納される形式です。

DTS Avro

DTS Avro のスキーマ定義に基づいてデータを解析する必要があります。詳細については、GitHub の subscribe_example/avro/ および subscribe_example/javaimpl/src/main/java/recordprocessor /AvroDeserializer.java をご覧ください。

説明

DTS Avro 形式では、データ定義言語 (DDL) 文は STRING 型です。

Shareplex Json

パラメータ

パラメータ

説明

time

データベース内のトランザクションがコミットされた UTC 時刻。このパラメータの値は、yyyy-MM-ddTHH:mm:ssZ 形式です。

userid

トランザクションをコミットしたユーザーの ID。

op

操作タイプ。有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。

scn

特定の時点においてデータベースがコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN)。コミットされた各トランザクションには一意の SCN が割り当てられます。

rowid

データベース内のレコードを識別するために使用される相対的に一意のアドレス値。

trans

トランザクションの ID。

seq

トランザクションにおける操作のシーケンス番号。番号は 1 から始まります。

size

トランザクションにおける操作の総数。

table

テーブルの名前。

idx

トランザクションにおける操作のインデックス。 seq/size 形式です。たとえば、1/11 は、11 個の操作を含むトランザクションにおいて操作のシーケンス番号が 1 であることを示します。

posttime

トランザクションがターゲットデータベースにコミットされた時刻。

例:

挿入されたデータ

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    /* ユーザーID */
        "op": "ins",                                   /* 操作:挿入 */
          "scn": "14589063118712",                  /* システム変更番号 */
          "rowid": "AAATGpAAIAAItcIAAA",      /* 行ID */
        "trans": "7.0.411499",                 /* トランザクションID */
        "seq": 1,                                          /* シーケンス番号 */
        "size": 11,                                         /* 操作の総数 */
        "table": "CL_BIZ1.MIO_LOG",       /* テーブル名 */
          "idx": "1/11",                                       /* インデックス */
        "posttime": "2017-06-16T14:33:52" /* コミット時刻 */
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

更新されたデータ

{
    "meta": {
        "time": "2017-06-16T15:38:13", /* 更新時刻 */
        "userid": 84, /* ユーザーID */
        "op": "upd",                             /* 操作:更新 */
        "table": "CL_BIZ1.MIO_LOG" /* テーブル名 */
        …. /* その他のメタデータ */
    },
    "data": {                                          /* 更新後のデータ */
        "CNTR_NO": "1171201606"
    },
    "key": {                                            /* 更新前のデータ */
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

削除されたデータ

{
    "meta": {
        "time": "2017-06-16T15:51:35", /* 削除時刻 */
        "userid": 84, /* ユーザーID */
        "op": "del",                      /* 操作:削除 */
     },
    "data": {                                    /* 削除されたデータ */
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Canal Json

パラメータ

パラメータ

説明

database

データベースの名前。

es

データベースで操作が実行された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。

説明

検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。

id

操作のシリアル番号。

isDdl

操作が DDL 操作かどうかを示します。

  • true: 操作は DDL 操作です。

  • false: 操作は DDL 操作ではありません。

mysqlType

フィールドのデータ型。

説明

精度の型などの操作パラメータはサポートされていません。

old および data

更新前と更新後のデータ。

説明

2022 年 3 月 20 日より前に作成されたデータ同期または移行インスタンスの場合、old の値は更新後のデータであり、data の値は更新前のデータです。デフォルトでは、すべての列のデータが含まれます。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動されたデータ同期または移行インスタンスの場合、data の値は更新後のデータであり、old の値は更新前のデータです。

pkNames

プライマリキーの名前。

sql

SQL 文。

sqlType

変換されたフィールドの型。有効な値は、dataTypeNumber パラメータの有効な値と同じです。詳細については、「Kafka クライアントを使用して追跡されたデータを使用する」トピックの「MySQL データ型と dataTypeNumber 値のマッピング」セクションをご参照ください (MySQL データ型と dataTypeNumber 値のマッピング)。

table

テーブルの名前。

ts

ターゲットデータベースで操作の実行が開始された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。

説明

検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。

type

操作タイプ。有効な値: DELETE、UPDATE、INSERT。

説明

完全同期または移行中は、操作タイプは INIT に固定されます。

gtid

トランザクションを識別するグローバルトランザクション ID (GTID)。各トランザクションにはグローバルに一意の GTID が割り当てられます。

更新されたデータ

説明

2022 年 3 月 20 日より前に作成され、ソーステーブルの DELETE 文を使用して Kafka クラスタに同期された変更追跡インスタンスの場合、old の値はデータであり、data の値は NULL です。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動された変更追跡インスタンスの場合、data の値はデータであり、old の値は NULL です。

2022 年 3 月 20 日より前に作成されたデータ同期または移行インスタンス変更の保存

{
    "old": [ /* 更新後のデータ */
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", /* データベース名 */
    "es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
    "id": 58, /* 操作のシリアル番号 */
    "isDdl": false, /* DDL操作かどうか */
    "mysqlType": { /* データ型 */
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [ /* プライマリキー名 */
        "id"
    ], 
    "sql": "", /* SQL文 */
    "sqlType": { /* 変換されたフィールド型 */
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", /* テーブル名 */
    "ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
    "type": "DELETE" /* 操作タイプ */
}

2022 年 3 月 20 日以降に作成または再起動されたデータ同期または移行インスタンス

{
    "data": [ /* 更新後のデータ */
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", /* データベース名 */
    "es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
    "id": 58, /* 操作のシリアル番号 */
    "isDdl": false, /* DDL操作かどうか */
    "mysqlType": { /* データ型 */
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [ /* プライマリキー名 */
        "id"
    ], 
    "sql": "", /* SQL文 */
    "sqlType": { /* 変換されたフィールド型 */
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", /* テーブル名 */
    "ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
    "type": "DELETE" /* 操作タイプ */
}
            

DDL 操作

{
    "database":"dbname", /* ソースデータベースの名前 */
    "es":1600161894000, /* ソースデータベースのデータがバイナリログに書き込まれた時刻 */
    "id":58, /* DTS キャッシュのオフセット */
    "isDdl":true, /* DDL 操作を同期または移行するかどうかを指定します */
    "sql":"eg:createxxx", /* バイナリログに記録された DDL 文 */
    "table":"tablename", /* ソーステーブルの名前 */
    "ts":1600161894771, /* DTS がターゲットデータベースにデータを書き込んだ時刻 */
    "type":"DDL" /* 操作タイプ */
}