このトピックでは、E-MapReduce (EMR) Spark SQLノードに基づいて作成された ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを使用して、非公開の Object Storage Service (OSS) バケットに同期されたユーザーの基本的なユーザー情報と Web サイトアクセスログにアクセスする方法、および他の EMR Spark SQLノードを使用して同期されたデータを処理して、目的のユーザープロファイルデータを取得する方法について説明します。 このトピックは、Spark SQL を使用して同期されたデータを計算および分析し、データウェアハウスの単純なデータ処理を完了する方法を理解するのに役立ちます。
前提条件
必要なデータが同期されています。 詳細については、「データの同期」をご参照ください。
ods_user_info_d_spark外部テーブルは、EMR Spark SQLノードに基づいて作成され、外部テーブルを使用して、非公開 OSS バケットに同期された基本的なユーザー情報にアクセスできます。ods_raw_log_d_spark外部テーブルは、EMR Spark SQLノードに基づいて作成され、外部テーブルを使用して、非公開 OSS バケットに同期されたユーザーの Web サイトアクセスログにアクセスできます。
注意事項
EMR Serverless Spark ワークスペースは関数登録をサポートしていません。 したがって、ログ情報を分割したり、IP アドレスを地域に変換したりする関数を登録することはできません。 このトピックでは、ods_raw_log_d_spark テーブルは組み込みの Spark SQL 関数を使用して分割され、ユーザープロファイル分析用の dwd_log_info_di_spark テーブルが生成されます。
目的
ods_user_info_d_spark および ods_raw_log_d_spark 外部テーブルを処理して、基本的なユーザープロファイルテーブルを生成します。
Spark SQL を使用して
ods_raw_log_d_sparkテーブルを処理し、dwd_log_info_di_sparkという名前の新しいログテーブルを生成します。uid フィールドに基づいて
dwd_log_info_di_sparkテーブルとods_user_info_d_sparkテーブルを結合し、dws_user_info_all_di_sparkという名前の集計テーブルを生成します。dws_user_info_all_di_sparkテーブルを処理して、ads_user_info_1d_sparkという名前のテーブルを生成します。 dws_user_info_all_di_spark テーブルには、多数のフィールドと大量のデータが含まれています。 この場合、データ消費に時間がかかる場合があります。 したがって、さらなるデータ処理が必要です。
ステップ 1: ワークフローを設計する
データ同期フェーズでは、ユーザーの基本的なユーザー情報と Web サイトアクセスログが同期されます。 データ処理フェーズでは、dwd_log_info_di_spark ノードを追加してログテーブルを分割し、新しいログテーブルを生成し、次に dws_user_info_all_di_spark ノードを追加して新しいログテーブルと基本ユーザー情報テーブルを結合して集計テーブルを生成します。 次に、ads_user_info_1d_spark ノードを追加して集計テーブルをさらに処理し、基本的なユーザープロファイルテーブルを生成します。
DataStudio ページに移動します。
DataWorks コンソールにログオンします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、 を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
データ処理用のノードを作成します。 データ同期フェーズでは、EMR Spark SQLノードに基づいて外部テーブルが作成され、非公開 OSS バケットに保存されている同期データにアクセスします。 データ処理フェーズでは、同期データを処理して基本的なユーザープロファイルデータを生成することが目的です。
さまざまなレベルのノードとノードの作業ロジック
ワークフローキャンバスの上部で、[ノードの作成] をクリックして、データ処理のために次の表に記載されているノードを作成します。
ノードカテゴリ
ノードタイプ
ノード名
(出力テーブルにちなんで名付けられました)
コードロジック
EMR
EMR Spark SQLdwd_log_info_di_spark
ods_raw_log_d_spark テーブルを分割して、後続の結合操作用の新しいログテーブルを生成します。
EMR
EMR Spark SQLdws_user_infor_all_di_spark
基本ユーザー情報テーブルと新しいログテーブルを結合して、集計テーブルを生成します。
EMR
EMR Spark SQLads_user_info_1d_spark
集計テーブルをさらに処理して、基本的なユーザープロファイルテーブルを生成します。
ワークフローの有向非巡回グラフ (DAG)
ノードをワークフローキャンバスにドラッグし、線を描画してノード間の依存関係を設定することにより、データ処理のワークフローを設計します。

