すべてのプロダクト
Search
ドキュメントセンター

Function Compute:RocketMQ トリガー

最終更新日:Apr 23, 2026

ApsaraMQ for RocketMQ トリガーを使用して、トピックにメッセージが到着するたびに Function Compute (FC) 関数を呼び出します。FC は、イベントルーティングと配信を処理する EventBridge を介して ApsaraMQ for RocketMQ と統合されています。このトピックでは、トリガーの作成、イベントペイロードの理解、機能コードの記述、および設定のテストについて説明します。

仕組み

Function Compute コンソールで ApsaraMQ for RocketMQ トリガーを作成すると、FC はトリガー構成に基づいて EventBridge にイベントストリームを自動的に作成します。トリガーがアクティブになると、ソースとなるトピックに到着したメッセージは、バッチ構成に応じて、個別にまたはバッチで関数に配信されます。

ApsaraMQ for RocketMQ トリガーは、エンドツーエンドのストリーミングデータ処理シナリオ向けに設計されています。

制限事項

  • ApsaraMQ for RocketMQ インスタンスは、Function Compute 関数と同じリージョンに存在する必要があります。

  • アカウント内のイベントストリームの数が上限に達した場合、追加の ApsaraMQ for RocketMQ トリガーを作成することはできません。クォータ情報については、「使用制限」をご参照ください。

前提条件

開始する前に、以下を準備してください。

ステップ 1: トリガーの作成

  1. Function Compute コンソールにログインし、関数詳細ページに移動します。

  2. 設定]タブをクリックし、次にトリガーページで[トリガーの作成]をクリックします。

  3. 以下の表に記載されているトリガーのパラメーターを設定し、[OK] をクリックします。

image
パラメーター説明
コンシューマーオフセットApsaraMQ for RocketMQ がメッセージのプルを開始するオフセットです。有効な値:[最新オフセット] (最新のメッセージから開始します)、[最古オフセット] (最も古い利用可能なメッセージから開始します)、[タイムスタンプ] (特定の時点から開始します)。最新オフセット
呼び出し方法イベントが発生したときに、関数がどのように呼び出されるかを示します。[同期呼び出し]:Function Compute は関数を実行し、応答を待機してから次のイベントまたはバッチを処理します。最大ペイロード:32 MB。詳細については、「同期呼び出し」をご参照ください。[非同期呼び出し]:Function Compute は次のイベントにすぐに移行し、関数が完了するのを待機しません。最大ペイロード:128 KB。詳細については、「非同期呼び出し」をご参照ください。同期呼び出し

プッシュ構成、リトライポリシー、デッドレターキューなどの高度な設定については、「トリガーの高度な機能」をご参照ください。

ステップ 2: テストパラメーターの設定 (オプション)

ApsaraMQ for RocketMQ は、event 配列として関数にメッセージを配信します。実際のメッセージを送信する前に、トリガーイベントをシミュレートして機能コードを検証します。

  1. 関数の詳細ページの [コード] タブで、[テスト関数] の横にある image.png アイコンをクリックし、[テストパラメーターの設定] を選択します。

  2. [テストパラメーターの設定]」パネルで、「[新しいテストイベントを作成]」または「[既存のテストイベントを編集]」をクリックし、名前とイベントの内容を入力してから、「[OK]」をクリックします。

以下は、2つのメッセージを含むサンプル event ペイロードです。

