本文為您介紹如何使用DataWorks中的EMR Hive節點,對同步至OSS的使用者資訊表(ods_user_info_d_emr)及訪問日誌資料表(ods_raw_log_d_emr)中的資料進行加工,進而得到目標使用者畫像資料。
前提條件
開始本實驗前,請首先完成同步資料中的操作。
步驟一:設計商務程序
商務程序節點間依賴關係的配置請參見同步資料。
雙擊建立的商務程序開啟編輯頁面,按一下滑鼠建立節點,選擇EMR Hive並拖拽至右側的編輯頁面。在建立節點對話方塊中,輸入節點名稱,單擊確認。
此處需要建立3個EMR Hive節點,依次命名為dwd_log_info_di_emr、dws_user_info_all_di_emr和ads_user_info_1d_emr,並配置如下圖所示的依賴關係。
dwd_log_info_di_emr:用於對原始OSS日誌資料進行清洗。
dws_user_info_all_di_emr:用於將清洗後的日誌資料和使用者基本資料資料進行匯總。
ads_user_info_1d_emr:用於產生終端使用者畫像資料。

步驟二:建立函數
根據同步的原始日誌資料格式,我們需要通過函數等方式將其拆解為目標格式。本案例已為您提供用於將IP解析為地區的函數程式碼封裝。您可以將程式碼封裝下載至本地,並在DataWorks註冊為函數後,即可調用該函數。
上傳資源
在資料開發頁面開啟WorkShop商務程序,按右鍵EMR,選擇,配置建立資源參數,然後單擊建立。

關鍵參數配置如下:
儲存路徑:選擇準備環境中建立EMR叢集配置的OSS Bucket。
上傳檔案:選擇已下載ip2region-emr.jar檔案。
其他參數保持預設或根據實際情況配置。
單擊工具列
,將資源提交至開發環境對應的EMR引擎專案。
註冊函數
在資料開發頁面開啟商務程序,按右鍵EMR,選擇建立函數。
在建立函數對話方塊中,函數名稱輸入getregion,單擊建立,配置函數資訊。

