すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Elasticsearch シンクコネクタの作成

最終更新日:Apr 15, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic から Elasticsearch クラスタのインデックスにデータを同期するために、Elasticsearch シンクコネクタを作成する方法について説明します。

前提条件

データを同期する前に、以下の要件が満たされていることを確認してください。

  • ApsaraMQ for Kafka

    • ApsaraMQ for Kafka インスタンスでコネクタ機能が有効になっていること。詳細については、コネクタ機能の有効化をご参照ください。

    • ApsaraMQ for Kafka インスタンスに Topic が作成されていること。詳細については、手順 1:Topic を作成するをご参照ください。

  • Function Compute

  • Elasticsearch

    説明
    • Function Compute で使用される Elasticsearch クライアントのバージョンは 7.7.0 です。互換性を確保するために、バージョン 7.0 以降の Elasticsearch クラスタを作成してください。

    • ホワイトリストを設定する際に、CIDR ブロックとして 0.0.0.0/0 を指定できます。これは、使用する仮想プライベートクラウド(VPC)内のすべての IP アドレスから Elasticsearch クラスタにアクセスできることを示します。アクセスが成功したら、必要に応じて CIDR ブロックを変更してください。

使用上の注意

  • ApsaraMQ for Kafka から Elasticsearch にデータを同期するには、ソース Topic を含む Message Queue for Apache Kafka インスタンスと Elasticsearch クラスタが同じリージョンにある必要があります。Message Queue for Apache Kafka は最初に Function Compute にデータを同期します。その後、Function Compute が Elasticsearch にデータを同期します。コネクタの制限については、制限をご参照ください。

  • Elasticsearch シンクコネクタは、Function Compute を使用してデータをエクスポートします。Function Compute は一定量の無料リソースを提供します。この無料枠を使い切った場合、使用した Function Compute リソースに対して課金規則に基づいて課金されます。詳細については、課金概要をご参照ください。

  • Function Compute では、関数呼び出しのログをクエリして問題をトラブルシューティングできます。詳細については、ロギングの設定をご参照ください。

  • ApsaraMQ for Kafka は、転送のためにメッセージを UTF-8 エンコードの文字列にシリアル化します。Message Queue for Apache Kafka はバイナリデータをサポートしていません。

  • デフォルトでは、Elasticsearch シンクコネクタに Elasticsearch クラスタのプライベートエンドポイントを指定した場合、Function Compute は Elasticsearch クラスタにアクセスできません。ネットワーク接続を確保するには、Function Compute コンソールで、関連する Function Compute サービスに Elasticsearch クラスタと同じ VPC と vSwitch を指定する必要があります。詳細については、サービスの更新をご参照ください。

