全部產品
Search
文件中心

EventBridge:雲訊息佇列 Kafka 版

更新時間:Nov 01, 2025

本文介紹如何在事件匯流排EventBridge控制台添加雲訊息佇列 Kafka 版作為事件流中的事件提供方。

前提條件

操作步驟

  1. 登入事件匯流排EventBridge控制台,在左側導覽列,單擊事件流
  2. 在頂部功能表列,選擇地區,然後單擊建立事件流
  3. 建立事件流面板,設定任務名稱描述,配置以下參數,然後單擊儲存

    • 任務建立

      1. Source(源)設定精靈,選擇資料提供方訊息佇列 Kafka 版,設定以下參數,然後單擊下一步

      2. 參數

        說明

        樣本

        地區

        選擇雲訊息佇列 Kafka 版源執行個體所在的地區。

        華北2(北京)

        kafka 執行個體

        選擇生產雲訊息佇列 Kafka 版訊息的源執行個體。

        MQ_INST_115964845466****_ByBeUp3p

        Topic

        選擇生產雲訊息佇列 Kafka 版訊息的Topic。

        topic

        Group ID

        選擇源執行個體的消費組名稱。請使用獨立的消費組來建立事件來源,不要和已有的業務混用消費組,以免影響已有的訊息收發。

        GID_http_1

        消費位點

        選擇開始消費訊息的位點。

        最新位點

        網路設定

        選擇路由訊息的網路類型。

        預設網路

        Virtual Private Cloud

        選擇VPC ID。當網路設定設定為自建公網時需要設定此參數。

        vpc-bp17fapfdj0dwzjkd****

        交換器

        選擇vSwitch ID。當網路設定設定為自建公網時需要設定此參數。

        vsw-bp1gbjhj53hdjdkg****

        安全性群組

        選擇安全性群組。當網路設定設定為自建公網時需要設定此參數。

        alikafka_pre-cn-7mz2****

        批量推送

        批量推送可幫您批量彙總多個事件,當批量推送条数批量推送间隔(单位:秒)兩者條件達到其一時即會觸發批量推送。

        例如:您設定的推送條數為100 條,間隔時間為15 s,在10 s內訊息條數已達到100條,那麼該次推送則不會等15 s後再推送。

        開啟

        批量推送条数

        調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。

        100

        批量推送间隔(单位:秒)

        調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。

        3

      3. Filtering(過濾)Transform(轉換)Sink(目標)設定精靈,設定事件過濾、轉換規則及事件目標。事件轉換的配置說明,請參見使用Function Compute實現訊息資料清洗

    • 任務屬性

      設定事件流的重試策略及無效信件佇列。更多資訊,請參見重試和死信

  4. 返回事件流頁面,找到建立好的事件流,在其右側操作欄,單擊啟用

    啟用事件流後,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。

事件樣本

{
  "specversion": "1.0",
  "id": "8e215af8-ca18-4249-8645-f96c1026****",
  "source": "acs:alikafka",
  "type": "alikafka:Topic:Message",
  "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-23T02:49:51.589Z",
  "aliyunaccountid": "182572506381****",
  "data": {
    "topic": "****",
    "partition": 7,
    "offset": 25,
    "timestamp": 1655952591589,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "keytest",
    "value": "hello kafka msg"
  }
}

CloudEvents規範中定義的參數解釋,請參見事件概述

data欄位包含的參數解釋如下表所示。

參數

類型

樣本值

描述

topic

String

TopicName

Topic的名稱。

partition

Int

1

雲訊息佇列 Kafka 版的消費分區資訊。

offset

Int

0

雲訊息佇列 Kafka 版的訊息位點。

timestamp

String

1655952591589

開始消費時間戳記。

headers.headers

List

[header1, header2]

訊息 header

headers.isReadOnly

Boolean

false

此欄位僅保留,無實際意義

key

String

dataKey

訊息 key

value

String

dataValue

訊息 value。具體內容格式和任務配置的資料格式相關。

  • Json:訊息內容解析為 Json 結構

  • Text:訊息內容解析為字串

  • Binary:訊息內容經 base64 編碼後的字串