すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Elasticsearch シンクコネクタの作成

最終更新日:Feb 17, 2025

このトピックでは、ApsaraMQ for Kafka インスタンストピックのデータを Elasticsearch にエクスポートする Elasticsearch シンクコネクタを作成する方法について説明します。

前提条件

前提条件については、前提条件をご参照ください。

手順 1:Elasticsearch リソースの作成

手順 2:Elasticsearch シンクコネクタの作成

  1. ApsaraMQ for Kafka コンソールにログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションペインで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、タスク名 パラメータと 説明 パラメータを設定します。次に、画面の指示に従って他のパラメータを設定します。

    • タスクの作成

      1. Source (ソース) 手順で、データプロバイダー パラメータを [message Queue For Apache Kafka] に設定し、画面の指示に従って他のパラメータを設定します。次に、[次の手順] をクリックします。次の表にパラメータを示します。

        パラメータ

        説明

        Region

        ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (杭州)

        Message Queue For Apache Kafka インスタンス

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスの ID。

        alikafka_post-cn-9hdsbdhd****

        Topic

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンストピック。

        guide-sink-topic

        グループ ID

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのグループの ID。

        • [クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。

        • [既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行と購読に影響します。

        既存のグループを使用

        Consumer Offset

        • [最新のオフセット]: メッセージは最新のオフセットから消費されます。

        • [最も古いオフセット]: メッセージは最も古いオフセットから消費されます。

        最新のオフセット

        ネットワーク設定

        国境を越えたデータ送信が必要な場合は、[インターネット] を選択します。それ以外の場合は、[ベーシックネットワーク] を選択します。

        ベーシックネットワーク

        データ形式

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として Json を指定します。

        • Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。

        • Text: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        • Binary: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        Json

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        2000

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。

        3

      2. Filtering (フィルタリング) 手順で、データをフィルタリングするデータパターンを定義します。詳細については、メッセージのフィルタリングをご参照ください。

      3. Transform (変換) 手順で、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するデータクレンジング方法を指定します。詳細については、Function Compute を使用したメッセージクレンジングの実行をご参照ください。

      4. Sink (ターゲット) 手順で、サービスタイプ パラメータを [elasticsearch Acs.elasticsearch] に設定し、他のパラメータを設定します。次の表にパラメータを示します。

        パラメータ

        説明

        Elasticsearch クラスタ

        作成した Alibaba Cloud Elasticsearch クラスタ。

        es-cn-pe336j0gj001e****

        クラスタログイン名

        Elasticsearch クラスタの作成時に指定したログイン名。デフォルトのログイン名: elastic。

        elastic

        クラスタログインパスワード

        Elasticsearch クラスタの作成時に指定したパスワード。

        ******

        インデックス名

        作成したインデックスの名前。インデックスの作成方法については、手順 3:インデックスの作成をご参照ください。文字列定数または JSONPath 構文を使用して抽出されたバリアントがサポートされています。例: product_info および $.data.key。

        product_info

        ドキュメントタイプ

        ドキュメントタイプ。文字列定数または JSONPath 構文を使用して抽出されたバリアントがサポートされています。

        例: _doc および $.data.key。

        説明

        このパラメータは、Elasticsearch インスタンスのバージョンが 7 より前の場合にのみ使用できます。このパラメータのデフォルト値は _doc です。

        _doc

        ドキュメント

        イベントのすべてのコンテンツまたはイベントの特定のコンテンツを Elasticsearch に配信するかどうかを指定します。このパラメータに [部分イベント] を選択した場合は、JSONPath 抽出ルールを指定する必要があります。

        完全なイベント

        ネットワーク設定

        • [VPC]: ApsaraMQ for Kafka のメッセージは、VPC 内の Elasticsearch に配信されます。

        • [インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で Elasticsearch に配信されます。

        インターネット

        VPC

        Elasticsearch インスタンスが属する VPC。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        Elasticsearch インスタンスが属する vSwitch。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。

        vsw-bp1gbjhj53hdjdkg****

        セキュリティグループ

        Elasticsearch インスタンスが属するセキュリティグループ。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。

        test_group

    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用する再試行ポリシーと、障害の処理に使用するメソッドを設定します。詳細については、再試行ポリシーと配信不能キューをご参照ください。

  5. [保存] をクリックします。[タスク] ページで、作成した Elasticsearch シンクコネクタを見つけます。[ステータス] 列のステータスが [開始中] から [実行中] に変わると、コネクタが作成されます。

手順 3:Elasticsearch シンクコネクタのテスト

  1. [タスク] ページで、作成した Elasticsearch シンクコネクタを見つけ、[イベントソース] 列のソース トピックをクリックします。

  2. トピック詳細ページで、[メッセージの送信] をクリックします。
  3. [メッセージの送受信を開始] パネルで、次の図に基づいてパラメータを設定し、[OK] をクリックします。

    发送消息

  4. Elasticsearch コンソールにログインし、Kibana を使用してインスタンスにアクセスします。詳細については、はじめにをご参照ください。

  5. Elasticsearch クラスタの Kibana コンソールで、次のコマンドを実行してデータ挿入結果を表示します。

    GET /{Index name}/_search

    次の図は、データ挿入結果を示しています。测试结果