全部產品
Search
文件中心

ApsaraMQ for Kafka:建立OSS Sink Connector

更新時間:Mar 28, 2025

本文介紹如何建立OSS Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Object Storage Service。

前提條件

在匯出資料前,請確保您已完成以下操作:

注意事項

  • 僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Object Storage Service。Connector的限制說明,請參見使用限制

  • 該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述

  • Function Compute的函數調用支援日誌查詢。具體操作步驟,請參見配置日誌

  • 訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。

建立並部署OSS Sink Connector

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊Connector 任务列表

  4. Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector

  5. 创建 Connector設定精靈面頁面,完成以下操作。

    1. 配置基本信息頁簽,按需配置以下參數,然後單擊下一步

      重要

      雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色

      • 如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Object Storage Service的功能。

      • 如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。

      關於該服務關聯角色的更多資訊,請參見服務關聯角色

      參數

      描述

      樣本值

      名称

      Connector的名稱。命名規則:

      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。

      • 同一個雲訊息佇列 Kafka 版執行個體內保持唯一。

      Connector的資料同步任務必須使用名稱為connect-任務名稱Group。如果您未手動建立該Group,系統將為您自動建立。

      kafka-oss-sink

      实例

      預設配置為執行個體的名稱與執行個體ID。

      demo alikafka_post-cn-st21p8vj****

    2. 配置源服务頁簽,選擇資料來源訊息佇列Kafka版,並配置以下參數,然後單擊下一步

      參數

      描述

      樣本值

      数据源 Topic

      需要同步資料的Topic。

      oss-test-input

      消费线程并发数

      資料來源Topic的消費線程並發數。預設值為6。取值說明如下:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消费初始位置

      開始消費的位置。取值說明如下:

      • 最早位点:從最初位點開始消費。

      • 最近位点:從最新位點開始消費。

      最早位点

      VPC ID

      資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。

      vpc-bp1xpdnd3l***

      vSwitch ID

      資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。

      vsw-bp1d2jgg81***

      失败处理

      訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。

      • 继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。

      • 停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔

      說明

      继续订阅

      创建资源方式

      選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。

      • 自动创建

      • 手动创建

      自动创建

      Connector 消费组

      Connector使用的Group。單擊配置运行环境顯示該參數。該Group的名稱建議以connect-cluster開頭。

      connect-cluster-kafka-oss-sink

      任务位点 Topic

      用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-offset開頭。

      • 分區數:Topic的分區數量必須大於1。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

        說明

        僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-offset-kafka-oss-sink

      任务配置 Topic

      用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-config開頭。

      • 分區數:Topic的分區數量必須為1。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

        說明

        僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-config-kafka-oss-sink

      任务状态 Topic

      用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。

      • Topic:建議以connect-status開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎必須為Local儲存。

        說明

        僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。

      • cleanup.policy:Topic的日誌清理策略必須為compact。

      connect-status-kafka-oss-sink

      死信队列 Topic

      用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。

      • Topic:建議以connect-error開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。

        說明

        僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。

      connect-error-kafka-oss-sink

      异常数据 Topic

      用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。

      • Topic:建議以connect-error開頭。

      • 分區數:Topic的分區數量建議為6。

      • 儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。

        說明

        僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。

      connect-error-kafka-oss-sink

    3. 配置目标服务頁簽,選擇目標服務Object Storage Service,並配置以下參數,然後單擊创建

      參數

      描述

      樣本值

      Bucket 名称

      Object Storage ServiceBucket的名稱。

      bucket_test

      Access Key

      阿里雲帳號的AccessKey ID。

      yourAccessKeyID

      Secret Key

      阿里雲帳號的AccessKey Secret。

      yourAccessKeySecret

      請確保您使用的AccessKey ID所對應的帳號已被授予以下最小許可權:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "oss:GetObject",
                      "oss:PutObject"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
      說明

      AccessKey ID和AccessKey Secret是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Object Storage Service的資料,任務建立成功後,雲訊息佇列 Kafka 版不儲存AccessKey ID和AccessKey Secret資訊。

      建立完成後,在Connector 任务列表頁面,查看建立的Connector 。

  6. 建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署

發送測試訊息

您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Object Storage Service。

  1. Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试

  2. 发送消息面板,發送測試訊息。

    • 发送方式選擇控制台

      1. 消息 Key文字框中輸入訊息的Key值,例如demo。

      2. 消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。

      3. 設定发送到指定分区,選擇是否指定分區。

        • 單擊,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態

        • 單擊,不指定分區。

    • 发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。

    • 发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。

驗證結果

雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,查看OSS檔案管理,驗證資料匯出結果。更多資訊,請參見檔案概覽

檔案管理中顯示新匯出的檔案。

files

雲訊息佇列 Kafka 版資料匯出至Object Storage Service的格式樣本如下:

[
    {
        "key":"123",
        "offset":4,
        "overflowFlag":true,
        "partition":0,
        "timestamp":1603779578478,
        "topic":"Test",
        "value":"1",
        "valueSize":272687
    }
]

更多操作

您可以按需對該Connector所依賴的Function Compute資源進行配置。

Connector 任务列表頁面,找到建立的Connector,單擊其操作列的更多 > 配置函数

頁面跳轉至Function Compute控制台,您可以按需配置函數資源。