すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Elasticsearch コネクタ

最終更新日:Jun 07, 2025

このトピックでは、Elasticsearch コネクタの使用方法について説明します。

背景情報

Alibaba Cloud Elasticsearch は、Security、Machine Learning、Graph、Application Performance Monitoring(APM)などのオープンソース Elasticsearch の機能と互換性があります。 Alibaba Cloud Elasticsearch は、データ分析やデータ検索など、さまざまなシナリオに適しています。 Alibaba Cloud Elasticsearch は、アクセス制御、セキュリティ監視とアラート、レポートの自動生成などのエンタープライズクラスのサービスを提供します。

次の表に、Elasticsearch コネクタでサポートされている機能を示します。

項目

説明

テーブルの種類

ソーステーブル、ディメンションテーブル、およびシンクテーブル

実行モード

バッチモードとストリーミングモード

データ形式

JSON

メトリック

  • ソーステーブルのメトリック

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • ディメンションテーブルのメトリック

    デフォルト値なし

  • Ververica Runtime(VVR)6.0.6 以後の Realtime Compute for Apache Flink のシンクテーブルのメトリック

    • numRecordsOut

    • numRecordsOutPerSecond

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

DataStream API と SQL API

シンクテーブルのデータの更新または削除

サポートされています

前提条件

制限

  • Elasticsearch コネクタは、関連する Elasticsearch クラスタのバージョンが V6.8.X 以降で V8.X より前の場合にのみ、ソーステーブルとディメンションテーブルに使用できます。

  • Elasticsearch コネクタは、関連する Elasticsearch クラスタのバージョンが V6.X、V7.X、または V8.X の場合にのみ、シンクテーブルに使用できます。

  • Elasticsearch コネクタは、完全な Elasticsearch ソーステーブルにのみ使用でき、増分 Elasticsearch ソーステーブルには使用できません。

