E-MapReduce (EMR) Spark SQL ノードを使用して、生のユーザーデータを構造化されたユーザープロファイルテーブルに変換します。このチュートリアルでは、DataWorks ワークフローで ODS から DWD、DWS、ADS への 3 ノードパイプラインを構築し、本番環境にデプロイする手順を説明します。
このチュートリアルを完了すると、以下の方法を習得できます。
ワークフローに EMR Spark SQL ノードを追加し、それらの間のスケジューリングの依存関係を構成します。
Spark SQL を記述して、生ログデータを分割し、テーブルを結合し、ユーザープロファイルデータを集計します。
デバッグパラメータを構成し、ワークフローを実行して結果を検証します。
ワークフローを本番環境にデプロイし、バックフィル実行をトリガーします。
前提条件
開始する前に、「データ同期」のすべてのステップを完了していることを確認してください。このチュートリアルでは、User_profile_analysis_Spark ワークフローを作成し、このチュートリアルが依存する以下のリソースをプロビジョニングします。
ワークスペース内の外部テーブル
ods_user_info_d_sparkとods_raw_log_d_spark。中間データと最終出力データを保存するための OSS バケット (ドメイン:
dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com)。ご利用のワークスペースに関連付けられた Spark 計算リソースとサーバーレスリソースグループ。
ワークフローレベルのスケジューリングパラメータ。これには
bizdate(フォーマット:yyyymmdd) が含まれます。
ステップ 1: ノードの追加と依存関係の設定
データは、3 つの EMR Spark SQL ノードを順に流れます。
| ノード名 | 概要 |
|---|---|
dwd_log_info_di_spark | ods_raw_log_d_spark からの生ログレコードを ##@@ デリミタを使用して分割し、構造化されたフィールド (メソッド、URL、プロトコル、デバイスタイプ、訪問者 ID) を dwd_log_info_di_spark テーブルに抽出します。 |
dws_user_info_all_di_spark | クリーンアップされたログテーブルとユーザー情報テーブル ods_user_info_d_spark を uid で結合し、結果を dws_user_info_all_di_spark |
ads_user_info_1d_spark | 結合されたテーブルを uid で集計し、ads_user_info_1d_spark |
DataWorks コンソールにログインし、[Data Studio] に移動します。[ワークスペースディレクトリ] で、
User_profile_analysis_Sparkワークフローをクリックします。「EMR」セクションから、[EMR SPARK SQL] ノードをキャンバスにドラッグします。「ノードの作成」ダイアログボックスで、ノード名を入力します。上記にリストされている 3 つのノードすべてを作成するには、この手順を繰り返します。
キャンバス上に線を引き、スケジューリングの依存関係を設定します。
dwd_log_info_di_spark→dws_user_info_all_di_spark→ads_user_info_1d_spark
線は手動で (このチュートリアルのように) 描画することも、自動解析機能を使用して自動的に検出することもできます。自動解析の詳細については、「自動解析機能の使用」をご参照ください。
ステップ 2: データ処理ノードの構成
dwd_log_info_di_spark ノードの構成
このノードは、ods_raw_log_d_spark からの生ログレコードを ##@@ デリミタを使用して分割し、regexp_extract を適用して構造化されたフィールドを導出します。出力は、dt でパーティション分割された dwd_log_info_di_spark テーブルに書き込まれます。
キャンバスで、
dwd_log_info_di_sparkにカーソルを合わせて[ノードを開く]をクリックします。以下の SQL をコードエディタに貼り付けます。
LOCATION句のdw-spark-demoを、環境を準備したときに作成した OSS バケットのドメイン名に置き換えます。Paimon テーブル (DLF)
-- シナリオ: この Spark SQL コードは、Spark 関数を使用して ods_raw_log_d_spark テーブルの `col` 列を "##@@" で分割します。その後、複数のフィールドを生成し、結果を dwd_log_info_di_spark という新しいテーブルに書き込みます。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'IP アドレス', uid STRING COMMENT 'ユーザー ID', tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間', status STRING COMMENT 'サーバーから返された状態コード', bytes STRING COMMENT 'クライアントに返されたバイト数', method STRING COMMENT'リクエストメソッド', url STRING COMMENT 'URL', protocol STRING COMMENT 'プロトコル', referer STRING, device STRING, identity STRING, dt STRING COMMENT 'パーティションキー' -- ベストプラクティス: パーティションキーをテーブルの列として含めます。 ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' -- コア: テーブルを Paimon テーブルとして宣言します。 ); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;Hive テーブル (DLF-Legacy)
-- ods_raw_log_d_spark からの生ログレコードを ##@@ をデリミタとして分割し、 -- その後、メソッド、URL、プロトコル、デバイスタイプ、訪問者 ID フィールドを導出します。 -- ${bizdate} は、実行時に挿入されるスケジューリングパラメータです。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'IP アドレス', uid STRING COMMENT 'ユーザー ID', tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間', status STRING COMMENT 'サーバーから返された状態コード', bytes STRING COMMENT 'クライアントに返されたバイト数', method STRING COMMENT 'リクエストメソッド', url STRING COMMENT 'URL', protocol STRING COMMENT 'プロトコル', referer STRING, device STRING, identity STRING ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/'; ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;右側のナビゲーションウィンドウで、[デバッグ設定] をクリックし、以下のパラメーターを設定します。これらのパラメーターは、手順 3 でワークフローを実行する際に使用されます。
パラメーター 説明 例 コンピューティングリソース ご利用のワークスペースに関連付けられた Spark 計算リソース ドロップダウンリストから選択します [リソースグループ] ご利用のワークスペースに関連付けられたサーバーレスリソースグループ ドロップダウンリストから選択します スクリプトパラメーター ( bizdate)yyyymmdd形式の日付。Data Studio は実行時にこれを${bizdate}に置き換えます。bizdate=20250223[保存] をクリックします。
このチュートリアルでは、スケジューリングパラメーターとポリシーはワークフローレベルで設定されます。個別のノードでこれらを設定する必要はありません。ノードレベルの設定を確認するには、右側のナビゲーションウィンドウで [プロパティ] をクリックします。詳細については、「ノードスケジューリング」をご参照ください。
dws_user_info_all_di_spark ノードの構成
このノードは、クリーンアップされたログテーブルとユーザー情報ディメンションテーブルを uid で結合し、結果を dws_user_info_all_di_spark に書き込みます。これは dt でパーティション分割されます。
キャンバスで、
dws_user_info_all_di_sparkにカーソルを合わせ、[ノードを開く] をクリックします。以下の SQL をコードエディタに貼り付けます。
LOCATION句のdw-spark-demoを、ご利用の OSS バケットドメイン名に置き換えます。Paimon テーブル (DLF)
-- シナリオ: この Spark SQL コードは、dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを uid フィールドで結合し、結果を対応する dt パーティションに書き込みます。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT 'ユーザー ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', device STRING COMMENT 'デバイスタイプ', method STRING COMMENT 'HTTP リクエストタイプ', url STRING COMMENT 'URL', `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間', dt STRING COMMENT 'パーティションキー' -- ベストプラクティス: パーティションキーをテーブルの列として含めます。 ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' -- コア: テーブルを Paimon テーブルとして宣言します。 ); -- ユーザーテーブルとログテーブルからデータを挿入します。 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;Hive テーブル (DLF-Legacy)
-- dwd_log_info_di_spark (ログファクト) と ods_user_info_d_spark (ユーザーディメンション) を uid で結合します。 -- COALESCE は、uid がログテーブルには存在するがユーザーテーブルには存在しないケースを処理します。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT 'ユーザー ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', device STRING COMMENT '端末タイプ', method STRING COMMENT 'HTTP リクエストタイプ', url STRING COMMENT 'URL', `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間' ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/'; ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;[デバッグ設定] に、前のノードと同じパラメーター値を設定します。
[保存] をクリックします。
ads_user_info_1d_spark ノードの構成
このノードは、結合されたテーブルを uid で集計し、ads_user_info_1d_spark に日次ユーザープロファイルサマリーを生成します。各行は、1 ユーザーの日次ページビュー数とプロファイル属性を表します。
キャンバス上で、
ads_user_info_1d_sparkの上にマウスを合わせて、[ノードを開く] をクリックします。以下の SQL をコードエディタに貼り付けます。
LOCATION句のdw-spark-demoを、ご利用の OSS バケットドメイン名に置き換えます。Paimon テーブル (DLF)
-- シナリオ: この Spark SQL コードは、Spark 関数を使用して dws_user_info_all_di_spark テーブルをさらに処理し、結果を ads_user_info_1d_spark という新しいテーブルに書き込みます。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT 'ユーザー ID', device STRING COMMENT 'デバイスタイプ', pv BIGINT COMMENT 'PV', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', dt STRING COMMENT 'パーティションキー' -- ベストプラクティス: パーティションキーをテーブルの列として含めます。 ) PARTITIONED BY (dt) TBLPROPERTIES ( 'format' = 'paimon' -- コア: テーブルを Paimon テーブルとして宣言します。 ); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;Hive テーブル (DLF-Legacy)
-- dws_user_info_all_di_spark を uid で集計し、最終的なユーザープロファイルを生成します。 -- COUNT(0) はページビュー (pv) を計算し、MAX() は各属性の代表的な値を選択します。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT 'ユーザー ID', device STRING COMMENT '端末タイプ', pv BIGINT COMMENT 'PV', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/'; ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;[デバッグ構成]を、前のノードと同じパラメーター値で設定します。
[保存]をクリックします。
ステップ 3: ワークフローの実行と検証
ワークフロー構成タブの上部のツールバーで、[実行] をクリックします。[実行時パラメーターの入力] ダイアログボックスで、
20250223(または任意の日付)をbizdateの値として入力し、[OK] をクリックします。すべてのノードが完了したら、出力をクエリして結果を検証します。
DataWorks コンソールで、[データ分析およびサービス] > [データ分析] に移動し、[データ分析へ移動] をクリックします。左側のナビゲーションウィンドウで、[SQL クエリ] をクリックします。
「マイ ファイル」の横にあるアイコンをクリックし、[ファイルの作成] を選択します。ファイル名を入力して、[OK] をクリックします。
ファイルを開き、右上の設定アイコンをクリックして構成します。
パラメータ 値 [ワークスペース] User_profile_analysis_Sparkワークフローを含むワークスペース[データソースタイプ] EMR Spark SQL[データソース名] ご利用のワークスペースに関連付けられた EMR Serverless Spark 計算リソース [OK] をクリックし、次のクエリを実行します。
The data timestampを、使用したbizdateの値に置き換えます。たとえば、ノードが 2025 年 2 月 23 日に実行された場合、データタイムスタンプは20250222(スケジューリング日の 1 日前) になります。SELECT * FROM dwd_log_info_di_spark WHERE dt = 'The data timestamp';
ステップ 4: ワークフローのデプロイ
ノードは、スケジュールに基づいて実行される前に本番環境にデプロイする必要があります。
スケジューリングパラメータは、「データ同期チュートリアル」でワークフローレベルで構成されています。デプロイ前に個々のノードに対して構成する必要はありません。
Data Studio の左側のナビゲーションウィンドウで、Data Studio アイコンをクリックして Data Studio ページに戻ります。
[ワークスペースディレクトリ] で、
User_profile_analysis_Sparkワークフローを見つけて開きます。ノードツールバーの [デプロイ] をクリックします。
[本番環境へのデプロイメントを開始] をクリックし、ガイド付きのステップに従ってデプロイメントを完了します。
ステップ 5: 本番環境でのノード実行
デプロイ後、スケジュールされたインスタンスは翌日から生成されます。ワークフローをすぐに検証するためにバックフィルを実行します。
DataStudio ページのトップナビゲーションバーで、[オペレーションセンター] をクリックします。または、DataWorks コンソールの左上隅にあるプロダクト アイコンをクリックし、[すべてのプロダクト] > [データ開発およびタスク運用] > [オペレーションセンター] を選択します。
左側のナビゲーションウィンドウで、[自動トリガーノード O&M] > [自動トリガーノード] を選択します。ゼロロードノード
workshop_start_sparkを探し、その名前をクリックします。ノードの有向非循環グラフ (DAG) で、
workshop_start_sparkを右クリックし、[実行] > [現在のノードと子孫ノードを遡って実行] を選択します。[バックフィルデータ] パネルで、バックフィルするノードを選択し、[データタイムスタンプ] をセットし、[送信してリダイレクト] をクリックします。
[データバックフィル] ページで、[更新] をクリックして、
workshop_start_sparkとその子孫ノードが正常に実行されたかどうかを確認します。
このチュートリアル完了後の予期しない課金を避けるには、ワークフロー内のすべてのノードに有効期間を設定するか、ゼロロードノード workshop_start_spark をフリーズ次のステップ
これで、生のユーザーデータを構造化されたユーザープロファイルテーブルに処理し、日次スケジュールで実行される Spark SQL パイプラインが完成しました。ここから、以下のことができます。
結果の可視化: DataAnalysis を使用して、
ads_user_info_1d_sparkテーブルからチャートを構築し、ユーザープロファイルデータからトレンドを抽出します。データ品質のモニタリング: 出力テーブルにモニタリングルールを設定し、不良データがダウンストリームに伝播する前に検出してインターセプトします。
データリネージの探索: Data Map で生成されたテーブルを表示し、データリネージを使用してテーブル間の関係をトレースします。
API を介したデータ公開: DataService Studio を使用して、最終的に処理されたデータを標準化された API としてダウンストリームコンシューマーに公開します。