Set up a change tracking channel in Pull mode to stream incremental data changes from a Lindorm table to a channel. Once the channel is active, you can use the SDKs provided by Lindorm to subscribe to and consume the incremental data from the change tracking channel.
Prerequisites
Before you begin, ensure that you have:
Client IP addresses added to the whitelist of your Lindorm instance. See Configure whitelists.
The change tracking feature enabled on your instance. See Enable change tracking.
Create a subscription channel
Log on to the Lindorm Tunnel Service (LTS) web UI. In the left-side navigation pane, choose Change Data Capture > Pull.

Click Create Subscription and configure the following parameters.
Click Commit.
(Optional) To inspect the channel after creation, click Details in the Actions column. The details page shows channel configuration, data consumption progress, and storage usage.
Consume subscribed data
After the channel is created, configure your Kafka consumer to read from the topic. All events use ByteArrayDeserializer for both keys and values.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class TestConsume {
public static void main(String[] args) throws Exception {
// The topic name must match the one you specified when creating the channel.
String topic = "test-topic";
Properties props = new Properties();
// Replace with your Lindorm instance endpoint.
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
// Use ByteArrayDeserializer for both keys and values.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// The consumer group is created automatically on first consumption.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
// Poll for records. Adjust the timeout based on your expected data volume.
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println("key: " + Bytes.toString(record.key()));
System.out.println("value: " + Bytes.toString(record.value()));
}
// Commit the offset to mark records as processed, then close the consumer.
consumer.commitSync();
consumer.close();
}
}Replace the following placeholders before running the code:
| Placeholder | Description | Example |
|---|---|---|
test-topic | The topic name you specified when creating the channel | my-cdc-topic |
ld-xxx:9092 | The endpoint of your Lindorm instance | ld-bp1abc123:9092 |
group-id-0 | A consumer group ID. The group is created automatically. | my-consumer-group |
For the format of change tracking events, see Data format.
Configuration tips
Topic partition count: Set Topic Partition Num to match the number of concurrent consumers you plan to run. If you are unsure, use the default of 4.
Data retention: Set Data Expiration Time (Day) based on how long your consumers may be offline. The default of 7 days suits most use cases.
Consumer group: The consumer group you specify in
GROUP_ID_CONFIGis created automatically on first use. Use a unique group ID for each independent consumer application.
What's next
To verify your channel is receiving data, check the Details page and confirm that the consumption progress is advancing.
To learn about the structure of change events, see Data format.