ログ検索システムを構築するために、Realtime Compute for Apache Flink を使用してログデータを処理し、検索のために Elasticsearch に書き込むことができます。このトピックでは、Alibaba Cloud の Simple Log Service (SLS) を例として、このプロセスについて説明します。
前提条件
以下のタスクを完了していること。
Realtime Compute for Apache Flink をアクティブ化し、プロジェクトを作成します。
Alibaba Cloud Elasticsearch インスタンスを作成済みであること。
詳細については、「Alibaba Cloud Elasticsearch インスタンスの作成」をご参照ください。
SLS を有効化し、プロジェクトと Logstore を作成済みであること。
詳細については、「Alibaba Cloud Simple Log Service の有効化」、「プロジェクトの管理」、および「基本的な Logstore の作成」をご参照ください。
背景情報
Realtime Compute for Apache Flink は、Alibaba Cloud が公式にサポートする Flink ベースのサービスです。Kafka や Elasticsearch など、さまざまなソースシステムとシンクシステムをサポートしています。Realtime Compute for Apache Flink と Elasticsearch の組み合わせは、典型的なログ検索シナリオの要件を満たします。
Kafka や SLS などのシステムからのログは、Flink によって単純または複雑な計算で処理された後、検索のために Elasticsearch に書き込まれます。Flink の強力な計算能力と Elasticsearch の強力な検索能力を組み合わせることで、ビジネス向けのリアルタイムなデータ変換とクエリを実装できます。これにより、リアルタイムサービスへの移行が容易になります。
Realtime Compute for Apache Flink は、Elasticsearch に接続する簡単な方法を提供します。たとえば、ビジネスログやデータが SLS に書き込まれ、検索のために Elasticsearch に書き込む前に処理が必要な場合は、次の図に示すパイプラインを使用できます。
手順
Realtime Compute for Apache Flink ジョブを作成します。
詳細については、Alibaba Cloud Blink 排他的モード (Alibaba Cloud では新規購入停止) ドキュメントのBlink SQL 開発ガイドのジョブ開発 > 開発セクションをご参照ください。
Flink SQL を記述します。
Simple Log Service のソーステーブルを作成します。
create table sls_stream( a int, b int, c VARCHAR ) WITH ( type ='sls', endPoint ='<yourEndpoint>', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '<yourStartTime>', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );WITH 句のパラメーターは次の表のとおりです。
変数
説明
endPoint
Alibaba Cloud SLS のパブリックエンドポイント。これは、対応するプロジェクトとそのログデータにアクセスするための URL です。詳細については、「エンドポイント」をご参照ください。
たとえば、中国 (杭州) リージョンの SLS のエンドポイントは http://cn-hangzhou.log.aliyuncs.com です。エンドポイントは http:// で始まる必要があります。
accessId
ご利用の AccessKey ID。
accessKey
ご利用の AccessKey Secret。
startTime
ログの消費を開始する時点。Flink ジョブを実行する際、選択した時間はここで設定した時間より後である必要があります。
project
Simple Log Service プロジェクトの名前です。
logStore
プロジェクト内のログストアの名前です。
consumerGroup
SLS のコンシューマーグループの名前。
Elasticsearch の結果テーブルを作成します。
重要Realtime Compute for Apache Flink バージョン 3.2.2 以降は、Elasticsearch の結果テーブルをサポートしています。Flink ジョブを作成する際は、サポートされているバージョンを選択してください。
Elasticsearch の結果テーブルは REST API を使用して実装されており、すべての Elasticsearch バージョンと互換性があります。
CREATE TABLE es_stream_sink( a int, cnt BIGINT, PRIMARY KEY(a) ) WITH( type ='elasticsearch-7', endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>', accessId = '<yourAccessId>', accessKey = '<yourAccessSecret>', index = '<yourIndex>', typeName = '<yourTypeName>' );WITH 句のパラメーターは次の表のとおりです。
パラメーター
説明
endPoint
ご利用の Alibaba Cloud Elasticsearch インスタンスのパブリックエンドポイント。フォーマットは http://<instanceid>.public.elasticsearch.aliyuncs.com:9200 です。この情報は、インスタンスの基本情報ページから取得できます。詳細については、「インスタンスの基本情報の表示」をご参照ください。
accessId
Alibaba Cloud Elasticsearch インスタンスにアクセスするためのユーザー名。デフォルトは elastic です。
accessKey
ユーザーのパスワード。elastic ユーザーのパスワードは、インスタンスの作成時に設定します。パスワードを忘れた場合は、リセットできます。パスワードのリセットに関する注意事項と手順の詳細については、「インスタンスのアクセスパスワードのリセット」をご参照ください。
index
インデックス名。インデックスを作成していない場合は、まず作成してください。詳細については、「初心者向けガイド:インスタンス作成からデータ取得まで」をご参照ください。また、インデックスの自動作成を有効にすることもできます。詳細については、「YML パラメーターの設定」をご参照ください。
typeName
インデックスタイプ。Elasticsearch V7.0 以降のインスタンスの場合、これは _doc である必要があります。
説明Elasticsearch は、プライマリキーを使用したドキュメントの更新をサポートしています。
PRIMARY KEYとして指定できるフィールドは 1 つだけです。PRIMARY KEYを指定すると、PRIMARY KEYフィールドの値がドキュメント ID として使用されます。PRIMARY KEYが指定されていない場合、システムはランダムにドキュメント ID を生成します。詳細については、「Index API」をご参照ください。Elasticsearch は複数の更新モードをサポートしています。モードは、WITH 句の updateMode パラメーターを使用して指定できます:
updateMode=fullの場合、新しいドキュメントが既存のドキュメントを完全に上書きします。updateMode=incの場合、Elasticsearch は入力フィールド値に基づいて対応するフィールドを更新します。
Elasticsearch のすべての更新は、デフォルトで UPSERT セマンティクス (INSERT または UPDATE を意味します) になります。
データを処理および同期するためのビジネスロジックを記述します。
INSERT INTO es_stream_sink SELECT a, count(*) as cnt FROM sls_stream GROUP BY a
ジョブを公開して開始します。
ジョブを公開して開始すると、SLS からのデータが集計され、Alibaba Cloud Elasticsearch に書き込まれます。
関連情報
Realtime Compute for Apache Flink と Elasticsearch を使用すると、リアルタイム検索パイプラインを迅速に作成できます。Elasticsearch へのデータ書き込みに関するより複雑な要件がある場合は、Realtime Compute for Apache Flink のカスタムシンク機能を使用できます。