構文

  • ソーステーブルを作成する:

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • ディメンションテーブルを作成する:

    CREATE TABLE es_dim(
      field1 STRING, --- このフィールドがディメンションテーブルと別のテーブルを結合するためのキーとして使用される場合、このフィールドの値は STRING データ型である必要があります。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    説明
    • ディメンションテーブルにプライマリキーを定義する場合、ディメンションテーブルと別のテーブルを結合するために使用できるキーは 1 つだけです。 キーは、Elasticsearch インデックス内のドキュメントの ID です。

    • ディメンションテーブルにプライマリキーを定義しない場合、ディメンションテーブルと別のテーブルを結合するために 1 つ以上のキーを使用できます。 キーは、Elasticsearch インデックス内のドキュメントのフィールドです。

    • デフォルトでは、互換性を確保するために、STRING データ型のフィールド名に .keyword 接尾辞が追加されます。 Elasticsearch テーブルの TEXT データ型のフィールドが一致しない場合は、ignoreKeywordSuffix オプションの値を true に設定できます。

  • シンクテーブルを作成する:

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7' -- Elasticsearch のバージョンが V6.X の場合は、elasticsearch-6 と入力します。
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    説明
    • Elasticsearch シンクテーブルは、プライマリキーが定義されているかどうかに基づいて、upsert モードまたは append モードで動作するように決定されます。

      • Elasticsearch シンクテーブルにプライマリキーが定義されている場合、プライマリキーはドキュメント ID である必要があり、Elasticsearch シンクテーブルは upsert モードで動作します。 このモードでは、Elasticsearch シンクテーブルは UPDATE メッセージと DELETE メッセージを使用できます。

      • Elasticsearch シンクテーブルにプライマリキーが定義されていない場合、Elasticsearch はランダムなドキュメント ID を自動的に生成し、Elasticsearch シンクテーブルは append モードで動作します。 このモードでは、Elasticsearch シンクテーブルは INSERT メッセージのみを使用できます。

    • BYTES、ROW、ARRAY、MAP などの特定のデータ型は文字列として表現できません。 したがって、これらのデータ型のフィールドをプライマリキーフィールドとして使用することはできません。

    • DDL 文のフィールドは、Elasticsearch ドキュメントのフィールドに対応しています。 ドキュメント ID などのメタデータは、Elasticsearch クラスタで保持されます。 したがって、メタデータを Elasticsearch シンクテーブルに書き込むことはできません。

WITH 句のコネクタオプション

  • ソース固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    ソーステーブルのタイプ。

    STRING

    はい

    デフォルト値なし

    値を elasticsearch に設定します。

    endPoint

    Elasticsearch クラスタのエンドポイント。

    STRING

    はい

    デフォルト値なし

    例:http://127.0.0.1:XXXX

    indexName

    Elasticsearch インデックスの名前。

    STRING

    はい

    デフォルト値なし

    該当なし。

    accessId

    Elasticsearch クラスタにアクセスするために使用されるユーザー名。

    STRING

    いいえ

    デフォルト値なし

    デフォルトでは、このオプションは空です。 これは、権限検証が不要であることを示します。 accessId オプションを構成する場合は、 accessKey オプションも構成する必要があります。

    重要

    セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。 詳細については、「変数を管理する」をご参照ください。

    accessKey

    Elasticsearch クラスタにアクセスするために使用されるパスワード。

    STRING

    いいえ

    デフォルト値なし

    typeNames

    タイプの名前。

    STRING

    いいえ

    _doc

    Elasticsearch クラスタのバージョンが V7.0 以降の場合は、このオプションを構成しないことをお勧めします。

    batchSize

    スクロールリクエストごとに Elasticsearch クラスタから取得できるドキュメントの最大数。

    INT

    いいえ

    2000

    該当なし。

    keepScrollAliveSecs

    スクロールコンテキストの最大保持期間。

    INT

    いいえ

    3600

    単位:秒。

  • シンク固有

    オプション

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    シンクテーブルのタイプ。

    String

    はい

    デフォルト値なし

    有効な値:elasticsearch-6elasticsearch-7、およびelasticsearch-8

    説明

    VVR 8.0.5 以降でのみ elasticsearch-8 がサポートされています。

    hosts

    Elasticsearch クラスタのエンドポイント。

    String

    はい

    デフォルト値なし

    例:127.0.0.1:XXXX

    index

    Elasticsearch インデックスの名前。

    String

    はい

    デフォルト値なし

    Elasticsearch シンクテーブルは、静的インデックスと動的インデックスの両方をサポートしています。 静的インデックスと動的インデックスを使用する場合は、次の点に注意してください。

    • 静的インデックスを使用する場合、index オプションの値は、myusers などの文字列である必要があります。 すべてのレコードは myusers インデックスに書き込まれます。

    • 動的インデックスを使用する場合、{field_name} を使用してレコード内のフィールド値を参照し、宛先インデックスを動的に生成できます。 また、{field_namedate_format_string} を使用して、TIMESTAMP、DATE、および TIME データ型のフィールド値を date_format_string で指定された形式に変換することもできます。 date_format_string は、Java の DateTimeFormatter と互換性があります。 たとえば、動的インデックスを myusers-{log_tsyyyy-MM-dd} に設定した場合、log_ts フィールドの値にあるレコード 2020-03-27 12:25:55 は、myusers-2020-03-27 インデックスに書き込まれます。

    document-type

    ドキュメントのタイプ。

    String

    • connector オプションが elasticsearch-6 に設定されている場合、このオプションを構成する必要があります。

    • connector オプションが elasticsearch-7 に設定されている場合、このオプションはサポートされていません。

    デフォルト値なし

    connector オプションが elasticsearch-6 に設定されている場合、このオプションの値は、Elasticsearch に構成されている type オプションの値と同じである必要があります。

    username

    Elasticsearch クラスタにアクセスするために使用されるユーザー名。

    String

    いいえ

    デフォルト値なし

    デフォルトでは、このオプションは空のままです。これは、権限検証が不要であることを示します。 username オプションを構成する場合は、password オプションも構成する必要があります。

    重要

    セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。 詳細については、「変数を管理する」をご参照ください。

    password

    Elasticsearch クラスタにアクセスするために使用されるパスワード。

    String

    いいえ

    デフォルト値なし

    document-id.key-delimiter

    複数のドキュメント ID を区切るために使用されるデリミタ。

    String

    いいえ

    _

    Elasticsearch シンク テーブルでは、プライマリキーを使用して Elasticsearch のドキュメント ID が計算されます。[document-id.key-delimiter] で指定されたキーデリミタを使用して、DDL 文で定義された順序ですべてのプライマリキーフィールドが連結されます。各行についてもドキュメント ID が生成されます。

    説明

    ドキュメント ID は、スペースを含まない最大 512 バイトの文字列です。

    failure-handler

    Elasticsearch リクエストが失敗した場合に使用されるエラー処理ポリシーです。

    String

    いいえ

    fail

    有効な値:

    • fail: リクエストが失敗した場合、デプロイメントは失敗します。これはデフォルト値です。

    • ignore: エラーは無視され、リクエストは削除されます。

    • retry_rejected: キューの容量が一杯でエラーが発生した場合、リクエストが再試行されます。

    • カスタム クラス名: ActionRequestFailureHandler サブクラスを使用して、エラーのトラブルシューティングを行います。

    sink.flush-on-checkpoint

    チェックポイント中にフラッシュ操作をトリガーするかどうかを指定します。

    ブール値

    いいえ

    true

    • true:チェックポイント中にフラッシュ操作がトリガーされます。これはデフォルト値です。

    • false:チェックポイント中にフラッシュ操作はトリガーされません。この機能が無効になっている場合、Elasticsearch コネクタはチェックポイント中にすべての保留中のリクエストが完了するかどうかを確認するまで待機しません。したがって、Elasticsearch コネクタはリクエストに対して少なくとも1回保証を提供しません。

    sink.bulk-flush.backoff.strategy

    sink.bulk-flush.backoff.strategy オプションを構成して、一時的なリクエストエラーが原因でフラッシュ操作が失敗した場合の再試行ポリシーを指定できます。

    Enum

    いいえ

    DISABLED

    • DISABLED:フラッシュ操作は再試行されません。最初のリクエストエラーが発生すると、フラッシュ操作は失敗します。これはデフォルト値です。

    • CONSTANT:各フラッシュ操作の待機時間は同じです。

    • EXPONENTIAL:各フラッシュ操作の待機時間は指数関数的に増加します。

    sink.bulk-flush.backoff.max-retries

    再試行の最大回数。

    整数

    いいえ

    デフォルト値はありません

    該当なし。

    sink.bulk-flush.backoff.delay

    再試行間の遅延。

    期間

    いいえ

    デフォルト値はありません

    • sink.bulk-flush.backoff.strategy オプションが CONSTANT に設定されている場合、このオプションの値は再試行間の遅延時間です。

    • sink.bulk-flush.backoff.strategy オプションが EXPONENTIAL に設定されている場合、このオプションの値は初期ベースライン遅延です。

    sink.bulk-flush.max-actions

    リクエストのバッチごとに実行できる最大フラッシュ操作数です。

    Int

    いいえ

    1000

    値 0 は、この機能が無効になっていることを示します。

    sink.bulk-flush.max-size

    リクエストが保存されるバッファーの最大メモリサイズ。

    String

    いいえ

    2 MB

    デフォルト値: 2 。単位: MB 。このオプションが 0 に設定されている場合、この機能は無効になります。

    sink.bulk-flush.interval

    フラッシュ操作が実行される間隔です。

    期間

    いいえ

    1s

    デフォルト値: 1 。単位:秒。このオプションが 0 に設定されている場合、この機能は無効になります。

    connection.path-prefix

    各 REST 通信に追加する必要があるプレフィックス。

    String

    いいえ

    デフォルト値はありません

    該当なし。

    リトライオンコンフリクト

    更新操作でバージョン競合が原因で許可される最大リトライ回数です。リトライ回数がこのオプションの値を超えると、例外が発生し、デプロイメントは失敗します。

    Int

    いいえ

    0

    説明
    • VVR 4.0.13 以降のみがこのオプションをサポートしています。

    • このオプションは、プライマリキーが指定されている場合にのみ有効になります。

    ルーティングフィールド

    Elasticsearch シンクテーブルの 1 つ以上のフィールド名。フィールド名は、ドキュメントを Elasticsearch クラスタの指定されたシャードにルーティングするために使用されます。

    String

    いいえ

    デフォルト値なし

    複数のフィールド名はセミコロン(;)で区切ります。フィールドが空の場合、フィールドは null に設定されます。

    説明

    connector オプションが elasticsearch-7 または elasticsearch-8 に設定されている場合、このオプションは VVR 8.0.6 以降でのみサポートされます。

    sink.delete-strategy

    削除メッセージ(DELETE または UPDATE)を受信したときに実行される操作です。

    Enum

    いいえ

    DELETE_ROW_ON_PK

    有効な値:

    • DELETE_ROW_ON_PK:UPDATE メッセージを無視し、DELETE メッセージを受信したときにプライマリキー値に一致する行(ドキュメント)を削除します。これはデフォルト値です。

    • IGNORE_DELETE:UPDATE メッセージと DELETE メッセージを無視します。Elasticsearch シンクでは削除は発生しません。

    • NON_PK_FIELD_TO_NULL:UPDATE メッセージを無視し、DELETE メッセージを受信したときにプライマリキー値に一致する行(ドキュメント)を変更します。プライマリキー値は変更されず、テーブルスキーマ内のプライマリキー以外の値は NULL に設定されます。この値は、複数のシンクを使用して同じ Elasticsearch テーブルにデータを書き込むときに、データを部分的に更新するために使用されます。

    • CHANGELOG_STANDARD:DELETE_ROW_ON_PK と似ています。唯一の違いは、UPDATE メッセージを受信したときにも、プライマリキー値に一致する行(ドキュメント)が削除されることです。

      説明

      VVR 8.0.8 以後のみがこのオプションをサポートしています。

    sink.ignore-null-when-update

    テーブルを更新するときにNULL値を無視するかどうかを指定します。

    ブール値

    いいえ

    false

    有効な値:

    • true: NULL値を無視します。

    • false: NULL値を無視しません。

    説明
    • プライマリキーを持つテーブル、かつ format オプションが JSON に設定されている場合のみ、true がサポートされます。

    • このオプションは VVR 11.1 以後でサポートされています。

  • ディメンションテーブル固有

    オプション

    説明

    データの型

    必須

    デフォルト値

    備考

    コネクタ

    ディメンションテーブルのタイプ。

    String

    はい

    デフォルト値なし

    値を elasticsearch に設定します。

    エンドポイント

    Elasticsearch クラスタのエンドポイント。

    String

    はい

    デフォルト値なし

    例: http://127.0.0.1:XXXX

    indexName

    Elasticsearch インデックスの名前。

    String

    はい

    デフォルト値なし

    該当なし。

    accessId

    Elasticsearch クラスタへのアクセスに使用するユーザー名。

    String

    いいえ

    デフォルト値なし

    デフォルトでは、このオプションは空です。これは、権限検証が不要であることを示します。 accessId オプションを構成する場合は、 accessKey オプションも構成する必要があります。

    重要

    セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。詳細については、「変数を管理する」をご参照ください。

    accessKey

    Elasticsearch クラスタへのアクセスに使用するパスワード。

    String

    いいえ

    デフォルト値なし

    typeNames

    タイプの名前。

    String

    いいえ

    _doc

    Elasticsearch クラスタのバージョンが V7.0 以降の場合は、このオプションを構成しないことをお勧めします。

    maxJoinRows

    結合できる最大行数。

    Integer

    いいえ

    1024

    該当なし。

    キャッシュ

    キャッシュポリシー。

    String

    いいえ

    なし

    有効な値:

    • ALL: ディメンションテーブルのすべてのデータがキャッシュされます。デプロイメントが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。これにより、後続のすべてのクエリでキャッシュが検索されます。システムがキャッシュ内でデータレコードを見つけられない場合、結合キーは存在しません。キャッシュエントリが期限切れになると、システムはキャッシュ内のすべてのデータを再ロードします。

    • LRU: ディメンションテーブルの一部のデータがキャッシュされます。ソーステーブルからデータレコードが読み取られるたびに、システムはキャッシュ内のデータを検索します。データが見つからない場合、システムは物理ディメンションテーブルでデータを検索します。

    • なし: データはキャッシュされません。

    cacheSize

    キャッシュできるデータの最大行数。

    Long

    いいえ

    100000

    cacheSize オプションは、キャッシュオプションを LRU に設定した場合にのみ有効になります。

    cacheTTLMs

    キャッシュのタイムアウト期間。

    Long

    いいえ

    Long.MAX_VALUE

    単位: ミリ秒。 cacheTTLMs オプションの構成は、キャッシュオプションによって異なります。

    • キャッシュオプションが LRU に設定されている場合、 cacheTTLMs オプションはキャッシュのタイムアウト期間を指定します。デフォルトでは、キャッシュエントリは期限切れになりません。

    • キャッシュオプションを ALL に設定すると、 cacheTTLMs オプションはシステムがキャッシュをリフレッシュする間隔を指定します。デフォルトでは、キャッシュはリフレッシュされません。

    ignoreKeywordSuffix

    STRING データ型のフィールドの名前に自動的に追加される .keyword 接尾辞を無視するかどうかを指定します。

    Boolean

    いいえ

    false

    Realtime Compute for Apache Flink は、互換性を確保するために TEXT データ型のフィールドを STRING データ型のフィールドに変換します。デフォルトでは、.keyword 接尾辞が STRING データ型のフィールドの名前に追加されます。

    有効な値:

    • true: .keyword 接尾辞は無視されます。

      Elasticsearch シンクテーブルの TEXT データ型のフィールドを一致させることができない場合は、このオプションを true に設定します。

    • false: .keyword 接尾辞は無視されません。

    cacheEmpty

    物理ディメンションテーブルで見つかった空の結果をキャッシュするかどうかを指定します。

    Boolean

    いいえ

    true

    cacheEmpty オプションは、キャッシュオプションが LRU に設定されている場合にのみ有効になります。

    queryMaxDocs

    各データレコードが非プライマリキーディメンションテーブルに送信された後、Elasticsearch サーバーがクエリされたときに返されるドキュメントの最大数。

    Integer

    いいえ

    10000

    デフォルト値 10000 は、このオプションの最大値でもあります。

    説明
    • VVR 8.0.8 以降でのみ、このオプションがサポートされています。

    • プライマリキーテーブルのデータは一意であるため、このオプションは非プライマリキーディメンションテーブルに対してのみ有効です。

    • クエリの正確性を確保するために、大きな値がデフォルト値として使用されます。ただし、大きな値は Elasticsearch クエリ中のメモリ使用量を増加させます。メモリ不足の問題が発生した場合は、値を小さくしてメモリ使用量を削減できます。

データ型マッピング

Realtime Compute for Apache Flink は、Elasticsearch データを JSON フォーマットで解析します。詳細については、「データ型マッピング」をご参照ください。

サンプルコード

  • ソーステーブルのサンプルコード

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • ディメンションテーブルのサンプルコード

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • シンクテーブル 1 のサンプルコード

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーを指定する場合、プライマリキーはドキュメント ID として使用されます。プライマリキーを指定しない場合、ランダムな値がドキュメント ID として使用されます。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • シンクテーブル 2 のサンプルコード

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- プライマリキーはオプションです。プライマリキーを指定する場合、プライマリキーはドキュメント ID として使用されます。プライマリキーを指定しない場合、ランダムな値がドキュメント ID として使用されます。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;