すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Elasticsearch

最終更新日:Jan 06, 2026

このトピックでは、Elasticsearch コネクタの使用方法について説明します。

背景情報

Alibaba Cloud Elasticsearch は、セキュリティ、Machine Learning、Graph、アプリケーションパフォーマンス管理 (APM) などのオープンソース Elasticsearch の機能と互換性があります。データ分析、データ検索、その他のシナリオに適しており、アクセス制御、セキュリティの監視とアラート、自動レポート生成などのエンタープライズレベルのサービスを提供します。

Elasticsearch コネクタは、以下をサポートします:

カテゴリ

説明

サポートされるタイプ

ソーステーブル、ディメンションテーブル、シンクテーブル

実行モード

バッチモードおよびストリーミングモード

データ形式

JSON

特定の監視メトリック

  • ソーステーブル

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • ディメンションテーブル

    なし

  • シンクテーブル (Ververica Runtime (VVR) 6.0.6 以降)

    • numRecordsOut

    • numRecordsOutPerSecond

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

DataStream および SQL

シンクテーブルでのデータの更新または削除

はい

前提条件

制限事項

  • ソーステーブルとディメンションテーブルは Elasticsearch 6.8.x 以降をサポートしますが、8.x 以降はサポートしません。

  • シンクテーブルは Elasticsearch 6.x、7.x、8.x のみをサポートします。

  • 完全な Elasticsearch ソーステーブルのみがサポートされます。増分ソーステーブルはサポートされません。

