
DataWorks: Appendix: DataHub message formats

Last Updated: Jun 25, 2026

This appendix describes the supported operations, sharding strategies, data formats, and message examples for different DataHub data types.

Supported operations for different data types

A topic is the basic unit for data subscription and publication in DataHub, representing a collection of streaming data. DataHub supports two data types: TUPLE and BLOB.

| DataHub type | Write DML messages | Write upstream heartbeat messages | Write DDL messages | Source-to-topic mapping | Stored data |
|---|---|---|---|---|---|
| TUPLE | Supported | Not supported | Not supported | One table to one topic | DataHub-supported types |
| BLOB | Supported | Supported | Supported | One database (multiple tables) to one topic | BLOB binary data |

  • TUPLE topics have a fixed schema that cannot be changed after the topic is created. They suit scenarios where the source table schema is stable and no DDL operations such as ADD COLUMN or DROP COLUMN occur. TUPLE topics do not forward DDL messages or heartbeat messages from the source to downstream consumers. Because each source table maps to its own topic, synchronizing many source tables means creating and consuming one topic per table, which can be inconvenient downstream.

  • BLOB topics have no predefined schema and store only raw binary data, which makes them more flexible. They forward DDL messages and heartbeat messages from the source to downstream consumers. All tables of a source database map to a single topic, so downstream consumers can read every source table from one topic. This type is ideal when DataHub serves as an intermediate message queue for full-database migration.

Sharding strategies for different data types

A shard is a concurrent channel for data transmission within a DataHub topic. While a single shard has a limited write throughput, you can use multiple shards to increase it. DataHub guarantees in-order consumption only within a single shard, not across multiple shards. To improve write performance with multiple shards while maintaining message order and preventing data skew, DataHub offers the following sharding strategies for TUPLE and BLOB data types.

| Scenario | TUPLE | BLOB |
|---|---|---|
| With a primary key (including custom primary keys) | Shard by primary key | Shard by primary key |
| Ordering guarantee | Messages with the same primary key are processed in order | Messages with the same primary key are processed in order |
| Without a primary key | Random sharding | Shard by table name |
| Ordering guarantee | Ordering is not guaranteed | Messages from the same table are processed in order |
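The sharding rules above can be illustrated with a small routing sketch. It assumes a CRC32 hash of a routing key modulo the shard count; the hash function and key encoding that DataHub and Data Integration actually use are not documented here, so `ShardRouter`, its methods, and the key formats are hypothetical. What the sketch does show is why the ordering guarantees hold: every message with the same routing key lands on the same shard, and DataHub preserves order only within a shard.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.CRC32;

// Hypothetical routing sketch: CRC32 stands in for whatever hash DataHub actually uses.
final class ShardRouter {

    /** Maps a routing key to a shard index in [0, shardCount). Same key -> same shard. */
    static int shardFor(String routingKey, int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(routingKey.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % shardCount);
    }

    /** With a primary key: join the key values so rows with equal keys share a shard. */
    static int shardByPrimaryKey(List<String> primaryKeyValues, int shardCount) {
        return shardFor(String.join("\u0001", primaryKeyValues), shardCount);
    }

    /** BLOB without a primary key: all rows of one table share a shard. */
    static int shardByTableName(String dbName, String tableName, int shardCount) {
        return shardFor(dbName + "." + tableName, shardCount);
    }

    /** TUPLE without a primary key: any shard may be picked, so cross-message order is lost. */
    static int randomShard(int shardCount) {
        return ThreadLocalRandom.current().nextInt(shardCount);
    }

    public static void main(String[] args) {
        int shards = 4;
        System.out.println(shardByPrimaryKey(List.of("1", "joe"), shards));
        System.out.println(shardByTableName("example_db", "example_table_nopk", shards));
        System.out.println(randomShard(shards));
    }
}
```

The control character used as a separator in `shardByPrimaryKey` is an arbitrary choice that keeps composite keys such as ("1", "joe") from colliding with values that happen to contain common delimiters.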

Data formats

  • TUPLE

    The TUPLE format uses data types natively supported by DataHub. Data Integration adds several metadata columns when you create a topic, so a TUPLE schema contains both metadata columns and business data columns. Fields that start with an underscore, such as _sequence_id_ and _operation_type_, are metadata columns; the remaining fields are business data columns. The metadata columns include _sequence_id_, _operation_type_, _excute_time_, _source_table_, _before_image_, and _after_image_.

    | Parameter | Description |
    |---|---|
    | _sequence_id_ | A message ID of the STRING type that consists of digits. Each change has its own ID, except that the UPDATE_BEFOR and UPDATE_AFTER records of the same update share one sequence ID. |
    | _excute_time_ | The time when the data was generated, as a 13-digit timestamp in milliseconds. |
    | _source_table_ | The name of the source table. |
    | _before_image_ | Whether the record is a pre-image. The value is Y for an UPDATE_BEFOR or DELETE operation, and N for an UPDATE_AFTER or INSERT operation. |
    | _after_image_ | Whether the record is a post-image. The value is N for an UPDATE_BEFOR or DELETE operation, and Y for an UPDATE_AFTER or INSERT operation. |

    Example: The following table shows the data synchronized to DataHub after executing INSERT, UPDATE, and DELETE statements.

    | _sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
    |---|---|---|---|---|
    | 1649991610688000000 | I | 1649991726000 | N | Y |
    | 1649991610688000001 | U | 1649991756000 | Y | N |
    | 1649991610688000001 | U | 1649991756000 | N | Y |
    | 1649991610688000002 | D | 1649991774000 | Y | N |
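    To consume TUPLE records, a reader combines _operation_type_ with the image flags to tell the two halves of an update apart. The sketch below assumes, based on the example table, that I, U, and D stand for INSERT, UPDATE, and DELETE. The `TupleMeta` record is a hypothetical stand-in for a record read through the DataHub SDK, business columns are omitted, and records require Java 16 or later.

```java
import java.util.List;

// Hypothetical model of the metadata columns of one TUPLE record; business columns are omitted.
final class TupleRowClassifier {

    record TupleMeta(String sequenceId, String operationType,
                     String beforeImage, String afterImage) {}

    /** Derives the change kind from _operation_type_ and the _before_image_/_after_image_ flags. */
    static String changeKind(TupleMeta row) {
        switch (row.operationType()) {
            case "I":
                return "INSERT";
            case "D":
                return "DELETE";
            case "U":
                // Both halves of an update carry "U"; the image flags tell them apart.
                if ("Y".equals(row.beforeImage())) {
                    return "UPDATE_BEFOR";
                }
                if ("Y".equals(row.afterImage())) {
                    return "UPDATE_AFTER";
                }
                throw new IllegalArgumentException("U row without an image flag: " + row);
            default:
                throw new IllegalArgumentException("Unknown _operation_type_: " + row.operationType());
        }
    }

    public static void main(String[] args) {
        // The four rows from the example table above.
        List<TupleMeta> rows = List.of(
                new TupleMeta("1649991610688000000", "I", "N", "Y"),
                new TupleMeta("1649991610688000001", "U", "Y", "N"),
                new TupleMeta("1649991610688000001", "U", "N", "Y"),
                new TupleMeta("1649991610688000002", "D", "Y", "N"));
        for (TupleMeta row : rows) {
            System.out.println(row.sequenceId() + " " + changeKind(row));
        }
    }
}
```

    Run on the example rows, this labels them INSERT, UPDATE_BEFOR, UPDATE_AFTER, and DELETE; the shared sequence ID of the two U rows is what lets a consumer pair them back into one update.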

  • BLOB

    A BLOB message is binary data obtained by serializing a JSON string. The JSON format is as follows:

    {
        "schema": { // Metadata about the change, specifying only column names and types.
            "dataColumn": [ // Information about the changed data columns, used to update records in the destination table.
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "binData",
                    "type": "BYTES"
                },
                {
                    "name": "ts",
                    "type": "DATE"
                }
            ],
            "primaryKey": [
                "pkName1",
                "pkName2"
            ],
            "source": {
                "dbType": "mysql",
                "dbVersion": "1.0.0",
                "dbName": "myDatabase",
                "schemaName": "mySchema",
                "tableName": "tableName"
            }
        },
        "payload": {
            "before": {
                "dataColumn":{
                    "id": 111,
                    "name":"scooter",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
            "after": {
                "dataColumn":{
                    "id": 222,
                    "name":"donald",
                    "binData": "[base64 string]",
                    "ts": 1590315269000
                }
            },
            "sequenceId": "XXX", // A string used to order data when full and incremental data are merged.
            "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // Case-sensitive.
            "timestamp": {
                "eventTime": 1, // Required. The time of the record change. A 13-digit timestamp with millisecond precision.
                "systemTime": 2, // Optional. Exists for some data sources like Oracle CDC.
                "checkpointTime": 3 // Optional. Included for some data sources like OceanBase.
            },
            "ddl": {
                "text": "ADD COLUMN ...",
                "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
            }
        },
        "version":"1.0.0"
    }
    • BLOB fields

      Important

      The data types for all fields in the message are defined by StreamX and include BOOLEAN, DOUBLE, DATE, BYTES, LONG, and STRING.

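      BOOLEAN: The value is `true` or `false`.
      DATE: The value is a 13-digit integer that represents a timestamp with millisecond precision.
      BYTES: The byte array is stored as a Base64-encoded string.

      Use the `java.util.Base64` APIs for Base64 encoding and decoding:

          import java.nio.charset.StandardCharsets;
          import java.util.Base64;

          String text = "text123";
          // Encode: UTF-8 bytes -> Base64 string
          String encodedText = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
          // Decode: Base64 string -> original bytes -> text
          byte[] decodedBytes = Base64.getDecoder().decode(encodedText);
          String decodedText = new String(decodedBytes, StandardCharsets.UTF_8);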
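      DATE values and the timestamp fields are plain epoch milliseconds, so they map directly onto `java.time.Instant`. A minimal sketch follows; the `BlobDates` class name is hypothetical, and the conversion to UTC+8 is only an example of applying a time zone, because the epoch value itself carries no zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper: converts 13-digit epoch-millisecond values to and from java.time types.
final class BlobDates {

    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    static long toEpochMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        long eventTime = 1605339932000L; // eventTime from the Insert sample message
        Instant instant = toInstant(eventTime);
        System.out.println(instant);                                  // 2020-11-14T07:45:32Z
        System.out.println(instant.atOffset(ZoneOffset.ofHours(8)));  // 2020-11-14T15:45:32+08:00
    }
}
```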
                                  

      schema (top-level element)

      • dataColumn: A JSONArray that contains the type information of the data columns. It records every column, with its type, in an upstream change record. A change can be a data modification (such as an insert, delete, or update) or a table structure modification. Each element contains:

        • name: The column name.

        • type: The column data type.

      • primaryKey: A list of strings, each of which is the name of a primary key column.

      • source: An object that contains information about the source database or table.

        • dbType: A string representing the database type.

        • dbVersion: A string representing the database version.

        • dbName: A string representing the database name.

        • schemaName: A string representing the schema name. It is required for databases such as PostgreSQL and SQL Server.

        • tableName: A string representing the table name.

      payload (top-level element)

      • before: A JSONObject that contains the pre-image of the data. For example, for an UPDATE operation on a MySQL source, before stores the content of the record before the update.

        • This field is populated when an update or delete message is read from the source.

        • dataColumn: A JSONObject that holds the data as column name: column value pairs. The column name is a string. The column value depends on the column data type: BYTES values are Base64 strings, DATE values are 13-digit timestamps of the long type, and values of other types use their native types.

      • after: The post-image of the data, in the same format as before.

        Note: This field is required for UPDATE and INSERT operations.

      • sequenceId: A string used to order data when full and incremental data are merged. In the sample messages, the UPDATE_BEFOR and UPDATE_AFTER messages of one update share the same sequenceId.

      • op: The operation type. Valid values:

        • INSERT: A data insertion.

        • UPDATE_BEFOR: The pre-image of an update.

        • UPDATE_AFTER: The post-image of an update.

        • DELETE: A data deletion.

        • TRANSACTION_BEGIN: The start of a database transaction.

        • TRANSACTION_END: The end of a database transaction.

        • CREATE: A table creation.

        • ALTER: A table alteration.

        • QUERY: The original SQL statement of the database change.

        • TRUNCATE: A table truncation.

        • RENAME: A table rename.

        • CINDEX: An index creation.

        • DINDEX: An index deletion.

        • MHEARTBEAT: A heartbeat message indicating that the synchronization task is running normally even when the source produces no new data.

      • timestamp: A JSONObject that contains the timestamps of this data record.

        • eventTime: A Long value that represents the time when the change occurred in the source database, as a 13-digit timestamp with millisecond precision.

        • systemTime: A Long value that represents the time when the synchronization task processed this change message, as a 13-digit timestamp with millisecond precision.

        • checkpointTime: A Long value that represents the time used to reset the synchronization offset. It is usually the same as eventTime and is a 13-digit timestamp with millisecond precision.

      • ddl: Populated only for DDL operations that change the table structure. For DML operations such as data insertion, deletion, and modification, the ddl field is null.

        • text: A String that contains the text of the DDL statement.

        • ddlMeta: A String that contains a SQLStatement object serialized to binary and then encoded in Base64. The SQLStatement object is produced by parsing the DDL statement with FastSQL.

          If you enable DDL support, the system passes this serialized SQLStatement object. A downstream component can deserialize it, rebuild the DDL statement for the destination data source, and apply the change.

      version (top-level element, no second-level elements)

      The version number of the format.
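      A consumer usually branches on op before reading before, after, or ddl. The sketch below groups the op values listed above into handling categories. The grouping itself (for example, treating TRUNCATE and RENAME as structure changes and sending QUERY to OTHER) is an assumption for illustration, not documented behavior, and `OpRouter` is a hypothetical name. Because op values are case-sensitive, the lookup does no case folding.

```java
import java.util.Set;

// Hypothetical grouping of BLOB "op" values; the categories are an assumption for illustration.
final class OpRouter {

    enum Category { DML, TRANSACTION, DDL, HEARTBEAT, OTHER }

    private static final Set<String> DML_OPS =
            Set.of("INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE");
    private static final Set<String> TRANSACTION_OPS =
            Set.of("TRANSACTION_BEGIN", "TRANSACTION_END");
    private static final Set<String> DDL_OPS =
            Set.of("CREATE", "ALTER", "TRUNCATE", "RENAME", "CINDEX", "DINDEX");

    /** Maps an op value to a category; matching is exact because op values are case-sensitive. */
    static Category categorize(String op) {
        if (op == null) {
            return Category.OTHER;        // Set.of(...).contains(null) would throw
        }
        if (DML_OPS.contains(op)) {
            return Category.DML;          // read payload.before / payload.after
        }
        if (TRANSACTION_OPS.contains(op)) {
            return Category.TRANSACTION;  // start or end of a source transaction
        }
        if (DDL_OPS.contains(op)) {
            return Category.DDL;          // read payload.ddl
        }
        if ("MHEARTBEAT".equals(op)) {
            return Category.HEARTBEAT;    // schema is {} in the heartbeat sample
        }
        return Category.OTHER;            // e.g. QUERY, or values not described here
    }

    public static void main(String[] args) {
        for (String op : new String[] {"INSERT", "ALTER", "MHEARTBEAT", "QUERY", "insert"}) {
            System.out.println(op + " -> " + categorize(op));
        }
    }
}
```

      The lowercase "insert" in the usage loop falls through to OTHER, which is the intended effect of exact matching.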

    • BLOB serialization

      In this JSON format, each message corresponds to a single JSONObject. The structure of this JSONObject, which can include nested objects and arrays, defines the message format.

      To serialize a message, convert the JSONObject to a string (for example, with the toJSONString method of fastjson), and then convert the string to a byte array with String.getBytes(StandardCharsets.UTF_8).
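      The serialization rule amounts to a JSON-to-UTF-8 round trip. The sketch below uses only the JDK, so it takes an already-rendered JSON string (in practice produced by a JSON library such as fastjson, as described above); `BlobCodec` is a hypothetical name. Decoding with the same charset is what keeps non-ASCII column values intact.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only sketch of the BLOB byte encoding; a JSON library would normally render the string.
final class BlobCodec {

    /** Serializes an already-rendered JSON message to the bytes stored in a BLOB record. */
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Restores the JSON text from BLOB record bytes; parse it with a JSON library afterwards. */
    static String decode(byte[] blob) {
        return new String(blob, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Compact form of the heartbeat sample message.
        String heartbeat = "{\"schema\":{},\"payload\":{\"op\":\"MHEARTBEAT\","
                + "\"timestamp\":{\"eventTime\":1605339953629,\"checkpointTime\":1605339953629}},"
                + "\"version\":\"0.0.1\"}";
        byte[] blob = encode(heartbeat);
        System.out.println(blob.length + " bytes");
        System.out.println(decode(blob).equals(heartbeat)); // true
    }
}
```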

Sample JSON messages

  • Insert:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "example_db",
                "dbType": "MySQL",
                "tableName": "example_table_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "INSERT",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000004",
            "timestamp": {
                "eventTime": 1605339932000,
                "systemTime": 1605339932736,
                "checkpointTime": 1605339932000
            }
        },
        "version": "0.0.1"
    }
  • Update before:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "example_db",
                "dbType": "MySQL",
                "tableName": "example_table_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_BEFOR",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "comment",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • Update after:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "example_db",
                "dbType": "MySQL",
                "tableName": "example_table_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "UPDATE_AFTER",
            "after": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000005",
            "timestamp": {
                "eventTime": 1605339934000,
                "systemTime": 1605339934951,
                "checkpointTime": 1605339934000
            }
        },
        "version": "0.0.1"
    }
  • Delete:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "id",
                    "type": "LONG"
                },
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "comment",
                    "type": "STRING"
                }
            ],
            "source": {
                "dbName": "example_db",
                "dbType": "MySQL",
                "tableName": "example_table_pk"
            },
            "primaryKey": [
                "id",
                "name"
            ]
        },
        "payload": {
            "op": "DELETE",
            "before": {
                "dataColumn": {
                    "name": "joe",
                    "comment": "com1",
                    "id": 1
                }
            },
            "sequenceId": "1605339516000000006",
            "timestamp": {
                "eventTime": 1605339937000,
                "systemTime": 1605339937671,
                "checkpointTime": 1605339937000
            }
        },
        "version": "0.0.1"
    }
  • Heartbeat:

    {
        "schema": {},
        "payload": {
            "op": "MHEARTBEAT",
            "timestamp": {
                "eventTime": 1605339953629,
                "checkpointTime": 1605339953629
            }
        },
        "version": "0.0.1"
    }
  • DDL:

    {
        "schema": {
            "source": {
                "dbName": "example_db",
                "dbType": "MySQL",
                "tableName": "example_table_nopk"
            }
        },
        "payload": {
            "op": "ALTER",
            "sequenceId": "1605339516000000035",
            "ddl": {
                "text": "alter table example_table_nopk add column holo text",
                "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAm
WgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4A
DnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA=="
            },
            "timestamp": {
                "eventTime": 1605342109000,
                "systemTime": 1605342109259,
                "checkpointTime": 1605342109000
            }
        },
        "version": "0.0.1"
    }
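The ddlMeta value is Base64 text, and the sample value begins with `rO0AB`, which decodes to the bytes 0xAC 0xED 0x00 0x05, the header of a Java Object Serialization stream. The sketch below decodes the value and checks that header without deserializing anything; `DdlMetaInspector` is a hypothetical name. Actually rebuilding the object would require `java.io.ObjectInputStream` together with the producer's classes (including FastSQL's SQLStatement) on the classpath, and should only be done for data from trusted topics.

```java
import java.util.Base64;

// Hypothetical helper: inspects a ddlMeta value without deserializing it.
final class DdlMetaInspector {

    /** Decodes the Base64 ddlMeta text into the raw serialized bytes. */
    static byte[] decode(String ddlMeta) {
        return Base64.getDecoder().decode(ddlMeta);
    }

    /**
     * Checks for the Java Object Serialization header: magic 0xACED followed by
     * stream version 0x0005.
     */
    static boolean isJavaSerialized(byte[] bytes) {
        return bytes.length >= 4
                && (bytes[0] & 0xFF) == 0xAC
                && (bytes[1] & 0xFF) == 0xED
                && bytes[2] == 0x00
                && bytes[3] == 0x05;
    }

    public static void main(String[] args) {
        // First eight characters of the ddlMeta value in the DDL sample.
        byte[] header = decode("rO0ABXNy");
        System.out.println(header.length);             // 6
        System.out.println(isJavaSerialized(header));  // true
    }
}
```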