すべてのプロダクト
Search
ドキュメントセンター

Lindorm:プル チャネルを作成してデータ サブスクリプションを行う

最終更新日:Jan 14, 2025

このトピックでは、プル モードで動作する変更追跡チャネルを作成する方法について説明します。Lindorm インスタンスのテーブルに対して変更追跡チャネルが作成されると、テーブルの増分データはリアルタイムでチャネルにプルされ、チャネルに格納されます。 Lindorm が提供する SDK を使用して、変更追跡チャネルから増分データをサブスクライブおよび消費できます。Lindorm Tunnel Service(LTS)Web UI で、変更追跡チャネルを作成、表示、および削除できます。

前提条件

クライアントの IP アドレスが Lindorm インスタンスのホワイトリストに追加されていること。詳細については、「ホワイトリストを構成する」をご参照ください。

変更追跡機能が有効になっていること。詳細については、「変更追跡を有効にする」をご参照ください。

手順

  1. LTS Web UI にログオンします。左側のナビゲーション ペインで、[変更データ キャプチャ] > [プル] を選択します。

    streamone

  2. 表示されたページで、[サブスクリプションの作成] をクリックし、次の表に記載されているパラメーターを構成します。

    パラメーター

    説明

    Lindorm データソース

    Lindorm インスタンスの ID を選択します。

    Lindorm テーブル

    変更追跡チャネルを作成するテーブルを選択します。変更追跡チャネルには 1 つのテーブルのみを選択できます。

    トピック

    サブスクライブしたデータを消費するトピックの名前を入力します。

    データ有効期限(日)

    サブスクライブしたデータを保持できる日数を指定します。デフォルト値:7。

    トピック パーティション数

    Kafka クライアントのトピックのパーティション数を指定します。複数のパーティションのデータを同時に消費できます。デフォルト値:4。

  3. [コミット] をクリックします。

  4. オプション。作成したチャネルに対応する [アクション] 列の [詳細] をクリックします。表示されたページで、変更追跡チャネル、データ消費、およびストレージ使用量に関する詳細を表示できます。

  5. オプション。サブスクライブしたデータを消費します。Kafka クライアントを構成するときに、次のコードを追加してデータ消費を実装できます。

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsume {
      public static void main(String[] args) throws Exception {
        // 消費するデータが属するトピックの名前を指定します。トピック名は、変更追跡チャネルの作成時に指定した名前と同じである必要があります。
        String topic = "test-topic";
    
        // Lindorm インスタンスのエンドポイントへの接続に使用する構成項目を指定します。
        Properties props = new Properties();
        // Lindorm インスタンスのエンドポイントを指定します。
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ld-xxx:9092");
        // キーのシリアル化に使用するシリアライザーを指定します。この例で使用されているのと同じシリアライザーを使用する必要があります。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // 値のシリアル化に使用するシリアライザーを指定します。この例で使用されているのと同じシリアライザーを使用する必要があります。
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // コンシューマー グループの名前を指定します。コンシューマー グループは、データが消費されると自動的に作成されます。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0");
    
        // コンシューマーを作成します。
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // トピックをサブスクライブします。
        consumer.subscribe(Arrays.asList(topic));
    
        // コンシューマーを使用してデータをプルします。
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // データを表示します。
          System.out.println("key: " + Bytes.toString(record.key()));
          System.out.println("value: " + Bytes.toString(record.value()));
        }
        // 現在のコンシューマー オフセットをコミットします。
        consumer.commitSync();
        // コンシューマーを無効にします。
        consumer.close();
      }
    }
    説明

    変更追跡機能によって生成されるデータの形式の詳細については、「データ形式」をご参照ください。