This topic describes how to use the demo code of a Kafka client to consume tracked data. The change tracking feature of the new version allows you to consume tracked data by using a Kafka client of a version from 0.11 to 1.1.
- A change tracking task is created. For more information, see Track data changes from ApsaraDB RDS for MySQL (new).
- One or more consumer groups are created. For more information, see Create consumer groups.
- If you enable auto commit when you use the change tracking feature, some data may be committed before it is consumed, which causes data loss. We recommend that you commit data manually.
Note If data fails to be committed due to a fault, you can restart the client to continue consuming data from the last recorded consumer offset. However, duplicate data may be generated during this period. You must manually filter out the duplicate data.
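The two points above can be sketched in code: disable auto commit in the consumer configuration so that offsets are committed only after records are processed, and filter out redelivered records after a restart by comparing offsets. This is a minimal illustration, not part of the demo code; `enable.auto.commit` and `auto.offset.reset` are standard Kafka consumer settings, and the duplicate filter is a hypothetical helper.

```java
import java.util.Properties;

public class ManualCommitConfig {
    // Build consumer properties with auto commit disabled, so that offsets
    // are committed only after records are actually processed.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false"); // avoid committing data before it is consumed
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    // After a restart from the last recorded consumer offset, records up to
    // that offset may be redelivered; skip them to filter out duplicates.
    public static boolean isDuplicate(long recordOffset, long lastProcessedOffset) {
        return recordOffset <= lastProcessedOffset;
    }
}
```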
- Data is serialized and stored in the Avro format. For more information, see Record.avsc.
Note If the client that you use is not a Kafka client, you must parse the tracked data based on the Avro schema.
- For the offsetsForTimes interface, the search unit of DTS is seconds, whereas the search unit of native Kafka is milliseconds.
Download and run the demo code of the Kafka client
|Step||Related files and directories|
|1. Use the native Kafka consumer to obtain incremental data from change tracking tasks.||subscribe_example-master/javaimpl/src/main/java/recordgenerator/|
|2. Deserialize the image of the incremental data, and obtain the pre-image, post-image, and other attributes.||subscribe_example-master/javaimpl/src/main/java/boot/MysqlRecordPrinter.java|
|3. Convert the type of the dataTypeNumber field in the deserialized data into the specified type of the MySQL field. Note For more information, see Mappings between MySQL data types and dataTypeNumber values.|| |
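Step 3 can be sketched as a lookup from a dataTypeNumber value to a MySQL type name. The entries below are illustrative examples only; confirm each value against the "Mappings between MySQL data types and dataTypeNumber values" table before relying on it.

```java
import java.util.HashMap;
import java.util.Map;

public class DataTypeMapper {
    private static final Map<Integer, String> TYPES = new HashMap<>();
    static {
        // Example entries only; see "Mappings between MySQL data types
        // and dataTypeNumber values" for the full, authoritative table.
        TYPES.put(1, "TINYINT");
        TYPES.put(3, "INT");
        TYPES.put(8, "BIGINT");
        TYPES.put(12, "DATETIME");
    }

    // Return the MySQL type name for a dataTypeNumber, or "UNKNOWN"
    // if the value is not in the mapping.
    public static String toMysqlType(int dataTypeNumber) {
        return TYPES.getOrDefault(dataTypeNumber, "UNKNOWN");
    }
}
```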
This procedure uses IntelliJ IDEA (Community Edition 2018.1.4 Windows) as an example.
- Download the demo code of the Kafka client, and then extract the file.
- Open IntelliJ IDEA. In the window that appears, click Open.
- In the dialog box that appears, go to the directory in which the downloaded demo code resides, and find the pom.xml project file.
- In the dialog box that appears, select Open as Project.
- In IntelliJ IDEA, expand the folders to find the demo file of the Kafka client, NotifyDemo.java, and double-click the file.
- Set the parameters in the NotifyDemo.java file.
|Parameter||Description|
|USER_NAME||The username of the consumer group. Warning If you are not using the Kafka client, you must specify the username in the following format: <Consumer group account>-<Consumer group ID>, such as dtstest-dtsae******bpv. Otherwise, the connection fails. In the DTS console, click the target instance ID, and then click Data Consume to obtain the consumer group ID and the corresponding account. Note The password of the consumer group account is automatically specified when you create the consumer group.|
|PASSWORD_NAME||The password of the account.|
|SID_NAME||The ID of the consumer group.|
|GROUP_NAME||The name of the consumer group. Set this parameter to the consumer group ID.|
|KAFKA_TOPIC||The topic of the change tracking task. In the DTS console, click the ID of the target instance. On the Track Data Changes page, you can obtain the topic, network address, and port number.|
|KAFKA_BROKER_URL_NAME||The endpoint and port number of the change tracking task. Note If you track data changes over an internal network, the network latency is minimized. This applies if the ECS instance on which you deploy the Kafka client belongs to the same VPC or classic network as the change tracking instance.|
|INITIAL_CHECKPOINT_NAME||The consumer offset of the consumed data, in the Unix timestamp format. Note You must save the consumer offset. If the consumption process is interrupted, you can specify the consumer offset on the change tracking client to resume data consumption and prevent data loss. When you start the change tracking client, you can also specify the consumer offset to consume data on demand. When you use the Kafka client to track data changes for the first time, convert the required point in time into a Unix timestamp.|
|USE_CONFIG_CHECKPOINT_NAME||Default value: true. If this parameter is set to true, the client is forced to consume data from the specified consumer offset. This allows you to retain the data that is received but not yet processed.|
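For example, the USER_NAME format and the INITIAL_CHECKPOINT_NAME value can be built as follows. The account and group ID used in the test are placeholders, and `java.time` (here assuming UTC) is one way to convert a date-time into the Unix timestamp in seconds that DTS expects:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ParamHelper {
    // Build the username in the format
    // <Consumer group account>-<Consumer group ID>, which is required
    // when you are not using the Kafka client.
    public static String userName(String account, String sid) {
        return account + "-" + sid;
    }

    // Convert a date-time (assumed UTC here) into the Unix timestamp
    // in seconds expected by INITIAL_CHECKPOINT_NAME.
    public static long initialCheckpoint(LocalDateTime time) {
        return time.toEpochSecond(ZoneOffset.UTC);
    }
}
```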
- At the top of the IntelliJ IDEA window, choose the command to run the client. Note During the first run, the software downloads and installs the required dependencies.
The following figure shows that the Kafka client can track data changes in the source database.
You can also delete the // characters from the //log.info(ret); statement in line 25 of the NotifyDemo.java file. Then, run the client again to view the data change information.
Mappings between MySQL data types and dataTypeNumber values
|MySQL data type||Value of dataTypeNumber|