本教程以MySQL資料來源中的使用者基本資料ods_user_info_d表和HttpFile中的網站訪問日誌資料user_log.txt檔案為例,通過Data Integration離線同步任務分別同步至私人OSS中,再通過Spark SQL建立外部表格來訪問私人OSS資料存放區。本章節旨在完成資料同步操作。
章節目標
本章節通過Data Integration將平台提供的MySQL資料來源內的使用者基本資料資料與HttpFile資料來源內的使用者網站訪問日誌資料同步至私人OSSObject Storage Service建立的資料來源中。
源端資料來源類型
源端待同步資料
源端表結構
目標端資料來源類型
MySQL
表:ods_user_info_d
使用者基本資料資料
uid 使用者名稱
gender 性別
age_range 年齡分段
zodiac 星座
OSS
HttpFile
檔案:user_log.txt
使用者網站訪問日誌資料
一行為一條使用者訪問記錄。
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];OSS
完成同步任務後,通過EMR Spark SQL建立外部表格來訪問私人OSS資料存放區。
一、設計商務程序
本步驟內,將Data Integration節點以及EMR Spark SQL 節點相結合,形成使用者Portrait analysis任務流程中擷取資料部分的流程。主要是通過ods_raw_log_d_2oss_spark節點從HttpFile資料來源擷取日誌資料至私人OSS資料來源中,再通過ods_raw_log_d_spark節點產生簡單的日誌外部表格,從私人OSS資料存放區中擷取使用者日誌資料。以及通過ods_user_info_d_2oss_spark從MySQL資料來源同步處理的使用者基本資料至私人OSS資料來源中,再通過ods_user_info_d_spark實現外部表格的建立,從私人OSS資料存放區中擷取使用者基本資料資料。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入資料開發。
設計商務程序
建立商務程序。
資料開發需基於商務程序使用對應的開發組件進行具體開發操作。在建立節點之前,您需要先建立商務程序。具體操作方法可參見建立商務程序。
該商務程序的命名為:
使用者Portrait analysis_Spark。
設計商務程序。
商務程序建立完成後,將自動延伸該商務程序畫布。請根據商務程序設計,在商務程序畫布中單擊建立節點,通過將節點群組件拖拽至商務程序畫布,並通過拉線設定節點上下遊依賴的方式,設計資料同步階段的商務程序。

