このトピックでは、Elasticsearch コネクタの使用方法について説明します。
背景情報
Alibaba Cloud Elasticsearch は、Security、Machine Learning、Graph、Application Performance Monitoring(APM)などのオープンソース Elasticsearch の機能と互換性があります。 Alibaba Cloud Elasticsearch は、データ分析やデータ検索など、さまざまなシナリオに適しています。 Alibaba Cloud Elasticsearch は、アクセス制御、セキュリティ監視とアラート、レポートの自動生成などのエンタープライズクラスのサービスを提供します。
次の表に、Elasticsearch コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルの種類 | ソーステーブル、ディメンションテーブル、およびシンクテーブル |
実行モード | バッチモードとストリーミングモード |
データ形式 | JSON |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | DataStream API と SQL API |
シンクテーブルのデータの更新または削除 | サポートされています |
前提条件
Elasticsearch インデックスが作成されていること。 詳細については、「はじめに」トピックの「ステップ 1:クラスタを作成する」セクションをご参照ください。
関連する Elasticsearch クラスタのパブリックまたはプライベート IP アドレスのホワイトリストが構成されていること。 詳細については、「Elasticsearch クラスタのパブリックまたはプライベート IP アドレスのホワイトリストを構成する」をご参照ください。
制限
Elasticsearch コネクタは、関連する Elasticsearch クラスタのバージョンが V6.8.X 以降で V8.X より前の場合にのみ、ソーステーブルとディメンションテーブルに使用できます。
Elasticsearch コネクタは、関連する Elasticsearch クラスタのバージョンが V6.X、V7.X、または V8.X の場合にのみ、シンクテーブルに使用できます。
Elasticsearch コネクタは、完全な Elasticsearch ソーステーブルにのみ使用でき、増分 Elasticsearch ソーステーブルには使用できません。
構文
ソーステーブルを作成する:
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );ディメンションテーブルを作成する:
CREATE TABLE es_dim( field1 STRING, --- このフィールドがディメンションテーブルと別のテーブルを結合するためのキーとして使用される場合、このフィールドの値は STRING データ型である必要があります。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );説明ディメンションテーブルにプライマリキーを定義する場合、ディメンションテーブルと別のテーブルを結合するために使用できるキーは 1 つだけです。 キーは、Elasticsearch インデックス内のドキュメントの ID です。
ディメンションテーブルにプライマリキーを定義しない場合、ディメンションテーブルと別のテーブルを結合するために 1 つ以上のキーを使用できます。 キーは、Elasticsearch インデックス内のドキュメントのフィールドです。
デフォルトでは、互換性を確保するために、STRING データ型のフィールド名に .keyword 接尾辞が追加されます。 Elasticsearch テーブルの TEXT データ型のフィールドが一致しない場合は、
ignoreKeywordSuffixオプションの値を true に設定できます。
シンクテーブルを作成する:
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7' -- Elasticsearch のバージョンが V6.X の場合は、elasticsearch-6 と入力します。 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );説明Elasticsearch シンクテーブルは、プライマリキーが定義されているかどうかに基づいて、upsert モードまたは append モードで動作するように決定されます。
Elasticsearch シンクテーブルにプライマリキーが定義されている場合、プライマリキーはドキュメント ID である必要があり、Elasticsearch シンクテーブルは upsert モードで動作します。 このモードでは、Elasticsearch シンクテーブルは UPDATE メッセージと DELETE メッセージを使用できます。
Elasticsearch シンクテーブルにプライマリキーが定義されていない場合、Elasticsearch はランダムなドキュメント ID を自動的に生成し、Elasticsearch シンクテーブルは append モードで動作します。 このモードでは、Elasticsearch シンクテーブルは INSERT メッセージのみを使用できます。
BYTES、ROW、ARRAY、MAP などの特定のデータ型は文字列として表現できません。 したがって、これらのデータ型のフィールドをプライマリキーフィールドとして使用することはできません。
DDL 文のフィールドは、Elasticsearch ドキュメントのフィールドに対応しています。 ドキュメント ID などのメタデータは、Elasticsearch クラスタで保持されます。 したがって、メタデータを Elasticsearch シンクテーブルに書き込むことはできません。
WITH 句のコネクタオプション
ソース固有
オプション
説明
データ型
必須
デフォルト値
備考
connector
ソーステーブルのタイプ。
STRING
はい
デフォルト値なし
値を elasticsearch に設定します。
endPoint
Elasticsearch クラスタのエンドポイント。
STRING
はい
デフォルト値なし
例:
http://127.0.0.1:XXXX。indexName
Elasticsearch インデックスの名前。
STRING
はい
デフォルト値なし
該当なし。
accessId
Elasticsearch クラスタにアクセスするために使用されるユーザー名。
STRING
いいえ
デフォルト値なし
デフォルトでは、このオプションは空です。 これは、権限検証が不要であることを示します。 accessId オプションを構成する場合は、 accessKey オプションも構成する必要があります。
重要セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。 詳細については、「変数を管理する」をご参照ください。
accessKey
Elasticsearch クラスタにアクセスするために使用されるパスワード。
STRING
いいえ
デフォルト値なし
typeNames
タイプの名前。
STRING
いいえ
_doc
Elasticsearch クラスタのバージョンが V7.0 以降の場合は、このオプションを構成しないことをお勧めします。
batchSize
スクロールリクエストごとに Elasticsearch クラスタから取得できるドキュメントの最大数。
INT
いいえ
2000
該当なし。
keepScrollAliveSecs
スクロールコンテキストの最大保持期間。
INT
いいえ
3600
単位:秒。
シンク固有
オプション
説明
データ型
必須
デフォルト値
備考
connector
シンクテーブルのタイプ。
String
はい
デフォルト値なし
有効な値:
elasticsearch-6、elasticsearch-7、およびelasticsearch-8。説明VVR 8.0.5 以降でのみ
elasticsearch-8がサポートされています。hosts
Elasticsearch クラスタのエンドポイント。
String
はい
デフォルト値なし
例:
127.0.0.1:XXXX。index
Elasticsearch インデックスの名前。
String
はい
デフォルト値なし
Elasticsearch シンクテーブルは、静的インデックスと動的インデックスの両方をサポートしています。 静的インデックスと動的インデックスを使用する場合は、次の点に注意してください。
静的インデックスを使用する場合、index オプションの値は、
myusersなどの文字列である必要があります。 すべてのレコードはmyusersインデックスに書き込まれます。動的インデックスを使用する場合、
{field_name}を使用してレコード内のフィールド値を参照し、宛先インデックスを動的に生成できます。 また、{field_namedate_format_string}を使用して、TIMESTAMP、DATE、および TIME データ型のフィールド値をdate_format_stringで指定された形式に変換することもできます。date_format_stringは、Java の DateTimeFormatter と互換性があります。 たとえば、動的インデックスをmyusers-{log_tsyyyy-MM-dd}に設定した場合、log_ts フィールドの値にあるレコード2020-03-27 12:25:55は、myusers-2020-03-27インデックスに書き込まれます。
document-type
ドキュメントのタイプ。
String
connector オプションが elasticsearch-6 に設定されている場合、このオプションを構成する必要があります。
connector オプションが elasticsearch-7 に設定されている場合、このオプションはサポートされていません。
デフォルト値なし
connectorオプションがelasticsearch-6に設定されている場合、このオプションの値は、Elasticsearch に構成されているtypeオプションの値と同じである必要があります。username
Elasticsearch クラスタにアクセスするために使用されるユーザー名。
String
いいえ
デフォルト値なし
デフォルトでは、このオプションは空のままです。これは、権限検証が不要であることを示します。
usernameオプションを構成する場合は、passwordオプションも構成する必要があります。重要セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。 詳細については、「変数を管理する」をご参照ください。
password
Elasticsearch クラスタにアクセスするために使用されるパスワード。
String
いいえ
デフォルト値なし
document-id.key-delimiter
複数のドキュメント ID を区切るために使用されるデリミタ。
String
いいえ
_
Elasticsearch シンク テーブルでは、プライマリキーを使用して Elasticsearch のドキュメント ID が計算されます。[document-id.key-delimiter] で指定されたキーデリミタを使用して、DDL 文で定義された順序ですべてのプライマリキーフィールドが連結されます。各行についてもドキュメント ID が生成されます。
説明ドキュメント ID は、スペースを含まない最大 512 バイトの文字列です。
failure-handler
Elasticsearch リクエストが失敗した場合に使用されるエラー処理ポリシーです。
String
いいえ
fail
有効な値:
fail: リクエストが失敗した場合、デプロイメントは失敗します。これはデフォルト値です。
ignore: エラーは無視され、リクエストは削除されます。
retry_rejected: キューの容量が一杯でエラーが発生した場合、リクエストが再試行されます。
カスタム クラス名: ActionRequestFailureHandler サブクラスを使用して、エラーのトラブルシューティングを行います。
sink.flush-on-checkpoint
チェックポイント中にフラッシュ操作をトリガーするかどうかを指定します。
ブール値
いいえ
true
true:チェックポイント中にフラッシュ操作がトリガーされます。これはデフォルト値です。
false:チェックポイント中にフラッシュ操作はトリガーされません。この機能が無効になっている場合、Elasticsearch コネクタはチェックポイント中にすべての保留中のリクエストが完了するかどうかを確認するまで待機しません。したがって、Elasticsearch コネクタはリクエストに対して少なくとも1回保証を提供しません。
sink.bulk-flush.backoff.strategy
sink.bulk-flush.backoff.strategy オプションを構成して、一時的なリクエストエラーが原因でフラッシュ操作が失敗した場合の再試行ポリシーを指定できます。
Enum
いいえ
DISABLED
DISABLED:フラッシュ操作は再試行されません。最初のリクエストエラーが発生すると、フラッシュ操作は失敗します。これはデフォルト値です。
CONSTANT:各フラッシュ操作の待機時間は同じです。
EXPONENTIAL:各フラッシュ操作の待機時間は指数関数的に増加します。
sink.bulk-flush.backoff.max-retries
再試行の最大回数。
整数
いいえ
デフォルト値はありません
該当なし。
sink.bulk-flush.backoff.delay
再試行間の遅延。
期間
いいえ
デフォルト値はありません
sink.bulk-flush.backoff.strategyオプションがCONSTANTに設定されている場合、このオプションの値は再試行間の遅延時間です。sink.bulk-flush.backoff.strategyオプションがEXPONENTIALに設定されている場合、このオプションの値は初期ベースライン遅延です。
sink.bulk-flush.max-actions
リクエストのバッチごとに実行できる最大フラッシュ操作数です。
Int
いいえ
1000
値 0 は、この機能が無効になっていることを示します。
sink.bulk-flush.max-size
リクエストが保存されるバッファーの最大メモリサイズ。
String
いいえ
2 MB
デフォルト値: 2 。単位: MB 。このオプションが 0 に設定されている場合、この機能は無効になります。
sink.bulk-flush.interval
フラッシュ操作が実行される間隔です。
期間
いいえ
1s
デフォルト値: 1 。単位:秒。このオプションが 0 に設定されている場合、この機能は無効になります。
connection.path-prefix
各 REST 通信に追加する必要があるプレフィックス。
String
いいえ
デフォルト値はありません
該当なし。
リトライオンコンフリクト
更新操作でバージョン競合が原因で許可される最大リトライ回数です。リトライ回数がこのオプションの値を超えると、例外が発生し、デプロイメントは失敗します。
Int
いいえ
0
説明VVR 4.0.13 以降のみがこのオプションをサポートしています。
このオプションは、プライマリキーが指定されている場合にのみ有効になります。
ルーティングフィールド
Elasticsearch シンクテーブルの 1 つ以上のフィールド名。フィールド名は、ドキュメントを Elasticsearch クラスタの指定されたシャードにルーティングするために使用されます。
String
いいえ
デフォルト値なし
複数のフィールド名はセミコロン(;)で区切ります。フィールドが空の場合、フィールドは null に設定されます。
説明connectorオプションが elasticsearch-7 または elasticsearch-8 に設定されている場合、このオプションは VVR 8.0.6 以降でのみサポートされます。sink.delete-strategy
削除メッセージ(DELETE または UPDATE)を受信したときに実行される操作です。
Enum
いいえ
DELETE_ROW_ON_PK
有効な値:
DELETE_ROW_ON_PK:UPDATE メッセージを無視し、DELETE メッセージを受信したときにプライマリキー値に一致する行(ドキュメント)を削除します。これはデフォルト値です。
IGNORE_DELETE:UPDATE メッセージと DELETE メッセージを無視します。Elasticsearch シンクでは削除は発生しません。
NON_PK_FIELD_TO_NULL:UPDATE メッセージを無視し、DELETE メッセージを受信したときにプライマリキー値に一致する行(ドキュメント)を変更します。プライマリキー値は変更されず、テーブルスキーマ内のプライマリキー以外の値は NULL に設定されます。この値は、複数のシンクを使用して同じ Elasticsearch テーブルにデータを書き込むときに、データを部分的に更新するために使用されます。
CHANGELOG_STANDARD:DELETE_ROW_ON_PK と似ています。唯一の違いは、UPDATE メッセージを受信したときにも、プライマリキー値に一致する行(ドキュメント)が削除されることです。
説明VVR 8.0.8 以後のみがこのオプションをサポートしています。
sink.ignore-null-when-update
テーブルを更新するときにNULL値を無視するかどうかを指定します。
ブール値
いいえ
false
有効な値:
true: NULL値を無視します。
false: NULL値を無視しません。
説明プライマリキーを持つテーブル、かつ
formatオプションがJSONに設定されている場合のみ、trueがサポートされます。このオプションは VVR 11.1 以後でサポートされています。
ディメンションテーブル固有
オプション
説明
データの型
必須
デフォルト値
備考
コネクタ
ディメンションテーブルのタイプ。
String
はい
デフォルト値なし
値を elasticsearch に設定します。
エンドポイント
Elasticsearch クラスタのエンドポイント。
String
はい
デフォルト値なし
例:
http://127.0.0.1:XXXX。indexName
Elasticsearch インデックスの名前。
String
はい
デフォルト値なし
該当なし。
accessId
Elasticsearch クラスタへのアクセスに使用するユーザー名。
String
いいえ
デフォルト値なし
デフォルトでは、このオプションは空です。これは、権限検証が不要であることを示します。 accessId オプションを構成する場合は、 accessKey オプションも構成する必要があります。
重要セキュリティを強化するために、プレーンテキストで AccessKey ペアをハードコーディングする代わりに変数を使用してください。詳細については、「変数を管理する」をご参照ください。
accessKey
Elasticsearch クラスタへのアクセスに使用するパスワード。
String
いいえ
デフォルト値なし
typeNames
タイプの名前。
String
いいえ
_doc
Elasticsearch クラスタのバージョンが V7.0 以降の場合は、このオプションを構成しないことをお勧めします。
maxJoinRows
結合できる最大行数。
Integer
いいえ
1024
該当なし。
キャッシュ
キャッシュポリシー。
String
いいえ
なし
有効な値:
ALL: ディメンションテーブルのすべてのデータがキャッシュされます。デプロイメントが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。これにより、後続のすべてのクエリでキャッシュが検索されます。システムがキャッシュ内でデータレコードを見つけられない場合、結合キーは存在しません。キャッシュエントリが期限切れになると、システムはキャッシュ内のすべてのデータを再ロードします。
LRU: ディメンションテーブルの一部のデータがキャッシュされます。ソーステーブルからデータレコードが読み取られるたびに、システムはキャッシュ内のデータを検索します。データが見つからない場合、システムは物理ディメンションテーブルでデータを検索します。
なし: データはキャッシュされません。
cacheSize
キャッシュできるデータの最大行数。
Long
いいえ
100000
cacheSize オプションは、キャッシュオプションを
LRUに設定した場合にのみ有効になります。cacheTTLMs
キャッシュのタイムアウト期間。
Long
いいえ
Long.MAX_VALUE
単位: ミリ秒。 cacheTTLMs オプションの構成は、キャッシュオプションによって異なります。
キャッシュオプションが LRU に設定されている場合、 cacheTTLMs オプションはキャッシュのタイムアウト期間を指定します。デフォルトでは、キャッシュエントリは期限切れになりません。
キャッシュオプションを
ALLに設定すると、 cacheTTLMs オプションはシステムがキャッシュをリフレッシュする間隔を指定します。デフォルトでは、キャッシュはリフレッシュされません。
ignoreKeywordSuffix
STRING データ型のフィールドの名前に自動的に追加される .keyword 接尾辞を無視するかどうかを指定します。
Boolean
いいえ
false
Realtime Compute for Apache Flink は、互換性を確保するために TEXT データ型のフィールドを STRING データ型のフィールドに変換します。デフォルトでは、.keyword 接尾辞が STRING データ型のフィールドの名前に追加されます。
有効な値:
true: .keyword 接尾辞は無視されます。
Elasticsearch シンクテーブルの TEXT データ型のフィールドを一致させることができない場合は、このオプションを
trueに設定します。false: .keyword 接尾辞は無視されません。
cacheEmpty
物理ディメンションテーブルで見つかった空の結果をキャッシュするかどうかを指定します。
Boolean
いいえ
true
cacheEmpty オプションは、キャッシュオプションが
LRUに設定されている場合にのみ有効になります。queryMaxDocs
各データレコードが非プライマリキーディメンションテーブルに送信された後、Elasticsearch サーバーがクエリされたときに返されるドキュメントの最大数。
Integer
いいえ
10000
デフォルト値 10000 は、このオプションの最大値でもあります。
説明VVR 8.0.8 以降でのみ、このオプションがサポートされています。
プライマリキーテーブルのデータは一意であるため、このオプションは非プライマリキーディメンションテーブルに対してのみ有効です。
クエリの正確性を確保するために、大きな値がデフォルト値として使用されます。ただし、大きな値は Elasticsearch クエリ中のメモリ使用量を増加させます。メモリ不足の問題が発生した場合は、値を小さくしてメモリ使用量を削減できます。
データ型マッピング
Realtime Compute for Apache Flink は、Elasticsearch データを JSON フォーマットで解析します。詳細については、「データ型マッピング」をご参照ください。
サンプルコード
ソーステーブルのサンプルコード
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;ディメンションテーブルのサンプルコード
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;シンクテーブル 1 のサンプルコード
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーを指定する場合、プライマリキーはドキュメント ID として使用されます。プライマリキーを指定しない場合、ランダムな値がドキュメント ID として使用されます。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;シンクテーブル 2 のサンプルコード
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- プライマリキーはオプションです。プライマリキーを指定する場合、プライマリキーはドキュメント ID として使用されます。プライマリキーを指定しない場合、ランダムな値がドキュメント ID として使用されます。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;