このトピックでは、Elasticsearch コネクタの使用方法について説明します。
背景情報
Alibaba Cloud Elasticsearch は、セキュリティ、Machine Learning、Graph、アプリケーションパフォーマンス管理 (APM) などのオープンソース Elasticsearch の機能と互換性があります。データ分析、データ検索、その他のシナリオに適しており、アクセス制御、セキュリティの監視とアラート、自動レポート生成などのエンタープライズレベルのサービスを提供します。
Elasticsearch コネクタは、以下をサポートします:
カテゴリ | 説明 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、シンクテーブル |
実行モード | バッチモードおよびストリーミングモード |
データ形式 | JSON |
特定の監視メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | DataStream および SQL |
シンクテーブルでのデータの更新または削除 | はい |
前提条件
Elasticsearch インデックスが作成されていること。詳細については、「インデックスの作成」をご参照ください。
Elasticsearch インスタンスに対して、パブリックまたは内部アクセスホワイトリストが設定されていること。詳細については、「インスタンスのパブリックまたは内部アクセスホワイトリストの設定」をご参照ください。
制限事項
ソーステーブルとディメンションテーブルは Elasticsearch 6.8.x 以降をサポートしますが、8.x 以降はサポートしません。
シンクテーブルは Elasticsearch 6.x、7.x、8.x のみをサポートします。
完全な Elasticsearch ソーステーブルのみがサポートされます。増分ソーステーブルはサポートされません。
構文
ソーステーブル
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );ディメンションテーブル
CREATE TABLE es_dim( field1 STRING, -- このフィールドは JOIN 操作のキーとして使用され、STRING 型である必要があります。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );説明プライマリキーを指定する場合、ディメンションテーブルの JOIN 操作には 1 つのキーフィールドしか使用できません。このキーは、対応する Elasticsearch インデックスのドキュメント ID である必要があります。
プライマリキーを指定しない場合、ディメンションテーブルの JOIN 操作に 1 つ以上のキーフィールドを使用できます。これらのキーは、対応する Elasticsearch インデックスのドキュメント内のフィールドである必要があります。
String 型の場合、互換性を確保するために、デフォルトでフィールド名に `.keyword` サフィックスが追加されます。これにより Elasticsearch の Text フィールドとのマッチングが妨げられる場合は、`ignoreKeywordSuffix` パラメーターを true に設定できます。
シンクテーブル
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- Elasticsearch 6.x を使用する場合は、このパラメーターを elasticsearch-6 に設定します。 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );説明Elasticsearch シンクテーブルは、プライマリキーが定義されているかどうかに応じて、upsert モードまたは append モードで動作します。
プライマリキーが定義されている場合、それはドキュメント ID である必要があります。Elasticsearch シンクテーブルは upsert モードで動作し、UPDATE および DELETE メッセージを処理できます。
プライマリキーが定義されていない場合、Elasticsearch はランダムなドキュメント ID を自動的に生成します。Elasticsearch シンクテーブルは append モードで動作し、INSERT メッセージのみを消費できます。
BYTES、ROW、ARRAY、MAP などの一部のデータ型には、対応する文字列表現がありません。したがって、これらの型のフィールドはプライマリキーフィールドとして使用できません。
DDL 文のフィールドは、Elasticsearch ドキュメントのフィールドに対応します。ドキュメント ID などのメタデータは Elasticsearch インスタンスによって維持されるため、Elasticsearch シンクテーブルに書き込むことはできません。
WITH パラメーター
ソーステーブル
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | ソーステーブルのタイプ。 | String | はい | なし | 静的フィールドは Elasticsearch です。 |
endPoint | サーバーアドレス。 | String | はい | なし | 例: |
indexName | インデックス名。 | String | はい | なし | なし。 |
accessId | Elasticsearch インスタンスのユーザー名。 | String | いいえ | なし | デフォルト値は空で、認証が実行されないことを意味します。accessId を指定する場合は、空でない accessKey も指定する必要があります。 重要 ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。 |
accessKey | Elasticsearch インスタンスのパスワード。 | String | いいえ | なし | |
typeNames | タイプ名。 | String | いいえ | _doc | Elasticsearch 7.0 以降ではこのパラメーターを設定しないでください。 |
batchSize | 各スクロールリクエストで Elasticsearch クラスターから取得するドキュメントの最大数。 | Int | いいえ | 2000 | なし。 |
keepScrollAliveSecs | スクロールコンテキストを維持する最大時間。 | Int | いいえ | 3600 | 単位は秒です。 |
シンクテーブル
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | シンクテーブルのタイプ。 | String | はい | なし | 有効な値は 説明 このパラメーターを |
hosts | サーバーアドレス。 | String | はい | なし | 例: |
index | インデックス名。 | String | はい | なし | Elasticsearch シンクテーブルは、静的インデックスと動的インデックスの両方をサポートします。静的インデックスと動的インデックスを使用する際は、以下の点にご注意ください:
|
document-type | ドキュメントタイプ。 | String |
| なし | connector パラメーターが |
username | ユーザー名。 | String | いいえ | 空 | デフォルト値は空で、認証が実行されないことを意味します。username を指定する場合は、空でない password も指定する必要があります。 重要 ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。 |
password | パスワード。 | String | いいえ | 空 | |
document-id.key-delimiter | ドキュメント ID の区切り文字。 | String | いいえ | _ | Elasticsearch シンクテーブルでは、プライマリキーを使用して Elasticsearch のドキュメント ID を計算します。Elasticsearch シンクテーブルは、DDL で定義された順序ですべてのプライマリキーフィールドを document-id.key-delimiter で指定されたキー区切り文字で接続することにより、各行のドキュメント ID 文字列を生成します。 説明 ドキュメント ID は最大 512 バイトの文字列で、スペースを含みません。 |
failure-handler | 失敗した Elasticsearch リクエストに対する障害処理ポリシー。 | String | いいえ | fail | 以下のポリシーが利用可能です:
|
sink.flush-on-checkpoint | チェックポイントでフラッシュ操作を実行するかどうかを指定します。 | Boolean | いいえ | true |
|
sink.bulk-flush.backoff.strategy | 一時的なリクエストエラーによりフラッシュ操作が失敗した場合、sink.bulk-flush.backoff.strategy を設定してリトライポリシーを指定します。 | Enum | いいえ | DISABLED |
|
sink.bulk-flush.backoff.max-retries | バックオフリトライの最大数。 | Int | いいえ | なし | なし。 |
sink.bulk-flush.backoff.delay | 各バックオフ試行間の遅延。 | Duration | いいえ | なし |
|
sink.bulk-flush.max-actions | 各バッチリクエストでバッファリングされる操作の最大数。 | Int | いいえ | 1000 | 値 0 はこの機能を無効にします。 |
sink.bulk-flush.max-size | リクエスト用バッファーの最大メモリサイズ。 | String | いいえ | 2 MB | 単位は MB です。デフォルト値は 2 MB です。値 0 MB はこの機能を無効にします。 |
sink.bulk-flush.interval | フラッシュ間隔。 | Duration | いいえ | 1s | 単位は秒です。デフォルト値は 1s です。値 0s はこの機能を無効にします。 |
connection.path-prefix | 各 REST 通信に追加するプレフィックス文字列。 | String | いいえ | 空 | なし。 |
retry-on-conflict | バージョン競合例外による更新操作で許可されるリトライの最大数。リトライ回数がこの値を超えると、例外がスローされ、ジョブは失敗します。 | Int | いいえ | 0 | 説明
|
routing-fields | Elasticsearch の特定のシャードにドキュメントをルーティングするための 1 つ以上の ES フィールド名を指定します。 | String | いいえ | なし | 複数のフィールド名はセミコロン (;) で区切ります。フィールドが空の場合、null に設定されます。 説明 このパラメーターは、elasticsearch-7 および elasticsearch-8 の VVR 8.0.6 以降でのみサポートされます。 |
sink.delete-strategy | リトラクションメッセージ (-D/-U) を受信したときの動作を設定します。 | Enum | いいえ | DELETE_ROW_ON_PK | 以下の動作が利用可能です:
|
sink.ignore-null-when-update | データを更新する際に、受信データフィールドの値が null の場合に、対応するフィールドを null に更新するか、フィールドを更新しないかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
説明 このパラメーターは、VVR 11.1 以降でのみサポートされます。 |
connection.request-timeout | 接続マネージャーから接続を要求する際のタイムアウト。 | Duration | いいえ | なし | 説明 このパラメーターは、VVR 11.5 以降でのみサポートされます。 |
connect.timeout | 接続を確立するためのタイムアウト。 | Duration | いいえ | なし | 説明 このパラメーターは、VVR 11.5 以降でのみサポートされます。 |
socket.timeout | データを待機するためのタイムアウト。これは、2 つの連続するデータパケット間の最大アイドル時間です。 | Duration | いいえ | なし | 説明 このパラメーターは、VVR 11.5 以降でのみサポートされます。 |
sink.bulk-flush.update.doc_as_upsert | ドキュメントを更新フィールドとして使用するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
https://github.com/elastic/elasticsearch/issues/105804 によると、Elasticsearch のプリセットデータパイプラインは、バルク更新のパーシャルアップデートをサポートしていません。データパイプラインを使用する場合は、このパラメーターを true に設定してください。 説明 このパラメーターは、VVR 11.5 以降でのみサポートされます。 |
ディメンションテーブル
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | ディメンションテーブルのタイプ。 | String | はい | なし | 値は Elasticsearch に固定されています。 |
endPoint | サーバーアドレス。 | String | はい | なし | 例: |
indexName | インデックス名。 | String | はい | なし | なし。 |
accessId | Elasticsearch インスタンスのユーザー名。 | String | いいえ | なし | デフォルト値は空で、認証が実行されないことを意味します。accessId を指定する場合は、空でない accessKey も指定する必要があります。 重要 ユーザー名とパスワードの漏洩を防ぐため、変数を使用してください。詳細については、「プロジェクト変数」をご参照ください。 |
accessKey | Elasticsearch インスタンスのパスワード。 | String | いいえ | なし | |
typeNames | タイプ名。 | String | いいえ | _doc | Elasticsearch 7.0 以降ではこのパラメーターを設定しないでください。 |
maxJoinRows | 単一のデータ行に対して結合する行の最大数。 | Integer | いいえ | 1024 | なし。 |
cache | キャッシュポリシー。 | String | いいえ | なし | 次の 3 つのキャッシュポリシーがサポートされています:
|
cacheSize | キャッシュサイズ。キャッシュするデータ行の数です。 | Long | いいえ | 100000 | cacheSize パラメーターは、cache パラメーターが LRU に設定されている場合にのみ有効です。 |
cacheTTLMs | キャッシュの有効期限が切れるまでのタイムアウト期間。 | Long | いいえ | Long.MAX_VALUE | 単位はミリ秒です。cacheTTLMs の設定は、cache の設定に依存します:
|
ignoreKeywordSuffix | String フィールドに自動的に追加される .keyword サフィックスを無視するかどうかを指定します。 | Boolean | いいえ | false | 互換性を確保するため、Flink は Elasticsearch の Text 型を String 型に変換し、デフォルトで String 型のフィールド名に .keyword サフィックスを追加します。 有効な値:
|
cacheEmpty | 物理ディメンションテーブルからのルックアップで空の結果をキャッシュするかどうかを指定します。 | Boolean | いいえ | true | cacheEmpty パラメーターは、cache パラメーターが LRU に設定されている場合にのみ有効です。 |
queryMaxDocs | 非プライマリキーディメンションテーブルの入力からの各受信データレコードに対して Elasticsearch サーバーをクエリする際に返されるドキュメントの最大数。 | Integer | いいえ | 10000 | デフォルト値の 10000 は、Elasticsearch サーバーが返すことができるドキュメントの最大制限です。このパラメーターの値はこの制限を超えることはできません。 説明
|
型マッピング
Flink は Elasticsearch のデータを JSON フォーマットで解析します。詳細については、「データ型マッピング」をご参照ください。
例
ソーステーブルの例
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;ディメンションテーブルの例
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;シンクテーブルの例 1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーが定義されている場合はドキュメント ID として使用されます。それ以外の場合は、ランダムな値がドキュメント ID として使用されます。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;シンクテーブルの例 2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーが定義されている場合はドキュメント ID として使用されます。それ以外の場合は、ランダムな値がドキュメント ID として使用されます。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;