Pig development manual

Last Updated: Apr 19, 2017

Use OSS in Pig

Refer to the following form for OSS paths:

oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/${path}

Parameter description:

${AccessKeyId}: The AccessKeyId of your account.

${AccessKeySecret}: The secret corresponding to the AccessKeyId.

${bucket}: An OSS bucket owned by the account.

${endpoint}: The endpoint used to access OSS. It depends on the region of your cluster, and the OSS bucket must be in the same region as the cluster.

${path}: The path to the object within the bucket.

The specific endpoint values can be found in the OSS endpoint documentation.
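For example, assuming a hypothetical bucket named emr-demo in the Hangzhou region, accessed through the intranet endpoint, and placeholder credentials, a fully substituted path would look like this:

    oss://YourAccessKeyId:YourAccessKeySecret@emr-demo.oss-cn-hangzhou-internal.aliyuncs.com/data/excite.log.bz2

Replace the bucket name, endpoint, and credentials with the values for your own account and region.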

Take script1-hadoop.pig from the Pig tutorial as an example. Upload tutorial.jar and excite.log.bz2 from the Pig tutorial to OSS. Suppose the upload paths are oss://emr/jars/tutorial.jar and oss://emr/data/excite.log.bz2.

Perform the following steps:

  1. Write the script. Modify the JAR file path and the input/output paths in the script, as shown below. Note that OSS path settings must follow the form oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/object/path. (A note on passing the OSS prefix as a Pig parameter instead of hardcoding it in the script follows these steps.)

    /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements. See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership. The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    -- Query Phrase Popularity (Hadoop cluster)
    -- This script processes a search query log file from the Excite search engine and
    -- finds search phrases that occur with particularly high frequency during certain times of the day.
    -- Register the tutorial JAR file so that the included UDFs can be called in the script.
    REGISTER oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/tutorial.jar;
    -- Use the PigStorage function to load the excite log file into the raw bag as an array of records.
    -- Input: (user,time,query)
    raw = LOAD 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
    -- Call the NonURLDetector UDF to remove records if the query field is empty or a URL.
    clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
    -- Call the ToLower UDF to change the query field to lowercase.
    clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;
    -- Because the log file only contains queries for a single day, we are only interested in the hour.
    -- The excite query log timestamp format is YYMMDDHHMMSS.
    -- Call the ExtractHour UDF to extract the hour (HH) from the time field.
    houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
    -- Call the NGramGenerator UDF to compose the n-grams of the query.
    ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
    -- Use the DISTINCT command to get the unique n-grams for all records.
    ngramed2 = DISTINCT ngramed1;
    -- Use the GROUP command to group records by n-gram and hour.
    hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
    -- Use the COUNT function to get the count (occurrences) of each n-gram.
    hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
    -- Use the GROUP command to group records by n-gram only.
    -- Each group now corresponds to a distinct n-gram and has the count for each hour.
    uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
    -- For each group, identify the hour in which this n-gram is used with a particularly high frequency.
    -- Call the ScoreGenerator UDF to calculate a "popularity" score for the n-gram.
    uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
    -- Use the FOREACH-GENERATE command to assign names to the fields.
    uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
    -- Use the FILTER command to remove all records with a score less than or equal to 2.0.
    filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
    -- Use the ORDER command to sort the remaining records by hour and score.
    ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
    -- Use the PigStorage function to store the results.
    -- Output: (hour, n-gram, score, count, average_counts_among_all_hours)
    STORE ordered_uniq_frequency INTO 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/script1-hadoop-results' USING PigStorage();
  2. Create a job. Store the script written in Step 1 to OSS, for example, to oss://emr/jars/script1-hadoop.pig. Then create the following job in E-MapReduce:

    (Figure: basic job configuration)

  3. Create an execution plan and run it. Create an execution plan in E-MapReduce, add the Pig job created in Step 2 to it, and select Run Now as the scheduling policy so that the script1-hadoop job runs in the selected cluster.
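The script above embeds the AccessKeyId and AccessKeySecret directly in the OSS paths. If you run the script yourself with the pig command-line client (rather than through an E-MapReduce job), Pig's standard parameter substitution can keep the credentials out of the script file. The sketch below is only an illustration: the parameter name OSS_PREFIX and the placeholder values are made up, and you should adapt it to how you actually submit the job.

    -- Declare a default OSS prefix; the real value can be supplied on the
    -- command line with -param so that credentials are not stored in the file.
    %default OSS_PREFIX 'oss://YourAccessKeyId:YourAccessKeySecret@YourBucket.YourEndpoint'

    REGISTER $OSS_PREFIX/data/tutorial.jar;
    raw = LOAD '$OSS_PREFIX/data/excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
    -- ... the rest of the script is unchanged ...
    STORE ordered_uniq_frequency INTO '$OSS_PREFIX/data/script1-hadoop-results' USING PigStorage();

You could then run, for example, pig -param OSS_PREFIX='oss://YourAccessKeyId:YourAccessKeySecret@YourBucket.YourEndpoint' script1-hadoop.pig. Parameter substitution is plain text replacement performed before the script is executed, so it also works inside quoted paths.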
