Data Transmission Service: Use flink-dts-connector to consume tracked data

Last Updated: Oct 27, 2023

After you configure a change tracking task, you can use the flink-dts-connector file to consume tracked data. This topic describes the procedure, the parameters that you must specify, and the solution to a common error.

Limits

  • Data Transmission Service (DTS) supports the following types of Flink programs: DataStream API and Table API & SQL.

  • If you use a Table API & SQL program, you can consume the data of only one table each time you configure a change tracking task. If you want to consume the data of multiple tables, you must configure a task for each table.

Procedure

In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.

  1. Create a change tracking task. For more information, see Track data changes from an ApsaraDB RDS for MySQL instance, Track data changes from a PolarDB for MySQL cluster, or Track data changes from a self-managed Oracle database.

  2. Create one or more consumer groups. For more information, see Create consumer groups.

  3. Download the flink-dts-connector file and decompress it.

  4. Open IntelliJ IDEA. In the window that appears, click Open or Import.

  5. In the dialog box that appears, go to the directory in which the flink-dts-connector file is decompressed, and expand the folders to find the pom.xml file.

  6. In the dialog box that appears, select Open as Project.

  7. Add the following dependency to the pom.xml file:

    <dependency>
          <groupId>com.alibaba.flink</groupId>
          <artifactId>flink-dts-connector</artifactId>
          <version>1.1.1-SNAPSHOT</version>
          <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click a Java file based on the type of Flink connector that you use.

    • If you use a DataStream API connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java file and perform the following operations:

      1. In the top menu bar of IntelliJ IDEA, click the Run icon.

      2. In the dialog box that appears, choose DtsExample > Edit.

      3. In the Program arguments field, enter the parameters and corresponding values, and then click Run to run flink-dts-connector.

        Note

        For more information about the parameters and methods to obtain the parameter values, see the Parameters section of this topic.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. The following figure shows that the Flink program can track data changes from the source database.

        Figure: Data changes (DataStream API)

        Note

        To query specific records of data changes, you can go to the Task Manager page of the Flink program.

    • If you use a Table API & SQL connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java file and perform the following operations:

      Note

      A single DtsTableISelectTCaseTest.java file can be used to configure only one change tracking task and consume the data of only one table. If you want to consume the data of multiple tables, you must configure a task for each table.

      1. Enter two forward slashes (//) to comment out code, as shown in the following figure.

        Figure: Add comments

      2. Specify the information about the table from which you want to track data changes. SQL statements are supported.

      3. Configure the parameters required for the change tracking instance. For more information, see the Parameters section of this topic.

      4. In the top menu bar of IntelliJ IDEA, click Run 'DtsTableISelectTCaseTest' to run flink-dts-connector.

      5. The following figure shows that the Flink program can track data changes from the source database.

        Figure: Data changes (Table API & SQL)

        Note

        To query specific records of data changes, you can go to the Task Manager page of the Flink program.
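
The program arguments entered for the DataStream API example are a flat list of --name value pairs. The following Java class is a minimal sketch of how such a list could be checked before a run. It is not taken from DtsExample.java, which may parse its arguments differently. The class name DtsArgsSketch and its methods are hypothetical. The parameter names come from the Parameters section of this topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of flink-dts-connector.
final class DtsArgsSketch {

    // Parameter names listed for the DtsExample file in the Parameters section.
    static final List<String> REQUIRED = Arrays.asList(
            "broker-url", "topic", "sid", "user", "password", "checkpoint");

    // Splits "--name value" pairs into a map that keeps the original order.
    static Map<String, String> parse(String[] args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --name but found: " + args[i]);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for " + args[i]);
            }
            // Store the name without the leading dashes and skip over its value.
            parsed.put(args[i].substring(2), args[++i]);
        }
        return parsed;
    }

    // Returns the required parameter names that are absent from the parsed map.
    static List<String> missing(Map<String, String> parsed) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!parsed.containsKey(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(args);
        System.out.println("Missing parameters: " + missing(parsed));
    }
}
```

Running the class with the same text that you plan to paste into the Program arguments field lists any parameter that was left out, which is easier to spot here than in a failed connection.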

Parameters

The following entries list each parameter by its name in the DtsExample file and its name in the DtsTableISelectTCaseTest file, followed by the description and the method to obtain the parameter value.

broker-url (DtsExample file), dts.server (DtsTableISelectTCaseTest file)

  • Description: The network address and port number of the change tracking instance.

    Note

    If you track data changes over an internal network, the network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the Flink program resides on the classic network or in the same virtual private cloud (VPC) as the change tracking instance.

  • Method to obtain the parameter value: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. On the View Task Settings page, you can view the tracked topic, network address, and port number.

topic (DtsExample file), topic (DtsTableISelectTCaseTest file)

  • Description: The name of the topic of the change tracking instance.

  • Method to obtain the parameter value: Same as for broker-url.

sid (DtsExample file), dts.sid (DtsTableISelectTCaseTest file)

  • Description: The ID of the consumer group.

  • Method to obtain the parameter value: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side navigation pane, click Consume Data. You can view the ID and username of the consumer group.

    Note

    The password of the username of the consumer group is specified when you create the consumer group.

user (DtsExample file), dts.user (DtsTableISelectTCaseTest file)

  • Description: The username of the consumer group.

    Warning

    If you are not using the flink-dts-connector file that is described in this topic, you must specify this parameter in the following format: <Username>-<Consumer group ID>. Otherwise, the connection fails. Example: dtstest-dtsae******bpv.

  • Method to obtain the parameter value: Same as for sid.

password (DtsExample file), dts.password (DtsTableISelectTCaseTest file)

  • Description: The password of the username of the consumer group.

  • Method to obtain the parameter value: Same as for sid.

checkpoint (DtsExample file), dts.checkpoint (DtsTableISelectTCaseTest file)

  • Description: The consumer offset, which is the timestamp of the first data record that flink-dts-connector consumes. The value is a UNIX timestamp. Example: 1624440043.

    Note

    The consumer offset can be used in the following scenarios:

      • If the consumption process is interrupted, you can specify the consumer offset to resume data consumption. This allows you to prevent data loss.

      • When you start the change tracking client, you can specify the consumer offset to consume data based on your business requirements.

  • Method to obtain the parameter value: The consumer offset must be within the data range of the change tracking instance, as shown in the following figure. The consumer offset must be converted to a UNIX timestamp.

    Figure: Timestamp range

    Note

    You can use a search engine to find a UNIX timestamp converter.

None (DtsExample file), dts-cdc.table.name (DtsTableISelectTCaseTest file)

  • Description: The objects for change tracking. You can specify only a single table. You must comply with the following requirements when you specify the table name:

      • If the database is of the MySQL, PolarDB for MySQL, PolarDB-X 1.0, or PolarDB-X 2.0 type, specify the table name in the format of <Database name>.<Table name>.

      • If the database is of another type, specify the table name in the format of <Schema name>.<Table name>.

  • Method to obtain the parameter value: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. On the View Task Settings page, click View Objects for Change tracking in the upper-right corner. In the dialog box that appears, you can view the database and table to which the objects for change tracking belong.
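
The value of the checkpoint or dts.checkpoint parameter can also be computed locally instead of with an online converter. The following Java class is a minimal sketch that assumes the value is expressed in seconds, as the 10-digit example 1624440043 suggests. The class name CheckpointSketch, its method names, and the sample range bounds are hypothetical. Read the actual data range from the DTS console.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper for illustration only; not part of flink-dts-connector.
final class CheckpointSketch {

    // Converts a date and time in the given time zone to a UNIX timestamp in seconds.
    static long toUnixSeconds(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    // Checks that the consumer offset lies within the data range (bounds inclusive).
    static boolean isWithinRange(long checkpoint, long rangeStart, long rangeEnd) {
        return checkpoint >= rangeStart && checkpoint <= rangeEnd;
    }

    public static void main(String[] args) {
        // 2021-06-23 09:20:43 UTC corresponds to the example value in this topic.
        long checkpoint = toUnixSeconds(
                LocalDateTime.of(2021, 6, 23, 9, 20, 43), ZoneOffset.UTC);
        System.out.println(checkpoint); // prints 1624440043

        // Made-up range bounds; replace them with the range of your instance.
        long rangeStart = 1624400000L;
        long rangeEnd = 1624500000L;
        System.out.println(isWithinRange(checkpoint, rangeStart, rangeEnd)); // prints true
    }
}
```

Passing the time zone explicitly avoids a silent offset error when the machine that computes the value and the source database use different time zones.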

FAQ

Error message: Cluster changed from *** to ***, consumer require restart.

  • Possible cause: The DStore module that DTS uses to read incremental data is switched. As a result, the consumer offset of the Flink program is lost.

  • Solution: You do not need to restart the Flink program. You only need to query the consumer offset of the Flink program and set the checkpoint parameter in the DtsExample.java file or the dts.checkpoint parameter in the DtsTableISelectTCaseTest.java file again to resume data consumption.
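
For the DataStream API example, setting the checkpoint again means changing one value in the program arguments. The following Java class is a sketch only: it returns a copy of the original arguments with the --checkpoint value replaced by the consumer offset that you queried. The class name ResumeArgsSketch and its method are hypothetical, the offset 1624443600 is a made-up value, and how the offset is queried is outside the scope of the sketch.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of flink-dts-connector.
final class ResumeArgsSketch {

    // Returns a copy of args in which the value after --checkpoint is the given offset.
    // If --checkpoint is absent, the name and value are appended.
    static String[] withCheckpoint(String[] args, long consumerOffset) {
        String value = Long.toString(consumerOffset);
        for (int i = 0; i < args.length; i++) {
            if ("--checkpoint".equals(args[i])) {
                if (i + 1 < args.length) {
                    String[] updated = Arrays.copyOf(args, args.length);
                    updated[i + 1] = value;
                    return updated;
                }
                // --checkpoint is the last argument and has no value yet.
                String[] completed = Arrays.copyOf(args, args.length + 1);
                completed[args.length] = value;
                return completed;
            }
        }
        String[] extended = Arrays.copyOf(args, args.length + 2);
        extended[args.length] = "--checkpoint";
        extended[args.length + 1] = value;
        return extended;
    }

    public static void main(String[] args) {
        String[] original = {"--topic", "example_topic", "--checkpoint", "1624440043"};
        String[] resumed = withCheckpoint(original, 1624443600L);
        // prints --topic example_topic --checkpoint 1624443600
        System.out.println(String.join(" ", resumed));
    }
}
```

The original array is left unchanged, so the previous offset stays available if the new one turns out to be outside the data range of the change tracking instance.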