This topic describes the formats of messages that are written to Kafka and the meaning of each field in the messages.

Background information

A node that synchronizes all data in a data source to Kafka writes the data that DataWorks reads from the source to Kafka topics in the JSON format. A message that is written to Kafka contains the column change information and the status of the data before and after the change. To ensure that a consumer can track the progress of a sync node when it consumes Kafka messages, the sync node periodically generates a heartbeat message, in which the op field is set to MHEARTBEAT, and writes the message to Kafka topics. For more information about the formats of messages that are written to Kafka, see Format of a Kafka message, Format of a heartbeat message generated by a sync node, and Format of a Kafka message for data change in a data source. For more information about the type and meaning of each field in a message, see Field types and Fields.

Format of a Kafka message

Format of a message that is written to Kafka:

{
    "schema": { // The metadata change information. Only column names and column types are specified.
        "dataColumn": [// The column change information. The data in a destination topic is updated based on the information.
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
                "name": "rowid",// If an Oracle data source is used, rowid is added as a column.
                "type": "STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// The ID of a row in the Oracle data source. The value is of the String type.
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// The ID of a row in the Oracle data source. The value is of the String type.
            }
        },
        "sequenceId":"XXX",// The unique sequence ID of each data record, generated after the incremental data and full data are merged. The value is of the String type.
        "scn":"xxxx",// The system change number (SCN) of the Oracle data source. The value is of the String type.
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// The value is case-sensitive.
        "timestamp": {
            "eventTime": 1,// Required. The time when the data changed in the source database. The value is a 13-digit timestamp in milliseconds.
            "systemTime": 2,// Optional. The time when the sync node read the change message. The value is a 13-digit timestamp in milliseconds.
            "checkpointTime": 3// Optional. The time to which the synchronization offset was reset. The value is a 13-digit timestamp in milliseconds and usually equals the value of the eventTime field.
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
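
For illustration, the following is a minimal sketch of how a consumer might parse such a message. The Jackson JSON library and the class and method names are this example's assumptions, not part of the product:

    // Minimal parsing sketch, assuming the Jackson library (com.fasterxml.jackson).
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SyncMessageParser {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static void handle(String value) throws Exception {
            JsonNode payload = MAPPER.readTree(value).path("payload");
            String op = payload.path("op").asText();
            if ("MHEARTBEAT".equals(op)) {
                return; // heartbeats carry no row data
            }
            // before and after are null for insertions and deletions, respectively.
            JsonNode before = payload.path("before").path("dataColumn");
            JsonNode after = payload.path("after").path("dataColumn");
            long eventTime = payload.path("timestamp").path("eventTime").asLong();
            System.out.printf("op=%s eventTime=%d before=%s after=%s%n", op, eventTime, before, after);
        }
    }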

Format of a heartbeat message generated by a sync node

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
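
Heartbeat messages let a consumer advance its progress watermark even when the source is idle. The following is a hedged consumer-loop sketch, assuming the kafka-clients and Jackson libraries; the broker address, group ID, and topic name are placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class HeartbeatAwareConsumer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("group.id", "sync-demo");               // placeholder group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            ObjectMapper mapper = new ObjectMapper();
            long watermark = 0L; // latest source-side event time seen so far
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_sync_topic")); // placeholder topic
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        JsonNode payload = mapper.readTree(record.value()).path("payload");
                        // A heartbeat's eventTime still tells us how far the sync node has read.
                        watermark = Math.max(watermark, payload.path("timestamp").path("eventTime").asLong());
                        if (!"MHEARTBEAT".equals(payload.path("op").asText())) {
                            // process the actual data change here
                        }
                    }
                }
            }
        }
    }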

Format of a Kafka message for data change in a data source

  • Format of a Kafka message for data insertion into a data source:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • Format of a Kafka message for data update in a data source:
    An update in a data source generates two Kafka messages: one that describes the data before the update and one that describes the data after the update. The two messages share the same sequenceId; a sketch that pairs them by sequenceId follows this list. The following sample messages show the formats:
    • Format of the Kafka message that describes the status of data before the change:
      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • Format of the Kafka message that describes the status of data after the change:
      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • Format of a Kafka message for data deletion from a data source:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
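
Because the two halves of an update share the same sequenceId, a consumer can reunite them into a single before/after pair. The following is a minimal sketch with hypothetical buffering logic, assuming Jackson JsonNode inputs:

    import java.util.HashMap;
    import java.util.Map;
    import com.fasterxml.jackson.databind.JsonNode;

    public class UpdatePairer {
        // sequenceId -> the UPDATE_BEFOR half waiting for its UPDATE_AFTER counterpart
        private final Map<String, JsonNode> pending = new HashMap<>();

        public void onMessage(JsonNode root) {
            JsonNode payload = root.path("payload");
            String op = payload.path("op").asText();
            String seq = payload.path("sequenceId").asText();
            if ("UPDATE_BEFOR".equals(op)) { // the product's value has no trailing E
                pending.put(seq, payload.path("before").path("dataColumn"));
            } else if ("UPDATE_AFTER".equals(op)) {
                JsonNode before = pending.remove(seq);
                JsonNode after = payload.path("after").path("dataColumn");
                System.out.println("row changed from " + before + " to " + after);
            }
        }
    }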

Field types

Data that is read from a data source is mapped to the BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING type and is written to Kafka topics as the corresponding JSON type.
Field type | Description
BOOLEAN | Corresponds to the Boolean type in JSON. Valid values: true and false.
DATE | Corresponds to the Number type in JSON. The value is a 13-digit timestamp in milliseconds.
BYTES | Corresponds to the String type in JSON. Before data is written to Kafka, byte arrays are Base64-encoded into strings. A consumer must Base64-decode the strings before it uses them. Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) is used for encoding, and Base64.getDecoder().decode(encodedText) is used for decoding.
STRING | Corresponds to the String type in JSON.
LONG | Corresponds to the Number type in JSON.
DOUBLE | Corresponds to the Number type in JSON.
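
The following runnable snippet demonstrates the BYTES round trip with the java.util.Base64 calls cited in the table; the sample text is arbitrary:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class BytesFieldCodec {
        public static void main(String[] args) {
            byte[] original = "hello".getBytes(StandardCharsets.UTF_8);
            // What the sync node does before writing the message:
            String encoded = Base64.getEncoder().encodeToString(original);
            // What a consumer must do to recover the raw bytes:
            byte[] decoded = Base64.getDecoder().decode(encoded);
            System.out.println(new String(decoded, StandardCharsets.UTF_8)); // prints: hello
        }
    }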

Fields

The following list describes the meaning of each field in a message that is written to Kafka, grouped by level-1 field.

schema
  • dataColumn: The names and types of columns. The value is of the JSONArray type. dataColumn records the names and types of the columns that are changed in the data source. A change can be a data insertion, update, or deletion, or a table schema change.
    • name: the name of the column.
    • type: the type of the column.
  • primaryKey: The primary key information. The value is of the List type.
    • pk: the name of the primary key.
  • source: The information about the data source or the source table. The value is of the Object type.
    • dbType: the type of the data source. The value is of the String type.
    • dbVersion: the version of the data source. The value is of the String type.
    • dbName: the name of the data source. The value is of the String type.
    • schemaName: the name of the schema. This field is specific to data sources such as PostgreSQL and SQL Server. The value is of the String type.
    • tableName: the name of the source table. The value is of the String type.

payload
  • before: The data before a change. The value is of the JSONObject type. For example, if a MySQL database is the data source and data in this database is updated, the before field records the data before the update. The before field is populated in a write record only after a data update or deletion message is read from the data source.
    • dataColumn: the data information. The value is of the JSONObject type and maps each column name to its column value. The column name is a string, and the column value is encoded based on its field type, which can be BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING.
  • after: The data after a change. The after field records the data after a change in the same format as the before field.
  • sequenceId: The unique sequence ID of each data record, which is generated by StreamX after the incremental data and full data are merged. The value is of the String type.
    Note: After a data update message is read from a data source, two write records are generated: update before and update after. The two write records have the same sequence ID.
  • scn: The system change number (SCN) of a data source. This field is valid only when the data source is an Oracle database.
  • op: The type of the operation that is performed on the data in a data source. Valid values:
    • INSERT: inserts data.
    • UPDATE_BEFOR: records the data before an update.
    • UPDATE_AFTER: records the data after an update.
    • DELETE: deletes data.
    • TRANSACTION_BEGIN: starts a transaction in the data source.
    • TRANSACTION_END: ends a transaction in the data source.
    • CREATE: creates a table in the data source.
    • ALTER: modifies a table in the data source.
    • QUERY: records the SQL statements that cause data changes in the database.
    • TRUNCATE: removes all rows from a table in the data source.
    • RENAME: renames a table in the data source.
    • CINDEX: creates an index.
    • DINDEX: deletes an index.
    • MHEARTBEAT: a heartbeat message, which indicates that the sync node runs normally when no new data is generated in the data source.
  • timestamp: The timestamp of a data record. The value is of the JSONObject type.
    • eventTime: the time when the data in the source database changed. The value is a 13-digit timestamp in milliseconds and is of the Long type.
    • systemTime: the time when the sync node read the change message. The value is a 13-digit timestamp in milliseconds and is of the Long type.
    • checkpointTime: the time to which the synchronization offset was reset. The value is a 13-digit timestamp in milliseconds, is of the Long type, and usually equals the value of the eventTime field.
  • ddl: This field is specified only when the schema of a table in the data source is changed. The value is null for data operations such as insertion, update, and deletion.
    • text: the text of the DDL statement that was executed in the data source. The value is of the String type.
    • ddlMeta: a Base64-encoded string that is obtained from a serialized Java object that records the DDL change. The value is of the String type.

version
  • The version of the JSON message format. This field has no level-2 fields.
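
To illustrate how the op values drive consumption, here is a hedged dispatch sketch; the handling shown is hypothetical and assumes Jackson JsonNode inputs:

    import com.fasterxml.jackson.databind.JsonNode;

    public class OpDispatcher {
        public static void dispatch(JsonNode payload) {
            switch (payload.path("op").asText()) {
                case "INSERT":
                    System.out.println("insert: " + payload.path("after").path("dataColumn"));
                    break;
                case "UPDATE_BEFOR": // note: the value has no trailing E
                case "UPDATE_AFTER":
                    System.out.println("update half, pair by sequenceId " + payload.path("sequenceId").asText());
                    break;
                case "DELETE":
                    System.out.println("delete: " + payload.path("before").path("dataColumn"));
                    break;
                case "MHEARTBEAT":
                    break; // progress signal only; no row data
                default:
                    // DDL and transaction markers (CREATE, ALTER, TRUNCATE, ...) land here
                    System.out.println("control/DDL message: " + payload.path("op").asText());
            }
        }
    }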