全部产品
Search
文档中心

Lindorm:Buat saluran Pull untuk langganan data

更新时间:Jul 02, 2025

Topik ini menjelaskan cara membuat saluran pelacakan perubahan yang beroperasi dalam mode Pull. Setelah saluran pelacakan perubahan dibuat untuk tabel di instans Lindorm, data tambahan dari tabel tersebut ditarik ke saluran secara real-time dan disimpan di saluran. Anda dapat menggunakan SDK yang disediakan oleh Lindorm untuk berlangganan dan mengonsumsi data tambahan dari saluran pelacakan perubahan. Anda juga dapat membuat, melihat, dan menghapus saluran pelacakan perubahan melalui antarmuka web Lindorm Tunnel Service (LTS).

Prasyarat

Alamat IP klien Anda telah ditambahkan ke daftar putih instans Lindorm Anda. Untuk informasi lebih lanjut, lihat Konfigurasi Daftar Putih.

Fitur pelacakan perubahan telah diaktifkan. Untuk informasi lebih lanjut, lihat Aktifkan Pelacakan Perubahan.

Prosedur

  1. Masuk ke antarmuka web LTS. Di panel navigasi sebelah kiri, pilih Change Data Capture > Pull.

    streamone

  2. Di halaman yang muncul, klik Create Subscription dan konfigurasikan parameter yang dijelaskan dalam tabel berikut.

    Parameter

    Deskripsi

    Lindorm Datasource

    Pilih ID instans Lindorm Anda.

    Lindorm Table

    Pilih tabel tempat Anda ingin membuat saluran pelacakan perubahan. Anda hanya dapat memilih satu tabel untuk saluran pelacakan perubahan.

    Topic

    Masukkan nama topik tempat Anda ingin mengonsumsi data yang dilanggan.

    Data Expiration Time (Day)

    Tentukan jumlah hari data yang dilanggan dapat disimpan. Nilai default: 7.

    Topic Partition Num

    Tentukan jumlah partisi dalam topik pada klien Kafka. Data dalam beberapa partisi dapat dikonsumsi secara bersamaan. Nilai default: 4.

  3. Klik Commit.

  4. Opsional. Klik Details di kolom Actions yang sesuai dengan saluran yang Anda buat. Di halaman yang muncul, Anda dapat melihat detail tentang saluran pelacakan perubahan, konsumsi data, dan penggunaan penyimpanan.

  5. Opsional. Konsumsi data yang dilanggan. Saat mengonfigurasi klien Kafka Anda, Anda dapat menambahkan kode berikut untuk mengimplementasikan konsumsi data.

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // Tentukan nama topik tempat data yang ingin Anda konsumsi termasuk. Nama topik harus sama dengan yang Anda tentukan saat membuat saluran pelacakan perubahan.
        String topic = "test-topic";
    
        // Tentukan item konfigurasi yang digunakan untuk terhubung ke titik akhir instans Lindorm Anda.
        Properties props = new Properties();
        // Tentukan titik akhir instans Lindorm Anda.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // Tentukan serializer yang digunakan untuk serialisasi kunci. Anda harus menggunakan serializer yang sama seperti dalam contoh ini.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Tentukan serializer yang digunakan untuk serialisasi nilai. Anda harus menggunakan serializer yang sama seperti dalam contoh ini.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Tentukan nama grup konsumen. Grup konsumen dibuat secara otomatis saat data dikonsumsi.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // Buat konsumen.
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // Berlangganan ke topik.
        consumer.subscribe(Arrays.asList(topic));
    
        // Gunakan konsumen untuk menarik data.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // Lihat data.
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // Komit offset konsumen saat ini.
        consumer.commitSync();
        // Nonaktifkan konsumen.
        consumer.close();
      }
    }
    Catatan

    Untuk informasi lebih lanjut tentang format data yang dihasilkan oleh fitur pelacakan perubahan, lihat Format Data.