すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Elasticsearch sink コネクタの作成

最終更新日:Mar 11, 2026

Elasticsearch sink コネクタは、ご利用の ApsaraMQ for Kafka インスタンスの Topic からメッセージを読み取り、Elasticsearch インデックスに書き込みます。このコネクタは Function Compute をミドルウェアとして使用します。ソース Topic からメッセージをコンシュームし、Function Compute の関数に渡し、その関数が Bulk API を介して Elasticsearch に書き込みます。各メッセージは、Topic、パーティション、オフセット、タイムスタンプなどのメタデータとともに、ターゲットインデックス内のドキュメントになります。

事前準備

コネクタを作成する前に、以下の設定を完了してください。

ApsaraMQ for Kafka

Function Compute

Elasticsearch

収集する情報

ウィザードを開始する前に、次の詳細情報を収集してください。

情報確認場所
Elasticsearch インスタンス IDElasticsearch コンソールes-cn-oew1o67x0000****
Elasticsearch エンドポイント (パブリックまたはプライベート)クラスターの基本情報es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
Elasticsearch ポートクラスターの基本情報9200 (HTTP/HTTPS) または 9300 (TCP)
Elasticsearch のユーザー名とパスワードクラスターの作成時に設定されます。必要に応じてリセットelastic / ********
Elasticsearch インデックス名Elasticsearch コンソールelastic_test
ソース Topic 名ApsaraMQ for Kafka コンソールelasticsearch-test-input

制限事項

  • ApsaraMQ for Kafka インスタンスと Elasticsearch クラスターは、同じリージョンにある必要があります。

  • ApsaraMQ for Kafka はメッセージを UTF-8 文字列としてシリアル化します。バイナリデータはサポートされていません。

  • Elasticsearch クラスターのプライベートエンドポイントを指定した場合、デフォルトでは Function Compute はアクセスできません。接続を有効にするには、Elasticsearch クラスターと同じ VPC と vSwitch を使用するように Function Compute サービスを設定します。詳細については、「Function Compute サービスの設定」をご参照ください。

  • その他のコネクタの制限事項については、「制限事項」をご参照ください。

課金

コネクタは Function Compute を使用してデータをエクスポートします。Function Compute は無料利用枠を提供しています。無料利用枠を超える使用量については、Function Compute の課金に従って請求されます。

コネクタの作成とデプロイ

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース配布] セクションで、ご利用のリージョンを選択します。

  3. 左側のナビゲーションウィンドウで、[コネクタ] をクリックします。

  4. [コネクタ] ページで、[インスタンスの選択] ドロップダウンリストからご利用のインスタンスを選択し、[コネクタの作成] をクリックします。

ステップ 1:基本情報の構成

[基本情報の構成] ステップで、コネクタ名を設定し、インスタンスの詳細を確認します。

パラメーター説明
名前ApsaraMQ for Kafka インスタンス内で一意の名前。1~48 文字の数字、小文字、ハイフン (-) を使用します。ハイフンで始めることはできません。コネクタは自動的に connect-<connector-name> という名前のコンシューマーグループを作成します。kafka-elasticsearch-sink
インスタンス現在のインスタンスの名前と ID を表示します。demo alikafka_post-cn-st21p8vj****
重要

デフォルトでは、[サービスリンクロールの作成を承認] が選択されています。ApsaraMQ for Kafka は、サービスリンクロールがまだ存在しない場合に作成します。

[次へ] をクリックします。

ステップ 2:ソースサービスの構成

[ソースサービスの構成] ステップで、ソースサービスとして [Message Queue for Apache Kafka] を選択し、以下のパラメーターを設定します。

パラメーター説明
データソーストピックデータを消費するトピックです。elasticsearch-test-input
コンシューマースレッドの同時実行数同時実行されるコンシューマースレッドの数です。有効な値:1、2、3、6、12。デフォルト値:6。6
コンシューマーオフセットデータ消費を開始する位置です。最も古いオフセット を選択すると、先頭から読み取ります。最新のオフセット を選択すると、新規メッセージのみを読み取ります。最も古いオフセット

[ランタイム環境の構成] をクリックして、追加のパラメーターを展開します。

パラメーター説明
VPC IDソースインスタンスの VPC。自動入力されるため、変更は不要です。vpc-bp1xpdnd3l***
vSwitch IDソースインスタンスの vSwitch。同じ VPC 内にある必要があります。vsw-bp1d2jgg81***
障害処理ポリシーメッセージに障害が発生した場合に実行するアクション。[サブスクリプションの継続] はエラーをログに記録し、コンシュームを続行します。[サブスクリプションの停止] はエラーをログに記録し、パーティションを停止します。ログの詳細については「コネクタの管理」を、トラブルシューティングについては「エラーコード」をご参照ください。
説明
  • ログの表示方法の詳細については、「コネクタの関連操作」をご参照ください。
  • エラーコードに基づいた解決策の見つけ方については、「エラーコード」をご参照ください。
サブスクリプションの継続
リソース作成方法[自動] は必要な内部 Topic を自動的に作成します。[手動] はご自身で作成できます。自動
コネクタコンシューマーグループコネクタタスクのコンシューマーグループ。フォーマット:connect-<connector-name>connect-kafka-elasticsearch-sink

内部 Topic (手動作成のみ)

[リソース作成方法][手動] に設定した場合は、以下の Topic を作成してください。[ローカルストレージ] が必要なすべての Topic は、Professional Edition インスタンスでのみ利用可能です。

