ApsaraMQ for Kafka トリガーは、Kafka トピックにメッセージが発行されると Function Compute (FC) 関数を呼び出し、ポーリングなしでイベント駆動型の処理を可能にします。
仕組み
ApsaraMQ for Kafka は EventBridge を介して Function Compute と統合されます。Function Compute コンソールでトリガーを作成すると、FC はトリガーの設定に基づいて EventBridge に イベントストリーム を自動的に作成します。
トリガーがアクティブになると、指定されたトピックをモニタリングします。メッセージが ApsaraMQ for Kafka に発行されると、EventBridge はバッチ設定に基づいて、1 つ以上のメッセージをバッチで関連付けられた関数に配信します。
制限事項
ApsaraMQ for Kafka インスタンスは、関数と同じリージョンに存在する必要があります。
イベントストリームの数が上限に達した場合、追加の Kafka トリガーは作成できません。上限については、「制限事項」をご参照ください。
前提条件
開始する前に、以下を確認してください。
EventBridge:EventBridge を有効化し、RAM ユーザーに権限を付与していること
Function Compute:イベント関数
ApsaraMQ for Kafka:デプロイ済みのインスタンスとトピックおよびコンシューマーグループ
ステップ 1:トリガーの作成
Function Compute コンソールにログインし、対象の関数を開きます。
[設定]タブで、[トリガー]ページに移動し、[トリガーの作成]をクリックします。
次の表に示すトリガーパラメーターを設定し、[OK] をクリックします。

| パラメーター | 必須 | 説明 | 例 |
|---|---|---|---|
| コンシューマーオフセット | はい | EventBridge がメッセージのプルを開始するポイントです。Earliest Offset:利用可能な最も古いメッセージから開始します。Latest Offset:新しいメッセージのみから開始します。 | Earliest Offset |
| 呼び出し方法 | はい | イベントまたはイベントのバッチが配信された際に、関数をどのように呼び出すかを指定します。Sync Invocation:次のバッチを処理する前に応答を待ちます。ペイロード制限は 32 MB です。詳細については、「Synchronous invocation」をご参照ください。Async Invocation:即座に応答を返し、次のバッチの処理を続行します。ペイロード制限は 128 KB です。詳細については、「Asynchronous invocation」をご参照ください。 | Sync Invocation |
| 最大配信同時実行数 | いいえ | Function Compute に同時に配信される Kafka メッセージの最大数です。有効な値:1~300。Sync Invocation のみで使用できます。制限を引き上げるには、EventBridge のクォータセンターにアクセスし、EventStreaming FC Sink Maximum Concurrent Number of Synchronous Posting を検索して、Apply をクリックしてください。 | 1 |
プッシュ設定、リトライポリシー、デッドレターキューなどの詳細設定については、「トリガーの高度な機能」をご参照ください。
ステップ 2:(オプション) テストパラメーターの設定
トリガーは Kafka メッセージを event パラメーターとして関数に渡します。実際のメッセージを発行せずに機能をテストするには、トリガーのペイロードをシミュレートするテストイベントを手動で設定します。
注:シミュレーションテストは解析ロジックの検証に役立ちますが、完全なトリガーパイプラインをテストするものではありません。エンドツーエンドの動作を確認するには、ステップ 3 で説明するように、実際の Kafka メッセージを使用してください。
[Code] タブで、[Test Function] の隣にある
アイコンをクリックし、[Configure Test Parameters] を選択します。「テストパラメーターの設定」パネルで、[新しいテストイベントの作成] または [既存のテストイベントの編集] をクリックし、名前とイベントの内容を入力して、[OK] をクリックします。
次の例は、バッチ処理された 2 つのメッセージの event 構造を示しています。
[
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "acs:alikafka",
"type": "alikafka:Topic:Message",
"subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "164901546557****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
},
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "acs:alikafka",
"type": "alikafka:Topic:Message",
"subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "164901546557****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
}
]data オブジェクトには、Kafka 固有のフィールドが含まれています。
| フィールド | タイプ | 例 | 説明 |
|---|---|---|---|
topic | String | TopicName | トピック名です。 |
partition | Int | 1 | メッセージが消費されたパーティション番号です。 |
offset | Int | 0 | パーティション内のメッセージオフセットです。 |
timestamp | String | 1655952591589 | メッセージの消費が開始された日時を示す UNIX タイムスタンプ (ミリ秒) です。 |
CloudEvents エンベロープ フィールド(specversion、id、source、type など)については、「イベントの概要」をご参照ください。
ステップ 3:関数コードの作成とテスト
「[コード]」タブで、エディターに機能コードを記述し、[デプロイ] をクリックします。次の Node.js の例は、関数ハンドラを示しています。
'use strict'; /* 初期化関数機能を有効にするには、 以下のように initializer 関数を実装してください: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // イベントパラメーターを解析し、イベントを処理します。 callback(null, 'return result'); }次のいずれかの方法で関数をテストします。
シミュレートされたテスト: [テスト関数] をクリックして、手順 2 で設定したテストイベントを使用します。これは、実際の Kafka 接続を必要とせずに解析ロジックを検証するのに役立ちます。
エンドツーエンドテスト: ApsaraMQ for Kafka コンソール にログインし、トピックを選択して、[メッセージの送信] をクリックして実際のメッセージを公開します。トリガーがメッセージを検出し、関数を自動的に呼び出します。

実行後、[リアルタイムログ] で出力を確認します。

次のステップ
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。