全部產品
Search
文件中心

DataWorks:Kafka單表即時同步至OSS資料湖

更新時間:Jun 26, 2025

Data Integration目前支援將Kafka、LogHub等源頭的資料單表即時同步至OSS。本文以Kafka為源端、OSS為目標端情境為例,為您介紹Kafka如何通過Data Integration即時同步至OSS資料湖。

使用限制

Kafka的版本需要大於等於0.10.2小於等於2.2.0。

前提條件

操作步驟

一、選擇同步任務類型

  1. 進入Data Integration頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列單擊同步任務,然後在頁面頂部單擊建立同步任務,進入同步任務的建立頁面,配置如下基本資料。

    • 資料來源和去向KafkaOSS

    • 新任務名稱:自訂同步任務名稱。

    • 同步類型單表即時

二、網路與資源配置

  1. 網路與資源配置地區,選擇同步任務所使用的資源群組。您可以為該任務分配任務資源佔用CU數。

  2. 來來源資料源選擇已添加的kafka資料來源,去向資料來源選擇已添加的OSS資料來源後,單擊測試連通性image

  3. 確保來來源資料源與去向資料來源均連通成功後,單擊下一步

三、配置同步鏈路

1、配置Kafka來源

在頁面上方單擊資料來源Kafka,編輯Kafka來源資訊

image

  1. Kafka來源資訊地區,選擇Kafka叢集中需要同步的Topic。

    其他配置可使用任務建立時產生的預設值,也可根據需要進行修改。

  2. 單擊右上方的資料採樣

    在彈出對話方塊中指定好開始時間採樣條數後,單擊開始採集按鈕,可以對指定的Kafka Topic進行資料採樣,同時您可以預覽Topic中的資料,為後續資料處理節點的資料預覽和可視化配置提供輸入。

  3. 輸出欄位配置地區,按需勾選同步任務需同步的欄位。

2、編輯資料處理節點

單擊image表徵圖可以增加資料處理方式。目前提供5種資料處理方式(資料脫敏字串替換資料過濾JSON解析欄位編輯與賦值),您可根據需要做順序編排,在任務運行時會按照編排的資料處理先後順序執行資料處理。

image

每完成一個資料處理節點配置,可以單擊右上方的資料輸出預覽按鈕,在彈出對話方塊中,單擊重新擷取上遊輸出,類比得到Kafka Topic採樣資料經過當前資料處理節點處理後的結果。

image

說明

資料輸出預覽強依賴Kafka來源的資料採樣,在執行資料輸出預覽前需要先在Kafka來源表單中完成資料採樣。

3、配置OSS去向資訊

在頁面上方單擊資料去向OSS,編輯OSS去向資訊

image

  1. OSS去向資訊地區,選擇要寫入的OSS基本資料。

    • 寫入格式:支援HudiPaimonlceberg三種寫入格式。

    • 選擇中繼資料庫自動構建位置:如果您當前帳號下開通了DLF產品,支援同步資料入湖時自動在DLF構建對應的中繼資料庫和中繼資料表資訊。

      說明

      不支援跨地區構建中繼資料。

    • 儲存直接選取:選擇入湖後資料存放區在OSS的哪個路徑下。

    • 目標庫:選擇資料寫入的目標資料庫,支援您選擇建立庫,建立DLF中繼資料庫,需指定庫名

    • 目標表:選擇要寫入的OSS表是自動建表還是使用已有表

    • 表名:填寫或者選擇要寫入的OSS表名。

  2. (可選)編輯表結構。

    當選擇自動建表時,您需要單擊編輯表結構按鈕,在彈框中編輯目標表結構。同時,支援您單擊根據上遊節點輸出資料行重建表結構按鈕,自動根據上遊節點輸出資料行,產生表結構。您可以在自動產生的表結構中選擇一列配置為主鍵。

  3. 配置欄位對應。

    系統會自動按照同名映射原則產生上遊列與目標表列之間的映射,您可根據需要進行調整,支援一個上遊列映射到多個目標表列,不允許多個上遊列映射到一個目標表列,當上遊列未配置到目標表列的映射時,對應列不會寫入目標表。

四、警示配置

為避免任務出錯導致業務資料同步延遲,您可以對同步任務設定警示策略。

  1. 單擊頁面右上方的警示配置,進入即時子任務警示設定頁面。

  2. 單擊新增警示,配置警示規則。

    說明

    此處定義的警示規則,將對該任務產生的即時同步子任務生效,您可在任務配置完成後,進入即時同步任務運行與管理介面查看並修改該即時同步子任務的監控警示規則。

  3. 管理警示規則。

    對於已建立的警示規則,您可以通過警示開關控制警示規則是否開啟,同時,您可以根據警示層級將警示發送給不同的人員。

五、進階參數配置

同步任務提供部分參數可供修改,您可以按需對該參數值進行修改。

說明

請在完全瞭解對應參數含義的情況下再進行修改,以免產生不可預料的錯誤或者資料品質問題。

  1. 單擊介面右上方的進階參數配置,進入進階參數配置頁面。

  2. 進階參數配置頁面修改相關參數值。

六、DDL能力配置

來來源資料源會包含許多DDL操作,您可以根據業務需求,在介面右上方單擊DDL能力配置,進入DDL能力配置頁面,對不同的DDL訊息設定同步至目標端的處理策略。

說明

不同DDL訊息處理策略請參見:DDL訊息處理規則

七、資源群組配置

您可以單擊介面右上方的資源群組配置,查看並切換當前的任務所使用的資源群組。

八、類比運行

完成上述所有任務配置後,您可以單擊右上方類比運行,類比整個任務針對少量採樣資料的處理,查看資料寫入目標表後的結果。當任務配置錯誤、類比運行過程中異常或者產生髒資料時,會即時反饋出異常資訊,能夠協助您快速評估任務配置的正確性,以及是否能得到預期結果。

  1. 在彈出的對話方塊中設定採樣參數(開始時間採樣條數)。

  2. 單擊開始採集得到採樣資料。

  3. 單擊預覽按鈕,類比整個任務針對少量採樣資料的處理。

九、執行同步任務

  1. 完成所有配置後,單擊頁面底部的完成配置

  2. Data Integration > 同步任務介面,找到已建立的同步任務,單擊操作列的啟動

  3. 單擊工作清單中對應任務的名稱/ID,查看任務的詳細執行過程。

同步任務營運

查看任務運行狀態

建立完成同步任務後,您可以在同步任務頁面查看當前已建立的同步工作清單及各個同步任務的基本資料。

image

  • 您可以在操作列啟動停止同步任務,在更多中可以對同步任務進行編輯查看等操作。

  • 已啟動的任務您可以在執行概況中看到任務啟動並執行基本情況,也可以單擊對應的概況地區查看執行詳情。

image

Kafka到OSS的單表即時同步任務分為兩個步驟:

  • 結構遷移:包含目標表的建立方式(已有表或自動建表),如果是自動建表,將會為您展示建表的DDL。

  • 即時資料同步:包含即時同步的統計資訊,包含即時的運行資訊、DDL記錄、警示資訊等。

任務重跑

在某些特殊情況下,如果需要修改同步欄位、調整目標表欄位或表名資訊時,您還可以單擊同步任務操作列的重跑,系統會將調整的欄位、變更的目標包等資訊進行同步,之前同步過未修改的表將不會再進行同步。

  • 不修改任務配置,直接單擊重跑操作,重新運行一次同步任務。

  • 編輯任務,進行修改操作後,單擊完成配置。此時任務的操作會變成應用程式更新,單擊應用程式更新會直接觸發修改後的任務重跑。即時同步任務會按照新的配置運行。