このトピックでは、Spark SQL に基づいて作成された ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを使用して、プライベート Object Storage Service (OSS) バケットに同期された基本的なユーザー情報とユーザーの Web サイトアクセスログにアクセスする方法について説明します。このトピックでは、E-MapReduce (EMR) Spark SQL ノードを使用して同期されたデータを処理し、目的のユーザープロファイルデータを取得する方法についても説明します。このトピックは、Spark SQL を使用して同期されたデータを計算および分析し、データウェアハウスの簡単なデータ処理を完了する方法を理解するのに役立ちます。
前提条件
このチュートリアルを開始する前に、「データの同期」の手順を完了してください。
ステップ 1: データ処理リンクの確立
「データ同期」フェーズでは、必要なデータが Spark を使用してロードされます。次の目的は、データをさらに処理して、基本的なユーザープロファイルデータを生成することです。
DataWorks コンソールにログインし、Data Studio ページの DATA STUDIO ペインに移動します。DATA STUDIO ペインの [ワークスペースディレクトリ] セクションで、準備した
User_profile_analysis_Sparkワークフローを見つけ、ワークフロー名をクリックしてワークフローの設定タブに移動します。設定タブの EMR セクションから右側のキャンバスに [EMR SPARK SQL] をドラッグします。[ノードの作成] ダイアログボックスで、ノードの [名前] パラメーターを設定します。
次の表に、このチュートリアルで使用されるノード名とノードの機能を示します。
ノードタイプ
ノード名
ノード機能
EMR Spark SQLdwd_log_info_di_sparkこのノードは、Spark SQL に基づいて
ods_raw_log_d_sparkテーブルのデータを処理し、データをdwd_log_info_di_sparkテーブルに同期するために使用できます。
EMR Spark SQLdws_user_info_all_di_sparkこのノードは、ファクトログテーブル
dwd_log_info_di_sparkと基本ユーザー情報テーブルods_user_info_d_sparkを uid フィールドに基づいて結合し、集約ユーザーログテーブルを生成するために使用できます。このノードは、基本ユーザー情報テーブル
ods_user_info_d_sparkとファクトログテーブルdwd_log_info_di_sparkを集約し、テーブルからdws_user_info_all_di_sparkテーブルにデータを同期するためにも使用できます。
EMR Spark SQLads_user_info_1d_sparkこのノードは、
dws_user_info_all_di_sparkテーブルのデータをさらに処理し、テーブルからads_user_info_1d_sparkテーブルにデータを同期して、基本的なユーザープロファイルを生成するために使用できます。次の図に示すように、線を描画して EMR Spark SQL ノードの先祖ノードを設定します。
説明線を描画して、ワークフロー内のノードの スケジューリング依存関係 を設定できます。また、自動解析機能を使用して、システムがノード間のスケジューリング依存関係を自動的に識別できるようにすることもできます。このチュートリアルでは、ノード間のスケジューリング依存関係は線を描画して設定します。自動解析機能の詳細については、「自動解析機能の使用」をご参照ください。
ステップ 2: データ処理ノードの設定
ワークフローが設定された後、EMR Spark SQL ノードを使用して、基本ユーザー情報テーブルと ファクトログテーブル のデータを処理し、初期ユーザープロファイルテーブル ads_user_info_1d_spark を生成します。
dwd_log_info_di_spark ノードの設定
このノードのサンプルコードでは、Spark が提供する関数を使用して、先祖テーブル ods_raw_log_d_spark のフィールドの SQL コードを処理し、そのフィールドを dwd_log_info_di_spark テーブルに同期します。
ワークフローのキャンバスで、
dwd_log_info_di_sparkノードにポインターを移動し、[ノードを開く] をクリックします。次の SQL 文をコピーして、コードエディタに貼り付けます:
-- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。Spark SQL 関数を使用して、##@@ を使用して Spark にロードされた ods_raw_log_d_spark テーブルのデータを分割して複数のフィールドを生成し、そのフィールドを dwd_log_info_di_spark テーブルに同期できます。 -- 注: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'IP アドレス', uid STRING COMMENT 'ユーザー ID', tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間', status STRING COMMENT 'サーバーから返される状態コード', bytes STRING COMMENT 'クライアントに返されるバイト数', method STRING COMMENT'リクエストメソッド', url STRING COMMENT 'url', protocol STRING COMMENT 'プロトコル', referer STRING, device STRING, identity STRING ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/'; ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;説明必要に応じて、コード内の location アドレスを置き換えてください。
dw-spark-demoは、環境を準備した ときに作成した OSS バケットの名前です。ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。
パラメーター
説明
計算資源
環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。
リソースグループ
環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。
スクリプトパラメーター
bizdate パラメーターの [パラメーター値] 列に、
yyyymmdd形式で値を入力します。例:bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。(オプション) スケジューリングプロパティを設定します。
このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。
スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。
スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。
[保存] をクリックします。
dws_user_info_all_di_spark ノードの設定
このノードは、基本ユーザー情報テーブル ods_user_info_d_spark とファクトログテーブル dwd_log_info_di_spark を集約し、集約結果を dws_user_info_all_di_spark テーブルに同期するために使用されます。
ワークフローのキャンバスで、
dws_user_info_all_di_sparkノードにポインターを移動し、[ノードを開く] をクリックします。次の SQL 文をコピーして、コードエディタに貼り付けます:
-- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合し、指定された dt パーティションにデータを書き込むことができます。 -- 注: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT 'ユーザー ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢範囲', zodiac STRING COMMENT '星座', device STRING COMMENT '端末タイプ', method STRING COMMENT 'HTTP リクエストタイプ', url STRING COMMENT 'url', `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間' ) PARTITIONED BY (dt STRING) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/'; -- パーティションを追加します。 ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); -- 基本ユーザー情報テーブルとファクトログテーブルからデータを挿入します。 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;説明必要に応じて、コード内の location アドレスを置き換えてください。
dw-spark-demoは、環境を準備した ときに作成した OSS バケットの名前です。ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。
パラメーター
説明
計算資源
環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。
リソースグループ
環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。
スクリプトパラメーター
bizdate パラメーターの [パラメーター値] 列に、
yyyymmdd形式で値を入力します。例:bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。(オプション) スケジューリングプロパティを設定します。
このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。
スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。
スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。
[保存] をクリックします。
ads_user_info_1d_spark ノードの設定
このノードは、dws_user_info_all_di_spark テーブルのデータをさらに処理し、テーブルから ads_user_info_1d_spark テーブルにデータを同期して、基本的なユーザープロファイルを生成するために使用されます。
ワークフローのキャンバスで、
ads_user_info_1d_sparkノードにポインターを移動し、[ノードを開く] をクリックします。次の SQL 文をコピーして、コードエディタに貼り付けます:
-- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。Spark SQL 関数を使用して、Spark SQL の dws_user_info_all_di_spark テーブルをさらに処理し、データを ads_user_info_1d_spark テーブルに同期できます。 -- 注: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT 'ユーザー ID', device STRING COMMENT '端末タイプ', pv BIGINT COMMENT 'pv', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢範囲', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/'; ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;説明必要に応じて、コード内の location アドレスを置き換えてください。
dw-spark-demoは、環境を準備した ときに作成した OSS バケットの名前です。ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。
パラメーター
説明
計算資源
環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。
リソースグループ
環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。
スクリプトパラメーター
bizdate パラメーターの [パラメーター値] 列に、
yyyymmdd形式で値を入力します。例:bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。(オプション) スケジューリングプロパティを設定します。
このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。
スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。
スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。
[保存] をクリックします。
ステップ 3: データの処理
データを同期します。
ワークフローのツールバーで、[実行] をクリックします。この実行でノードに定義されているパラメーター変数の値を設定します。このチュートリアルでは、例として
20250223を使用します。必要に応じて値を変更できます。[OK] をクリックし、プロセスが完了するのを待ちます。データ処理結果をクエリします。
SQL クエリページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、[DataAnalysis に移動] をクリックします。表示されたページの左側のナビゲーションウィンドウで、[SQL クエリ] をクリックします。
SQL クエリファイルを設定します。
[マイファイル] の横にある
アイコンをクリックし、[ファイルの作成] を選択します。[ファイルの作成] ダイアログボックスで、[ファイル名] パラメーターを設定し、[OK] をクリックします。左側のナビゲーションツリーで、作成した SQL ファイルを見つけて、ファイルの設定タブに移動します。
設定タブの右上隅にある
アイコンをクリックします。表示されるポップオーバーで、次のパラメーターを設定します。パラメーター
説明
ワークスペース
User_profile_analysis_Sparkワークフローが属するワークスペースを選択します。データソースタイプ
ドロップダウンリストから
EMR Spark SQLを選択します。データソース名
環境を準備する ときにワークスペースに関連付けられた EMR Serverless Spark 計算資源を選択します。
[OK] をクリックします。
クエリの SQL 文を記述します。
このトピックのすべてのノードが正常に実行された後、次の SQL 文を記述して実行し、EMR Spark SQL ノードに基づいて外部テーブルが期待どおりに作成されたかどうかを確認します。
-- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。たとえば、ノードが 2025 年 2 月 23 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20250222 であり、ノードのスケジューリング時間より 1 日早くなります。 SELECT * FROM dwd_log_info_di_spark WHERE dt ='データタイムスタンプ';
ステップ 4: ワークフローのデプロイ
ノードを自動的にスケジュールするには、事前に本番環境にデプロイする必要があります。以下の手順に従って、ワークフローを本番環境にデプロイできます。
このチュートリアルでは、スケジューリングパラメーターは ワークフローレベルのスケジューリング設定 でグローバルに設定されています。公開前に個々のノードのスケジューリングパラメーターを設定する必要はありません。
1. Data Studio の左側のナビゲーションウィンドウで、
をクリックして Data Studio ページに移動します。
2. [ワークスペースディレクトリ] で、作成したワークフローを見つけてクリックし、ワークフロー設定ページを開きます。
3. ノードツールバーで、[デプロイ] をクリックします。
4. [本番環境へのデプロイを開始] をクリックし、ガイド付きの手順に従ってプロセスを完了します。
ステップ 5: 本番環境でのノードの実行
タスクがデプロイされると、そのインスタンスが生成され、翌日に実行されます。バックフィル操作を実行して、公開されたワークフローが本番環境で期待どおりに実行できるかどうかを確認できます。詳細については、「データのバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。
ノードがデプロイされた後、Data Studio ページの上部のナビゲーションバーで [オペレーションセンター] をクリックします。
DataWorks コンソールの左上隅にある
アイコンをクリックし、 を選択することもできます。オペレーションセンターページの左側のナビゲーションウィンドウで、 を選択します。[自動トリガーノード] ページで、ゼロロードノード
workshop_start_sparkを見つけて、ノード名をクリックします。ノードの有向非巡回グラフ (DAG) で、
workshop_start_sparkノードを右クリックし、 を選択します。[データのバックフィル] パネルで、データをバックフィルするノードを選択し、[データタイムスタンプ] パラメーターを設定してから、[送信してリダイレクト] をクリックします。
[データバックフィル] ページの上部で、[更新] をクリックして、workshop_start_spark ノードとその子孫ノードが正常に実行されたかどうかを確認します。
その他の操作
視覚化された方法でデータを表示する: ユーザープロファイル分析が完了したら、DataAnalysis を使用して処理されたデータをチャートで表示します。これにより、主要な情報をすばやく抽出し、データの背後にあるビジネスの傾向を把握できます。
データ品質の監視: データ処理後に生成されるテーブルの監視ルールを設定して、ダーティデータを事前に特定して遮断し、ダーティデータの影響が拡大するのを防ぎます。
データの管理: ユーザープロファイル分析タスクが完了すると、Spark でデータテーブルが生成されます。生成されたデータテーブルをデータマップで表示し、データリネージに基づいてテーブル間の関係を表示できます。
DataService Studio API を使用してサービスを提供する: 最終的に処理されたデータを取得した後、DataService Studio の標準化された API を使用してデータを共有し、API を使用してデータを受信する他のビジネスモジュールにデータを提供します。