All Products
Search
Document Center

Data Transmission Service:Use flink-dts-connector to consume tracked data

Last Updated:Mar 30, 2026

After you configure a change tracking task, use flink-dts-connector to consume the tracked data in a Flink program. Data Transmission Service (DTS) supports two Flink programming models: DataStream API and Table API & SQL.

Usage notes

  • Table API & SQL: Each change tracking task configuration consumes data from one table only. To consume data from multiple tables, configure a separate task for each table.

Prerequisites

Before you begin, ensure that you have:

Set up flink-dts-connector

In this example, IntelliJ IDEA Community Edition 2020.1 Windows is used.

  1. Download the flink-dts-connector repository and decompress it.

  2. Open the project in IntelliJ IDEA:

    • On the welcome screen, click Open or Import.

      Open a project

    • Navigate to the decompressed directory, expand the folders to locate pom.xml.

      Find the pom.xml file

    • Click Open as Project.

  3. Add the following dependency to your pom.xml:

    <dependency>
      <groupId>com.alibaba.flink</groupId>
      <artifactId>flink-dts-connector</artifactId>
      <version>1.1.1-SNAPSHOT</version>
      <classifier>jar-with-dependencies</classifier>
    </dependency>

Run with DataStream API

Open flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java.

To run the connector:

  1. In the top menu bar, click the Run icon.

    Run icon

  2. Choose DtsExample > Edit.

    edit

  3. In the Program arguments field, enter the connection parameters and click Run. Replace the placeholder values with your actual values:

    --broker-url <endpoint>:<port> --topic <topic-name> --sid <consumer-group-id> --user <username> --password <password> --checkpoint <unix-timestamp>

    For example:

    --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043

    For parameter descriptions and how to find each value, see Parameters.

When the connector runs successfully, the Flink program tracks and displays data changes from the source database.

Data changes (DataStream API)
To query specific change records, go to the Task Manager page of the Flink program.

Run with Table API & SQL

A single DtsTableISelectTCaseTest.java file supports one change tracking task and one table. To consume data from multiple tables, configure a separate task for each table.

Open flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java.

To run the connector:

  1. Comment out the existing configuration lines by adding // at the beginning of each line.

    Add comments

  2. Specify the table to track using a SQL statement.

  3. Set the connection parameters for the change tracking instance. For parameter descriptions, see Parameters.

    Parameters for Table API & SQL

  4. In the top menu bar, click Run 'DtsTableISelectTCaseTest'.

When the connector runs successfully, the Flink program tracks and displays data changes from the source database.

Data changes (Table API & SQL)
To query specific change records, go to the Task Manager page of the Flink program.

Parameters

Parameter (DataStream API) Parameter (Table API & SQL) Description How to get the value
broker-url dts.server Endpoint and port number of the change tracking instance. For minimal network latency, deploy the Flink program on an Elastic Compute Service (ECS) instance on the classic network or in the same virtual private cloud (VPC) as the change tracking instance. In the DTS console, click the instance ID. On the Basic Information page, see Topic and Network.
topic topic Topic name of the change tracking instance. Same as above.
sid dts.sid Consumer group ID. In the DTS console, click the instance ID. In the left-side navigation pane, click Consume Data. See Consumer Group ID/Name and Account.
user dts.user Username of the consumer group.
Warning

If you use a different client (not flink-dts-connector), format the username as <Username>-<Consumer group ID>, for example, dtstest-dtsae******bpv. Otherwise, the connection fails.

Same as above.
password dts.password Password of the consumer group. Specified when you created the consumer group.
checkpoint dts.checkpoint Consumption checkpoint — the UNIX timestamp from which the connector starts consuming data. See Set the consumption checkpoint. The timestamp must fall within the data range of the change tracking instance. See Data Range on the Change Tracking Tasks page. Use an online converter to get a UNIX timestamp.
N/A dts-cdc.table.name The table to track, in the format <Database name>.<Table name>, for example, dtstestdata.order. One table per configuration. In the DTS console, click the instance ID. On the Basic Information or Task Management page, click View Objects.

Set the consumption checkpoint

The checkpoint / dts.checkpoint parameter controls where the connector starts reading data. Use it in two scenarios:

  • Resume after interruption: After consumption is interrupted, set the checkpoint to the last consumed offset to resume without data loss.

  • Start from a specific point: Set the checkpoint to any timestamp within the data range of the change tracking instance to consume data from that point forward.

The value must be a UNIX timestamp, and it must fall within the data range of the change tracking instance. View the data range in the Data Range column on the Change Tracking Tasks page.

Troubleshooting

Error message Cause Solution
Cluster changed from * to *, consumer require restart. The DStore module that DTS uses to read incremental data switched, causing the Flink program to lose its consumer offset. Do not restart the Flink program. Query the current consumer offset and set checkpoint (DataStream API) or dts.checkpoint (Table API & SQL) to that value to resume consumption.

What's next