DataWorks: Message formats

Last Updated: Mar 26, 2026

DataHub uses two topic types—Tuple and Blob—each with a different message format and sharding strategy. This reference describes how each type works, how shards are assigned, and the exact JSON structure of Blob messages with examples for every operation type.

Topic types

A topic is the smallest unit for publishing and subscribing to data in DataHub. DataHub supports two topic types:

| Feature | Tuple | Blob |
| --- | --- | --- |
| DML messages | Supported | Supported |
| DDL messages | Not supported | Supported |
| Heartbeat messages | Not supported | Supported |
| Source-to-topic mapping | One table to one topic | One database (multiple tables) to one topic |
| Data type | DataHub-supported types | BLOB binary data |

Tuple topics have a fixed schema that cannot change after the topic is created, and each source table maps to its own topic. Because the schema is fixed, DDL operations (such as ADD COLUMN or DROP COLUMN) are not supported. Use Tuple topics when your source tables have a stable schema and you do not need to propagate DDL changes downstream.

Blob topics store raw BLOB binary data and carry no schema, which gives them full flexibility. A single Blob topic can receive data from an entire database—all tables included—and pass DDL and heartbeat messages to downstream consumers. Blob topics are well suited for full database migration where DataHub acts as an intermediate message queue.

Sharding strategies

A shard is a concurrent channel for writing data to a DataHub topic. Each shard has a maximum write throughput, and DataHub guarantees message order only within a single shard, not across shards.

| Scenario | Tuple | Blob |
| --- | --- | --- |
| With a primary key: sharding | Shard by primary key | Shard by primary key |
| With a primary key: order guarantee | Messages with the same primary key are ordered | Messages with the same primary key are ordered |
| Without a primary key: sharding | Random sharding | Shard by table name |
| Without a primary key: order guarantee | Not guaranteed | Messages for the same table are ordered |

For tables with a primary key (including custom primary keys), both topic types route messages by primary key value, keeping all changes to the same row on the same shard. For tables without a primary key, Tuple topics distribute writes randomly, while Blob topics group writes by table name to preserve per-table order.
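The routing rules above can be sketched in a few lines of Java. This is only an illustration: the hash function DataHub actually uses is not described here, so `String.hashCode()` is an assumption, as are the class name `ShardRouterSketch`, the method names, and the `"1|joe"` encoding of a composite key.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative shard selection; the hash function is an assumption, not DataHub's. */
final class ShardRouterSketch {

    /** Maps a routing key to a shard index in [0, shardCount). */
    static int shardForKey(String routingKey, int shardCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(routingKey.hashCode(), shardCount);
    }

    /**
     * Picks a shard following the strategy table.
     * primaryKeyValue is null when the table has no primary key.
     */
    static int chooseShard(boolean blobTopic, String tableName,
                           String primaryKeyValue, int shardCount) {
        if (primaryKeyValue != null) {
            // Both topic types: all changes to one row land on one shard.
            return shardForKey(primaryKeyValue, shardCount);
        }
        if (blobTopic) {
            // Blob topic without a primary key: keep per-table order.
            return shardForKey(tableName, shardCount);
        }
        // Tuple topic without a primary key: random shard, no ordering.
        return ThreadLocalRandom.current().nextInt(shardCount);
    }

    public static void main(String[] args) {
        // "1|joe" is a hypothetical encoding of the composite key (id, name).
        int tupleShard = chooseShard(false, "t_shiyu_pk", "1|joe", 4);
        int blobShard = chooseShard(true, "t_shiyu_pk", "1|joe", 4);
        System.out.println(tupleShard == blobShard); // true: same key, same shard
    }
}
```

Because order is guaranteed only within a shard, a deterministic key-to-shard mapping is what keeps all changes to one row in order.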

Tuple data format

When Data Integration creates a Tuple topic, it automatically adds five metadata columns alongside your source table columns:

| Column | Description |
| --- | --- |
| _sequence_id_ | String. A numeric ID assigned to each change. An UPDATE produces two messages (before and after) that share the same sequence ID. |
| _excute_time_ | The time when the change was generated in the source. |
| _source_table_ | The name of the source table. |
| _before_image_ | Y if this row represents the state before the change (applies to UPDATE before-image and DELETE). N otherwise. |
| _after_image_ | Y if this row represents the state after the change (applies to UPDATE after-image and INSERT). N otherwise. |

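The following example shows the messages produced by an INSERT, an UPDATE, and a DELETE on the same row. The _operation_type_ column in the example holds I, U, or D for INSERT, UPDATE, or DELETE:

| _sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
| --- | --- | --- | --- | --- |
| 1649991610688000000 | I | 1649991726000 | N | Y |
| 1649991610688000001 | U | 1649991756000 | Y | N |
| 1649991610688000001 | U | 1649991756000 | N | Y |
| 1649991610688000002 | D | 1649991774000 | Y | N |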

The UPDATE produces two rows with the same _sequence_id_: the first carries _before_image_=Y (the original values), and the second carries _after_image_=Y (the updated values).
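A consumer that needs both halves of an UPDATE can group Tuple records by _sequence_id_. The sketch below is illustrative only: the `TupleChangeGrouper` and `Row` classes are hypothetical and model just the metadata columns from the example, not a DataHub SDK type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that pairs UPDATE before/after rows by sequence ID. */
final class TupleChangeGrouper {

    /** One Tuple record, reduced to the metadata columns used here. */
    static final class Row {
        final String sequenceId;
        final String operationType;
        final boolean beforeImage;
        final boolean afterImage;

        Row(String sequenceId, String operationType, String beforeFlag, String afterFlag) {
            this.sequenceId = sequenceId;
            this.operationType = operationType;
            // The flags arrive as the strings "Y" or "N".
            this.beforeImage = "Y".equals(beforeFlag);
            this.afterImage = "Y".equals(afterFlag);
        }
    }

    /** Groups rows by _sequence_id_, keeping arrival order. */
    static Map<String, List<Row>> groupBySequenceId(List<Row> rows) {
        Map<String, List<Row>> groups = new LinkedHashMap<>();
        for (Row row : rows) {
            groups.computeIfAbsent(row.sequenceId, id -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("1649991610688000000", "I", "N", "Y"),
            new Row("1649991610688000001", "U", "Y", "N"),
            new Row("1649991610688000001", "U", "N", "Y"),
            new Row("1649991610688000002", "D", "Y", "N"));
        // The UPDATE group contains two rows; INSERT and DELETE contain one each.
        groupBySequenceId(rows).forEach((id, group) ->
            System.out.println(id + " -> " + group.size() + " row(s)"));
    }
}
```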

Blob message format

Blob messages are binary data converted from a JSON string. Every message is a single JSONObject serialized to a UTF-8 byte array.

Top-level structure

{
  "schema": { ... },
  "payload": { ... },
  "version": "1.0.0"
}

Field types

All field values in a Blob message must be one of six types defined by StreamX:

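| Type | Description |
| --- | --- |
| BOOLEAN | true or false |
| DOUBLE | Double-precision floating-point number |
| LONG | 64-bit integer |
| STRING | UTF-8 string |
| DATE | A 13-digit integer representing a millisecond-precision timestamp |
| BYTES | A Base64-encoded string |

Encode and decode BYTES values using java.util.Base64:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

String text = "test_text123";
// Encode the UTF-8 bytes of the text as a Base64 string
String encodedText = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
// Decode the Base64 string back to the original bytes
byte[] decodedBytes = Base64.getDecoder().decode(encodedText);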
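DATE values can be handled in a similar way. The following sketch, which assumes nothing beyond the JDK's java.time.Instant, converts a 13-digit millisecond timestamp to a readable instant and back. The class name `DateFieldSketch` is hypothetical.

```java
import java.time.Instant;

/** Hypothetical helper for DATE fields (13-digit millisecond timestamps). */
final class DateFieldSketch {

    /** Converts a millisecond timestamp to an Instant (UTC). */
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** Converts an Instant back to the millisecond form used in messages. */
    static long toDateField(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1605339932000L)); // 2020-11-14T07:45:32Z
    }
}
```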

schema object

The schema object describes the structure of the source table and database at the time of the change.

| Field | Type | Description |
| --- | --- | --- |
| schema.dataColumn | JSONArray | All columns involved in the change, each with a name (String) and type (one of the six StreamX types). Covers both data changes and DDL schema changes. |
| schema.primaryKey | Array | Primary key column names. |
| schema.source.dbType | String | Database type (for example, mysql). |
| schema.source.dbVersion | String | Database version. |
| schema.source.dbName | String | Database name. |
| schema.source.schemaName | String | Schema name. Applies to databases such as PostgreSQL and SQL Server. |
| schema.source.tableName | String | Table name. |

payload object

The payload object contains the actual change data.

| Field | Type | When present | Description |
| --- | --- | --- | --- |
| payload.op | String | Always | Operation type. Case-sensitive. See Operation types. |
| payload.sequenceId | String | DML operations | Used to sort messages when merging incremental and full data. |
| payload.before | JSONObject | UPDATE, DELETE | Row state before the change. Contains a dataColumn map of column_name: column_value. BYTES values are Base64-encoded strings; DATE values are 13-digit long integers. |
| payload.after | JSONObject | INSERT, UPDATE | Row state after the change. Same format as before. |
| payload.timestamp.eventTime | Long | Always | Required. Time of the change in the source database. 13-digit millisecond-precision timestamp. |
| payload.timestamp.systemTime | Long | Some sources | Time when the sync task processed this message. Present for sources such as Oracle CDC. |
| payload.timestamp.checkpointTime | Long | Some sources | Time set when resetting the sync offset. Present for sources such as OceanBase Database. |
| payload.ddl.text | String | DDL operations | The DDL statement text. |
| payload.ddl.ddlMeta | String | DDL operations | The SQLStatement object parsed by FastSQL, serialized and Base64-encoded. Required when DDL support is enabled; downstream consumers deserialize this to apply the DDL change. |
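Because sequenceId is a String, a consumer that sorts by it may want to compare numeric values, not characters. The sketch below assumes that sequenceId always holds a decimal integer, as in the examples in this document. The class name `SequenceIdOrdering` is hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper that orders sequenceId strings by numeric value. */
final class SequenceIdOrdering {

    /** Compares two decimal sequenceId strings numerically. */
    static final Comparator<String> NUMERIC =
        (left, right) -> new BigInteger(left).compareTo(new BigInteger(right));

    /** Returns a sorted copy; the input list is left unchanged. */
    static List<String> sorted(List<String> sequenceIds) {
        List<String> copy = new ArrayList<>(sequenceIds);
        copy.sort(NUMERIC);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sorted(List.of(
            "1605339516000000006",
            "1605339516000000004",
            "1605339516000000005")));
    }
}
```

Plain string comparison gives the same order only when all IDs have the same number of digits. A numeric comparison also handles IDs of different lengths.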

Operation types

The op field is case-sensitive. Valid values:

| Value | Description | before | after |
| --- | --- | --- | --- |
| INSERT | Row inserted | null | Populated |
| UPDATE_BEFOR | Before-image of an updated row | Populated | null |
| UPDATE_AFTER | After-image of an updated row | null | Populated |
| DELETE | Row deleted | Populated | null |
| TRANSACTION_BEGIN | Transaction started | | |
| TRANSACTION_END | Transaction committed | | |
| CREATE | Table created | | |
| ALTER | Table schema changed | | |
| QUERY | Original SQL of the change | | |
| TRUNCATE | Table truncated | | |
| RENAME | Table renamed | | |
| CINDEX | Index created | | |
| DINDEX | Index deleted | | |
| GTID | Global transaction ID event | | |
| XACOMMIT | XA transaction committed | | |
| XAROLLBACK | XA transaction rolled back | | |
| ERASE | Data erased | | |
| MHEARTBEAT | Heartbeat message when no new data arrives from the source | | |

UPDATE_BEFOR is the exact enum value used in the API (not UPDATE_BEFORE).
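A consumer can encode the before/after rules for the four DML values as a small case-sensitive lookup. This is a sketch, not part of any DataHub SDK; the class name `OperationImages` and its method are hypothetical.

```java
import java.util.Map;

/** Hypothetical lookup of which row image each DML op value carries. */
final class OperationImages {

    // Keys are matched exactly, because the op field is case-sensitive.
    private static final Map<String, String> IMAGE_BY_OP = Map.of(
        "INSERT", "after",
        "UPDATE_BEFOR", "before",
        "UPDATE_AFTER", "after",
        "DELETE", "before");

    /** Returns "before" or "after", or null if the op carries no row image. */
    static String populatedImage(String op) {
        return IMAGE_BY_OP.get(op);
    }

    public static void main(String[] args) {
        System.out.println(populatedImage("UPDATE_BEFOR"));  // before
        System.out.println(populatedImage("UPDATE_BEFORE")); // null: not a valid value
    }
}
```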

Serialization

To serialize a Blob message:

  1. Build the JSONObject according to the format above.

  2. Convert it to a JSON string (for example, using Fastjson's toJSONString method).

  3. Encode the string as a UTF-8 byte array (for example, using getBytes(StandardCharsets.UTF_8) in Java).

One message corresponds to one JSONObject.
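The steps above can be sketched with the JDK alone. To stay free of dependencies, this sketch builds the JSON string by hand instead of calling a JSON library, which is a simplification and not the recommended approach for real payloads. The class name `BlobSerializationSketch` and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the three serialization steps for a heartbeat message. */
final class BlobSerializationSketch {

    /** Steps 1 and 2: build the message and render it as a JSON string. */
    static String heartbeatJson(long eventTime) {
        // Hand-built JSON keeps the example dependency-free; a JSON library
        // is the safer choice when values need escaping.
        return "{\"schema\":{},\"payload\":{\"op\":\"MHEARTBEAT\","
            + "\"timestamp\":{\"eventTime\":" + eventTime
            + ",\"checkpointTime\":" + eventTime + "}},"
            + "\"version\":\"0.0.1\"}";
    }

    /** Step 3: encode the JSON string as a UTF-8 byte array. */
    static byte[] toBlob(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Reverse of step 3, as a consumer would do before parsing. */
    static String fromBlob(byte[] blob) {
        return new String(blob, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] blob = toBlob(heartbeatJson(1605339953629L));
        System.out.println(fromBlob(blob));
    }
}
```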

Message examples

All examples below use the following source table on a MySQL database:

  • Database: yunshi_db

  • Table: t_shiyu_pk

  • Columns: id (LONG), name (STRING), comment (STRING)

  • Primary key: composite key on id and name

INSERT

{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": {
      "dbName": "yunshi_db",
      "dbType": "MySQL",
      "tableName": "t_shiyu_pk"
    },
    "primaryKey": ["id", "name"]
  },
  "payload": {
    "op": "INSERT",
    "after": {
      "dataColumn": {
        "id": 1,
        "name": "joe",
        "comment": "comment"
      }
    },
    "sequenceId": "1605339516000000004",
    "timestamp": {
      "eventTime": 1605339932000,
      "systemTime": 1605339932736,
      "checkpointTime": 1605339932000
    }
  },
  "version": "0.0.1"
}

UPDATE

An UPDATE produces two messages with the same sequenceId: UPDATE_BEFOR carries the original row state, and UPDATE_AFTER carries the updated row state.

UPDATE_BEFOR (original values):

{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": {
      "dbName": "yunshi_db",
      "dbType": "MySQL",
      "tableName": "t_shiyu_pk"
    },
    "primaryKey": ["id", "name"]
  },
  "payload": {
    "op": "UPDATE_BEFOR",
    "before": {
      "dataColumn": {
        "id": 1,
        "name": "joe",
        "comment": "comment"
      }
    },
    "sequenceId": "1605339516000000005",
    "timestamp": {
      "eventTime": 1605339934000,
      "systemTime": 1605339934951,
      "checkpointTime": 1605339934000
    }
  },
  "version": "0.0.1"
}

UPDATE_AFTER (updated values, same sequenceId):

{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": {
      "dbName": "yunshi_db",
      "dbType": "MySQL",
      "tableName": "t_shiyu_pk"
    },
    "primaryKey": ["id", "name"]
  },
  "payload": {
    "op": "UPDATE_AFTER",
    "after": {
      "dataColumn": {
        "id": 1,
        "name": "joe",
        "comment": "com1"
      }
    },
    "sequenceId": "1605339516000000005",
    "timestamp": {
      "eventTime": 1605339934000,
      "systemTime": 1605339934951,
      "checkpointTime": 1605339934000
    }
  },
  "version": "0.0.1"
}
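Once the UPDATE_BEFOR and UPDATE_AFTER messages have been matched by sequenceId, a consumer can compare the two dataColumn maps to find which columns changed. The sketch below assumes the maps have already been parsed from JSON. The class name `UpdateDiffSketch` and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper that diffs the before and after images of an UPDATE. */
final class UpdateDiffSketch {

    /** Returns the names of columns whose value differs between the two images. */
    static List<String> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            // Objects.equals also handles null column values.
            if (!Objects.equals(before.get(entry.getKey()), entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", 1L);
        before.put("name", "joe");
        before.put("comment", "comment");

        Map<String, Object> after = new LinkedHashMap<>(before);
        after.put("comment", "com1");

        System.out.println(changedColumns(before, after)); // [comment]
    }
}
```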

DELETE

{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": {
      "dbName": "yunshi_db",
      "dbType": "MySQL",
      "tableName": "t_shiyu_pk"
    },
    "primaryKey": ["id", "name"]
  },
  "payload": {
    "op": "DELETE",
    "before": {
      "dataColumn": {
        "id": 1,
        "name": "joe",
        "comment": "com1"
      }
    },
    "sequenceId": "1605339516000000006",
    "timestamp": {
      "eventTime": 1605339937000,
      "systemTime": 1605339937671,
      "checkpointTime": 1605339937000
    }
  },
  "version": "0.0.1"
}

Heartbeat

When no new data arrives from the source, DataHub writes a heartbeat message to indicate the sync task is running normally. The schema object is empty, and the payload carries only op and timestamp.

{
  "schema": {},
  "payload": {
    "op": "MHEARTBEAT",
    "timestamp": {
      "eventTime": 1605339953629,
      "checkpointTime": 1605339953629
    }
  },
  "version": "0.0.1"
}

DDL (ALTER)

When a DDL operation changes the table schema, the ddl field is populated and the before/after fields are omitted.

{
  "schema": {
    "source": {
      "dbName": "test_db",
      "dbType": "MySQL",
      "tableName": "t_test_nopk"
    }
  },
  "payload": {
    "op": "ALTER",
    "sequenceId": "1605339516000000035",
    "ddl": {
      "text": "alter table t_test_nopk add column holo text",
      "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0
luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHZmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS5hbGliYWJhLmZhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29ic1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb2
0uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA=="
    },
    "timestamp": {
      "eventTime": 1605342109000,
      "systemTime": 1605342109259,
      "checkpointTime": 1605342109000
    }
  },
  "version": "0.0.1"
}
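The ddlMeta value in this example starts with rO0AB, which is how the Java serialization stream header (bytes AC ED 00 05) looks in Base64. Before handing the value to a deserializer, a consumer could run a cheap sanity check like the sketch below. The class name `DdlMetaInspector` is hypothetical. Actually deserializing the object requires the FastSQL classes on the classpath and is outside the scope of this sketch; deserializing Java objects from untrusted sources is generally unsafe.

```java
import java.util.Base64;

/** Hypothetical sanity check for a Base64-encoded ddlMeta value. */
final class DdlMetaInspector {

    /** True if the decoded bytes start with the Java serialization header AC ED 00 05. */
    static boolean looksJavaSerialized(String base64) {
        byte[] bytes;
        try {
            bytes = Base64.getDecoder().decode(base64);
        } catch (IllegalArgumentException e) {
            // Not valid Base64 at all.
            return false;
        }
        return bytes.length >= 4
            && (bytes[0] & 0xFF) == 0xAC   // STREAM_MAGIC, high byte
            && (bytes[1] & 0xFF) == 0xED   // STREAM_MAGIC, low byte
            && bytes[2] == 0x00            // STREAM_VERSION, high byte
            && bytes[3] == 0x05;           // STREAM_VERSION, low byte
    }

    public static void main(String[] args) {
        // First eight characters of the ddlMeta value in the example above.
        System.out.println(looksJavaSerialized("rO0ABXNy")); // true
    }
}
```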