本文介绍通过Pull模式创建数据订阅功能,创建后订阅通道会实时拉取数据库实例的增量数据,并将增量数据保存在订阅通道中,您可以使用Lindorm提供的SDK从订阅通道中订阅增量数据并进行消费。同时,您可以在LTS页面进行订阅通道的创建、查看及删除等操作。
前提条件
已将客户端IP添加至白名单中,具体操作请参见设置白名单。
已开通数据订阅功能,具体操作,请参见开通数据订阅。
操作步骤
进入LTS(原BDS)页面,在左侧导航栏中,选择数据订阅 > Pull模式。
单击创建数据订阅通道,并配置以下参数。
名称
描述
源集群
填写Lindorm实例ID。
Lindorm表名
选择需要创建数据订阅通道的Lindorm实例表,一条通道只能选择一张表格。
主题名
用于消费数据的主题名称。
数据过期时间(天)
表示数据可以保存的天数,默认为7天。
主题分区数
表示Kafka客户端为该主题设置多个分区,多分区可以并发消费数据,默认为4个分区。
单击提交。
(可选)找到目标订阅通道,单击操作列的详情,可以查看数据订阅通道详情、消费详情和存储详情。
(可选)您可以通过以下代码在Kafka客户端对订阅数据进行消费。
import org.apache.hadoop.hbase.util.Bytes; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class TestConsume { public static void main(String[] args) throws Exception { // 创建订阅通道时填写的topic名称 String topic = "test-topic"; // 链接endpoint的配置项 Properties props = new Properties(); // 指定链接endpoint地址 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092"); // 指定Key序列化器,不可更改 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定Value序列化器,不可更改 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // 指定消费组名称,在消费时会自动创建 props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0"); // 创建消费者 KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Arrays.asList(topic)); // 用消费者拉取数据 ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<byte[], byte[]> record : records) { // 查看数据内容 System.out.println("key: " + Bytes.toString(record.key())); System.out.println("value: " + Bytes.toString(record.value())); } // 提交当前消费位移 consumer.commitSync(); // 关闭消费者 consumer.close(); } }
说明数据消费格式说明请参见数据消费格式。