本教程以MySQL中的使用者基本資料ods_user_info_d表和OSS中的網站訪問日誌資料user_log.txt檔案為例,通過Data Integration離線同步任務分別同步至StarRocks的ods_user_info_d_starrocks、ods_raw_log_d_starrocks表。旨在介紹如何通過DataWorksData Integration實現異構資料來源間的資料同步,完成數倉資料同步操作。
前提條件
在進行資料同步之前,請確保已準備好所需的工作環境。具體操作步驟請參見準備環境。
章節目標
將案例提供的公用資料來源中的資料同步至StarRocks,完成商務程序設計中的資料同步部分的內容。
源端資料來源類型 | 源端待同步資料 | 源端表結構 | 目標端資料來源類型 | 接收源端資料的目標表 | 目標表結構 |
MySQL | 表:ods_user_info_d 使用者基本資料資料 |
| StarRocks |
|
|
HttpFile | 檔案:user_log.txt 使用者網站訪問日誌資料 | 一行為一條使用者訪問記錄 | StarRocks |
|
|
進入資料開發
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入資料開發。
一、設計商務程序
設計商務程序
建立商務程序。
資料開發需基於商務程序使用對應的開發組件進行具體開發操作。在建立節點之前,您需要先建立商務程序。具體操作方法可參見建立商務程序。
該商務程序的命名為:使用者Portrait analysis_StarRocks。

設計商務程序
商務程序建立完成後,將自動延伸該商務程序畫布。請根據商務程序設計,在商務程序畫布中單擊建立節點,通過將節點群組件拖拽至商務程序畫布,並通過拉線設定節點上下遊依賴的方式,設計資料同步階段的商務程序。

在本教程中,由於虛擬節點和同步節點之間並無血緣關係,因此我們通過商務程序拉線的方式來設定節點的依賴關係。有關更多依賴關係設定方式的詳細資料,詳情請參見調度依賴配置指引。以下為各個節點的類型、命名以及作用的介紹。
節點分類
節點類型
節點命名
(以最終產出表命名)
節點作用
通用
虛擬節點

workshop_start_starrocks用於統籌管理整個使用者Portrait analysis商務程序,例如商務程序起調時間。當空間商務程序較複雜時,可使資料流轉路徑更清晰。該節點為空白跑任務,無須編輯代碼。
資料庫
StarRocks

ddl_ods_user_info_d_starrocks同步任務前建立,用於接收源端MySQL儲存的使用者基本資料資料的StarRocks表
ods_user_info_d_starrocks。資料庫
StarRocks

ddl_ods_raw_log_d_starrocks同步任務前建立,用於接收源端OSS儲存的使用者網站訪問記錄的StarRocks表
ods_raw_log_d_starrocks。Data Integration
離線同步

ods_user_info_d_starrocks用於將儲存於MySQL的使用者基本資料資料同步至StarRocks表
ods_user_info_d_starrocks。Data Integration
離線同步

