すべてのプロダクト
Search
ドキュメントセンター

DataWorks:EMR Hive ノードを使用したデータ処理

最終更新日:Mar 26, 2026

このチュートリアルでは、DataWorks の E-MapReduce (EMR) Hive ノードを使用して、未加工のユーザーデータとログデータを最終的なユーザープロファイルデータセットに変換します。このチュートリアルを完了すると、以下のことができるようになります:

  • 3つのレイヤーからなるデータ処理パイプライン (ODS → DWD → DWS → ADS) の設計

  • IP アドレスから地理的リージョンを導出するためのユーザー定義関数 (UDF) の登録

  • 各処理ノード用の SQL の設定

  • 開発環境でのパイプラインの実行と検証

  • 本番環境へのワークフローのデプロイとバックフィル実行による検証

このチュートリアルでは、料金が発生するライブリソースが作成されます。予期せぬ費用を避けるため、すべてのノードで有効期間を設定するか、チュートリアル完了後にゼロロードノード workshop_start_emrフリーズ

前提条件

開始する前に、以下が完了していることを確認してください:

  • データ同期。詳細については、「データ同期」をご参照ください。

ステップ 1:データ処理パイプラインの設計

このチュートリアルでは、4つのレイヤーからなるデータアーキテクチャを使用します。各レイヤーには明確な役割があります:

レイヤー役割
ODS (オペレーショナルデータストア)Object Storage Service (OSS) から同期された未加工の入力データ — 変更なし
DWD (データウェアハウス詳細)ODS から派生した、クレンジングおよび構造化されたデータ
DWS (データウェアハウスサマリー)DWD を他の ODS テーブルと結合して生成されたエンリッチ化されたデータ
ADS (アプリケーションデータサービス)ダウンストリームでの利用に対応した集計済み出力

2つの ODS ソーステーブル — ods_user_info_d_emr (基本ユーザー情報) と ods_raw_log_d_emr (ウェブサイトアクセスログ) — は、Object Storage Service (OSS) から同期されます。後続の各レイヤーは、1つの EMR Hive ノードとして実装されます:

レイヤーノード名機能
DWDdwd_log_info_di_emrods_raw_log_d_emr からの未加工のログデータをクレンジングします。getregion UDF と正規表現を使用して、各ログレコードを構造化されたフィールドに分割します。
DWSdws_user_info_all_di_emrクレンジングされたログデータを ods_user_info_d_emr の基本ユーザー情報と結合し、エンリッチ化されたデータセットを生成します。
ADSads_user_info_1d_emrエンリッチ化されたデータをユーザー ID で集計し、最終的なユーザープロファイルを生成します。

パイプライン構造を設定するには:

  1. DataWorks コンソールの ワークスペース ページに移動し、ご利用のワークスペースの DataStudio を開きます。

  2. DATA STUDIO ペインの ワークスペースディレクトリ セクションで、データ同期時に作成した workshop_emr ワークフローを開きます。

  3. dwd_log_info_di_emrdws_user_info_all_di_emrads_user_info_1d_emr という名前の3つの EMR Hive ノードを作成します。

  4. スケジューリングの依存関係を設定し、dwd_log_info_di_emr が最初に実行され、次に dws_user_info_all_di_emr、最後に ads_user_info_1d_emr が実行されるようにします。

結果として得られる依存関係グラフは次のようになります:

image

ステップ 2:UDF の登録

dwd_log_info_di_emr ノードは、getregion という名前の UDF を使用して、各 IP アドレスから地理的リージョンを導出します。ノードを設定する前に、必要な JAR ファイルをアップロードし、UDF を登録します。

JAR リソースのアップロード

  1. ip2region-emr.jar をダウンロードします。

  2. DataStudio ページの左側のナビゲーションウィンドウで、image アイコンをクリックしてリソース管理ペインを開きます。

  3. [リソース管理] ペインで、[リソースの作成] をクリックします。[リソースの作成] ダイアログボックスで、[タイプ] ドロップダウンリストから [EMR Jar] を選択し、名前を設定して [OK] をクリックします。

  4. EMR JAR リソースの設定タブで、次のパラメーターを設定します:

    パラメーター
    ファイル ソースオンプレミス
    ファイルコンテンツ[アップロード] をクリックし、ダウンロードした ip2region-emr.jar ファイルを選択します。
    ストレージ パスOSS。ご利用の EMR クラスターに関連付けられている OSS バケットを選択します。
    データソースデータ同期時にワークスペースに関連付けた計算リソースを選択します。
    [リソースグループ]環境準備時に作成したサーバーレスリソースグループを選択します。
  5. [保存] をクリックし、次に [デプロイ] をクリックして、リソースを開発環境と本番環境の両方にデプロイします。

