After you configure a change tracking task, you can use the flink-dts-connector file to consume the tracked data. This topic describes the procedure and the required parameters.

Limits

  • Data Transmission Service (DTS) supports the following types of Flink programs: DataStream API and Table API & SQL.
  • If you use a Table API & SQL program, you can consume the data of only one table each time you configure a change tracking task. If you want to consume the data of multiple tables, you must configure a task for each table.

Procedure

IntelliJ IDEA (Community Edition 2020.1 Windows) is used in this example.

  1. Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance (new), Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Download the flink-dts-connector file and decompress it.
  4. Open IntelliJ IDEA. In the window that appears, click Open or Import.
  5. In the dialog box that appears, go to the directory where the flink-dts-connector file is decompressed, and expand the folders to find the pom.xml file.
  6. In the dialog box that appears, select Open as Project.
  7. Add the following dependency to the pom.xml file:
    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. On the IntelliJ IDEA page, expand the folders to find the Java files. Then, double-click a Java file based on the type of Flink connector that you use.
    • If you use a DataStream API connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java file and perform the following operations:
      1. In the top menu bar of IntelliJ IDEA, click the Run icon.
      2. In the dialog box that appears, choose DtsExample > Edit.
      3. In the Program arguments field, enter the parameters and their values, and then click Run to run flink-dts-connector. Example:
        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
        Note For more information about the parameters and query methods, see Parameters.
      4. The following figure shows that the Flink program can track data changes from the source database. [Figure: Data changes (DataStream API)]
        Note To query specific records of data changes, you can go to the Task Manager page of the Flink program.
    • If you use a Table API & SQL connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java file and perform the following operations:
      Note A single DtsTableISelectTCaseTest.java file can be used to configure only one change tracking task and consume the data of only one table. If you want to consume the data of multiple tables, you must configure a task for each table.
      1. Comment out lines by prefixing them with two forward slashes (//), as shown in the following figure. [Figure: Add comments]
      2. Specify the information of the table from which you want to track data changes. SQL statements are supported.
      3. Set the parameters required for the change tracking instance. For more information, see Parameters.
      4. In the top menu bar of IntelliJ IDEA, click Run 'DtsTableISelectTCaseTest' to run flink-dts-connector.
      5. The following figure shows that the Flink program can track data changes from the source database. [Figure: Data changes (Table API & SQL)]
        Note To query specific records of data changes, you can go to the Task Manager page of the Flink program.
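
The program arguments in the DataStream API steps above are a flat list of --name value pairs. The following is a minimal sketch, in plain Java, of how such a list could be parsed and checked before a job starts. The class DtsArgs and its methods are hypothetical and are not part of flink-dts-connector, and treating all six parameters as required is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of flink-dts-connector. */
final class DtsArgs {
    // Parameter names as listed for the DtsExample file in the Parameters section.
    // Assumption: all six are treated as required here.
    static final List<String> REQUIRED =
            Arrays.asList("broker-url", "topic", "sid", "user", "password", "checkpoint");

    /** Parses "--name value" pairs into an ordered map and checks that none is missing. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '--name value' at position " + i);
            }
            out.put(key.substring(2), args[i + 1]);
        }
        for (String name : REQUIRED) {
            if (!out.containsKey(name)) {
                throw new IllegalArgumentException("Missing parameter: --" + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(args);
        // Mask the password so that it does not end up in logs.
        params.forEach((k, v) ->
                System.out.println(k + " = " + ("password".equals(k) ? "****" : v)));
    }
}
```

Failing early on a missing name gives a clearer error than a connection failure later in the run.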

Parameters

Parameter in the DtsExample file | Parameter in the DtsTableISelectTCaseTest file | Description | Query method
broker-url | dts.server | The network address and port number of the change tracking instance. Note If you track data changes over an internal network, network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the Flink program is in the classic network or in the same VPC as the change tracking instance. | In the DTS console, click the instance ID. On the View Task Settings page, you can obtain the tracked topic, network address, and port number.
topic | topic | The topic of the change tracking instance. | Same as broker-url.
sid | dts.sid | The ID of the consumer group. | In the DTS console, click the instance ID, and then click Data Consume. You can obtain the consumer group ID and the corresponding username. Note When you create a consumer group, the password of the consumer group is automatically specified.
user | dts.user | The username of the consumer group. Warning If you are not using the flink-dts-connector file that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>, for example, dtstest-dtsae******bpv. Otherwise, the connection fails. | Same as sid.
password | dts.password | The password of the consumer group. | Same as sid.
checkpoint | dts.checkpoint | The consumer offset, which is the timestamp of the first data record that flink-dts-connector consumes. The value is a UNIX timestamp, for example, 1624440043. Note The consumer offset is useful in the following scenarios: if the consumption process is interrupted, you can specify the consumer offset on the change tracking client to resume data consumption and prevent data loss; when you start the change tracking client, you can specify the consumer offset to consume data on demand. | The consumer offset must be within the data range of the change tracking instance, as shown in the following figure, and must be converted to a UNIX timestamp. [Figure: Data range] Note You can use a search engine to find a UNIX timestamp converter.
None | dts.cdc.table.name | The objects for change tracking. You can specify only a single table in the format of <Database name>.<Table name>, for example, dtstestdata.order. | In the DTS console, click the instance ID. On the View Task Settings page, click View Objects to view the database name and table name.
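
The checkpoint (dts.checkpoint) parameter takes a UNIX timestamp that must fall within the data range of the change tracking instance. As an alternative to an online converter, the conversion and the range check could be sketched with the standard java.time API as follows. The class CheckpointTimes is hypothetical, the range bounds in main are placeholder values rather than values read from DTS, and treating the range as inclusive is an assumption. Pass the time zone in which the data range is displayed to you.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper for illustration; not part of flink-dts-connector. */
final class CheckpointTimes {
    /** Converts a wall-clock time in the given zone to a UNIX timestamp in seconds. */
    static long toUnixSeconds(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toEpochSecond();
    }

    /** Returns true if the checkpoint lies within [rangeStart, rangeEnd] (assumed inclusive). */
    static boolean inDataRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }

    public static void main(String[] args) {
        long checkpoint = toUnixSeconds(
                LocalDateTime.of(2021, 6, 23, 9, 20, 43), ZoneId.of("UTC"));
        System.out.println(checkpoint); // prints 1624440043

        // Placeholder bounds; in practice, copy the data range shown in the DTS console.
        long rangeStart = checkpoint - 3600;
        long rangeEnd = checkpoint + 3600;
        System.out.println(inDataRange(checkpoint, rangeStart, rangeEnd)); // prints true
    }
}
```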

FAQ

Error message | Possible cause | Solution
Cluster changed from *** to ***, consumer require restart. | The DStore module that DTS uses to read incremental data is switched. As a result, the consumer offset of the Flink program is lost. | You do not need to restart the Flink program. Query the consumer offset of the Flink program, and then set the checkpoint parameter in the DtsExample.java file or the dts.checkpoint parameter in the DtsTableISelectTCaseTest.java file again to resume data consumption.
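
For the DataStream API case, setting the checkpoint again amounts to replacing the value that follows --checkpoint in the program arguments. A minimal sketch of that step is shown below. The class CheckpointArgs is hypothetical and not part of flink-dts-connector, and the new offset is supplied by the caller because how you query it is outside the scope of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of flink-dts-connector. */
final class CheckpointArgs {
    /**
     * Returns a copy of the arguments with the --checkpoint value replaced.
     * If --checkpoint is absent, the flag and value are appended.
     */
    static String[] withCheckpoint(String[] args, long checkpointSeconds) {
        List<String> out = new ArrayList<>(Arrays.asList(args)); // copy; input is untouched
        String value = Long.toString(checkpointSeconds);
        int i = out.indexOf("--checkpoint");
        if (i >= 0 && i + 1 < out.size()) {
            out.set(i + 1, value);   // replace the existing value
        } else if (i >= 0) {
            out.add(value);          // flag was last and had no value
        } else {
            out.add("--checkpoint");
            out.add(value);
        }
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] original = {"--topic", "example_topic", "--checkpoint", "1624440043"};
        String[] updated = withCheckpoint(original, 1624443643L);
        System.out.println(String.join(" ", updated));
    }
}
```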