Elasticsearch シンクコネクタの作成とデプロイ

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、Connector タスクリスト をクリックします。

  4. Connector タスクリスト ページで、インスタンスの選択 ドロップダウンリストからコネクタが属するインスタンスを選択し、Connector の作成 をクリックします。

  5. Connector の作成 ウィザードを完了するために、以下の操作を実行します。

    1. 基本情報の設定 ステップで、パラメータを構成し、次へ をクリックします。次の表にパラメータを示します。

      重要

      デフォルトでは、Authorize to Create Service Linked Role が選択されています。これは、ApsaraMQ for Kafka が以下のルールに基づいてサービスリンクロールを作成することを意味します。

      • サービスリンクロールを作成していない場合、ApsaraMQ for Kafka は、Elasticsearch シンクコネクタを使用して ApsaraMQ for Kafka から Elasticsearch にデータを同期するために、サービスリンクロールを自動的に作成します。

      • サービスリンクロールが使用可能な場合、ApsaraMQ for Kafka は新しいロールを作成しません。

      サービスリンクロールの詳細については、サービスリンクロールをご参照ください。

      パラメータ

      説明

      名前

      コネクタの名前。以下の命名規則に基づいてコネクタ名を指定します。

      • コネクタ名は 1 ~ 48 文字で、数字、小文字、ハイフン(-)を含めることができます。名前はハイフン(-)で始めることはできません。

      • 名前は ApsaraMQ for Kafka インスタンス内で一意である必要があります。

      コネクタのデータ同期タスクは、connect-タスク名 形式で名前が付けられたコンシューマーグループを使用する必要があります。このようなコンシューマーグループを作成していない場合、Message Queue for Apache Kafka は自動的に作成します。

      kafka-elasticsearch-sink

      インスタンス

      Message Queue for Apache Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。

      demo alikafka_post-cn-st21p8vj****

    2. ソースサービスの設定 ステップで、ソースサービスとして Message Queue for Apache Kafka を選択し、パラメータを構成して、次へ をクリックします。次の表にパラメータを示します。

      パラメータ

      説明

      データソース Topic

      データを同期する Topic の名前。

      elasticsearch-test-input

      コンシューマースレッドの同時発生数

      ソース Topic からデータを同期するために使用される同時実行コンシューマースレッドの数。デフォルト値:6。有効な値:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消費の開始位置

      メッセージ消費を開始するコンシューマーオフセット。有効な値:

      • 一番古いオフセット:メッセージ消費は最も古いコンシューマーオフセットから開始されます。

      • 一番新しいオフセット:メッセージ消費は最新のコンシューマーオフセットから開始されます。

      一番古いオフセット

      VPC ID

      ソースインスタンスがデプロイされている VPC の ID。実行環境の設定 をクリックすると、パラメータが表示されます。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した VPC ID が表示されます。このパラメータを設定する必要はありません。

      vpc-bp1xpdnd3l***

      VSwitch ID

      ソースインスタンスが接続されている vSwitch の ID。実行環境の設定 をクリックすると、パラメータが表示されます。vSwitch は、ソースの ApsaraMQ for Kafka インスタンスと同じ VPC にデプロイする必要があります。デフォルト値は、ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した vSwitch ID です。

      vsw-bp1d2jgg81***

      失敗の処理

      そのパーティションでメッセージ送信エラーが発生した後、パーティションへのサブスクリプションを保持するかどうかを指定します。実行環境の設定 をクリックすると、パラメータが表示されます。有効な値:

      • サブスクリプションの継続:サブスクリプションを保持します。エラーのログエントリが生成されます。

      • サブスクリプションの停止:サブスクリプションを停止します。エラーのログエントリが生成されます。

      説明
      • ログ情報の表示方法については、コネクタの管理をご参照ください。

      • エラーコードに基づいてエラーをトラブルシューティングする方法については、エラーコードをご参照ください。

      サブスクリプションの継続

      リソースの作成方法

      Elasticsearch シンクコネクタに必要な Topic とグループを作成する方法。実行環境の設定 をクリックすると、パラメータが表示されます。

      • 自動作成

      • 手動で作成します

      自動作成

      Connector コンシューマーグループ

      コネクタのデータ同期タスクで使用されるコンシューマーグループ。グループ 実行環境の設定 をクリックすると、パラメータが表示されます。このコンシューマーグループの名前は、connect-タスク名 形式である必要があります。

      connect-kafka-elasticsearch-sink

      タスクサイトの Topic

      コンシューマーオフセットを格納するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic:Topic 名は connect-offset で始めることをお勧めします。

      • パーティション:Topic のパーティション数は 1 より大きい必要があります。

      • ストレージエンジン:Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy:Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-offset-kafka-elasticsearch-sink

      タスク設定の Topic

      タスク構成を格納するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic:Topic 名は connect-config で始めることをお勧めします。

      • パーティション:Topic には 1 つのパーティションのみを含めることができます。

      • ストレージエンジン:Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy:Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-config-kafka-elasticsearch-sink

      タスクステータスの Topic

      タスクステータスを格納するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。

      • Topic:Topic 名は connect-status で始めることをお勧めします。

      • パーティション:Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン:Topic のストレージエンジンをローカルストレージに設定する必要があります。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      • cleanup.policy:Topic のログクリーンアップポリシーを Compact に設定する必要があります。

      connect-status-kafka-elasticsearch-sink

      デッドレターキューの Topic

      Kafka Connect フレームワークのエラーデータを格納するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。Topic リソースを節約するために、Topic を作成し、その Topic をデッドレターキュー Topic と エラーデータ Topic の両方として使用できます。

      • Topic:Topic 名は connect-error で始めることをお勧めします。

      • パーティション:Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン:Topic のストレージエンジンをローカルストレージまたはクラウドストレージに設定できます。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      connect-error-kafka-elasticsearch-sink

      例外データ Topic

      シンクコネクタのエラーデータを格納するために使用される Topic。実行環境の設定 をクリックすると、パラメータが表示されます。Topic リソースを節約するために、Topic を作成し、その Topic を デッドレターキュー Topic とエラーデータ Topic の両方として使用できます。

      • Topic:Topic 名は connect-error で始めることをお勧めします。

      • パーティション:Topic のパーティション数を 6 に設定することをお勧めします。

      • ストレージエンジン:Topic のストレージエンジンをローカルストレージまたはクラウドストレージに設定できます。

        説明

        Professional Edition Message Queue for Apache Kafka インスタンスの Topic を作成する場合にのみ、ストレージエンジンをローカルストレージに設定できます。

      connect-error-kafka-elasticsearch-sink

    3. ターゲットサービスの設定 ステップで、宛先サービスとして Elasticsearch を選択し、パラメータを構成して、作成 をクリックします。次の表にパラメータを示します。

      パラメータ

      説明

      ES インスタンス ID

      Elasticsearch クラスタの ID。

      es-cn-oew1o67x0000****

      アクセスアドレス

      Elasticsearch クラスタのパブリックまたはプライベートエンドポイント。詳細については、クラスタの基本情報の表示をご参照ください。

      es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com

      アクセスポート

      Elasticsearch クラスタへのアクセスに使用されるパブリックまたはプライベートポート。有効な値:

      • 9200:HTTP および HTTPS の場合

      • 9300:TCP の場合

      詳細については、クラスタの基本情報の表示をご参照ください。

      9300

      ユーザー名

      Kibana コンソールへのログインに使用されるユーザー名。デフォルト値:elastic。ユーザー名をカスタマイズすることもできます。詳細については、Elasticsearch X-Pack が提供する RBAC メカニズムを使用してアクセス制御を実装するをご参照ください。

      elastic

      ユーザーパスワード

      Kibana コンソールへのログインに使用されるパスワード。elastic ユーザーのパスワードは、Elasticsearch クラスタの作成時に指定されます。パスワードを忘れた場合は、リセットできます。詳細については、Elasticsearch クラスタのアクセスパスワードのリセットをご参照ください。

      ********

      インデックス

      Elasticsearch インデックスの名前。

      elastic_test

      説明
      • ユーザー名とパスワードは、Elasticsearch オブジェクトの初期化に使用されます。bulk を使用してメッセージを送信するには、アカウントにインデックスへの書き込み権限があることを確認してください。

      • ユーザー名とパスワードは、ApsaraMQ for Kafka がデータエクスポートタスクを作成するときに、環境変数として Function Compute の関数に渡されます。タスクが作成された後、ApsaraMQ for Kafka はユーザー名またはパスワードを保存しません。

      コネクタが作成された後、Connector タスクリスト ページでコネクタを表示できます。

  6. Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。

