本文將介紹如何建立HttpFile和MySQL資料來源以訪問使用者資訊和網站日誌資料,配置資料同步鏈路將這些資料同步到在環境準備階段建立的OSS儲存中,並通過建立Spark外表解析OSS中儲存的資料。通過查詢驗證資料同步結果,確認是否完成整個資料同步操作。
前提條件
開始本文的操作前,請準備好需要使用的環境。詳情請參見準備環境。
一、建立資料來源
為確保後續資料處理流程的順利進行,您需要在DataWorks工作空間中建立如下資料來源,用於擷取平台提供的初始資料。
-
MySQL資料來源:本教程將資料來源命名為
user_behavior_analysis_mysql,用於擷取儲存在MySQL的使用者基本資料資料(ods_user_info_d)。 -
Httpfile資料來源:本教程將資料來源命名為
user_behavior_analysis_httpfile,用於擷取儲存在OSS的使用者網站訪問記錄(user_log.txt)。 -
OSS資料來源:用於儲存從MySQL資料源和Httpfile資料來源同步的使用者基本資料資料和使用者網站訪問記錄資料,以供後續Spark建立外部表格後進行讀取。
建立MySQL資料來源(user_behavior_analysis_mysql)
本教程提供的使用者基本資料儲存在MySQL資料庫中,您需要建立MySQL資料來源,以便在後續將MySQL資料庫中的使用者基本資料資料(ods_user_info_d)同步至準備環境階段建立的自有OSSObject Storage Service內。
進入資料來源頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入管理中心。
單擊左側導覽列的資料來源,進入資料來源頁面。
單擊新增資料來源,搜尋選擇資料來源類型為MySQL。
在建立MySQL資料來源頁面,配置相關參數。在本教程中開發環境和生產環境都按如下樣本值填寫。
以下為本教程所需配置的關鍵參數,未說明參數保持預設即可。
參數
描述
資料來源名稱
輸入資料來源名稱,本教程請填寫
user_behavior_analysis_mysql。資料來源描述
DataWorks案例體驗專用資料來源,在單表離線配置時讀取該資料來源即可訪問平台提供的測試資料,該資料來源只支援Data Integration情境讀取,其他模組不支援使用。
配置模式
選擇串連串模式。
串連地址
主機地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com連接埠號碼:
3306
資料庫名稱
輸入資料庫名,本教程請填寫
workshop。使用者名稱
輸入使用者名稱,本教程請填寫
workshop。密碼
輸入密碼,本教程請填寫
workshop#2017。認證選項
無認證。
在串連配置地區,分別單擊生產環境和開發環境的測試連通性,確保連通狀態為可連通。
重要需確保資源群組已綁定至工作空間,並配置了公網訪問能力,否則後續資料同步時將會報錯。配置步驟請參見準備環境。
如果您無可選的資源群組,可參考連結配置地區的說明提示,單擊前往購買和綁定已購資源群組。
單擊完成建立。
建立HttpFile資料來源(user_behavior_analysis_httpfile)
本教程提供的使用者網站訪問記錄資料存放區在OSS中,您需要建立Httpfile資料來源,以便在後續將OSS中的使用者網站訪問記錄(user_log.txt)同步至準備環境階段建立的自有OSSObject Storage Service內。
在管理中心頁面,單擊左側導覽列的資料來源。
單擊新增資料來源,在新增資料來源對話方塊中,搜尋選擇資料來源類型為HttpFile。
在建立HttpFile資料來源頁面,配置相關參數。在本教程中開發環境和生產環境都按如下樣本值填寫。
以下為本教程所需配置的關鍵參數,未說明參數保持預設即可。
參數
描述
資料來源名稱
輸入資料來源名稱,本教程請填寫
user_behavior_analysis_httpfile。資料來源描述
DataWorks案例體驗專用資料來源,在單表離線配置時讀取該資料來源即可訪問平台提供的測試資料,該資料來源只支援Data Integration情境讀取,其他模組不支援使用。
URL網域名稱
開發環境和生產環境的URL網域名稱均配置為
https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com。在串連配置地區,分別單擊生產環境和開發環境的測試連通性,確保連通狀態為可連通。
重要需確保資源群組已綁定至工作空間,並配置了公網訪問能力,否則後續資料同步時將會報錯。配置步驟請參見準備環境。
如果您無可選的資源群組,可參考連結配置地區的說明提示,單擊前往購買和綁定已購資源群組。
單擊完成建立。
建立OSS資料來源
本案例將MySQL資料來源的使用者資訊和HttpFile資料來源的日誌資訊同步至OSS資料來源。
-
在管理中心頁面,單擊進入資料來源頁面後單擊新增資料來源。
-
在新增資料來源對話方塊中,搜尋選擇資料來源類型為OSS。
-
在建立OSS資料來源頁面,配置各項參數。在本教程中開發環境和生產環境都按樣本值填寫。
參數
描述
資料來源名稱
輸入資料來源的名稱,本樣本為test_g。
資料來源描述
對資料來源進行簡單描述。
訪問模式
選擇Access Key模式。
AccessKey ID
當前登入帳號的AccessKey ID,您可以進入AccessKey頁面複製AccessKey ID。
AccessKey Secret
輸入當前登入帳號的AccessKey Secret。
重要AccessKey Secret只在建立時顯示,不支援後續再次查看,請妥善保管。如果AccessKey泄露或丟失,請刪除並建立新的AccessKey。
Endpoint
輸入
http://oss-cn-shanghai-internal.aliyuncs.com。Bucket
您準備環境時準備的私人OSS Bucket的名稱,樣本為
dw-spark-demo。 -
單擊指定資源群組連通狀態(開發環境)和連通狀態(生產環境)列的測試連通性,等待介面提示測試完成,連通狀態為可連通。
說明需確保至少一個資源群組為可連通狀態,否則此資料來源無法使用嚮導模式建立同步任務。
-
單擊完成建立,建立OSS資料來源。
二、搭建同步鏈路
-
單擊左上方的
表徵圖,選擇。 -
在左側導覽列單擊
,在專案目錄地區,單擊
,選擇建立工作流程,設定工作流程名稱。本教程設定為User_profile_analysis_spark。 -
在工作流程編排頁面,從左側拖拽虛擬節點、Data Integration節點以及EMR SPARK SQL節點至畫布中,分別設定節點名稱。
本教程節點名稱樣本及作用如下:
節點類型
節點名稱
節點作用
虛擬節點workshop_start_spark用於統籌管理整個使用者Portrait analysis工作流程,例如工作流程內部節點的啟動時間。當工作流程較複雜時,可使資料流轉路徑更清晰。該節點為空白跑任務,無須編輯代碼。
單表離線節點ods_raw_log_d_2oss_spark用於將儲存於OSS的使用者網站訪問記錄同步至您建立的OSS中。
單表離線節點ods_user_info_d_2oss_spark用於將儲存於MySQL的使用者基本資料同步至您建立的OSS中。
EMR SPARK SQLods_raw_log_d_spark用於建立
ods_raw_log_d_spark外表,讀取儲存在OSS中的使用者網站訪問記錄。
EMR SPARK SQLods_user_info_d_spark用於建立
ods_user_info_d_spark外表,讀取儲存在OSS中的使用者基本資料。 -
手動拖拽連線,將
workShop_start_spark節點設定為兩個單表離線節點的上遊節點。最終效果如下: 工作流程調度配置。
在工作流程編排頁面右側單擊調度配置,配置相關參數。以下為本教程所需配置的關鍵參數,未說明參數保持預設即可。
調度配置參數
說明
調度參數
為整個工作流程設定調度參數,工作流程中的內部節點可直接使用。本教程配置為
bizdate=$[yyyymmdd-1],擷取前一天的日期。調度周期
本教程配置為
日。調度時間
本教程配置調度時間為
00:30,該工作流程會在每日00:30啟動。節點依賴配置
Workflow無上遊依賴,可不配置。為了方便統一管理,您可以單擊使用工作空間根節點,將工作流程掛載到工作空間根節點下。
工作空間根節點命名格式為:
工作空間名_root。
三、配置同步任務
配置初始節點
-
在工作流程編排頁面,滑鼠移至上方至
workshop_start_spark節點上,單擊開啟節點。 -
在
workshop_start_spark節點編輯頁面右側單擊調度配置,配置相關參數。以下為本樣本所需配置的關鍵參數,未說明的參數請保持預設值即可。調度配置參數
說明
調度資源群組
本教程配置為準備環境階段建立的Serverless資源群組。
節點依賴配置
由於
workshop_start_spark為初始節點,無上遊依賴,此時可以單擊使用工作空間根節點,由工作空間根節點觸發工作流程執行。工作空間根節點命名為:
工作空間名_root。
配置使用者日誌同步鏈路(ods_raw_log_d_2oss_spark)
離線ods_raw_log_d_2oss_spark節點實現將HttpFile資料來源內擷取的使用者日誌資訊,同步至私人OSS資料來源中。
-
在商務程序開發面板上,滑鼠懸浮在
ods_raw_log_d_2oss_spark節點上,單擊開啟節點按鈕,進入節點配置頁面。 -
配置網路與資源配置。
參數
描述
資料來源
-
資料來源:
HttpFile。 -
資料來源名稱:
user_behavior_analysis_httpfile。
我的資源群組
選擇準備環境節點購買的Serverless資源群組。
資料去向
-
資料去向:
OSS。 -
資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為test_g。
重要若此處網路不通,請檢查是否為Serverless資源群組開通公網。
-
-
單擊下一步,配置同步任務。
-
配置資料來源與去向
以下為本樣本所需配置的關鍵參數,未說明的參數請保持預設值。
參數
描述
資料來源
-
檔案路徑:/user_log.txt。
-
文本類型:選擇text類型。
-
資料行分隔符號:輸入資料行分隔符號為|。
-
壓縮格式:選擇None。
-
是否跳過表頭:選擇No。
資料去向
-
文本類型:選擇text類型。
-
檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d為您自建的目錄名,$bizdate表示擷取前一天的日期。
-
資料行分隔符號:輸入資料行分隔符號為|。
-
確認欄位對應及通道控制。
DataWorks通過配置源端與目標端欄位對應關係,實現源端指定欄位資料寫入目標端指定欄位,同時支援設定任務並發數、髒資料策略等。本教程髒資料策略配置為不容忍髒資料,其他配置保持預設。更多資訊,請參見嚮導模式配置。
-
-
配置調試參數。
在單表離線任務配置頁面右側單擊回合組態,配置以下參數,用於在步驟四調試運行中使用回合組態的相關參數測試回合。
配置項
配置說明
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數。 -
(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度配置。
-
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
-
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
-
-
在頂部工具列單擊儲存,儲存當前節點。
配置使用者資料同步鏈路(ods_user_info_d_2oss_spark)
離線ods_user_info_d_2oss_Spark節點實現將MySQL資料來源內使用者資料資訊,同步至私人OSS資料來源中。
-
在工作流程編排頁面,滑鼠移至上方至
ods_user_info_d_2oss_Spark節點上,單擊開啟節點。 -
配置同步鏈路網路與資源。
參數
描述
資料來源
-
資料來源:
MySQL。 -
資料來源名稱:
user_behavior_analysis_mysql。
我的資源群組
選擇準備環境節點購買的Serverless資源群組。
資料去向
-
資料去向:
OSS。 -
資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為
test_g。
重要若此處網路不通,請檢查是否為Serverless資源群組開通公網。
-
-
單擊下一步,配置同步任務。
-
配置資料來源與去向
以下為本樣本所需配置的關鍵參數,未說明的參數請保持預設值。
參數
描述
資料來源
-
表:選擇資料來源中的ods_user_info_d。
-
切分鍵:建議使用主鍵或有索引的列作為切分鍵,僅支援類型為整型的欄位。此處設定切分鍵為uid。
資料去向
-
文本類型:選擇text類型。
-
檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d為您自建的目錄名,$bizdate表示擷取前一天的日期。
-
資料行分隔符號:輸入資料行分隔符號為|。
-
確認欄位對應及通道控制。
DataWorks通過配置源端與目標端欄位對應關係,實現源端指定欄位資料寫入目標端指定欄位,同時支援設定任務並發數、髒資料策略等。本教程髒資料策略配置為不容忍髒資料,其他配置保持預設。更多資訊,請參見嚮導模式配置。
-
-
配置調試參數。
在單表離線任務配置頁面右側單擊回合組態,配置以下參數,用於在步驟四調試運行中使用回合組態的相關參數測試回合。
配置項
配置說明
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數。 -
(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度配置。
-
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
-
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
-
-
在頂部工具列單擊儲存,儲存當前節點。
四、同步資料
-
同步資料。
在Workflow畫布頂部工具列中,單擊運行,設定各節點定義的參數變數在本次運行中的取值(本教程使用
20250223,您可以按需修改),單擊確定後,等待運行完成。 -
查看同步結果。
在Workflow工作流程運行成功後,可登入OSSObject Storage Service,查看建立的OSS資料來源所在的Bucket內,
/ods_user_info_d和/ods_raw_log_d目錄下是否存在相應的目錄與資料。
五、解析資料
完成資料同步後,需使用Spark SQL建立外部表格並解析OSS中儲存的使用者基本資料資料和使用者網站訪問記錄資料。
建立日誌表(ods_raw_log_d_spark)並解析資料
資料通過離線整合任務同步至私人OSS資料來源後,基於產生的OSS檔案,寫入EMR SPARK SQL建立的外部表格ods_raw_log_d_spark。
-
在工作流程編排頁面,滑鼠移至上方至
ods_raw_log_d_spark節點上,單擊開啟節點。 -
編輯建表語句。
Paimon 表(DLF)
-- 1. 建立一個 Append-Only 的 Paimon 表 CREATE TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING, `dt` STRING -- 將分區鍵也作為表的普通列,這是 Paimon 的推薦做法 ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' ); -- 2. 建立一個臨時視圖,指向並解析 OSS 上的源檔案 CREATE TEMPORARY VIEW source_of_logs ( -- 視圖只有一列,用來讀取整行文本 `value` STRING ) USING TEXT OPTIONS ( path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/' ); INSERT INTO ods_raw_log_d_spark SELECT value, -- 你的原始日誌行 '${bizdate}' AS dt -- 直接在 SELECT 語句中指定分區值 FROM source_of_logs;Hive 表(DLF-Legacy)
-- 情境:以下SQL為Spark SQL,通過EMR Spark SQL建立的外部表格ods_raw_log_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的日誌資訊,並添加對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'說明上述代碼中的location地址,需根據您實際情況替換,其中
dw-spark-demo是您OSSObject Storage Service環境準備時建立的OSS Bucket名。 -
配置調試參數。
在EMR SPARK SQL任務配置頁面右側單擊回合組態,配置以下參數,用於在步驟四調試運行中使用回合組態的相關參數測試回合。
配置項
配置說明
計算資源
選擇準備環境階段綁定的Spark計算資源。
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數 -
(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度配置。
-
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
-
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
-
-
在頂部工具列單擊儲存,儲存當前節點。
建立使用者表(ods_user_info_d_spark)並解析資料
資料通過離線整合任務同步至私人OSS資料來源後,基於產生的OSS檔案,寫入通過EMR SPARK SQL節點建立的外部表格ods_user_info_d_spark,
-
在工作流程編排頁面,滑鼠移至上方至
ods_user_info_d_spark節點上,單擊開啟節點。 -
配置同步鏈路網路與資源。
Paimon 表(DLF)
-- 1. 建立一個 Paimon 表作為目標 CREATE TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '使用者ID', `gender` STRING COMMENT '性別', `age_range` STRING COMMENT '年齡段', `zodiac` STRING COMMENT '星座', `dt` STRING COMMENT '分區日期' ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' ); -- 2. 建立一個臨時視圖,指向並解析 OSS 上的源檔案 CREATE TEMPORARY VIEW source_of_user_info ( -- 視圖只有一列,用來讀取整行文本 `value` STRING ) USING TEXT -- 告訴 Spark 這是純文字檔案 OPTIONS ( path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ); -- 3. 從臨時視圖中查詢、解析資料,並插入到 Paimon 表中 INSERT INTO ods_user_info_d_spark SELECT -- 使用 split 函數按'|'切分原始文本行 split(value, '\\|')[0] AS uid, split(value, '\\|')[1] AS gender, split(value, '\\|')[2] AS age_range, split(value, '\\|')[3] AS zodiac, '${bizdate}' AS dt -- 為分區欄位賦值 FROM source_of_user_info;Hive 表(DLF-Legacy)
-- 情境:以下SQL為Spark SQL,通過EMR Spark SQL節點建立的外部表格ods_user_info_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的使用者資訊,並寫入對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '使用者ID' ,`gender` STRING COMMENT '性別' ,`age_range` STRING COMMENT '年齡段' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/' ;說明上述代碼中的location地址,需根據您實際情況替換,其中
dw-spark-demo是您OSSObject Storage Service環境準備時建立的OSS Bucket名。 -
配置調試參數。
在EMR SPARK SQL任務配置頁面右側單擊回合組態,配置以下參數,用於在步驟四調試運行中使用回合組態的相關參數測試回合。
配置項
配置說明
計算資源
選擇準備環境階段綁定的Spark計算資源。
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數 -
(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度配置。
-
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
-
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
-
-
在頂部工具列單擊儲存,儲存當前節點。
六、運行任務
同步資料。
在工作流程工具列中,單擊運行,設定各節點定義的參數變數在本次運行中的取值(本教程使用
20250223,您可以按需修改),單擊確定後,等待運行完成。-
查詢資料同步結果。
在確保該章節內的所有節點運行成功的情況下,編寫以下SQL查詢以檢查EMR SPARK SQL節點建立的外部表格是否正常產出。
-
驗證ods_raw_log_d_spark表結果。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務啟動並執行日期為20250223,則業務日期為20250222,即任務運行日期的前一天。 SELECT * FROM ods_raw_log_d_spark WHERE dt='${bizdate}';--查詢ods_raw_log_d_spark表 -
驗證ods_user_info_d_spark表結果。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務啟動並執行日期為20250223,則業務日期為20250222,即任務運行日期的前一天。 SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}';--查詢ods_user_info_d_spark表
-
後續步驟
現在,您已經學習了如何進行日誌資料同步,完成資料的同步,您可以繼續下一個教程。在該教程中,您將學習如何對同步的資料進行計算與分析。詳情請參見加工資料。
常見問題
-
Q:建立表時,報錯
Option 'path' is not allowed for Normal Paimon table, please remove it in table options.A:語法錯誤,請按照本文的Paimon 表(DLF)的方式建立表。