DataWorks:EMR MR Node

Last Updated:Mar 27, 2026

This tutorial shows you how to run a word count MapReduce job on an E-MapReduce (EMR) cluster using DataWorks Data Studio. The job reads a text file from Object Storage Service (OSS) and counts word occurrences. By splitting large datasets into multiple parallel Map tasks, MapReduce significantly improves data processing efficiency. You will prepare source data, build a JAR package, configure an EMR MR node, run the job, and view the results.

Prerequisites

Before you begin, make sure you have:

  • An EMR cluster created and registered with DataWorks. See Data Studio: Associate an EMR computing resource.

  • (Optional) If you use a Resource Access Management (RAM) user, add the user to the workspace with the Developer or Workspace Administrator role. The Workspace Administrator role has extensive permissions, so grant it with caution. See Add members to a workspace. Alibaba Cloud account users can skip this step.

  • An OSS bucket. See Create buckets.

Limitations

  • EMR MR nodes can only run on a Serverless Resource Group (recommended) or an Exclusive Scheduling Resource Group.

  • For DataLake or custom clusters, configure EMR-HOOK on the cluster before managing metadata in DataWorks. See Configure Hive EMR-HOOK.

    Note

    Without EMR-HOOK, DataWorks cannot display metadata in real time, generate audit logs, display Data Lineage, or perform EMR-related data governance tasks.

How it works

This tutorial has five stages:

Stage | Description
1. Prepare source data | Create a sample text file and upload it to OSS
2. Build the JAR package | Write and compile a MapReduce word count program that reads from OSS
3. Configure the EMR MR node | Upload or reference the JAR package in Data Studio, then enter the run command
4. Run the job | Execute the node and configure scheduling if needed
5. View results | Check the output files in OSS or query them with a Hive external table

Step 1: Prepare source data

Create the input file

Create a file named input01.txt with the following content:

hadoop emr hadoop dw
hive hadoop
dw emr
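
To know what to look for in Step 5, the expected counts for this sample can be worked out locally. The following is a minimal sketch in plain Java with no Hadoop classes. The class name LocalWordCountPreview is hypothetical. The sketch assumes the job runs with a single reducer, that keys come out in sorted order, and that each output line is word, tab, count (the same tab delimiter the Hive table in Step 5 expects).

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Local stand-in for the word count job; it does not use Hadoop.
final class LocalWordCountPreview {

    // Counts whitespace-separated tokens. A TreeMap keeps the keys sorted,
    // which mirrors how a single reducer would see this ASCII-only sample.
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokens = new StringTokenizer(text);
        while (tokens.hasMoreTokens()) {
            counts.merge(tokens.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    // Renders one "word<TAB>count" line per entry.
    static String render(Map<String, Integer> counts) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            out.append(entry.getKey()).append('\t').append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Same three lines as input01.txt.
        String input = "hadoop emr hadoop dw\nhive hadoop\ndw emr\n";
        System.out.print(render(count(input)));
    }
}
```

For the three input lines above, this yields dw 2, emr 2, hadoop 3, and hive 1.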

Upload source data to OSS

  1. Log in to the OSS console. In the left navigation pane, click Bucket List.

  2. Click the target bucket name to open the File Management page. This tutorial uses onaliyun-bucket-2.

  3. Click Create Directory to create two directories:

    • emr/datas/wordcount02/inputs — for the source data file

    • emr/jars — for the JAR package

  4. Navigate to the emr/datas/wordcount02/inputs directory, click Upload File, then click Scan for Files in the Files to upload section. Select input01.txt and click Upload File.

Step 2: Build the JAR package

Add Maven dependencies

In your IntelliJ IDEA project, add the following dependencies to pom.xml. Use version 2.8.5 so that the libraries match the EMR cluster runtime; a version mismatch can cause class compatibility errors.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.8.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>

Write the MapReduce program

The program uses three OSS configuration parameters to access files:

Parameter | Description
fs.oss.accessKeyId | Your AccessKey ID
fs.oss.accessKeySecret | Your AccessKey secret
fs.oss.endpoint | The OSS endpoint for the region where your cluster and bucket reside. See Regions and endpoints. The OSS bucket must be in the same region as the EMR cluster.

Important

An Alibaba Cloud account's AccessKey grants full permissions for all API operations. Use a RAM user's AccessKey for API calls and daily operations. Never hardcode credentials in your code or commit them to a repository.

The following sample code adapts the standard Hadoop WordCount example with OSS access. Note the package name (cn.apache.hadoop.onaliyun.examples) and class name (EmrWordCount) — you will use both when submitting the job in Step 3.

Sample code

package cn.apache.hadoop.onaliyun.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class EmrWordCount {
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        // Placeholders only: supply real values at run time and never commit credentials.
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(EmrWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
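
The sample passes the three fs.oss.* values as literal placeholders. One possible way to keep real credentials out of source code is to read them from environment variables, sketched below in plain Java. The class name OssSettingsFromEnv and the variable names OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, and OSS_ENDPOINT are assumptions made for this example, not names defined by DataWorks or EMR. Whether such variables are available where the job's main method runs depends on your cluster setup.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: collect the OSS settings from an environment-style map.
final class OssSettingsFromEnv {

    // Hypothetical environment variable names, chosen for this example only.
    static final String ENV_KEY_ID = "OSS_ACCESS_KEY_ID";
    static final String ENV_KEY_SECRET = "OSS_ACCESS_KEY_SECRET";
    static final String ENV_ENDPOINT = "OSS_ENDPOINT";

    // Returns the three fs.oss.* properties and fails fast if one is missing.
    static Map<String, String> load(Map<String, String> env) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.oss.accessKeyId", require(env, ENV_KEY_ID));
        props.put("fs.oss.accessKeySecret", require(env, ENV_KEY_SECRET));
        props.put("fs.oss.endpoint", require(env, ENV_ENDPOINT));
        return props;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            Map<String, String> props = load(System.getenv());
            // Print only the property names so that secrets never reach the logs.
            props.keySet().forEach(System.out::println);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In EmrWordCount.main, each returned entry could then be applied with conf.set(key, value) in place of the three literal placeholder calls.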

Build the JAR

Build the JAR package, for example by running mvn clean package in the project root. This tutorial uses onaliyun_mr_wordcount-1.0-SNAPSHOT.jar as the file name.

Step 3: Configure the EMR MR node

Data Studio provides two ways to make the JAR available to the EMR MR node. Choose based on your situation:

Method | When to use
Upload and reference | The JAR is developed locally and you want to manage it inside DataWorks. Use this when the file is small enough to upload through the UI.
Reference from OSS | The JAR is already in OSS, or it is too large to upload through the DataWorks UI. DataWorks automatically downloads the JAR to the execution environment at runtime.

Method 1: Upload and reference

  1. Create an EMR JAR resource in Data Studio. See Resource management. Choose the emr/jars directory as the storage location, click Click to upload, and select onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

  2. Set the Storage Path, Data Source, and Resource Group, then click Save.

  3. Open the EMR MR node and go to the code editor page.

  4. In the resource management pane on the left, right-click onaliyun_mr_wordcount-1.0-SNAPSHOT.jar and select Reference Resource.

  5. After the reference annotation appears, enter the run command. The fully qualified class name is cn.apache.hadoop.onaliyun.examples.EmrWordCount — the package and class name from the Java code in Step 2. Replace the bucket name and paths with your actual values.

    Note

    Comment statements are not supported in the EMR MR node editor.

    ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
    onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

Method 2: Reference from OSS

You can directly reference an OSS resource from the node by using OSS REF. DataWorks automatically downloads the referenced OSS resource to the local execution environment at runtime.

  1. Upload the JAR to OSS. Log in to the OSS console and navigate to the emr/jars directory in onaliyun-bucket-2. Click Upload File, then click Scan for Files to add onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

  2. On the EMR MR node configuration page, enter the following command. The format is: hadoop jar <path_to_JAR> <fully_qualified_class_name> <input_directory> <output_directory>

    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

    The ossref:// prefix tells DataWorks to download the JAR before execution. The path format is ossref://{endpoint}/{bucket}/{object}, with the following components:

    Component | Description
    endpoint | The OSS endpoint. If left blank, DataWorks can only access OSS in the same region as the EMR cluster. In that case, the bucket must be in the same region as the cluster.
    bucket | The OSS bucket name.
    object | The file path within the bucket.
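
To show how the pieces of the command fit together, here is a small sketch in plain Java that assembles the ossref path and the full run command from their parts. The class EmrMrCommand and its methods are hypothetical helpers written for this example and are not part of DataWorks. The sketch leaves out the endpoint segment when the endpoint is blank, which matches the form of the command used in this tutorial.

```java
// Sketch: assemble the EMR MR run command from its parts.
final class EmrMrCommand {

    // Builds ossref://{endpoint}/{bucket}/{object}. When the endpoint is blank,
    // the endpoint segment is omitted, as in this tutorial's example command.
    static String ossRef(String endpoint, String bucket, String object) {
        String prefix = (endpoint == null || endpoint.isEmpty()) ? "" : endpoint + "/";
        return "ossref://" + prefix + bucket + "/" + object;
    }

    // Builds: hadoop jar <path_to_JAR> <class> <input_directory> <output_directory>
    static String hadoopJar(String jarPath, String mainClass, String input, String output) {
        return String.join(" ", "hadoop", "jar", jarPath, mainClass, input, output);
    }

    public static void main(String[] args) {
        String bucket = "onaliyun-bucket-2";
        String jar = ossRef("", bucket, "emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar");
        String base = "oss://" + bucket + "/emr/datas/wordcount02/";
        System.out.println(hadoopJar(jar,
                "cn.apache.hadoop.onaliyun.examples.EmrWordCount",
                base + "inputs",
                base + "outputs"));
    }
}
```

Running main prints the same command that this step asks you to enter in the node editor.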

Configure advanced parameters (optional)

Set additional parameters in the EMR Node Parameter or DataWorks Parameter section on the right side of the node configuration pane. Available parameters depend on the cluster type.

DataLake or custom cluster

Parameter | Default | Description
queue | default | The YARN scheduling queue for the job. See Basic queue configurations.
priority | 1 | Job priority.
FLOW_SKIP_SQL_ANALYZE | false | Set to true to execute multiple SQL statements at once; false executes one SQL statement at a time. For use in the data development environment only.
Others | - | Custom MR job parameters added as -D key=value statements when DataWorks submits the job.

Hadoop cluster

Parameter | Default | Description
queue | default | The YARN scheduling queue for the job. See Basic queue configurations.
priority | 1 | Job priority.
USE_GATEWAY | false | Set to true to submit the job through a gateway cluster; false submits to the header node. If the cluster has no associated gateway cluster and this is set to true, job submission fails.
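
The sample program in Step 2 passes its arguments through GenericOptionsParser, which is how -D key=value settings end up in the job configuration while the input and output paths stay in the remaining arguments. The sketch below imitates that split in plain Java so it can run without Hadoop. GenericOptionsSketch is a hypothetical, simplified stand-in and not Hadoop's parser: it handles only -D, and it stops looking for options at the first positional argument.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the -D handling that the sample program relies on.
final class GenericOptionsSketch {
    final Map<String, String> properties = new LinkedHashMap<>();
    final List<String> remaining = new ArrayList<>();

    // Accepts both "-D key=value" and "-Dkey=value" before the positional arguments.
    static GenericOptionsSketch parse(String... args) {
        GenericOptionsSketch parsed = new GenericOptionsSketch();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            String pair;
            if (arg.equals("-D") && i + 1 < args.length) {
                pair = args[i + 1];
                i += 2;
            } else if (arg.startsWith("-D") && arg.length() > 2) {
                pair = arg.substring(2);
                i += 1;
            } else {
                break; // first positional argument: stop looking for options
            }
            int eq = pair.indexOf('=');
            if (eq > 0) { // pairs without "key=" are ignored in this sketch
                parsed.properties.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        for (; i < args.length; i++) {
            parsed.remaining.add(args[i]);
        }
        return parsed;
    }

    public static void main(String[] args) {
        GenericOptionsSketch parsed = parse(
                "-D", "mapreduce.job.reduces=2",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs");
        System.out.println("properties: " + parsed.properties);
        System.out.println("remaining:  " + parsed.remaining);
    }
}
```

In the example call, mapreduce.job.reduces is used purely as an illustration of a custom parameter; the two OSS paths are what the sample program would see as its input and output arguments.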

Step 4: Run the job

  1. In the Run Configuration section, set Compute Resources and Resource Group.

    Note
    • You can also configure Scheduling CUs based on the resource requirements of the task. The default CU value is 0.25.

    • To access data sources over the public internet or a VPC, use a scheduling resource group that has passed the connectivity test with the data source. See Network connectivity solutions.

  2. In the parameter dialog in the toolbar, select the data source and click Run.

  3. To run the node on a schedule, configure scheduling properties. See Node scheduling configuration.

  4. Deploy the node to make it operational. See Node and workflow deployment.

Step 5: View results

After the job completes, check the output in two ways:

In the OSS console: Navigate to the emr/datas/wordcount02/outputs directory in onaliyun-bucket-2 to find the output files.

Figure: Target bucket

In DataWorks using Hive: Create an EMR Hive node (see Create a node for a scheduled workflow), then create an external Hive table that points to the output path and query it:

CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
(
    `word` STRING COMMENT 'word',
    `cout` STRING COMMENT 'count'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';

SELECT * FROM wordcount02_result_tb;

The query returns the word count results stored in the output directory.
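
If you download an output file from the outputs directory instead of querying it through Hive, the same tab-delimited layout can be read with a few lines of plain Java. The sketch below is illustrative only. WordCountOutputReader is a hypothetical class, and it assumes each line holds a word, a tab, and an integer count, which is the layout the external table above declares.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: parse the text of a word count output file into word -> count.
final class WordCountOutputReader {

    static Map<String, Integer> parse(String fileContent) {
        Map<String, Integer> rows = new LinkedHashMap<>();
        for (String line : fileContent.split("\n")) {
            if (line.isEmpty()) {
                continue; // skip blank lines, such as a trailing newline
            }
            int tab = line.indexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("Expected word<TAB>count: " + line);
            }
            String word = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            rows.put(word, count);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Inline sample in the assumed layout; replace with the real file content.
        String sample = "dw\t2\nemr\t2\nhadoop\t3\nhive\t1\n";
        parse(sample).forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}
```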

Figure: Run results

What's next