全部產品
Search
文件中心

ApsaraMQ for Kafka:建立OSS Sink Connector(舊版)

更新時間:Feb 17, 2025

本文介紹如何建立使用OSS Sink Connector,您可以通過OSS Sink Connector將資料從雲訊息佇列 Kafka 版的資料來源Topic匯出至Object Storage Service的Object中。

前提條件

詳細步驟,請參見建立前提

步驟一:建立目標服務資源

在Object Storage Service控制台建立一個儲存空間(Bucket)。詳細步驟,請參見控制台建立儲存空間

本文以oss-sink-connector-bucket Bucket為例。

步驟二:建立OSS Sink Connector並啟動

  1. 登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。

  2. 在左側導覽列,選擇Connector生態整合 > 訊息流程出(Sink)

  3. 訊息流程出(Sink)頁面,單擊建立任務

  4. 訊息流程出建立面板,配置以下參數,單擊確定

    1. 基礎資訊地區,設定任務名稱,將流出類型選擇為Object Storage Service

    2. 資源配置地區,設定以下參數。

      表 1. 源(ApsaraMQ for Kafka

      參數

      說明

      樣本

      地區

      源Kafka執行個體所在的地區。

      華東1(杭州)

      kafka執行個體

      資料來源所在的Kafka執行個體ID。

      alikafka_post-cn-9hdsbdhd****

      Topic

      資料來源所在的Kafka執行個體Topic。

      guide-sink-topic

      Group ID

      資料來源所在的Kafka執行個體中的Group ID。

      • 快速建立:自動建立以GID_EVENTBRIDGE_xxx命名的Group ID。

      • 使用已有:選擇已建立的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的訊息收發。

      使用已有

      並發配額(消費者數)

      消費Topic資料的並發線程數,線程和Topic分區的對應關係如下:

      • Topic分區數=並發消費數:一個線程消費一個Topic分區。建議使用。

      • Topic分區數>並發消費數:多個並發消費會均攤所有分區消費。

      • Topic分區數<並發消費數:一個線程消費一個Topic分區,多出的消費數無效。

      2

      消費位點

      • 最新位點:從最新位點開始消費。

      • 最早位點:從最初位點開始消費。

      最新位點

      網路設定

      有跨境傳輸資料需求時選擇自建公網,其他情況可選擇預設網路

      預設網路

      表 1. 目標(Object Storage Service)

      參數

      說明

      樣本

      OSS Bucket

      已建立的Object Storage Service Bucket。

      oss-sink-connector-bucket

      儲存路徑

      • 無需分區:資料儲存路徑為{Kafka Instance ID}/{Topic Name}

      • 時間分區

        • YYYY/MM/dd/HH:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd/HH

        • YYYY/MM/dd:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd

        • YYYYMMddHH:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYYMMddHH

        • YYYYMMdd:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYYMMdd

      說明

      其中,YYYY、MM、dd、HH分別代表年、月、日、時。

      alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

      進階配置

      當積攢的訊息滿足批量彙總檔案大小批量彙總事件視窗兩個條件中的任意一個時,新的訊息將會寫入新的檔案。

      批量彙總檔案大小

      配置需要彙總的檔案大小,取值範圍為[1,128],單位:MiB。

      5

      批量彙總事件視窗

      配置需要彙總的時間視窗。單位:分鐘。

      1

    完成上述配置後,在訊息流程出(Sink)頁面,找到剛建立的OSS Sink Connector任務,單擊其右側操作列的啟動。當狀態欄由啟動中變為運行中時,Connector建立成功。

步驟三:測試OSS Sink Connector

  1. 訊息流程出(Sink)頁面,在OSS Sink Connector任務的事件來源列單擊源Topic。

  2. 在Topic詳情頁面,單擊體驗發送訊息

  3. 快速體驗訊息收發面板,按照下圖配置訊息內容,然後單擊確定

    發送訊息

  4. 訊息流程出(Sink)頁面,在OSS Sink Connector任務的事件目標列單擊目標Bucket。

  5. 在Bucket頁面,選擇左側導覽列的檔案管理 > 檔案清單,然後進入Bucket的最深層路徑。

    最深層路徑

    可以看到此路徑中有如下兩類Object:

    • 系統meta檔案:格式為.oss_meta_file_partition_{partitionID},檔案數量和上遊Topic的Partition數量相同,用於記錄攢批資訊,您無需關注。

    • 資料檔案:格式為partition_{partitionID}_offset_{offset}_{8位Random字串},如果一個Object中彙總了一個Partition的多條訊息,Object名稱中的Offset為這批訊息中的最小Offset值。

  6. 在對應Object右側操作列,選擇表徵圖 > 下載

  7. 開啟下載的檔案,查看訊息內容。

    訊息

    如圖所示,多條訊息之間通過換行分隔。