本文介紹如何建立使用MaxCompute Sink Connector,您可以通過MaxCompute Sink Connector將資料從雲訊息佇列 Kafka 版執行個體的資料來源Topic匯出至MaxCompute的表中。
前提條件
詳細步驟,請參見建立前提。
注意事項
如需使用MaxCompute資料分割函數,建立表時需額外建立一個分區列,列名為time,類型為STRING。
步驟一:建立目標資源
通過MaxCompute用戶端建立表。更多資訊,請參見建立表。
本文以名稱為kafka_to_maxcompute的表為例。表中有3列資料,並使用資料分割函數。該表的建表語句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);如不使用資料分割函數,語句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);執行成功後,如下圖所示:
在表管理頁面,查看建立的表資訊。
步驟二:建立MaxCompute Sink Connector並啟動
登入雲訊息佇列 Kafka 版控制台,在概览頁面的资源分布地區,選擇地區。
在左側導覽列,選擇。
在工作清單頁面,單擊建立任務。
在创建任务面板,設定任务名称和描述,配置以下參數。
任務建立
在Source(源)設定精靈,選擇数据提供方為訊息佇列 Kafka 版,設定以下參數,然後單擊下一步。
參數
說明
樣本
地域
源Kafka執行個體所在的地區。
華東1(杭州)
kafka執行個體
資料來源所在的Kafka執行個體ID。
alikafka_post-cn-9hdsbdhd****
Topic
資料來源所在的Kafka執行個體Topic。
guide-sink-topic
Group ID
資料來源所在的Kafka執行個體中的Group ID。
快速建立:自動建立以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇已建立的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的訊息收發。
使用已有
消费位点
最新位點:從最新位點開始消費。
最早位點:從最初位點開始消費。
最新位點
網路設定
有跨境傳輸資料需求時選擇自建公網,其他情況可選擇基礎網路。
基礎網路
資料格式
資料格式是針對支援二進位傳遞的資料來源端推出的指定內容格式的編碼能力。支援多種資料格式編碼,如無特殊編碼訴求可將格式設定為Json。
Json(預設Json格式編碼,位元據按照utf-8 編碼為Json格式放入Payload。)
Text(文字格式設定編碼,位元據按照utf-8編碼為字串放入Payload。)
Binary(二進位格式編碼,位元據按照Base64編碼為字串放入Payload。)
Json
批量推送条数
調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。
2000
批量推送间隔(单位:秒)
調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。
3
在Filtering(过滤)設定精靈,定義資料模式過濾發送的請求。更多資訊,請參見事件模式。
在Transform(转换)設定精靈,設定資料清洗,實現分割、映射、富化及動態路由等繁雜資料加工能力。更多資訊,請參見使用Function Compute實現訊息資料清洗。
在Sink(目标)設定精靈,選擇服务类型為MaxCompute acs.maxcompute,配置以下參數。
參數
說明
樣本
帳號AccessKey ID
阿里雲帳號的AccessKey ID,用於訪問MaxCompute服務。
yourAccessKeyID
帳號AccessKey Secret
阿里雲帳號的AccessKey Secret。
yourAccessKeySecret
MaxCompute專案名稱
選擇已建立的MaxCompute專案。
test_compute
MaxCompute表名稱
選擇已建立的MaxCompute表。
kafka_to_maxcompute
MaxCompute表入參
選擇MaxCompute表後,此處會展示表的列名和類型資訊,配置資料擷取規則即可。下面是一條訊息樣本,本樣本中Topic列對應的值從Topic欄位提取,則定義資料擷取規則為
$.topic。{ 'data': { 'topic': 't_test', 'partition': 2, 'offset': 1, 'timestamp': 1717048990499, 'headers': { 'headers': [], 'isReadOnly': False }, 'key': 'MaxCompute-K1', 'value': 'MaxCompute-V1' }, 'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1', 'source': 'acs:alikafka', 'specversion': '1.0', 'type': 'alikafka:Topic:Message', 'datacontenttype': 'application/json; charset=utf-8', 'time': '2024-05-30T06:03:10.499Z', 'aliyunaccountid': '1413397765616316' }topic:
$.data.topicvaluename:
$.data.valuevalueage:
$.data.offset分區配置
關閉:不開啟分區能力。
開啟:開啟分區能力。
開啟分區後需配置分區值等參數:
分取值支援{yyyy}、{MM}、{dd}、{HH}、{mm}時間變數參數,分別對應年、月、日、時、分。時間變數大小寫敏感。
分取值支援填寫常量。
開啟
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
網路設定
專用網路:通過Virtual Private Cloud將Kafka訊息投遞到MaxCompute。
公網:通過公網將Kafka訊息投遞到MaxCompute。
公網
VPC
選擇VPC ID。僅當網路設定為專用網路時需要配置此參數。
vpc-bp17fapfdj0dwzjkd****
交換器
選擇vSwitch ID。僅當網路設定為專用網路時需要配置此參數。
vsw-bp1gbjhj53hdjdkg****
安全性群組
選擇安全性群組。僅當網路設定為專用網路時需要配置此參數。
test_group
任務屬性
配置事件推送失敗時的重試策略及錯誤發生時的處理方式。更多資訊,請參見重試和死信。
完成上述配置後,單擊儲存。在工作清單頁面,找到剛建立的MaxCompute Sink Connector任務,此時狀態欄為啟動中,當狀態變為運行中時,Connector建立成功。
步驟三:測試MaxCompute Sink Connector
在工作清單頁面,在MaxCompute Sink Connector任務的事件來源列單擊源Topic。
在Topic詳情頁面,單擊體驗發送訊息。
在快速體驗訊息收發面板,按照下圖配置訊息內容,然後單擊確定。

進入MaxCompute控制台,執行以下SQL語句查看分區資訊。
show PARTITIONS kafka_to_maxcompute;查詢結果如下所示:

根據分區資訊,執行以下語句,查看分區內資料。
SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";查詢結果如下所示:
