Apache RocketMQ をイベントソースとして EventBridge および Function Compute と統合すると、Apache RocketMQ トリガーにより、Apache RocketMQ に公開されたメッセージを処理する関連関数を実行できます。本トピックでは、Apache RocketMQ トリガーの作成、入力パラメーターの設定、および Function Compute コンソールでのコード記述・テスト方法について説明します。
背景情報
Function Compute コンソールでトリガー作成リクエストを送信すると、Function Compute は、トリガー構成に基づき、EventBridge 内に自動的に イベントストリーム リソースを作成します。
トリガーが作成されると、Function Compute コンソールでトリガー情報を確認でき、また EventBridge コンソールで自動作成されたリソース情報を確認できます。Apache RocketMQ にメッセージがキューイングされると、関数がトリガーされます。バッチ構成に応じて、1 つ以上のメッセージイベントがバッチ形式で関数にプッシュされ、処理されます。
前提条件
EventBridge
Function Compute
Apache RocketMQ
Apache RocketMQ 5.0: , クイックスタート (国際版)。
Apache RocketMQ 4.x: クイックスタート。
セルフマネージド Apache RocketMQ クラスターを作成しました。
トピックを作成する
コンシューマーグループの作成
以下のドキュメントを使用して、Apache RocketMQ クラスターの迅速なデプロイおよびメッセージの送受信を行ってください。
制限事項
イベントソースとして使用する Apache RocketMQ クラスターは、パブリックネットワーク経由または Alibaba Cloud VPC 内からアクセス可能である必要があります。
Apache RocketMQ クラスターが Alibaba Cloud VPC 内からアクセス可能な場合、VPC 接続型インスタンスと Function Compute 関数は同一リージョン内に配置する必要があります。
イベントストリームの数が上限を超えると、追加の Apache RocketMQ トリガーを作成できません。イベントストリームの上限については、「制限事項」をご参照ください。
手順 1:Apache RocketMQ トリガーの作成
Function Compute コンソール にログインします。左側ナビゲーションウィンドウで、サービスと関数 をクリックします。
上部ナビゲーションバーでリージョンを選択します。サービス ページで、目的のサービスをクリックします。
関数 ページで、目的の関数名をクリックします。
関数の詳細ページで、トリガー タブをクリックします。バージョンまたはエイリアス のドロップダウンリストから、トリガーを作成する対象のバージョンまたはエイリアスを選択し、トリガーの作成 をクリックします。
「トリガーの作成」パネルで、必要な情報を入力し、OK をクリックします。
設定項目
操作
例
トリガータイプ
セルフマネージド Apache RocketMQ を選択します。
セルフマネージド Apache RocketMQ
名前
カスタムのトリガー名を入力します。
apache-rocketmq-trigger
バージョンまたはエイリアス
デフォルト値は LATEST です。他のバージョンまたはエイリアスに対してトリガーを作成する場合は、関数の詳細ページ右上隅で該当のバージョンまたはエイリアスに切り替えてください。「バージョン」と「エイリアス」の詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
アクセスポイント
クラスターの NameServer アドレスを入力します。
192.168.X.X:9876
トピック
作成済みの Apache RocketMQ インスタンスのトピックを選択します。
testTopic
グループ ID
作成済みの Apache RocketMQ インスタンスのコンシューマーグループ ID を選択します。
testGroup
FilterType
メッセージフィルタリングのタイプを選択します。有効な値は以下のとおりです。
Tag:タグによるメッセージフィルタリング。
SQL:メッセージのプロパティおよびその値と一致する SQL ステートメントを使用したメッセージフィルタリング。
Tag
Filter
FilterType を選択した後、選択したフィルタータイプに対応するフィルター文を設定する必要があります。
TagA
認証モード
認証モードを選択します。現在サポートされているのは ACL モードのみです。
ACL
ユーザー名
認証モード を ACL に設定した場合、Apache RocketMQ インスタンスのユーザー名を本人確認用に設定する必要があります。
admin
パスワード
認証モード を ACL に設定した場合、Apache RocketMQ インスタンスのパスワードを本人確認用に設定する必要があります。
******
コンシューマーオフセット
メッセージのコンシューマーオフセットとは、Apache RocketMQ がメッセージの取得を開始するイベントバス上の位置を指定するものです。以下に有効な値を示します。
最大オフセット:最新のオフセットから消費を開始します。
最小オフセット:最初のオフセットから消費を開始します。
指定タイムスタンプ:指定したタイムスタンプから消費を開始します。
最新オフセット
ネットワーク構成
メッセージルーティングに使用するネットワークタイプを選択します。有効な値は以下のとおりです。
パブリックネットワーク:パブリックネットワーク経由で Apache RocketMQ クラスターにアクセスします。
VPC:Alibaba Cloud VPC 経由で Apache RocketMQ クラスターにアクセスします。対応する VPC、vSwitch、および セキュリティグループ を選択する必要があります。
パブリックネットワーク
呼び出しモード
関数呼び出しモードを選択します。
有効な値は以下のとおりです。
同期呼び出し
トリガー状態
トリガー作成直後に有効化するかどうかを指定します。デフォルトでは [トリガーを有効化] が選択されており、作成直後にトリガーが有効化されます。
トリガーを有効化
プッシュ構成、再試行、デッドレターキューなどの詳細設定項目については、「トリガーの高度な機能」をご参照ください。
トリガーが作成されると、トリガー タブに表示されます。トリガーの変更または削除については、「トリガーの管理」をご参照ください。
手順 2:関数の入力パラメーターの設定
セルフマネージド Apache RocketMQ イベントソースは、イベント を入力パラメーターとして関数に渡します。トリガーイベントをシミュレートするために、イベント を手動で関数に渡すことができます。
- 関数の詳細ページで、コード タブをクリックし、
アイコンをクリックします。表示されるドロップダウンリストから、テストパラメーターの設定 を選択します。 テストパラメーターの設定 パネルで、新しいテストイベントの作成 タブまたは 既存のテストイベントの編集 タブをクリックします。イベント名 およびイベント内容を入力し、OK をクリックします。
イベント は以下の形式です。
[ { "msgId": "7F0000010BDD2A84AEE70DA49B57****", "topic": "testTopic", "systemProperties": { "UNIQ_KEY": "7F0000010BDD2A84AEE70DA49B57****", "CLUSTER": "DefaultCluster", "MIN_OFFSET": "0", "TAGS": "TagA", "MAX_OFFSET": "128" }, "userProperties": {}, "body": "Hello RocketMQ" } ]以下の表に、イベント フィールド内のパラメーターについて説明します。
パラメーター
タイプ
例
説明
msgId
String
7F0000010BDD2A84AEE70DA49B57****
Apache RocketMQ 内のメッセージ ID です。
topic
String
testTopic
トピック名です。
systemProperties
Map
システムプロパティーです。
UNIQ_KEY
String
7F0000010BDD2A84AEE70DA49B57****
メッセージの一意キーです。
CLUSTER
String
DefaultCluster
Apache RocketMQ クラスターの名前です。
MIN_OFFSET
Int
0
最小オフセットです。
MAX_OFFSET
Int
128
最大オフセットです。
TAGS
String
TagA
フィルター用プロパティーです。
userProperties
Map
なし
ユーザーのプロパティ。
body
String
Hello RocketMQ
メッセージ本文です。
手順 3:関数コードの記述とテスト
トリガーを作成した後、関数コードを記述・テストして、その正しさを検証します。実際の運用では、Apache RocketMQ がメッセージを受信すると、トリガーが自動的に関数を実行します。
関数の詳細ページで、コード タブをクリックします。コードエディタでコードを記述し、コードのデプロイ をクリックします。
本トピックでは Node.js コードを例として使用します。サンプルコードは以下のとおりです。
'use strict'; /* 初期化機能を有効化するには、 以下のとおり initializer 関数を実装してください: exports.initializer = (context, callback) => { console.log('初期化中'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // イベントパラメーターを解析し、イベントを処理します。 callback(null, 'return result'); }- コード タブをクリックし、関数のテスト をクリックします。 関数が実行された後、コードタブで結果を確認できます。
補足情報
Function Compute コンソール以外にも、以下の方法でトリガーを設定できます。
Serverless Devs を使用してトリガーを設定します。詳細については、「トリガーの作成」をご参照ください。
SDK を使用してトリガーを設定します。詳細については、「SDK」をご参照ください。
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。