すべてのプロダクト
Search
ドキュメントセンター

DataWorks:付録: メッセージ形式

最終更新日:Jul 08, 2025

このトピックでは、Kafkaに書き込まれるメッセージの形式と、メッセージ内の各フィールドの意味について説明します。

背景情報

データソース内のすべてのデータをKafkaに同期するノードは、DataWorksがデータソースから読み取ったデータをJSON形式でKafkaトピックに書き込みます。 Kafkaに書き込まれるメッセージには、列の変更情報と、変更前後のデータの状態が含まれています。 コンシューマーがKafkaメッセージを消費するときに同期ノードの進行状況を把握できるように、同期ノードは定期的に、値がMHEARTBEATのopフィールドを含むハートビートメッセージを生成し、Kafkaトピックに書き込みます。 Kafkaに書き込まれるメッセージの形式の詳細については、Kafkaメッセージの形式同期ノードによって生成されるハートビートメッセージの形式、およびソースのデータ変更に対するKafkaメッセージの形式をご参照ください。 メッセージ内の各フィールドのタイプと意味の詳細については、フィールドタイプおよびフィールドをご参照ください。

Kafkaメッセージの形式

Kafkaに書き込まれるメッセージの形式:

{
    "schema": { // メタデータの変更情報。カラム名とカラムタイプのみが指定されます。
        "dataColumn": [// カラムの変更情報。デスティネーション トピックのデータは、この情報に基づいて更新されます。
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// Oracle データソースを使用する場合、rowid がカラムとして追加されます。
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]", // base64 文字列
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// Oracle データソースの行の ID。値は STRING タイプです。
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]", // base64 文字列
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// Oracle データソースの行の ID。値は STRING タイプです。
            }
        },
        "sequenceId":"XXX",// 増分データとすべてのデータがマージされた後に生成される各データレコードの一意のシーケンス ID。値は STRING タイプです。
        "scn":"xxxx",// Oracle データソースのシステム変更番号 (SCN)。値は STRING タイプです。
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// 実行される操作。パラメータの値は大文字と小文字が区別されます。
        "timestamp": {
            "eventTime": 1,// 必須。ソースデータベースのデータが変更された時刻。値はミリ秒単位の 13 桁のタイムスタンプです。
            "systemTime": 2,// オプション。同期ノードが変更メッセージを読み取った時刻。値はミリ秒単位の 13 桁のタイムスタンプです。
            "checkpointTime": 3// オプション。同期オフセットがリセットされる指定時刻。値はミリ秒単位の 13 桁のタイムスタンプで、ほとんどの場合、eventTime フィールドの値と同じです。
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement シリアル化されたバイナリ、base64 文字列で表現]" // SQLStatement シリアル化されたバイナリ、base64 文字列で表現
        }
    },
    "version":"1.0.0"
}
説明

フィールドのタイプと説明の詳細については、「フィールドタイプ」および「フィールド」をご参照ください。

同期ノードによって生成されるハートビートメッセージの形式

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
説明

フィールドのタイプと説明の詳細については、「フィールドタイプ」および「フィールド」をご参照ください。

ソースのデータ変更に対するKafkaメッセージの形式

  • ソースへのデータ挿入の Kafka メッセージの形式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • ソースのデータ更新の Kafka メッセージの形式:

    • [ソース内の 1 つのレコードが更新されると、1 つの Kafka レコードが生成されます] が選択されていない場合、ソースのデータ更新に対して 2 つの Kafka メッセージが生成されます。1 つの Kafka メッセージは更新前のデータの状態を表し、もう 1 つの Kafka メッセージは更新後のデータの状態を表します。次のサンプルメッセージは、形式を示しています。

      変更前のデータの状態を表す Kafka メッセージの形式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      変更後のデータの状態を表す Kafka メッセージの形式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • [ソース内の 1 つのレコードが更新されると、1 つの Kafka レコードが生成されます] が選択されている場合、ソースのデータ更新に対して 1 つの Kafka メッセージのみが生成されます。 Kafka メッセージは、更新前後のデータの状態を表します。次のサンプルメッセージは、形式を示しています。

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • ソースからのデータ削除の Kafka メッセージの形式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
    説明

    フィールドのタイプと説明の詳細については、「フィールドタイプ」および「フィールド」をご参照ください。

単一テーブルのリアルタイム出力のメッセージ形式

リアルタイム単一テーブル同期タスクの出力先として Kafka を構成する場合は、Kafka に書き込まれる値の形式を確認する必要があります。有効な形式は、Canal CDC と JSON です。詳細については、「付録: 出力形式の説明」をご参照ください。

フィールドタイプ

ソースから読み取ったデータは、BOOLEAN、DOUBLE、DATE、BYTES、LONG、STRING の各タイプにマッピングされ、異なる JSON 形式で Kafka トピックに書き込まれます。

フィールドタイプ

説明

BOOLEAN

JSON の BOOLEAN タイプに対応します。有効な値: true および false。

DATE