ods_raw_log_d_starrocks用於將儲存於OSS的使用者網站訪問記錄同步至StarRocks表
ods_raw_log_d_starrocks。
配置調度邏輯
本案例通過虛擬節點workshop_start_starrocks控制整個商務程序每天00:30調度執行,以下為虛擬節點關鍵調度配置,其他節點調度無須變更,實現邏輯詳情請參見:情境:如何配置商務程序定時時間。其他調度配置相關說明,請參見:任務調度屬性配置概述。
調度配置 | 圖片展示 | 說明 |
調度時間配置 |
| 虛擬節點配置調度時間為00:30,該虛擬節點會在每日00:30調起當前商務程序並執行。 |
調度依賴配置 |
| 由於虛擬節點 |
DataWorks中的所有節點都需要依賴於上遊節點,資料同步階段的所有任務都以虛擬節點workshop_start_starrocks為依賴。換句話說,通過workshop_start_starrocks節點來觸發資料同步商務程序的執行。
二、搭建同步鏈路
建立目標StarRocks表
在進行同步操作之前,需要提前建立目標StarRocks表,以便儲存後續同步過來的未經處理資料。
本案例Starrocks表基於源端表結構產生,具體請參見章節目標。在商務程序面板,雙擊資料庫ddl_ods_user_info_d_starrocks節點和資料庫ddl_ods_raw_log_d_starrocks節點,進入節點編輯頁,分別填入對應的StarRocks建表命令,並單擊
儲存。
ddl_ods_user_info_d_starrocksCREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks ( uid STRING COMMENT '使用者ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齡段', zodiac STRING COMMENT '星座', dt STRING not null COMMENT '時間' ) DUPLICATE KEY(uid) COMMENT '使用者行為分析案例-使用者基本資料表' PARTITION BY(dt) PROPERTIES("replication_num" = "1");ddl_ods_raw_info_d_starrocksCREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks ( col STRING COMMENT '日誌', dt DATE not null COMMENT '時間' ) DUPLICATE KEY(col) COMMENT '使用者行為分析案例-網站訪問日誌未經處理資料表' PARTITION BY(dt) PROPERTIES ("replication_num" = "1");
配置使用者資料同步鏈路
在商務程序面板,雙擊離線同步ods_user_info_d_starrocks節點,進入ods_user_info_d_starrocks節點的配置面板,完成使用者基本資料資料從案例提供的MySQL表ods_user_info_d同步至StarRocks表ods_user_info_d_starrocks的同步鏈路配置操作。
網路與資源配置。
在配置好資料來源、我的資源群組、資料去向後,單擊下一步,根據頁面提示完成連通性測試。詳細配置如下。
配置項
配置內容
資料來源
資料來源:MySQL
資料來源名稱:
user_behavior_analysis_mysql
我的資源群組
選擇在準備環境階段建立的Serverless資源群組。
資料去向
資料去向:StarRocks
資料來源名稱:
Doc_StarRocks_Storage_Compute_Tightly_01

配置任務。
配置資料來源與去向。
模組
配置項
配置內容
資料來源
表
選擇MySQL表
ods_user_info_d。切分鍵
建議使用主鍵或有索引的列作為切分鍵,僅支援類型為整型的欄位。
此處配置切分鍵為
uid欄位。資料去向
表
選擇StarRocks表
ods_user_info_d_starrocks。匯入前準備語句
本案例按
dt欄位動態分區,為避免節點重跑資料重複寫入,通過以下SQL語句實現每次同步前刪除已有目標資料分割。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE其中${var}為參數,後續在調度設定階段為其賦值調度參數,以實現調度情境下的動態入參,詳情請參見調度設定。StreamLoad請求參數
StreamLoad 的請求參數,需為JSON格式。
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
欄位對應。
通過欄位對應關係確定源端欄位與目標端欄位的寫入關係,並通過變數賦值調度參數的方式,實現動態為StarRocks分區欄位賦值,即每日資料寫入StarRocks對應業務分區。
單擊同名映射,源端MySQL欄位將自動對應目標表同名欄位,即源端欄位資料將預設寫入與源端欄位同名的目標端欄位。
單擊添加一行,輸入
'${var}',並手動設定該欄位與StarRocks的dt欄位進行映射。
調度配置。
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊,詳情可參見節點調度配置。以下為配置的內容。
配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:var
參數值:$[yyyymmdd-1]

調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.tablename
配置使用者日誌同步鏈路
在商務程序面板,雙擊離線同步ods_raw_log_d_starrocks節點,進入ods_raw_log_d_starrocks節點的配置面板,完成使用者網站訪問資訊資料從平台提供的公用資料來源HttpFile檔案user_log.txt同步到StarRocks表ods_raw_log_d_starrocks的同步鏈路配置操作。
網路與資源配置。
在配置好資料來源、我的資源群組和資料去向後,請單擊下一步,根據頁面提示完成連通性測試。詳細配置如下。
配置項
配置內容
資料來源
資料來源:HttpFile
資料來源名稱:
user_behavior_analysis_HttpFile
我的資源群組
選擇在準備環境階段購買的Serverless資源群組。
資料去向
資料去向:StarRocks
資料來源名稱:
Doc_StarRocks_Storage_Compute_Tightly_01

