This appendix describes the supported operations, sharding strategies, data formats, and message examples for different DataHub data types.
Supported operations for different data types
A topic is the basic unit for data subscription and publication in DataHub, representing a collection of streaming data. DataHub supports two data types: TUPLE and BLOB.
DataHub type | Write DML messages | Write upstream heartbeat messages | Write DDL messages | Source to topic mapping | Data format |
TUPLE | Supported | Not supported | Not supported | One table to one topic | DataHub-supported types |
BLOB | Supported | Supported | Supported | One database (multiple tables) to one topic | BLOB binary data |
TUPLE topics have a fixed schema that you cannot alter after creation. They are suitable for scenarios where the source table schema is stable and does not involve DDL operations like ADD COLUMN or DROP COLUMN. TUPLE topics do not support forwarding DDL messages or heartbeat messages from the source to downstream consumers. This one-to-one mapping can be inconvenient for downstream consumption if you have many source tables, as you must create a corresponding topic for each.
BLOB topics do not have a predefined schema and store only raw binary data, making them more flexible. They support forwarding DDL messages and heartbeat messages from the source to downstream consumers. They map multiple tables from a single database to one topic. This allows you to use one topic for all source tables, simplifying downstream consumption. This type is ideal for scenarios where DataHub acts as an intermediate message queue for full-database migration.
Sharding strategies for different data types
A shard is a concurrent channel for data transmission within a DataHub topic. While a single shard has a limited write throughput, you can use multiple shards to increase it. DataHub guarantees in-order consumption only within a single shard, not across multiple shards. To improve write performance with multiple shards while maintaining message order and preventing data skew, DataHub offers the following sharding strategies for TUPLE and BLOB data types.
Scenario | Item | TUPLE | BLOB |
With a primary key (including custom primary keys) | Sharding strategy | Shard by primary key | Shard by primary key |
With a primary key (including custom primary keys) | Ordering guarantee | Messages with the same primary key are processed in order | Messages with the same primary key are processed in order |
Without a primary key | Sharding strategy | Random sharding | Shard by table name |
Without a primary key | Ordering guarantee | Ordering is not guaranteed | Messages from the same table are processed in order |
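The strategies in the table above can be sketched as a small routing function. This is only an illustration: the class name ShardChooser, the use of String.hashCode, and the modulo mapping are assumptions made for the example, and the hash function that DataHub or Data Integration actually uses is not described here.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative shard selection. The hashing scheme is an assumption, not DataHub's. */
final class ShardChooser {
    enum TopicType { TUPLE, BLOB }

    /**
     * @param primaryKey concatenated primary key values, or null if the table has no primary key
     * @return a shard index in the range [0, shardCount)
     */
    static int chooseShard(TopicType type, String tableName, String primaryKey, int shardCount) {
        if (primaryKey != null) {
            // Same primary key -> same shard, so changes to one record stay in order.
            return Math.floorMod(primaryKey.hashCode(), shardCount);
        }
        if (type == TopicType.BLOB) {
            // BLOB without a primary key: same table -> same shard.
            return Math.floorMod(tableName.hashCode(), shardCount);
        }
        // TUPLE without a primary key: random shard, so ordering is not guaranteed.
        return ThreadLocalRandom.current().nextInt(shardCount);
    }
}
```

Because the result depends only on the key (or the table name), repeated calls with the same input always land on the same shard, which is what preserves per-key or per-table ordering.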
Data formats
TUPLE
The TUPLE format uses data types natively supported by DataHub. Data Integration adds several metadata columns when you create a topic. The schema for the TUPLE format contains both metadata columns and business data columns. Fields that start with an underscore, such as _sequence_id_ and _operation_type_, are metadata columns. The remaining fields are business data columns. The metadata columns include _sequence_id_, _excute_time_, _source_table_, _before_image_, and _after_image_.
Parameter | Description |
_sequence_id_ | A unique message ID of the STRING type, consisting of digits. The UPDATE_BEFOR and UPDATE_AFTER operations for the same update share a sequence ID. |
_excute_time_ | The time when the data was generated. |
_source_table_ | The name of the source table. |
_before_image_ | The pre-image. The value is Y for an UPDATE_BEFOR or DELETE operation, and N for an UPDATE_AFTER or INSERT operation. |
_after_image_ | The post-image. The value is N for an UPDATE_BEFOR or DELETE operation, and Y for an UPDATE_AFTER or INSERT operation. |
Example: The following table shows the data synchronized to DataHub after executing INSERT, UPDATE, and DELETE statements.
_sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
1649991610688000000 | I | 1649991726000 | N | Y |
1649991610688000001 | U | 1649991756000 | Y | N |
1649991610688000001 | U | 1649991756000 | N | Y |
1649991610688000002 | D | 1649991774000 | Y | N |
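A downstream consumer of a TUPLE topic can tell the two halves of an update apart by combining _operation_type_ with the image flags. The following is a minimal sketch of that rule, assuming the single-letter codes I, U, and D shown in the example; the class and method names are hypothetical.

```java
/** Maps the metadata columns of one TUPLE record to a change kind. */
final class TupleRowKind {
    static String classify(String operationType, String beforeImage, String afterImage) {
        switch (operationType) {
            case "I":
                return "INSERT";
            case "D":
                return "DELETE";
            case "U":
                // One update arrives as two records that share a _sequence_id_.
                if ("Y".equals(beforeImage) && "N".equals(afterImage)) {
                    return "UPDATE_BEFOR";
                }
                if ("N".equals(beforeImage) && "Y".equals(afterImage)) {
                    return "UPDATE_AFTER";
                }
                throw new IllegalArgumentException("Inconsistent image flags for an update");
            default:
                throw new IllegalArgumentException("Unknown _operation_type_: " + operationType);
        }
    }
}
```

Applied to the four example rows in order, the sketch yields INSERT, UPDATE_BEFOR, UPDATE_AFTER, and DELETE.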
BLOB
A BLOB message is binary data created by converting a JSON string. The corresponding JSON format is as follows:
{
  "schema": { // Metadata about the change, specifying only column names and types.
    "dataColumn": [ // Information about the changed data columns, used to update records in the destination table.
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "binData", "type": "BYTES" },
      { "name": "ts", "type": "DATE" }
    ],
    "primaryKey": [ "pkName1", "pkName2" ],
    "source": {
      "dbType": "mysql",
      "dbVersion": "1.0.0",
      "dbName": "myDatabase",
      "schemaName": "mySchema",
      "tableName": "tableName"
    }
  },
  "payload": {
    "before": {
      "dataColumn": { "id": 111, "name": "scooter", "binData": "[base64 string]", "ts": 1590315269000 }
    },
    "after": {
      "dataColumn": { "id": 222, "name": "donald", "binData": "[base64 string]", "ts": 1590315269000 }
    },
    "sequenceId": XXX, // A string used for data ordering when merging full and incremental data.
    "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // Case-sensitive.
    "timestamp": {
      "eventTime": 1, // Required. The time of the record change. A 13-digit timestamp with millisecond precision.
      "systemTime": 2, // Optional. Exists for some data sources like Oracle CDC.
      "checkpointTime": 3 // Optional. Included for some data sources like OceanBase.
    },
    "ddl": {
      "text": "ADD COLUMN ...",
      "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
    }
  },
  "version": "1.0.0"
}
BLOB fields
Important: The data types for all fields in the message are defined by StreamX and include BOOLEAN, DOUBLE, DATE, BYTES, LONG, and STRING.
BOOLEAN: The value is `true` or `false`.
DATE: The value is a 13-digit integer representing a timestamp with millisecond precision.
BYTES: Stores byte arrays as a Base64-encoded string. Use the `java.util.Base64` APIs for Base64 encoding and decoding:
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    String text = "text123";
    // Encode the UTF-8 bytes of the text as a Base64 string.
    String encodedText = Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    // Decode the Base64 string back into the original bytes.
    byte[] decodedBytes = Base64.getDecoder().decode(encodedText);
Top-level element | Second-level element | Description |
schema | dataColumn | A JSONArray that contains the type information for data columns. dataColumn records all columns and their types in an upstream data change record. A change operation can be a data modification (such as insert, delete, or update) or a table structure modification. name: The column name. type: The column data type. |
schema | primaryKey | A list of strings that represent the primary key column names. pk: The primary key name. |
schema | source | An object that contains information about the source database or table. dbType: A string representing the database type. dbVersion: A string representing the database version. dbName: A string representing the database name. schemaName: A string representing the schema name, required for databases like PostgreSQL and SQL Server. tableName: A string representing the table name. |
payload | before | A JSONObject that contains the pre-image of the data. For an UPDATE operation on a MySQL source, the before field stores the record's content before the update. This field is populated when an update or delete message is read from the source. dataColumn: A JSONObject that holds the data in the format column name: column value. The column name is a string, and the column value depends on its data type: values of the BYTES type are represented as Base64 strings, values of the DATE type are represented as 13-digit timestamps of the long type, and values of other types are represented by their native types. |
payload | after | The post-image of the data. The format is the same as the before field. Note: This field is required for UPDATE and INSERT operations. |
payload | op | The operation type. Valid values: INSERT (a data insertion), UPDATE_BEFOR (the pre-image of an update), UPDATE_AFTER (the post-image of an update), DELETE (a data deletion), TRANSACTION_BEGIN (the start of a database transaction), TRANSACTION_END (the end of a database transaction), CREATE (a table creation), ALTER (a table alteration), QUERY (the original SQL for the database change), TRUNCATE (a table truncation), RENAME (a table rename), CINDEX (an index creation), DINDEX (an index deletion), and MHEARTBEAT (a heartbeat message that indicates the synchronization task is running normally even when there is no new data from the source). |
payload | timestamp | A JSONObject that contains timestamps related to this data record. eventTime: A Long value that represents the time when the change occurred in the source database. It is a 13-digit timestamp with millisecond precision. systemTime: A Long value that represents the time when the synchronization task processed this change message. It is a 13-digit timestamp with millisecond precision. checkpointTime: A Long value representing the time used to reset the synchronization offset. This value is usually the same as eventTime. It is a 13-digit timestamp with millisecond precision. |
payload | ddl | This field is populated only for DDL operations that change the table structure. For DML operations such as data insertion, deletion, and modification, the ddl field is null. text: A String that contains the database DDL statement text. ddlMeta: A String that contains the binary representation of a SQLStatement object, encoded in Base64. The SQLStatement object is generated by parsing the DDL statement with FastSQL. If you enable DDL support, the system passes the serialized SQLStatement object. The downstream component can then deserialize this object to reconstruct the DDL statement for the destination data source and apply the change. |
version | N/A | The version number of the format. |
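When reading payload.before.dataColumn or payload.after.dataColumn, a consumer typically needs to convert each raw JSON value according to the type declared for that column in schema.dataColumn. The following is a minimal sketch of that conversion using only the JDK. The class and method names are hypothetical, and it assumes the JSON parser has already produced a String for BYTES values and a numeric value for DATE values.

```java
import java.time.Instant;
import java.util.Base64;

/** Converts one raw dataColumn value to a Java value based on its declared type. */
final class BlobValueDecoder {
    static Object toJavaValue(String declaredType, Object rawValue) {
        if (rawValue == null) {
            return null;
        }
        switch (declaredType) {
            case "BYTES":
                // BYTES values are carried as Base64 strings.
                return Base64.getDecoder().decode((String) rawValue);
            case "DATE":
                // DATE values are 13-digit epoch timestamps in milliseconds.
                return Instant.ofEpochMilli(((Number) rawValue).longValue());
            default:
                // BOOLEAN, DOUBLE, LONG, and STRING keep their native JSON representation.
                return rawValue;
        }
    }
}
```

For example, toJavaValue("DATE", 1590315269000L) returns an Instant whose toEpochMilli() is 1590315269000.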
BLOB serialization
In this JSON format, each message corresponds to a single JSONObject. The structure of this JSONObject, which can include nested objects and arrays, defines the message format.
To serialize the message, convert the JSONObject to a string (for example, by using the toJSONString method from fastjson) and then convert the string to a byte array using the String.getBytes(Charsets.UTF_8) method.
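The byte-level step can be sketched with the JDK alone. The example below assumes the JSON string has already been produced by a JSON library such as fastjson, and it uses java.nio.charset.StandardCharsets.UTF_8 as the JDK counterpart of Charsets.UTF_8. The class name BlobCodec is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Converts a BLOB message between its JSON text and its binary form. */
final class BlobCodec {
    /** Encodes the JSON text of a message as UTF-8 bytes for writing to a BLOB topic. */
    static byte[] serialize(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the bytes read from a BLOB topic back into JSON text. */
    static String deserialize(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

A consumer reverses the process: it decodes the bytes as UTF-8 and then parses the resulting string as JSON.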
Sample JSON messages
Insert:
{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": { "dbName": "example_db", "dbType": "MySQL", "tableName": "example_table_pk" },
    "primaryKey": [ "id", "name" ]
  },
  "payload": {
    "op": "INSERT",
    "after": {
      "dataColumn": { "name": "joe", "comment": "comment", "id": 1 }
    },
    "sequenceId": "1605339516000000004",
    "timestamp": { "eventTime": 1605339932000, "systemTime": 1605339932736, "checkpointTime": 1605339932000 }
  },
  "version": "0.0.1"
}
Update before:
{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": { "dbName": "example_db", "dbType": "MySQL", "tableName": "example_table_pk" },
    "primaryKey": [ "id", "name" ]
  },
  "payload": {
    "op": "UPDATE_BEFOR",
    "before": {
      "dataColumn": { "name": "joe", "comment": "comment", "id": 1 }
    },
    "sequenceId": "1605339516000000005",
    "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 }
  },
  "version": "0.0.1"
}
Update after:
{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": { "dbName": "example_db", "dbType": "MySQL", "tableName": "example_table_pk" },
    "primaryKey": [ "id", "name" ]
  },
  "payload": {
    "op": "UPDATE_AFTER",
    "after": {
      "dataColumn": { "name": "joe", "comment": "com1", "id": 1 }
    },
    "sequenceId": "1605339516000000005",
    "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 }
  },
  "version": "0.0.1"
}
Delete:
{
  "schema": {
    "dataColumn": [
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "comment", "type": "STRING" }
    ],
    "source": { "dbName": "example_db", "dbType": "MySQL", "tableName": "example_table_pk" },
    "primaryKey": [ "id", "name" ]
  },
  "payload": {
    "op": "DELETE",
    "before": {
      "dataColumn": { "name": "joe", "comment": "com1", "id": 1 }
    },
    "sequenceId": "1605339516000000006",
    "timestamp": { "eventTime": 1605339937000, "systemTime": 1605339937671, "checkpointTime": 1605339937000 }
  },
  "version": "0.0.1"
}
Heartbeat:
{
  "schema": {},
  "payload": {
    "op": "MHEARTBEAT",
    "timestamp": { "eventTime": 1605339953629, "checkpointTime": 1605339953629 }
  },
  "version": "0.0.1"
}
DDL:
{ "schema": { "source": { "dbName": "example_db", "dbType": "MySQL", "tableName": "example_table_nopk" } }, "payload": { "op": "ALTER", "sequenceId": "1605339516000000035", "ddl": { "text": "alter table example_table_nopk add column holo text", "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHV
tbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAA
AAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA==" }, "timestamp": { "eventTime": 1605342109000, "systemTime": 1605342109259, "checkpointTime": 1605342109000 } }, "version": "0.0.1" }
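A consumer that reads messages like the samples above usually branches on payload.op before touching the rest of the payload. The sketch below shows one possible routing. The grouping of operation types into DML, DDL, and heartbeat categories is an assumption made for this example, as are the class and enum names. The comparison is exact because op values are case-sensitive.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Groups payload.op values into coarse categories for downstream handling. */
final class OpRouter {
    enum Kind { DML, DDL, HEARTBEAT, OTHER }

    // Assumed grouping: row-level changes that carry before or after images.
    private static final Set<String> DML_OPS = new HashSet<>(
            Arrays.asList("INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE"));

    // Assumed grouping: table or index structure changes.
    private static final Set<String> DDL_OPS = new HashSet<>(
            Arrays.asList("CREATE", "ALTER", "TRUNCATE", "RENAME", "CINDEX", "DINDEX"));

    static Kind kindOf(String op) {
        if (DML_OPS.contains(op)) {
            return Kind.DML;
        }
        if (DDL_OPS.contains(op)) {
            return Kind.DDL;
        }
        if ("MHEARTBEAT".equals(op)) {
            return Kind.HEARTBEAT;
        }
        // TRANSACTION_BEGIN, TRANSACTION_END, QUERY, and any value not listed above.
        return Kind.OTHER;
    }
}
```

With this routing, the sample messages map to DML for the insert, update, and delete messages, HEARTBEAT for the heartbeat message, and DDL for the ALTER message.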