本文介紹如何建立OSS Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Object Storage Service。
前提條件
在匯出資料前,請確保您已完成以下操作:
為雲訊息佇列 Kafka 版執行個體開啟Connector。更多資訊,請參見開啟Connector。
為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic。
開通Function Compute服務。更多資訊,請參見開通Function Compute服務。
注意事項
僅支援在同地區內,將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至Function Compute,再由Function Compute匯出至Object Storage Service。Connector的限制說明,請參見使用限制。
該功能基於Function Compute服務提供。Function Compute為您提供了一定的免費額度,超額部分將產生費用,請以Function Compute的計費規則為準。計費詳情,請參見計費概述。
Function Compute的函數調用支援日誌查詢。具體操作步驟,請參見配置日誌。
訊息轉儲時,雲訊息佇列 Kafka 版中訊息用UTF-8 String序列化,暫不支援二進位的資料格式。
建立並部署OSS Sink Connector
在概览頁面的资源分布地區,選擇地區。
在左側導覽列,單擊Connector 任务列表。
在Connector 任务列表頁面,從选择实例的下拉式清單選擇Connector所屬的執行個體,然後單擊创建 Connector。
在创建 Connector設定精靈面頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然後單擊下一步。
重要雲訊息佇列 Kafka 版會為您自動選中授权创建服务关联角色。
如果未建立服務關聯角色,雲訊息佇列 Kafka 版會為您自動建立一個服務關聯角色,以便您使用雲訊息佇列 Kafka 版匯出資料至Object Storage Service的功能。
如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。
關於該服務關聯角色的更多資訊,請參見服務關聯角色。
參數
描述
樣本值
名称
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字元。
同一個雲訊息佇列 Kafka 版執行個體內保持唯一。
Connector的資料同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動建立該Group,系統將為您自動建立。
kafka-oss-sink
实例
預設配置為執行個體的名稱與執行個體ID。
demo alikafka_post-cn-st21p8vj****
在配置源服务頁簽,選擇資料來源為訊息佇列Kafka版,並配置以下參數,然後單擊下一步。
參數
描述
樣本值
数据源 Topic
需要同步資料的Topic。
oss-test-input
消费线程并发数
資料來源Topic的消費線程並發數。預設值為6。取值說明如下:
1
2
3
6
12
6
消费初始位置
開始消費的位置。取值說明如下:
最早位点:從最初位點開始消費。
最近位点:從最新位點開始消費。
最早位点
VPC ID
資料同步任務所在的VPC。單擊配置运行环境顯示該參數。預設為雲訊息佇列 Kafka 版執行個體所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
資料同步任務所在的交換器。單擊配置运行环境顯示該參數。該交換器必須與雲訊息佇列 Kafka 版執行個體處於同一VPC。預設為部署雲訊息佇列 Kafka 版執行個體時填寫的交換器。
vsw-bp1d2jgg81***
失败处理
訊息發送失敗後,是否繼續訂閱出現錯誤的Topic的分區。單擊配置运行环境顯示該參數。取值說明如下。
继续订阅:繼續訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔。
停止订阅:停止訂閱出現錯誤的Topic的分區,並列印錯誤記錄檔
說明如何查看日誌,請參見Connector相關操作。
如何根據錯誤碼尋找解決方案,請參見錯誤碼。
继续订阅
创建资源方式
選擇建立Connector所依賴的Topic與Group的方式。單擊配置运行环境顯示該參數。
自动创建
手动创建
自动创建
Connector 消费组
Connector使用的Group。單擊配置运行环境顯示該參數。該Group的名稱建議以connect-cluster開頭。
connect-cluster-kafka-oss-sink
任务位点 Topic
用於儲存消費位點的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大於1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
說明僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-offset-kafka-oss-sink
任务配置 Topic
用於儲存任務配置的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
儲存引擎:Topic的儲存引擎必須為Local儲存。
說明僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-config-kafka-oss-sink
任务状态 Topic
用於儲存任務狀態的Topic。單擊配置运行环境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎必須為Local儲存。
說明僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
cleanup.policy:Topic的日誌清理策略必須為compact。
connect-status-kafka-oss-sink
死信队列 Topic
用於儲存Connect架構的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和異常資料Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
說明僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
connect-error-kafka-oss-sink
异常数据 Topic
用於儲存Sink的異常資料的Topic。單擊配置运行环境顯示該參數。該Topic可以和無效信件佇列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
儲存引擎:Topic的儲存引擎可以為Local儲存或雲端儲存。
說明僅專業版執行個體支援在建立Topic時選擇儲存引擎為Local儲存,標準版暫不支援。
connect-error-kafka-oss-sink
在配置目标服务頁簽,選擇目標服務為Object Storage Service,並配置以下參數,然後單擊创建。
參數
描述
樣本值
Bucket 名称
Object Storage ServiceBucket的名稱。
bucket_test
Access Key
阿里雲帳號的AccessKey ID。
yourAccessKeyID
Secret Key
阿里雲帳號的AccessKey Secret。
yourAccessKeySecret
請確保您使用的AccessKey ID所對應的帳號已被授予以下最小許可權:
{ "Version": "1", "Statement": [ { "Action": [ "oss:GetObject", "oss:PutObject" ], "Resource": "*", "Effect": "Allow" } ] }說明AccessKey ID和AccessKey Secret是雲訊息佇列 Kafka 版建立任務時作為環境變數傳遞至Object Storage Service的資料,任務建立成功後,雲訊息佇列 Kafka 版不儲存AccessKey ID和AccessKey Secret資訊。
建立完成後,在Connector 任务列表頁面,查看建立的Connector 。
建立完成後,在Connector 任务列表頁面,找到建立的Connector ,單擊其操作列的部署。
發送測試訊息
您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,測試資料能否被匯出至Object Storage Service。
在Connector 任务列表頁面,找到目標Connector,在其右側操作列,單擊测试。
在发送消息面板,發送測試訊息。
发送方式選擇控制台。
在消息 Key文字框中輸入訊息的Key值,例如demo。
在消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
設定发送到指定分区,選擇是否指定分區。
單擊是,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
发送方式選擇Docker,執行运行 Docker 容器生产示例消息地區的Docker命令,發送訊息。
发送方式選擇SDK,根據您的業務需求,選擇需要的語言或者架構的SDK以及接入方式,通過SDK發送訊息。
驗證結果
向雲訊息佇列 Kafka 版的資料來源Topic發送訊息後,查看OSS檔案管理,驗證資料匯出結果。更多資訊,請參見檔案概覽。
檔案管理中顯示新匯出的檔案。

雲訊息佇列 Kafka 版資料匯出至Object Storage Service的格式樣本如下:
[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]更多操作
您可以按需對該Connector所依賴的Function Compute資源進行配置。
在Connector 任务列表頁面,找到建立的Connector,單擊其操作列的。
頁面跳轉至Function Compute控制台,您可以按需配置函數資源。