ApsaraMQ for RocketMQ トリガーを使用して、トピックにメッセージが到着するたびに Function Compute (FC) 関数を呼び出します。FC は、イベントルーティングと配信を処理する EventBridge を介して ApsaraMQ for RocketMQ と統合されています。このトピックでは、トリガーの作成、イベントペイロードの理解、機能コードの記述、および設定のテストについて説明します。
仕組み
Function Compute コンソールで ApsaraMQ for RocketMQ トリガーを作成すると、FC はトリガー構成に基づいて EventBridge にイベントストリームを自動的に作成します。トリガーがアクティブになると、ソースとなるトピックに到着したメッセージは、バッチ構成に応じて、個別にまたはバッチで関数に配信されます。
ApsaraMQ for RocketMQ トリガーは、エンドツーエンドのストリーミングデータ処理シナリオ向けに設計されています。
制限事項
ApsaraMQ for RocketMQ インスタンスは、Function Compute 関数と同じリージョンに存在する必要があります。
アカウント内のイベントストリームの数が上限に達した場合、追加の ApsaraMQ for RocketMQ トリガーを作成することはできません。クォータ情報については、「使用制限」をご参照ください。
前提条件
開始する前に、以下を準備してください。
EventBridge をアクティブ化し、必要な権限を付与済みであること。「EventBridge をアクティブ化し、RAM ユーザーに権限を付与する」をご参照ください。
Function Compute でイベント関数を作成済みであること。「イベント関数の作成」をご参照ください。
ApsaraMQ for RocketMQ をアクティブ化し、必要な権限を付与済みであること。「ApsaraMQ for RocketMQ をアクティブ化し、権限を付与する」をご参照ください。
ApsaraMQ for RocketMQ インスタンス、トピック、およびグループ ID を作成済みであること。「インスタンス、トピック、およびグループ ID の作成」をご参照ください。
ステップ 1: トリガーの作成
Function Compute コンソールにログインし、関数詳細ページに移動します。
[設定]タブをクリックし、次にトリガーページで[トリガーの作成]をクリックします。
以下の表に記載されているトリガーのパラメーターを設定し、[OK] をクリックします。

| パラメーター | 説明 | 例 |
|---|---|---|
| コンシューマーオフセット | ApsaraMQ for RocketMQ がメッセージのプルを開始するオフセットです。有効な値:[最新オフセット] (最新のメッセージから開始します)、[最古オフセット] (最も古い利用可能なメッセージから開始します)、[タイムスタンプ] (特定の時点から開始します)。 | 最新オフセット |
| 呼び出し方法 | イベントが発生したときに、関数がどのように呼び出されるかを示します。[同期呼び出し]:Function Compute は関数を実行し、応答を待機してから次のイベントまたはバッチを処理します。最大ペイロード:32 MB。詳細については、「同期呼び出し」をご参照ください。[非同期呼び出し]:Function Compute は次のイベントにすぐに移行し、関数が完了するのを待機しません。最大ペイロード:128 KB。詳細については、「非同期呼び出し」をご参照ください。 | 同期呼び出し |
プッシュ構成、リトライポリシー、デッドレターキューなどの高度な設定については、「トリガーの高度な機能」をご参照ください。
ステップ 2: テストパラメーターの設定 (オプション)
ApsaraMQ for RocketMQ は、event 配列として関数にメッセージを配信します。実際のメッセージを送信する前に、トリガーイベントをシミュレートして機能コードを検証します。
関数の詳細ページの [コード] タブで、[テスト関数] の横にある
アイコンをクリックし、[テストパラメーターの設定] を選択します。「[テストパラメーターの設定]」パネルで、「[新しいテストイベントを作成]」または「[既存のテストイベントを編集]」をクリックし、名前とイベントの内容を入力してから、「[OK]」をクリックします。
以下は、2つのメッセージを含むサンプル event ペイロードです。
[
{
"id": "94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source": "RocketMQ-Function-rocketmq-trigger",
"specversion": "1.0",
"type": "mq:Topic:SendMessage",
"datacontenttype": "application/json; charset=utf-8",
"subject": "acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
"time": "2021-04-08T06:01:20.766Z",
"aliyunaccountid": "164901546557****",
"aliyunpublishtime": "2021-10-15T02:05:16.791Z",
"aliyunoriginalaccountid": "164901546557****",
"aliyuneventbusname": "RocketMQ-Function-rocketmq-trigger",
"aliyunregionid": "cn-chengdu",
"aliyunpublishaddr": "42.120.XX.XX",
"data": {
"topic": "TopicName",
"systemProperties": {
"MIN_OFFSET": "0",
"TRACE_ON": "true",
"MAX_OFFSET": "8",
"MSG_REGION": "cn-hangzhou",
"KEYS": "systemProperties.KEYS",
"CONSUME_START_TIME": 1628577790396,
"TAGS": "systemProperties.TAGS",
"INSTANCE_ID": "MQ_INST_164901546557****_BXhFHryi"
},
"userProperties": {},
"body": "TEST"
}
},
{
"id": "94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source": "RocketMQ-Function-rocketmq-trigger",
"specversion": "1.0",
"type": "mq:Topic:SendMessage",
"datacontenttype": "application/json; charset=utf-8",
"subject": "acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
"time": "2021-04-08T06:01:20.766Z",
"aliyunaccountid": "164901546557****",
"aliyunpublishtime": "2021-10-15T02:05:16.791Z",
"aliyunoriginalaccountid": "164901546557****",
"aliyuneventbusname": "RocketMQ-Function-rocketmq-trigger",
"aliyunregionid": "cn-chengdu",
"aliyunpublishaddr": "42.120.XX.XX",
"data": {
"topic": "TopicName",
"systemProperties": {
"MIN_OFFSET": "0",
"TRACE_ON": "true",
"MAX_OFFSET": "8",
"MSG_REGION": "cn-hangzhou",
"KEYS": "systemProperties.KEYS",
"CONSUME_START_TIME": 1628577790396,
"TAGS": "systemProperties.TAGS",
"INSTANCE_ID": "MQ_INST_164901546557****_BXhFHryi"
},
"userProperties": {},
"body": "TEST"
}
}
]配列内の各要素は、CloudEvents 形式のメッセージです。トップレベルフィールド (id、source、specversion など) は CloudEvents 仕様に準拠しています。詳細については、「概要」をご参照ください。data オブジェクトには RocketMQ メッセージ本文が含まれています。
| フィールド | タイプ | 例 | 説明 |
|---|---|---|---|
topic | String | TopicName | トピック名。 |
systemProperties | Map | RocketMQ システムレベルのメッセージプロパティ。 | |
systemProperties.MIN_OFFSET | Int | 0 | キュー内の最小オフセット。 |
systemProperties.TRACE_ON | Boolean | true | メッセージトレースが存在するかどうか。 |
systemProperties.MAX_OFFSET | Int | 8 | キュー内の最大オフセット |
systemProperties.MSG_REGION | String | cn-hangzhou | メッセージが送信されたリージョン。 |
systemProperties.KEYS | String | systemProperties.KEYS | メッセージのフィルタリングに使用されるキー。 |
systemProperties.CONSUME_START_TIME | Long | 1628577790396 | メッセージ消費開始時刻 (ミリ秒)。 |
systemProperties.UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | メッセージの一意キー。 |
systemProperties.TAGS | String | systemProperties.TAGS | メッセージのフィルタリングに使用されるタグ。 |
systemProperties.INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | ApsaraMQ for RocketMQ インスタンス ID。 |
userProperties | Map | ユーザープロパティ。 | |
body | String | TEST | メッセージ本文。 |
ステップ 3: 機能の記述とテスト
トリガーが作成されたら、機能コードを記述してテストします。
機能コードの記述
関数詳細ページの[コード]タブで、ハンドラコードを記述し、[コードをデプロイ]をクリックします。
以下は、イベントを処理する方法を示す Node.js の例です。
'use strict';
exports.handler = (event, context, callback) => {
console.log("event: %s", event);
// イベントパラメーターを解析し、イベントを処理します。
callback(null, 'return result');
}機能のテスト
2つの方法があります。
方法 1: テストパラメーターを使用したシミュレーション
手順 2 でテストイベントを設定した場合は、[テスト関数] を [コード] タブでクリックします。
方法 2: 実際のメッセージの送信
ApsaraMQ for RocketMQ コンソールにログオンし、作成したトピックを選択して、[メッセージの送信] をクリックします。

関数の実行後、[リアルタイムログ] で出力を表示します。
