すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データの処理

最終更新日:Oct 22, 2025

このトピックでは、Spark SQL に基づいて作成された ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを使用して、プライベート Object Storage Service (OSS) バケットに同期された基本的なユーザー情報とユーザーの Web サイトアクセスログにアクセスする方法について説明します。このトピックでは、E-MapReduce (EMR) Spark SQL ノードを使用して同期されたデータを処理し、目的のユーザープロファイルデータを取得する方法についても説明します。このトピックは、Spark SQL を使用して同期されたデータを計算および分析し、データウェアハウスの簡単なデータ処理を完了する方法を理解するのに役立ちます。

前提条件

このチュートリアルを開始する前に、「データの同期」の手順を完了してください。

ステップ 1: データ処理リンクの確立

データ同期」フェーズでは、必要なデータが Spark を使用してロードされます。次の目的は、データをさらに処理して、基本的なユーザープロファイルデータを生成することです。

  1. DataWorks コンソールにログインし、Data Studio ページの DATA STUDIO ペインに移動します。DATA STUDIO ペインの [ワークスペースディレクトリ] セクションで、準備した User_profile_analysis_Spark ワークフローを見つけ、ワークフロー名をクリックしてワークフローの設定タブに移動します。

  2. 設定タブの EMR セクションから右側のキャンバスに [EMR SPARK SQL] をドラッグします。[ノードの作成] ダイアログボックスで、ノードの [名前] パラメーターを設定します。

    次の表に、このチュートリアルで使用されるノード名とノードの機能を示します。

    ノードタイプ

    ノード名

    ノード機能

    imageEMR Spark SQL

    dwd_log_info_di_spark

    このノードは、Spark SQL に基づいて ods_raw_log_d_spark テーブルのデータを処理し、データを dwd_log_info_di_spark テーブルに同期するために使用できます。

    imageEMR Spark SQL

    dws_user_info_all_di_spark

    このノードは、ファクトログテーブル dwd_log_info_di_spark基本ユーザー情報テーブル ods_user_info_d_spark を uid フィールドに基づいて結合し、集約ユーザーログテーブルを生成するために使用できます。

    このノードは、基本ユーザー情報テーブル ods_user_info_d_spark とファクトログテーブル dwd_log_info_di_spark を集約し、テーブルから dws_user_info_all_di_spark テーブルにデータを同期するためにも使用できます。

    imageEMR Spark SQL

    ads_user_info_1d_spark

    このノードは、dws_user_info_all_di_spark テーブルのデータをさらに処理し、テーブルから ads_user_info_1d_spark テーブルにデータを同期して、基本的なユーザープロファイルを生成するために使用できます。

  3. 次の図に示すように、線を描画して EMR Spark SQL ノードの先祖ノードを設定します。

    説明

    線を描画して、ワークフロー内のノードの スケジューリング依存関係 を設定できます。また、自動解析機能を使用して、システムがノード間のスケジューリング依存関係を自動的に識別できるようにすることもできます。このチュートリアルでは、ノード間のスケジューリング依存関係は線を描画して設定します。自動解析機能の詳細については、「自動解析機能の使用」をご参照ください。

ステップ 2: データ処理ノードの設定

ワークフローが設定された後、EMR Spark SQL ノードを使用して、基本ユーザー情報テーブルと ファクトログテーブル のデータを処理し、初期ユーザープロファイルテーブル ads_user_info_1d_spark を生成します。

dwd_log_info_di_spark ノードの設定