在本教程中,由於虛擬節點和同步節點之間並無血緣關係,因此我們通過商務程序拉線的方式來設定節點的依賴關係。有關更多依賴關係設定方式的詳細資料,詳情請參見調度依賴配置指引。以下為各個節點的類型、命名以及作用的介紹。
節點分類
節點類型
節點命名
節點作用
通用
虛擬節點workshop_start_spark用於統籌管理整個使用者Portrait analysis商務程序,例如商務程序起調時間。當商務程序較複雜時,可使資料流轉路徑更清晰。該節點為空白跑任務,無須編輯代碼。
Data Integration
離線同步ods_raw_log_d_2oss_spark用於將HttpFile資料來源儲存的使用者網站訪問記錄,通過離線同步的方式同步至私人OSS資料來源中,以供後續Spark SQL擷取。
Data Integration
離線同步ods_user_info_d_2oss_spark用於將MySQL資料來源儲存的使用者基本資料資料,通過離線同步的方式同步至私人OSS資料來源中,以供後續Spark SQL擷取。
EMR
EMR Spark SQLods_raw_log_d_spark在EMR Spark SQL節點中,建立表
ods_raw_log_d_spark,並通過該外部表格訪問私人OSS中的使用者網站訪問記錄資料。EMR
EMR Spark SQLods_user_info_d_spark在EMR Spark SQL節點中,建立表
ods_user_info_d_spark,並通過該外部表格訪問私人OSS中的使用者基本資料資料。
配置調度邏輯
本案例通過虛擬節點workshop_start_Spark控制整個商務程序每天00:30調度執行,以下為虛擬節點關鍵調度配置,其他節點調度無須變更,實現邏輯詳情請參見:情境:如何配置商務程序定時時間。其他調度配置相關說明,請參見:任務調度屬性配置概述。
調度配置 | 圖片樣本 | 說明 |
調度時間配置 |
| 虛擬節點配置調度時間為00:30,該虛擬節點會在每日00:30調起當前商務程序並執行。 |
調度依賴配置 |
| 由於虛擬節點 |
DataWorks中的所有節點都需要依賴於上遊節點,資料同步階段的所有任務都以虛擬節點workshop_start_spark為依賴,通過workshop_start_spark節點來觸發資料同步商務程序的執行。
二、搭建同步鏈路
配置完成商務程序後,分別雙擊ods_user_info_d_2oss_spark以及ods_raw_log_d_2oss_sparkData Integration節點,配置使用者資料同步至私人OSS和配置使用者日誌同步至私人OSS,並且通過ods_raw_log_d_spark和ods_user_info_d_spark採用 Spark SQL代碼,實現通過Spark SQL建立的外表來訪問儲存於私人OSS的資料。
使用者資料與日誌同步至OSS資料來源
使用Data Integration將平台提供的使用者資料與使用者日誌同步至私人OSSObject Storage Service的Bucket目錄下。
配置使用者日誌同步至OSS
通過離線Data Integration任務,實現從平台的HttpFile資料來源內的擷取使用者日誌資訊,同步至私人OSS資料來源中。
同步HttpFile資料來源的日誌資訊至自建的OSS。
在資料開發頁面,雙擊ods_raw_log_d_2oss_spark節點,進入節點配置頁面。
配置同步網路連結。
完成以下網路與資源配置後,單擊下一步,並根據介面提示完成連通性測試。
參數
描述
資料來源
資料來源:HttpFile。
資料來源名稱:user_behavior_analysis_httpfile。
我的資源群組
選擇已購買的Serverless資源群組。
資料去向
資料去向:OSS。
資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為test_g。
配置同步任務。
參數
描述
資料來源
檔案路徑:/user_log.txt。
文本類型:選擇text類型。
資料行分隔符號:輸入資料行分隔符號為|。
壓縮格式:包括None、Gzip、Bzip2和Zip四種類型,此處選擇None。
是否跳過表頭:選擇No。
資料去向
文本類型:選擇text類型。
檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d為您自建的目錄名,$bizdate表示擷取前一天的日期。
資料行分隔符號:輸入資料行分隔符號為|。
調度設定。
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊。以下為配置的內容。
說明DataWorks提供調度參數,可實現在調度情境下,將每日資料寫入不同的OSS路徑及檔案下,並以業務日期對路徑目錄與檔案進行命名。
在實際情境下,您可以在資料去向的檔案名稱(含目錄)配置中通過
${變數名}格式自訂路徑中的變數,並通過在調度配置頁面為變數賦值調度參數的方式,實現調度情境下動態產生資料去向目錄與檔案名稱。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。

調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名。詳情可參見:配置調度依賴。

配置完成後,單擊工具列中的
表徵圖,進行儲存。
配置使用者資料同步至OSS
通過離線Data Integration任務,實現從平台的MySQL資料來源內的擷取使用者資料資訊,同步至私人OSS資料來源中。
在資料開發頁面,雙擊ods_user_info_d_2oss_spark節點,進入節點配置頁面。
配置同步網路連結。
完成以下網路與資源配置後,單擊下一步,並根據介面提示完成連通性測試。
參數
描述
資料來源
資料來源:MySQL。
資料來源名稱:user_behavior_analysis_mysql。
我的資源群組
選擇已購買的Serverless資源群組。
資料去向
資料去向:OSS。
資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為test_g。
配置同步任務。
參數
描述
資料來源
表:選擇資料來源中的ods_user_info_d。
切分鍵:建議使用主鍵或有索引的列作為切分鍵,僅支援類型為整型的欄位。此處設定切分鍵為uid。
資料去向
文本類型:選擇text類型。
檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d為您自建的目錄名,$bizdate表示擷取前一天的日期。
資料行分隔符號:輸入資料行分隔符號為|。
調度設定
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊。以下為配置的內容。
說明DataWorks提供調度參數,可實現在調度情境下,將每日資料寫入不同的OSS路徑及檔案下,並以業務日期對路徑目錄與檔案進行命名。
在實際情境下,您可以在資料去向的檔案名稱(含目錄)配置中通過
${變數名}格式自訂路徑中的變數,並通過在調度配置頁面通過為變數賦值調度參數的方式,實現調度情境下動態產生資料去向目錄與檔案名稱。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。