UDF の登録

  1. [リソース管理] ペインで、作成した EMR JAR リソースを右クリックし、[関数の作成] > [EMR 関数] を選択します。ダイアログボックスで、[名前]getregion に設定し、[OK] をクリックします。

  2. EMR UDF の設定タブで、次のパラメーターを設定します:

    パラメーター
    機能タイプOTHER
    [データソース]データ同期時にワークスペースに関連付けた計算リソースを選択します。
    EMR データベースdefault
    [リソースグループ]環境準備時に作成したサーバーレスリソースグループを選択します。
    オーナー必要な権限を持つユーザーを選択します。
    クラス名org.alidata.emr.udf.Ip2Region
    [リソース]作成した EMR JAR リソースの名前を選択します。
  3. [保存] をクリックし、次に [デプロイ] をクリックして、UDF を両方の環境にデプロイします。

ステップ 3:EMR Hive ノードの設定

各ノードは、処理パイプラインの1つのレイヤーを実装する SQL スクリプトを実行します。各ノードを個別に設定します。

dwd_log_info_di_emr の設定

このノードは、3つの操作を使用して未加工のログデータをクレンジングします:getregion UDF は各 IP アドレスから地理的リージョンを導出し、正規表現はリクエストフィールドから HTTP メソッド、URL、プロトコルを抽出し、ユーザーエージェント文字列に対する CASE-WHEN 式はデバイスタイプとアクセス ID を分類します。ソースレコードは ods_raw_log_d_emr から ##@@ で区切られた文字列として到着し、処理前に個々のフィールドに分割されます。

  1. ワークフローキャンバスで、dwd_log_info_di_emr ノードにカーソルを合わせ、[ノードを開く] をクリックします。[リマインダー] ダイアログボックスで、[保存して開く] をクリックします。

  2. コードエディタに、次の SQL を入力します:

    -- ODS レイヤーにテーブルを作成します。
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
      ip STRING COMMENT 'IP アドレス',
      uid STRING COMMENT 'ユーザー ID',
      `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時刻',
      status STRING COMMENT 'サーバーから返される状態コード',
      bytes STRING COMMENT 'クライアントに返されるバイト数',
      region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
      method STRING COMMENT 'HTTP リクエストタイプ',
      url STRING COMMENT 'URL',
      protocol STRING COMMENT 'HTTP のバージョン番号',
      referer STRING COMMENT 'ソース URL',
      device STRING COMMENT '端末タイプ',
      identity STRING COMMENT 'アクセスタイプ。クローラー、フィード、ユーザー、または不明'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    set hive.vectorized.execution.enabled = false;
    INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
    SELECT ip
      , uid
      , tm
      , status
      , bytes
      , getregion(ip) AS region -- UDF を使用して IP アドレスに基づいてリージョンを取得します。
      , regexp_extract(request, '(^[^ ]+) .*') AS method -- 正規表現を使用してリクエストから3つのフィールドを抽出します。
      , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
      , regexp_extract(request, '.* ([^ ]+$)') AS protocol
      , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- 正規表現を使用して HTTP リファラーをクレンジングし、より正確な URL を取得します。
      , CASE
        WHEN lower(agent) RLIKE 'android' THEN 'android' -- agent パラメーターの値から端末とアクセスタイプを取得します。
        WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
        WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
        WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
        WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
        ELSE 'unknown'
      END AS device
      , CASE
        WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
        WHEN lower(agent) RLIKE 'feed'
        OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
        WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
        AND agent RLIKE '^[Mozilla|Opera]'
        AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
        ELSE 'unknown'
      END AS identity
      FROM (
        SELECT SPLIT(col, '##@@')[0] AS ip
        , SPLIT(col, '##@@')[1] AS uid
        , SPLIT(col, '##@@')[2] AS tm
        , SPLIT(col, '##@@')[3] AS request
        , SPLIT(col, '##@@')[4] AS status
        , SPLIT(col, '##@@')[5] AS bytes
        , SPLIT(col, '##@@')[6] AS referer
        , SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_emr
      WHERE dt = '${bizdate}'
    ) a;
  3. 右側のナビゲーションウィンドウで [デバッグ設定] をクリックし、次のパラメーターを設定します:

    パラメーター
    計算リソースご利用のワークスペースに関連付けられた EMR 計算リソースを選択します。詳細については、「環境の準備」をご参照ください。
    リソースグループ環境準備時に作成したサーバーレスリソースグループを選択します。
    スクリプトパラメーター空白のままにします。ワークフローをステップ 4 で実行すると、${bizdate} 変数は定数値に置き換えられます。
  4. 上部のツールバーにある image アイコンをクリックしてノードを保存します。

dws_user_info_all_di_emr の設定

このノードは、クレンジングされたログデータをユーザープロファイルの属性でエンリッチ化します。dwd_log_info_di_emr (ログデータ) と ods_user_info_d_emr (基本ユーザーデータ) の間でユーザー ID (uid) をキーに LEFT OUTER JOIN を実行し、性別、年齢層、星座などのデモグラフィックフィールドを各ログレコードに追加します。ログテーブルに一致するユーザーエントリがないレコードも保持されます。

  1. ワークフローキャンバスで、dws_user_info_all_di_emr ノードにカーソルを合わせ、[ノードを開く] をクリックします。[リマインダー] ダイアログボックスで、[保存して開く] をクリックします。

  2. コードエディタに、次の SQL を入力します:

    -- DW レイヤーにテーブルを作成します。
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
      uid STRING COMMENT 'ユーザー ID',
      gender STRING COMMENT '性別',
      age_range STRING COMMENT '年齢層',
      zodiac STRING COMMENT '星座',
      region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
      device STRING COMMENT '端末タイプ',
      identity STRING COMMENT 'アクセスタイプ。クローラー、フィード、ユーザー、または不明',
      method STRING COMMENT 'HTTP リクエストタイプ',
      url STRING COMMENT 'URL',
      referer STRING COMMENT 'ソース URL',
      `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時刻'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
    SELECT COALESCE(a.uid, b.uid) AS uid
      , b.gender
      , b.age_range
      , b.zodiac
      , a.region
      , a.device
      , a.identity
      , a.method
      , a.url
      , a.referer
      , a.`time`
    FROM (
      SELECT *
      FROM dwd_log_info_di_emr
      WHERE dt = '${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_emr
      WHERE dt = '${bizdate}'
    ) b
    ON a.uid = b.uid;
  3. 右側のナビゲーションウィンドウで [デバッグ設定] をクリックし、前のノードと同じパラメーターを設定します:

    パラメーター
    コンピューティングリソースご利用のワークスペースに関連付けられた EMR コンピューティングリソースを選択します。 詳細については、「環境の準備」をご参照ください。
    リソースグループ環境の準備中に作成されたサーバーレスリソースグループを選択します。
    スクリプトパラメーター空のままにします。
  4. image アイコンをクリックしてノードを保存します。

ads_user_info_1d_emr の設定

このノードは、ユーザーごとのすべてのログアクティビティを集計して、最終的なユーザープロファイルを生成します。行を uid でグループ化し、MAX を適用してリージョン、デバイス、性別、年齢層、星座を統合し (すべてのセッションで最も優勢な値を取得)、COUNT を使用してユーザーごとの合計ページビュー (PV) 数を計算します。

  1. ワークフローキャンバスで、ads_user_info_1d_emr ノードにカーソルを合わせ、[ノードを開く] をクリックします。[リマインダー] ダイアログボックスで、[保存して開く] をクリックします。

  2. コードエディタに、次の SQL を入力します:

    -- ADS レイヤーにテーブルを作成します。
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
      uid STRING COMMENT 'ユーザー ID',
      region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
      device STRING COMMENT '端末タイプ',
      pv BIGINT COMMENT 'PV',
      gender STRING COMMENT '性別',
      age_range STRING COMMENT '年齢層',
      zodiac STRING COMMENT '星座'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(region)
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_emr
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  3. 右側のナビゲーションウィンドウで [デバッグ設定] をクリックし、前のノードと同じパラメーターを設定します:

    パラメーター
    コンピューティング リソースワークスペースに関連付けられている EMR 計算リソースを選択します。「環境の準備」をご参照ください。
    リソースグループ環境準備時に作成したサーバーレスリソースグループを選択します。
    スクリプトパラメータ空白のままにします。
  4. image アイコンをクリックしてノードを保存します。

