すべてのプロダクト
Search
ドキュメントセンター

EventBridge:ApsaraMQ for Kafka

最終更新日:Nov 09, 2025

このトピックでは、EventBridge コンソールでイベントストリームのイベントソースとして ApsaraMQ for Kafka を設定する方法について説明します。

前提条件

手順

  1. EventBridge コンソールにログインします。 左側のナビゲーションウィンドウで、[イベントストリーム] をクリックします。

  2. 上部のナビゲーションバーでリージョンを選択し、[イベントストリームの作成] をクリックします。

  3. [イベントストリームの作成] パネルで、[タスク名][説明] を設定し、次のパラメーターを設定してから、[保存] をクリックします。

    • タスクの作成

      1. [ソース] 設定ウィザードで、[データプロバイダー][ApsaraMQ For Kafka] に設定し、次のパラメーターを設定してから、[次へ] をクリックします。

      2. パラメーター

        説明

        リージョン

        ソース ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

        中国 (北京)

        Kafka インスタンス

        ApsaraMQ for Kafka メッセージを生成するソースインスタンスを選択します。

        MQ_INST_115964845466****_ByBeUp3p

        Topic

        ApsaraMQ for Kafka メッセージを生成する Topic を選択します。

        topic

        グループ ID

        ソースインスタンスのコンシューマーグループの名前を選択します。 専用のコンシューマーグループを使用してイベントソースを作成します。 既存のサービスとコンシューマーグループを共有しないでください。 これにより、既存のメッセージの送受信との干渉を防ぎます。

        GID_http_1

        コンシューマーオフセット

        メッセージの消費を開始するオフセットを選択します。

        最新のオフセット

        ネットワークタイプ

        メッセージルーティングのネットワークタイプを選択します。

        デフォルトネットワーク

        VPC

        VPC ID を選択します。 このパラメーターは、[ネットワークタイプ][インターネット] に設定した場合に必要です。

        vpc-bp17fapfdj0dwzjkd****

        VSwitch

        vSwitch ID を選択します。 このパラメーターは、[ネットワークタイプ][インターネット] に設定した場合に必要です。

        vsw-bp1gbjhj53hdjdkg****

        セキュリティグループ

        セキュリティグループを選択します。 このパラメーターは、[ネットワークタイプ][インターネット] に設定した場合に必要です。

        alikafka_pre-cn-7mz2****

        バッチプッシュ

        バッチプッシュは複数のイベントを集約します。 バッチプッシュは、一括プッシュの件数 または バッチプッシュ間隔 (単位:秒) の値に達したときにトリガーされます。

        たとえば、プッシュ数を 100、間隔を 15 秒に設定したとします。 メッセージ数が 10 秒で 100 に達した場合、15 秒の間隔が経過するのを待たずにプッシュがすぐにトリガーされます。

        有効化

        一括プッシュの件数

        1 回の関数呼び出しで送信できるメッセージの最大数。 リクエストは、蓄積されたメッセージの数が指定された値に達した場合にのみ送信されます。 有効な値: [1, 10000]。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される間隔。 システムはメッセージを集約し、指定された間隔で Function Compute に送信します。 有効な値: [0, 15]。 単位: 秒。 値 0 は待機時間がないことを示し、メッセージはすぐに配信されます。

        3

      3. [フィルタリング][変換]、および [シンク] 設定ウィザードで、イベントフィルタリングルール、変換ルール、およびイベントターゲットを設定します。 イベント変換の設定方法の詳細については、「Function Compute を使用したデータクレンジング」をご参照ください。

    • タスクのプロパティ

      イベントストリームのリトライポリシーとデッドレターキューを設定します。 詳細については、「リトライとデッドレターキュー」をご参照ください。

  4. [イベントストリーム] ページに戻ります。 作成したイベントストリームを見つけ、[アクション] 列の [有効化] をクリックします。

    イベントストリームを有効にすると、開始までに 30〜60 秒かかる場合があります。 [イベントストリーム] ページの [ステータス] 列で起動の進行状況を確認できます。

イベントの例

{
  "specversion": "1.0",
  "id": "8e215af8-ca18-4249-8645-f96c1026****",
  "source": "acs:alikafka",
  "type": "alikafka:Topic:Message",
  "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-23T02:49:51.589Z",
  "aliyunaccountid": "182572506381****",
  "data": {
    "topic": "****",
    "partition": 7,
    "offset": 25,
    "timestamp": 1655952591589,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "keytest",
    "value": "hello kafka msg"
  }
}

詳細については、「イベントの概要」をご参照ください。

次の表に、data フィールドのパラメーターを示します。

パラメーター

タイプ

説明

topic

文字列

TopicName

Topic の名前。

partition

Int

1

ApsaraMQ for Kafka のコンシューマーパーティション。

offset

Int

0

ApsaraMQ for Kafka のメッセージオフセット。

timestamp

文字列

1655952591589

メッセージの消費が開始されるときの UNIX タイムスタンプ。

headers.headers

リスト

[header1, header2]

メッセージヘッダー。

headers.isReadOnly

ブール値

false

このフィールドは予約済みであり、実用的な意味はありません。

key

文字列

dataKey

メッセージキー。

value

文字列

dataValue

メッセージの値。 コンテンツの形式は、タスクに設定されているデータ形式によって異なります。

  • Json: メッセージの内容は JSON 構造に解析されます。

  • Text: メッセージの内容は文字列として解析されます。

  • Binary: メッセージの内容は Base64 でエンコードされた文字列です。