本文介紹如何建立使用OSS Sink Connector,您可以通過OSS Sink Connector將資料從雲訊息佇列 Kafka 版的資料來源Topic匯出至Object Storage Service的Object中。
前提條件
詳細步驟,請參見建立前提。
步驟一:建立目標服務資源
在Object Storage Service控制台建立一個儲存空間(Bucket)。詳細步驟,請參見控制台建立儲存空間。
本文以oss-sink-connector-bucket Bucket為例。
步驟二:建立OSS Sink Connector並啟動
登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。
在左側導覽列,選擇。
在訊息流程出(Sink)頁面,單擊建立任務。
在基礎資訊地區,設定任務名稱,將流出類型選擇為Object Storage Service。
在資源配置地區,設定以下參數。
表 1. 源(ApsaraMQ for Kafka)
參數
說明
樣本
地區
源Kafka執行個體所在的地區。
華東1(杭州)
kafka執行個體
資料來源所在的Kafka執行個體ID。
alikafka_post-cn-9hdsbdhd****
Topic
資料來源所在的Kafka執行個體Topic。
guide-sink-topic
Group ID
資料來源所在的Kafka執行個體中的Group ID。
快速建立:自動建立以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇已建立的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的訊息收發。
使用已有
並發配額(消費者數)
消費Topic資料的並發線程數,線程和Topic分區的對應關係如下:
Topic分區數=並發消費數:一個線程消費一個Topic分區。建議使用。
Topic分區數>並發消費數:多個並發消費會均攤所有分區消費。
Topic分區數<並發消費數:一個線程消費一個Topic分區,多出的消費數無效。
2
消費位點
最新位點:從最新位點開始消費。
最早位點:從最初位點開始消費。
最新位點
網路設定
有跨境傳輸資料需求時選擇自建公網,其他情況可選擇預設網路。
預設網路
表 1. 目標(Object Storage Service)
參數
說明
樣本
OSS Bucket
已建立的Object Storage Service Bucket。
oss-sink-connector-bucket
儲存路徑
無需分區:資料儲存路徑為{Kafka Instance ID}/{Topic Name}。
時間分區:
YYYY/MM/dd/HH:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd/HH。
YYYY/MM/dd:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYY/MM/dd。
YYYYMMddHH:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYYMMddHH。
YYYYMMdd:產生的OSS檔案目錄為{Kafka Instance ID}/{Topic Name}/YYYYMMdd。
說明其中,YYYY、MM、dd、HH分別代表年、月、日、時。
alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH
進階配置
當積攢的訊息滿足批量彙總檔案大小和批量彙總事件視窗兩個條件中的任意一個時,新的訊息將會寫入新的檔案。
無
批量彙總檔案大小
配置需要彙總的檔案大小,取值範圍為[1,128],單位:MiB。
5
批量彙總事件視窗
配置需要彙總的時間視窗。單位:分鐘。
1
完成上述配置後,在訊息流程出(Sink)頁面,找到剛建立的OSS Sink Connector任務,單擊其右側操作列的啟動。當狀態欄由啟動中變為運行中時,Connector建立成功。
步驟三:測試OSS Sink Connector
在訊息流程出(Sink)頁面,在OSS Sink Connector任務的事件來源列單擊源Topic。
在Topic詳情頁面,單擊體驗發送訊息。
在快速體驗訊息收發面板,按照下圖配置訊息內容,然後單擊確定。

在訊息流程出(Sink)頁面,在OSS Sink Connector任務的事件目標列單擊目標Bucket。
在Bucket頁面,選擇左側導覽列的,然後進入Bucket的最深層路徑。

可以看到此路徑中有如下兩類Object:
系統meta檔案:格式為.oss_meta_file_partition_{partitionID},檔案數量和上遊Topic的Partition數量相同,用於記錄攢批資訊,您無需關注。
資料檔案:格式為partition_{partitionID}_offset_{offset}_{8位Random字串},如果一個Object中彙總了一個Partition的多條訊息,Object名稱中的Offset為這批訊息中的最小Offset值。
在對應Object右側操作列,選擇。
開啟下載的檔案,查看訊息內容。

如圖所示,多條訊息之間通過換行分隔。