DataWorks:Real-time synchronization from a Hologres table to Kafka

Last Updated:Oct 28, 2025

Data Integration supports real-time synchronization of data from a single table in data sources such as DataHub and Hologres to Kafka. A real-time ETL synchronization task initializes a topic in Kafka based on the schema of the source Hologres table and synchronizes data from the Hologres table to Kafka in real time for consumption. This topic describes how to configure real-time synchronization from a single Hologres table to Kafka.

Limits

  • The version of the Kafka data source must range from 0.10.2 to 3.6.0.

  • The version of the Hologres data source must be V2.1 or later.

  • Incremental synchronization of data from a Hologres partitioned table is not supported.

  • Messages for DDL changes on a Hologres table cannot be synchronized.

  • Incremental data of the following data types can be synchronized from Hologres: INTEGER, BIGINT, TEXT, CHAR(n), VARCHAR(n), REAL, JSON, SERIAL, OID, INT4[], INT8[], FLOAT8[], BOOLEAN[], TEXT[], and JSONB.

  • You must enable binary logging for the Hologres table in the source Hologres database. For more information, see Subscribe to Hologres binary logs.

Prerequisites

Procedure

1. Select a synchronization task type

  1. Go to the Data Integration page.

    Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Integration > Data Integration. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.

  2. In the left-side navigation pane, click Synchronization Task. Then, click Create Synchronization Task at the top of the page to go to the page for creating a synchronization task. Configure the following basic information:

    • Data Source And Destination: Hologres to Kafka.

    • New Task Name: Customize a name for the synchronization task.

    • Synchronization Type: Single Table Real-time.

    • Synchronization Step: Select Full Synchronization.

2. Configure network and resources

  1. In the Network And Resource Configuration section, select a Resource Group for the synchronization task. You can allocate Task Resource Usage in CUs for the task.

  2. For Source Data Source, select the added Hologres data source. For Destination Data Source, select the added Kafka data source. Then, click Test Connectivity.

  3. After you make sure that both the source and destination data sources are connected, click Next.

3. Configure the synchronization link

a. Configure the Hologres source

At the top of the page, click the Hologres data source and edit Holo Source Information.


  1. In the Holo Source Information section, select the schema that contains the Hologres table from which you want to read data and the source table.

  2. Click Data Sampling in the upper-right corner.

    In the Data Output Preview dialog box, specify Number Of Samples and click Start Collection. You can sample data from the specified Hologres table to preview the data in the Hologres table. This provides input for data preview and visual configuration in subsequent data processing nodes.

b. Configure the Kafka destination

At the top of the page, click the Kafka destination and edit Kafka Destination Information.


  1. In the Kafka Destination Information section, select the Kafka topic to which you want to write data.

  2. Set Merge Source Binlog Update Messages as needed. If you enable this option, the two update messages that correspond to an update operation in the source binary logs are merged into one message before they are written to Kafka.

  3. Set Output Format, Key Column, and Kafka Producer Parameters.

    • Output Format: Confirm the format of the value content in records that are written to Kafka. Valid values: Canal CDC and JSON. For more information, see Appendix: Description of output formats.

    • Key Column: Select source columns. The values of the selected columns are serialized into strings and concatenated with commas to form the key of records that are written to the Kafka topic.

      Note
      • The serialization rules for column values are the same as the JSON serialization rules for column data types in Hologres.

      • The key values in the Kafka topic determine the partitions to which data is written. Data with the same key value is written to the same partition. To ensure that a consumer can consume data in the Kafka topic in sequence, we recommend that you use the primary key columns of the Hologres table as the key columns.

      • If no source column is used as the key column, the key values in the Kafka topic are null. In this case, data is written to random partitions in the Kafka topic.

    • Kafka Producer Parameters: These parameters affect the consistency, stability, and exception handling behavior of write operations. In most cases, you can use the default configurations. If you have custom requirements, you can specify specific parameters. For information about the producer parameters that are supported by different versions of Kafka, see the Kafka documentation.
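The key-column behavior described above can be sketched in Python. This is an illustrative sketch, not DataWorks code: `build_kafka_key` and `pick_partition` are hypothetical helpers, and the hash used here is a stand-in for Kafka's actual default partitioner (which uses murmur2); the point it shows is that identical keys always map to the same partition.

```python
import hashlib

def build_kafka_key(row, key_columns):
    """Serialize the selected source column values to strings and
    join them with commas, as described above."""
    return ",".join(str(row[c]) for c in key_columns)

def pick_partition(key, num_partitions):
    """Illustrative stand-in for Kafka's default partitioner:
    the same key always maps to the same partition, which preserves
    per-key ordering for consumers."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

row = {"id": 42, "name": "alice"}
key = build_kafka_key(row, ["id", "name"])  # "42,alice"
```

Using the primary key columns of the Hologres table as key columns, as recommended above, guarantees that all changes to one row land in one partition and are therefore consumed in order.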

4. Configure alert rules

To prevent a synchronization task failure from delaying business data synchronization, you can configure different alert rules for the synchronization task.

  1. In the upper-right corner of the page, click Configure Alert Rule to go to the Configure Alert Rule panel.

  2. In the Configure Alert Rule panel, click Add Alert Rule. In the Add Alert Rule dialog box, configure the parameters to create an alert rule.

    Note

    The alert rules that you configure in this step take effect for the real-time synchronization subtask that will be generated by the synchronization task. After the configuration of the synchronization task is complete, you can refer to Manage real-time synchronization tasks to go to the Real-time Synchronization Task page and modify alert rules configured for the real-time synchronization subtask.

  3. Manage alert rules.

    You can enable or disable alert rules that are created. You can also specify different alert recipients based on the severity levels of alerts.

5. Configure advanced parameters

DataWorks allows you to modify the configurations of specific parameters. You can change the values of these parameters based on your business requirements.

Note

To prevent unexpected errors or data quality issues, we recommend that you understand the meanings of the parameters before you change the values of the parameters.

  1. In the upper-right corner of the configuration page, click Configure Advanced Parameters.

  2. In the Configure Advanced Parameters panel, change the values of the desired parameters.

6. Configure resource groups

You can click Configure Resource Group in the upper-right corner of the page to view and change the resource groups that are used to run the current synchronization task.

7. Execute the synchronization task

  1. After the configuration of the synchronization task is complete, click Complete in the lower part of the page.

  2. In the Tasks section of the Synchronization Task page, find the created synchronization task and click Start in the Operation column.

  3. Click the name or ID of the synchronization task in the Tasks section and view the detailed running process of the synchronization task.

Perform O&M operations on the synchronization task

View the status of the synchronization task

After the synchronization task is created, you can go to the Tasks page to view all synchronization tasks that are created in the workspace and the basic information of each task.


  • You can Start or Stop a synchronization task in the Operation column. You can also Edit or View a synchronization task from the More drop-down list.

  • For a started task, you can view the basic status of the task in Execution Overview. You can also click the corresponding overview area to view execution details.


A real-time synchronization task from a Hologres table to Kafka consists of the following three steps:

  • Structure Migration: shows how the destination table is created (an existing table or automatic table creation). If you select automatic table creation, the DDL statement that is used to create the table is displayed.

  • Full Initialization: If you select Full Synchronization for Synchronization Step of your task, the progress of full initialization is displayed here.

  • Real-time Data Synchronization: includes statistics information about real-time synchronization, such as real-time read and write traffic, dirty data, failover, and operation logs.

Rerun the synchronization task

If you need to modify the fields to synchronize, the fields in the destination table, or table name information, you can click Rerun in the Operation column of the desired synchronization task. The system then synchronizes your changes to the destination. Tables that are already synchronized and are not modified are not synchronized again. You can rerun the task in either of the following ways:

  • Directly click Rerun without modifying the configurations of the synchronization task to enable the system to rerun the synchronization task.

  • Modify the configurations of the synchronization task and then click Complete. Click Apply Updates that is displayed in the Operation column of the synchronization task to rerun the synchronization task for the latest configurations to take effect.

Appendix: Description of output formats

Canal CDC

Canal CDC is a CDC data format defined by Alibaba Canal.

  • Fields and meanings

    • id: The value of this field is fixed as 0.

    • database: The name of the Hologres database.

    • table: The name of the Hologres table.

    • pkNames: The primary key columns of the Hologres table.

    • isDdl: Specifies whether the binary logs record DDL changes. The value of this field is fixed as false because synchronization of messages for DDL changes on the Hologres table is not supported.

    • type: The type of the DML change. Valid values: INSERT, UPDATE, and DELETE.

      Note

      One UPDATE operation on the Hologres table generates two records whose type is UPDATE. Both records are written to the Kafka topic.

      • One of the records corresponds to the data content before the change.

      • The other corresponds to the data content after the change.

      If you synchronize full data from the Hologres table to the Kafka topic, the value of the type field is fixed as INSERT.

    • es: A 13-digit timestamp in milliseconds that indicates the time when the data in the Hologres table is changed. If you synchronize full data from the Hologres table to the Kafka topic, the value of the es field is fixed as 0.

    • ts: A 13-digit timestamp in milliseconds that indicates the time when the binary logs generated for the Hologres table are read by the synchronization task.

    • sql: The SQL statement for a DDL change when the binary logs contain DDL changes. The value of this field is fixed as an empty string because synchronization of messages for DDL changes on the Hologres table is not supported.

    • sqlType: The SQL field data types that correspond to the data types of the Hologres table fields. Mappings between Hologres data types and sqlType values:

    • bigint: -5

    • decimal whose scale is not 0: 3

    • decimal whose scale is 0: -5

    • boolean: 16

    • date: 91

    • float4: 6

    • float8: 8

    • integer: 4

    • smallint: 5

    • json: 12

    • text: 12

    • varchar: 12

    • timestamp: 93

    • timestamptz: 93

    • bigserial: -5

    • bytea: 12

    • char: 12

    • serial: 4

    • time: 92

    • int4[]: 12

    • int8[]: 12

    • float4[]: 12

    • float8[]: 12

    • boolean[]: 12

    • text[]: 12
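As an aside, the sqlType codes listed above coincide with the JDBC java.sql.Types constants; this is an observation about the values, not something the product documentation states. A consumer can keep the mapping in a small lookup table, for example:

```python
# Observation: the sqlType codes above match the JDBC java.sql.Types constants.
JAVA_SQL_TYPES = {
    "BIGINT": -5, "DECIMAL": 3, "BOOLEAN": 16, "DATE": 91, "FLOAT": 6,
    "DOUBLE": 8, "INTEGER": 4, "SMALLINT": 5, "VARCHAR": 12,
    "TIMESTAMP": 93, "TIME": 92,
}

def is_string_typed(sql_type_code):
    """Per the mapping above, sqlType 12 (VARCHAR) covers text, varchar,
    json, bytea, char, and all array types in this format."""
    return sql_type_code == JAVA_SQL_TYPES["VARCHAR"]
```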

    • mysqlType: The MySQL field data types that correspond to the data types of the Hologres table fields. Mappings between Hologres data types and mysqlType values:

    • bigint: BIGINT

    • int4: INT

    • decimal whose scale is not 0: DECIMAL(xx,xx)

    • decimal whose scale is 0: BIGINT

    • boolean: BOOLEAN

    • date: DATE

    • float4: FLOAT

    • float8: DOUBLE

    • integer: INT

    • smallint: SMALLINT

    • json: TEXT

    • text: TEXT

    • varchar: VARCHAR(xx)

    • timestamp: DATETIME(6)

    • timestamptz: DATETIME(6)

    • bigserial: BIGINT

    • bytea: TEXT

    • char: TEXT

    • serial: INT

    • time: TIME(6)

    • int4[]: TEXT

    • int8[]: TEXT

    • float4[]: TEXT

    • float8[]: TEXT

    • boolean[]: TEXT

    • text[]: TEXT

    • data: The changed data in the Hologres table. The field names of the Hologres table are used as keys, the changed field values are serialized into strings and used as values, and the key-value pairs are organized into a JSON string. For more information about serialization, see Description of JSON serialization.

    • old: For one UPDATE operation on the Hologres table, two records whose type is UPDATE are written to the Kafka topic. In the record that corresponds to the data content before the change, the old field carries the pre-change data and the data field is null. In all other records, including those for DML changes other than UPDATE, the changed data is carried in the data field and old is null.

  • Example of Canal JSON-formatted data that corresponds to data changes generated by an INSERT operation in binary logs of the Hologres table

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Example of Canal JSON-formatted data that corresponds to full data that is synchronized from the Hologres table

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "INSERT",
        "es": 0,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Example of two Canal JSON-formatted data records that correspond to data changes generated by an UPDATE operation in binary logs of the Hologres table

    // Data content before a change
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "old": [
            {
                "bigint": "0",
                "integer": "0",
                "smallint": "0"
            }
        ],
        "data": null
    }
    // Data content after a change
    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "UPDATE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
  • Example of Canal JSON-formatted data that corresponds to data changes generated by a DELETE operation in binary logs of the Hologres table

    {
        "id": 0,
        "database": "test",
        "table": "tp_int",
        "pkNames": [
            "id"
        ],
        "isDdl": false,
        "type": "DELETE",
        "es": 1640007049196,
        "ts": 1639633142960,
        "sql": "",
        "sqlType": {
            "bigint": -5,
            "integer": 4,
            "smallint": 5
        },
        "mysqlType": {
            "bigint": "BIGINT",
            "integer": "INT",
            "smallint": "SMALLINT"
        },
        "data": [
            {
                "bigint": "9223372036854775807",
                "integer": "2147483647",
                "smallint": "32767"
            }
        ],
        "old": null
    }
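As the examples above show, a consumer of these Canal CDC messages must read the row from old for the pre-change UPDATE record and from data in every other case. A minimal, hypothetical Python sketch (extract_change is an illustrative helper, not part of any DataWorks SDK):

```python
import json

def extract_change(message_value):
    """Return (dml_type, row, is_before_image) from one Canal CDC record.

    The pre-change UPDATE record carries its row in `old` (with `data`
    set to null); every other record carries its row in `data`.
    """
    rec = json.loads(message_value)
    is_before = rec["type"] == "UPDATE" and rec["data"] is None
    rows = rec["old"] if is_before else rec["data"]
    return rec["type"], rows[0], is_before
```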

JSON

In the JSON format, the field names in the Hologres binary logs are used as keys, and the data content of the fields is serialized into strings and used as values. The key-value pairs are organized into a JSON string and written to the Kafka topic.
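Assuming the serialization rules described in the next section, the shape of a JSON-format message can be sketched as follows; to_json_format and serialize_value are hypothetical helpers for illustration only.

```python
import json

def serialize_value(v):
    # Booleans are rendered as "true"/"false"; other values are
    # rendered as plain strings (see the serialization table).
    if isinstance(v, bool):
        return "true" if v else "false"
    return str(v)

def to_json_format(row):
    """Field names become keys; each value becomes a serialized string."""
    return json.dumps({k: serialize_value(v) for k, v in row.items()})
```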

Description of JSON serialization

Serialization of Hologres data types

The following list describes how each Hologres data type is serialized when data is written to Kafka:

  • bit, inet, interval, money, oid, timetz, uuid, varbit: Not supported. The system reports an error when the synchronization task starts.

  • jsonb: Not supported. The system reports an error indicating that the binary logs fail to be parsed after data is written to Kafka.

  • bigint: A numeric string. Example: 2.

  • decimal(38,18): A numeric string whose number of decimal places equals the scale. Example: 1.234560000000000000.

  • decimal(38,0): A numeric string whose number of decimal places equals the scale. Example: 2.

  • boolean: "true" or "false".

  • date: A date string in the yyyy-MM-dd format. Example: 2024-02-02.

  • float4/float8/double: A numeric string. The system does not pad the serialization result with trailing zeros. This ensures that the serialization result is consistent with the data that you query from the Hologres table. Example: 1.24.

  • integer/smallint: A numeric string. Example: 2.

  • json: A JSON string. Example: {\"a\":2}.

  • text/varchar: A string encoded in UTF-8. Example: text.

  • timestamp: A time string that is accurate to the microsecond.

    • If both the millisecond and microsecond parts are 0, the two parts are omitted when data is written to Kafka. For example, 2020-01-01 09:01:01.000000 becomes 2020-01-01 09:01:01.

    • If only the microsecond part is 0, the trailing zeros after the millisecond part are omitted. For example, 2020-01-01 09:01:01.123000 becomes 2020-01-01 09:01:01.123.

    • If the microsecond part is not 0, the system appends three zeros after the microsecond part. For example, 2020-01-01 09:01:01.123457 becomes 2020-01-01 09:01:01.123457000.
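The timestamp rules above can be expressed as one small function. This is a sketch that assumes the input is a yyyy-MM-dd HH:mm:ss.ffffff string with a six-digit fractional part; serialize_timestamp is a hypothetical helper, not a DataWorks API.

```python
def serialize_timestamp(ts):
    """Apply the documented trimming/padding rules to a
    'yyyy-MM-dd HH:mm:ss.ffffff' (microsecond-precision) string."""
    base, _, frac = ts.partition(".")
    if frac == "" or int(frac) == 0:
        return base                      # all-zero fraction is dropped
    if frac.endswith("000"):
        return f"{base}.{frac[:3]}"      # microseconds zero: keep milliseconds
    return f"{base}.{frac}000"           # pad to nanosecond precision
```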

  • timestamp with time zone: A time string that is accurate to the millisecond. Example: 2020-01-01 09:01:01.123. If the millisecond part is 0, it is omitted when data is written to Kafka. For example, 2020-01-01 09:01:01.000 becomes 2020-01-01 09:01:01.

  • bigserial: A numeric string. Example: 2.

  • bytea: A string that is encoded in Base64. Example: ASDB==.

  • char: A fixed-length string. Example: char.

  • serial: A numeric string. Example: 2.

  • time: A time string that is accurate to the microsecond.

    • If both the millisecond and microsecond parts are 0, the two parts are omitted when data is written to Kafka. For example, 2020-01-01 09:01:01.000000 becomes 2020-01-01 09:01:01.

    • If the millisecond or microsecond part is not 0, the system pads the nanosecond part with zeros after the microsecond part. For example, 2020-01-01 09:01:01.123457 becomes 2020-01-01 09:01:01.123457000.

  • int4[]/int8[]: A string array. Example: ["1","2","3","4"].

  • float4[]/float8[]: A string array. Example: ["1.23","2.34"].

  • boolean[]: A string array. Example: ["true","false"].

  • text[]: A string array. Example: ["a","b"].
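The array serializations above all follow one pattern: each element is rendered as a string, with booleans lowercased. A minimal sketch (serialize_array is a hypothetical helper for illustration):

```python
def serialize_array(values):
    """Render a Hologres array value as an array of strings,
    with booleans written as "true"/"false"."""
    return ["true" if v is True else "false" if v is False else str(v)
            for v in values]
```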

Note

If the values of time fields that are serialized fall out of the range [0001-01-01,9999-12-31], the serialization result is different from the query result in Hologres.

Description of metadata fields

Note
  • An UPDATE operation recorded in the binary logs of the Hologres table generates two JSON-formatted records, in the same way as the Canal CDC format. Both records are synchronized to the related Kafka topic: one corresponds to the data content before the change, and the other corresponds to the data content after the change.

  • For the JSON format, you can select Output Source Binlog Metadata. If you select this option, fields that describe the properties of change records in the Hologres binary logs are added to the JSON-formatted strings.


The following list describes the metadata fields and their values:

  • _sequence_id_: The unique identifier of a record in the binary logs of the Hologres table. For full synchronization, the value of this field is null.

  • _operation_type_: The type of the DML change. Valid values: "I", "U", and "D", which indicate INSERT, UPDATE, and DELETE operations. For full synchronization, the value of this field is "I".

  • _execute_time_: A 13-digit timestamp in milliseconds that indicates the time when the data in the Hologres table is changed. For full synchronization, the value of this field is 0.

  • _before_image_: Specifies whether the message data for incremental synchronization corresponds to the data content before a change. Valid values: Y and N, which indicate yes and no.

    • For full synchronization, the value of this field is N.

    • If the message type for the change is INSERT, the value of this field is N.

    • If the message type for the change is UPDATE, two records are written to Kafka. The value of this field is Y for one record and N for the other.

    • If the message type for the change is DELETE, the value of this field is Y.

  • _after_image_: Specifies whether the message data for incremental synchronization corresponds to the data content after a change. Valid values: Y and N, which indicate yes and no.

    • For full synchronization, the value of this field is Y.

    • If the message type for the change is INSERT, the value of this field is Y.

    • If the message type for the change is UPDATE, two records are written to Kafka. The value of this field is Y for one record and N for the other.

    • If the message type for the change is DELETE, the value of this field is N.
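Putting the metadata fields together, the two records emitted for one UPDATE operation (with Output Source Binlog Metadata enabled) might look as follows. This is a hypothetical sketch of the message shape only, not producer code; update_messages and its parameters are illustrative assumptions.

```python
import json

def update_messages(before_row, after_row, sequence_id, execute_time_ms):
    """Sketch the two JSON-format records produced for one UPDATE:
    the pre-change record carries _before_image_ = "Y", and the
    post-change record carries _after_image_ = "Y"."""
    def record(row, is_before):
        body = {k: str(v) for k, v in row.items()}  # values serialized to strings
        body["_sequence_id_"] = sequence_id
        body["_operation_type_"] = "U"
        body["_execute_time_"] = execute_time_ms
        body["_before_image_"] = "Y" if is_before else "N"
        body["_after_image_"] = "N" if is_before else "Y"
        return json.dumps(body)
    return [record(before_row, True), record(after_row, False)]
```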