JSON の NUMBER タイプに対応します。値はミリ秒単位の 13 桁のタイムスタンプです。

BYTES

JSON の STRING タイプに対応します。データが Kafka に書き込まれる前に、バイト配列は Base64 でエンコードされ、文字列に変換されます。コンシューマーは、文字列を消費する前に、Base64 でエンコードされた文字列をデコードする必要があります。エンコードには Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) が使用され、デコードには Base64.getDecoder().decode(encodedText)) が使用されます。

STRING

JSON の STRING タイプに対応します。

LONG

JSON の NUMBER タイプに対応します。

DOUBLE

JSON の NUMBER タイプに対応します。

フィールド

次の表は、Kafkaに書き込まれるメッセージ内の各フィールドの意味を示しています。

レベル 1 フィールド

レベル 2 フィールド

説明

schema

dataColumn

カラムの名前とタイプ。値は JSONArray タイプです。 dataColumn は、ソースで更新されたカラムの名前とタイプを記録します。変更操作は、データの追加、削除、変更、またはソースのテーブルスキーマの変更です。

  • name: カラムの名前。

  • type: カラムのタイプ。

primaryKey

プライマリキー情報。値は List タイプです。

pk: 主キーの名前。

source

ソースデータベースまたはソーステーブルに関する情報。値は Object タイプです。

  • dbType: ソースのタイプ。値は STRING タイプです。

  • dbVersion: ソースのバージョン。値は STRING タイプです。

  • dbName: ソースの名前。値は STRING タイプです。

  • schemaName: スキーマの名前。このフィールドは、PostgreSQL や SQL Server などのソースに固有です。値は STRING タイプです。

  • tableName: ソーステーブルの名前。値は STRING タイプです。

payload

before

変更前のデータ。値は JSONObject タイプです。たとえば、MySQL データベースがソースで、このソースのデータが更新された場合、before フィールドには更新前のデータが記録されます。

  • データ更新または削除メッセージがソースから読み取られた後、before フィールドは書き込みレコードに指定されます。

  • dataColumn: カラム情報。値は JSONObject タイプです。フィールド値は カラム名: カラムタイプ 形式です。カラム名は文字列で、カラムタイプは BOOLEAN、DOUBLE、DATE、BYTES、LONG、または STRING です。

after

変更後のデータ。 after フィールドは、before フィールドと同じデータ形式で変更後のデータを記録します。

sequenceId

StreamX によって増分データとすべてのデータがマージされた後に生成される各データレコードの一意のシーケンス ID。値は STRING タイプです。

説明

データ更新メッセージがソースから読み取られた後、更新前と更新後の 2 つの書き込みレコードが生成されます。 2 つの書き込みレコードは同じシーケンス ID を持ちます。

scn

ソースの SCN。このフィールドは、ソースが Oracle データベースの場合に有効です。

op

ソースのデータに対して実行される操作のタイプ。有効な値:

  • INSERT: データを挿入します。

  • UPDATE_BEFOR: データを更新します (更新前)。

  • UPDATE_AFTER: データを更新します (更新後)。

  • DELETE: データを削除します。

  • TRANSACTION_BEGIN: データベーストランザクションを開始します。

  • TRANSACTION_END: データベーストランザクションを終了します。

  • CREATE: ソースにテーブルを作成します。

  • ALTER: ソースのテーブルを変更します。

  • QUERY: データ変更を引き起こす SQL 文を実行することにより、ソースのデータ変更をクエリします。

  • TRUNCATE: ソースのテーブルからすべての行を削除します。

  • RENAME: ソースのテーブルの名前を変更します。

  • CINDEX: インデックスを作成します。

  • DINDEX: インデックスを削除します。

  • MHEARTBEAT: ハートビートメッセージ。このメッセージは、ソースに新しいデータが生成されていないときに同期ノードが想定どおりに実行されていることを示します。

timestamp

データレコードのタイムスタンプ。値は JSONObject タイプです。

  • eventTime: ソースのデータが変更された時刻。値はミリ秒単位の 13 桁のタイムスタンプで、LONG タイプです。

  • systemTime: 同期ノードが変更メッセージを読み取った時刻。値はミリ秒単位の 13 桁のタイムスタンプで、LONG タイプです。

  • checkpointTime: 同期オフセットがリセットされる指定時刻。値はミリ秒単位の 13 桁のタイムスタンプで、LONG タイプです。ほとんどの場合、このフィールドの値は eventTime フィールドの値と同じです。

ddl

このフィールドは、ソースのテーブルのスキーマが変更された場合にのみ指定されます。 DDL 操作 (データの追加、削除、変更など) がソースで実行された場合、値は NULL です。

  • text: ソースの DDL 文のテキスト。値は STRING タイプです。

  • ddlMeta: シリアル化された Java オブジェクトから取得された Base64 エンコード文字列。 Java オブジェクトは、DDL ベースの変更を記録します。値は STRING タイプです。

version

該当なし

JSON 形式のデータのバージョン。