全部產品
Search
文件中心

DataWorks:Kafka單表離線同步至MaxCompute

更新時間:Nov 27, 2025

本文以將Kafka單表離線同步至MaxCompute為例,為您介紹Kafka的分鐘、小時、天增量資料定時調度寫入MaxCompute小時、天分區表的配置詳情。

注意事項

  • Kafka的版本需要大於等於0.10.2小於等於2.2.x,且Kafka啟用了記錄時間戳記,並且記錄帶有正確的業務時間戳記。

  • 增量資料開始同步後,如果仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic,這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。

  • Kafka側參數同步結束策略原則上只有滿足以下條件才可以選擇1分鐘讀取不到新資料,否則存在資料漏讀風險。

    • Kafka Topic中部分或全部分區存在長時間(10分鐘以上)無資料寫入情況。

    • 每個周期執行個體啟動後,不會有時間戳記小於結束時間參數的記錄寫入Kafka Topic。

前提條件

使用限制

暫不支援將源端資料同步至MaxCompute外部表格。

操作步驟

說明

本文以資料開發(Data Studio)(新版)介面操作為例,示範離線同步任務配置。

一、建立節點與任務配置

對於通用的節點建立和嚮導配置步驟,本文將直接引用通用操作指南嚮導模式配置,不再贅述。

二、配置資料來源與去向

配置資料來源(Kafka)

本文檔將Kafka資料單表離線同步至MaxCompute,資料來源為Kafka檔案,配置要點如下。

說明

通用的Kafka資料來源的配置項介紹可查看Kafka Reader文檔,以下為本次實踐的配置參考。

配置項

配置要點

主題

選擇待同步的Kafka Topic。如果您使用的是標準模式的DataWorks,需要在對應開發和生產環境的Kafka叢集中有同名的Topic,此處的主題即選擇此同名的Topic即可。

說明

如果:

  • 開發環境的Topic不存在:則此處配置離線同步節點的主題時,下拉框中無法搜尋到待同步的Topic。

  • 生產環境的Topic不存在:則離線同步任務配置完,提交發布後,在生產環境周期調度運行時會因為沒法找到待同步的表而導致任務失敗。

消費群組ID

根據業務需要填寫,確保在Kafka叢集側唯一,便於在Kafka叢集側統計和監控消費情況。

讀取起始位點起始時間讀取結束位點結束時間

讀取起始點位讀取結束位點選擇指定時間起始時間結束時間分別設定為調度參數${startTime}${endTime}

這幾個參數明確了後續同步資料時起始和結束位置,本實踐的配置表明從${startTime}時間的資料開始同步,一直到${endTime}時間的資料結束。${startTime}${endTime}在同步任務實際運行時會根據調度參數配置運算式作參數替換。

時區

可置空或選擇預設使用DataWorks所在地區的伺服器時區。

說明

如果您此前有聯絡阿里雲支援人員修改過調度時區,這裡可選擇您修改後的時區。

鍵類型實值型別編碼

根據Kafka Topic記錄實際情況選擇。

同步結束策略

同步結束策略如果滿足以下條件可以選擇1分鐘讀取不到新資料,否則選擇到達指定結束位點

  • Kafka Topic中部分或全部分區存在長時間(10分鐘以上)無資料寫入情況。

  • 每個周期執行個體啟動後,不會有時間戳記小於結束時間參數的記錄寫入Kafka Topic。

進階配置

保持預設即可。

配置資料去向(MaxCompute)

本文檔將Kafka資料單表離線同步至MaxCompute,資料去向為表,配置要點如下。

說明

下表中未說明參數保持預設即可。

配置項

配置要點

資料來源

預設顯示上一步選擇的MaxCompute資料來源。如果您使用的是標準模式的DataWorks工作空間,會分別顯示開發和生產專案的名稱。

選擇待同步的MaxCompute表。如果您使用的是標準類型的DataWorks工作空間,請確保在MaxCompute的開發環境和生產環境中存在同名且表結構一致的MaxCompute表。

您也可以單擊一鍵產生目標表結構,系統將自動建立表接收資料,支援手動調整建表語句。

說明

如果:

  • 開發環境不存在待同步的MaxCompute表,則在配置離線同步節點的去向表的下拉框中無法搜到待同步表。

  • 生產環境不存在待同步的MaxCompute表,同步任務提交發布後,資料同步任務調度運行時將會由於無法找到待同步表而導致同步任務運行失敗。

  • 開發環境和生產環境的表結構不一致,同步任務提交發布後,同步任務實際調度運行時的列對應關係,可能與此處離線同步節點配置的列對應關係不一致,最終導致資料寫入不正確。

分區資訊

如果表為分區表,您可以填入分區列的取值。

  • 取值可以是固定值,如ds=20220101

  • 取值可以是調度系統參數,如ds=${partition},當任務運行時,會自動替換調度系統參數。

三、配置欄位對應

