このトピックでは、ApsaraMQ for Kafka インスタンスのソース トピックから Function Compute の関数にデータを同期するために、Function Compute シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、前提条件 をご参照ください。使用上の注意
- デスティネーション関数のタイプとしてイベント関数を必ず選択してください。
- 同じリージョンにある Message Queue for Apache Kafka から Function Compute にメッセージを配信する場合、データは内部ネットワーク経由で転送されます。この場合、インターネット トラフィック料金は発生しません。
手順 1:Function Compute リソースの作成
Function Compute で関数を作成します。詳細については、関数を簡単に作成する をご参照ください。
この例では、Python ランタイム環境で実行される guide-fc-sink-service サービス用に、sink-fc という名前のイベント関数が作成されます。関数コード:
# -*- coding: utf-8 -*-
import logging
import json
def handler(event, context):
logger = logging.getLogger()
evt = json.loads(event)
logger.info(evt)
return "success"
# イベントとコンテキストを受け取り、ログに記録し、「success」を返します。
手順 2:Function Compute シンクコネクタの作成と開始
ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
- 左側のナビゲーション ペインで、 を選択します。
- [メッセージのアウトフロー] ページで、[タスクの作成] をクリックします。
- [メッセージのアウトフロー タスクの作成] パネルで、パラメータを設定し、[確認] をクリックします。
- [基本情報] セクションで、[タスク名] パラメータを設定し、関数計算[メッセージのアウトフロー タスク タイプ] ドロップダウン リストから を選択します。
- [リソース設定] セクションで、パラメータを設定します。次の表にパラメータを示します。
表 1. ソース Message Queue for Apache Kafka パラメータ 説明 例 リージョン Message Queue for Apache Kafka インスタンスが存在するリージョンを選択します。 中国 (杭州) Message Queue for Apache Kafka インスタンス Message Queue for Apache Kafka インスタンスの ID。 alikafka_post-cn-9hdsbdhd**** トピック Message Queue for Apache Kafka インスタンスのソース トピック。 guide-sink-topic グループ ID Message Queue for Apache Kafka インスタンスのグループの ID。 - [クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。
- [既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用されている既存のグループを選択すると、既存のメッセージの発行と購読に影響します。
既存のグループを使用 同時実行クォータ (コンシューマー) トピック内のデータを同時に消費するスレッドの数。次の項目では、トピック内のスレッドとパーティション間のマッピングについて説明します。 - トピック内のパーティションの数が、トピック内のデータを同時に消費するスレッドの数と同じ場合、1 つのスレッドが 1 つのパーティション内のデータを消費します。この設定を使用することをお勧めします。
- トピック内のパーティションの数が、トピック内のデータを同時に消費するスレッドの数よりも多い場合、スレッドはすべてのパーティション内のデータを均等に消費します。
- トピック内のパーティションの数が、トピック内のデータを同時に消費するスレッドの数よりも少ない場合、1 つのスレッドが 1 つのパーティション内のデータを消費します。追加のスレッドはデータを消費しません。
2 コンシューマー オフセット - [最新のオフセット]: 最新のオフセットからメッセージを消費します。
- [最も古いオフセット]: 最も古いオフセットからメッセージを消費します。
最新のオフセット ネットワーク設定 国境を越えたデータ転送が必要な場合は、[インターネット] を選択します。それ以外の場合は、[デフォルト ネットワーク] を選択します。 デフォルト ネットワーク 表 2. ターゲット Function Compute パラメータ 説明 例 サービス 作成した Function Compute サービスを選択します。 guide-fc-sink-service 関数 作成した Function Compute 関数を選択します。 sink-fc 呼び出しモード - [非同期]: 非同期モードで Function Compute にリクエストを送信します。このモードでは、ペイロードのサイズは 128 KB に制限されています。
- [同期]: 同期モードで Function Compute にリクエストを送信します。このモードでは、ペイロードのサイズは 32 MB に制限されています。同期リクエストのタイムアウト期間は 5 分です。
同期 バッチ プッシュ バッチ プッシュ機能を使用して、複数のイベントをバッチに集約できます。メッセージ パラメータまたは 間隔 パラメータで指定された条件が満たされると、バッチ プッシュがトリガーされます。たとえば、メッセージ パラメータを 100 に設定し、間隔パラメータを 15 秒に設定した場合、メッセージの数が 10 秒で 100 に達すると、バッチ プッシュがすぐにトリガーされます。有効な値: - [有効]
- [無効]: このパラメータを無効に設定すると、一度に 1 つのメッセージのみが Function Compute に配信されます。
有効 メッセージ 関数呼び出しごとに送信できるメッセージの最大数。バックログ内のメッセージの数が指定された値に達した場合にのみ、リクエストが送信されます。有効な値:[1,10000]。バッチ プッシュ パラメータが [有効] に設定されている場合にのみ、このパラメータが必要です。 100 間隔 関数を呼び出す間隔。指定された間隔に達すると、システムは集約されたメッセージを Function Compute に送信します。有効な値:0 ~ 15。単位:秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。バッチ プッシュ パラメータが [有効] に設定されている場合にのみ、このパラメータが必要です。 10
上記の手順を実行した後、[メッセージのアウトフロー] ページに移動し、作成した Function Compute シンクコネクタを見つけ、開始[アクション] 列の ステータス開始実行中 をクリックします。 列のステータスが から に変わると、コネクタが作成されます。
手順 3:Function Compute シンクコネクタのテスト
- [メッセージのアウトフロー] ページで、作成した Function Compute シンクコネクタを見つけ、[イベント ソース] 列のソース トピックの名前をクリックします。
- トピック詳細ページで、[メッセージの送信] をクリックします。
- [メッセージの送受信を開始] パネルで、次の図に基づいてパラメータを設定し、[OK] をクリックします。

- [メッセージのアウトフロー] ページで、作成した Function Compute シンクコネクタを見つけ、[イベント ターゲット] 列のデスティネーション関数の名前をクリックします。
- [ログ] タブをクリックします。次に、[リクエスト] タブをクリックして、呼び出しログを表示します。
