全部產品
Search
文件中心

ApsaraMQ for Kafka:建立Kafka Sink Connector

更新時間:Feb 18, 2025

本文介紹如何在ApsaraMQ for Kafka控制台建立訊息流程出任務,將指定ApsaraMQ for Kafka執行個體中的資料匯出至其他的ApsaraMQ for Kafka

前提條件

建立訊息流程出任務

  1. 登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。

  2. 在左側導覽列,選擇Connector生態整合 > 工作清單

  3. 工作清單頁面,單擊建立任務

  4. 创建任务面板,設定任务名称描述,配置以下參數,單擊儲存

    • 任務建立

      1. Source(源)設定精靈,選擇数据提供方訊息佇列 Kafka 版,設定以下參數,然後單擊下一步

        參數

        說明

        樣本

        地區

        選擇雲訊息佇列 Kafka 版源執行個體所在的地區。

        華北2(北京)

        kafka 執行個體

        選擇生產雲訊息佇列 Kafka 版訊息的源執行個體。

        alikafka_post-cn-jte3****

        Topic

        選擇生產雲訊息佇列 Kafka 版訊息的Topic。

        demo-topic

        Group ID

        選擇源執行個體的消費組名稱。

        • 快速建立:推薦方案,自動建立以GID_EVENTBRIDGE_xxx 命名的 Group ID。

        • 使用已有:請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的訊息收發

        快速建立

        消費位點

        選擇開始消費訊息的位點。

        • 最新點位

        • 最早點位

        最新位點

        網路設定

        選擇路由訊息的網路類型。

        • 基礎網路

        • 自建公網

        基礎網路

        Virtual Private Cloud

        選擇VPC ID。當網路設定設定為自建公網時需要設定此參數。

        vpc-bp17fapfdj0dwzjkd****

        交換器

        選擇vSwitch ID。當網路設定設定為自建公網時需要設定此參數。

        vsw-bp1gbjhj53hdjdkg****

        安全性群組

        選擇安全性群組。當網路設定設定為自建公網時需要設定此參數。

        alikafka_pre-cn-7mz2****

        数据格式

        資料格式是針對支援二進位傳遞的資料來源端推出的指定內容格式的編碼能力。支援多種資料格式編碼,如無特殊編碼訴求可將格式設定為Json。

        • Json(Json格式編碼,位元據按照utf-8 編碼為Json格式放入Payload。)

        • Text(預設文字格式設定編碼,位元據按照utf-8編碼為字串放入Payload。)

        • Binary(二進位格式編碼,位元據按照Base64編碼為字串放入Payload。)

        Json

        批量推送条数

        調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。

        100

        批量推送间隔(单位:秒)

        調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。

        3

      2. Filtering(过滤)設定精靈,設定資料模式内容過濾發送的請求。更多資訊,請參見事件模式

      3. Transform(转换)設定精靈,設定資料清洗,實現分割、映射、富化及動態路由等繁雜資料加工能力。更多資訊,請參見使用Function Compute實現訊息資料清洗

      4. Sink(目标)設定精靈,選擇服务类型訊息佇列 Kafka 版,配置以下參數。

        參數

        說明

        樣本

        实例ID

        選擇已建立的雲訊息佇列 Kafka 版執行個體。

        test

        Topic

        選擇已建立執行個體中的Topic。

        test

        确认模式(ack)

        選擇雲訊息佇列 Kafka 版接收到資料後給用戶端發出的確認訊號。

        • None

        • LeaderOnly

        • All

        None

        消息体(Value)

        事件匯流排EventBridge通過JSONPath提取訊息中的資料,將指定的訊息內容路由到目標。

        • 完整資料

        • 資料提取

        • 固定值

        • 模板

        資料提取

        $.data.value

        消息键值(Key)

        事件匯流排EventBridge通過JSONPath提取訊息中的資料,將指定的訊息內容路由到目標。

        • 資料提取

        • 固定值

        • 模板

        資料提取

        $.data.key
    • 任務屬性

      設定此任務的重試策略及無效信件佇列。更多資訊,請參見重試和死信

  5. 返回任务列表頁面,找到建立好的任務,在其右側操作列,單擊启用

  6. 提示對話方塊,閱讀提示資訊,然後單擊确认

    啟用任務後,會有30秒~60秒的延遲時間,您可以在任务列表頁面的状态欄查看啟動進度。

其他動作

任务列表頁面,找到目標任務,在其右側操作列,執行其他動作。

  • 查看任務詳情:單擊详情,在任務頁面,查看任務的基礎資訊、任務屬性及監控指標。

  • 編輯任務配置:單擊编辑,在編輯任務面板,修改任務詳情及屬性。

  • 啟停任務:單擊启用或者停用,然後在提示對話方塊,單擊确认

  • 刪除任務:單擊删除,然後在提示對話方塊,單擊确认