Data Transmission Service (DTS) lets you select a storage format for data that you synchronize or migrate to a message queue, such as Kafka or RocketMQ. This topic describes the available data formats to help you parse the data.
Data formats
DTS lets you store data written to a message queue in the following formats:
DTS Avro: A data serialization format that converts data structures or objects into a format suitable for storage and transmission.
Shareplex Json: The data format used when the data replication software SharePlex reads data from a source database and writes the data to a message queue.
Canal Json: The data format used when Canal parses incremental database logs and transmits the incremental data to a message queue.
DTS Avro
You can parse the data based on the DTS Avro schema definition. For more information, see DTS Avro schema definition and DTS Avro deserialization example.
In the DTS Avro format, Data Definition Language (DDL) statements are of the string type.
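As a reference, the following is a minimal consumer sketch in Python, not a definitive implementation. It assumes the DTS Avro schema has been saved locally as dts_record.avsc (a hypothetical path; obtain the schema from the DTS Avro schema definition), that the topic name and broker address are placeholders, that each message value carries the Avro-encoded record body without schema-registry framing, and that the kafka-python and fastavro libraries are acceptable tool choices.
from io import BytesIO

from fastavro import schemaless_reader
from fastavro.schema import load_schema
from kafka import KafkaConsumer  # pip install kafka-python fastavro

# Hypothetical local copy of the DTS Avro schema definition.
schema = load_schema("dts_record.avsc")

consumer = KafkaConsumer(
    "your_dts_topic",                      # placeholder topic name
    bootstrap_servers="localhost:9092",    # placeholder broker address
    value_deserializer=lambda raw: schemaless_reader(BytesIO(raw), schema),
)

for message in consumer:
    record = message.value  # a dict whose fields follow the DTS Avro schema
    print(record)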
Shareplex Json
Parameters
Parameter | Description |
time | The time when the transaction is committed in the database. The format is yyyy-MM-ddTHH:mm:ssZ (UTC). |
userid | The ID of the user who commits the transaction. |
op | The data operation type. Valid values: INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, and UPDATE AFTER. |
scn | The System Change Number (SCN). It identifies the version of the transaction that the database commits at a specific point in time. Each committed transaction is assigned a unique SCN. |
rowid | A relatively unique address value used to locate a record in the database. |
trans | The transaction ID. |
seq | The sequence number of the operation within the transaction. The sequence starts from 1. |
size | The total number of operations in the transaction. |
table | The table name. |
idx | The index of the operation within the transaction, in the format of seq/size. For example, 1/11 indicates the first of 11 operations in the transaction. |
posttime | The time when the transaction is committed to the destination database. |
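The following minimal sketch, assuming the message body is a UTF-8 JSON string shaped like the examples in the next section, shows how the meta.op field can be used to route records. The sample message and the print-based handling are illustrative only.
import json

# Placeholder message body; real messages are read from the message queue.
raw_message = '{"meta": {"op": "ins", "table": "CL_BIZ1.MIO_LOG", "seq": 1, "size": 11}, "data": {"MIO_LOG_ID": "32539737"}}'

record = json.loads(raw_message)
meta = record["meta"]

if meta["op"] == "ins":
    print("insert into", meta["table"], record["data"])
elif meta["op"] == "upd":
    # For updates, key identifies the row before the change and data carries the changed columns.
    print("update", meta["table"], record["key"], "->", record["data"])
elif meta["op"] == "del":
    print("delete from", meta["table"], record["data"])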
Examples
Insert data
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}
Update data
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}
Delete data
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}
Canal Json
Parameters
Parameter | Description |
database | The database name. |
es | The time when the operation is run in the source database. This is a 13-digit UNIX timestamp in milliseconds. Note: You can use a search engine to find a UNIX timestamp converter. |
id | The serial number of the operation. Note: This value is generated from the timestamp and an internal DTS offset. It can help determine the order of operations. |
isDdl | Specifies whether the operation is a DDL operation. Valid values: true and false. |
mysqlType | The data type of the field. Note: Parameter information such as precision is not supported. |
old | The data before or after the change. Note: For synchronization or migration tasks created before March 20, 2022, old stores the data after the change and data stores the data before the change. |
pkNames | The primary key name. |
sql | The SQL statement. |
sqlType | The converted field type. The value is the same as the value of dataTypeNumber. For more information, see Mappings between field types and dataTypeNumber values. |
table | The table name. |
ts | The time when the operation starts to be written to the destination database. This is a 13-digit UNIX timestamp in milliseconds. Note: You can use a search engine to find a UNIX timestamp converter. |
type | The operation type, such as DELETE, UPDATE, or INSERT. Note: During the full task phase, the type is fixed as INIT. |
gtid | The Global Transaction Identifier (GTID). A GTID is globally unique. Each transaction corresponds to one GTID. |
Examples
Update data
For synchronization or migration tasks created before March 20, 2022, when a DELETE statement on the source table is synchronized or migrated to Kafka, old contains the deleted row and data is null. To align with the open source community, this behavior was changed for tasks created or restarted on or after March 20, 2022. For these tasks, data contains the deleted row and old is null.
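A minimal sketch of handling both behaviors when consuming DELETE records, assuming the message body is a UTF-8 JSON string shaped like the examples below: it reads the deleted row from data when present and falls back to old for older tasks. The sample message is illustrative only.
import json

# Placeholder message body in the pre-March 20, 2022 shape (data is null, old carries the row).
raw_message = '{"data": null, "old": [{"id": "500000287"}], "type": "DELETE", "table": "tablename", "pkNames": ["id"]}'

record = json.loads(raw_message)

if record["type"] == "DELETE":
    # Tasks created or restarted on or after March 20, 2022 put the deleted row in data;
    # older tasks put it in old and leave data null.
    rows = record["data"] if record["data"] else record["old"]
    for row in rows:
        # Build the primary-key filter from pkNames.
        pk_filter = {name: row[name] for name in record["pkNames"]}
        print("delete from", record["table"], "where", pk_filter)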
Synchronization or migration tasks created before March 20, 2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Synchronization or migration tasks created or restarted on or after March 20, 2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL operation
{
"database":"dbname", // The name of the database for synchronization or migration.
"es":1600161894000, // The time when data from the source database is written to the binary log.
"id":58, // The offset in the DTS cache.
"isDdl":true, // Specifies whether to synchronize or migrate DDL operations.
"sql":"eg:createxxx", // The DDL statement from the binary log.
"table":"tablename", // The name of the table for synchronization or migration.
"ts":1600161894771, // The time when DTS writes data to the destination.
"type":"DDL"
}
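As a final reference, the following minimal sketch, assuming message bodies shaped like the Canal Json examples above, separates DDL records from DML records by checking the isDdl field. The sample messages and print-based handling are illustrative only.
import json

# Placeholder message bodies; real messages are read from the message queue.
raw_messages = [
    '{"database": "dbname", "isDdl": true, "sql": "eg:createxxx", "type": "DDL"}',
    '{"database": "dbname", "isDdl": false, "sql": "", "type": "INSERT", "table": "tablename", "data": [{"id": "1"}]}',
]

for raw in raw_messages:
    record = json.loads(raw)
    if record["isDdl"]:
        # Replay or log the DDL statement carried in the sql field.
        print("DDL on", record["database"], ":", record["sql"])
    else:
        # DML records carry the table name and the changed rows.
        print(record["type"], "on", record.get("table"), record.get("data"))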