Elasticsearch sink コネクタは、ご利用の ApsaraMQ for Kafka インスタンスの Topic からメッセージを読み取り、Elasticsearch インデックスに書き込みます。このコネクタは Function Compute をミドルウェアとして使用します。ソース Topic からメッセージをコンシュームし、Function Compute の関数に渡し、その関数が Bulk API を介して Elasticsearch に書き込みます。各メッセージは、Topic、パーティション、オフセット、タイムスタンプなどのメタデータとともに、ターゲットインデックス内のドキュメントになります。
事前準備
コネクタを作成する前に、以下の設定を完了してください。
ApsaraMQ for Kafka
Function Compute
Elasticsearch
Elasticsearch コンソールで Elasticsearch クラスターとインデックスを作成します。Function Compute クライアント (バージョン 7.7.0) との互換性のために、バージョン 7.0 以降を使用してください。
Function Compute エンドポイントの CIDR ブロックを Elasticsearch のホワイトリストに追加します。初期テストでは、VPC 内のすべての IP アドレスを許可するために
0.0.0.0/0を指定し、接続性を確認した後に範囲を制限します。
収集する情報
ウィザードを開始する前に、次の詳細情報を収集してください。
| 情報 | 確認場所 | 例 |
|---|---|---|
| Elasticsearch インスタンス ID | Elasticsearch コンソール | es-cn-oew1o67x0000**** |
| Elasticsearch エンドポイント (パブリックまたはプライベート) | クラスターの基本情報 | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| Elasticsearch ポート | クラスターの基本情報 | 9200 (HTTP/HTTPS) または 9300 (TCP) |
| Elasticsearch のユーザー名とパスワード | クラスターの作成時に設定されます。必要に応じてリセット | elastic / ******** |
| Elasticsearch インデックス名 | Elasticsearch コンソール | elastic_test |
| ソース Topic 名 | ApsaraMQ for Kafka コンソール | elasticsearch-test-input |
制限事項
ApsaraMQ for Kafka インスタンスと Elasticsearch クラスターは、同じリージョンにある必要があります。
ApsaraMQ for Kafka はメッセージを UTF-8 文字列としてシリアル化します。バイナリデータはサポートされていません。
Elasticsearch クラスターのプライベートエンドポイントを指定した場合、デフォルトでは Function Compute はアクセスできません。接続を有効にするには、Elasticsearch クラスターと同じ VPC と vSwitch を使用するように Function Compute サービスを設定します。詳細については、「Function Compute サービスの設定」をご参照ください。
その他のコネクタの制限事項については、「制限事項」をご参照ください。
課金
コネクタは Function Compute を使用してデータをエクスポートします。Function Compute は無料利用枠を提供しています。無料利用枠を超える使用量については、Function Compute の課金に従って請求されます。
コネクタの作成とデプロイ
ApsaraMQ for Kafka コンソールにログインします。
[概要] ページの [リソース配布] セクションで、ご利用のリージョンを選択します。
左側のナビゲーションウィンドウで、[コネクタ] をクリックします。
[コネクタ] ページで、[インスタンスの選択] ドロップダウンリストからご利用のインスタンスを選択し、[コネクタの作成] をクリックします。
ステップ 1:基本情報の構成
[基本情報の構成] ステップで、コネクタ名を設定し、インスタンスの詳細を確認します。
| パラメーター | 説明 | 例 |
|---|---|---|
| 名前 | ApsaraMQ for Kafka インスタンス内で一意の名前。1~48 文字の数字、小文字、ハイフン (-) を使用します。ハイフンで始めることはできません。コネクタは自動的に connect-<connector-name> という名前のコンシューマーグループを作成します。 | kafka-elasticsearch-sink |
| インスタンス | 現在のインスタンスの名前と ID を表示します。 | demo alikafka_post-cn-st21p8vj**** |
デフォルトでは、[サービスリンクロールの作成を承認] が選択されています。ApsaraMQ for Kafka は、サービスリンクロールがまだ存在しない場合に作成します。
[次へ] をクリックします。
ステップ 2:ソースサービスの構成
[ソースサービスの構成] ステップで、ソースサービスとして [Message Queue for Apache Kafka] を選択し、以下のパラメーターを設定します。
| パラメーター | 説明 | 例 |
|---|---|---|
| データソーストピック | データを消費するトピックです。 | elasticsearch-test-input |
| コンシューマースレッドの同時実行数 | 同時実行されるコンシューマースレッドの数です。有効な値:1、2、3、6、12。デフォルト値:6。 | 6 |
| コンシューマーオフセット | データ消費を開始する位置です。最も古いオフセット を選択すると、先頭から読み取ります。最新のオフセット を選択すると、新規メッセージのみを読み取ります。 | 最も古いオフセット |
[ランタイム環境の構成] をクリックして、追加のパラメーターを展開します。
| パラメーター | 説明 | 例 |
|---|---|---|
| VPC ID | ソースインスタンスの VPC。自動入力されるため、変更は不要です。 | vpc-bp1xpdnd3l*** |
| vSwitch ID | ソースインスタンスの vSwitch。同じ VPC 内にある必要があります。 | vsw-bp1d2jgg81*** |
| 障害処理ポリシー | メッセージに障害が発生した場合に実行するアクション。[サブスクリプションの継続] はエラーをログに記録し、コンシュームを続行します。[サブスクリプションの停止] はエラーをログに記録し、パーティションを停止します。ログの詳細については「コネクタの管理」を、トラブルシューティングについては「エラーコード」をご参照ください。 | サブスクリプションの継続 |
| リソース作成方法 | [自動] は必要な内部 Topic を自動的に作成します。[手動] はご自身で作成できます。 | 自動 |
| コネクタコンシューマーグループ | コネクタタスクのコンシューマーグループ。フォーマット:connect-<connector-name>。 | connect-kafka-elasticsearch-sink |
内部 Topic (手動作成のみ)
[リソース作成方法] を [手動] に設定した場合は、以下の Topic を作成してください。[ローカルストレージ] が必要なすべての Topic は、Professional Edition インスタンスでのみ利用可能です。
| パラメーター | 命名規則 | パーティション | ストレージエンジン | cleanup.policy |
|---|---|---|---|---|
| タスク オフセット トピック | connect-offset-* | 1 より大きい | ローカルストレージ | コンパクト |
| タスク設定 Topic | connect-config-* | ちょうど 1 | ローカルストレージ | コンパクト |
| タスクステータス Topic | connect-status-* | 6 (推奨) | ローカルストレージ | コンパクト |
| デッドレターキュー Topic | connect-error-* | 6 (推奨) | ローカルストレージまたはクラウドストレージ | -- |
| エラーデータ Topic | connect-error-* | 6 (推奨) | ローカルストレージまたはクラウドストレージ | -- |
Topic リソースを節約するために、デッドレターキュー Topic とエラーデータ Topic には同じ Topic を使用してください。
[次へ] をクリックします。
ステップ 3:宛先サービスの構成
[宛先サービスの構成] ステップで、宛先サービスとして [Elasticsearch] を選択し、以下のパラメーターを設定します。
| パラメーター | 説明 | 例 |
|---|---|---|
| Elasticsearch インスタンス ID | Elasticsearch クラスターの ID。 | es-cn-oew1o67x0000**** |
| エンドポイント | クラスターのパブリックまたはプライベートエンドポイント。詳細については、「クラスターの基本情報を表示」をご参照ください。 | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| ポート | HTTP/HTTPS の場合は 9200、TCP の場合は 9300。 | 9300 |
| ユーザー名 | Elasticsearch のユーザー名。デフォルト:elastic。必要に応じて X-Pack RBAC を通じてカスタマイズしてください。アカウントにはターゲットインデックスへの書き込み権限が必要です。 | elastic |
| パスワード | クラスター作成時に設定したパスワード。忘れた場合はパスワードをリセットしてください。 | ******** |
| インデックス | ターゲットの Elasticsearch インデックス名。 | elastic_test |
- ユーザー名とパスワードは、コネクタタスクの作成時に環境変数として Function Compute に渡されます。ApsaraMQ for Kafka は、タスク作成後にこれらの認証情報を保存しません。
- メッセージは Elasticsearch Bulk API を介して転送されるため、アカウントにはインデックスへの書き込み権限が必要です。
[作成] をクリックします。
コネクタのデプロイ
作成後、コネクタは [コネクタ] ページに表示されます。[操作] 列の [デプロイ] をクリックしてコネクタを開始します。
Function Compute サービスの設定
コネクタをデプロイすると、Function Compute は自動的に kafka-service-<connector-name>-<random-string> という名前のサービスを作成します。Elasticsearch クラスターがプライベートエンドポイントを使用している場合は、Elasticsearch クラスターと同じ VPC と vSwitch を使用するように Function Compute サービスを設定します。
[コネクタ] ページでコネクタを見つけます。[操作] 列で、[その他] > [関数の設定] を選択します。
Function Compute コンソールで、自動作成されたサービスを見つけ、Elasticsearch クラスターに合わせて VPC と vSwitch の設定を更新します。
データフローの検証
テストメッセージを送信して、データが ApsaraMQ for Kafka から Elasticsearch に流れることを確認します。
テストメッセージの送信
[コネクタ] ページでコネクタを見つけ、[操作] 列の [テスト] をクリックします。
[メッセージの送信] パネルで、[送信方法] を [コンソール] に設定します。
[メッセージキー] フィールドに、キー (例:
demo) を入力します。[メッセージ内容] フィールドに、JSON 本文 (例:) を入力します。
{"key": "test"}「[指定されたパーティションに送信]」の場合、「[はい]」をクリックし、特定のパーティションを対象とするために[パーティション ID](例:
0)を入力するか、または「[いいえ]」をクリックしてシステムに割り当てさせます。パーティション ID を確認するには、「パーティションのステータスを表示」をご参照ください。
テストメッセージは [Docker] または [SDK] を通じても送信できます。[送信方法] フィールドで対応するオプションを選択し、画面の指示に従ってください。
Elasticsearch インデックスの確認
次のクエリを実行して、ターゲットインデックスを検索します。
GET /<index_name>/_search応答に送信したメッセージが含まれていることを確認します。成功した応答は次のようになります。
{ "took": 8, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "product_****", "_type": "_doc", "_id": "TX3TZHgBfHNEDGoZ****", "_score": 1.0, "_source": { "msg_body": { "key": "test", "offset": 2, "overflowFlag": false, "partition": 2, "timestamp": 1616599282417, "topic": "dv****", "value": "test1", "valueSize": 8 }, "doc_as_upsert": true } } ] } }
トラブルシューティング
| 現象 | 考えられる原因 | 解決方法 |
|---|---|---|
| コネクタが Elasticsearch への書き込みに失敗する | Function Compute が Elasticsearch クラスターに到達できない | Function Compute サービスが Elasticsearch クラスターと同じ VPC と vSwitch を使用していることを確認します。詳細については、「Function Compute サービスの設定」をご参照ください。 |
| メッセージがコンシュームされない | コンシューマーオフセットの設定が正しくない | [コンシューマーオフセット] の設定を確認します。既存のすべてのメッセージを読み取るには [最も早いオフセット] を、新しいメッセージのみを読み取るには [最新のオフセット] を使用します。 |
| 認証エラー | 無効な認証情報または権限不足 | ユーザー名とパスワードが正しいこと、およびアカウントがターゲットインデックスへの書き込み権限を持っていることを確認します。 |
Function Compute の関数呼び出しログについては、「ログ機能の設定」をご参照ください。