すべてのプロダクト
Search
ドキュメントセンター

Elasticsearch:Realtime Compute for Apache Flink を使用した Elasticsearch へのデータ処理と同期

最終更新日:Dec 07, 2025

ログ検索システムを構築するために、Realtime Compute for Apache Flink を使用してログデータを処理し、検索のために Elasticsearch に書き込むことができます。このトピックでは、Alibaba Cloud の Simple Log Service (SLS) を例として、このプロセスについて説明します。

前提条件

以下のタスクを完了していること。

背景情報

Realtime Compute for Apache Flink は、Alibaba Cloud が公式にサポートする Flink ベースのサービスです。Kafka や Elasticsearch など、さまざまなソースシステムとシンクシステムをサポートしています。Realtime Compute for Apache Flink と Elasticsearch の組み合わせは、典型的なログ検索シナリオの要件を満たします。

Kafka や SLS などのシステムからのログは、Flink によって単純または複雑な計算で処理された後、検索のために Elasticsearch に書き込まれます。Flink の強力な計算能力と Elasticsearch の強力な検索能力を組み合わせることで、ビジネス向けのリアルタイムなデータ変換とクエリを実装できます。これにより、リアルタイムサービスへの移行が容易になります。

Realtime Compute for Apache Flink は、Elasticsearch に接続する簡単な方法を提供します。たとえば、ビジネスログやデータが SLS に書き込まれ、検索のために Elasticsearch に書き込む前に処理が必要な場合は、次の図に示すパイプラインを使用できます。Flink+ES data link

手順

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. Realtime Compute for Apache Flink ジョブを作成します。

    詳細については、Alibaba Cloud Blink 排他的モード (Alibaba Cloud では新規購入停止) ドキュメントBlink SQL 開発ガイドジョブ開発 > 開発セクションをご参照ください。

  3. Flink SQL を記述します。

    1. Simple Log Service のソーステーブルを作成します。

      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );

      WITH 句のパラメーターは次の表のとおりです。

      変数

      説明

      endPoint

      Alibaba Cloud SLS のパブリックエンドポイント。これは、対応するプロジェクトとそのログデータにアクセスするための URL です。詳細については、「エンドポイント」をご参照ください。

      たとえば、中国 (杭州) リージョンの SLS のエンドポイントは http://cn-hangzhou.log.aliyuncs.com です。エンドポイントは http:// で始まる必要があります。

      accessId

      ご利用の AccessKey ID。

      accessKey

      ご利用の AccessKey Secret。

      startTime

      ログの消費を開始する時点。Flink ジョブを実行する際、選択した時間はここで設定した時間より後である必要があります。

      project

      Simple Log Service プロジェクトの名前です。

      logStore

      プロジェクト内のログストアの名前です。

      consumerGroup

      SLS のコンシューマーグループの名前。

    2. Elasticsearch の結果テーブルを作成します。

      重要
      • Realtime Compute for Apache Flink バージョン 3.2.2 以降は、Elasticsearch の結果テーブルをサポートしています。Flink ジョブを作成する際は、サポートされているバージョンを選択してください。

      • Elasticsearch の結果テーブルは REST API を使用して実装されており、すべての Elasticsearch バージョンと互換性があります。

      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      WITH 句のパラメーターは次の表のとおりです。

      パラメーター

      説明

      endPoint

      ご利用の Alibaba Cloud Elasticsearch インスタンスのパブリックエンドポイント。フォーマットは http://<instanceid>.public.elasticsearch.aliyuncs.com:9200 です。この情報は、インスタンスの基本情報ページから取得できます。詳細については、「インスタンスの基本情報の表示」をご参照ください。

      accessId

      Alibaba Cloud Elasticsearch インスタンスにアクセスするためのユーザー名。デフォルトは elastic です。

      accessKey

      ユーザーのパスワード。elastic ユーザーのパスワードは、インスタンスの作成時に設定します。パスワードを忘れた場合は、リセットできます。パスワードのリセットに関する注意事項と手順の詳細については、「インスタンスのアクセスパスワードのリセット」をご参照ください。

      index

      インデックス名。インデックスを作成していない場合は、まず作成してください。詳細については、「初心者向けガイド:インスタンス作成からデータ取得まで」をご参照ください。また、インデックスの自動作成を有効にすることもできます。詳細については、「YML パラメーターの設定」をご参照ください。

      typeName

      インデックスタイプ。Elasticsearch V7.0 以降のインスタンスの場合、これは _doc である必要があります。

      説明
      • Elasticsearch は、プライマリキーを使用したドキュメントの更新をサポートしています。PRIMARY KEY として指定できるフィールドは 1 つだけです。PRIMARY KEY を指定すると、PRIMARY KEY フィールドの値がドキュメント ID として使用されます。PRIMARY KEY が指定されていない場合、システムはランダムにドキュメント ID を生成します。詳細については、「Index API」をご参照ください。

      • Elasticsearch は複数の更新モードをサポートしています。モードは、WITH 句の updateMode パラメーターを使用して指定できます:

        • updateMode=full の場合、新しいドキュメントが既存のドキュメントを完全に上書きします。

        • updateMode=inc の場合、Elasticsearch は入力フィールド値に基づいて対応するフィールドを更新します。

      • Elasticsearch のすべての更新は、デフォルトで UPSERT セマンティクス (INSERT または UPDATE を意味します) になります。

    3. データを処理および同期するためのビジネスロジックを記述します。

      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  4. ジョブを公開して開始します。

    ジョブを公開して開始すると、SLS からのデータが集計され、Alibaba Cloud Elasticsearch に書き込まれます。

関連情報

Realtime Compute for Apache Flink と Elasticsearch を使用すると、リアルタイム検索パイプラインを迅速に作成できます。Elasticsearch へのデータ書き込みに関するより複雑な要件がある場合は、Realtime Compute for Apache Flink のカスタムシンク機能を使用できます。