When you use Data Transmission Service (DTS) to migrate or synchronize data to a Kafka cluster, you can specify the format in which data is stored in the Kafka cluster. This topic describes the formats that you can use to store data in a Kafka cluster. You can parse data based on the definition of these data formats.

Data formats

DTS allows you to store data into a Kafka cluster in the following formats:

  • DTS Avro: a data serialization format into which data structures or objects can be converted to facilitate storage and transmission.
  • Shareplex Json: the format in which the data read from the source database by using the data replication software SharePlex is stored.
  • Canal Json: the format in which data is stored in a Kafka cluster after Canal parses the logs about the incremental data of the source database and transmits the incremental data to the Kafka cluster.

DTS Avro

DTS Avro is the default data format. By default, data that is migrated or synchronized to a Kafka cluster by using DTS is stored in the DTS Avro format. You must parse the data based on the schema definition of DTS Avro. For more information, visit GitHub.
Note In the DTS Avro format, the DDL statements are of the STRING type.

Shareplex Json

Table 1. The following table describes the parameters related to the SharePlex JSON format.
ParameterDescription
timeThe UTC time when the transaction in the database is committed. It is in the yyyy-MM-ddTHH:mm:ssZ format.
useridThe ID of the user who commits the transaction.
opThe operation type. Valid values: INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, and UPDATE AFTER.
scnThe system change number (SCN) that identifies the version of the transaction that the database commits at a specific point in time. Each committed transaction is assigned a unique SCN.
rowidA relatively unique address value that is used to identify a record in the database.
transThe ID of the transaction.
seqThe sequence number of the operation in the transaction. It starts from 1.
sizeThe total number of operations in the transaction.
tableThe name of the table.
idxThe index of the operation in the transaction, in the seq/size format. For example, 1/11 indicates that the sequence number of the operation is 1 in the transaction that contains a total number of 11 operations.
posttimeThe time when the transaction is committed to the destination database.

Examples:

  • Data inserted
    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • Data updated
    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ...
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • Data deleted
    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Canal Json

Table 2. The following table describes the parameters related to the Canal JSON format.
ParameterDescription
databaseThe name of the database.
esThe time when the operation is performed on the database. The value is a 13-bit UNIX timestamp. Unit: milliseconds.
Note You can use a search engine to obtain a UNIX timestamp converter.
idThe serial number of the operation.
isDdlIndicates whether the operation is a DDL operation. Valid values:
  • true: The operation is a DDL operation.
  • false: The operation is not a DDL operation.
mysqlTypeThe data type of the field.
old and dataThe data before and after update.
Note For change tracking instances that are created before March 20, 2022, the value of old is the data after update and the value of data is the data before update. To keep consistent with the open source community, the value of data is the data after update and the value of old is the data before update for change tracking instances that are created or restarted as of March 20, 2022.
pkNamesThe name of the primary key.
sqlThe SQL statement.
sqlTypeThe data type of the field after conversion. For example, the type LONG is converted from UNSIGNED INTEGER and BIGDECIMAL from UNSIGNED LONG.
tableThe name of the table.
tsThe time when the operation is written to the destination database. The value is a 13-bit UNIX timestamp. Unit: milliseconds.
Note You can use a search engine to obtain a UNIX timestamp converter.
typeThe operation type. Valid values: DELETE, UPDATE, and INSERT.

Examples of data updated

Note For change tracking instances that are created before March 20, 2022, after the DELETE statements of the source table are synchronized, the value of old is data and the value of data is NULL. To keep consistent with the open source community, the value of data is data and the value of old is NULL for change tracking instances that are created or restarted as of March 20, 2022.
Change tracking instances that are created before March 20, 2022
{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
Change tracking instances that are created or restarted as of March 20, 2022
{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

Example of a DDL operation

{  
    "database": "dbname",    The name of the source database.
    "es": 1600161894000,    The time when the data is written to the binary logs of the source database.
    "id": 58,      The offsets of DTS cache.
    "isDdl": true,       Indicates that the operation is a DDL operation.
    "sql": "eg: create xxx",      The DDL statements recorded in the binary logs.
    "table": "tablename",       The name of the table that is synchronized.
    "ts": 1600161894771,      The time when the data is written to the destination database by using DTS.
    "type": "DDL"                 
}