パラメーター命名規則パーティションストレージエンジンcleanup.policy
タスク オフセット トピックconnect-offset-*1 より大きいローカルストレージコンパクト
タスク設定 Topicconnect-config-*ちょうど 1ローカルストレージコンパクト
タスクステータス Topicconnect-status-*6 (推奨)ローカルストレージコンパクト
デッドレターキュー Topicconnect-error-*6 (推奨)ローカルストレージまたはクラウドストレージ--
エラーデータ Topicconnect-error-*6 (推奨)ローカルストレージまたはクラウドストレージ--
説明

Topic リソースを節約するために、デッドレターキュー Topic とエラーデータ Topic には同じ Topic を使用してください。

[次へ] をクリックします。

ステップ 3:宛先サービスの構成

[宛先サービスの構成] ステップで、宛先サービスとして [Elasticsearch] を選択し、以下のパラメーターを設定します。

パラメーター説明
Elasticsearch インスタンス IDElasticsearch クラスターの ID。es-cn-oew1o67x0000****
エンドポイントクラスターのパブリックまたはプライベートエンドポイント。詳細については、「クラスターの基本情報を表示」をご参照ください。es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
ポートHTTP/HTTPS の場合は 9200、TCP の場合は 9300。9300
ユーザー名Elasticsearch のユーザー名。デフォルト:elastic。必要に応じて X-Pack RBAC を通じてカスタマイズしてください。アカウントにはターゲットインデックスへの書き込み権限が必要です。elastic
パスワードクラスター作成時に設定したパスワード。忘れた場合はパスワードをリセットしてください。********
インデックスターゲットの Elasticsearch インデックス名。elastic_test
説明
  • ユーザー名とパスワードは、コネクタタスクの作成時に環境変数として Function Compute に渡されます。ApsaraMQ for Kafka は、タスク作成後にこれらの認証情報を保存しません。
  • メッセージは Elasticsearch Bulk API を介して転送されるため、アカウントにはインデックスへの書き込み権限が必要です。

[作成] をクリックします。

コネクタのデプロイ

作成後、コネクタは [コネクタ] ページに表示されます。[操作] 列の [デプロイ] をクリックしてコネクタを開始します。

Function Compute サービスの設定

コネクタをデプロイすると、Function Compute は自動的に kafka-service-<connector-name>-<random-string> という名前のサービスを作成します。Elasticsearch クラスターがプライベートエンドポイントを使用している場合は、Elasticsearch クラスターと同じ VPC と vSwitch を使用するように Function Compute サービスを設定します。

  1. [コネクタ] ページでコネクタを見つけます。[操作] 列で、[その他] > [関数の設定] を選択します。

  2. Function Compute コンソールで、自動作成されたサービスを見つけ、Elasticsearch クラスターに合わせて VPC と vSwitch の設定を更新します。

データフローの検証

テストメッセージを送信して、データが ApsaraMQ for Kafka から Elasticsearch に流れることを確認します。

テストメッセージの送信

  1. [コネクタ] ページでコネクタを見つけ、[操作] 列の [テスト] をクリックします。

  2. [メッセージの送信] パネルで、[送信方法][コンソール] に設定します。

  3. [メッセージキー] フィールドに、キー (例:demo) を入力します。

  4. [メッセージ内容] フィールドに、JSON 本文 (例:) を入力します。

       {"key": "test"}
  5. [指定されたパーティションに送信]」の場合、「[はい]」をクリックし、特定のパーティションを対象とするために[パーティション ID](例: 0)を入力するか、または「[いいえ]」をクリックしてシステムに割り当てさせます。パーティション ID を確認するには、「パーティションのステータスを表示」をご参照ください。

テストメッセージは [Docker] または [SDK] を通じても送信できます。[送信方法] フィールドで対応するオプションを選択し、画面の指示に従ってください。

Elasticsearch インデックスの確認

  1. Kibana コンソールにログインします。

  2. 次のクエリを実行して、ターゲットインデックスを検索します。

       GET /<index_name>/_search
  3. 応答に送信したメッセージが含まれていることを確認します。成功した応答は次のようになります。

       {
         "took": 8,
         "timed_out": false,
         "_shards": {
           "total": 5,
           "successful": 5,
           "skipped": 0,
           "failed": 0
         },
         "hits": {
           "total": {
             "value": 1,
             "relation": "eq"
           },
           "max_score": 1.0,
           "hits": [
             {
               "_index": "product_****",
               "_type": "_doc",
               "_id": "TX3TZHgBfHNEDGoZ****",
               "_score": 1.0,
               "_source": {
                 "msg_body": {
                   "key": "test",
                   "offset": 2,
                   "overflowFlag": false,
                   "partition": 2,
                   "timestamp": 1616599282417,
                   "topic": "dv****",
                   "value": "test1",
                   "valueSize": 8
                 },
                 "doc_as_upsert": true
               }
             }
           ]
         }
       }

トラブルシューティング

現象考えられる原因解決方法
コネクタが Elasticsearch への書き込みに失敗するFunction Compute が Elasticsearch クラスターに到達できないFunction Compute サービスが Elasticsearch クラスターと同じ VPC と vSwitch を使用していることを確認します。詳細については、「Function Compute サービスの設定」をご参照ください。
メッセージがコンシュームされないコンシューマーオフセットの設定が正しくない[コンシューマーオフセット] の設定を確認します。既存のすべてのメッセージを読み取るには [最も早いオフセット] を、新しいメッセージのみを読み取るには [最新のオフセット] を使用します。
認証エラー無効な認証情報または権限不足ユーザー名とパスワードが正しいこと、およびアカウントがターゲットインデックスへの書き込み権限を持っていることを確認します。

Function Compute の関数呼び出しログについては、「ログ機能の設定」をご参照ください。