All Products
Search
Document Center

Lindorm:Create a Pull channel for data subscription

Last Updated:Jan 03, 2023

This topic describes how to create a change tracking channel that works in Pull mode. After a change tracking channel is created for a table in a Lindorm instance, the incremental data of the table is pulled to the channel in real time and is stored in the channel. You can use the SDKs provided by Lindorm to subscribe to and consume the incremental data from the change tracking channel. You can create, view, and delete change tracking channels in the Lindorm Tunnel Service (LTS) web UI.

Prerequisites

The IP addresses of your clients are added to the whitelist of your Lindorm instance. For more information, see Configure whitelists.

The change tracking feature is enabled. For more information, see Enable change tracking.

Procedure

  1. Log on to the LTS web UI. In the left-side navigation pane, choose Change Data Capture > Pull.

    streamone
  2. On the page that appears, click Create Subscription and configure the parameters that are described in the following table.

    Parameter

    Description

    Lindorm Datasource

    Select the ID of your Lindorm instance.

    Lindorm Table

    Select the table for which you want to create a change tracking channel. You can select only one table for a change tracking channel.

    Topic

    Enter the name of the topic from which you want to consume the subscribed data.

    Data Expiration Time (Day)

    Specify the number of days for which the subscribed data can be retained. Default value: 7.

    Topic Partition Num

    Specify the number of partitions in the topic on the Kafka client. Data in multiple partitions can be concurrently consumed. Default value: 4.

  3. Click Commit.

  4. Optional. Click Details in the Actions column corresponding to the channel that you create. On the page that appears, you can view the details about the change tracking channel, data consumption, and storage usage.

  5. Optional. Consume the subscribed data. When you configure your Kafka client, you can add the following code to implement data consumption.

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // Specify the name of the topic to which the data that you want to consume belongs. The topic name must be the name that you specified when you created the change tracking channel.
        String topic = "test-topic";
    
        // Specify the configuration items that are used to connect to the endpoint of your Lindorm instance.
        Properties props = new Properties();
        // Specify the endpoint of your Lindorm instance.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // Specify the serializer that is used to serialize keys. You must use the same serializer that is used in this example.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Specify the serializer that is used to serialize values. You must use the same serializer that is used in this example.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Specify the name of the consumer group. The consumer group is automatically created when data is consumed.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // Create a consumer.
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Arrays.asList(topic));
    
        // Use the consumer to pull data.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // View the data.
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // Commit the current consumer offset.
        consumer.commitSync();
        // Disable the consumer.
        consumer.close();
      }
    }
    Note

    For more information about the format of data that is generated by the change tracking feature, see Data format.