Data Transmission Service: Use flink-dts-connector to consume tracked data

Last Updated: Oct 27, 2023

After you configure a change tracking task, you can use the flink-dts-connector file to consume the tracked data. This topic describes how to do so.

Usage notes

  • Data Transmission Service (DTS) supports the following types of Flink programs: DataStream API and Table API & SQL.

  • If you use a Table API & SQL program, each change tracking task can consume the data of only one table. To consume the data of multiple tables, you must configure a separate task for each table.

Procedure

In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.

  1. Create a change tracking task. For more information, see the relevant topics in Overview of change tracking scenarios.
  2. Create one or more consumer groups. For more information, see Create consumer groups.
  3. Download the flink-dts-connector file and decompress it.

  4. Open IntelliJ IDEA. In the window that appears, click Open or Import.

  5. In the dialog box that appears, go to the directory where the flink-dts-connector file is decompressed, and expand the folders to find the pom.xml file.

  6. In the dialog box that appears, select Open as Project.
  7. Add the following dependency to the pom.xml file:

    <dependency>
        <groupId>com.alibaba.flink</groupId>
        <artifactId>flink-dts-connector</artifactId>
        <version>1.1.1-SNAPSHOT</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
  8. In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click a Java file based on the type of Flink connector that you use.

    • If you use a DataStream API connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java file and perform the following operations:

      1. In the top menu bar of IntelliJ IDEA, click the Run icon.

      2. In the dialog box that appears, choose DtsExample > Edit.

      3. In the Program arguments field, enter the parameters and their values, and then click Run to run flink-dts-connector. For an illustration of how these arguments map to values, see the argument-parsing sketch after this procedure.

        Note

        For more information about the parameters and methods to obtain the parameter values, see the Parameters section of this topic.

        --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
      4. Verify that the Flink program can track data changes from the source database.

        Note

        To query specific records of data changes, you can go to the Task Manager page of the Flink program.

    • If you use a Table API & SQL connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java file and perform the following operations:

      Note

      A single DtsTableISelectTCaseTest.java file can be used to configure only one change tracking task and consume the data of only one table. If you want to consume the data of multiple tables, you must configure a task for each table.

      1. Enter two forward slashes (//) at the beginning of the lines that you want to comment out.

      2. Specify the information of the table from which you want to track data changes. SQL statements are supported.

      3. Set the parameters that are required for the change tracking instance. For more information, see the Parameters section of this topic. A hedged example of a table definition that uses these parameters is provided after that section.

      4. In the top menu bar of IntelliJ IDEA, click Run 'DtsTableISelectTCaseTest' to run flink-dts-connector.

      5. Verify that the Flink program can track data changes from the source database.

        Note

        To query specific records of data changes, you can go to the Task Manager page of the Flink program.
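
The program arguments in step 3 of the DataStream procedure supply the connection settings that DtsExample needs. The following minimal sketch uses Flink's ParameterTool utility to show how such flags map to values. It is only an illustration: the way DtsExample actually parses its arguments may differ, and the class name DtsArgsSketch is hypothetical.

    import org.apache.flink.api.java.utils.ParameterTool;

    import java.util.Properties;

    // Illustrative sketch only: parses the documented program arguments
    // (--broker-url, --topic, --sid, --user, --password, --checkpoint)
    // with Flink's ParameterTool. DtsExample may handle them differently.
    public class DtsArgsSketch {

        public static void main(String[] args) {
            ParameterTool params = ParameterTool.fromArgs(args);

            String brokerUrl = params.getRequired("broker-url"); // endpoint and port of the change tracking instance
            String topic     = params.getRequired("topic");      // topic of the change tracking instance
            String sid       = params.getRequired("sid");        // consumer group ID
            String user      = params.getRequired("user");       // username of the consumer group
            String password  = params.getRequired("password");   // password of the consumer group
            long checkpoint  = params.getLong("checkpoint");     // consumer offset as a UNIX timestamp

            // Collect the values so that they can be passed on to a consumer client.
            Properties props = new Properties();
            props.setProperty("broker-url", brokerUrl);
            props.setProperty("topic", topic);
            props.setProperty("sid", sid);
            props.setProperty("user", user);
            props.setProperty("password", password);
            props.setProperty("checkpoint", String.valueOf(checkpoint));

            System.out.println("Parsed " + props.size() + " DTS connection settings.");
        }
    }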

Parameters

The following parameters are used in the DtsExample file (DataStream API) and the DtsTableISelectTCaseTest file (Table API & SQL).

  • broker-url (DtsExample) / dts.server (DtsTableISelectTCaseTest)

    Description: The endpoint and port number of the change tracking instance.

    Note: If the Elastic Compute Service (ECS) instance on which you deploy the Flink program resides on the classic network or in the same virtual private cloud (VPC) as the change tracking instance, you can track data changes over the internal network, which keeps network latency minimal.

    How to obtain: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. On the Basic Information page, you can view the Topic and Network of the instance.

  • topic (DtsExample) / topic (DtsTableISelectTCaseTest)

    Description: The name of the topic of the change tracking instance.

    How to obtain: See broker-url. The topic is displayed on the Basic Information page of the instance.

  • sid (DtsExample) / dts.sid (DtsTableISelectTCaseTest)

    Description: The ID of the consumer group.

    How to obtain: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side navigation pane, click Consume Data. You can view the Consumer Group ID/Name of the instance and the Account of the consumer group.

    Note: The password of the consumer group account is specified when you create the consumer group.

  • user (DtsExample) / dts.user (DtsTableISelectTCaseTest)

    Description: The username of the consumer group.

    Warning: If you are not using the flink-dts-connector file that is described in this topic, you must specify this parameter in the format <Username>-<Consumer group ID>, for example, dtstest-dtsae******bpv. Otherwise, the connection fails.

    How to obtain: See sid. The account is displayed on the Consume Data page.

  • password (DtsExample) / dts.password (DtsTableISelectTCaseTest)

    Description: The password of the consumer group account.

    How to obtain: The password is specified when you create the consumer group.

  • checkpoint (DtsExample) / dts.checkpoint (DtsTableISelectTCaseTest)

    Description: The consumer offset, which is the timestamp at which flink-dts-connector consumes the first data record. The value is a UNIX timestamp, for example, 1624440043.

    Note: The consumer offset can be used in the following scenarios:

      • After the consumption process is interrupted, you can specify the consumer offset to resume data consumption and prevent data loss.

      • When you start the change tracking client, you can specify the consumer offset to consume data from the point in time that suits your business requirements.

    How to obtain: The consumer offset must be within the data range of the change tracking instance and must be converted to a UNIX timestamp.

    Note
      • You can view the data range of the change tracking instance in the Data Range column on the Change Tracking Tasks page.

      • You can use a search engine to find a UNIX timestamp converter, or see the timestamp-conversion sketch at the end of this topic.

  • N/A (DtsExample) / dts-cdc.table.name (DtsTableISelectTCaseTest)

    Description: The object for change tracking. You can specify only a single table, in the format <Database name>.<Table name>, for example, dtstestdata.order.

    How to obtain: In the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the upper part of the Basic Information or Task Management page, click View Objects to view the database and table to which the objects for change tracking belong.
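
For the Table API & SQL connector, the dts.server, topic, dts.sid, dts.user, dts.password, dts.checkpoint, and dts-cdc.table.name parameters described above are set in the DtsTableISelectTCaseTest.java file. The following sketch shows one way such a configuration could be expressed as a Flink SQL table definition. It is an illustration only: the connector identifier 'dts', the column names, and the table name orders_source are assumptions that are not documented in this topic, so check DtsTableISelectTCaseTest.java for the exact syntax. The option values reuse the masked examples from the Procedure section.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Illustrative sketch only. The option keys (dts.server, topic, dts.sid,
    // dts.user, dts.password, dts.checkpoint, dts-cdc.table.name) come from the
    // Parameters section of this topic; the connector identifier 'dts', the
    // schema, and the table name are assumptions made for this example.
    public class DtsTableSketch {

        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            tEnv.executeSql(
                    "CREATE TABLE orders_source (\n"
                  + "  order_id BIGINT,\n"                                        // assumed column
                  + "  order_status STRING\n"                                     // assumed column
                  + ") WITH (\n"
                  + "  'connector' = 'dts',\n"                                    // assumption: connector identifier
                  + "  'dts.server' = 'dts-cn-******.******.***:******',\n"
                  + "  'topic' = 'cn_hangzhou_rm_**********_dtstest_version2',\n"
                  + "  'dts.sid' = 'dts******',\n"
                  + "  'dts.user' = 'dtstest',\n"
                  + "  'dts.password' = 'Test123456',\n"
                  + "  'dts.checkpoint' = '1624440043',\n"
                  + "  'dts-cdc.table.name' = 'dtstestdata.order'\n"
                  + ")");

            // Consume the tracked changes of the single table with a simple query.
            tEnv.executeSql("SELECT * FROM orders_source").print();
        }
    }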

FAQ

  • Error message: Cluster changed from *** to ***, consumer require restart.

    Possible cause: The DStore module that DTS uses to read incremental data is switched. As a result, the consumer offset of the Flink program is lost.

    Solution: You do not need to restart the Flink program. Query the consumer offset of the Flink program and set the checkpoint or dts.checkpoint parameter in the DtsExample.java or DtsTableISelectTCaseTest.java file again to resume data consumption.
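
The checkpoint and dts.checkpoint parameters expect a UNIX timestamp in seconds. The following minimal sketch converts a wall-clock time to such a value. It assumes that the time is expressed in UTC+8 (Asia/Shanghai); adjust the zone ID to match the time zone of your instance.

    import java.time.LocalDateTime;
    import java.time.ZoneId;

    // Converts a wall-clock time to the UNIX timestamp (in seconds) that the
    // checkpoint / dts.checkpoint parameter expects. The Asia/Shanghai zone is
    // an assumption; use the time zone that applies to your instance.
    public class CheckpointTimestamp {

        public static void main(String[] args) {
            long checkpoint = LocalDateTime.of(2021, 6, 23, 17, 20, 43)
                    .atZone(ZoneId.of("Asia/Shanghai"))
                    .toEpochSecond();

            // Prints 1624440043, the example value used in this topic.
            System.out.println(checkpoint);
        }
    }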