Data Transmission Service (DTS) を使用して Kafka クラスタや ApsaraMQ for RocketMQ などのメッセージキューにデータを移行または同期する場合、メッセージキューにデータを格納する形式を指定できます。このトピックでは、メッセージキューにデータを格納するために使用できる形式について説明します。これらのデータ形式の定義に基づいてデータを解析できます。
データ形式
DTS では、以下のいずれかの形式でメッセージキューにデータを格納できます。
DTS Avro: データ構造またはオブジェクトを変換して格納と転送を容易にするためのデータシリアル化形式です。
Shareplex Json: データレプリケーションソフトウェア SharePlex を使用してソースデータベースから読み取られたデータがメッセージキューに格納される形式です。
Canal Json: Canal がソースデータベースの増分データに関するログを解析し、増分データをメッセージキューに送信した後に、データがメッセージキューに格納される形式です。
DTS Avro
DTS Avro のスキーマ定義に基づいてデータを解析する必要があります。詳細については、GitHub の subscribe_example/avro/ および subscribe_example/javaimpl/src/main/java/recordprocessor /AvroDeserializer.java をご覧ください。
DTS Avro 形式では、データ定義言語 (DDL) 文は STRING 型です。
Shareplex Json
パラメータ
パラメータ | 説明 |
| データベース内のトランザクションがコミットされた UTC 時刻。このパラメータの値は、yyyy-MM-ddTHH:mm:ssZ 形式です。 |
| トランザクションをコミットしたユーザーの ID。 |
| 操作タイプ。有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。 |
| 特定の時点においてデータベースがコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN)。コミットされた各トランザクションには一意の SCN が割り当てられます。 |
| データベース内のレコードを識別するために使用される相対的に一意のアドレス値。 |
| トランザクションの ID。 |
| トランザクションにおける操作のシーケンス番号。番号は 1 から始まります。 |
| トランザクションにおける操作の総数。 |
| テーブルの名前。 |
| トランザクションにおける操作のインデックス。 |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例:
挿入されたデータ
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84, /* ユーザーID */
"op": "ins", /* 操作:挿入 */
"scn": "14589063118712", /* システム変更番号 */
"rowid": "AAATGpAAIAAItcIAAA", /* 行ID */
"trans": "7.0.411499", /* トランザクションID */
"seq": 1, /* シーケンス番号 */
"size": 11, /* 操作の総数 */
"table": "CL_BIZ1.MIO_LOG", /* テーブル名 */
"idx": "1/11", /* インデックス */
"posttime": "2017-06-16T14:33:52" /* コミット時刻 */
},
"data": {
"MIO_LOG_ID": "32539737"
}
}更新されたデータ
{
"meta": {
"time": "2017-06-16T15:38:13", /* 更新時刻 */
"userid": 84, /* ユーザーID */
"op": "upd", /* 操作:更新 */
"table": "CL_BIZ1.MIO_LOG" /* テーブル名 */
…. /* その他のメタデータ */
},
"data": { /* 更新後のデータ */
"CNTR_NO": "1171201606"
},
"key": { /* 更新前のデータ */
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}削除されたデータ
{
"meta": {
"time": "2017-06-16T15:51:35", /* 削除時刻 */
"userid": 84, /* ユーザーID */
"op": "del", /* 操作:削除 */
},
"data": { /* 削除されたデータ */
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}Canal Json
パラメータ
パラメータ | 説明 |
| データベースの名前。 |
| データベースで操作が実行された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| 操作のシリアル番号。 |
| 操作が DDL 操作かどうかを示します。
|
| フィールドのデータ型。 説明 精度の型などの操作パラメータはサポートされていません。 |
| 更新前と更新後のデータ。 説明 2022 年 3 月 20 日より前に作成されたデータ同期または移行インスタンスの場合、 |
| プライマリキーの名前。 |
| SQL 文。 |
| 変換されたフィールドの型。有効な値は、dataTypeNumber パラメータの有効な値と同じです。詳細については、「Kafka クライアントを使用して追跡されたデータを使用する」トピックの「MySQL データ型と dataTypeNumber 値のマッピング」セクションをご参照ください (MySQL データ型と dataTypeNumber 値のマッピング)。 |
| テーブルの名前。 |
| ターゲットデータベースで操作の実行が開始された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| 操作タイプ。有効な値: DELETE、UPDATE、INSERT。 説明 完全同期または移行中は、操作タイプは INIT に固定されます。 |
| トランザクションを識別するグローバルトランザクション ID (GTID)。各トランザクションにはグローバルに一意の GTID が割り当てられます。 |
例
更新されたデータ
2022 年 3 月 20 日より前に作成され、ソーステーブルの DELETE 文を使用して Kafka クラスタに同期された変更追跡インスタンスの場合、old の値はデータであり、data の値は NULL です。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動された変更追跡インスタンスの場合、data の値はデータであり、old の値は NULL です。
2022 年 3 月 20 日より前に作成されたデータ同期または移行インスタンス変更の保存
{
"old": [ /* 更新後のデータ */
{
"shipping_type": "aaa"
}
],
"database": "dbname", /* データベース名 */
"es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
"id": 58, /* 操作のシリアル番号 */
"isDdl": false, /* DDL操作かどうか */
"mysqlType": { /* データ型 */
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [ /* プライマリキー名 */
"id"
],
"sql": "", /* SQL文 */
"sqlType": { /* 変換されたフィールド型 */
"id": -5,
"shipping_type": 12
},
"table": "tablename", /* テーブル名 */
"ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
"type": "DELETE" /* 操作タイプ */
}2022 年 3 月 20 日以降に作成または再起動されたデータ同期または移行インスタンス
{
"data": [ /* 更新後のデータ */
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname", /* データベース名 */
"es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
"id": 58, /* 操作のシリアル番号 */
"isDdl": false, /* DDL操作かどうか */
"mysqlType": { /* データ型 */
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [ /* プライマリキー名 */
"id"
],
"sql": "", /* SQL文 */
"sqlType": { /* 変換されたフィールド型 */
"id": -5,
"shipping_type": 12
},
"table": "tablename", /* テーブル名 */
"ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
"type": "DELETE" /* 操作タイプ */
}
DDL 操作
{
"database":"dbname", /* ソースデータベースの名前 */
"es":1600161894000, /* ソースデータベースのデータがバイナリログに書き込まれた時刻 */
"id":58, /* DTS キャッシュのオフセット */
"isDdl":true, /* DDL 操作を同期または移行するかどうかを指定します */
"sql":"eg:createxxx", /* バイナリログに記録された DDL 文 */
"table":"tablename", /* ソーステーブルの名前 */
"ts":1600161894771, /* DTS がターゲットデータベースにデータを書き込んだ時刻 */
"type":"DDL" /* 操作タイプ */
}