關鍵參數配置如下:
所屬資源:選擇ip2region-emr.jar。
類名:輸入org.alidata.emr.udf.Ip2Region。
其他參數保持預設或根據實際情況配置。
單擊工具列
,將函數提交至開發環境對應的EMR引擎專案。
步驟三:配置EMR Hive節點
建立dwd_log_info_di_emr節點
1. 編輯代碼
雙擊dwd_log_info_di_emr節點,進入節點配置頁面。在節點編輯頁面,編寫如下語句。
如果您工作空間的資料開發中綁定多個EMR引擎,請按需選擇EMR引擎。如果僅綁定一個EMR引擎,則無需選擇。
--建立ODS層表
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
ip STRING COMMENT 'ip地址',
uid STRING COMMENT '使用者ID',
`time` STRING COMMENT '時間yyyymmddhh:mi:ss',
status STRING COMMENT '伺服器返回狀態代碼',
bytes STRING COMMENT '返回給用戶端的位元組數',
region STRING COMMENT '地區,根據ip得到',
method STRING COMMENT 'http請求類型',
url STRING COMMENT 'url',
protocol STRING COMMENT 'http協議版本號碼',
referer STRING COMMENT '來源url',
device STRING COMMENT '終端類型 ',
identity STRING COMMENT '訪問類型 crawler feed user unknown'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
, uid
, tm
, status
, bytes
, getregion(ip) AS region --使用自訂UDF通過ip得到地區。
, regexp_extract(request, '(^[^ ]+) .*') AS method --通過正則把request差分為三個欄位。
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_extract(request, '.* ([^ ]+$)') AS protocol
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通過正則清洗refer,得到更精準的url。
, CASE
WHEN lower(agent) RLIKE 'android' THEN 'android' --通過agent得到終端資訊和訪問形式。
WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN lower(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS tm
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d_emr
WHERE dt = '${bizdate}'
) a;2. 配置調度資訊
通過以下配置實現調度情境,每日00:30待上遊ods_raw_log_d_emr節點將儲存於OSS的user_log.txt資料同步至EMR的ods_raw_log_d_emr表後,可觸發當前dwd_log_info_di_emr節點對ods_raw_log_d_emr表資料進行加工,加工結果寫入dwd_log_info_di_emr表對應業務時間分區。
配置項 | 配置內容 | 圖示 |
調度參數 | 在調度參數地區添加:
|
|
時間屬性 | 設定重跑屬性為運行成功或失敗皆可重跑。 |
|
調度依賴 | 在調度依賴確認產出表已作為本節點輸出。 格式為 |
|
時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由商務程序虛擬節點workshop_start_emr的定時調度時間控制,即每日00:30後才會調度。
3. 儲存配置
本案例其他必填配置項,您可按需自行配置,配置完成後,在節點代碼編輯頁面,單擊工具列中的
按鈕,儲存當前配置。
建立dws_user_info_all_di_emr節點
1. 編輯代碼
雙擊dws_user_info_all_di_emr節點,進入節點配置頁面。在節點編輯頁面,編寫如下語句。
如果您工作空間的資料開發中綁定多個EMR引擎,請按需選擇EMR引擎。如果僅綁定一個EMR引擎,則無需選擇。
--建立DW層表
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
uid STRING COMMENT '使用者ID',
gender STRING COMMENT '性別',
age_range STRING COMMENT '年齡段',
zodiac STRING COMMENT '星座',
region STRING COMMENT '地區,根據ip得到',
device STRING COMMENT '終端類型 ',
identity STRING COMMENT '訪問類型 crawler feed user unknown',
method STRING COMMENT 'http請求類型',
url STRING COMMENT 'url',
referer STRING COMMENT '來源url',
`time` STRING COMMENT '時間yyyymmddhh:mi:ss'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.`time`
FROM (
SELECT *
FROM dwd_log_info_di_emr
WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d_emr
WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;2. 配置調度資訊
通過以下配置實現調度情境,每日00:30待上遊任務ods_user_info_d_emr和dwd_log_info_di_emr執行完成後,可觸發dws_user_info_all_di_emr節點對ods_user_info_d_emr和dwd_log_info_di_emr兩表進行合并處理,並寫入dws_user_info_all_di_emr表中。
配置項 | 配置內容 | 圖示 |
調度參數 | 在調度參數地區添加:
|
|
時間屬性 | 設定重跑屬性為運行成功或失敗皆可重跑。 |
|
調度依賴 | 在調度依賴確認產出表已作為本節點輸出。 格式為 |
|
時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由商務程序虛擬節點workshop_start_emr的定時調度時間控制,即每日00:30後才會調度。
3. 儲存配置
本案例其他必填配置項,您可按需自行配置,配置完成後,在節點代碼編輯頁面,單擊工具列中的
按鈕,儲存當前配置。
建立ads_user_info_1d_emr節點
1. 編輯代碼
雙擊ads_user_info_1d_emr節點,進入節點配置頁面。在節點編輯頁面,編寫如下語句。
如果您工作空間的資料開發中綁定多個EMR引擎,請按需選擇EMR引擎。如果僅綁定一個EMR引擎,則無需選擇。
--建立RPT層表
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
uid STRING COMMENT '使用者ID',
region STRING COMMENT '地區,根據ip得到',
device STRING COMMENT '終端類型 ',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT '性別',
age_range STRING COMMENT '年齡段',
zodiac STRING COMMENT '星座'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;2. 配置調度資訊
在上遊dws_user_info_all_di_emr節點任務將ods_user_info_d_emr表和dwd_log_info_di_emr表合并後完成後,可觸發ads_user_info_1d_emr節點任務,進一步加工資料產生可消費資料。
配置項 | 配置內容 | 圖示 |
調度參數 | 在調度參數地區添加:
|
|
時間屬性 | 設定重跑屬性為運行成功或失敗皆可重跑。 |
|
調度依賴 | 在調度依賴確認產出表已作為本節點輸出。 格式為 |
|
時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由商務程序虛擬節點workshop_start_emr的定時調度時間控制,即每日00:30後才會調度。
3. 儲存配置
本案例其他必填配置項,您可按需自行配置,配置完成後,在節點代碼編輯頁面,單擊工具列中的
按鈕,儲存當前配置。
步驟四:提交商務程序
完成商務程序所有配置後,測試該流程是否能正常運行,測試成功後,需要提交流程等待發布。
在商務程序的編輯頁面,單擊
,運行商務程序。待商務程序中的所有節點後出現
,單擊
,提交運行成功的商務程序。選擇提交對話方塊中需要提交的節點,勾選忽略輸入輸出不一致的警示,然後單擊確認。
提交成功後,發布各流程節點。
單擊頁面右側發布,進入建立發布包頁面。
選中待發布的節點,單擊發布選中項,在確認發布對話方塊,單擊發布。
步驟五:在生產環境運行任務
任務發布後,在次日才會產生執行個體運行,您可以通過補資料來對發行流程進行補資料操作,以便查看任務在生產環境是否可以運行,詳情可參見執行補資料並查看補資料執行個體(新版)。
任務發布成功後,單擊右上方的營運中心。
您也可以進入商務程序的編輯頁面,單擊工具列中的前往營運,進入營運中心頁面。
單擊左側導覽列中的,進入周期任務頁面,單擊workshop_start_emr虛節點。
在右側的DAG圖中,按右鍵workshop_start_emr節點,選擇。
勾選需要補資料的任務,輸入業務日期,單擊提交並跳轉。
在補資料頁面單擊重新整理,直至SQL任務全部運行成功即可。
後續步驟
任務周期性調度情境下,為保障任務產出的表資料符合預期,我們可以對任務產出的表資料進行資料品質監控,詳情請參見配置資料品質監控。