ステップ 4:パイプラインの実行と検証

開発環境でワークフローを実行し、3つのノードすべてが正しく実行されることを確認します。

  1. ワークフロー設定タブの上部ツールバーで、[実行] をクリックします。[実行時パラメーターの入力] ダイアログボックスで、${bizdate} の値として 20250223 を入力し、[OK] をクリックします。入力した値は、この実行のすべてのノードスクリプトで ${bizdate} を置き換えます。yyyymmdd 形式の任意の日付を使用してください。

  2. 実行が完了したら、出力を検証します。DataStudio ページの左側のナビゲーションウィンドウで、image アイコンをクリックして DATA STUDIO ペインを開きます。[ワークスペースディレクトリ] セクションで、work ディレクトリを右クリックし、[ノードの作成] > [EMR] > [EMR Hive] を選択してクエリノードを作成します。次のクエリを実行し、<data_timestamp>ads_user_info_1d_emr ノードのデータタイムスタンプに置き換えます:

    • クエリが行を返した場合、データ処理は完了です。

    • クエリが行を返さない場合、ワークフロー実行時に入力した ${bizdate} の値が、クエリの dt の値と一致しているか確認してください。実行で使用されたデータタイムスタンプを確認するには、右側のナビゲーションウィンドウで [実行履歴] をクリックし、[アクション] 列の [表示] をクリックします。データタイムスタンプは、実行ログに partition=[pt=xxx] の形式で表示されます。

    データタイムスタンプは、ノードのスケジュールされた実行日より1日早くなります。たとえば、ノードが2025年2月23日に実行される場合、データタイムスタンプは 20250222 です。
    SELECT * FROM ads_user_info_1d_emr WHERE dt=<data_timestamp>;

ステップ 5:ワークフローのデプロイ

ワークフローを本番環境にデプロイし、ノードがスケジュールに従って自動的に実行されるようにします。

スケジューリングパラメーターはワークフローレベルで設定されます。各ノードで個別に設定する必要はありません。
  1. DataStudio ページの左側のナビゲーションウィンドウで、image アイコンをクリックします。DATA STUDIO ペインの [ワークスペースディレクトリ] セクションで、ワークフロー名をクリックして設定タブを開きます。

  2. 上部ツールバーで、[デプロイ] をクリックします。

  3. [デプロイ] タブで、[本番環境へのデプロイを開始] をクリックし、画面の指示に従います。

ステップ 6:本番環境での検証

デプロイ後、ノードはインスタンスを生成し、翌日から自動的に実行されます。データバックフィルを使用してノードをすぐに実行し、本番環境で正しく動作することを確認します。詳細については、「データバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。

  1. DataStudio ページの右上隅にある [オペレーションセンター] をクリックします。または、左上隅の 图标 アイコンをクリックし、[すべてのプロダクト] > [データ開発とタスクオペレーション] > [オペレーションセンター] を選択します。

  2. 左側のナビゲーションウィンドウで、[自動トリガーノード O&M] > [自動トリガーノード] を選択します。ゼロロードノード workshop_start_emr を見つけ、ノード名をクリックします。

  3. 有向非巡回グラフ (DAG) ビューで、workshop_start_emr を右クリックし、[実行] > [現在および下流ノードを遡及的に実行] を選択します。

  4. [データバックフィル] パネルで、バックフィルしたいノードを選択し、[データタイムスタンプ] パラメーターを設定して、[送信してリダイレクト] をクリックします。

  5. データバックフィルページの上部にある [更新] をクリックして、すべてのノードが正常に完了したか確認します。

次のステップ