このトピックでは、Kafkaに書き込まれるメッセージの形式と、メッセージ内の各フィールドの意味について説明します。
背景情報
データソース内のすべてのデータをKafkaに同期するノードは、DataWorksがデータソースから読み取ったデータをJSON形式でKafkaトピックに書き込みます。 Kafkaに書き込まれるメッセージには、列の変更情報と、変更前後のデータの状態が含まれています。 コンシューマーがKafkaメッセージを消費するときに同期ノードの進行状況を把握できるように、同期ノードは定期的に、値がMHEARTBEATのopフィールドを含むハートビートメッセージを生成し、Kafkaトピックに書き込みます。 Kafkaに書き込まれるメッセージの形式の詳細については、Kafkaメッセージの形式、同期ノードによって生成されるハートビートメッセージの形式、およびソースのデータ変更に対するKafkaメッセージの形式をご参照ください。 メッセージ内の各フィールドのタイプと意味の詳細については、フィールドタイプおよびフィールドをご参照ください。
Kafkaメッセージの形式
Kafkaに書き込まれるメッセージの形式:
{
"schema": { // メタデータの変更情報。カラム名とカラムタイプのみが指定されます。
"dataColumn": [// カラムの変更情報。デスティネーション トピックのデータは、この情報に基づいて更新されます。
{
"name": "id",
"type": "LONG"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "binData",
"type": "BYTES"
},
{
"name": "ts",
"type": "DATE"
},
{
"name":"rowid",// Oracle データソースを使用する場合、rowid がカラムとして追加されます。
"type":"STRING"
}
],
"primaryKey": [
"pkName1",
"pkName2"
],
"source": {
"dbType": "mysql",
"dbVersion": "1.0.0",
"dbName": "myDatabase",
"schemaName": "mySchema",
"tableName": "tableName"
}
},
"payload": {
"before": {
"dataColumn":{
"id": 111,
"name":"scooter",
"binData": "[base64 string]", // base64 文字列
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// Oracle データソースの行の ID。値は STRING タイプです。
}
},
"after": {
"dataColumn":{
"id": 222,
"name":"donald",
"binData": "[base64 string]", // base64 文字列
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"// Oracle データソースの行の ID。値は STRING タイプです。
}
},
"sequenceId":"XXX",// 増分データとすべてのデータがマージされた後に生成される各データレコードの一意のシーケンス ID。値は STRING タイプです。
"scn":"xxxx",// Oracle データソースのシステム変更番号 (SCN)。値は STRING タイプです。
"op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// 実行される操作。パラメータの値は大文字と小文字が区別されます。
"timestamp": {
"eventTime": 1,// 必須。ソースデータベースのデータが変更された時刻。値はミリ秒単位の 13 桁のタイムスタンプです。
"systemTime": 2,// オプション。同期ノードが変更メッセージを読み取った時刻。値はミリ秒単位の 13 桁のタイムスタンプです。
"checkpointTime": 3// オプション。同期オフセットがリセットされる指定時刻。値はミリ秒単位の 13 桁のタイムスタンプで、ほとんどの場合、eventTime フィールドの値と同じです。
},
"ddl": {
"text": "ADD COLUMN ...",
"ddlMeta": "[SQLStatement シリアル化されたバイナリ、base64 文字列で表現]" // SQLStatement シリアル化されたバイナリ、base64 文字列で表現
}
},
"version":"1.0.0"
}同期ノードによって生成されるハートビートメッセージの形式
{
"schema": {
"dataColumn": null,
"primaryKey": null,
"source": null
},
"payload": {
"before": null,
"after": null,
"sequenceId": null,
"timestamp": {
"eventTime": 1620457659000,
"checkpointTime": 1620457659000
},
"op": "MHEARTBEAT",
"ddl": null
},
"version": "0.0.1"
}ソースのデータ変更に対するKafkaメッセージの形式
ソースへのデータ挿入の Kafka メッセージの形式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }ソースのデータ更新の Kafka メッセージの形式:
[ソース内の 1 つのレコードが更新されると、1 つの Kafka レコードが生成されます] が選択されていない場合、ソースのデータ更新に対して 2 つの Kafka メッセージが生成されます。1 つの Kafka メッセージは更新前のデータの状態を表し、もう 1 つの Kafka メッセージは更新後のデータの状態を表します。次のサンプルメッセージは、形式を示しています。
変更前のデータの状態を表す Kafka メッセージの形式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }変更後のデータの状態を表す Kafka メッセージの形式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }[ソース内の 1 つのレコードが更新されると、1 つの Kafka レコードが生成されます] が選択されている場合、ソースのデータ更新に対して 1 つの Kafka メッセージのみが生成されます。 Kafka メッセージは、更新前後のデータの状態を表します。次のサンプルメッセージは、形式を示しています。
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
ソースからのデータ削除の Kafka メッセージの形式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }
単一テーブルのリアルタイム出力のメッセージ形式
リアルタイム単一テーブル同期タスクの出力先として Kafka を構成する場合は、Kafka に書き込まれる値の形式を確認する必要があります。有効な形式は、Canal CDC と JSON です。詳細については、「付録: 出力形式の説明」をご参照ください。
フィールドタイプ
ソースから読み取ったデータは、BOOLEAN、DOUBLE、DATE、BYTES、LONG、STRING の各タイプにマッピングされ、異なる JSON 形式で Kafka トピックに書き込まれます。
フィールドタイプ | 説明 |
BOOLEAN | JSON の BOOLEAN タイプに対応します。有効な値: true および false。 |
DATE | JSON の NUMBER タイプに対応します。値はミリ秒単位の 13 桁のタイムスタンプです。 |
BYTES | JSON の STRING タイプに対応します。データが Kafka に書き込まれる前に、バイト配列は Base64 でエンコードされ、文字列に変換されます。コンシューマーは、文字列を消費する前に、Base64 でエンコードされた文字列をデコードする必要があります。エンコードには Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) が使用され、デコードには Base64.getDecoder().decode(encodedText)) が使用されます。 |
STRING | JSON の STRING タイプに対応します。 |
LONG | JSON の NUMBER タイプに対応します。 |
DOUBLE | JSON の NUMBER タイプに対応します。 |
フィールド
次の表は、Kafkaに書き込まれるメッセージ内の各フィールドの意味を示しています。
レベル 1 フィールド | レベル 2 フィールド | 説明 |
schema | dataColumn | カラムの名前とタイプ。値は JSONArray タイプです。 dataColumn は、ソースで更新されたカラムの名前とタイプを記録します。変更操作は、データの追加、削除、変更、またはソースのテーブルスキーマの変更です。
|
primaryKey | プライマリキー情報。値は List タイプです。 pk: 主キーの名前。 | |
source | ソースデータベースまたはソーステーブルに関する情報。値は Object タイプです。
| |
payload | before | 変更前のデータ。値は JSONObject タイプです。たとえば、MySQL データベースがソースで、このソースのデータが更新された場合、before フィールドには更新前のデータが記録されます。
|
after | 変更後のデータ。 after フィールドは、before フィールドと同じデータ形式で変更後のデータを記録します。 | |
sequenceId | StreamX によって増分データとすべてのデータがマージされた後に生成される各データレコードの一意のシーケンス ID。値は STRING タイプです。 説明 データ更新メッセージがソースから読み取られた後、更新前と更新後の 2 つの書き込みレコードが生成されます。 2 つの書き込みレコードは同じシーケンス ID を持ちます。 | |
scn | ソースの SCN。このフィールドは、ソースが Oracle データベースの場合に有効です。 | |
op | ソースのデータに対して実行される操作のタイプ。有効な値:
| |
timestamp | データレコードのタイムスタンプ。値は JSONObject タイプです。
| |
ddl | このフィールドは、ソースのテーブルのスキーマが変更された場合にのみ指定されます。 DDL 操作 (データの追加、削除、変更など) がソースで実行された場合、値は NULL です。
| |
version | 該当なし | JSON 形式のデータのバージョン。 |