ステップ 2: EMR Spark SQLノードを構成する
ワークフローの構成が完了したら、EMR Spark SQLノードの Spark SQL 関数を使用して ods_raw_log_d_spark テーブルを分割し、新しいログテーブルを取得できます。 次に、新しいログテーブルと基本ユーザー情報テーブルを結合して集計テーブルを生成し、集計テーブルをさらにクレンジングおよび処理して、各ユーザーのユーザープロファイルを作成できます。
dwd_log_info_di_spark ノードを構成する
ワークフローの構成タブで、dwd_log_info_di_spark ノードをダブルクリックします。 ノードの構成タブで、ods_raw_log_d_spark テーブルを処理し、処理済みデータを dwd_log_info_di_spark テーブルに書き込む SQL コードを入力します。
ノードコードを構成します。
dwd_log_info_di_sparkノードをダブルクリックして、ノードの構成タブに移動します。 次のステートメントを記述します。-- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 Spark SQL 関数を使用して、##@@ を使用して Spark にロードされた ods_raw_log_d_spark テーブルのデータを分割して複数のフィールドを生成し、それらのフィールドを dwd_log_info_di_spark テーブルに書き込むことができます。 -- 注意: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'IPアドレス', uid STRING COMMENT 'ユーザーID', tm STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間', status STRING COMMENT 'サーバーから返される状態コード', bytes STRING COMMENT 'クライアントに返されるバイト数', method STRING COMMENT'リクエストメソッド', url STRING COMMENT 'URL', protocol STRING COMMENT 'プロトコル', referer STRING , device STRING, identity STRING ) PARTITIONED BY ( dt STRING ); ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;ノードのスケジューリングプロパティを構成します。
セクション
スクリーンショット
パラメータの追加
[パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。
パラメータ名を
bizdateに設定します。パラメータ値を
$[yyyymmdd-1]に設定します。
詳細については、「スケジューリングパラメータの構成」をご参照ください。

依存関係
このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。
出力テーブルには、
ワークスペース名.ノード名の形式で名前が付けられます。詳細については、「スケジューリングの依存関係の構成」をご参照ください。
説明[スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。
構成を保存します。
この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある
アイコンをクリックして、ノード構成を保存します。ログテーブルの分割結果を確認します。
祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。
-- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2023 年 2 月 22 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230221 であり、これはノードのスケジュールされた時間よりも 1 日前です。 SELECT * FROM dwd_log_info_di_spark WHERE dt ='データタイムスタンプ';説明このトピックの SQL ステートメントでは、スケジューリングパラメータ
${bizdate}が構成され、値T-1がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。
dws_user_info_all_di_spark ノードを構成する
uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合して、dws_user_info_all_di_spark テーブルを生成します。
ノードコードを構成します。
dws_user_info_all_di_spark ノードをダブルクリックして、ノードの構成タブに移動します。 構成タブで、次のステートメントを記述します。
-- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 uid フィールドに基づいて dwd_log_info_di_spark テーブルと ods_user_info_d_spark テーブルを結合し、指定された dt パーティションにデータを書き込むことができます。 -- 注意: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT 'ユーザーID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座', device STRING COMMENT '端末タイプ', method STRING COMMENT 'HTTP リクエストタイプ', url STRING COMMENT 'URL', `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間' ) PARTITIONED BY (dt STRING); -- パーティションを追加します。 ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); -- 基本ユーザー情報テーブルと新しいログテーブルからデータを挿入します。 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM dwd_log_info_di_spark as a LEFT OUTER JOIN ods_user_info_d_spark as b ON a.uid = b.uid ;ノードのスケジューリングプロパティを構成します。
セクション
説明
スクリーンショット
パラメータの追加
[パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。
パラメータ名を
bizdateに設定します。パラメータ値を
$[yyyymmdd-1]に設定します。
詳細については、「スケジューリングパラメータの構成」をご参照ください。

依存関係
このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。
出力テーブルには、
ワークスペース名.ノード名の形式で名前が付けられます。詳細については、「スケジューリングの依存関係の構成」をご参照ください。
説明[スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。
構成を保存します。
この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある
アイコンをクリックして、ノード構成を保存します。データマージ結果を確認します。
祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。
-- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2024 年 8 月 8 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20240807 であり、これはノードのスケジュールされた時間よりも 1 日前です。 SELECT * FROM dws_user_info_all_di_spark WHERE dt ='データタイムスタンプ';説明このトピックの SQL ステートメントでは、スケジューリングパラメータ
${bizdate}が構成され、値T-1がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。
ads_user_info_1d_spark ノードを構成する
dws_user_info_all_di_spark テーブルで最大値とカウントの計算を実行して、消費のための基本ユーザープロファイルテーブルとして ads_user_info_1d_spark テーブルを生成します。
ノードコードを構成します。
ads_user_info_1d_spark ノードをダブルクリックして、ノードの構成タブに移動します。 構成タブで、次のステートメントを記述します。
-- シナリオ: 次のサンプルコードの SQL ステートメントは Spark SQL ステートメントです。 Spark SQL 関数を使用して、Spark SQL の dws_user_info_all_di_spark テーブルをさらに処理し、データを ads_user_info_1d_spark テーブルに書き込むことができます。 -- 注意: -- DataWorks は、スケジューリングシナリオで日次増分データを宛先テーブルの対応するパーティションに書き込むために使用できるスケジューリングパラメータを提供します。 -- 実際の開発シナリオでは、${変数名} 形式でノードコードに変数を定義できます。 次に、ノードの構成タブの [プロパティ] タブで、スケジューリングパラメータを値として変数に割り当てることができます。 これにより、スケジューリングシナリオのスケジューリングパラメータの構成に基づいて、ノードコードのスケジューリングパラメータの値が動的に置き換えられます。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT 'ユーザーID', device STRING COMMENT '端末タイプ', pv BIGINT COMMENT 'ページビュー', gender STRING COMMENT '性別', age_range STRING COMMENT '年齢層', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ); ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;ノードのスケジューリングプロパティを構成します。
セクション
説明
スクリーンショット
パラメータの追加
[パラメータの追加] セクションの [スケジューリングパラメータ] をクリックします。 テーブルに表示される行で、スケジューリングパラメータとスケジューリングパラメータの値を指定できます。
パラメータ名を
bizdateに設定します。パラメータ値を
$[yyyymmdd-1]に設定します。
詳細については、「スケジューリングパラメータの構成」をご参照ください。

依存関係
このセクションでは、出力テーブルが現在のノードの出力名として使用されていることを確認します。
出力テーブルには、
ワークスペース名.ノード名の形式で名前が付けられます。詳細については、「スケジューリングの依存関係の構成」をご参照ください。
説明[スケジュール] セクションで、[スケジューリングサイクル] パラメータを [日] に設定します。 現在のノードの [スケジュールされた時間] パラメータを個別に構成する必要はありません。 現在のノードが毎日実行されるようにスケジュールされている時間は、ワークフローの workshop_start_spark ゼロロードノードのスケジュールされた時間によって決まります。 現在のノードは、毎日 00:30 以降に実行されるようにスケジュールされています。
構成を保存します。
この例では、ビジネス要件に基づいて他の必要な構成項目を構成できます。 構成が完了したら、ノードの構成タブの上部ツールバーにある
アイコンをクリックして、ノード構成を保存します。ユーザープロファイルテーブルの結果を確認します。
祖先ノードと現在のノードが正常に実行されたら、DataStudio ページの左側のナビゲーションウィンドウで [アドホッククエリ] をクリックします。 [アドホッククエリ] ウィンドウで、[EMR Spark SQL] タイプのアドホッククエリタスクを作成し、SQL ステートメントを記述して、現在のノードによって作成されたテーブルが期待どおりに生成されているかどうかを確認します。
-- パーティションフィルター条件を現在の操作のデータタイムスタンプに更新する必要があります。 たとえば、ノードが 2023 年 2 月 22 日に実行されるようにスケジュールされている場合、ノードのデータタイムスタンプは 20230221 であり、これはノードのスケジュールされた時間よりも 1 日前です。 SELECT * FROM ads_user_info_1d_spark WHERE dt ='データタイムスタンプ';説明このトピックの SQL ステートメントでは、スケジューリングパラメータ
${bizdate}が構成され、値T-1がスケジューリングパラメータに割り当てられます。 バッチコンピューティングシナリオでは、bizdate はビジネストランザクションが実行された日付を示し、これは多くの場合データタイムスタンプと呼ばれます。 たとえば、現在の日に前日の売上高の統計データを収集する場合、前日はビジネストランザクションが実行された日付であり、データタイムスタンプを表します。
ステップ 3: ワークフローをコミットする
ワークフローを構成したら、ワークフローが期待どおりに実行できるかどうかをテストします。 テストが成功したら、ワークフローをコミットし、ワークフローがデプロイされるのを待ちます。
ワークフローの構成タブで、
アイコンをクリックしてワークフローを実行します。ワークフローのすべてのノードの横に
アイコンが表示されたら、
アイコンをクリックしてワークフローをコミットします。[コミット] ダイアログボックスで、コミットするノードを選択し、説明を入力して、[I/O の不整合アラートを無視する] を選択します。 次に、[確認] をクリックします。
ワークフローがコミットされたら、ワークフロー内のノードをデプロイできます。
ワークフローの構成タブの右上隅にある [デプロイ] をクリックします。 [デプロイタスクの作成] ページが表示されます。
デプロイするノードを選択し、[デプロイ] をクリックします。 [デプロイタスクの作成] ダイアログボックスで、[デプロイ] をクリックします。
ステップ 4: 本番環境でノードを実行する
ある日にノードをデプロイした後、ノード用に生成されたインスタンスは次の日に実行されるようにスケジュールできます。 [データバックフィル] 機能を使用して、デプロイされたワークフロー内のノードのデータをバックフィルできます。 これにより、ノードを本番環境で実行できるかどうかを確認できます。 詳細については、「データのバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。
ノードをデプロイした後、右上隅にある [オペレーションセンター] をクリックします。
ワークフローの構成タブの上部ツールバーにある [オペレーションセンター] をクリックして、[オペレーションセンター] ページに移動することもできます。
左側のナビゲーションウィンドウで、 を選択します。 [自動トリガーノード] ページで、workshop_start_spark ゼロロードノードを見つけてクリックします。 ノードの DAG が表示されます。
workshop_start_spark ノードを右クリックし、 を選択します。
[データのバックフィル] パネルで、データをバックフィルするノードを選択し、[データタイムスタンプ] パラメータを構成して、[送信してリダイレクト] をクリックします。 [データバックフィルインスタンスがリストされている] ページが表示されます。
すべての SQL ノードが正常に実行されるまで [更新] をクリックします。