Lindormストリーミングエンジンは、Flink SQLと完全な互換性があります。 Flink SQLを使用してLindormストリーミングエンジンでリアルタイムコンピューティングタスクを作成し、Apache Kafkaトピックに格納されている生データを効率的に処理できます。 このトピックでは、Flink SQLを使用してコンピューティングタスクを送信し、Apache KafkaトピックからLindormワイドテーブルにデータをインポートする方法について説明します。
前提条件
LindormインスタンスでLindormストリーミングエンジンがアクティブ化されていること。 詳細については、「ストリーミングエンジンをアクティブ化する」をご参照ください。
クライアントのIPアドレスがLindormインスタンスのホワイトリストに追加されていること。 詳細については、「ホワイトリストを設定する」をご参照ください。
使用上の注意
- LindormインスタンスとECSインスタンスが同じリージョンにデプロイされていること。 ネットワークレイテンシを削減するために、2つのインスタンスを同じゾーンにデプロイすることをお勧めします。
- LindormインスタンスとECSインスタンスが同じVPCにデプロイされていること。
手順
ステップ 1:データの準備
Kafka APIを使用して、処理するデータをKafkaトピックに書き込みます。 次のいずれかの方法を使用してデータを書き込むことができます。
このトピックでは、例としてオープンソースのKafkaスクリプトツールを使用してデータを書き込みます。
# トピックを作成します。 ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic --create # トピックにデータを書き込みます。 ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"} {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}[lindorm Stream Kafkaエンドポイント] の表示方法の詳細については、「エンドポイントを表示する」をご参照ください。
処理結果を格納するために、LindormTableに結果テーブルを作成します。
Lindorm-cliを使用してLindormTableに接続します。 詳細については、「Lindorm-cliを使用してLindormTableに接続して使用する」をご参照ください。
logという名前の結果テーブルを作成します。CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, timestamp BIGINT, primary key (loglevel, thread) );
ステップ 2:Lindormストリーミングエンジンクライアントをインストールする
ECSインスタンスで次のコマンドを実行して、Lindormストリーミングエンジンクライアントのパッケージをダウンロードします。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz次のコマンドを実行して、パッケージを解凍します。
tar zxvf lindorm-sqlline-2.0.2.tar.gzlindorm-sqlline-2.0.2/binパスに移動し、次のコマンドを実行してLindormストリーミングエンジンに接続します。./lindorm-sqlline -url <Lindorm Stream SQL Endpoint>[lindorm Stream SQLエンドポイント] の表示方法の詳細については、「エンドポイントを表示する」をご参照ください。
ステップ 3:Lindormストリーミングエンジンでコンピューティングタスクを送信する
このステップで説明する例では、次の操作が実行されます。
log_to_lindormという名前のFlinkジョブを作成し、originalDataとlindorm_log_tableという名前の2つのテーブルを作成します。 originalDataテーブルは、Kafkaトピックに関連付けられたソーステーブルです。 lindorm_log_tableは、結果ログを格納するシンクテーブルです。
loglevelがERRORであるログをフィルタリングし、結果テーブルに書き込むストリームジョブを作成します。
サンプルコード:
CREATE FJOB log_to_lindorm(
-- Kafkaソーステーブルを作成します。
CREATE TABLE originalData(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT
)WITH(
'connector'='kafka',
'topic'='log_topic',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka Endpoint',
'format'='json'
);
-- Lindormワイドテーブルを作成します。
CREATE TABLE lindorm_log_table(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT,
PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
)WITH(
'connector'='lindorm',
'seedServer'='LindormTable Endpoint for HBase APIs',
'userName'='root',
'password'='test',
'tableName'='log',
'namespace'='default'
);
-- KafkaトピックのデータからERRORログをフィルタリングし、結果ワイドテーブルに書き込みます。
INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);HBase API用のLindormTableエンドポイントの表示方法の詳細については、「エンドポイントを表示する」をご参照ください。
ワイドテーブルへの接続に使用されるコネクタの詳細については、「Lindormストリーミングエンジン用のワイドテーブルコネクタを設定する」をご参照ください。
ステップ 4:処理結果のクエリ
次のいずれかの方法を使用して、処理結果をクエリできます。
Lindorm-cliを使用してLindormTableに接続し、次のコマンドを実行して処理結果をクエリします。
SELECT * FROM log LIMIT 5;次の結果が返されます。
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+LindormTableのクラスタ管理システムを使用して、処理結果をクエリします。 詳細については、「データクエリ」をご参照ください。