任務配置。
配置資料來源與去向
模組
配置項
配置內容
資料來源
檔案路徑
/user_log.txt檔案類型
text資料行分隔符號
|No
完成以上資料來源配置後,單擊確認表資料結構。
資料去向
表
ods_raw_log_d_starrocks匯入前準備語句
本案例按
dt欄位動態分區,為避免節點重跑資料重複寫入,通過以下SQL語句實現每次同步前刪除已有目標資料分割。ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE其中
${var}為變數參數,後續在調度設定階段為其賦值調度參數,以實現調度情境下的動態入參。StreamLoad請求參數
{ "row_delimiter": "\\x02", "column_separator": "\\x01" }
欄位對應。
單擊節點頁面工具列
,將任務配置方式從嚮導模式轉換為指令碼模式,完成HttpFile資料來源的欄位對應與StarRocks動態分區欄位dt的動態賦值。來源HttpFile端Column配置新增:
{ "type": "STRING", "value": "${var}" }ods_raw_log_d_starrocks節點完整指令碼樣本:{ "type": "job", "version": "2.0", "steps": [ { "stepType": "httpfile", "parameter": { "fileName": "/user_log.txt", "nullFormat": "", "compress": "", "requestMethod": "GET", "connectTimeoutSeconds": 60, "column": [ { "index": 0, "type": "STRING" }, { "type": "STRING", "value": "${var}" } ], "skipHeader": "false", "encoding": "UTF-8", "fieldDelimiter": "|", "fieldDelimiterOrigin": "|", "socketTimeoutSeconds": 3600, "envType": 0, "datasource": "user_behavior_analysis", "bufferByteSizeInKB": 1024, "fileFormat": "text" }, "name": "Reader", "category": "reader" }, { "stepType": "starrocks", "parameter": { "loadProps": { "row_delimiter": "\\x02", "column_separator": "\\x01" }, "envType": 0, "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01", "column": [ "col", "dt" ], "tableComment": "", "table": "ods_raw_log_d_starrocks", "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; " }, "name": "Writer", "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "category": "processor" } ], "setting": { "errorLimit": { "record": "0" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }調度設定。
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊。以下為配置的內容。
配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:var
參數值:$[yyyymmdd-1]

調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.tablename
三、驗證同步資料
運行商務程序
進入商務程序面板。
雙擊商務程序下的使用者Portrait analysis_Starrocks版,即可進入該商務程序畫布。

運行商務程序。
在商務程序畫布,單擊工具列中的
表徵圖,將按照上下遊依賴關係運行Data Integration階段的商務程序。查看任務運行狀態。
節點處於
狀態,即代表同步執行過程無問題。查看任務執行日誌。
右鍵畫布中的
ods_user_info_d_starrocks節點、ods_raw_log_d_starrocks節點,選擇查看日誌,即可查看詳細的同步過程。
查看同步結果
建立臨時查詢檔案。
在資料開發頁面的左側導覽列,單擊
,進入臨時查詢面板。按右鍵臨時查詢,選擇。
查詢同步結果表。
--查詢語句中的分區列需要更新為業務日期。例如,任務啟動並執行日期為20240102,則業務日期為20240101,即任務運行日期的前一天。 SELECT * from ods_raw_log_d_starrocks where dt=業務日期; SELECT * from ods_user_info_d_starrocks where dt=業務日期;
後續步驟
現在,您已經完成了同步資料,您可以繼續下一個教程。在下一個教程中,您將學習將使用者基本資料資料、使用者網站訪問日誌資料在StarRocks中進行加工處理。詳情請參見加工資料。

