このトピックでは、ApsaraMQ for Kafka インスタンストピックのデータを Elasticsearch にエクスポートする Elasticsearch シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、前提条件をご参照ください。
手順 1:Elasticsearch リソースの作成
Elasticsearch コンソールでインスタンスとインデックスを作成します。詳細については、はじめにをご参照ください。
Function Compute へのアクセスに使用するエンドポイントの CIDR ブロックを、Elasticsearch クラスタの IP アドレスホワイトリストに追加します。この操作は、仮想プライベートクラウド(VPC)内の Elasticsearch にアクセスする場合にのみ実行されます。ホワイトリストの設定方法については、Elasticsearch クラスタのパブリックまたはプライベート IP アドレスホワイトリストの設定をご参照ください。
手順 2:Elasticsearch シンクコネクタの作成
ApsaraMQ for Kafka コンソールにログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションペインで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 パラメータと 説明 パラメータを設定します。次に、画面の指示に従って他のパラメータを設定します。
タスクの作成
Source (ソース) 手順で、データプロバイダー パラメータを [message Queue For Apache Kafka] に設定し、画面の指示に従って他のパラメータを設定します。次に、[次の手順] をクリックします。次の表にパラメータを示します。
パラメータ
説明
例
Region
ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (杭州)
Message Queue For Apache Kafka インスタンス
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスの ID。
alikafka_post-cn-9hdsbdhd****
Topic
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンストピック。
guide-sink-topic
グループ ID
ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのグループの ID。
[クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。
[既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行と購読に影響します。
既存のグループを使用
Consumer Offset
[最新のオフセット]: メッセージは最新のオフセットから消費されます。
[最も古いオフセット]: メッセージは最も古いオフセットから消費されます。
最新のオフセット
ネットワーク設定
国境を越えたデータ送信が必要な場合は、[インターネット] を選択します。それ以外の場合は、[ベーシックネットワーク] を選択します。
ベーシックネットワーク
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として Json を指定します。
Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。
Text: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Binary: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
2000
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。
3
Filtering (フィルタリング) 手順で、データをフィルタリングするデータパターンを定義します。詳細については、メッセージのフィルタリングをご参照ください。
Transform (変換) 手順で、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するデータクレンジング方法を指定します。詳細については、Function Compute を使用したメッセージクレンジングの実行をご参照ください。
Sink (ターゲット) 手順で、サービスタイプ パラメータを [elasticsearch Acs.elasticsearch] に設定し、他のパラメータを設定します。次の表にパラメータを示します。
パラメータ
説明
例
Elasticsearch クラスタ
作成した Alibaba Cloud Elasticsearch クラスタ。
es-cn-pe336j0gj001e****
クラスタログイン名
Elasticsearch クラスタの作成時に指定したログイン名。デフォルトのログイン名: elastic。
elastic
クラスタログインパスワード
Elasticsearch クラスタの作成時に指定したパスワード。
******
インデックス名
作成したインデックスの名前。インデックスの作成方法については、手順 3:インデックスの作成をご参照ください。文字列定数または JSONPath 構文を使用して抽出されたバリアントがサポートされています。例: product_info および $.data.key。
product_info
ドキュメントタイプ
ドキュメントタイプ。文字列定数または JSONPath 構文を使用して抽出されたバリアントがサポートされています。
例: _doc および $.data.key。
説明このパラメータは、Elasticsearch インスタンスのバージョンが 7 より前の場合にのみ使用できます。このパラメータのデフォルト値は _doc です。
_doc
ドキュメント
イベントのすべてのコンテンツまたはイベントの特定のコンテンツを Elasticsearch に配信するかどうかを指定します。このパラメータに [部分イベント] を選択した場合は、JSONPath 抽出ルールを指定する必要があります。
完全なイベント
ネットワーク設定
[VPC]: ApsaraMQ for Kafka のメッセージは、VPC 内の Elasticsearch に配信されます。
[インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で Elasticsearch に配信されます。
インターネット
VPC
Elasticsearch インスタンスが属する VPC。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。
vpc-bp17fapfdj0dwzjkd****
vSwitch
Elasticsearch インスタンスが属する vSwitch。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ
Elasticsearch インスタンスが属するセキュリティグループ。[ネットワーク設定] パラメータを VPC に設定した場合にのみ、このパラメータが必要です。
test_group
タスクのプロパティ
イベントのプッシュに失敗した場合に使用する再試行ポリシーと、障害の処理に使用するメソッドを設定します。詳細については、再試行ポリシーと配信不能キューをご参照ください。
[保存] をクリックします。[タスク] ページで、作成した Elasticsearch シンクコネクタを見つけます。[ステータス] 列のステータスが [開始中] から [実行中] に変わると、コネクタが作成されます。
手順 3:Elasticsearch シンクコネクタのテスト
[タスク] ページで、作成した Elasticsearch シンクコネクタを見つけ、[イベントソース] 列のソース トピックをクリックします。
- トピック詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメータを設定し、[OK] をクリックします。

Elasticsearch コンソールにログインし、Kibana を使用してインスタンスにアクセスします。詳細については、はじめにをご参照ください。
Elasticsearch クラスタの Kibana コンソールで、次のコマンドを実行してデータ挿入結果を表示します。
GET /{Index name}/_search次の図は、データ挿入結果を示しています。
