すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:データの処理

最終更新日:Mar 20, 2025

このトピックでは、DataWorks で E-MapReduce(EMR)Hive ノードを使用してデータを処理する方法について説明します。 同期後に Object Storage Service(OSS)バケットに格納されている ods_user_info_d_emr テーブルと ods_raw_log_d_emr テーブルのデータを処理して、必要なユーザープロファイルデータを抽出する方法について説明します。

前提条件

必要なデータが同期されています。

ステップ 1:ワークフローを設計する

ワークフローノード間の依存関係の構成については、「データ同期」をご参照ください。

新しく作成したワークフローをダブルクリックして、編集ページにアクセスします。 [ノードの作成] をクリックし、[EMR Hive] を選択して、右側の編集ページにドラッグします。 [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[確認] をクリックします。

dwd_log_info_di_emrdws_user_info_all_di_emrads_user_info_1d_emr という名前の 3 つの EMR Hive ノードを作成し、以下に示すように依存関係を設定します。

  • dwd_log_info_di_emr: 未加工の OSS ログデータをクレンジングします。

  • dws_user_info_all_di_emr: クレンジングされたログデータを基本的なユーザー情報と集計します。

  • ads_user_info_1d_emr: 最終的なユーザープロファイルデータを生成します。

image

ステップ 2:関数の作成

同期された未加工ログデータを正しくフォーマットするには、関数を使用してターゲットフォーマットに変換する必要があります。 この例では、IP アドレスをリージョンに変換するための関数コードパッケージが提供されています。 関数コードパッケージをローカルマシンにダウンロードし、DataWorks で関数として登録してから、必要に応じて関数を呼び出します。

リソースのアップロード

  1. ip2region-emr.jar ファイルをダウンロードします。

  2. [データ開発] ページで、WorkShop ワークフローを開き、[EMR] を右クリックし、[リソースの作成] > [EMR JAR] を選択します。 新しいリソースのパラメーターを構成し、[作成] をクリックします。 image

    主なパラメーターは次のとおりです。

    • [ストレージパス]: EMR クラスタ構成が格納されている準備環境の OSS バケットを選択します。

    • [ファイルをアップロード]: ダウンロードした ip2region-emr.jar ファイルを選択します。

    ビジネスニーズに応じて他のパラメーターを構成するか、デフォルト設定を使用します。

  3. ツールバーの image.png をクリックし、リソースを開発環境の EMR エンジンプロジェクトに送信します。

関数の登録

  1. [データ開発] ページで、ワークフローを開き、[EMR] を右クリックし、[関数の作成] を選択します。

  2. [関数の作成] ダイアログボックスで、[関数名] フィールドに「getregion」と入力し、[作成] をクリックして、関数の情報を構成します。 image

    主なパラメーターは次のとおりです。

    • [リソース]: ip2region-emr.jar ファイルを選択します。

    • [クラス名]: org.alidata.emr.udf.Ip2Region と入力します。

    ビジネスニーズに応じて他のパラメーターを構成するか、デフォルト設定を使用します。

  3. ツールバーの image.png をクリックし、関数を開発環境の EMR エンジンプロジェクトに送信します。

ステップ 3:EMR Hive ノードの構成

dwd_log_info_di_emr ノードの作成

1. コードの編集

dwd_log_info_di_emr ノードをダブルクリックして、ノード構成タブにアクセスします。 表示される構成タブに次の文を入力します。

説明

ワークスペースが DataStudio の複数の EMR コンピュートエンジンに関連付けられている場合は、必要な [EMR エンジン] を選択します。 関連付けられている EMR コンピュートエンジンが 1 つだけの場合は、選択は不要です。

-- ODS レイヤテーブルを作成する
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip STRING COMMENT 'IP アドレス',
  uid STRING COMMENT 'ユーザーの ID',
  `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間',
  status STRING COMMENT 'サーバーから返される状態コード',
  bytes STRING COMMENT 'クライアントに返されるバイト数',
  region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
  method STRING COMMENT 'HTTP リクエストのタイプ',
  url STRING COMMENT 'URL',
  protocol STRING COMMENT 'HTTP のバージョン番号',
  referer STRING COMMENT 'ソース URL',
  device STRING COMMENT '端末タイプ',
  identity STRING COMMENT 'アクセス タイプ。クローラー、フィード、ユーザー、または不明の場合があります'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
  , uid
  , tm
  , status
  , bytes 
  , getregion(ip) AS region -- ユーザー定義関数(UDF)を使用して IP アドレスに基づいてリージョンを取得します。
  , regexp_extract(request, '(^[^ ]+) .*') AS method -- 正規表現を使用してリクエストを 3 つのフィールドに分割します。
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- 正規表現を使用してリファラーをスクラブし、より正確な URL を取得します。
  , CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android' -- agent パラメーターの値から端末とアクセス タイプを取得します。
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^[Mozilla|Opera]'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;

2. スケジューリング情報の構成

スケジューリングシナリオを設定するには、毎日 00:30 に ods_raw_log_d_emruser_log.txtods_raw_log_d_emrdwd_log_info_di_emrods_raw_log_d_emrdwd_log_info_di_emr ノードをトリガーするようにシステムを構成します。 これにより、OSS から データが EMR の テーブルに同期されます。 その後、 ノードは テーブルのデータを処理し、結果は テーブルの営業時間パーティションに格納されます。

構成項目

構成内容

スケジューリングパラメーター

[スケジューリングパラメーター] 領域に以下を追加します。

  • パラメーター名: bizdate

  • パラメーター値: $[yyyymmdd-1]

image

時間属性

[再実行属性][成功または失敗に関係なく再実行] に設定します。

image

スケジューリングの依存関係

[スケジューリングの依存関係] で、出力テーブルがこのノードの出力として設定されていることを確認します。

形式は WorkSpaceName.NodeName です

image

説明

時間属性を構成するには、[スケジューリングサイクル] を日次 に設定します。 現在のノードの [時間指定スケジューリング時間] を個別に構成する必要はありません。 このノードの日次トリガー時間は、ワークフロー内の仮想ノード workshop_start_emr の時間指定スケジューリング時間によって決定され、毎日 00:30 以降に実行されるようにスケジュールされます。

3. 構成の保存

この例に必要なその他の項目を必要に応じて構成します。 完了したら、ノードコード編集ページのツールバーにある image.png ボタンをクリックして、現在の構成を保存します。

dws_user_info_all_di_emr ノードの作成

1. コードの編集

dws_user_info_all_di_emr ノードをダブルクリックして、ノード構成タブにアクセスします。 表示される構成タブに次の文を入力します。

説明

ワークスペースが DataStudio の複数の EMR コンピュートエンジンに関連付けられている場合は、必要な [EMR エンジン] を選択します。 関連付けられている EMR コンピュートエンジンが 1 つだけの場合は、選択は不要です。

-- DW レイヤテーブルを作成する
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid STRING COMMENT 'ユーザーの ID',
  gender STRING COMMENT 'ユーザーの性別',
  age_range STRING COMMENT 'ユーザーの年齢層',
  zodiac STRING COMMENT 'ユーザーの星座',
  region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
  device STRING COMMENT '端末タイプ',
  identity STRING COMMENT 'アクセス タイプ。クローラー、フィード、ユーザー、または不明の場合があります',
  method STRING COMMENT 'HTTP リクエストのタイプ',
  url STRING COMMENT 'URL',
  referer STRING COMMENT 'ソース URL',
  `time` STRING COMMENT 'yyyymmddhh:mi:ss 形式の時間'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT *
  FROM dwd_log_info_di_emr
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d_emr
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

2. スケジューリング情報の構成

スケジューリングシナリオを実装するには、毎日 00:30 に、アップストリームタスク ods_user_info_d_emrdwd_log_info_di_emrdws_user_info_all_di_emrods_user_info_d_emrdwd_log_info_di_emrdws_user_info_all_di_emr と が完了したら、 ノードがトリガーされるようにシステムを構成します。 これにより、 テーブルと テーブルがマージされ、結果が テーブルに書き込まれます。

構成項目

構成内容

スケジューリングパラメーター

[スケジューリングパラメーター] 領域に以下を追加します。

  • パラメーター名: bizdate

  • パラメーター値: $[yyyymmdd-1]

image

時間属性

[再実行属性][成功または失敗に関係なく再実行] に設定します。

image

スケジューリングの依存関係

[スケジューリングの依存関係] で、出力テーブルがこのノードの出力として設定されていることを確認します。

形式は WorkSpaceName.NodeName です

image

説明

時間属性を構成するには、[スケジューリングサイクル] を日次 に設定します。 現在のノードの [時間指定スケジューリング時間] を個別に構成する必要はありません。 このノードの日次トリガー時間は、ワークフロー内の仮想ノード workshop_start_emr の時間指定スケジューリング時間によって決定され、毎日 00:30 以降に実行されるようにスケジュールされます。

3. 構成の保存

この例に必要なその他の項目を必要に応じて構成します。 完了したら、ノードコード編集ページのツールバーにある image.png ボタンをクリックして、現在の構成を保存します。

ads_user_info_1d_emr ノードの作成

1. コードの編集

ads_user_info_1d_emr ノードをダブルクリックして、ノード構成タブにアクセスします。 表示される構成タブに次の文を入力します。

説明

ワークスペースが DataStudio の複数の EMR コンピュートエンジンに関連付けられている場合は、必要な [EMR エンジン] を選択します。 関連付けられている EMR コンピュートエンジンが 1 つだけの場合は、選択は不要です。

-- RPT レイヤテーブルを作成する
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid STRING COMMENT 'ユーザーの ID',
  region STRING COMMENT 'IP アドレスに基づいて取得されるリージョン',
  device STRING COMMENT '端末タイプ',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'ユーザーの性別',
  age_range STRING COMMENT 'ユーザーの年齢層',
  zodiac STRING COMMENT 'ユーザーの星座'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;

2. スケジューリング情報の構成

アップストリーム dws_user_info_all_di_emrods_user_info_d_emrdwd_log_info_di_emrads_user_info_1d_emr ノードタスクが テーブルと テーブルをマージした後、 ノードタスクをトリガーして、データをさらに処理し、消費可能なデータを生成できます。

構成項目

構成内容

スケジューリングパラメーター

[スケジューリングパラメーター] 領域に以下を追加します。

  • パラメーター名: bizdate

  • パラメーター値: $[yyyymmdd-1]

image

時間属性

[再実行属性][成功または失敗に関係なく再実行] に設定します。

image

スケジューリングの依存関係

[スケジューリングの依存関係] で、出力テーブルがこのノードの出力として設定されていることを確認します。

形式は WorkSpaceName.NodeName です

image

説明

時間属性を構成するには、[スケジューリングサイクル] を日次 に設定します。 現在のノードの [時間指定スケジューリング時間] を個別に構成する必要はありません。 このノードの日次トリガー時間は、ワークフロー内の仮想ノード workshop_start_emr の時間指定スケジューリング時間によって決定され、毎日 00:30 以降に実行されるようにスケジュールされます。

3. 構成の保存

この例に必要なその他の項目を必要に応じて構成します。 完了したら、ノードコード編集ページのツールバーにある image.png ボタンをクリックして、現在の構成を保存します。

ステップ 4:ワークフローをコミットする

ワークフローを構成した後、テストして、期待どおりに実行されることを確認します。 テストが成功したら、ワークフローをコミットして、デプロイメントを待ちます。

  1. ワークフロー編集ページで、Run をクリックしてワークフローを実行します。

  2. ワークフロー内のすべてのノードに 成功 が表示されたら、提交 をクリックして、正常に実行されたワークフローを送信できます。

  3. [送信] ダイアログボックスで、送信が必要なノードを選択し、[入力と出力の不整合に関する警告を無視する] オプションをオンにして、[確認] をクリックします。

  4. 送信が成功したら、各ワークフローノードを公開します。

    1. ページの右側にある [公開] をクリックして、デプロイメントパッケージの作成ページにアクセスします。

    2. 公開するノードを選択し、[選択した項目を公開] をクリックし、[公開の確認] ダイアログボックスで、[公開] をクリックします。

ステップ 5:本番環境でノードを実行する

ノードが公開されると、インスタンスが作成され、翌日に実行されるようにスケジュールされます。 [データバックフィル] を使用して、公開されたワークフローでデータバックフィル操作を実行し、本番環境での機能を確認できます。 詳細については、「データバックフィルを実行し、データバックフィルインスタンスを表示する(新バージョン)」をご参照ください。

  1. ノードが正常に公開されたら、右上隅にある [オペレーションセンター] をクリックします。

    また、ワークフロー編集ページに移動し、ツールバーの [操作に移動] をクリックして、[オペレーションセンター] ページにアクセスすることもできます。

  2. 左側のナビゲーションバーで [定期的タスク操作] > [定期的タスク] をクリックして [定期的タスク] ページにアクセスし、workshop_start_emr 仮想ノードをクリックします。

  3. 右側の DAG 図で、workshop_start_emr ノードを右クリックし、[データバックフィル] > [現在のノードと子孫ノード] を選択します。

  4. データバックフィルが必要なタスクを選択し、営業日を入力して、[送信してジャンプ] をクリックします。

  5. データバックフィルページで、すべての SQL タスクが正常に実行されるまで [更新] をクリックできます。

次のステップ

定期的なスケジューリングシナリオでノードによって生成されたテーブルデータがビジネス要件を満たしていることを確認するために、監視ルールを構成して、ノードによって生成されたテーブルデータの品質を監視できます。