本文以將Kafka單表離線同步至MaxCompute為例,為您介紹Kafka的分鐘、小時、天增量資料定時調度寫入MaxCompute小時、天分區表的配置詳情。
注意事項
Kafka的版本需要大於等於0.10.2小於等於2.2.x,且Kafka啟用了記錄時間戳記,並且記錄帶有正確的業務時間戳記。
增量資料開始同步後,如果仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic,這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。
Kafka側參數同步結束策略原則上只有滿足以下條件才可以選擇1分鐘讀取不到新資料,否則存在資料漏讀風險。
Kafka Topic中部分或全部分區存在長時間(10分鐘以上)無資料寫入情況。
每個周期執行個體啟動後,不會有時間戳記小於結束時間參數的記錄寫入Kafka Topic。
前提條件
已購買Serverless資源群組。
已建立Kafka資料來源和MaxCompute資料來源,詳情請參見資料來源配置。
已完成資源群組與資料來源間的網路連通,詳情請參見網路連通方案概述。
使用限制
暫不支援將源端資料同步至MaxCompute外部表格。
操作步驟
本文以資料開發(Data Studio)(新版)介面操作為例,示範離線同步任務配置。
一、建立節點與任務配置
對於通用的節點建立和嚮導配置步驟,本文將直接引用通用操作指南嚮導模式配置,不再贅述。
二、配置資料來源與去向
配置資料來源(Kafka)
本文檔將Kafka資料單表離線同步至MaxCompute,資料來源為Kafka檔案,配置要點如下。
通用的Kafka資料來源的配置項介紹可查看Kafka Reader文檔,以下為本次實踐的配置參考。
配置項 | 配置要點 |
主題 | 選擇待同步的Kafka Topic。如果您使用的是標準模式的DataWorks,需要在對應開發和生產環境的Kafka叢集中有同名的Topic,此處的主題即選擇此同名的Topic即可。 說明 如果:
|
消費群組ID | 根據業務需要填寫,確保在Kafka叢集側唯一,便於在Kafka叢集側統計和監控消費情況。 |
讀取起始位點、起始時間讀取結束位點、結束時間 | 讀取起始點位與讀取結束位點選擇指定時間,起始時間與結束時間分別設定為調度參數 這幾個參數明確了後續同步資料時起始和結束位置,本實踐的配置表明從 |
時區 | 可置空或選擇預設使用DataWorks所在地區的伺服器時區。 說明 如果您此前有聯絡阿里雲支援人員修改過調度時區,這裡可選擇您修改後的時區。 |
鍵類型、實值型別、編碼 | 根據Kafka Topic記錄實際情況選擇。 |
同步結束策略 | 同步結束策略如果滿足以下條件可以選擇1分鐘讀取不到新資料,否則選擇到達指定結束位點。
|
進階配置 | 保持預設即可。 |
配置資料去向(MaxCompute)
本文檔將Kafka資料單表離線同步至MaxCompute,資料去向為表,配置要點如下。
下表中未說明參數保持預設即可。
配置項 | 配置要點 |
資料來源 | 預設顯示上一步選擇的MaxCompute資料來源。如果您使用的是標準模式的DataWorks工作空間,會分別顯示開發和生產專案的名稱。 |
表 | 選擇待同步的MaxCompute表。如果您使用的是標準類型的DataWorks工作空間,請確保在MaxCompute的開發環境和生產環境中存在同名且表結構一致的MaxCompute表。 您也可以單擊一鍵產生目標表結構,系統將自動建立表接收資料,支援手動調整建表語句。 說明 如果:
|
分區資訊 | 如果表為分區表,您可以填入分區列的取值。
|
三、配置欄位對應
選擇資料來源和資料去向後,需要指定讀取端和寫入端列的映射關係。您可以選擇同名映射、同行映射、取消映射或手動編輯映射關係。
Kafka側欄位中預設的6個欄位。
欄位名
含義
__key__
Kafka記錄的Key。
__value__
Kafka記錄的Value。
__partition__
Kafka記錄所在分區號,分區號為從0開始的整數。
__headers__
Kafka記錄的dHeaders。
__offset__
Kafka記錄在所在分區的位移量,位移量為從0開始的整數。
__timestamp__
Kafka記錄的13位整數毫秒時間戳記。
Kafka側欄位可自訂配置JSON解析,可以通過
.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取Kafka記錄JSON格式的value欄位內容。重要如果JSON欄位名中帶有"."字元,由於會引發欄位定義文法歧義,無法通過欄位定義擷取欄位值。
Kafka某條JSON格式的記錄value的資料樣本如下。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }如果同步a1的資料
“hello”,Kafka側欄位增加a.a1。如果同步b的資料
“world”,Kafka側欄位增加b。如果同步c的資料
“yyyyyyy”,Kafka側欄位增加c[1]。如果同步AA的資料
“this”,Kafka側欄位增加d[0].AA。Kafka側欄位定義若為
a.b,則無法同步資料“unreachable”。
允許源表欄位或目標表欄位存在不參與映射的欄位,源表不參與映射的欄位同步執行個體不會讀取,目標端不參與映射的欄位將寫入NULL。
不允許一個源頭表欄位對應到多個目標表欄位,也不允許一個目標表欄位對應到多個目標表欄位
四、配置進階參數
單擊任務右側的進階配置,可設定任務期望最大並發數、髒資料策略等。本教程將髒資料策略配置為不容忍髒資料,其他參數在初期可保持預設。更多資訊,請參見嚮導模式配置。
五、調試配置並運行
單擊離線同步節點編輯頁面右側的調試配置,設定調試運行使用的資源群組和指令碼參數,然後單擊頂部工具列的運行,測試同步鏈路是否成功運行。
您可以在左側導覽列單擊
,然後單擊個人目錄右側的
,建立一個尾碼為.sql的檔案,執行如下SQL查詢資料去向表中的資料是否符合預期。說明此方式查詢需要將目標端MaxCompute綁定為DataWorks的計算資源。
您需要在
.sql檔案編輯頁面右側單擊調試配置,指定資料來源類型、計算資源、資源群組後,再單擊頂部工具列的運行。
SELECT * FROM <MaxCompute側目標表名> WHERE pt=<指定分區> LIMIT 20;
六、調度配置與發布
單擊離線同步任務右側的調度配置,設定周期運行所需的調度配置參數後,單擊頂部工具列的發布,進入發布面板,根據頁面提示完成發布。
在上述配置資料來源與資料去向時,使用了三個調度參數:${startTime}、${endTime}和${partition},在此處調度配置中需根據實際同步需求指定這三個調度參數的替換策略,以下為幾個典型情境的配置樣本。
典型情境 | 推薦配置 | 情境樣本說明 |
同步任務每5分鐘調度一次 |
| 如果同步任務在2022-11-22 10:00被調度啟動,則:
|
同步任務每小時調度一次 |
說明
| 如果同步任務在2022-11-22 10:05被調度啟動,則:
|
同步任務每天調度一次 |
| 如果同步任務在2022-11-22 00:05被調度啟動,則:
|
同步任務每周調度一次 |
| 如果同步任務在2022-11-22 00:05被調度啟動,則:
|
同步任務每月調度一次 |
| 如果同步任務在2022-11-22 00:05被調度啟動,則:
|
根據希望的調度間隔,設定調度周期。
典型情境 | 推薦配置 | 情境樣本說明 |
同步任務每5分鐘調度一次 |
| 無 |
同步任務每小時調度一次 |
| 開始時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。 |
同步任務每天調度一次 |
| 定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。 |
同步任務每周調度一次 |
| 定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。 |
同步任務每月調度一次 |
| 定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。 |
如果出現執行個體啟動後,仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic,則這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。