[
    {
        "id": "94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source": "RocketMQ-Function-rocketmq-trigger",
        "specversion": "1.0",
        "type": "mq:Topic:SendMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "subject": "acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time": "2021-04-08T06:01:20.766Z",
        "aliyunaccountid": "164901546557****",
        "aliyunpublishtime": "2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid": "164901546557****",
        "aliyuneventbusname": "RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid": "cn-chengdu",
        "aliyunpublishaddr": "42.120.XX.XX",
        "data": {
            "topic": "TopicName",
            "systemProperties": {
                "MIN_OFFSET": "0",
                "TRACE_ON": "true",
                "MAX_OFFSET": "8",
                "MSG_REGION": "cn-hangzhou",
                "KEYS": "systemProperties.KEYS",
                "CONSUME_START_TIME": 1628577790396,
                "TAGS": "systemProperties.TAGS",
                "INSTANCE_ID": "MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties": {},
            "body": "TEST"
        }
    },
    {
        "id": "94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source": "RocketMQ-Function-rocketmq-trigger",
        "specversion": "1.0",
        "type": "mq:Topic:SendMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "subject": "acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time": "2021-04-08T06:01:20.766Z",
        "aliyunaccountid": "164901546557****",
        "aliyunpublishtime": "2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid": "164901546557****",
        "aliyuneventbusname": "RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid": "cn-chengdu",
        "aliyunpublishaddr": "42.120.XX.XX",
        "data": {
            "topic": "TopicName",
            "systemProperties": {
                "MIN_OFFSET": "0",
                "TRACE_ON": "true",
                "MAX_OFFSET": "8",
                "MSG_REGION": "cn-hangzhou",
                "KEYS": "systemProperties.KEYS",
                "CONSUME_START_TIME": 1628577790396,
                "TAGS": "systemProperties.TAGS",
                "INSTANCE_ID": "MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties": {},
            "body": "TEST"
        }
    }
]

配列内の各要素は、CloudEvents 形式のメッセージです。トップレベルフィールド (idsourcespecversion など) は CloudEvents 仕様に準拠しています。詳細については、「概要」をご参照ください。data オブジェクトには RocketMQ メッセージ本文が含まれています。

フィールドタイプ説明
topicStringTopicNameトピック名。
systemPropertiesMapRocketMQ システムレベルのメッセージプロパティ。
systemProperties.MIN_OFFSETInt0キュー内の最小オフセット。
systemProperties.TRACE_ONBooleantrueメッセージトレースが存在するかどうか。
systemProperties.MAX_OFFSETInt8キュー内の最大オフセット
systemProperties.MSG_REGIONStringcn-hangzhouメッセージが送信されたリージョン。
systemProperties.KEYSStringsystemProperties.KEYSメッセージのフィルタリングに使用されるキー。
systemProperties.CONSUME_START_TIMELong1628577790396メッセージ消費開始時刻 (ミリ秒)。
systemProperties.UNIQ_KEYStringAC14C305069E1B28CDFA3181CDA2****メッセージの一意キー。
systemProperties.TAGSStringsystemProperties.TAGSメッセージのフィルタリングに使用されるタグ。
systemProperties.INSTANCE_IDStringMQ_INST_123456789098****_BXhFHryiApsaraMQ for RocketMQ インスタンス ID。
userPropertiesMapユーザープロパティ。
bodyStringTESTメッセージ本文。

ステップ 3: 機能の記述とテスト

トリガーが作成されたら、機能コードを記述してテストします。

機能コードの記述

関数詳細ページの[コード]タブで、ハンドラコードを記述し、[コードをデプロイ]をクリックします。

以下は、イベントを処理する方法を示す Node.js の例です。

'use strict';

exports.handler = (event, context, callback) => {
  console.log("event: %s", event);
  // イベントパラメーターを解析し、イベントを処理します。
  callback(null, 'return result');
}

機能のテスト

2つの方法があります。

方法 1: テストパラメーターを使用したシミュレーション

手順 2 でテストイベントを設定した場合は、[テスト関数][コード] タブでクリックします。

方法 2: 実際のメッセージの送信

ApsaraMQ for RocketMQ コンソールにログオンし、作成したトピックを選択して、[メッセージの送信] をクリックします。

image

関数の実行後、[リアルタイムログ] で出力を表示します。

image

次のステップ

  • このトリガーを変更または削除するには、「トリガーの管理」をご参照ください。

  • このトリガーによって作成されたイベントストリームを表示するには、EventBridge コンソールにログインします。「概要」をご参照ください。