すべてのプロダクト
Search
ドキュメントセンター

DataWorks:データ処理

最終更新日:Jul 17, 2025

このトピックでは、E-MapReduce (EMR) Spark SQLノードに基づいて作成された ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを使用して、非公開の Object Storage Service (OSS) バケットに同期されたユーザーの基本的なユーザー情報と Web サイトアクセスログにアクセスする方法、および他の EMR Spark SQLノードを使用して同期されたデータを処理して、目的のユーザープロファイルデータを取得する方法について説明します。 このトピックは、Spark SQL を使用して同期されたデータを計算および分析し、データウェアハウスの単純なデータ処理を完了する方法を理解するのに役立ちます。

前提条件

必要なデータが同期されています。 詳細については、「データの同期」をご参照ください。

  • ods_user_info_d_spark 外部テーブルは、EMR Spark SQLノードに基づいて作成され、外部テーブルを使用して、非公開 OSS バケットに同期された基本的なユーザー情報にアクセスできます。

  • ods_raw_log_d_spark 外部テーブルは、EMR Spark SQLノードに基づいて作成され、外部テーブルを使用して、非公開 OSS バケットに同期されたユーザーの Web サイトアクセスログにアクセスできます。

注意事項

EMR Serverless Spark ワークスペースは関数登録をサポートしていません。 したがって、ログ情報を分割したり、IP アドレスを地域に変換したりする関数を登録することはできません。 このトピックでは、ods_raw_log_d_spark テーブルは組み込みの Spark SQL 関数を使用して分割され、ユーザープロファイル分析用の dwd_log_info_di_spark テーブルが生成されます。

目的

ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを処理して、基本的なユーザープロファイルテーブルを生成します。

  1. Spark SQL を使用して ods_raw_log_d_spark テーブルを処理し、dwd_log_info_di_spark という名前の新しいログテーブルを生成します。

  2. uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合し、dws_user_info_all_di_spark という名前の集計テーブルを生成します。

  3. dws_user_info_all_di_spark テーブルを処理して、ads_user_info_1d_spark という名前のテーブルを生成します。 dws_user_info_all_di_spark テーブルには、多数のフィールドと大量のデータが含まれています。 この場合、データ消費に時間がかかる場合があります。 したがって、さらなるデータ処理が必要です。

ステップ 1: ワークフローを設計する

データ同期フェーズでは、ユーザーの基本的なユーザー情報と Web サイトアクセスログが同期されます。 データ処理フェーズでは、dwd_log_info_di_spark ノードを追加してログテーブルを分割し、新しいログテーブルを生成し、次に dws_user_info_all_di_spark ノードを追加して新しいログテーブルと基本ユーザー情報テーブルを結合して集計テーブルを生成します。 次に、ads_user_info_1d_spark ノードを追加して集計テーブルをさらに処理し、基本的なユーザープロファイルテーブルを生成します。

  1. DataStudio ページに移動します。

    DataWorks コンソールにログオンします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. データ処理用のノードを作成します。 データ同期フェーズでは、EMR Spark SQLノードに基づいて外部テーブルが作成され、非公開 OSS バケットに保存されている同期データにアクセスします。 データ処理フェーズでは、同期データを処理して基本的なユーザープロファイルデータを生成することが目的です。

    • さまざまなレベルのノードとノードの作業ロジック

      ワークフローキャンバスの上部で、[ノードの作成] をクリックして、データ処理のために次の表に記載されているノードを作成します。

      ノードカテゴリ

      ノードタイプ

      ノード名

      (出力テーブルにちなんで名付けられました)

      コードロジック

      EMR

      imageEMR Spark SQL

      dwd_log_info_di_spark

      ods_raw_log_d_spark テーブルを分割して、後続の結合操作用の新しいログテーブルを生成します。

      EMR

      imageEMR Spark SQL

      dws_user_infor_all_di_spark

      基本ユーザー情報テーブル新しいログテーブルを結合して、集計テーブルを生成します。

      EMR

      imageEMR Spark SQL

      ads_user_info_1d_spark

      集計テーブルをさらに処理して、基本的なユーザープロファイルテーブルを生成します。

    • ワークフローの有向非巡回グラフ (DAG)

      ノードをワークフローキャンバスにドラッグし、線を描画してノード間の依存関係を設定することにより、データ処理のワークフローを設計します。

      image

ステップ 2: EMR Spark SQLノードを構成する

ワークフローの構成が完了したら、EMR Spark SQLノードの Spark SQL 関数を使用して ods_raw_log_d_spark テーブルを分割し、新しいログテーブルを取得できます。 次に、新しいログテーブルと基本ユーザー情報テーブルを結合して集計テーブルを生成し、集計テーブルをさらにクレンジングおよび処理して、各ユーザーのユーザープロファイルを作成できます。