選擇資料來源和資料去向後,需要指定讀取端和寫入端列的映射關係。您可以選擇同名映射同行映射取消映射手動編輯映射關係

  • Kafka側欄位中預設的6個欄位。

    欄位名

    含義

    __key__

    Kafka記錄的Key。

    __value__

    Kafka記錄的Value。

    __partition__

    Kafka記錄所在分區號,分區號為從0開始的整數。

    __headers__

    Kafka記錄的dHeaders。

    __offset__

    Kafka記錄在所在分區的位移量,位移量為從0開始的整數。

    __timestamp__

    Kafka記錄的13位整數毫秒時間戳記。

  • Kafka側欄位可自訂配置JSON解析,可以通過.(擷取子欄位)[](擷取數組元素)兩種文法,擷取Kafka記錄JSON格式的value欄位內容。

    重要

    如果JSON欄位名中帶有"."字元,由於會引發欄位定義文法歧義,無法通過欄位定義擷取欄位值。

    Kafka某條JSON格式的記錄value的資料樣本如下。

    {
          "a": {
          "a1": "hello"
          },
          "b": "world",
          "c":[
                "xxxxxxx",
                "yyyyyyy"
                ],
          "d":[
                {
                      "AA":"this",
                      "BB":"is_data"
                },
                {
                      "AA":"that",
                      "BB":"is_also_data"
                }
            ],
         "a.b": "unreachable"
    }
    • 如果同步a1的資料“hello”,Kafka側欄位增加a.a1

    • 如果同步b的資料“world”,Kafka側欄位增加b

    • 如果同步c的資料“yyyyyyy”,Kafka側欄位增加c[1]

    • 如果同步AA的資料“this”,Kafka側欄位增加d[0].AA

    • Kafka側欄位定義若為a.b,則無法同步資料“unreachable”

  • 允許源表欄位或目標表欄位存在不參與映射的欄位,源表不參與映射的欄位同步執行個體不會讀取,目標端不參與映射的欄位將寫入NULL。

  • 不允許一個源頭表欄位對應到多個目標表欄位,也不允許一個目標表欄位對應到多個目標表欄位

四、配置進階參數

單擊任務右側的進階配置,可設定任務期望最大並發數髒資料策略等。本教程將髒資料策略配置為不容忍髒資料,其他參數在初期可保持預設。更多資訊,請參見嚮導模式配置

五、調試配置並運行

  1. 單擊離線同步節點編輯頁面右側的調試配置,設定調試運行使用的資源群組指令碼參數,然後單擊頂部工具列的運行,測試同步鏈路是否成功運行。

  2. 您可以在左側導覽列單擊image,然後單擊個人目錄右側的image,建立一個尾碼為.sql的檔案,執行如下SQL查詢資料去向表中的資料是否符合預期。

    說明
    SELECT * FROM <MaxCompute側目標表名> WHERE pt=<指定分區> LIMIT 20;

六、調度配置與發布

單擊離線同步任務右側的調度配置,設定周期運行所需的調度配置參數後,單擊頂部工具列的發布,進入發布面板,根據頁面提示完成發布

在上述配置資料來源與資料去向時,使用了三個調度參數:${startTime}${endTime}${partition},在此處調度配置中需根據實際同步需求指定這三個調度參數的替換策略,以下為幾個典型情境的配置樣本。

典型情境

推薦配置

情境樣本說明

同步任務每5分鐘調度一次

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

如果同步任務在2022-11-22 10:00被調度啟動,則:

  • 會同步Kafka Topic中時間戳記範圍在2022-11-22 09:52(含)到2022-11-22 09:57(不含)的記錄。

  • 同步的Kafka資料寫入MaxCompute的202211220952分區中。

  • endTime設定比執行個體調度時間($[yyyymmddhh24mi])早三分鐘是為了確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中,避免漏讀。

同步任務每小時調度一次

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

說明
  • 同步任務每2小時調度一次時,startTime=$[yyyymmddhh24-2/24]0000,另外調度參數保持不變。

  • 同步任務每3小時調度一次時,startTime=$[yyyymmddhh24-3/24]0000,另外調度參數保持不變。

  • 以此類推其他以小時為調度周期的情境下,調度參數的配置結果。

如果同步任務在2022-11-22 10:05被調度啟動,則:

  • 會同步Kafka Topic中時間戳記範圍在2022-11-22 9:00(含)到2022-11-22 10:00(不含)的記錄。

  • 同步的Kafka資料寫入MaxCompute的2022112210分區中。

同步任務每天調度一次

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任務在2022-11-22 00:05被調度啟動,則:

  • 會同步Kafka Topic中時間戳記範圍在2022-11-21 00:00(含)到2022-11-22 00:00(不含)的記錄。

  • 同步的Kafka資料寫入MaxCompute的20221121分區中。

同步任務每周調度一次

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任務在2022-11-22 00:05被調度啟動,則:

  • 會同步Kafka Topic中時間戳記範圍在2022-11-15 00:00(含)到2022-11-22 00:00(不含)的記錄。

  • 同步的Kafka資料寫入MaxCompute的20221121分區中。

同步任務每月調度一次

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任務在2022-11-22 00:05被調度啟動,則:

  • 會同步Kafka Topic中時間戳記範圍在2022-10-22 00:00(含)到2022-11-22 00:00(不含)的記錄。

  • 同步的Kafka資料寫入MaxCompute的20221121分區中。

根據希望的調度間隔,設定調度周期。

典型情境

推薦配置

情境樣本說明

同步任務每5分鐘調度一次

  • 調度周期:分鐘

  • 開始時間:00:00

  • 時間間隔:05分鐘

  • 結束時間:23:59

同步任務每小時調度一次

  • 調度周期:小時

  • 開始時間:00:15

  • 時間間隔:1小時

  • 結束時間:23:59

開始時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

同步任務每天調度一次

  • 調度周期:天

  • 定時調度時間:00:15

定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

同步任務每周調度一次

  • 調度周期:周

  • 指定時間:星期一

  • 定時調度時間:00:15

定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

同步任務每月調度一次

  • 調度周期:月

  • 指定時間:每月1號

  • 定時調度時間:00:15

定時調度時間設定一個比00:00稍晚一點的時間,例如00:15,確保同步任務執行個體啟動後,對應時間區間內的資料已全部寫入Kafka Topic中。

重要

如果出現執行個體啟動後,仍有時間戳記小於等於起始時間的記錄寫入Kafka Topic,則這些資料可能被漏讀,所以當Kafka Topic中資料寫入出現延遲或者時間戳記亂序時,要注意對離線同步任務造成的資料漏讀風險。