構文

  • ソーステーブル

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • ディメンションテーブル

    CREATE TABLE es_dim(
      field1 STRING, -- このフィールドは JOIN 操作のキーとして使用され、STRING 型である必要があります。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    説明
    • プライマリキーを指定する場合、ディメンションテーブルの JOIN 操作には 1 つのキーフィールドしか使用できません。このキーは、対応する Elasticsearch インデックスのドキュメント ID である必要があります。

    • プライマリキーを指定しない場合、ディメンションテーブルの JOIN 操作に 1 つ以上のキーフィールドを使用できます。これらのキーは、対応する Elasticsearch インデックスのドキュメント内のフィールドである必要があります。

    • String 型の場合、互換性を確保するために、デフォルトでフィールド名に `.keyword` サフィックスが追加されます。これにより Elasticsearch の Text フィールドとのマッチングが妨げられる場合は、`ignoreKeywordSuffix` パラメーターを true に設定できます。

  • シンクテーブル

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- Elasticsearch 6.x を使用する場合は、このパラメーターを elasticsearch-6 に設定します。
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    説明
    • Elasticsearch シンクテーブルは、プライマリキーが定義されているかどうかに応じて、upsert モードまたは append モードで動作します。

      • プライマリキーが定義されている場合、それはドキュメント ID である必要があります。Elasticsearch シンクテーブルは upsert モードで動作し、UPDATE および DELETE メッセージを処理できます。

      • プライマリキーが定義されていない場合、Elasticsearch はランダムなドキュメント ID を自動的に生成します。Elasticsearch シンクテーブルは append モードで動作し、INSERT メッセージのみを消費できます。

    • BYTES、ROW、ARRAY、MAP などの一部のデータ型には、対応する文字列表現がありません。したがって、これらの型のフィールドはプライマリキーフィールドとして使用できません。

    • DDL 文のフィールドは、Elasticsearch ドキュメントのフィールドに対応します。ドキュメント ID などのメタデータは Elasticsearch インスタンスによって維持されるため、Elasticsearch シンクテーブルに書き込むことはできません。

WITH パラメーター

ソーステーブル

パラメーター

説明

データ型

必須

デフォルト値

注意

connector

ソーステーブルのタイプ。

String

はい

なし

静的フィールドは Elasticsearch です。

endPoint

サーバーアドレス。

String

はい

なし

例:http://127.0.0.1:XXXX

indexName

インデックス名。

String

はい

なし

なし。

accessId

Elasticsearch インスタンスのユーザー名。

String

いいえ

なし

デフォルト値は空で、認証が実行されないことを意味します。accessId を指定する場合は、空でない accessKey も指定する必要があります。

重要

ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。

accessKey

Elasticsearch インスタンスのパスワード。

String

いいえ

なし

typeNames

タイプ名。

String

いいえ

_doc

Elasticsearch 7.0 以降ではこのパラメーターを設定しないでください。

batchSize

各スクロールリクエストで Elasticsearch クラスターから取得するドキュメントの最大数。

Int

いいえ

2000

なし。

keepScrollAliveSecs

スクロールコンテキストを維持する最大時間。

Int

いいえ

3600

単位は秒です。

シンクテーブル

パラメーター

説明

データ型

必須

デフォルト値

注意

connector

シンクテーブルのタイプ。

String

はい

なし

有効な値は elasticsearch-6elasticsearch-7elasticsearch-8 です。

説明

このパラメーターを elasticsearch-8 に設定できるのは、VVR 8.0.5 以降のみです。

hosts

サーバーアドレス。

String

はい

なし

例:127.0.0.1:XXXX

index

インデックス名。

String

はい

なし

Elasticsearch シンクテーブルは、静的インデックスと動的インデックスの両方をサポートします。静的インデックスと動的インデックスを使用する際は、以下の点にご注意ください:

  • 静的インデックスを使用する場合、index パラメーターの値は myusers のようなプレーンな文字列でなければなりません。すべてのレコードは myusers インデックスに書き込まれます。

  • 動的インデックスを使用する場合、{field_name} を使用してレコード内のフィールド値を参照し、動的に宛先インデックスを生成できます。また、{field_name|date_format_string} を使用して、TIMESTAMP、DATE、TIME 型のフィールド値を date_format_string で指定された形式に変換することもできます。date_format_string は Java の DateTimeFormatter と互換性があります。たとえば、パラメーターを myusers-{log_ts|yyyy-MM-dd} に設定した場合、log_ts フィールド値が 2020-03-27 12:25:55 のレコードは myusers-2020-03-27 インデックスに書き込まれます。

document-type

ドキュメントタイプ。

String

  • elasticsearch-6:必須

  • elasticsearch-7:サポート対象外

なし

connector パラメーターが elasticsearch-6 に設定されている場合、このパラメーターの値は Elasticsearch 側の type パラメーターの値と同じでなければなりません。

username

ユーザー名。

String

いいえ

デフォルト値は空で、認証が実行されないことを意味します。username を指定する場合は、空でない password も指定する必要があります。

重要

ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。

password

パスワード。

String

いいえ

document-id.key-delimiter

ドキュメント ID の区切り文字。

String

いいえ

_

Elasticsearch シンクテーブルでは、プライマリキーを使用して Elasticsearch のドキュメント ID を計算します。Elasticsearch シンクテーブルは、DDL で定義された順序ですべてのプライマリキーフィールドを document-id.key-delimiter で指定されたキー区切り文字で接続することにより、各行のドキュメント ID 文字列を生成します。

説明

ドキュメント ID は最大 512 バイトの文字列で、スペースを含みません。

failure-handler

失敗した Elasticsearch リクエストに対する障害処理ポリシー。

String

いいえ

fail

以下のポリシーが利用可能です:

  • fail (デフォルト):リクエストが失敗した場合、ジョブは失敗します。

  • ignore:失敗を無視し、リクエストを削除します。

  • retry-rejected:キューの容量が満杯で失敗したリクエストを再追加します。

  • カスタムクラス名:ActionRequestFailureHandler のサブクラスを使用して障害を処理します。

sink.flush-on-checkpoint

チェックポイントでフラッシュ操作を実行するかどうかを指定します。

Boolean

いいえ

true

  • true:デフォルト値。

  • false:この機能を無効にすると、コネクタはチェックポイント中にすべての保留中のリクエストが完了したことを確認するのを待ちません。したがって、コネクタはリクエストに対して at-least-once 保証を提供しません。

sink.bulk-flush.backoff.strategy

一時的なリクエストエラーによりフラッシュ操作が失敗した場合、sink.bulk-flush.backoff.strategy を設定してリトライポリシーを指定します。

Enum

いいえ

DISABLED

  • DISABLED (デフォルト):リトライを実行しません。最初のリクエストエラーの後、操作は失敗します。

  • CONSTANT:定数バックオフ。リトライ間の待機時間は同じです。

  • EXPONENTIAL:指数バックオフ。リトライ間の待機時間は指数関数的に増加します。

sink.bulk-flush.backoff.max-retries

バックオフリトライの最大数。

Int

いいえ

なし

なし。

sink.bulk-flush.backoff.delay

各バックオフ試行間の遅延。

Duration

いいえ

なし

  • CONSTANT バックオフポリシーの場合、この値は各リトライ間の遅延です。

  • EXPONENTIAL バックオフポリシーの場合、この値は初期のベースライン遅延です。

sink.bulk-flush.max-actions

各バッチリクエストでバッファリングされる操作の最大数。

Int

いいえ

1000

値 0 はこの機能を無効にします。

sink.bulk-flush.max-size

リクエスト用バッファーの最大メモリサイズ。

String

いいえ

2 MB

単位は MB です。デフォルト値は 2 MB です。値 0 MB はこの機能を無効にします。

sink.bulk-flush.interval

フラッシュ間隔。

Duration

いいえ

1s

単位は秒です。デフォルト値は 1s です。値 0s はこの機能を無効にします。

connection.path-prefix

各 REST 通信に追加するプレフィックス文字列。

String

いいえ

なし。

retry-on-conflict

バージョン競合例外による更新操作で許可されるリトライの最大数。リトライ回数がこの値を超えると、例外がスローされ、ジョブは失敗します。

Int

いいえ

0

説明
  • このパラメーターは、VVR 4.0.13 以降でのみサポートされます。

  • このパラメーターは、プライマリキーが定義されている場合にのみ有効です。

routing-fields

Elasticsearch の特定のシャードにドキュメントをルーティングするための 1 つ以上の ES フィールド名を指定します。

String

いいえ

なし

複数のフィールド名はセミコロン (;) で区切ります。フィールドが空の場合、null に設定されます。

説明

このパラメーターは、elasticsearch-7 および elasticsearch-8 の VVR 8.0.6 以降でのみサポートされます。

sink.delete-strategy

リトラクションメッセージ (-D/-U) を受信したときの動作を設定します。

Enum

いいえ

DELETE_ROW_ON_PK

以下の動作が利用可能です:

  • DELETE_ROW_ON_PK (デフォルト):-U メッセージは無視しますが、-D メッセージを受信するとプライマリキーに対応する行 (ドキュメント) を削除します。

  • IGNORE_DELETE:-U および -D メッセージを無視します。Elasticsearch シンクはリトラクションを実行しません。

  • NON_PK_FIELD_TO_NULL:-U メッセージは無視しますが、-D メッセージを受信するとプライマリキーに対応する行 (ドキュメント) を変更します。プライマリキーの値は変更されず、テーブルスキーマ内の他のすべての非プライマリキーの値は NULL に設定されます。これは主に、複数のシンクが同じ Elasticsearch テーブルに書き込む際のパーシャルアップデートに使用されます。

  • CHANGELOG_STANDARD:DELETE_ROW_ON_PK と同様ですが、-U メッセージを受信したときにもプライマリキーに対応する行 (ドキュメント) を削除します。

    説明

    このパラメーターは、VVR 8.0.8 以降でのみサポートされます。

sink.ignore-null-when-update

データを更新する際に、受信データフィールドの値が null の場合に、対応するフィールドを null に更新するか、フィールドを更新しないかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:フィールドを更新しません。このパラメーターを true に設定できるのは、Flink テーブルにプライマリキーが設定され、Elasticsearch のデータ形式が JSON の場合のみです。

  • false:フィールドを null に更新します。

説明

このパラメーターは、VVR 11.1 以降でのみサポートされます。

connection.request-timeout

接続マネージャーから接続を要求する際のタイムアウト。

Duration

いいえ

なし

説明

このパラメーターは、VVR 11.5 以降でのみサポートされます。

connect.timeout

接続を確立するためのタイムアウト。

Duration

いいえ

なし

説明

このパラメーターは、VVR 11.5 以降でのみサポートされます。

socket.timeout

データを待機するためのタイムアウト。これは、2 つの連続するデータパケット間の最大アイドル時間です。

Duration

いいえ

なし

説明

このパラメーターは、VVR 11.5 以降でのみサポートされます。

sink.bulk-flush.update.doc_as_upsert

ドキュメントを更新フィールドとして使用するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:Update Request の doc_as_upsert フィールドを true に設定します。

  • false:Update Request の upsert フィールドにドキュメントを入力します。

https://github.com/elastic/elasticsearch/issues/105804 によると、Elasticsearch のプリセットデータパイプラインは、バルク更新のパーシャルアップデートをサポートしていません。データパイプラインを使用する場合は、このパラメーターを true に設定してください。

説明

このパラメーターは、VVR 11.5 以降でのみサポートされます。

ディメンションテーブル

パラメーター

説明

データ型

必須

デフォルト値

注意

connector

ディメンションテーブルのタイプ。

String

はい

なし

値は Elasticsearch に固定されています。

endPoint

サーバーアドレス。

String

はい

なし

例:http://127.0.0.1:XXXX

indexName

インデックス名。

String

はい

なし

なし。

accessId

Elasticsearch インスタンスのユーザー名。

String

いいえ

なし

デフォルト値は空で、認証が実行されないことを意味します。accessId を指定する場合は、空でない accessKey も指定する必要があります。

重要

ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。

accessKey

Elasticsearch インスタンスのパスワード。

String

いいえ

なし

typeNames

タイプ名。

String

いいえ

_doc

Elasticsearch 7.0 以降ではこのパラメーターを設定しないでください。

maxJoinRows

単一のデータ行に対して結合する行の最大数。

Integer

いいえ

1024

なし。

cache

キャッシュポリシー。

String

いいえ

なし

次の 3 つのキャッシュポリシーがサポートされています:

  • ALL:ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。以降のすべてのルックアップはキャッシュに対して実行されます。キャッシュにデータが見つからない場合、キーは存在しません。キャッシュの有効期限が切れると、フルキャッシュが再読み込みされます。

  • LRU:ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュ内のデータを検索します。データが見つからない場合、システムは物理ディメンションテーブルでそれを検索します。

  • None:キャッシュなし。

cacheSize

キャッシュサイズ。キャッシュするデータ行の数です。

Long

いいえ

100000

cacheSize パラメーターは、cache パラメーターが LRU に設定されている場合にのみ有効です。

cacheTTLMs

キャッシュの有効期限が切れるまでのタイムアウト期間。

Long

いいえ

Long.MAX_VALUE

単位はミリ秒です。cacheTTLMs の設定は、cache の設定に依存します:

  • cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは期限切れになりません。

  • cache が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み間隔です。デフォルトでは、キャッシュは再読み込みされません。

ignoreKeywordSuffix

String フィールドに自動的に追加される .keyword サフィックスを無視するかどうかを指定します。

Boolean

いいえ

false

互換性を確保するため、Flink は Elasticsearch の Text 型を String 型に変換し、デフォルトで String 型のフィールド名に .keyword サフィックスを追加します。

有効な値:

  • true:設定は無視されます。

    これにより Elasticsearch の Text 型フィールドとのマッチングが妨げられる場合は、このパラメーターを true に設定してください。

  • false:項目は無視されません。

cacheEmpty

物理ディメンションテーブルからのルックアップで空の結果をキャッシュするかどうかを指定します。

Boolean

いいえ

true

cacheEmpty パラメーターは、cache パラメーターが LRU に設定されている場合にのみ有効です。

queryMaxDocs

非プライマリキーディメンションテーブルの入力からの各受信データレコードに対して Elasticsearch サーバーをクエリする際に返されるドキュメントの最大数。

Integer

いいえ

10000

デフォルト値の 10000 は、Elasticsearch サーバーが返すことができるドキュメントの最大制限です。このパラメーターの値はこの制限を超えることはできません。

説明
  • このパラメーターは、VVR 8.0.8 以降でのみサポートされます。

  • プライマリキーテーブルのデータは一意であるため、このパラメーターは非プライマリキーディメンションテーブルに対してのみ有効です。

  • クエリの正確性を確保するため、デフォルト値は大きくなっています。ただし、値が大きいと Elasticsearch をクエリする際のメモリ使用量が増加します。メモリの問題が発生した場合は、この値を下げてメモリ使用量を最適化できます。

型マッピング

Flink は Elasticsearch のデータを JSON フォーマットで解析します。詳細については、「データ型マッピング」をご参照ください。

  • ソーステーブルの例

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • ディメンションテーブルの例

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • シンクテーブルの例 1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーが定義されている場合はドキュメント ID として使用されます。それ以外の場合は、ランダムな値がドキュメント ID として使用されます。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • シンクテーブルの例 2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- プライマリキーはオプションです。プライマリキーが定義されている場合はドキュメント ID として使用されます。それ以外の場合は、ランダムな値がドキュメント ID として使用されます。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;