Data Transmission Service (DTS) を使用すると、Kafka や RocketMQ などのメッセージキューに同期または移行するデータのストレージ形式を選択できます。このトピックでは、データの解析に役立つ利用可能なデータ形式について説明します。
データ形式
DTS では、メッセージキューに書き込まれるデータを次の形式で保存できます。
DTS Avro:データ構造やオブジェクトをストレージや伝送用に変換するために使用されるデータシリアル化形式です。
Shareplex Json:データレプリケーションソフトウェアの SharePlex がソースデータベースからデータを読み取り、メッセージキューに書き込む際に使用されるデータ形式です。
Canal Json:Canal が増分データベースログを解析し、増分データをメッセージキューに送信する際に使用されるデータ形式です。
DTS Avro
DTS Avro スキーマ定義に基づいてデータを解析できます。詳細については、「DTS Avro スキーマ定義」および「DTS Avro デシリアライゼーションの例」をご参照ください。
DTS Avro 形式では、データ定義言語 (DDL) 文は文字列型です。
Shareplex Json
パラメータ
パラメータ | 説明 |
| トランザクションがデータベースにコミットされた時間。形式は yyyy-MM-ddTHH:mm:ssZ (UTC) です。 |
| トランザクションをコミットしたユーザーの ID。 |
| データ操作タイプ。有効な値:INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER。 |
| システム変更番号 (SCN)。データベースが特定の時点でコミットしたトランザクションのバージョンを識別します。コミットされた各トランザクションには、一意の SCN が割り当てられます。 |
| データベース内のレコードを特定するために使用される、比較的一意なアドレス値。 |
| トランザクション ID。 |
| トランザクション内の操作のシーケンス番号。シーケンスは 1 から始まります。 |
| トランザクションにおける操作の総数。 |
| テーブル名。 |
| トランザクション内の操作のインデックス。形式は |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例
データの挿入
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}データの更新
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}データの削除
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}Canal Json
パラメータ
パラメータ | 説明 |
| データベース名。 |
| ソースデータベースで操作が実行された時間。ミリ秒単位の 13 桁の UNIX タイムスタンプです。 説明
|
| 操作のシリアル番号。 説明 この値は、タイムスタンプと内部 DTS オフセットから生成されます。操作の順序を決定するのに役立ちます。 |
| 操作が DDL 操作であるかどうかを指定します。
|
| フィールドのデータ型。 説明 精度などのパラメーター情報はサポートされていません。 |
| 変更前または変更後のデータ。 説明 2022 年 3 月 20 日より前に作成された同期または移行タスクの場合、old は変更後のデータを格納し、 |
| プライマリキー名。 |
| SQL 文。 |
| 変換されたフィールドタイプ。値は dataTypeNumber の値と同じです。詳細については、「フィールドタイプと dataTypeNumber 値のマッピング」をご参照ください。 |
| テーブル名。 |
| 操作がターゲットデータベースに書き込まれ始める時間。ミリ秒単位の 13 桁の UNIX タイムスタンプです。 説明 検索エンジンを使用して UNIX タイムスタンプコンバーターを見つけることができます。 |
| DELETE、UPDATE、INSERT などの操作タイプ。 説明 完全なタスクフェーズでは、タイプは INIT に固定されます。 |
| グローバル トランザクション識別子 (GTID)。GTID はグローバルに一意です。各トランザクションは 1 つの GTID に対応します。 |
例
データの更新
2022 年 3 月 20 日より前に作成された同期または移行タスクの場合、ソーステーブルからの DELETE 文が Kafka に同期または移行されると、old にはデータが含まれ、data は null になります。オープンソースコミュニティとの整合性を保つため、この動作は 2022 年 3 月 20 日以降に作成または再起動されたタスクで変更されました。これらのタスクでは、data にデータが含まれ、old は null になります。
2022 年 3 月 20 日以前に作成された同期または移行タスク
{
"old": [ /* 更新後のデータ */
{
"shipping_type": "aaa"
}
],
"database": "dbname", /* データベース名 */
"es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
"id": 58, /* 操作のシリアル番号 */
"isDdl": false, /* DDL操作かどうか */
"mysqlType": { /* データ型 */
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [ /* プライマリキー名 */
"id"
],
"sql": "", /* SQL文 */
"sqlType": { /* 変換されたフィールド型 */
"id": -5,
"shipping_type": 12
},
"table": "tablename", /* テーブル名 */
"ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
"type": "DELETE" /* 操作タイプ */
}2022 年 3 月 20 日以降に作成または再起動された同期または移行タスク
{
"data": [ /* 更新後のデータ */
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname", /* データベース名 */
"es": 1600161894000, /* ソースデータベースへの書き込み時刻 */
"id": 58, /* 操作のシリアル番号 */
"isDdl": false, /* DDL操作かどうか */
"mysqlType": { /* データ型 */
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [ /* プライマリキー名 */
"id"
],
"sql": "", /* SQL文 */
"sqlType": { /* 変換されたフィールド型 */
"id": -5,
"shipping_type": 12
},
"table": "tablename", /* テーブル名 */
"ts": 1600161894771, /* ターゲットデータベースへの書き込み時刻 */
"type": "DELETE" /* 操作タイプ */
}
DDL 操作
{
"database":"dbname", // 同期または移行対象のデータベース名。
"es":1600161894000, // ソースデータベースのデータがバイナリログに書き込まれた時間。
"id":58, // DTS キャッシュ内のオフセット。
"isDdl":true, // DDL 操作を同期または移行するかどうかを指定します。
"sql":"eg:createxxx", // バイナリログからの DDL 文。
"table":"tablename", // 同期または移行対象のテーブル名。
"ts":1600161894771, // DTS がターゲットにデータを書き込む時間。
"type":"DDL"
}