すべてのプロダクト
Search
ドキュメントセンター

:Kafkaを使用してLindormストリーミングエンジンにデータを書き込む

最終更新日:Jan 14, 2025

Lindormストリーミングエンジンは、Flink SQLと完全な互換性があります。 Flink SQLを使用してLindormストリーミングエンジンでリアルタイムコンピューティングタスクを作成し、Apache Kafkaトピックに格納されている生データを効率的に処理できます。 このトピックでは、Flink SQLを使用してコンピューティングタスクを送信し、Apache KafkaトピックからLindormワイドテーブルにデータをインポートする方法について説明します。

前提条件

使用上の注意

アプリケーションがElastic Compute Service(ECS)インスタンスにデプロイされており、VPC経由でLindormインスタンスに接続する必要がある場合は、ネットワーク接続を確保するために、LindormインスタンスとECSインスタンスが次の要件を満たしていることを確認してください。
  • LindormインスタンスとECSインスタンスが同じリージョンにデプロイされていること。 ネットワークレイテンシを削減するために、2つのインスタンスを同じゾーンにデプロイすることをお勧めします。
  • LindormインスタンスとECSインスタンスが同じVPCにデプロイされていること。

手順

ステップ 1:データの準備

  1. Kafka APIを使用して、処理するデータをKafkaトピックに書き込みます。 次のいずれかの方法を使用してデータを書き込むことができます。

    このトピックでは、例としてオープンソースのKafkaスクリプトツールを使用してデータを書き込みます。

    # トピックを作成します。
    ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic --create
    
    # トピックにデータを書き込みます。
    ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic
    {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
    {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}

    [lindorm Stream Kafkaエンドポイント] の表示方法の詳細については、「エンドポイントを表示する」をご参照ください。

  2. 処理結果を格納するために、LindormTableに結果テーブルを作成します。

    1. Lindorm-cliを使用してLindormTableに接続します。 詳細については、「Lindorm-cliを使用してLindormTableに接続して使用する」をご参照ください。

    2. log という名前の結果テーブルを作成します。

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );

ステップ 2:Lindormストリーミングエンジンクライアントをインストールする

  1. ECSインスタンスで次のコマンドを実行して、Lindormストリーミングエンジンクライアントのパッケージをダウンロードします。

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
  2. 次のコマンドを実行して、パッケージを解凍します。

    tar zxvf lindorm-sqlline-2.0.2.tar.gz
  3. lindorm-sqlline-2.0.2/bin パスに移動し、次のコマンドを実行してLindormストリーミングエンジンに接続します。

    ./lindorm-sqlline -url <Lindorm Stream SQL Endpoint>

    [lindorm Stream SQLエンドポイント] の表示方法の詳細については、「エンドポイントを表示する」をご参照ください。

ステップ 3:Lindormストリーミングエンジンでコンピューティングタスクを送信する

このステップで説明する例では、次の操作が実行されます。

  1. log_to_lindormという名前のFlinkジョブを作成し、originalDataとlindorm_log_tableという名前の2つのテーブルを作成します。 originalDataテーブルは、Kafkaトピックに関連付けられたソーステーブルです。 lindorm_log_tableは、結果ログを格納するシンクテーブルです。

  2. loglevelがERRORであるログをフィルタリングし、結果テーブルに書き込むストリームジョブを作成します。

サンプルコード:

CREATE FJOB log_to_lindorm(
    -- Kafkaソーステーブルを作成します。
    CREATE TABLE originalData(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT
    )WITH(
        'connector'='kafka',
        'topic'='log_topic',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='Lindorm Stream Kafka Endpoint',
        'format'='json'
    );
    -- Lindormワイドテーブルを作成します。
    CREATE TABLE lindorm_log_table(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT,
        PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
    )WITH(
        'connector'='lindorm',
        'seedServer'='LindormTable Endpoint for HBase APIs',
        'userName'='root',
        'password'='test',
        'tableName'='log',
        'namespace'='default'
    );
    -- KafkaトピックのデータからERRORログをフィルタリングし、結果ワイドテーブルに書き込みます。
    INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);
説明

ステップ 4:処理結果のクエリ

次のいずれかの方法を使用して、処理結果をクエリできます。

  • Lindorm-cliを使用してLindormTableに接続し、次のコマンドを実行して処理結果をクエリします。

    SELECT * FROM log LIMIT 5;

    次の結果が返されます。

    +----------+----------+-------------------------+-----------------------+---------------+
    | loglevel |  thread  |          class          |        detail         |   timestamp   |
    +----------+----------+-------------------------+-----------------------+---------------+
    | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
    | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
    +----------+----------+-------------------------+-----------------------+---------------+                                     
  • LindormTableのクラスタ管理システムを使用して、処理結果をクエリします。 詳細については、「データクエリ」をご参照ください。