
DataWorks:Create an EMR MR node

Last Updated:Mar 26, 2026

E-MapReduce (EMR) MR nodes run Hadoop MapReduce jobs on your EMR cluster directly from DataWorks. Each job distributes work across multiple parallel map tasks, which makes MR nodes well suited to large-scale batch processing. This topic walks you through creating an EMR MR node and developing a WordCount job that reads text from an Object Storage Service (OSS) bucket and counts word frequencies.

Workflow overview

  • Prepare: Create the input data and build the JAR file.
  • Create: Create an EMR MR node in DataStudio.
  • Develop: Reference the JAR file and write the task command.
  • Schedule: Configure scheduling properties.
  • Deploy: Commit and deploy the task.
  • Verify: Check the output in OSS or through a Hive external table.

Prerequisites

Before you begin, make sure you have:

Required only in specific scenarios

  • (RAM users) The RAM user is added to the workspace as a member and assigned the Develop or Workspace Administrator role. The Workspace Administrator role grants more permissions than this task requires, so assign it with caution. See Add workspace members and assign roles to them.

  • (Optional) An EMR JAR resource created, if your node references open-source libraries. See Create and use an EMR resource.

  • (Optional) User-defined functions (UDFs) uploaded as EMR JAR resources and registered with EMR, if your node uses UDFs. See Create an EMR function.

Limitations

Prepare the sample data and JAR file

Create the input file

Create a file named input01.txt with the following content:

hadoop emr hadoop dw
hive hadoop
dw emr
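Before running anything on the cluster, you can preview the word counts that the job should produce by running standard Unix tools over the sample file. This is only a local sanity check, not part of the DataWorks workflow:

```shell
# Create the sample input file from this topic.
printf 'hadoop emr hadoop dw\nhive hadoop\ndw emr\n' > input01.txt

# Split words onto separate lines, then count occurrences of each word.
# The output format matches the MR job's output: word<TAB>count.
tr -s ' ' '\n' < input01.txt | sort | uniq -c | awk '{print $2 "\t" $1}'
```

For this input, the pipeline prints dw 2, emr 2, hadoop 3, and hive 1 (tab-separated), which is what the WordCount job should write to the outputs directory.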

Upload the input file to OSS

  1. Log on to the OSS console. In the left-side navigation pane, click Buckets.

  2. On the Buckets page, click the name of your bucket to go to the Objects page. This example uses the onaliyun-bucket-2 bucket.

  3. On the Objects page, click Create Directory to create two directories:

    • Set Directory Name to emr/datas/wordcount02/inputs to store the input data.

    • Set Directory Name to emr/jars to store JAR files.

  4. Go to the emr/datas/wordcount02/inputs directory and click Upload Object. In the Files to Upload section, click Select Files and upload input01.txt.
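If you prefer the command line to the console, the same directories and upload can be handled with the ossutil CLI. This is a sketch: it assumes ossutil is installed and configured with credentials for your account, and it uses the example bucket name from this topic.

```shell
# Create the input and JAR directories in the example bucket.
ossutil mkdir oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs
ossutil mkdir oss://onaliyun-bucket-2/emr/jars

# Upload the sample input file to the inputs directory.
ossutil cp input01.txt oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs/
```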

Build the JAR file

  1. Open your IntelliJ IDEA project and add the following POM dependencies:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.8.5</version> <!--The version used by EMR MR is 2.8.5.-->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
  2. Configure OSS credentials and endpoint in your code.

    Important

    Do not hard-code your AccessKey ID and AccessKey secret in source code or commit them to version control. The following sample code uses placeholder values for illustration only. In production, load credentials from environment variables or a secrets manager.

    The sample code uses the following placeholders:

    • ${accessKeyId}: Your AccessKey ID.
    • ${accessKeySecret}: Your AccessKey secret.
    • ${endpoint}: The OSS endpoint for the region where your EMR cluster resides. See OSS regions and endpoints.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint", "${endpoint}");
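Because the sample's main method parses arguments with GenericOptionsParser, generic Hadoop options are honored at submit time. So one way to keep the keys out of source code is to drop the hard-coded conf.set(...) calls and pass the values as -D properties read from environment variables. This is a sketch; the environment variable names (ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, OSS_ENDPOINT) are assumptions, not DataWorks conventions:

```shell
# Pass OSS credentials as Hadoop properties instead of hard-coding them.
# Assumes the conf.set(...) calls for these keys are removed from the job code,
# since values set in code after argument parsing would override -D options.
hadoop jar onaliyun_mr_wordcount-1.0-SNAPSHOT.jar \
    cn.apache.hadoop.onaliyun.examples.EmrWordCount \
    -D fs.oss.accessKeyId="$ALIBABA_CLOUD_ACCESS_KEY_ID" \
    -D fs.oss.accessKeySecret="$ALIBABA_CLOUD_ACCESS_KEY_SECRET" \
    -D fs.oss.endpoint="$OSS_ENDPOINT" \
    oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs \
    oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
```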

    The following is the complete WordCount sample. It extends the standard Hadoop WordCount example with OSS credentials so the job can read from and write to OSS.

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}");
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
            conf.set("fs.oss.endpoint", "${endpoint}");
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  3. Package the code into a JAR file. This example produces a file named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
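Assuming a standard Maven project layout, packaging is a single command. The artifact name comes from the artifactId and version in your pom.xml; the hadoop-* dependencies do not need to be bundled because the EMR cluster provides them at run time:

```shell
# Build the JAR; Maven writes the artifact to the target/ directory.
mvn clean package
# With the example artifactId and version, the result is:
#   target/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
```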

Step 1: Create an EMR MR node

  1. Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the target region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select the target workspace from the drop-down list and click Go to Data Development.

  2. Create an EMR MR node.

    1. Find the target workflow, right-click the workflow name, and choose Create Node > EMR > EMR MR.

      Note: Alternatively, move the pointer over the Create icon and choose Create Node > EMR > EMR MR.

    2. In the Create Node dialog box, set Name, Engine Instance, Node Type, and Path. Click Confirm.

      Note: Node names can contain only letters, digits, underscores (_), and periods (.).

Step 2: Develop the EMR MR task

Two methods are available for referencing the JAR file in your task. Method 1 uploads the JAR file to DataStudio as an EMR JAR resource — this is the recommended approach. Method 2 references the JAR file directly from OSS using the OSS REF method, which is useful when JAR dependencies are too large to upload through the console or when tasks need to depend on OSS-hosted scripts.

Method 1: Upload and reference an EMR JAR resource (recommended)

  1. Create an EMR JAR resource. Upload the JAR file to DataStudio. See Create and use an EMR resource. In this example, upload onaliyun_mr_wordcount-1.0-SNAPSHOT.jar from the emr/jars directory. The first time you use an EMR JAR resource, click Authorize to grant DataWorks access, then click Upload.


  2. Reference the JAR file in your node.

    1. Open the EMR MR node.

    2. In the EMR folder, find your resource under Resource, right-click its name, and select Insert Resource Path. In this example, the resource is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

    3. After the resource path is inserted, write the task command. Replace the JAR name, bucket name, and directory paths with your actual values.

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

      The first line, ##@resource_reference{...}, is a DataWorks directive that declares the JAR file the task depends on. DataWorks loads this resource before running the command on the second line. The second line specifies the JAR file name, the fully qualified main class, the OSS input path, and the OSS output path.

      Note

      EMR MR node code does not support comments.

Method 2: Reference an OSS resource

The OSS REF method lets the node reference a JAR file stored in OSS directly, without uploading it to DataStudio first. When the task runs, DataWorks automatically loads the specified OSS resource.

  1. Upload the JAR file to OSS.

    1. Log on to the OSS console. In the left-side navigation pane, click Buckets.

    2. Click the name of your bucket (onaliyun-bucket-2 in this example) to go to the Objects page.

    3. Go to the emr/jars directory, click Upload Object, select onaliyun_mr_wordcount-1.0-SNAPSHOT.jar in the Files to Upload section, and click Upload Object.

  2. Reference the JAR file in your node. On the configuration tab of the EMR MR node, write the following command:

    The ossref path takes one of two forms:

    • ossref://{bucket}/{object}: Use when the OSS bucket is in the same region as the EMR cluster. If the endpoint is omitted, only OSS resources in the same region as the EMR cluster can be referenced.
    • ossref://{endpoint}/{bucket}/{object}: Use when the OSS bucket is in a different region. Include the endpoint to cross regions.
    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

    This command follows the format hadoop jar <JAR path> <main class> <input path> <output path>. Because the bucket in this example is in the same region as the EMR cluster, the JAR path uses the ossref://{bucket}/{object} form without an endpoint.

(Optional) Configure advanced parameters

Configure advanced parameters on the Advanced Settings tab. For more information about how to configure the parameters, see Spark Configuration. The available parameters depend on your cluster type.

DataLake cluster or custom cluster (created on the EMR on ECS page)

• queue: The YARN scheduling queue. Default: default. See YARN schedulers.
• priority: The job priority. Default: 1.
• Others: Custom parameters added on the Advanced Settings tab are passed to the job in -D key=value format when the task is committed.

Hadoop cluster (created on the EMR on ECS page)

• queue: The YARN scheduling queue. Default: default. See YARN schedulers.
• priority: The job priority. Default: 1.
• USE_GATEWAY: Whether to submit jobs through a gateway cluster. Set to true to use a gateway cluster; false (the default) submits jobs directly to the master node. If USE_GATEWAY is true but the cluster has no associated gateway cluster, job submission may fail.

Run the task

  1. In the toolbar, click the Run with Parameters icon. In the Parameters dialog box, select the target resource group from the Resource Group Name drop-down list and click Run.

    To access a computing resource over the Internet or a virtual private cloud (VPC), use a resource group that is connected to that computing resource. See Network connectivity solutions.
  2. Click the Save icon to save the node configuration.

  3. (Optional) Perform smoke testing on the node in the development environment before or after committing it. See Perform smoke testing.

Step 3: Configure scheduling properties

To run the task on a schedule, click Properties in the right-side navigation pane and configure the scheduling settings. See Overview.

Configure the Rerun and Parent Nodes parameters on the Properties tab before committing the task.

Step 4: Deploy the task

After committing and deploying the task, the system runs it automatically on your defined schedule.

  1. Click the Save icon to save the task.

  2. Click the Submit icon to commit the task. In the Submit dialog box, enter a Change description.

    Note:

    • Configure the Rerun and Parent Nodes parameters on the Properties tab before committing.
    • If code review is enabled, the committed task can only be deployed after the code passes review. See Code review.
  3. If your workspace is in standard mode, click Deploy in the upper-right corner of the configuration tab to deploy the task to the production environment. See Deploy nodes.

View the results

After the task runs successfully, verify the output in two ways:

  • OSS console: Log on to the OSS console and browse to the emr/datas/wordcount02/outputs directory to see the output files.


  • DataWorks console: Create an EMR Hive node (see Create an EMR Hive node) and query the results by mounting a Hive external table to the OSS output path:

    CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
    (
        `word` STRING COMMENT 'Word',
        `count` STRING COMMENT 'Count'
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
    
    SELECT * FROM wordcount02_result_tb;

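You can also inspect the output directly from the command line with ossutil. This is a sketch: it assumes ossutil is configured with credentials, and that the job ran with a single reducer, so the output file uses Hadoop's default name part-r-00000.

```shell
# List the output files the job wrote.
ossutil ls oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/

# Print the contents of the first reduce output file
# (tab-separated word/count pairs).
ossutil cat oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/part-r-00000
```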

What's next

After the task is deployed, it runs automatically on your defined schedule. To monitor run status and manage instances, click Operation Center in the upper-right corner of the configuration tab. See View and manage auto triggered tasks.