すべてのプロダクト
Search
ドキュメントセンター

Function Compute:Kafka トリガー

最終更新日:Apr 01, 2026

ApsaraMQ for Kafka トリガーは、Kafka トピックにメッセージが発行されると Function Compute (FC) 関数を呼び出し、ポーリングなしでイベント駆動型の処理を可能にします。

仕組み

ApsaraMQ for Kafka は EventBridge を介して Function Compute と統合されます。Function Compute コンソールでトリガーを作成すると、FC はトリガーの設定に基づいて EventBridge に イベントストリーム を自動的に作成します。

トリガーがアクティブになると、指定されたトピックをモニタリングします。メッセージが ApsaraMQ for Kafka に発行されると、EventBridge はバッチ設定に基づいて、1 つ以上のメッセージをバッチで関連付けられた関数に配信します。

制限事項

  • ApsaraMQ for Kafka インスタンスは、関数と同じリージョンに存在する必要があります。

  • イベントストリームの数が上限に達した場合、追加の Kafka トリガーは作成できません。上限については、「制限事項」をご参照ください。

前提条件

開始する前に、以下を確認してください。

ステップ 1:トリガーの作成

  1. Function Compute コンソールにログインし、対象の関数を開きます。

  2. 設定]タブで、[トリガー]ページに移動し、[トリガーの作成]をクリックします。

  3. 次の表に示すトリガーパラメーターを設定し、[OK] をクリックします。

image
パラメーター必須説明
コンシューマーオフセットはいEventBridge がメッセージのプルを開始するポイントです。Earliest Offset:利用可能な最も古いメッセージから開始します。Latest Offset:新しいメッセージのみから開始します。Earliest Offset
呼び出し方法はいイベントまたはイベントのバッチが配信された際に、関数をどのように呼び出すかを指定します。Sync Invocation:次のバッチを処理する前に応答を待ちます。ペイロード制限は 32 MB です。詳細については、「Synchronous invocation」をご参照ください。Async Invocation:即座に応答を返し、次のバッチの処理を続行します。ペイロード制限は 128 KB です。詳細については、「Asynchronous invocation」をご参照ください。Sync Invocation
最大配信同時実行数いいえFunction Compute に同時に配信される Kafka メッセージの最大数です。有効な値:1~300。Sync Invocation のみで使用できます。制限を引き上げるには、EventBridge のクォータセンターにアクセスし、EventStreaming FC Sink Maximum Concurrent Number of Synchronous Posting を検索して、Apply をクリックしてください。1

プッシュ設定、リトライポリシー、デッドレターキューなどの詳細設定については、「トリガーの高度な機能」をご参照ください。

ステップ 2:(オプション) テストパラメーターの設定

トリガーは Kafka メッセージを event パラメーターとして関数に渡します。実際のメッセージを発行せずに機能をテストするには、トリガーのペイロードをシミュレートするテストイベントを手動で設定します。

注:シミュレーションテストは解析ロジックの検証に役立ちますが、完全なトリガーパイプラインをテストするものではありません。エンドツーエンドの動作を確認するには、ステップ 3 で説明するように、実際の Kafka メッセージを使用してください。
  1. [Code] タブで、[Test Function] の隣にある image.png アイコンをクリックし、[Configure Test Parameters] を選択します。

  2. テストパラメーターの設定」パネルで、[新しいテストイベントの作成] または [既存のテストイベントの編集] をクリックし、名前とイベントの内容を入力して、[OK] をクリックします。

次の例は、バッチ処理された 2 つのメッセージの event 構造を示しています。

[
    {
        "specversion": "1.0",
        "id": "8e215af8-ca18-4249-8645-f96c1026****",
        "source": "acs:alikafka",
        "type": "alikafka:Topic:Message",
        "subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-23T02:49:51.589Z",
        "aliyunaccountid": "164901546557****",
        "data": {
            "topic": "****",
            "partition": 7,
            "offset": 25,
            "timestamp": 1655952591589,
            "headers": {
                "headers": [],
                "isReadOnly": false
            },
            "key": "keytest",
            "value": "hello kafka msg"
        }
    },
    {
        "specversion": "1.0",
        "id": "8e215af8-ca18-4249-8645-f96c1026****",
        "source": "acs:alikafka",
        "type": "alikafka:Topic:Message",
        "subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-23T02:49:51.589Z",
        "aliyunaccountid": "164901546557****",
        "data": {
            "topic": "****",
            "partition": 7,
            "offset": 25,
            "timestamp": 1655952591589,
            "headers": {
                "headers": [],
                "isReadOnly": false
            },
            "key": "keytest",
            "value": "hello kafka msg"
        }
    }
]

data オブジェクトには、Kafka 固有のフィールドが含まれています。

フィールドタイプ説明
topicStringTopicNameトピック名です。
partitionInt1メッセージが消費されたパーティション番号です。
offsetInt0パーティション内のメッセージオフセットです。
timestampString1655952591589メッセージの消費が開始された日時を示す UNIX タイムスタンプ (ミリ秒) です。

CloudEvents エンベロープ フィールド(specversionidsourcetype など)については、「イベントの概要」をご参照ください。

ステップ 3:関数コードの作成とテスト

  1. [コード]」タブで、エディターに機能コードを記述し、[デプロイ] をクリックします。次の Node.js の例は、関数ハンドラを示しています。

    'use strict';
    /*
    初期化関数機能を有効にするには、
    以下のように initializer 関数を実装してください:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // イベントパラメーターを解析し、イベントを処理します。
      callback(null, 'return result');
    }
  2. 次のいずれかの方法で関数をテストします。

    • シミュレートされたテスト: [テスト関数] をクリックして、手順 2 で設定したテストイベントを使用します。これは、実際の Kafka 接続を必要とせずに解析ロジックを検証するのに役立ちます。

    • エンドツーエンドテスト: ApsaraMQ for Kafka コンソール にログインし、トピックを選択して、[メッセージの送信] をクリックして実際のメッセージを公開します。トリガーがメッセージを検出し、関数を自動的に呼び出します。

    image

  3. 実行後、[リアルタイムログ] で出力を確認します。

    image

次のステップ

既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。