このノードのサンプルコードでは、Spark が提供する関数を使用して、先祖テーブル ods_raw_log_d_spark のフィールドの SQL コードを処理し、そのフィールドを dwd_log_info_di_spark テーブルに同期します。

  1. ワークフローのキャンバスで、dwd_log_info_di_spark ノードにポインターを移動し、[ノードを開く] をクリックします。

  2. 次の SQL 文をコピーして、コードエディタに貼り付けます:

    -- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。Spark SQL 関数を使用して、##@@ を使用して Spark にロードされた ods_raw_log_d_spark テーブルのデータを分割して複数のフィールドを生成し、そのフィールドを dwd_log_info_di_spark テーブルに同期できます。
    -- 注:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'IP アドレス',
      uid STRING COMMENT 'ユーザー ID',
      tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間',
      status STRING COMMENT 'サーバーから返される状態コード',
      bytes STRING COMMENT 'クライアントに返されるバイト数',
      method STRING COMMENT'リクエストメソッド',
      url STRING COMMENT 'url',
      protocol STRING COMMENT 'プロトコル',
      referer STRING,
      device STRING,
      identity STRING
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';
    
    ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT  OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
    SELECT ip, 
           uid, 
           tm, 
           status, 
           bytes, 
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE 
               WHEN lower(agent) RLIKE 'android' THEN 'android' 
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' 
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' 
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' 
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' 
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' 
               ELSE 'unknown' 
           END AS device, 
           CASE 
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' 
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' 
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' 
               ELSE 'unknown' 
           END AS identity
    FROM (
        SELECT 
            SPLIT(col, '##@@')[0] AS ip, 
            SPLIT(col, '##@@')[1] AS uid, 
            SPLIT(col, '##@@')[2] AS tm, 
            SPLIT(col, '##@@')[3] AS request, 
            SPLIT(col, '##@@')[4] AS status, 
            SPLIT(col, '##@@')[5] AS bytes, 
            SPLIT(col, '##@@')[6] AS referer, 
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;
    説明

    必要に応じて、コード内の location アドレスを置き換えてください。dw-spark-demo は、環境を準備した ときに作成した OSS バケットの名前です。

  3. ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。

    パラメーター

    説明

    計算資源

    環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。

    リソースグループ

    環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    bizdate パラメーターの [パラメーター値] 列に、yyyymmdd 形式で値を入力します。例: bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。

  4. (オプション) スケジューリングプロパティを設定します。

    このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。

    • スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。

    • スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。

  5. [保存] をクリックします。

dws_user_info_all_di_spark ノードの設定

このノードは、基本ユーザー情報テーブル ods_user_info_d_spark とファクトログテーブル dwd_log_info_di_spark を集約し、集約結果を dws_user_info_all_di_spark テーブルに同期するために使用されます。

  1. ワークフローのキャンバスで、dws_user_info_all_di_spark ノードにポインターを移動し、[ノードを開く] をクリックします。

  2. 次の SQL 文をコピーして、コードエディタに貼り付けます:

    -- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合し、指定された dt パーティションにデータを書き込むことができます。
    -- 注:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT 'ユーザー ID',
        gender     STRING COMMENT '性別',
        age_range  STRING COMMENT '年齢範囲',
        zodiac     STRING COMMENT '星座',
        device     STRING COMMENT '端末タイプ',
        method     STRING COMMENT 'HTTP リクエストタイプ',
        url        STRING COMMENT 'url',
        `time`     STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間'
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';
    
    -- パーティションを追加します。
    ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    -- 基本ユーザー情報テーブルとファクトログテーブルからデータを挿入します。
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT 
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,    
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm
    FROM (
      SELECT * 
      FROM dwd_log_info_di_spark 
      WHERE dt='${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT * 
      FROM ods_user_info_d_spark 
      WHERE dt='${bizdate}'
    ) b
    ON 
        a.uid = b.uid;
    説明

    必要に応じて、コード内の location アドレスを置き換えてください。dw-spark-demo は、環境を準備した ときに作成した OSS バケットの名前です。

  3. ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。

    パラメーター

    説明

    計算資源

    環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。

    リソースグループ

    環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    bizdate パラメーターの [パラメーター値] 列に、yyyymmdd 形式で値を入力します。例: bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。

  4. (オプション) スケジューリングプロパティを設定します。

    このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。

    • スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。

    • スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。

  5. [保存] をクリックします。

ads_user_info_1d_spark ノードの設定

このノードは、dws_user_info_all_di_spark テーブルのデータをさらに処理し、テーブルから ads_user_info_1d_spark テーブルにデータを同期して、基本的なユーザープロファイルを生成するために使用されます。

  1. ワークフローのキャンバスで、ads_user_info_1d_spark ノードにポインターを移動し、[ノードを開く] をクリックします。

  2. 次の SQL 文をコピーして、コードエディタに貼り付けます:

    -- シナリオ: 次のサンプルコードの SQL 文は Spark SQL 文です。Spark SQL 関数を使用して、Spark SQL の dws_user_info_all_di_spark テーブルをさらに処理し、データを ads_user_info_1d_spark テーブルに同期できます。
    -- 注:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメーターを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。その後、ノードの設定タブの [プロパティ] タブで、スケジューリングパラメーターを値として変数に割り当てることができます。これにより、スケジューリングシナリオでのスケジューリングパラメーターの設定に基づいて、ノードコード内のスケジューリングパラメーターの値が動的に置き換えられます。
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT 'ユーザー ID',
      device STRING COMMENT '端末タイプ',
      pv BIGINT COMMENT 'pv',
      gender STRING COMMENT '性別',
      age_range STRING COMMENT '年齢範囲',
      zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';
    
    ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid; 
    説明

    必要に応じて、コード内の location アドレスを置き換えてください。dw-spark-demo は、環境を準備した ときに作成した OSS バケットの名前です。

  3. ノードの設定タブの右側のナビゲーションウィンドウで、[デバッグ設定] をクリックします。[デバッグ設定] タブで、次のパラメーターを設定します。これらのパラメーターは、ステップ 4 でワークフローをテストするために使用されます。

    パラメーター

    説明

    計算資源

    環境を準備する ときにワークスペースに関連付けられた Spark 計算資源を選択します。

    リソースグループ

    環境を準備する ときにワークスペースに関連付けられたサーバーレスリソースグループを選択します。

    スクリプトパラメーター

    bizdate パラメーターの [パラメーター値] 列に、yyyymmdd 形式で値を入力します。例: bizdate=20250223。ワークフローをデバッグすると、Data Studio はワークフロー内のノードに定義された変数を定数に置き換えます。

  4. (オプション) スケジューリングプロパティを設定します。

    このチュートリアルでは、スケジューリングパラメーターのデフォルト値を維持できます。ノード編集ページの右側のペインで [スケジューリング設定] をクリックできます。パラメーターの詳細については、「ノードのスケジューリングプロパティを設定する」をご参照ください。

    • スケジューリングパラメーター: これらは、このチュートリアルのワークフローですでに設定されています。内部ノード用に設定する必要はありません。タスクやコードで直接使用できます。

    • スケジューリングポリシー: 子ノードがワークフローの開始後に実行を待機する時間を指定するには、[遅延実行時間] パラメーターを設定できます。このチュートリアルでは、このパラメーターを設定する必要はありません。

  5. [保存] をクリックします。

ステップ 3: データの処理

  1. データを同期します。

    ワークフローのツールバーで、[実行] をクリックします。この実行でノードに定義されているパラメーター変数の値を設定します。このチュートリアルでは、例として 20250223 を使用します。必要に応じて値を変更できます。[OK] をクリックし、プロセスが完了するのを待ちます。

  2. データ処理結果をクエリします。

    1. SQL クエリページに移動します。

      DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、データ分析とサービス > データ分析 を選択します。表示されたページで、[DataAnalysis に移動] をクリックします。表示されたページの左側のナビゲーションウィンドウで、[SQL クエリ] をクリックします。

    2. SQL クエリファイルを設定します。

      1. [マイファイル] の横にある image アイコンをクリックし、[ファイルの作成] を選択します。[ファイルの作成] ダイアログボックスで、[ファイル名] パラメーターを設定し、[OK] をクリックします。

      2. 左側のナビゲーションツリーで、作成した SQL ファイルを見つけて、ファイルの設定タブに移動します。

      3. 設定タブの右上隅にある image アイコンをクリックします。表示されるポップオーバーで、次のパラメーターを設定します。

        パラメーター

        説明

        ワークスペース

        User_profile_analysis_Spark ワークフローが属するワークスペースを選択します。

        データソースタイプ

        ドロップダウンリストから EMR Spark SQL を選択します。

        データソース名

        環境を準備する ときにワークスペースに関連付けられた EMR Serverless Spark 計算資源を選択します。

      4. [OK] をクリックします。

    3. クエリの SQL 文を記述します。

      このトピックのすべてのノードが正常に実行された後、次の SQL 文を記述して実行し、EMR Spark SQL ノードに基づいて外部テーブルが期待どおりに作成されたかどうかを確認します。

      -- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。たとえば、ノードが 2025 年 2 月 23 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20250222 であり、ノードのスケジューリング時間より 1 日早くなります。
      SELECT * FROM dwd_log_info_di_spark WHERE dt ='データタイムスタンプ';

ステップ 4: ワークフローのデプロイ

ノードを自動的にスケジュールするには、事前に本番環境にデプロイする必要があります。以下の手順に従って、ワークフローを本番環境にデプロイできます。

説明

このチュートリアルでは、スケジューリングパラメーターは ワークフローレベルのスケジューリング設定 でグローバルに設定されています。公開前に個々のノードのスケジューリングパラメーターを設定する必要はありません。

1. Data Studio の左側のナビゲーションウィンドウで、image をクリックして Data Studio ページに移動します。

2. [ワークスペースディレクトリ] で、作成したワークフローを見つけてクリックし、ワークフロー設定ページを開きます。

3. ノードツールバーで、[デプロイ] をクリックします。

4. [本番環境へのデプロイを開始] をクリックし、ガイド付きの手順に従ってプロセスを完了します。

ステップ 5: 本番環境でのノードの実行

タスクがデプロイされると、そのインスタンスが生成され、翌日に実行されます。バックフィル操作を実行して、公開されたワークフローが本番環境で期待どおりに実行できるかどうかを確認できます。詳細については、「データのバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。

  1. ノードがデプロイされた後、Data Studio ページの上部のナビゲーションバーで [オペレーションセンター] をクリックします。

    DataWorks コンソールの左上隅にある 图标 アイコンをクリックし、すべてのプロダクト > データ開発とタスク操作 > オペレーションセンター を選択することもできます。

  2. オペレーションセンターページの左側のナビゲーションウィンドウで、自動トリガーノード O&M > 自動トリガーノード を選択します。[自動トリガーノード] ページで、ゼロロードノード workshop_start_spark を見つけて、ノード名をクリックします。

  3. ノードの有向非巡回グラフ (DAG) で、workshop_start_spark ノードを右クリックし、実行 > 現在および子孫ノードを遡及的に を選択します。

  4. [データのバックフィル] パネルで、データをバックフィルするノードを選択し、[データタイムスタンプ] パラメーターを設定してから、[送信してリダイレクト] をクリックします。

  5. [データバックフィル] ページの上部で、[更新] をクリックして、workshop_start_spark ノードとその子孫ノードが正常に実行されたかどうかを確認します。

説明

このチュートリアルの操作完了後に追加料金が発生するのを防ぐには、ワークフロー内のすべてのノードに対して 有効期間 パラメーターを設定するか、ゼロロードノード workshop_start_spark[凍結] することができます。

その他の操作

  • 視覚化された方法でデータを表示する: ユーザープロファイル分析が完了したら、DataAnalysis を使用して処理されたデータをチャートで表示します。これにより、主要な情報をすばやく抽出し、データの背後にあるビジネスの傾向を把握できます。

  • データ品質の監視: データ処理後に生成されるテーブルの監視ルールを設定して、ダーティデータを事前に特定して遮断し、ダーティデータの影響が拡大するのを防ぎます。

  • データの管理: ユーザープロファイル分析タスクが完了すると、Spark でデータテーブルが生成されます。生成されたデータテーブルをデータマップで表示し、データリネージに基づいてテーブル間の関係を表示できます。

  • DataService Studio API を使用してサービスを提供する: 最終的に処理されたデータを取得した後、DataService Studio の標準化された API を使用してデータを共有し、API を使用してデータを受信する他のビジネスモジュールにデータを提供します。