すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Pig ジョブの作成と実行

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)クラスターで Pig ジョブを作成および実行する方法について説明します。

Pig を使用して OSS データにアクセスする

Pig を使用して Object Storage Service(OSS)データにアクセスする場合、次の形式で OSS パスを指定する必要があります。
oss://${accessKeyId}:${accessKeySecret}@${bucket}.${endpoint}/${path}
パラメーター:
  • ${accessKeyId}: Alibaba Cloud アカウントの AccessKey ID。
  • ${accessKeySecret}: AccessKey ID に対応する AccessKey シークレット。
  • ${bucket}: AccessKey ID に対応するバケット。
  • ${endpoint}: OSS へのアクセスに使用するネットワークエンドポイント。クラスターが存在するリージョンによって異なります。OSS バケットは、クラスターが存在するリージョンにある必要があります。

    詳細については、「OSS エンドポイント」をご参照ください。

  • ${path}: バケット内のファイルのパス。

手順

Pig の script1-hadoop.pig ファイルを例として使用します。

  1. スクリプトを準備します。
    上記の OSS パスに基づいて、スクリプト内の JAR ファイルのパス、入力パス、出力パスを変更します。
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    -- クエリフレーズの頻度(Hadoop クラスター)
    -- このスクリプトは、Excite 検索エンジンの検索クエリログファイルを処理し、特定の時間帯に特に高い頻度で発生する検索フレーズを見つけます。
    -- スクリプトで含まれている UDF を呼び出すことができるように、チュートリアル JAR ファイルを登録します。
    REGISTER oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/tutorial.jar;
    -- PigStorage 関数を使用して、excite ログファイルをレコードの配列として▒raw▒バッグにロードします。
    -- 入力: (user,time,query)
    raw = LOAD 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
    -- クエリフィールドが空または URL の場合、NonURLDetector UDF を呼び出してレコードを削除します。
    clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
    -- ToLower UDF を呼び出して、クエリフィールドを小文字に変更します。
    clean2 = FOREACH clean1 GENERATE user, time,     org.apache.pig.tutorial.ToLower(query) as query;
    -- ログファイルには1日分のクエリのみが含まれているため、時間だけに注目します。
    -- excite クエリログのタイムスタンプ形式は YYMMDDHHMMSS です。
    -- ExtractHour UDF を呼び出して、time フィールドから時間(HH)を抽出します。
    houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
    -- NGramGenerator UDF を呼び出して、クエリの n-gram を構成します。
    ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
    -- DISTINCT コマンドを使用して、すべてのレコードの一意の n-gram を取得します。
    ngramed2 = DISTINCT ngramed1;
    -- GROUP コマンドを使用して、n-gram と時間でレコードをグループ化します。
    hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
    -- COUNT 関数を使用して、各 n-gram のカウント(出現回数)を取得します。
    hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
    -- GROUP コマンドを使用して、n-gram のみでレコードをグループ化します。
    -- 各グループは個別の n-gram に対応し、各時間のカウントを持ちます。
    uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
    -- 各グループについて、この n-gram が特に高い頻度で使用される時間を特定します。
    -- ScoreGenerator UDF を呼び出して、n-gram の「人気」スコアを計算します。
    uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
    -- FOREACH-GENERATE コマンドを使用して、フィールドに名前を割り当てます。
    uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
    -- FILTER コマンドを使用して、スコアが 2.0 以下のすべてのレコードを移動します。
    filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
    -- ORDER コマンドを使用して、残りのレコードを時間とスコアでソートします。
    ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
    -- PigStorage 関数を使用して結果を保存します。
    -- 出力: (hour, n-gram, score, count, average_counts_among_all_hours)
    STORE ordered_uniq_frequency INTO 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/script1-hadoop-results' USING PigStorage();

    script1-hadoop.pig スクリプトを OSS パス(例:oss://emr/jars/)にアップロードします。

  2. ジョブを作成します。

    EMR コンソールの [データプラットフォーム] タブで Pig ジョブを作成します。詳細については、「Pig ジョブの構成」をご参照ください。

    ジョブの内容:
    -f ossref://emr/jars/script1-hadoop.pig
  3. ジョブを実行します。

    [実行] をクリックしてジョブを実行します。ジョブを既存のクラスターに関連付けることができます。また、システムが自動的にクラスターを作成し、ジョブをクラスターに関連付けるようにすることもできます。