
Lindorm: Create a Pull channel for data subscription

Last Updated: Mar 28, 2026

Set up a change tracking channel in Pull mode to stream incremental data changes from a Lindorm table to a channel. Once the channel is active, you can use the SDKs provided by Lindorm to subscribe to and consume the incremental data from the change tracking channel.

Prerequisites

Before you begin, ensure that Lindorm Tunnel Service (LTS) is activated for your Lindorm instance, because the subscription channel is created in the LTS web UI.

Create a subscription channel

  1. Log on to the Lindorm Tunnel Service (LTS) web UI. In the left-side navigation pane, choose Change Data Capture > Pull.


  2. Click Create Subscription and configure the channel parameters, including the topic name, Topic Partition Num, and Data Expiration Time (Day).

  3. Click Commit.

  4. (Optional) To inspect the channel after creation, click Details in the Actions column. The details page shows channel configuration, data consumption progress, and storage usage.

Consume subscribed data

After the channel is created, configure your Kafka consumer to read from the topic. All events use ByteArrayDeserializer for both keys and values.

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TestConsume {
  public static void main(String[] args) throws Exception {
    // The topic name must match the one you specified when creating the channel.
    String topic = "test-topic";

    Properties props = new Properties();
    // Replace with your Lindorm instance endpoint.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
    // Use ByteArrayDeserializer for both keys and values.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // The consumer group is created automatically on first consumption.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    // Poll for records. Adjust the timeout based on your expected data volume.
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<byte[], byte[]> record : records) {
      System.out.println("key: " + Bytes.toString(record.key()));
      System.out.println("value: " + Bytes.toString(record.value()));
    }

    // Commit the offset to mark records as processed, then close the consumer.
    consumer.commitSync();
    consumer.close();
  }
}

Replace the following placeholders before running the code:

Placeholder    Description                                               Example
test-topic     The topic name you specified when creating the channel    my-cdc-topic
ld-xxx:9092    The endpoint of your Lindorm instance                     ld-bp1abc123:9092
group-id-0     A consumer group ID. The group is created automatically.  my-consumer-group
For the format of change tracking events, see Data format.
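In production, a consumer typically polls in a loop rather than once, committing offsets only after each batch has been handled. The following sketch extends the one-shot example above into a continuous loop; the endpoint, topic, and group ID are placeholders, and the record handling is illustrative only.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LoopConsume {
  // Renders one record as a printable line; null-safe because keys may be absent.
  static String render(byte[] key, byte[] value) {
    String k = key == null ? "null" : new String(key, StandardCharsets.UTF_8);
    String v = value == null ? "null" : new String(value, StandardCharsets.UTF_8);
    return "key=" + k + ", value=" + v;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092"); // placeholder endpoint
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0"); // placeholder group
    // Disable auto-commit so offsets advance only after records are processed.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          System.out.println(render(record.key(), record.value()));
        }
        // Commit only after the whole batch has been handled.
        consumer.commitSync();
      }
    }
  }
}
```

Disabling auto-commit and calling commitSync() after the batch gives at-least-once delivery: if the process crashes mid-batch, unacknowledged records are redelivered on restart.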

Configuration tips

  • Topic partition count: Set Topic Partition Num to match the number of concurrent consumers you plan to run. If you are unsure, use the default of 4.

  • Data retention: Set Data Expiration Time (Day) based on how long your consumers may be offline. The default of 7 days suits most use cases.

  • Consumer group: The consumer group you specify in GROUP_ID_CONFIG is created automatically on first use. Use a unique group ID for each independent consumer application.

What's next

  • To verify your channel is receiving data, check the Details page and confirm that the consumption progress is advancing.

  • To learn about the structure of change events, see Data format.
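Besides the Details page, you can inspect consumption progress programmatically. The sketch below uses the standard Kafka AdminClient to list the offsets a consumer group has committed; it assumes the channel endpoint accepts Kafka admin-protocol requests, and the endpoint and group ID are placeholders.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class CheckProgress {
  // Lag is how far the committed offset trails the partition's end offset.
  static long lag(long endOffset, long committedOffset) {
    return Math.max(0, endOffset - committedOffset);
  }

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092"); // placeholder endpoint
    try (AdminClient admin = AdminClient.create(props)) {
      // Read the offsets that the consumer group has committed so far.
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("group-id-0") // placeholder group
               .partitionsToOffsetAndMetadata().get();
      committed.forEach((tp, om) ->
          System.out.println(tp + " committed offset: " + om.offset()));
    }
  }
}
```

If the committed offsets advance between runs, the channel is delivering data and your consumers are keeping up.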