dwd_log_info_di_spark ノードを構成する

ワークフローの構成タブで、dwd_log_info_di_spark ノードをダブルクリックします。 ノードの構成タブで、ods_raw_log_d_spark テーブルを処理し、処理済みデータを dwd_log_info_di_spark テーブルに書き込む SQL コードを入力します。

  1. ノードコードを構成します。

    dwd_log_info_di_spark ノードをダブルクリックして、ノードの構成タブに移動します。 次のステートメントを記述します。

    -- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 Spark SQL 関数を使用して、##@@ を使用して Spark にロードされた ods_raw_log_d_spark テーブルのデータを分割して複数のフィールドを生成し、それらのフィールドを dwd_log_info_di_spark テーブルに書き込むことができます。
    -- 注意:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。
    
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'IPアドレス',
      uid STRING COMMENT 'ユーザーID',
      tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間',
      status STRING COMMENT 'サーバーから返される状態コード',
      bytes STRING COMMENT 'クライアントに返されるバイト数',
      method STRING COMMENT'リクエストメソッド',
      url STRING COMMENT 'URL',
      protocol STRING COMMENT 'プロトコル',
      referer STRING ,
      device STRING,
      identity STRING
      )
    
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}')
    SELECT ip, 
           uid, 
           tm, 
           status, 
           bytes, 
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE 
               WHEN lower(agent) RLIKE 'android' THEN 'android' 
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' 
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' 
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' 
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' 
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' 
               ELSE 'unknown' 
           END AS device, 
           CASE 
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' 
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' 
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' 
               ELSE 'unknown' 
           END AS identity
    FROM (
        SELECT 
            SPLIT(col, '##@@')[0] AS ip, 
            SPLIT(col, '##@@')[1] AS uid, 
            SPLIT(col, '##@@')[2] AS tm, 
            SPLIT(col, '##@@')[3] AS request, 
            SPLIT(col, '##@@')[4] AS status, 
            SPLIT(col, '##@@')[5] AS bytes, 
            SPLIT(col, '##@@')[6] AS referer, 
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;
  2. ノードのスケジューリングプロパティを構成します。

    セクション

    スクリーンショット

    パラメータの追加

    [パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

    • パラメータ名を bizdate に設定します。

    • パラメータ値を $[yyyymmdd-1] に設定します。

    詳細については、「スケジューリングパラメータの構成」をご参照ください。

    image

    依存関係

    このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 の形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係の構成」をご参照ください。

    image

    説明

    [スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。

  3. オプション。 Spark システムパラメータを構成します。

    ノードの構成タブの右側のナビゲーションウィンドウで、[詳細設定] タブをクリックして、Spark 固有のプロパティまたはパラメータを構成できます。 このトピックでは、Spark システムパラメータが使用されます。 次の表に、構成できるシステムパラメータを示します。

    詳細パラメータ

    説明

    SERVERLESS_RELEASE_VERSION

    Serverless Spark エンジンのバージョンを変更します。 例:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    リソースキューを変更します。 例:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    SQL 計算を変更します。 例:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL ステートメントの実行方法を指定します。 有効な値:

    • true: 複数の SQL ステートメントが一度に実行されます。

    • false: 一度に 1 つの SQL ステートメントのみが実行されます。

    説明

    このパラメータは、DataWorks ワークスペースの開発環境でのテストにのみ使用できます。

    その他

    • [詳細設定] タブで、EMR Spark SQLノードの カスタム Spark パラメータを追加できます。 たとえば、spark.eventLog.enabled: false を指定すると、DataWorks は、ワークスペースでサポートされている次の形式で EMR Serverless Spark ワークスペースに配信される EMR クラスタのコードを自動補完します。 --conf key=value

    • グローバル Spark パラメータを構成することもできます。 詳細については、「グローバル Spark パラメータの構成」をご参照ください。

    Spark 固有のプロパティまたはパラメータの設定の詳細については、「Spark 構成」をご参照ください。

  4. 構成を保存します。

    この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある image.png アイコンをクリックして、ノード構成を保存します。

  5. ログテーブルの分割結果を確認します。

    祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。

    -- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2023 年 2 月 22 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230221 であり、これはノードのスケジュールされた時間よりも 1 日前です。
    SELECT * FROM dwd_log_info_di_spark WHERE dt ='データタイムスタンプ';
    説明

    このトピックの SQL ステートメントでは、スケジューリングパラメータ ${bizdate} が構成され、値 T-1 がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。

dws_user_info_all_di_spark ノードを構成する

uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合して、dws_user_info_all_di_spark テーブルを生成します。

  1. ノードコードを構成します。

    dws_user_info_all_di_spark ノードをダブルクリックして、ノードの構成タブに移動します。 構成タブで、次のステートメントを記述します。

    -- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合し、指定された dt パーティションにデータを書き込むことができます。
    -- 注意:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT 'ユーザーID',
        gender     STRING COMMENT '性別',
        age_range  STRING COMMENT '年齢層',
        zodiac     STRING COMMENT '星座',
        device     STRING COMMENT '端末タイプ',
        method     STRING COMMENT 'HTTP リクエストタイプ',
        url        STRING COMMENT 'URL',
        `time`     STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間'
    )
    PARTITIONED BY (dt STRING);
    
    -- パーティションを追加します。
    ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    -- 基本ユーザー情報テーブルと新しいログテーブルからデータを挿入します。
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT 
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,    
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm
    FROM 
       dwd_log_info_di_spark as a
    LEFT OUTER JOIN 
        ods_user_info_d_spark as b
    ON 
        a.uid = b.uid ;
  2. ノードのスケジューリングプロパティを構成します。

    セクション

    説明

    スクリーンショット

    パラメータの追加

    [パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

    • パラメータ名を bizdate に設定します。

    • パラメータ値を $[yyyymmdd-1] に設定します。

    詳細については、「スケジューリングパラメータの構成」をご参照ください。

    image

    依存関係

    このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 の形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係の構成」をご参照ください。

    image

    説明

    [スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。

  3. オプション。 Spark システムパラメータを構成します。

    ノードの構成タブの右側のナビゲーションウィンドウで、[詳細設定] タブをクリックして、Spark 固有のプロパティまたはパラメータを構成できます。 このトピックでは、Spark システムパラメータが使用されます。 次の表に、構成できるシステムパラメータを示します。

    詳細パラメータ

    説明

    SERVERLESS_RELEASE_VERSION

    Serverless Spark エンジンのバージョンを変更します。 例:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    リソースキューを変更します。 例:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    SQL 計算を変更します。 例:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL ステートメントの実行方法を指定します。 有効な値:

    • true: 複数の SQL ステートメントが一度に実行されます。

    • false: 一度に 1 つの SQL ステートメントのみが実行されます。

    説明

    このパラメータは、DataWorks ワークスペースの開発環境でのテストにのみ使用できます。

    その他

    • [詳細設定] タブで、EMR Spark SQLノードの カスタム Spark パラメータを追加できます。 たとえば、spark.eventLog.enabled: false を指定すると、DataWorks は、ワークスペースでサポートされている次の形式で EMR Serverless Spark ワークスペースに配信される EMR クラスタのコードを自動補完します。 --conf key=value

    • グローバル Spark パラメータを構成することもできます。 詳細については、「グローバル Spark パラメータの構成」をご参照ください。

    Spark 固有のプロパティまたはパラメータの設定の詳細については、「Spark 構成」をご参照ください。

  4. 構成を保存します。

    この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある image.png アイコンをクリックして、ノード構成を保存します。

  5. データマージ結果を確認します。

    祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。

    -- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2024 年 8 月 8 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20240807 であり、これはノードのスケジュールされた時間よりも 1 日前です。
    SELECT * FROM dws_user_info_all_di_spark WHERE dt ='データタイムスタンプ';
    説明

    このトピックの SQL ステートメントでは、スケジューリングパラメータ ${bizdate} が構成され、値 T-1 がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。

ads_user_info_1d_spark ノードを構成する

dws_user_info_all_di_spark テーブルで最大値とカウントの計算を実行して、消費のための基本ユーザープロファイルテーブルとして ads_user_info_1d_spark テーブルを生成します。

  1. ノードコードを構成します。

    ads_user_info_1d_spark ノードをダブルクリックして、ノードの構成タブに移動します。 構成タブで、次のステートメントを記述します。

    -- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 Spark SQL 関数を使用して、Spark SQL の dws_user_info_all_di_spark テーブルをさらに処理し、データを ads_user_info_1d_spark テーブルに書き込むことができます。
    -- 注意:
    --      DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。
    --      実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT 'ユーザーID',
      device STRING COMMENT '端末タイプ',
      pv BIGINT COMMENT 'ページビュー',
      gender STRING COMMENT '性別',
      age_range STRING COMMENT '年齢層',
      zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
      dt STRING
    );
    ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  2. ノードのスケジューリングプロパティを構成します。

    セクション

    説明

    スクリーンショット

    パラメータの追加

    [パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。

    • パラメータ名を bizdate に設定します。

    • パラメータ値を $[yyyymmdd-1] に設定します。

    詳細については、「スケジューリングパラメータの構成」をご参照ください。

    image

    依存関係

    このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。

    出力テーブルには、ワークスペース名.ノード名 の形式で名前が付けられます。

    詳細については、「スケジューリングの依存関係の構成」をご参照ください。

    image

    説明

    [スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。

  3. オプション。 Spark システムパラメータを構成します。

    ノードの構成タブの右側のナビゲーションウィンドウで、[詳細設定] タブをクリックして、Spark 固有のプロパティまたはパラメータを構成できます。 このトピックでは、Spark システムパラメータが使用されます。 次の表に、構成できるシステムパラメータを示します。

    詳細パラメータ

    説明

    SERVERLESS_RELEASE_VERSION

    Serverless Spark エンジンのバージョンを変更します。 例:

    "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"

    SERVERLESS_QUEUE_NAME

    リソースキューを変更します。 例:

    "SERVERLESS_QUEUE_NAME": "dev_queue"

    SERVERLESS_SQL_COMPUTE

    SQL 計算を変更します。 例:

    "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"

    FLOW_SKIP_SQL_ANALYZE

    SQL ステートメントの実行方法を指定します。 有効な値:

    • true: 複数の SQL ステートメントが一度に実行されます。

    • false: 一度に 1 つの SQL ステートメントのみが実行されます。

    説明

    このパラメータは、DataWorks ワークスペースの開発環境でのテストにのみ使用できます。

    その他

    • [詳細設定] タブで、EMR Spark SQLノードの カスタム Spark パラメータを追加できます。 たとえば、spark.eventLog.enabled: false を指定すると、DataWorks は、ワークスペースでサポートされている次の形式で EMR Serverless Spark ワークスペースに配信される EMR クラスタのコードを自動補完します。 --conf key=value

    • グローバル Spark パラメータを構成することもできます。 詳細については、「グローバル Spark パラメータの構成」をご参照ください。

    Spark 固有のプロパティまたはパラメータの設定の詳細については、「Spark 構成」をご参照ください。

  4. 構成を保存します。

    この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある image.png アイコンをクリックして、ノード構成を保存します。

  5. ユーザープロファイルテーブルの結果を確認します。

    祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。

    -- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2023 年 2 月 22 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230221 であり、これはノードのスケジュールされた時間よりも 1 日前です。
    SELECT * FROM ads_user_info_1d_spark WHERE dt ='データタイムスタンプ';
    説明

    このトピックの SQL ステートメントでは、スケジューリングパラメータ ${bizdate} が構成され、値 T-1 がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。

ステップ 3: ワークフローをコミットする

ワークフローを構成したら、ワークフローが期待どおりに実行できるかどうかをテストします。 テストが成功したら、ワークフローをコミットし、ワークフローがデプロイされるのを待ちます。

  1. ワークフローの構成タブで、运行 アイコンをクリックしてワークフローを実行します。

  2. ワークフローのすべてのノードの横に 成功 アイコンが表示されたら、提交 アイコンをクリックしてワークフローをコミットします。

  3. [コミット] ダイアログボックスで、コミットするノードを選択し、説明を入力して、[I/O の不整合アラートを無視する] を選択します。 次に、[確認] をクリックします。

  4. ワークフローがコミットされたら、ワークフロー内のノードをデプロイできます。

    1. ワークフローの構成タブの右上隅にある [デプロイ] をクリックします。 [デプロイタスクの作成] ページが表示されます。

    2. デプロイするノードを選択し、[デプロイ] をクリックします。 [デプロイタスクの作成] ダイアログボックスで、[デプロイ] をクリックします。

ステップ 4: 本番環境でノードを実行する

ある日にノードをデプロイした後、ノード用に生成されたインスタンスは次の日に実行されるようにスケジュールできます。 [データバックフィル] 機能を使用して、デプロイされたワークフロー内のノードのデータをバックフィルできます。 これにより、ノードを本番環境で実行できるかどうかを確認できます。 詳細については、「データのバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。

  1. ノードをデプロイした後、右上隅にある [オペレーションセンター] をクリックします。

    ワークフローの構成タブの上部ツールバーにある [オペレーションセンター] をクリックして、[オペレーションセンター] ページに移動することもできます。

  2. 左側のナビゲーションウィンドウで、[自動トリガーノード O&M] > [自動トリガーノード] を選択します。 [自動トリガーノード] ページで、workshop_start_spark ゼロロードノードを見つけてクリックします。 ノードの DAG が表示されます。

  3. workshop_start_spark ノードを右クリックし、[実行] > [現在および子孫ノードを遡及的に] を選択します。

  4. [データのバックフィル] パネルで、データをバックフィルするノードを選択し、[データタイムスタンプ] パラメータを構成して、[送信してリダイレクト] をクリックします。 [データバックフィルインスタンスがリストされている] ページが表示されます。

  5. すべての SQL ノードが正常に実行されるまで [更新] をクリックします。