調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名。詳情可參見:配置調度依賴。

配置完成後,單擊工具列中的
表徵圖。
建立Spark外部表格載入OSS資料
資料通過離線整合任務同步至私人OSS資料來源後,基於產生的OSS檔案,通過Spark SQL的create文法建立ods_raw_log_d_spark與ods_user_info_d_spark表,並通過LOCATION來擷取OSS中的使用者資訊檔、使用者日誌資訊以供後續資料加工使用。
配置ods_raw_log_d_spark節點
基於通過EMR Spark SQL建立的外部表格ods_raw_log_d_spark,用LOCATION來訪問離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的日誌資訊。
代碼配置。
-- 情境:以下SQL為Spark SQL,通過EMR Spark SQL建立的外部表格ods_raw_log_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的日誌資訊,並添加對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/' ;說明上述代碼中的location為樣本路徑,與之前配置離線同步任務時的資料去向相同,需要輸入您建立的檔案路徑名稱,其中dw-emr-demo是您準備環境時建立的OSS Bucket網域名稱。
配置調度配置。
為
ods_raw_log_d_spark節點配置任務調度,通過配置的調度參數來擷取對應業務日期的私人OSS記錄檔,並寫入同樣業務日期分區的Spark表內。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate參數值:
$[yyyymmdd-1],詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名。詳情可參見:配置調度依賴。
說明本章節在SQL中配置了調度參數
${bizdate},並將其賦值為T-1。在離線計算情境下bizdate為業務交易發生的日期,也常被稱為業務日期。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。完成配置後,單擊
儲存節點。
配置ods_user_info_d_spark節點
基於通過EMR Spark SQL節點建立的外部表格ods_user_info_d_spark,用LOCATION來訪問離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的使用者資訊。
代碼配置。
-- 情境:以下SQL為Spark SQL,通過EMR Spark SQL節點建立的外部表格ods_user_info_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的使用者資訊,並寫入對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '使用者ID' ,`gender` STRING COMMENT '性別' ,`age_range` STRING COMMENT '年齡段' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ;說明上述代碼中的location為樣本路徑,與之前配置離線同步任務時的資料去向相同,需要輸入您建立的檔案路徑名稱,其中dw-emr-demo是您準備環境時建立的OSS Bucket網域名稱。
配置調度配置。
為
ods_user_info_d_spark節點配置任務調度,通過配置的調度參數來擷取對應業務日期的私人OSS使用者資訊檔,並寫入同樣業務日期分區的Spark表內。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate參數值:
$[yyyymmdd-1],詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.節點名詳情可參見:配置調度依賴。

完成配置後,單擊
儲存節點。
三、驗證同步資料
在確保該章節內的所有節點運行成功的情況下,在左側導覽列的臨時查詢中建立EMR Spark SQL臨時查詢,編寫SQL查看EMR Spark SQL節點建立的外部表格是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務啟動並執行日期為20240808,則業務日期為20240807,即任務運行日期的前一天。
SELECT * FROM ods_raw_log_d_spark WHERE dt ='業務日期';--查詢ods_raw_log_d_spark表
SELECT * FROM ods_user_info_d_spark WHERE dt ='業務日期';--查詢ods_user_info_d_spark表在驗證同步資料的SQL中,可將WHERE條件替換為"dt = ${bizdate}",在臨時查詢任務中單擊
帶參運行,為SQL預留位置${bizdate}賦值後運行即可。
後續步驟
現在,您已經完成了同步資料,您可以繼續下一個教程。在下一個教程中,您將學習將使用者基本資料資料、使用者網站訪問日誌資料在Spark中進行加工處理。詳情請參見加工資料。

