E-MapReduce (EMR) MR nodes run Hadoop MapReduce jobs on your EMR cluster directly from DataWorks. Each job distributes work across multiple parallel map tasks, which makes these nodes well suited for large-scale batch processing. This topic walks you through creating an EMR MR node and developing a WordCount job that reads text from an Object Storage Service (OSS) bucket and counts word frequencies.
Workflow overview
| Phase | Steps |
|---|---|
| Prepare | Create input data and build the JAR file |
| Create | Create an EMR MR node in DataStudio |
| Develop | Reference the JAR file and write the task command |
| Schedule | Configure scheduling properties |
| Deploy | Commit and deploy the task |
| Verify | Check output in OSS or via a Hive external table |
Prerequisites
Before you begin, make sure you have:
Required for all users
An EMR cluster registered to DataWorks. See DataStudio (old version): Associate an EMR computing resource.
A serverless resource group purchased, associated with your workspace, and connected to the network. See Create and use a serverless resource group.
A workflow created in DataStudio. See Create a workflow.
An OSS bucket. See Create buckets.
Required only in specific scenarios
(RAM users) The RAM user added to the workspace with the Develop or Workspace Administrator role. The Workspace Administrator role has more permissions than necessary. Exercise caution when you assign the Workspace Administrator role. See Add workspace members and assign roles to them.
(Optional) An EMR JAR resource created, if your node references open-source libraries. See Create and use an EMR resource.
(Optional) User-defined functions (UDFs) uploaded as EMR JAR resources and registered with EMR, if your node uses UDFs. See Create an EMR function.
Limitations
EMR MR nodes run only on a serverless resource group or an exclusive resource group for scheduling. Use a serverless resource group when possible.
To manage metadata for a DataLake or custom cluster in DataWorks, configure EMR-HOOK on your cluster first. Without EMR-HOOK, metadata updates are not real-time, audit logs are not generated, data lineage is not displayed, and EMR governance tasks cannot run. See Use the Hive extension feature to record data lineage and historical access information.
Prepare the sample data and JAR file
Create the input file
Create a file named input01.txt with the following content:
    hadoop emr hadoop dw
    hive hadoop
    dw emr

Upload the input file to OSS
Log on to the OSS console. In the left-side navigation pane, click Buckets.
On the Buckets page, click the name of your bucket to go to the Objects page. This example uses the onaliyun-bucket-2 bucket.
On the Objects page, click Create Directory to create two directories:
Set Directory Name to emr/datas/wordcount02/inputs to store the input data.
Set Directory Name to emr/jars to store JAR files.
Go to the emr/datas/wordcount02/inputs directory and click Upload Object. In the Files to Upload section, click Select Files and upload input01.txt.
Build the JAR file
Open your IntelliJ IDEA project and add the following POM dependencies:
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.8.5</version>
        <!-- The version used by EMR MR is 2.8.5. -->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>

Configure OSS credentials and endpoint in your code.
Important: Do not hard-code your AccessKey ID and AccessKey secret in source code or commit them to version control. The following sample code uses placeholder values for illustration only. In production, load credentials from environment variables or a secrets manager.
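One way to follow this advice is to read the three values from environment variables and fail fast when any of them is missing. The sketch below uses only the JDK. The class name OssSettings and the variable names OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, and OSS_ENDPOINT are hypothetical, and whether such variables are visible to the process that runs the job driver depends on how your cluster is set up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps environment variables to the fs.oss.* properties used in this topic. */
final class OssSettings {

    // Assumed variable names for illustration; they are not defined by EMR or DataWorks.
    static final String ENV_KEY_ID = "OSS_ACCESS_KEY_ID";
    static final String ENV_KEY_SECRET = "OSS_ACCESS_KEY_SECRET";
    static final String ENV_ENDPOINT = "OSS_ENDPOINT";

    /** Returns property names and values, or throws if a variable is missing or blank. */
    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.oss.accessKeyId", require(env, ENV_KEY_ID));
        props.put("fs.oss.accessKeySecret", require(env, ENV_KEY_SECRET));
        props.put("fs.oss.endpoint", require(env, ENV_ENDPOINT));
        return props;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Print only the property names so that secrets never reach the logs.
        for (String key : fromEnv(System.getenv()).keySet()) {
            System.out.println("resolved " + key);
        }
    }
}
```

In the WordCount main method, each returned entry could then be applied with conf.set(key, value) in place of the placeholder strings.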
| Parameter | Description |
|---|---|
| ${accessKeyId} | Your AccessKey ID |
| ${accessKeySecret} | Your AccessKey secret |
| ${endpoint} | The OSS endpoint for the region where your EMR cluster resides. See OSS regions and endpoints. |

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint", "${endpoint}");

The following is the complete WordCount sample. It extends the standard Hadoop WordCount example with OSS credentials so the job can read from and write to OSS.
    package cn.apache.hadoop.onaliyun.examples;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class EmrWordCount {

        // Map step: emit (word, 1) for every whitespace-separated token.
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        // Reduce step (also used as the combiner): sum the counts for each word.
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }

            // Placeholder values only; do not hard-code real credentials.
            conf.set("fs.oss.accessKeyId", "${accessKeyId}");
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
            conf.set("fs.oss.endpoint", "${endpoint}");

            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // All arguments except the last are input paths; the last is the output path.
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Package the code into a JAR file. This example produces a file named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
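The tokenize-and-sum logic can also be checked locally, without a cluster, before you upload the JAR file. The following sketch mirrors what TokenizerMapper and IntSumReducer do, using only the JDK. The class name LocalWordCount is hypothetical, and the sketch is a sanity check rather than a substitute for running the job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** Hypothetical local check that mirrors the map (tokenize) and reduce (sum) steps of EmrWordCount. */
final class LocalWordCount {

    /** Counts whitespace-separated tokens across all lines; keys are sorted alphabetically. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenizer as TokenizerMapper: splits on whitespace.
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                // Same aggregation as IntSumReducer: add 1 per occurrence.
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Contents of input01.txt from this topic.
        List<String> input = Arrays.asList("hadoop emr hadoop dw", "hive hadoop", "dw emr");
        for (Map.Entry<String, Integer> e : count(input).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

For the sample input, the counts are dw 2, emr 2, hadoop 3, and hive 1, which is what the job output in OSS should contain.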
Step 1: Create an EMR MR node
Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the target region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select the target workspace from the drop-down list and click Go to Data Development.
Create an EMR MR node.
Find the target workflow, right-click the workflow name, and choose Create Node > EMR > EMR MR.
Note: Alternatively, move the pointer over the Create icon and choose Create Node > EMR > EMR MR.
In the Create Node dialog box, set Name, Engine Instance, Node Type, and Path. Click Confirm.
Note: Node names can contain only letters, digits, underscores (_), and periods (.).
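If you generate node names from a script or naming convention, the naming rule can be checked up front. The sketch below is an illustration only: the class name NodeNameCheck is hypothetical, and it assumes that "letters" and "digits" mean ASCII characters. The console remains the authority on what is accepted and may enforce limits, such as a maximum length, that are not stated here.

```java
import java.util.regex.Pattern;

/** Hypothetical pre-check for the node naming rule described in this topic. */
final class NodeNameCheck {

    // Assumption: letters and digits are ASCII; underscores and periods are allowed.
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9_.]+");

    static boolean isValid(String name) {
        return name != null && ALLOWED.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("emr_mr_wordcount.v1")); // true
        System.out.println(isValid("emr mr wordcount"));    // false: contains spaces
    }
}
```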
Step 2: Develop the EMR MR task
Two methods are available for referencing the JAR file in your task. Method 1 uploads the JAR file to DataStudio as an EMR JAR resource and is the recommended approach. Method 2 references the JAR file directly from OSS by using the OSS REF method, which is useful when JAR dependencies are too large to upload through the console or when tasks need to depend on OSS-hosted scripts.
Method 1: Upload and reference an EMR JAR resource (recommended)
Create an EMR JAR resource. Upload the JAR file to DataStudio. See Create and use an EMR resource. In this example, upload onaliyun_mr_wordcount-1.0-SNAPSHOT.jar from the emr/jars directory. The first time you use an EMR JAR resource, click Authorize to grant DataWorks access, then click Upload.
Reference the JAR file in your node.
Open the EMR MR node.
In the EMR folder, find your resource under Resource, right-click its name, and select Insert Resource Path. In this example, the resource is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
After the resource path is inserted, write the task command. Replace the JAR name, bucket name, and directory paths with your actual values.
    ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
    onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

The first line (##@resource_reference{...}) is a DataWorks directive that declares the JAR file the task depends on. DataWorks loads this resource before running the command on the second line. The second line specifies the JAR file name, the fully qualified main class, the OSS input path, and the OSS output path.
Note: EMR MR node code does not support comments.
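To make the two-line structure concrete, the sketch below assembles the same task command from its parts. It is only an illustration of the layout described above: the class name ResourceCommand is hypothetical, and in DataStudio the directive line is normally generated for you by Insert Resource Path.

```java
/** Hypothetical helper that lays out the two-line task command for Method 1. */
final class ResourceCommand {

    static String build(String jarName, String mainClass, String inputPath, String outputPath) {
        // Line 1: directive that declares the JAR resource the task depends on.
        String directive = "##@resource_reference{\"" + jarName + "\"}";
        // Line 2: JAR name, fully qualified main class, input path, output path.
        String command = String.join(" ", jarName, mainClass, inputPath, outputPath);
        return directive + "\n" + command;
    }

    public static void main(String[] args) {
        System.out.println(build(
                "onaliyun_mr_wordcount-1.0-SNAPSHOT.jar",
                "cn.apache.hadoop.onaliyun.examples.EmrWordCount",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs"));
    }
}
```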
Method 2: Reference an OSS resource
The OSS REF method lets the node reference a JAR file stored in OSS directly, without uploading it to DataStudio first. When the task runs, DataWorks automatically loads the specified OSS resource.
Upload the JAR file to OSS.
Log on to the OSS console. In the left-side navigation pane, click Buckets.
Click the name of your bucket (onaliyun-bucket-2 in this example) to go to the Objects page.
Go to the emr/jars directory, click Upload Object, select onaliyun_mr_wordcount-1.0-SNAPSHOT.jar in the Files to Upload section, and click Upload Object.
Reference the JAR file in your node. On the configuration tab of the EMR MR node, write the following command:
    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

This command follows the format hadoop jar <JAR path> <main class> <input path> <output path>. The JAR path takes one of the two forms in the following table. This example omits the endpoint, which works only when the bucket is in the same region as the EMR cluster.
| Parameter | Description |
|---|---|
| ossref://{bucket}/{object} | JAR path when the OSS bucket is in the same region as the EMR cluster. If the endpoint is omitted, only OSS resources in the same region as the EMR cluster can be referenced. |
| ossref://{endpoint}/{bucket}/{object} | JAR path when the OSS bucket is in a different region. Include the endpoint to cross regions. |
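The choice between the two ossref:// forms comes down to whether an endpoint is supplied. The sketch below builds either form from its parts. The class name OssRef is hypothetical, and the endpoint value in main is only an example of what a cross-region endpoint might look like.

```java
/** Hypothetical helper that builds an ossref:// JAR path with or without an endpoint. */
final class OssRef {

    static String path(String endpoint, String bucket, String object) {
        // Drop a leading slash so the object key is not doubled up with the separator.
        String key = object.startsWith("/") ? object.substring(1) : object;
        if (endpoint == null || endpoint.isEmpty()) {
            // Same-region form: ossref://{bucket}/{object}
            return "ossref://" + bucket + "/" + key;
        }
        // Cross-region form: ossref://{endpoint}/{bucket}/{object}
        return "ossref://" + endpoint + "/" + bucket + "/" + key;
    }

    public static void main(String[] args) {
        String jar = "emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar";
        System.out.println(path(null, "onaliyun-bucket-2", jar));
        // Example endpoint value, used here only to show the cross-region layout.
        System.out.println(path("oss-cn-hangzhou.aliyuncs.com", "onaliyun-bucket-2", jar));
    }
}
```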
(Optional) Configure advanced parameters
Configure advanced parameters on the Advanced Settings tab. For more information about how to configure the parameters, see Spark Configuration. The available parameters depend on your cluster type.
DataLake cluster or custom cluster (created on the EMR on ECS page)
| Parameter | Description |
|---|---|
| queue | The YARN scheduling queue. Default: default. See YARN schedulers. |
| priority | Job priority. Default: 1. |
| Others | Custom parameters added on the Advanced Settings tab are passed to the job in -D key=value format when the task is committed. |
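The -D key=value format is Hadoop's generic option for setting a configuration property, which GenericOptionsParser (used in the WordCount sample) applies to the job Configuration before returning the remaining arguments. The sketch below shows how a set of custom parameters maps onto that format. The class name DefineArgs is hypothetical, and how DataWorks assembles the final command internally is not described in this topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of rendering custom parameters as -D key=value arguments. */
final class DefineArgs {

    static List<String> toArgs(Map<String, String> params) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : params.entrySet()) {
            // Each parameter becomes the pair: -D key=value
            args.add("-D");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        // mapreduce.job.reduces is a standard Hadoop property, used here as an example.
        params.put("mapreduce.job.reduces", "2");
        System.out.println(String.join(" ", toArgs(params))); // -D mapreduce.job.reduces=2
    }
}
```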
Hadoop cluster (created on the EMR on ECS page)
| Parameter | Description |
|---|---|
| queue | The YARN scheduling queue. Default: default. See YARN schedulers. |
| priority | Job priority. Default: 1. |
| USE_GATEWAY | Whether to submit jobs through a gateway cluster. Set to true to use a gateway cluster; false (default) submits jobs directly to the master node. If the cluster has no associated gateway cluster and USE_GATEWAY is true, job submission may fail. |
Run the task
In the toolbar, click the run icon. In the Parameters dialog box, select the target resource group from the Resource Group Name drop-down list and click Run.
Note: To access a computing resource over the Internet or a virtual private cloud (VPC), use a resource group that is connected to that computing resource. See Network connectivity solutions.
Click the save icon to save the node configuration.
(Optional) Perform smoke testing on the node in the development environment before or after committing it. See Perform smoke testing.
Step 3: Configure scheduling properties
To run the task on a schedule, click Properties in the right-side navigation pane and configure the scheduling settings. See Overview.
Configure the Rerun and Parent Nodes parameters on the Properties tab before committing the task.
Step 4: Deploy the task
After you commit and deploy the task, the system runs it automatically on your defined schedule.
Click the save icon to save the task.
Click the commit icon to commit the task. In the Submit dialog box, enter a Change description.
Note: Configure the Rerun and Parent Nodes parameters on the Properties tab before committing.
Note: If code review is enabled, the committed task can only be deployed after the code passes review. See Code review.
If your workspace is in standard mode, click Deploy in the upper-right corner of the configuration tab to deploy the task to the production environment. See Deploy nodes.
View the results
After the task runs successfully, verify the output in two ways:
OSS console: Log on to the OSS console and browse to the emr/datas/wordcount02/outputs directory to see the output files.
DataWorks console: Create an EMR Hive node (see Create an EMR Hive node) and query the results by mounting a Hive external table to the OSS output path:
    CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb (
        `word` STRING COMMENT 'Word',
        `count` STRING COMMENT 'Count'
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';

    SELECT * FROM wordcount02_result_tb;
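The table definition relies on each output line holding a word and its count separated by a tab, which is the default key-value separator of Hadoop's text output format. If you download an output file and want to inspect it outside Hive, a line can be parsed as in the following sketch. The class name WordCountLine is hypothetical, and the sketch makes no assumption about the names of the output files.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical parser for one line of WordCount output: <word>TAB<count>. */
final class WordCountLine {

    static Map.Entry<String, Long> parse(String line) {
        int tab = line.lastIndexOf('\t');
        if (tab <= 0) {
            throw new IllegalArgumentException("Expected <word>TAB<count>: " + line);
        }
        String word = line.substring(0, tab);
        long count = Long.parseLong(line.substring(tab + 1).trim());
        return new SimpleImmutableEntry<>(word, count);
    }

    public static void main(String[] args) {
        Map.Entry<String, Long> e = parse("hadoop\t3");
        System.out.println(e.getKey() + " -> " + e.getValue()); // hadoop -> 3
    }
}
```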
What's next
After the task is deployed, it runs automatically on your defined schedule. To monitor run status and manage instances, click Operation Center in the upper-right corner of the configuration tab. See View and manage auto triggered tasks.