After you configure a change tracking task, use flink-dts-connector to consume the tracked data in a Flink program. Data Transmission Service (DTS) supports two Flink programming models: DataStream API and Table API & SQL.
Usage notes
-
Table API & SQL: Each change tracking task configuration consumes data from one table only. To consume data from multiple tables, configure a separate task for each table.
Prerequisites
Before you begin, ensure that you have:
-
A change tracking task. See Overview of change tracking scenarios
-
One or more consumer groups. See Create consumer groups
Set up flink-dts-connector
In this example, IntelliJ IDEA Community Edition 2020.1 Windows is used.
-
Download the flink-dts-connector repository and decompress it.
-
Open the project in IntelliJ IDEA:
-
On the welcome screen, click Open or Import.

-
Click Open as Project.
-
-
Add the following dependency to your
pom.xml:<dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
Run with DataStream API
Open flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java.
To run the connector:
-
In the top menu bar, click the Run icon.

-
Choose DtsExample > Edit.

-
In the Program arguments field, enter the connection parameters and click Run. Replace the placeholder values with your actual values:
--broker-url <endpoint>:<port> --topic <topic-name> --sid <consumer-group-id> --user <username> --password <password> --checkpoint <unix-timestamp>For example:
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043For parameter descriptions and how to find each value, see Parameters.
When the connector runs successfully, the Flink program tracks and displays data changes from the source database.
To query specific change records, go to the Task Manager page of the Flink program.
Run with Table API & SQL
A single DtsTableISelectTCaseTest.java file supports one change tracking task and one table. To consume data from multiple tables, configure a separate task for each table.
Open flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java.
To run the connector:
-
Comment out the existing configuration lines by adding
//at the beginning of each line.
-
Specify the table to track using a SQL statement.
-
Set the connection parameters for the change tracking instance. For parameter descriptions, see Parameters.

-
In the top menu bar, click Run 'DtsTableISelectTCaseTest'.
When the connector runs successfully, the Flink program tracks and displays data changes from the source database.
To query specific change records, go to the Task Manager page of the Flink program.
Parameters
| Parameter (DataStream API) | Parameter (Table API & SQL) | Description | How to get the value |
|---|---|---|---|
broker-url |
dts.server |
Endpoint and port number of the change tracking instance. For minimal network latency, deploy the Flink program on an Elastic Compute Service (ECS) instance on the classic network or in the same virtual private cloud (VPC) as the change tracking instance. | In the DTS console, click the instance ID. On the Basic Information page, see Topic and Network. |
topic |
topic |
Topic name of the change tracking instance. | Same as above. |
sid |
dts.sid |
Consumer group ID. | In the DTS console, click the instance ID. In the left-side navigation pane, click Consume Data. See Consumer Group ID/Name and Account. |
user |
dts.user |
Username of the consumer group. Warning
If you use a different client (not flink-dts-connector), format the username as |
Same as above. |
password |
dts.password |
Password of the consumer group. | Specified when you created the consumer group. |
checkpoint |
dts.checkpoint |
Consumption checkpoint — the UNIX timestamp from which the connector starts consuming data. See Set the consumption checkpoint. | The timestamp must fall within the data range of the change tracking instance. See Data Range on the Change Tracking Tasks page. Use an online converter to get a UNIX timestamp. |
| N/A | dts-cdc.table.name |
The table to track, in the format <Database name>.<Table name>, for example, dtstestdata.order. One table per configuration. |
In the DTS console, click the instance ID. On the Basic Information or Task Management page, click View Objects. |
Set the consumption checkpoint
The checkpoint / dts.checkpoint parameter controls where the connector starts reading data. Use it in two scenarios:
-
Resume after interruption: After consumption is interrupted, set the checkpoint to the last consumed offset to resume without data loss.
-
Start from a specific point: Set the checkpoint to any timestamp within the data range of the change tracking instance to consume data from that point forward.
The value must be a UNIX timestamp, and it must fall within the data range of the change tracking instance. View the data range in the Data Range column on the Change Tracking Tasks page.
Troubleshooting
| Error message | Cause | Solution |
|---|---|---|
Cluster changed from * to *, consumer require restart. |
The DStore module that DTS uses to read incremental data switched, causing the Flink program to lose its consumer offset. | Do not restart the Flink program. Query the current consumer offset and set checkpoint (DataStream API) or dts.checkpoint (Table API & SQL) to that value to resume consumption. |
