DataHub supports two topic types, Tuple and Blob, each with its own message format and sharding strategy. This reference describes how each type works, how shards are assigned, and the exact JSON structure of Blob messages, with examples of the main operation types.
Topic types
A topic is the smallest unit for publishing and subscribing to data in DataHub. DataHub supports two topic types:
| Feature | Tuple | Blob |
|---|---|---|
| DML messages | Supported | Supported |
| DDL messages | Not supported | Supported |
| Heartbeat messages | Not supported | Supported |
| Source-to-topic mapping | One table to one topic | One database (multiple tables) to one topic |
| Data type | DataHub-supported types | BLOB binary data |
Tuple topics have a fixed schema that cannot change after the topic is created. Each source table maps to its own topic, and because that topic's schema is fixed, DDL operations (such as ADD COLUMN or DROP COLUMN) are not supported. Use Tuple topics when your source tables have a stable schema and you do not need to propagate DDL changes downstream.
Blob topics store raw BLOB binary data and carry no schema, which gives them full flexibility. A single Blob topic can receive data from an entire database, including all of its tables, and pass DDL and heartbeat messages to downstream consumers. Blob topics are well suited for full database migration where DataHub acts as an intermediate message queue.
Sharding strategies
A shard is a concurrent channel for writing data to a DataHub topic. Each shard has a maximum write throughput, and DataHub guarantees message order only within a single shard, not across shards.
| Scenario | Tuple | Blob |
|---|---|---|
| With a primary key | Shard by primary key | Shard by primary key |
| Order guarantee | Messages with the same primary key are ordered | Messages with the same primary key are ordered |
| Without a primary key | Random sharding | Shard by table name |
| Order guarantee | Not guaranteed | Messages for the same table are ordered |
For tables with a primary key (including custom primary keys), both topic types route messages by primary key value, keeping all changes to the same row on the same shard. For tables without a primary key, Tuple topics distribute writes randomly, while Blob topics group writes by table name to preserve per-table order.
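The routing rules above can be sketched as a small function. This is an illustration only, not DataHub's implementation: the class name `ShardRouter`, the shard count, and the hash (`String.hashCode` over the joined key values) are assumptions, since DataHub's actual hashing scheme is not described here. What the sketch does capture is the property that matters: equal keys always map to the same shard, which is what preserves their relative order.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical router illustrating the sharding rules; DataHub's real hash is not documented here. */
final class ShardRouter {
    enum TopicType { TUPLE, BLOB }

    private final int shardCount;

    ShardRouter(int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        this.shardCount = shardCount;
    }

    /** Picks a shard; primaryKeyValues is empty when the source table has no primary key. */
    int route(TopicType type, String tableName, List<String> primaryKeyValues) {
        if (!primaryKeyValues.isEmpty()) {
            // Both topic types: equal key values always land on the same shard,
            // so changes to one row stay in order.
            return bucket(String.join("\u0000", primaryKeyValues));
        }
        if (type == TopicType.BLOB) {
            // Blob, no primary key: every row of a table shares one shard (per-table order).
            return bucket(tableName);
        }
        // Tuple, no primary key: any shard, so no ordering guarantee.
        return ThreadLocalRandom.current().nextInt(shardCount);
    }

    private int bucket(String key) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), shardCount);
    }
}
```

Joining composite key values with a separator keeps keys such as ("1", "joe") and ("1j", "oe") from collapsing into the same hashed string.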
Tuple data format
When Data Integration creates a Tuple topic, it automatically adds five metadata columns alongside your source table columns:
| Column | Description |
|---|---|
| _sequence_id_ | String. A numeric ID that identifies each change. The two messages produced by an UPDATE (before and after) share the same sequence ID. |
| _excute_time_ | The time when the change was generated in the source. |
| _source_table_ | The name of the source table. |
| _before_image_ | Y if this row represents the state before the change (applies to UPDATE before-image and DELETE). N otherwise. |
| _after_image_ | Y if this row represents the state after the change (applies to UPDATE after-image and INSERT). N otherwise. |
The following example shows the messages produced by an INSERT, an UPDATE, and a DELETE on the same row. The _operation_type_ column contains I, U, or D for the three operations:
| _sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
|---|---|---|---|---|
| 1649991610688000000 | I | 1649991726000 | N | Y |
| 1649991610688000001 | U | 1649991756000 | Y | N |
| 1649991610688000001 | U | 1649991756000 | N | Y |
| 1649991610688000002 | D | 1649991774000 | Y | N |
The UPDATE produces two rows with the same _sequence_id_: the first carries _before_image_=Y (the original values), and the second carries _after_image_=Y (the updated values).
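A consumer reading a Tuple topic can rebuild each logical change from these columns. The sketch below is a hypothetical helper, not part of any DataHub SDK: it groups rows by _sequence_id_ and uses the _before_image_ and _after_image_ flags to separate old and new values. Rows are modeled as plain `Map<String, String>` objects for simplicity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reassembles Tuple-topic rows into logical changes. */
final class TupleChangeAssembler {

    /** One logical change: before is null for INSERT, after is null for DELETE. */
    static final class Change {
        final String sequenceId;
        Map<String, String> before;
        Map<String, String> after;

        Change(String sequenceId) {
            this.sequenceId = sequenceId;
        }
    }

    /** Groups rows by _sequence_id_, keeping the order in which IDs first appear. */
    static List<Change> assemble(List<Map<String, String>> rows) {
        Map<String, Change> bySeq = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            Change change = bySeq.computeIfAbsent(row.get("_sequence_id_"), Change::new);
            if ("Y".equals(row.get("_before_image_"))) {
                change.before = row;   // UPDATE before-image or DELETE
            }
            if ("Y".equals(row.get("_after_image_"))) {
                change.after = row;    // UPDATE after-image or INSERT
            }
        }
        return new ArrayList<>(bySeq.values());
    }
}
```

Applied to the four example rows, this yields three changes: an INSERT with only an after-image, an UPDATE with both images, and a DELETE with only a before-image.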
Blob message format
Blob messages are binary data converted from a JSON string. Every message is a single JSONObject serialized to a UTF-8 byte array.
Top-level structure
{
"schema": { ... },
"payload": { ... },
"version": "1.0.0"
}
Field types
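Each column in a Blob message must use one of the following six types defined by StreamX:
| Type | Description |
|---|---|
| BOOLEAN | true or false |
| DOUBLE | Double-precision floating-point number |
| LONG | 64-bit integer |
| STRING | UTF-8 string |
| DATE | A 13-digit integer representing a millisecond-precision timestamp |
| BYTES | A Base64-encoded string |

Encode and decode BYTES values with java.util.Base64:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

String text = "test_text123";
// Encode: UTF-8 bytes -> Base64 string
String encodedText = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
// Decode: Base64 string -> bytes -> original UTF-8 string
String decodedText = new String(Base64.getDecoder().decode(encodedText), StandardCharsets.UTF_8);
```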
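When producing Blob messages yourself, each column value has to be converted into one of these representations. The helper below is a hypothetical sketch of that mapping for a few common Java types; the class and method names and the choice of input types are assumptions, and only the target representations (LONG as a 64-bit integer, DATE as epoch milliseconds, BYTES as Base64 text) come from the table above. Passing `null` through unchanged is also an assumption.

```java
import java.time.Instant;
import java.util.Base64;

/** Hypothetical mapping from Java values to StreamX wire representations. */
final class StreamXValues {

    /** Returns a value suitable for a dataColumn map in payload.before or payload.after. */
    static Object toStreamXValue(Object value) {
        if (value == null || value instanceof Boolean || value instanceof String) {
            return value;                                              // BOOLEAN, STRING (null: assumption)
        }
        if (value instanceof Float || value instanceof Double) {
            return ((Number) value).doubleValue();                     // DOUBLE
        }
        if (value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long) {
            return ((Number) value).longValue();                       // LONG (64-bit)
        }
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();                   // DATE: epoch milliseconds
        }
        if (value instanceof byte[]) {
            return Base64.getEncoder().encodeToString((byte[]) value); // BYTES: Base64 text
        }
        throw new IllegalArgumentException("No StreamX mapping for " + value.getClass());
    }
}
```

Epoch milliseconds have exactly 13 digits for any instant between September 2001 and the year 2286, which is why DATE values are described as 13-digit integers.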
schema object
The schema object describes the structure of the source table and database at the time of the change.
| Field | Type | Description |
|---|---|---|
| schema.dataColumn | JSONArray | All columns involved in the change, each with a name (String) and a type (one of the six StreamX types). Covers both data changes and DDL schema changes. |
| schema.primaryKey | Array | Primary key column names. |
| schema.source.dbType | String | Database type (for example, mysql). |
| schema.source.dbVersion | String | Database version. |
| schema.source.dbName | String | Database name. |
| schema.source.schemaName | String | Schema name. Applies to databases such as PostgreSQL and SQL Server. |
| schema.source.tableName | String | Table name. |
payload object
The payload object contains the actual change data.
| Field | Type | When present | Description |
|---|---|---|---|
| payload.op | String | Always | Operation type. Case-sensitive. See Operation types. |
| payload.sequenceId | String | DML operations | Used to sort messages when merging incremental and full data. |
| payload.before | JSONObject | UPDATE, DELETE | Row state before the change. Contains a dataColumn map of column_name: column_value. BYTES values are Base64-encoded strings; DATE values are 13-digit long integers. |
| payload.after | JSONObject | INSERT, UPDATE | Row state after the change. Same format as payload.before. |
| payload.timestamp.eventTime | Long | Always | Required. Time of the change in the source database, as a 13-digit millisecond-precision timestamp. |
| payload.timestamp.systemTime | Long | Some sources | Time when the sync task processed this message. Present for sources such as Oracle CDC. |
| payload.timestamp.checkpointTime | Long | Some sources | Time set when resetting the sync offset. Present for sources such as OceanBase Database. |
| payload.ddl.text | String | DDL operations | The DDL statement text. |
| payload.ddl.ddlMeta | String | DDL operations | The SQLStatement object parsed by FastSQL, serialized and Base64-encoded. Required when DDL support is enabled; downstream consumers deserialize this to apply the DDL change. |
Operation types
The op field is case-sensitive. Valid values:
| Value | Description | before | after |
|---|---|---|---|
| INSERT | Row inserted | null | Populated |
| UPDATE_BEFOR | Before-image of an updated row | Populated | null |
| UPDATE_AFTER | After-image of an updated row | null | Populated |
| DELETE | Row deleted | Populated | null |
| TRANSACTION_BEGIN | Transaction started | N/A | N/A |
| TRANSACTION_END | Transaction committed | N/A | N/A |
| CREATE | Table created | N/A | N/A |
| ALTER | Table schema changed | N/A | N/A |
| QUERY | Original SQL of the change | N/A | N/A |
| TRUNCATE | Table truncated | N/A | N/A |
| RENAME | Table renamed | N/A | N/A |
| CINDEX | Index created | N/A | N/A |
| DINDEX | Index deleted | N/A | N/A |
| GTID | Global transaction ID event | N/A | N/A |
| XACOMMIT | XA transaction committed | N/A | N/A |
| XAROLLBACK | XA transaction rolled back | N/A | N/A |
| ERASE | Data erased | N/A | N/A |
| MHEARTBEAT | Heartbeat message when no new data arrives from the source | N/A | N/A |
Note: UPDATE_BEFOR is the exact enum value used in the API (not UPDATE_BEFORE).
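Because op values are case-sensitive and include the unusual spelling UPDATE_BEFOR, a consumer may prefer to parse them strictly instead of comparing ad hoc strings. The enum below is a hypothetical sketch that mirrors the operation table; the names `OpType` and `Image` are assumptions, not types from a DataHub SDK. `Enum.valueOf` is itself case-sensitive, so values such as "insert" or "UPDATE_BEFORE" are rejected.

```java
/** Hypothetical enum mirroring the op table; not part of any DataHub SDK. */
enum OpType {
    INSERT(Image.AFTER), UPDATE_BEFOR(Image.BEFORE), UPDATE_AFTER(Image.AFTER), DELETE(Image.BEFORE),
    TRANSACTION_BEGIN(Image.NONE), TRANSACTION_END(Image.NONE),
    CREATE(Image.NONE), ALTER(Image.NONE), QUERY(Image.NONE), TRUNCATE(Image.NONE), RENAME(Image.NONE),
    CINDEX(Image.NONE), DINDEX(Image.NONE), GTID(Image.NONE), XACOMMIT(Image.NONE), XAROLLBACK(Image.NONE),
    ERASE(Image.NONE), MHEARTBEAT(Image.NONE);

    /** Which payload field carries the row data for this op. */
    enum Image { BEFORE, AFTER, NONE }

    final Image image;

    OpType(Image image) {
        this.image = image;
    }

    /** Strict, case-sensitive parse; throws IllegalArgumentException for unknown values. */
    static OpType parse(String op) {
        return OpType.valueOf(op);
    }
}
```

A consumer can then read `payload.before` or `payload.after` according to `parse(op).image` and route every `Image.NONE` message to DDL, transaction, or heartbeat handling.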
Serialization
To serialize a Blob message:
- Build the JSONObject according to the format above.
- Convert it to a JSON string (for example, using fastjson's toJSONString method).
- Encode the string as a UTF-8 byte array using getBytes(Charsets.UTF_8).

One message corresponds to one JSONObject.
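The three steps can be sketched for the simplest message type, a heartbeat. In practice you would build the JSONObject with a JSON library such as fastjson as described above; here the JSON text is assembled by hand only so that the example runs on the JDK alone, and the class name `BlobMessages` is hypothetical. `StandardCharsets.UTF_8` is the JDK counterpart of `Charsets.UTF_8`. The version string is passed in because the value differs between the top-level structure ("1.0.0") and the examples ("0.0.1").

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds a heartbeat Blob message and encodes it as UTF-8 bytes. */
final class BlobMessages {

    /** Steps 1 and 2: build the message structure and render it as a JSON string. */
    static String heartbeatJson(long eventTime, long checkpointTime, String version) {
        // Hand-written JSON is safe here only because every value is a number or a fixed literal;
        // version is assumed to contain no characters that need JSON escaping.
        return "{\"schema\":{},"
                + "\"payload\":{\"op\":\"MHEARTBEAT\","
                + "\"timestamp\":{\"eventTime\":" + eventTime
                + ",\"checkpointTime\":" + checkpointTime + "}},"
                + "\"version\":\"" + version + "\"}";
    }

    /** Step 3: one message (one JSON object) becomes one UTF-8 byte array. */
    static byte[] toMessageBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** The consumer side reverses step 3 before parsing the JSON. */
    static String fromMessageBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```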
Message examples
All examples below use the following source table on a MySQL database:
- Database: yunshi_db
- Table: t_shiyu_pk
- Columns: id (LONG), name (STRING), comment (STRING)
- Primary key: composite key on id and name
INSERT
{
"schema": {
"dataColumn": [
{ "name": "id", "type": "LONG" },
{ "name": "name", "type": "STRING" },
{ "name": "comment", "type": "STRING" }
],
"source": {
"dbName": "yunshi_db",
"dbType": "MySQL",
"tableName": "t_shiyu_pk"
},
"primaryKey": ["id", "name"]
},
"payload": {
"op": "INSERT",
"after": {
"dataColumn": {
"id": 1,
"name": "joe",
"comment": "comment"
}
},
"sequenceId": "1605339516000000004",
"timestamp": {
"eventTime": 1605339932000,
"systemTime": 1605339932736,
"checkpointTime": 1605339932000
}
},
"version": "0.0.1"
}
UPDATE
An UPDATE produces two messages with the same sequenceId: UPDATE_BEFOR carries the original row state, and UPDATE_AFTER carries the updated row state.
UPDATE_BEFOR (original values):
{
"schema": {
"dataColumn": [
{ "name": "id", "type": "LONG" },
{ "name": "name", "type": "STRING" },
{ "name": "comment", "type": "STRING" }
],
"source": {
"dbName": "yunshi_db",
"dbType": "MySQL",
"tableName": "t_shiyu_pk"
},
"primaryKey": ["id", "name"]
},
"payload": {
"op": "UPDATE_BEFOR",
"before": {
"dataColumn": {
"id": 1,
"name": "joe",
"comment": "comment"
}
},
"sequenceId": "1605339516000000005",
"timestamp": {
"eventTime": 1605339934000,
"systemTime": 1605339934951,
"checkpointTime": 1605339934000
}
},
"version": "0.0.1"
}
UPDATE_AFTER (updated values, same sequenceId):
{
"schema": {
"dataColumn": [
{ "name": "id", "type": "LONG" },
{ "name": "name", "type": "STRING" },
{ "name": "comment", "type": "STRING" }
],
"source": {
"dbName": "yunshi_db",
"dbType": "MySQL",
"tableName": "t_shiyu_pk"
},
"primaryKey": ["id", "name"]
},
"payload": {
"op": "UPDATE_AFTER",
"after": {
"dataColumn": {
"id": 1,
"name": "joe",
"comment": "com1"
}
},
"sequenceId": "1605339516000000005",
"timestamp": {
"eventTime": 1605339934000,
"systemTime": 1605339934951,
"checkpointTime": 1605339934000
}
},
"version": "0.0.1"
}
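Because both halves of an UPDATE share the same sequenceId, a consumer can pair them and compute which columns actually changed. The sketch below assumes both dataColumn objects have already been parsed into `Map<String, Object>` (for example, with a JSON library) and that each image carries the full row, as in the example; the class name `UpdateDiff` is hypothetical. Applied to the two messages above, only comment differs ("comment" to "com1").

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper that compares UPDATE_BEFOR and UPDATE_AFTER column maps. */
final class UpdateDiff {

    /** Returns column -> {oldValue, newValue} for every column whose value changed. */
    static Map<String, Object[]> changedColumns(String beforeSeqId, Map<String, Object> before,
                                                String afterSeqId, Map<String, Object> after) {
        if (!Objects.equals(beforeSeqId, afterSeqId)) {
            throw new IllegalArgumentException("UPDATE halves must share one sequenceId");
        }
        Map<String, Object[]> diff = new LinkedHashMap<>();
        // Assumes both images carry the full row; columns present only in before are ignored.
        for (Map.Entry<String, Object> e : after.entrySet()) {
            Object oldValue = before.get(e.getKey());
            if (!Objects.equals(oldValue, e.getValue())) {
                diff.put(e.getKey(), new Object[] {oldValue, e.getValue()});
            }
        }
        return diff;
    }
}
```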
DELETE
{
"schema": {
"dataColumn": [
{ "name": "id", "type": "LONG" },
{ "name": "name", "type": "STRING" },
{ "name": "comment", "type": "STRING" }
],
"source": {
"dbName": "yunshi_db",
"dbType": "MySQL",
"tableName": "t_shiyu_pk"
},
"primaryKey": ["id", "name"]
},
"payload": {
"op": "DELETE",
"before": {
"dataColumn": {
"id": 1,
"name": "joe",
"comment": "com1"
}
},
"sequenceId": "1605339516000000006",
"timestamp": {
"eventTime": 1605339937000,
"systemTime": 1605339937671,
"checkpointTime": 1605339937000
}
},
"version": "0.0.1"
}
Heartbeat
When no new data arrives from the source, the sync task writes a heartbeat message to DataHub to indicate that it is running normally. The schema object is empty, and the payload carries only op and timestamp.
{
"schema": {},
"payload": {
"op": "MHEARTBEAT",
"timestamp": {
"eventTime": 1605339953629,
"checkpointTime": 1605339953629
}
},
"version": "0.0.1"
}
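Since heartbeats signal that the sync task is running normally, a consumer can treat a long silence, with neither data nor heartbeats, as a sign that the task may have stalled. The sketch below is a hypothetical consumer-side check; the class name and threshold are assumptions, and it records local receipt time rather than payload.timestamp.eventTime because eventTime on data messages reflects when the source change happened and can lag during catch-up. The heartbeat interval is not documented here, so the threshold should be set well above whatever interval you observe.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical consumer-side liveness check for a Blob topic. */
final class SyncLivenessMonitor {
    private final Duration maxSilence;
    private Instant lastReceived;   // null until the first message arrives

    SyncLivenessMonitor(Duration maxSilence) {
        this.maxSilence = maxSilence;
    }

    /** Heartbeats carry no row data, so downstream writers can skip them after recording liveness. */
    static boolean isHeartbeat(String op) {
        return "MHEARTBEAT".equals(op);   // op is case-sensitive
    }

    /** Call for every consumed message, heartbeat or data, with its local receipt time. */
    void onMessage(Instant receivedAt) {
        if (lastReceived == null || receivedAt.isAfter(lastReceived)) {
            lastReceived = receivedAt;
        }
    }

    /** True if no message at all, not even a heartbeat, arrived within maxSilence before now. */
    boolean isStale(Instant now) {
        return lastReceived == null
                || Duration.between(lastReceived, now).compareTo(maxSilence) > 0;
    }
}
```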
DDL (ALTER)
When a DDL operation changes the table schema, the ddl field is populated and the before/after fields are omitted.
{
"schema": {
"source": {
"dbName": "test_db",
"dbType": "MySQL",
"tableName": "t_test_nopk"
}
},
"payload": {
"op": "ALTER",
"sequenceId": "1605339516000000035",
"ddl": {
"text": "alter table t_test_nopk add column holo text",
"ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3Jl
bWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHZmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS5hbGliYWJhLmZhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29ic1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxp
YmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA=="
},
"timestamp": {
"eventTime": 1605342109000,
"systemTime": 1605342109259,
"checkpointTime": 1605342109000
}
},
"version": "0.0.1"
}
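Downstream consumers recover the parsed DDL by reversing the encoding in ddlMeta: Base64-decode it, then deserialize the bytes. The value in the example begins with rO0AB, the Base64 form of the Java object-serialization stream header (0xACED 0x0005), so the sketch below reads it with java.io.ObjectInputStream. The class name `DdlMetaDecoder` is hypothetical. Deserializing a real ddlMeta value also requires the Alibaba and FastSQL classes named inside the stream on the classpath, and Java deserialization should only be applied to data from a trusted sync task.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Base64;

/** Hypothetical decoder for payload.ddl.ddlMeta. */
final class DdlMetaDecoder {

    /** Base64-decodes ddlMeta and checks for the Java serialization header 0xACED 0x0005. */
    static byte[] decodeBytes(String ddlMeta) {
        byte[] bytes = Base64.getDecoder().decode(ddlMeta);
        if (bytes.length < 4 || (bytes[0] & 0xFF) != 0xAC || (bytes[1] & 0xFF) != 0xED
                || bytes[2] != 0x00 || bytes[3] != 0x05) {
            throw new IllegalArgumentException("ddlMeta is not a Java serialization stream");
        }
        return bytes;
    }

    /** Deserializes the object; every class it references must be on the classpath. */
    static Object decode(String ddlMeta) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(decodeBytes(ddlMeta)))) {
            return in.readObject();
        }
    }
}
```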