You can create an E-MapReduce (EMR) MR node to process a large dataset in parallel by using multiple map tasks. This topic describes how to create an EMR MR node and how to run the node to read data from an Object Storage Service (OSS) object in an OSS bucket and count the number of words in the object.

Prerequisites

  • The preparations that are required to create a node based on an EMR data lake cluster or an EMR Hadoop cluster are complete. The preparations vary based on the type of the EMR cluster and include configuring the settings on the EMR and DataWorks sides based on your business requirements.
  • If you want to reference open source code in your EMR MR node, make sure that the open source code is uploaded as an EMR JAR resource. For more information, see Create and use an EMR JAR resource.
  • If you want to reference user-defined functions (UDFs) in your EMR MR node, make sure that the UDFs are uploaded as EMR JAR resources and are registered with EMR. For more information about how to register a UDF, see Create an EMR function.
  • An OSS bucket is created. In this topic, the bucket stores the input data, the JAR package, and the output of the sample EMR MR node. For more information about how to create an OSS bucket, see Create buckets.

Background information

The example in this topic creates an EMR MR node that reads an OSS object in an OSS bucket and counts the number of words in the object. Replace the object name, bucket name, and directories used in the example with your actual values.
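Because the bucket name and directories recur in several steps, it can help to derive every OSS URI from one bucket name and one directory. The following Java sketch uses a hypothetical helper class, OssPaths, which is not part of any EMR or OSS SDK; it only concatenates strings into the oss://<bucket>/<directory> form used in this topic.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not part of any Alibaba Cloud SDK) that builds the
 * oss://bucket/directory URIs used in this topic.
 */
final class OssPaths {
    private OssPaths() {}

    /** Returns oss://bucket/dir after removing leading and trailing slashes from dir. */
    static String uri(String bucket, String dir) {
        Objects.requireNonNull(bucket, "bucket");
        Objects.requireNonNull(dir, "dir");
        if (bucket.isEmpty()) {
            throw new IllegalArgumentException("bucket must not be empty");
        }
        // Strip slashes at both ends so that "/emr/jars/" and "emr/jars" give the same URI.
        String trimmed = dir.replaceAll("^/+|/+$", "");
        return "oss://" + bucket + "/" + trimmed;
    }
}
```

For example, OssPaths.uri("onaliyun-bucket-2", "emr/datas/wordcount02/inputs") returns oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs.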

Prepare initial data and a JAR resource package

  1. Prepare initial data.
    Create the input01.txt file that contains the following initial data:
    hadoop emr hadoop dw
    hive hadoop
    dw emr
  2. Create directories that are used to store initial data and JAR resources.
    1. Log on to the OSS console.
    2. In the left-side navigation pane, click Buckets.
    3. On the Buckets page, find the desired bucket and click the bucket name to go to the Files page.
      In this example, the onaliyun-bucket-2 bucket is used.
    4. On the Files page, click Create Folder to create directories that are used to store initial data and JAR resources.
      • Set Folder Name to emr/datas/wordcount02/inputs to create a directory that is used to store initial data.
      • Set Folder Name to emr/jars to create a directory that is used to store JAR resources.
    5. Upload the file that stores the initial data to the emr/datas/wordcount02/inputs directory.
      1. Go to the emr/datas/wordcount02/inputs directory.
      2. Click Upload.
      3. In the Files to Upload section, click Select Files and select the input01.txt file.
      4. Click Upload.
  3. Write a MapReduce program that reads the OSS object, and package the program as a JAR file.
    1. Open an existing IntelliJ IDEA project and add the following Project Object Model (POM) dependencies to the pom.xml file:
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-mapreduce-client-common</artifactId>
                  <version>2.8.5</version> <!-- In this example, the EMR MR node runs Hadoop 2.8.5. Match the Hadoop version of your EMR cluster. -->
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-common</artifactId>
                  <version>2.8.5</version>
              </dependency>
    2. To allow the job to read data from and write data to OSS, configure the following parameters:
      conf.set("fs.oss.accessKeyId", "${accessKeyId}");
      conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
      conf.set("fs.oss.endpoint", "${endpoint}");
      Parameter description:
      • ${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.
      • ${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.
      • ${endpoint}: the endpoint of OSS. The endpoint is determined by the region where your EMR cluster resides. You must activate OSS in the region where your EMR cluster resides. For more information, see Regions and endpoints.
      In this topic, the WordCount example from the official Hadoop documentation is modified in Java: the AccessKey ID, AccessKey secret, and endpoint settings are added to the code so that the job has permission to access OSS objects.
      package cn.apache.hadoop.onaliyun.examples;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
      
      public class EmrWordCount {
          public static class TokenizerMapper
                  extends Mapper<Object, Text, Text, IntWritable> {
              private final static IntWritable one = new IntWritable(1);
              private Text word = new Text();
      
              public void map(Object key, Text value, Context context
              ) throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  while (itr.hasMoreTokens()) {
                      word.set(itr.nextToken());
                      context.write(word, one);
                  }
              }
          }
      
          public static class IntSumReducer
                  extends Reducer<Text, IntWritable, Text, IntWritable> {
              private IntWritable result = new IntWritable();
      
              public void reduce(Text key, Iterable<IntWritable> values,
                                 Context context
              ) throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable val : values) {
                      sum += val.get();
                  }
                  result.set(sum);
                  context.write(key, result);
              }
          }
      
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
              if (otherArgs.length < 2) {
                  System.err.println("Usage: wordcount <in> [<in>...] <out>");
                  System.exit(2);
              }
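              conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // AccessKey ID of your Alibaba Cloud account.
              conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // AccessKey secret of your Alibaba Cloud account.
              conf.set("fs.oss.endpoint", "${endpoint}"); // OSS endpoint in the region where your EMR cluster resides.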
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(EmrWordCount.class);
              job.setMapperClass(TokenizerMapper.class);
              job.setCombinerClass(IntSumReducer.class);
              job.setReducerClass(IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              for (int i = 0; i < otherArgs.length - 1; ++i) {
                  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
              }
              FileOutputFormat.setOutputPath(job,
                      new Path(otherArgs[otherArgs.length - 1]));
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                                      
    3. Compile and package the preceding code into a JAR file, for example by running mvn package in the project directory. In this example, the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar package is generated.
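
Before you upload the JAR package, you can check the expected word counts locally. The following sketch is a Hadoop-free simulation, not the MapReduce job itself: the hypothetical LocalWordCount class tokenizes each line with StringTokenizer, as TokenizerMapper does, and sums the counts per word, as IntSumReducer does. A TreeMap keeps the words sorted, similar to the key order that a single reducer receives.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/**
 * Hypothetical local simulation of the EmrWordCount logic. It does not use
 * Hadoop; it only reproduces the map (tokenize) and reduce (sum) steps.
 */
final class LocalWordCount {
    private LocalWordCount() {}

    static Map<String, Integer> count(Iterable<String> lines) {
        // TreeMap keeps keys sorted, like the keys that arrive at a single reducer.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Map step: split the line on whitespace, as TokenizerMapper does.
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                // Reduce step: add 1 to the running total for this word.
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For the three lines in input01.txt, LocalWordCount.count(List.of("hadoop emr hadoop dw", "hive hadoop", "dw emr")) returns {dw=2, emr=2, hadoop=3, hive=1}.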

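The sample code fills in the AccessKey pair through placeholders that are compiled into the JAR package. One alternative, shown in the following sketch, is to read the values at run time. The OssSettings class and the variable names OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, and OSS_ENDPOINT are assumptions for this illustration; the method takes the variables as a Map so that it can be tested without a real environment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that maps assumed environment variable names to the
 * fs.oss.* properties used by EmrWordCount.
 */
final class OssSettings {
    private OssSettings() {}

    /** Returns the fs.oss.* settings, failing fast if any variable is missing. */
    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.oss.accessKeyId", require(env, "OSS_ACCESS_KEY_ID"));
        settings.put("fs.oss.accessKeySecret", require(env, "OSS_ACCESS_KEY_SECRET"));
        settings.put("fs.oss.endpoint", require(env, "OSS_ENDPOINT"));
        return settings;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
```

In the main method of EmrWordCount, the three conf.set calls could be replaced with OssSettings.fromEnv(System.getenv()).forEach(conf::set), assuming the variables are set on the machine that submits the job. The values are still stored in the job configuration, so this approach only keeps them out of the source code and the JAR package.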
Create and use an EMR MR node

Perform the following steps to create an EMR MR node, reference the JAR resource, and run the word count job.

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region in which your workspace resides. Find the workspace and click DataStudio in the Actions column.
  2. Create a workflow.
    If you have an existing workflow, skip this step.
    1. Move the pointer over the Create icon and select Create Workflow.
    2. In the Create Workflow dialog box, configure the Workflow Name parameter.
    3. Click Create.
  3. Create an EMR MR node.
    1. Move the pointer over the Create icon and choose Create Node > EMR > EMR MR.
      You can also find the desired workflow, right-click the workflow, and then choose Create Node > EMR > EMR MR.
    2. In the Create Node dialog box, configure the Name, Engine Instance, Node Type, and Path parameters.
      Note The node name must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
    3. Click Commit. The configuration tab of the EMR MR node appears.
  4. Create and reference an EMR JAR resource.
    Note If the EMR MR node depends on large resources that cannot be uploaded by using the DataWorks console, you can store the resources in HDFS and reference them by their HDFS paths in the code of the node. The following sample code references a JAR package that is stored in HDFS:
    spark-submit --master yarn
    --deploy-mode cluster
    --name SparkPi
    --driver-memory 4G
    --driver-cores 1
    --num-executors 5
    --executor-memory 4G
    --executor-cores 1
    --class org.apache.spark.examples.JavaSparkPi
    hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
    1. Create an EMR JAR resource.
      For more information about how to create an EMR JAR resource, see Create and use an EMR JAR resource. In this example, the JAR package that is generated in the Prepare initial data and a JAR resource package section is stored in the emr/jars directory. The first time you use DataWorks to access OSS, click Authorize to the right of OSS to authorize DataWorks and EMR to access OSS.
    2. Reference the EMR JAR resource.
      1. Open the EMR MR node. The configuration tab of the node appears.
      2. Find the resource that you want to reference under Resource in the EMR folder, right-click the resource name, and then select Insert Resource Path. In this example, the resource that is referenced is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
      3. If the ##@resource_reference statement shown in the following figure appears on the configuration tab of the EMR MR node, the resource is referenced. Then, run the following code. Replace the resource package name, bucket name, and directories in the code with your actual values.
        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        Note You cannot add comments when you write code for the EMR MR node.
  5. Configure the parameters on the Advanced Settings tab.
    The following advanced parameters can be configured, depending on the type of the EMR cluster.
    • EMR data lake cluster:
      • "queue": the scheduling queue to which jobs are committed. Default value: default.
      • "priority": the priority of jobs. Default value: 1.
      Note You can add a custom parameter for the EMR MR node as an advanced parameter on the Advanced Settings tab in the DataWorks console. When you commit the code for the EMR MR node, DataWorks adds the custom parameter to the submission command in the -D key=value format.
    • EMR Hadoop cluster
  6. Commit and deploy the EMR MR node.
    1. Click the Save icon in the top toolbar to save the node.
    2. Click the Submit icon in the top toolbar to commit the node.
    3. In the Commit Node dialog box, configure the Change description parameter.
    4. Click OK.
    If you use a workspace in standard mode, you must deploy the node in the production environment after you commit the node. On the left side of the top navigation bar, click Deploy. For more information, see Deploy nodes.
  7. View the EMR MR node.
    1. Click Operation Center in the upper-right corner of the DataStudio page to go to Operation Center.
    2. View the scheduled EMR MR node. For more information, see View and manage auto triggered nodes.
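
The code in the EMR MR node follows a fixed pattern: a ##@resource_reference line, then the JAR package name, the main class, one or more input paths, and the output path, which matches the usage message wordcount <in> [<in>...] <out> in EmrWordCount. The following sketch assembles that text with a hypothetical EmrMrNodeCode class; it is not a DataWorks API. It emits no comments, because comments are not supported in EMR MR node code.

```java
import java.util.List;

/**
 * Hypothetical helper (not a DataWorks API) that builds the text entered in
 * an EMR MR node: a resource reference line plus the job command line.
 */
final class EmrMrNodeCode {
    private EmrMrNodeCode() {}

    static String build(String jar, String mainClass, List<String> inputs, String output) {
        if (inputs.isEmpty()) {
            // EmrWordCount exits with usage information if fewer than two paths are passed.
            throw new IllegalArgumentException("At least one input path is required");
        }
        StringBuilder sb = new StringBuilder();
        // Line 1: reference the uploaded EMR JAR resource.
        sb.append("##@resource_reference{\"").append(jar).append("\"}\n");
        // Line 2: JAR name, main class, input paths, and finally the output path.
        sb.append(jar).append(' ').append(mainClass);
        for (String in : inputs) {
            sb.append(' ').append(in);
        }
        sb.append(' ').append(output);
        return sb.toString();
    }
}
```

With the JAR package name, main class, and OSS paths used in this topic, the method returns the same two lines that are shown in the Reference the EMR JAR resource step.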

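The Note about advanced parameters states that DataWorks passes custom parameters in the -D key=value format. EmrWordCount handles such options through GenericOptionsParser, which stores them in the Configuration object and returns only the remaining arguments from getRemainingArgs(). The following sketch is a simplified, hypothetical imitation of that split for -D options only; it is not Hadoop's implementation. Because Hadoop expects generic options before the application arguments, the sketch also stops looking for options at the first non-option argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified imitation of separating -D options from job
 * arguments. This is NOT Hadoop's GenericOptionsParser.
 */
final class DashDSplitter {
    final Map<String, String> properties = new LinkedHashMap<>();
    final List<String> remaining = new ArrayList<>();

    DashDSplitter(String[] args) {
        boolean optionsDone = false;
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!optionsDone && arg.equals("-D") && i + 1 < args.length) {
                // "-D key=value": the property is in the next token.
                put(args[++i]);
            } else if (!optionsDone && arg.startsWith("-D") && arg.length() > 2) {
                // "-Dkey=value": the property is attached to the flag.
                put(arg.substring(2));
            } else {
                // First non-option argument: everything from here on goes to the job.
                optionsDone = true;
                remaining.add(arg);
            }
        }
    }

    private void put(String pair) {
        int eq = pair.indexOf('=');
        if (eq <= 0) {
            throw new IllegalArgumentException("Expected key=value but got: " + pair);
        }
        // Split on the first '=' only, so values may themselves contain '='.
        properties.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
}
```

For example, for the arguments -D a.b=1 oss://bucket/in oss://bucket/out, the sketch records a.b=1 as a property and leaves only the two OSS paths as remaining arguments, which is the shape EmrWordCount expects in otherArgs.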
Check the result

  • Log on to the OSS console and view the result in the emr/datas/wordcount02/outputs directory, which is the output path specified in the code of the EMR MR node.
  • View the statistical result in the DataWorks console.
    1. Create an EMR Hive node. For more information, see Create an EMR Hive node.
    2. On the EMR Hive node, create a Hive external table whose location is the OSS output directory, and then query the table to read the word count results. Sample code:
      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'Word',
          `count` STRING COMMENT 'Count'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;
      The following figure shows the results.
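
The Hive table above uses FIELDS TERMINATED BY '\t' because Hadoop's default TextOutputFormat writes each key and value separated by a tab. The following sketch, using a hypothetical WordCountOutput class, parses lines in that format, for example lines read from a downloaded part-r-* output file, into a map of word counts.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical parser for WordCount output lines in the form "word<TAB>count".
 */
final class WordCountOutput {
    private WordCountOutput() {}

    static Map<String, Long> parse(List<String> lines) {
        // LinkedHashMap preserves the order of the lines in the output file.
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // Skip blank lines, such as a trailing newline.
            }
            int tab = line.indexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("No tab separator in line: " + line);
            }
            result.put(line.substring(0, tab), Long.parseLong(line.substring(tab + 1).trim()));
        }
        return result;
    }
}
```

Parsing the counts into numbers, rather than keeping them as strings, makes it straightforward to sort or sum them after the job finishes.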