You can create an E-MapReduce (EMR) MR node to process a large dataset by using multiple parallel map tasks, which enables parallel computing on large datasets. This topic describes how to create an EMR MR node and walks through the development process of one. In this process, an Object Storage Service (OSS) bucket is used, and the EMR MR node counts the words in an OSS object.

Prerequisites

  • An Alibaba Cloud EMR cluster is created. The inbound rules of the security group to which the cluster belongs include the following rules:
    • Action: Allow
    • Protocol type: Custom TCP
    • Port range: 8898/8898
    • Authorization object: 100.104.0.0/16
  • An EMR compute engine instance is associated with the desired workspace. The EMR folder is displayed only after you associate an EMR compute engine instance with the workspace on the Workspace Management page. For more information, see Configure a workspace.
  • If you integrate Hive with Ranger in EMR, you must modify whitelist configurations and restart Hive before you develop EMR nodes in DataWorks. Otherwise, the error message Cannot modify spark.yarn.queue at runtime or Cannot modify SKYNET_BIZDATE at runtime is returned when you run EMR nodes.
    1. You can modify the whitelist configurations by using custom parameters in EMR. You can append key-value pairs to the value of a custom parameter. In this example, the custom parameter for Hive components is used. The following code provides an example:
      hive.security.authorization.sqlstd.confwhitelist.append=tez.*|spark.*|mapred.*|mapreduce.*|ALISA.*|SKYNET.*
      Note In the code, ALISA.* and SKYNET.* are configurations in DataWorks.
    2. After the whitelist configurations are modified, you must restart the Hive service to make the configurations take effect. For more information, see Restart a service.
  • An exclusive resource group for scheduling is created, and the resource group is associated with the virtual private cloud (VPC) where the EMR cluster resides. For more information, see Create and use an exclusive resource group for scheduling.
    Note You can use only exclusive resource groups for scheduling to run EMR MR nodes.
  • Open source code is uploaded as an EMR JAR resource. You can reference the open source code in your EMR MR node.
  • User-defined functions (UDFs) are uploaded as EMR JAR resources and are registered with EMR. You can reference the UDFs in your EMR MR node. For more information about how to register a UDF, see Create an EMR function.
  • The following operations are performed if you use the node development example in this topic to run the EMR MR node:
    • An OSS bucket is created. For more information, see Create buckets.
    • A Java project is created by using IntelliJ IDEA.

Background information

This topic walks through the development process of an EMR MR node. In this process, an OSS bucket is used, and the EMR MR node counts the words in an OSS object. You must replace the information involved in the development process, such as the object name, bucket name, and directory, with your actual information.

Prepare initial data and a JAR resource package

  1. Prepare initial data.
    Create the input01.txt file that contains the following initial data:
    hadoop emr hadoop dw
    hive hadoop
    dw emr
  2. Create directories that are used to store initial data and JAR resources.
    1. Log on to the OSS console.
    2. In the left-side navigation pane, click Buckets.
    3. On the Buckets page, find the desired bucket and click the bucket name to go to the Files page.
      In this example, the onaliyun-bucket-2 bucket is used.
    4. On the Files page, click Create Folder to create directories that are used to store initial data and JAR resources.
      • Set Folder Name to emr/datas/wordcount02/inputs to create a directory that is used to store initial data.
      • Set Folder Name to emr/jars to create a directory that is used to store JAR resources.
    5. Upload the file that stores the initial data to the emr/datas/wordcount02/inputs directory.
      1. Go to the emr/datas/wordcount02/inputs directory.
      2. Click Upload.
      3. In the Files to Upload section, click Select Files and upload the input01.txt file to the bucket.
      4. Click Upload.
  3. Write the MapReduce code that reads the OSS object, and package the code into a JAR file.
    1. Open the created IntelliJ IDEA project and add Project Object Model (POM) dependencies.
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-mapreduce-client-common</artifactId>
                  <version>2.8.5</version> <!--Use the Hadoop version that matches your EMR cluster. 2.8.5 is used in this example.-->
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-common</artifactId>
                  <version>2.8.5</version>
              </dependency>
    2. Configure the following parameters to read data from and write data to the OSS object:
      conf.set("fs.oss.accessKeyId", "${accessKeyId}");
      conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
      conf.set("fs.oss.endpoint","${endpoint}");
      Parameter description:
      • ${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.
      • ${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.
      • ${endpoint}: the endpoint of OSS. The endpoint is determined by the region where your EMR cluster resides. You must activate OSS in the region where your EMR cluster resides. For more information, see Regions and endpoints.
      In the following code, the WordCount program of Hadoop is modified. The AccessKey ID and AccessKey secret are added to the code. This way, the EMR MR node has permissions to access the OSS object.
      package cn.apache.hadoop.onaliyun.examples;
      
      import java.io.IOException;
      import java.util.StringTokenizer;
      
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      import org.apache.hadoop.util.GenericOptionsParser;
      
      public class EmrWordCount {
          public static class TokenizerMapper
                  extends Mapper<Object, Text, Text, IntWritable> {
              private final static IntWritable one = new IntWritable(1);
              private Text word = new Text();
      
              public void map(Object key, Text value, Context context
              ) throws IOException, InterruptedException {
                  StringTokenizer itr = new StringTokenizer(value.toString());
                  while (itr.hasMoreTokens()) {
                      word.set(itr.nextToken());
                      context.write(word, one);
                  }
              }
          }
      
          public static class IntSumReducer
                  extends Reducer<Text, IntWritable, Text, IntWritable> {
              private IntWritable result = new IntWritable();
      
              public void reduce(Text key, Iterable<IntWritable> values,
                                 Context context
              ) throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable val : values) {
                      sum += val.get();
                  }
                  result.set(sum);
                  context.write(key, result);
              }
          }
      
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
              if (otherArgs.length < 2) {
                  System.err.println("Usage: wordcount <in> [<in>...] <out>");
                  System.exit(2);
              }
               conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // The AccessKey ID of your Alibaba Cloud account.
               conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // The AccessKey secret of your Alibaba Cloud account.
               conf.set("fs.oss.endpoint", "${endpoint}"); // The OSS endpoint of the region where your EMR cluster resides.
              Job job = Job.getInstance(conf, "word count");
              job.setJarByClass(EmrWordCount.class);
              job.setMapperClass(TokenizerMapper.class);
              job.setCombinerClass(IntSumReducer.class);
              job.setReducerClass(IntSumReducer.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              for (int i = 0; i < otherArgs.length - 1; ++i) {
                  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
              }
              FileOutputFormat.setOutputPath(job,
                      new Path(otherArgs[otherArgs.length - 1]));
              System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
      }
                                      
    3. After you write the code, package it into a JAR file. In this example, the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar package is generated.
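Before the JAR package is run on EMR, the WordCount logic can be sanity-checked locally. The following standalone sketch is not part of the JAR; it simulates the mapper's StringTokenizer tokenization and the reducer's summation on the sample data from input01.txt:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

// Standalone simulation of the WordCount logic: tokenize on whitespace
// (as TokenizerMapper does) and sum occurrences per word (as IntSumReducer does).
public class WordCountSim {
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample data from input01.txt
        String input = "hadoop emr hadoop dw\nhive hadoop\ndw emr";
        // Expected result: hadoop=3, emr=2, dw=2, hive=1
        System.out.println(count(input));
    }
}
```

If the job runs correctly on EMR, the output files in the outputs directory contain the same counts.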

Create and use an EMR MR node

This section describes how to create an EMR MR node and use it to count the words in the OSS object that you prepared.

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where your workspace resides, find the workspace, and then click Data Analytics in the Actions column.
  2. On the DataStudio page, move the pointer over the Create icon and choose EMR > EMR MR.
    Alternatively, you can find the required workflow, right-click the workflow name, and then choose Create > EMR > EMR MR.
  3. In the Create Node dialog box, set the Node Name and Location parameters.
    Note The node name must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
  4. Click Commit.
  5. Create an EMR JAR resource.
    In this step, the JAR package is stored in the emr/jars directory. If this is the first time you use DataWorks to access OSS, click Authorize next to OSS to authorize DataWorks and EMR to access OSS.
  6. Reference the EMR JAR resource.
    1. Open the EMR MR node. The configuration tab of the node appears.
    2. Find the resource that you want to reference under Resource in the EMR folder, right-click the resource name, and then select Insert Resource Path. In this step, the resource to be referenced is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
    3. The code resource is referenced if a reference statement appears on the configuration tab of the EMR MR node. Then, enter and run the following code.
      You must replace the information in the following code with the actual information. The information includes the resource package name, bucket name, and directory.
      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
  7. In the right-side navigation pane, click Advanced Settings. On the Advanced Settings tab, configure the following parameters as needed.
    • "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx": the parameters that are required to run Spark jobs. You can configure multiple parameters in the --conf xxx=xxx format.
    • "queue": the scheduling queue to which jobs are committed. Default value: default.
    • "vcores": the number of CPU cores. Default value: 1.
    • "memory": the memory that is allocated to the launcher, in MB. Default value: 2048.
    • "priority": the priority. Default value: 1.
    • "FLOW_SKIP_SQL_ANALYZE": specifies how SQL statements are executed. A value of false indicates that only one SQL statement is executed at a time. A value of true indicates that multiple SQL statements are executed at a time.
    • "USE_GATEWAY": specifies whether a gateway cluster is used to submit jobs on the current node. A value of true indicates that a gateway cluster is used to submit jobs. A value of false indicates that a gateway cluster is not used to submit jobs and jobs are submitted to the header node by default.
      Note If the EMR cluster to which the node belongs is not associated with a gateway cluster but you set the USE_GATEWAY parameter to true, jobs may fail to be submitted.
  8. In the right-side navigation pane, click Properties. On the Properties tab, you can configure properties for the EMR MR node.
    For more information about how to configure basic properties for the node, see Configure basic properties.
  9. Save and commit the node.
    Notice You must set the Rerun and Parent Nodes parameters before you can commit the node.
    1. Click the Save icon in the toolbar to save the node.
    2. Click the Commit icon in the toolbar.
    3. In the Commit Node dialog box, enter your comments in the Change description field.
    4. Click OK.
    In a workspace in standard mode, you must click Deploy in the upper-right corner after you commit the node. For more information, see Deploy nodes.
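The node code in step 6 passes three arguments after the JAR name: the main class, the input directory, and the output directory. As the EmrWordCount code shows, every path argument except the last is treated as an input path, and the last is the output path. A minimal sketch of that argument handling (the OSS paths are the ones used in this example):

```java
import java.util.Arrays;

// Mirrors how EmrWordCount.main splits its path arguments:
// all arguments except the last are input paths; the last is the output path.
public class NodeArgs {
    public static String[] inputPaths(String[] args) {
        return Arrays.copyOfRange(args, 0, args.length - 1);
    }

    public static String outputPath(String[] args) {
        return args[args.length - 1];
    }

    public static void main(String[] args) {
        String[] nodeArgs = {
            "oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs",
            "oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs"
        };
        System.out.println(Arrays.toString(inputPaths(nodeArgs))); // input paths
        System.out.println(outputPath(nodeArgs));                  // output path
    }
}
```

This is why you can list several input directories on the node but must always put the output directory last.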

View the result

  • Log on to the OSS console. Then, you can view the result in the emr/datas/wordcount02/outputs directory to which the EMR MR node writes its output.
  • View the statistical result in the DataWorks console.
    1. Create an EMR Hive node. For more information, see Create an EMR Hive node.
    2. Create Hive external tables that are mounted to OSS in the EMR Hive node. Sample code:
      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'Word',
          `count` STRING COMMENT 'Count'   
      ) 
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;
      The following figure shows the result.
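The external table maps to the job output because each line in the output files is a tab-separated word/count pair, which is why the table declares fields terminated by '\t'. A small sketch of parsing one such line (the sample line is hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Each line of the WordCount output in oss://.../outputs has the form
// "word<TAB>count", which the Hive external table maps to the
// `word` and `count` columns.
public class OutputLineParser {
    public static Map.Entry<String, Integer> parse(String line) {
        String[] parts = line.split("\t", 2);
        return new SimpleEntry<>(parts[0], Integer.parseInt(parts[1]));
    }

    public static void main(String[] args) {
        // Hypothetical line from an output file such as part-r-00000
        Map.Entry<String, Integer> e = parse("hadoop\t3");
        System.out.println(e.getKey() + " = " + e.getValue());
    }
}
```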