This topic describes the formats of messages that are written to Kafka and the meaning of each field in the messages.

Background information

A node that synchronizes all data in a data source to Kafka writes the data that DataWorks reads from the data source to Kafka topics in the JSON format. A message that is written to Kafka contains the column change information and the status of the data before and after the change. To allow a consumer to track the progress of a synchronization node while it consumes Kafka messages, the synchronization node periodically generates a heartbeat message, in which the op field is set to MHEARTBEAT, and writes the heartbeat message to the Kafka topics. For more information about the formats of messages that are written to Kafka, see Format of a Kafka message, Format of a heartbeat message generated by a synchronization node, and Format of a Kafka message for data change in a source. For more information about the type and meaning of each field in a message, see Field types and Fields.

Format of a Kafka message

Format of a message that is written to Kafka:

{
    "schema": { // The metadata change information. Only column names and column types are specified.
        "dataColumn": [// The column change information. The data in a destination topic is updated based on the information.
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// If an Oracle data source is used, rowid is added as a column.
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// The ID of a row in the Oracle data source. The value is of the STRING type.
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"// The ID of a row in the Oracle data source. The value is of the STRING type.
            }
        },
        "sequenceId":"XXX",// The unique sequence ID of each data record that is generated after the incremental data and all data are merged. The value is of the STRING type.
        "scn":"xxxx",// The system change number (SCN) of the Oracle data source. The value is of the STRING type.
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",// The operation that is performed. The value of the parameter is case-sensitive.
        "timestamp": {
            "eventTime": 1,// Required. The time when the data in the source database changes. The value is a 13-bit timestamp in milliseconds.
            "systemTime": 2,// Optional. The time when the synchronization node reads the change message. The value is a 13-bit timestamp in milliseconds.
            "checkpointTime": 3// Optional. The specified time when the synchronization offset is reset. The value is a 13-bit timestamp in milliseconds and equals the value of the eventTime field in most cases.
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
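
The following Java sketch shows one way a consumer might parse such a message. It is a minimal example rather than part of DataWorks; it assumes the Jackson library (com.fasterxml.jackson.databind) for JSON parsing, and the class and method names are chosen for illustration.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SyncMessageParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses the value of one Kafka message and extracts the fields that most
    // consumers need: the operation type, the event time, and the row images
    // before and after the change.
    public static void handle(String messageValue) throws Exception {
        JsonNode root = MAPPER.readTree(messageValue);
        JsonNode payload = root.path("payload");
        String op = payload.path("op").asText();                  // e.g. INSERT
        long eventTime = payload.path("timestamp").path("eventTime").asLong();
        // "before" and "after" are JSON null for some operation types,
        // so path(...) is used instead of get(...) to avoid null checks.
        JsonNode before = payload.path("before").path("dataColumn");
        JsonNode after = payload.path("after").path("dataColumn");
        System.out.printf("op=%s eventTime=%d before=%s after=%s%n",
                op, eventTime, before, after);
    }
}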

Format of a heartbeat message generated by a synchronization node

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
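
Because a heartbeat message carries no row data, a consumer should check the op field before it accesses the before or after field. The following sketch, which reuses the Jackson-based parsing shown above, skips heartbeats while still using their eventTime to advance the consumer's view of synchronization progress; the class and variable names are illustrative.

import com.fasterxml.jackson.databind.JsonNode;

public class HeartbeatAwareConsumer {
    // The latest source event time that this consumer has seen.
    private long lastSyncedEventTime = 0L;

    public void handleRecord(JsonNode root) {
        JsonNode payload = root.path("payload");
        long eventTime = payload.path("timestamp").path("eventTime").asLong();
        if ("MHEARTBEAT".equals(payload.path("op").asText())) {
            // No row changed; the heartbeat only proves that the synchronization
            // node is running as expected and advances the progress marker.
            lastSyncedEventTime = Math.max(lastSyncedEventTime, eventTime);
            return;
        }
        // ... process INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE messages here ...
        lastSyncedEventTime = Math.max(lastSyncedEventTime, eventTime);
    }
}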

Format of a Kafka message for data change in a source

  • Format of a Kafka message for data insertion into a source:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • Format of a Kafka message for data update in a source:
    • If the "When one record in the source is updated, one Kafka record is generated" option is not selected, two Kafka messages are generated for a data update in the source. One message describes the status of the data before the update, and the other describes the status of the data after the update. The two messages share the same sequenceId, as shown in the pairing sketch after this list. The following sample messages show the formats:
      Format of the Kafka message that describes the status of data before the change:
      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }
      Format of the Kafka message that describes the status of data after the change:
      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • If the "When one record in the source is updated, one Kafka record is generated" option is selected, only one Kafka message is generated for a data update in the source. The message describes the status of the data both before and after the update. The following sample message shows the format:
      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • Format of a Kafka message for data deletion from a source:
    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
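
In the two-message update mode described above, the before image and the after image arrive as separate Kafka messages that share the same sequenceId. The following sketch shows one way to pair them back into a single update event. It assumes that the two messages for a sequenceId arrive in order, and emitUpdate is a hypothetical downstream handler.

import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;

public class UpdatePairer {
    // Before-images that are waiting for their matching after-image,
    // keyed by sequenceId.
    private final Map<String, JsonNode> pendingBefore = new HashMap<>();

    public void handleUpdate(JsonNode root) {
        JsonNode payload = root.path("payload");
        String op = payload.path("op").asText();
        String sequenceId = payload.path("sequenceId").asText();
        if ("UPDATE_BEFOR".equals(op)) {
            pendingBefore.put(sequenceId, payload.path("before").path("dataColumn"));
        } else if ("UPDATE_AFTER".equals(op)) {
            JsonNode before = pendingBefore.remove(sequenceId);
            JsonNode after = payload.path("after").path("dataColumn");
            emitUpdate(before, after);
        }
    }

    // Hypothetical downstream handler; replace with real processing logic.
    private void emitUpdate(JsonNode before, JsonNode after) {
        System.out.println("update: " + before + " -> " + after);
    }
}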

Field types

The data that you read from a source is mapped to the BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING type and is written to Kafka topics as the corresponding JSON type.

  • BOOLEAN: corresponds to the BOOLEAN type in JSON. Valid values: true and false.
  • DATE: corresponds to the NUMBER type in JSON. The value is a 13-digit timestamp in milliseconds.
  • BYTES: corresponds to the STRING type in JSON. Before data is written to Kafka, byte arrays are encoded in Base64 and converted into strings. A consumer must decode the Base64-encoded strings before it uses them. Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) is used for encoding, and Base64.getDecoder().decode(encodedText) is used for decoding.
  • STRING: corresponds to the STRING type in JSON.
  • LONG: corresponds to the NUMBER type in JSON.
  • DOUBLE: corresponds to the NUMBER type in JSON.
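
As a concrete illustration of the BYTES and DATE mappings, the following sketch shows the Base64 round trip and the conversion of a 13-digit millisecond timestamp into a readable instant. The sample values are taken from the messages above; the class name is illustrative.

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

public class FieldTypeDemo {
    public static void main(String[] args) {
        // BYTES: the producer Base64-encodes raw bytes before writing to Kafka ...
        String encoded = Base64.getEncoder()
                .encodeToString("raw bytes".getBytes(StandardCharsets.UTF_8));
        // ... and the consumer decodes the string back into the original bytes.
        byte[] decoded = Base64.getDecoder().decode(encoded);
        System.out.println(new String(decoded, StandardCharsets.UTF_8)); // raw bytes

        // DATE: the 13-digit value is a Unix timestamp in milliseconds.
        long ts = 1590315269000L; // the sample "ts" value from the message above
        System.out.println(Instant.ofEpochMilli(ts)); // 2020-05-24T10:14:29Z
    }
}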

Fields

The following list describes the meaning of each field in a message that is written to Kafka.

  • schema.dataColumn: the names and types of the columns that are changed in the source. The value is of the JSONArray type. A change can be a data insertion, deletion, or update, or a table schema change in the source.
      • name: the name of the column.
      • type: the type of the column.
  • schema.primaryKey: the primary key information. The value is of the List type. Each element is the name of a primary key column.
  • schema.source: the information about the source database or table. The value is of the Object type.
      • dbType: the type of the source. The value is of the STRING type.
      • dbVersion: the version of the source. The value is of the STRING type.
      • dbName: the name of the source. The value is of the STRING type.
      • schemaName: the name of the schema. This field is specific to sources such as PostgreSQL and SQL Server. The value is of the STRING type.
      • tableName: the name of the source table. The value is of the STRING type.
  • payload.before: the data before a change. The value is of the JSONObject type. For example, if a MySQL database is the source and data in the source is updated, the before field records the data before the update. The before field is specified in a write record after a data update or deletion message is read from the source.
      • dataColumn: the column information. The value is of the JSONObject type and is in the column name:column value format. Each column value is of the BOOLEAN, DOUBLE, DATE, BYTES, LONG, or STRING type.
  • payload.after: the data after a change. The after field records the data after a change in the same format as the before field.
  • payload.sequenceId: the unique sequence ID of each data record. The ID is generated by StreamX after the incremental data and full data are merged. The value is of the STRING type.
    Note: After a data update message is read from the source, two write records are generated: one for the status before the update and one for the status after the update. The two records have the same sequence ID.
  • payload.scn: the system change number (SCN) of the source. This field is valid only when the source is an Oracle database.
  • payload.op: the type of the operation that is performed on the data in the source. Valid values:
      • INSERT: inserts data.
      • UPDATE_BEFOR: updates data. The record carries the status before the update.
      • UPDATE_AFTER: updates data. The record carries the status after the update.
      • DELETE: deletes data.
      • TRANSACTION_BEGIN: starts a database transaction.
      • TRANSACTION_END: ends a database transaction.
      • CREATE: creates a table in the source.
      • ALTER: modifies a table in the source.
      • QUERY: queries data changes in the source by executing the SQL statements that caused the changes.
      • TRUNCATE: removes all rows from a table in the source.
      • RENAME: renames a table in the source.
      • CINDEX: creates an index.
      • DINDEX: deletes an index.
      • MHEARTBEAT: a heartbeat message, which indicates that the synchronization node runs as expected when no new data is generated in the source.
  • payload.timestamp: the timestamps of a data record. The value is of the JSONObject type.
      • eventTime: the time when the data in the source changed. The value is a 13-digit timestamp in milliseconds and is of the LONG type.
      • systemTime: the time when the synchronization node read the change message. The value is a 13-digit timestamp in milliseconds and is of the LONG type.
      • checkpointTime: the specified time when the synchronization offset was reset. The value is a 13-digit timestamp in milliseconds and is of the LONG type. In most cases, the value equals the value of the eventTime field.
  • payload.ddl: the DDL information. This field is specified only if the schema of a table in the source is changed. The value is NULL when a DML operation, such as a data insertion, deletion, or update, is performed in the source.
      • text: the text of the DDL statement that was executed in the source. The value is of the STRING type.
      • ddlMeta: a Base64-encoded string that is obtained from a serialized Java object. The Java object records a DDL-based change. The value is of the STRING type.
  • version: the version of the data in the JSON format.
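
As a usage note for the schema block, a consumer that maintains a downstream copy of a table can rebuild its column map whenever a message arrives whose schema differs from the one it last saw. The following minimal sketch, with Jackson assumed as in the earlier examples and an illustrative class name, extracts a column-name-to-type map from schema.dataColumn:

import java.util.LinkedHashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;

public class SchemaReader {
    // Builds an ordered column-name -> column-type map from schema.dataColumn.
    public static Map<String, String> readSchema(JsonNode root) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (JsonNode column : root.path("schema").path("dataColumn")) {
            columns.put(column.path("name").asText(), column.path("type").asText());
        }
        return columns;
    }
}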