本文為您介紹如何用Spark SQL建立外部使用者資訊表ods_user_info_d_spark以及日誌資訊表ods_raw_log_d_spark訪問儲存在私人OSS中的使用者與日誌資料,通過DataWorks的EMR Spark SQL節點進行加工得到目標使用者畫像資料,閱讀本文後,您可以瞭解如何通過Spark SQL來計算和分析已同步的資料,完成數倉簡單資料加工情境。
前提條件
開始本案例前,請先完成同步資料中的操作。
步驟一:搭建資料加工鏈路
在同步資料階段,已經成功將資料用Spark進行載入,接下來的流程的目標是對資料進行進一步加工,以輸出基本使用者畫像資料。
在Data Studio左側導覽列單擊
,然後在專案目錄地區找到已建立好的工作流程User_profile_analysis_Spark,單擊進入工作流程看板。單擊編輯工作流程,在工作流程開發頁面,從左側拖拽EMR Spark SQL節點至畫布中,分別設定節點名稱。
本教程節點名稱樣本及作用如下:
節點類型
節點名稱
節點作用
EMR Spark SQLdwd_log_info_di_spark通過Spark SQL對
ods_raw_log_d_spark表進行處理,將資料寫入dwd_log_info_di_spark表內。
EMR Spark SQLdws_user_info_all_di_spark利用明細日誌表
dwd_log_info_di_spark和使用者表ods_user_info_d_spark的uid欄位進行關聯,產生匯總使用者日誌表。對使用者基本資料表(
ods_user_info_d_spark)和初步加工後的日誌資料表(dwd_log_info_di_spark)進行匯總,將資料寫入dws_user_info_all_di_spark表中。
EMR Spark SQLads_user_info_1d_spark對
dws_user_info_all_di_spark表中資料進一步加工,將資料寫入ads_user_info_1d_spark表,產出基本使用者畫像。手動拖拽連線,配置各節點的上遊節點。最終效果如下:
在頂部工具列單擊儲存。
步驟二:配置資料加工節點
配置完成商務程序後,使用EMR Spark SQL節點對使用者基本資料表和明細日誌表進行處理,最終產生初步使用者畫像表ads_user_info_1d_spark。
配置dwd_log_info_di_spark節點
在本節點的範例程式碼中,利用Spark內建的函數處理上遊表ods_raw_log_d_spark欄位的SQL代碼,並將其寫入dwd_log_info_di_spark表中。
在Workflow畫布中,滑鼠移至上方至
dwd_log_info_di_spark節點上,單擊開啟節點。將如下代碼粘貼至SQL編輯頁面。
-- 情境:以下SQL為Spark SQL,通過Spark SQL函數將載入至Spark中的ods_raw_log_d_spark按"##@@"進行切分後產生多個欄位,並寫入新表dwd_log_info_di_spark。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'ip地址', uid STRING COMMENT '使用者ID', tm STRING COMMENT '時間yyyymmddhh:mi:ss', status STRING COMMENT '伺服器返回狀態代碼', bytes STRING COMMENT '返回給用戶端的位元組數', method STRING COMMENT'要求方法', url STRING COMMENT 'url', protocol STRING COMMENT '協議', referer STRING, device STRING, identity STRING ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/'; ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;說明上述代碼中的location地址,需根據您實際情況替換,其中
dw-spark-demo是您OSSObject Storage Service環境準備時建立的OSS Bucket名。在EMR Spark SQL編輯頁面右側單擊調試配置,配置以下參數,用於在步驟四調試運行中使用調試配置的相關參數測試回合。
配置項
配置說明
計算資源
選擇準備環境階段綁定的Spark計算資源。
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數。(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度。
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
在頂部工具列單擊儲存,儲存工作流程。
配置dws_user_info_all_di_spark節點
本節點將使用者基本資料表(ods_user_info_d_spark)和初步加工後的日誌資料表(dwd_log_info_di_spark)進行匯總,並將結果寫入dws_user_info_all_di_spark表中。
在Workflow畫布中,滑鼠移至上方至
dws_user_info_all_di_spark節點上,單擊開啟節點。將如下代碼粘貼至SQL編輯頁面。
-- 情境:以下SQL為Spark SQL,通過uid將dwd_log_info_di_spark和ods_user_info_d_spark進行關聯,並寫入對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT '使用者ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齡段', zodiac STRING COMMENT '星座', device STRING COMMENT '終端類型 ', method STRING COMMENT 'http請求類型', url STRING COMMENT 'url', `time` STRING COMMENT '時間yyyymmddhh:mi:ss' ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/'; --添加分區 ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); --插入user表與日誌表資料 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;說明上述代碼中的location地址,需根據您實際情況替換,其中
dw-spark-demo是您OSSObject Storage Service環境準備時建立的OSS Bucket名。在EMR Spark SQL編輯頁面右側單擊調試配置,配置以下參數,用於在步驟四調試運行中使用調試配置的相關參數測試回合。
配置項
配置說明
計算資源
選擇準備環境階段綁定的Spark計算資源。
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數。(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度。
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
在頂部工具列單擊儲存,儲存工作流程。
配置ads_user_info_1d_spark節點
本節點對dws_user_info_all_di_spark表中資料進一步加工,將資料寫入ads_user_info_1d_spark表,產出基本使用者畫像。
在Workflow畫布中,滑鼠移至上方至
ads_user_info_1d_spark節點上,單擊開啟節點。將如下代碼粘貼至SQL編輯頁面。
-- 情境:以下SQL為Spark SQL,通過Spark SQL函數將Spark中的dws_user_info_all_di_spark表進一步的加工,並寫入新表ads_user_info_1d_spark。 -- 補充: -- DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。 -- 在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT '使用者ID', device STRING COMMENT '終端類型 ', pv BIGINT COMMENT 'pv', gender STRING COMMENT '性別', age_range STRING COMMENT '年齡段', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/'; ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;說明上述代碼中的location地址,需根據您實際情況替換,其中
dw-spark-demo是您OSSObject Storage Service環境準備時建立的OSS Bucket名。在EMR Spark SQL編輯頁面右側單擊調試配置,配置以下參數,用於在步驟四調試運行中使用調試配置的相關參數測試回合。
配置項
配置說明
計算資源
選擇準備環境階段綁定的Spark計算資源。
資源群組
選擇準備環境階段購買的Serverless資源群組。
指令碼參數
單擊添加參數,將bizdate賦值為
yyyymmdd格式(例如bizdate=20250223)。在調試時,Data Studio將會使用此常量替換任務中的定義的變數。(可選)配置調度屬性。
本教程調度配置相關參數保持預設即可,您可以在節點編輯頁面右側單擊調度配置。調度配置中參數的詳細說明,詳情可參見節點調度。
調度參數:本教程已在工作流程調度參數中統一配置,工作流程內部節點無需配置,在任務或代碼中可直接使用。
調度策略:您可以在延時執行時間參數中指定子節點在工作流程執行後,延遲多久再執行,本教程不設定。
在頂部工具列單擊儲存,儲存工作流程。
步驟三:加工資料
同步資料。
在工作流程工具列中,單擊運行,設定各節點定義的參數變數在本次運行中的取值(本教程使用
20250223,您可以按需修改),單擊確定後,等待運行完成。查詢資料加工結果。
進入SQL查詢頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,單擊進入資料分析頁面,單擊左側導覽列的SQL查詢進入SQL查詢頁面。
配置SQL查詢檔案。
單擊我的檔案後的
按鈕建立檔案,自訂SQL查詢檔案名稱。單擊已建立的檔案,進入檔案編輯頁面。
在檔案編輯頁面單擊右上方的
按鈕,配置需進行SQL查詢的工作空間等資訊,配置詳情如下:配置項
說明
工作空間
選擇
User_profile_analysis_Spark工作流程所在的工作空間。資料來源類型
下拉選擇
EMR Spark SQL。資料來源名稱
選擇在準備環境時綁定的EMR Serverless Spark為計算資源開發環境。
SQL Compute
選擇EMR Serverless Spark中建立的SQL會話。
單擊確認按鈕,完成查詢資料來源的配置。
編輯查詢SQL。
在確保該章節內的所有節點運行成功的情況下,編寫以下SQL查詢以檢查EMR Spark SQL節點建立的外部表格是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務啟動並執行日期為20250223,則業務日期為20250222,即任務運行日期的前一天。 SELECT * FROM dwd_log_info_di_spark WHERE dt ='業務日期';
步驟四:在生產環境運行任務
任務發布後,在次日才會產生執行個體運行,您可以通過補資料來對發行流程進行補資料操作,以便查看任務在生產環境是否可以運行,詳情可參見執行補資料並查看補資料執行個體(新版)。
任務發布成功後,單擊右上方的營運中心。
您也可以單擊左上方的
表徵圖,選擇。單擊左側導覽列中的,進入周期任務頁面,單擊
workshop_start_spark虛節點。在右側的DAG圖中,按右鍵
workshop_start_spark節點,選擇。勾選需要補資料的任務,設定業務日期,單擊提交並跳轉。
在補資料頁面單擊重新整理,直至SQL任務全部運行成功即可。
後續步驟
資料視覺效果展現:使用者Portrait analysis完成後,使用資料分析模組,將加工後的資料以圖表形式直觀展示,便於您快速提取關鍵資訊,洞察資料背後的業務趨勢。
監控資料品質:為資料加工產生的表配置資料品質監控,提前識別髒資料並進行攔截,避免髒資料影響擴大。
管理資料:使用者Portrait analysis任務流程完成後,在EMR Serverless Spark內將建立對應資料表。產生的資料表可在資料地圖模組進行查看,可通過血緣查看產生表之間的關係。
API資料服務:擷取最終加工後的資料後,使用資料服務模組,通過標準化的資料服務介面,實現資料的共用與應用,為其他使用API接收資料的業務模組提供資料。