
DataWorks:EMR MR node

Last Updated: Jul 29, 2025

In E-MapReduce (EMR) task development, you can create a MapReduce (MR) node to decompose a large dataset into multiple map tasks for parallel processing. This method significantly improves data processing efficiency. This topic uses an example that reads a text file from Object Storage Service (OSS) and counts the words in the file to describe how to develop and configure an EMR MR node job.

Prerequisites

  • An EMR cluster is created and the cluster is registered to DataWorks. For more information, see DataStudio (old version): Associate an EMR computing resource.

  • (Required if you use a RAM user to develop tasks) The RAM user is added to your DataWorks workspace as a member and is assigned the Development or Workspace Manager role. The Workspace Manager role has more permissions than necessary. Exercise caution when you assign the Workspace Manager role. For more information about how to add a member, see Add workspace members and assign roles to them.

    Note

    If you use an Alibaba Cloud account, you can skip this operation.

  • A workspace directory is created. For more information, see Workspace directories.

  • If you want to reference open source code resources or user-defined functions in the node, you must create them in Resource Management. For more information, see Resource management.

  • If you use the job development example in this topic to run the workflow, you must also create an OSS bucket. For more information about how to create an OSS bucket, see Create buckets in the console.

  • An EMR MR node is created. For more information, see Create an auto-triggered task.

Limits

  • This type of node can be run only on a serverless resource group or an exclusive resource group for scheduling. We recommend that you use a serverless resource group.

  • If you want to manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK in the cluster. For more information about how to configure EMR-HOOK, see Configure EMR-HOOK for Hive.

    Note

    If you do not configure EMR-HOOK in your cluster, real-time metadata, audit logs, and data lineage cannot be displayed in DataWorks. In addition, EMR-related administration tasks cannot be run.

Prepare initial data and a JAR resource package

Prepare initial data

Create a file named input01.txt that contains the following initial data:

hadoop emr hadoop dw
hive hadoop
dw emr

Upload the initial data file

  1. Log on to the OSS console. In the navigation pane on the left, click Buckets.

  2. Click the destination bucket name to go to the Files page.

    In this example, the onaliyun-bucket-2 bucket is used.

  3. Click Create Directory to create directories to store initial data and JAR resources.

    • Set Directory Name to emr/datas/wordcount02/inputs to create a directory to store the initial data.

    • Set Directory Name to emr/jars to create a directory to store JAR resources.

  4. Upload the initial data file to the directory created for storing initial data.

    • Navigate to the /emr/datas/wordcount02/inputs path and click Upload.

    • In the Files To Upload section, click Scan For Files, select the input01.txt file, and then click Upload.
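
Alternatively, you can upload the file from the command line by using the OSS command-line tool ossutil, for example, by running ossutil cp input01.txt oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs/. This assumes that ossutil is installed and configured with valid credentials; the console-based steps above are sufficient for this example.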

Use MapReduce to read the OSS file and generate a JAR package

  1. Open an existing IntelliJ IDEA project and add the following dependencies to its pom.xml file.

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--The version used by EMR-MR is 2.8.5.-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. To read data from and write data to an OSS file in MapReduce, you must configure the following parameters.

    Important

    Risk warning: The AccessKey pair of an Alibaba Cloud account has all permissions for API operations. We recommend that you use a RAM user to call API operations or perform routine O&M. We strongly recommend that you do not save the AccessKey ID and AccessKey secret of your Alibaba Cloud account in your project code or any other location that is prone to leaks. An AccessKey pair leak may threaten the security of all resources in your account. The following sample code is for reference only. Keep your AccessKey information confidential.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    The following list describes the parameters.

    • ${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.

    • ${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.

    • ${endpoint}: the endpoint of the OSS bucket. The endpoint is determined by the region where your cluster resides, and the OSS bucket must reside in the same region as the cluster. For more information, see OSS regions and endpoints.
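
    To avoid hardcoding the AccessKey pair, as recommended in the preceding risk warning, you can read the credentials from the runtime environment instead of writing them in the code. The following Java snippet is a minimal sketch that assumes the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where the job runs; the variable names are illustrative and are not required by EMR or DataWorks.

    // Minimal sketch: read the AccessKey pair from environment variables instead of
    // hardcoding it in the project code. The variable names are assumptions for illustration.
    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    if (accessKeyId == null || accessKeySecret == null) {
        throw new IllegalStateException("OSS AccessKey credentials are not set in the environment.");
    }
    conf.set("fs.oss.accessKeyId", accessKeyId);
    conf.set("fs.oss.accessKeySecret", accessKeySecret);
    conf.set("fs.oss.endpoint", "${endpoint}");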

    The following Java code provides an example of how to modify the WordCount example from the official Hadoop website. The AccessKey ID and AccessKey secret configurations are added to the code to grant the job permissions to access the OSS file.

    Sample code

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
                                    
  3. After you edit the Java code, generate a JAR package from the code. In this example, a JAR package named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar is generated.
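
    If you build the project with Maven, you can typically generate the package by running mvn clean package in the project root directory; the JAR file is then generated in the target directory, and its name is determined by the artifactId and version settings in your pom.xml file. The exact build steps depend on how your project is configured.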

Procedure

  1. On the EMR MR node editing page, perform the following development operations:

    Develop an EMR MR task

    Select a method based on your scenario:

    Method 1: Upload a resource and then reference the EMR JAR resource

    DataWorks lets you upload a resource from your on-premises machine to DataStudio and then reference the resource. If the EMR MR node depends on a large resource that cannot be uploaded on the DataWorks page, you can store the resource in HDFS and then reference it in the code.

    1. Create an EMR JAR resource.

      1. Upload the JAR package that you generated in the Prepare initial data and a JAR resource package section, and store it in the emr/jars directory. To upload the JAR resource, click Click To Upload. For more information, see Resource management.

      2. Select a Storage Path, Data Source, and Resource Group.

      3. Click the Save button.


    2. Reference the EMR JAR resource.

      1. Open the EMR MR node that you created to go to the code editor page.

      2. In the Resource Management section of the left navigation pane, find the resource to reference. In this example, the resource is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar. Right-click the resource and select Reference Resource.

      3. After you select the resource, a message that confirms the reference is displayed on the code editor page of the EMR MR node. Then, edit the node code as follows. Replace the resource package, bucket name, and path in the example with your actual information.

        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        Note

        Comment statements are not supported when you edit the code for an EMR MR node.
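
        The code that follows the resource reference uses this format: <Name of the referenced JAR resource> <Full name of the main class to run> <Directory of the file to read> <Directory for the results>. This is the same format that is described for the hadoop jar command in Method 2, except that the referenced resource name takes the place of the JAR file path.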

    Method 2: Directly reference an OSS resource

    This node can directly reference an OSS resource by using the OSS REF method. When you run the EMR node, DataWorks automatically loads the OSS resource that is referenced in the code to the local environment. This method is commonly used in scenarios in which EMR tasks depend on JAR packages or scripts.

    1. Upload the JAR resource.

      1. After you complete code development, log on to the OSS console and click Buckets in the navigation pane on the left.

      2. Click the destination bucket name to go to the Files page.

        In this example, the onaliyun-bucket-2 bucket is used.

      3. Upload the JAR resource to the directory created for storing JAR resources.

        Go to the emr/jars directory and click Upload. In the Files To Upload section, click Scan For Files, add the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar file to the bucket, and then click Upload.

    2. Reference the JAR resource.

      On the EMR MR node editing page, edit the code to reference the JAR resource.

      hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Note

      The preceding command uses the following format: hadoop jar <Path to the JAR file to be referenced and run> <Full name of the main class to be run> <Directory of the file to be read> <Directory for the results>.

      The path of the JAR file that is referenced and run is in the ossref://{endpoint}/{bucket}/{object} format:

      • endpoint: the endpoint of the OSS bucket. If the endpoint parameter is left empty, only an OSS bucket that resides in the same region as the current EMR cluster can be used.

      • bucket: a container that is used to store objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets within the current logon account.

      • object: a specific object, such as a file name or path, that is stored in a bucket.
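
      For example, a path such as ossref://oss-cn-hangzhou.aliyuncs.com/onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar explicitly specifies the endpoint, whereas ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar, the form used earlier in this topic, omits the endpoint and therefore requires the bucket to reside in the same region as the EMR cluster. The oss-cn-hangzhou.aliyuncs.com endpoint is only an illustrative value; use the endpoint of the region where your bucket resides.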

    (Optional) Configure advanced parameters

    On the Scheduling Configuration pane on the right side of the node, you can configure the parameters described in the following table in the EMR Node Parameters > DataWorks Parameters section.

    Note
    • The available advanced parameters vary based on the EMR cluster type, as described below.

    • You can configure open source Spark properties in the EMR Node Parameters > Spark Parameters section of the Scheduling Configuration pane on the right side of the node.

    DataLake cluster/custom cluster: EMR on ECS

    • queue: the scheduling queue to which jobs are submitted. Default value: default. For information about EMR YARN, see Basic queue configurations.

    • priority: the priority. Default value: 1.

    • FLOW_SKIP_SQL_ANALYZE: the method that is used to execute SQL statements. Valid values:

      • true: Multiple SQL statements are executed at a time.

      • false (default value): Only one SQL statement is executed at a time.

      Note: This parameter is supported only for testing the running process in the data development environment.

    • Others: You can also directly append custom MR task parameters in the advanced configuration. When you submit the code, DataWorks automatically adds the new parameters to the command in the -D key=value format.
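
    For example, if you add a custom parameter named mapreduce.job.reduces with a value of 2 in the advanced configuration, DataWorks would append -Dmapreduce.job.reduces=2 to the command when the job is submitted. The property name here is only an illustrative Hadoop MapReduce property; it is not required by this example.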

    Hadoop cluster: EMR on ECS

    • queue: the scheduling queue to which jobs are submitted. Default value: default. For information about EMR YARN, see Basic queue configurations.

    • priority: the priority. Default value: 1.

    • USE_GATEWAY: specifies whether to submit the jobs on this node through a Gateway cluster. Valid values:

      • true: Jobs are submitted through a Gateway cluster.

      • false (default value): Jobs are not submitted through a Gateway cluster. By default, jobs are submitted to the header node.

      Note: If the cluster to which this node belongs is not associated with a Gateway cluster but you set this parameter to true, subsequent EMR jobs fail to be submitted.

    Run the task

    1. On the Debugging Configurations tab, in the Computing Resource section, configure the Computing Resource and DataWorks Resource Group parameters.

      Note
      • You can also configure Scheduling CUs based on the resources required for task execution. The default CU value is 0.25.

      • To access a data source over the internet or a VPC, you must use an exclusive resource group for scheduling that passes the connectivity test with the data source. For more information, see Network connectivity solutions.

    2. In the parameter dialog box on the toolbar, select your data source and click Run to execute the task.

  2. If you want to run the node task on a regular basis, you can configure its scheduling properties. For more information, see Node scheduling.

  3. After the node is configured, you must publish it. For more information, see Publish a node or workflow.

  4. After the task is published, you can view the running status of the auto-triggered task in Operation Center. For more information, see Getting started with Operation Center.

View the results

  • Log on to the OSS console. You can view the results in the output directory of the destination bucket. An example path is emr/datas/wordcount02/outputs.

  • Read the statistical results in DataWorks:

    1. Create an EMR Hive node. For more information, see Create an auto-triggered task.

    2. On the EMR Hive node, create a Hive external table whose data is stored in OSS, and then query the table data. The following code provides an example.

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'word',
          `count` STRING COMMENT 'count'
      )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';

      SELECT * FROM wordcount02_result_tb;

      The following figure shows the execution results.
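
      Given the sample input01.txt file used in this topic, the word counts that the query returns should be similar to the following; the exact order of the rows may vary.

      dw      2
      emr     2
      hadoop  3
      hive    1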