関連する Function Compute サービスの構成

ApsaraMQ for Kafka コンソールで Elasticsearch シンクコネクタを作成してデプロイした後、Function Compute はコネクタの Function Compute サービスを自動的に作成し、kafka-service-<Connector_name>-<Random string> 形式でサービスに名前を付けます。

  1. Connector タスクリスト ページで、作成したコネクタを見つけます。コネクタの 操作 列で、詳細 > 関数の設定 を選択します。

    Function Compute コンソールにリダイレクトされます。

  2. Function Compute コンソールで、自動的に作成されたサービスを見つけ、サービスの VPC と vSwitch を構成します。VPC と vSwitch が Elasticsearch クラスタに指定されたものと同じであることを確認してください。詳細については、サービスの更新をご参照ください。

メッセージの送信

ApsaraMQ for Kafka インスタンスのソース Topic にメッセージを送信して、データが Elasticsearch に同期されるかどうかをテストできます。

  1. Connector タスクリスト ページで、管理するコネクタを見つけて、テスト操作 列の をクリックします。

  2. メッセージの送信 パネルで、テストメッセージを送信するためのパラメータを構成します。

    • 送信方法 パラメータを コンソール に設定した場合は、次の手順を実行します。

      1. メッセージキー フィールドに、メッセージキーを入力します。例:demo。

      2. メッセージの内容 フィールドに、メッセージの内容を入力します。例:{"key": "test"}。

      3. 指定されたパーティションに送信 パラメータを構成して、テストメッセージを特定のパーティションに送信するかどうかを指定します。

        • テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例:0。パーティション ID をクエリする方法については、パーティションステータスの表示をご参照ください。

        • テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。

    • 送信方法 パラメータを Docker に設定した場合は、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テストメッセージを送信します。

    • 送信方法 パラメータを SDK に設定した場合は、必要なプログラミング言語またはフレームワークの SDK と、テストメッセージを送受信するためのアクセス方法を選択します。

結果の確認

ApsaraMQ for Kafka インスタンスのソース Topic にメッセージを送信した後、Kibana コンソールにログインし、GET /<index_name>/_search コマンドを実行して Elasticsearch インデックスを表示し、データが同期されているかどうかを確認します。

次のコードは、ApsaraMQ for Kafka から Elasticsearch に同期されたデータの例を示しています。

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "product_****",
        "_type" : "_doc",
        "_id" : "TX3TZHgBfHNEDGoZ****",
        "_score" : 1.0,
        "_source" : {
          "msg_body" : {
            "key" : "test",  // メッセージキー
            "offset" : 2, // オフセット
            "overflowFlag" : false, // オーバーフローフラグ
            "partition" : 2, // パーティション
            "timestamp" : 1616599282417, // タイムスタンプ
            "topic" : "dv****", // Topic
            "value" : "test1", // 値
            "valueSize" : 8 // 値のサイズ
          },
          "doc_as_upsert" : true
        }
      }
    ]
  }
}