In E-MapReduce (EMR) task development, you can create a MapReduce (MR) node to split the processing of a large dataset across multiple parallel map tasks. This significantly improves data processing efficiency. This topic uses an example that reads a text file from Object Storage Service (OSS) and counts the words in the file to describe how to develop and configure an EMR MR node job.
Prerequisites
An EMR cluster is created and the cluster is registered to DataWorks. For more information, see DataStudio (old version): Associate an EMR computing resource.
(Required if you use a RAM user to develop tasks) The RAM user is added to your DataWorks workspace as a member and is assigned the Development or Workspace Manager role. The Workspace Manager role has more permissions than necessary. Exercise caution when you assign the Workspace Manager role. For more information about how to add a member, see Add workspace members and assign roles to them.
Note: If you use an Alibaba Cloud account, you can skip this operation.
A workspace directory is created. For more information, see Workspace directories.
If you want to reference open source code resources or user-defined functions in the node, you must create them in Resource Management. For more information, see Resource management.
If you use the job development example in this topic to run the workflow, you must also create an OSS bucket. For more information about how to create an OSS bucket, see Create buckets in the console.
An EMR MR node is created. For more information, see Create an auto-triggered task.
Limits
This type of node can be run only on a serverless resource group or an exclusive resource group for scheduling. We recommend that you use a serverless resource group.
If you want to manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK in the cluster. For more information about how to configure EMR-HOOK, see Configure EMR-HOOK for Hive.
Note: If you do not configure EMR-HOOK in your cluster, metadata cannot be displayed in real time, audit logs cannot be generated, and data lineage cannot be displayed in DataWorks. In addition, EMR-related administration tasks cannot be run.
Prepare initial data and a JAR resource package
Prepare initial data
Create a file named input01.txt that contains the following initial data:
hadoop emr hadoop dw
hive hadoop
dw emr

Upload the initial data file
Log on to the OSS console. In the navigation pane on the left, click Buckets.
Click the destination bucket name to go to the Files page.
In this example, the onaliyun-bucket-2 bucket is used.
Click Create Directory to create directories to store the initial data and JAR resources.
Set Directory Name to emr/datas/wordcount02/inputs to create a directory to store the initial data.
Set Directory Name to emr/jars to create a directory to store JAR resources.
Upload the initial data file to the directory created for storing initial data.
Navigate to the /emr/datas/wordcount02/inputs path and click Upload.
In the Files To Upload section, click Scan For Files, select the input01.txt file, and then click Upload.
Use MapReduce to read the OSS file and generate a JAR package
Open an existing IDEA project and add the following dependencies to the pom.xml file.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.8.5</version> <!-- The version used by EMR MR is 2.8.5. -->
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>

To read data from and write data to an OSS file in MapReduce, you must configure the following parameters.
Important: The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use a RAM user to call API operations or perform routine O&M. We strongly recommend that you do not save the AccessKey ID and AccessKey secret of your Alibaba Cloud account in your project code or any other location from which they can easily leak. An AccessKey pair leak may threaten the security of all resources in your account. The following sample code is for reference only. Keep your AccessKey information confidential.
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); conf.set("fs.oss.endpoint","${endpoint}");The following table describes the parameters.
${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.${endpoint}: The endpoint of the OSS bucket. The endpoint is determined by the region where your cluster resides. The OSS bucket must also reside in the same region as the cluster. For more information, see OSS regions and endpoints.
The following Java code provides an example of how to modify the WordCount example from the official Hadoop website. The AccessKey ID and AccessKey secret configurations are added to the code to grant the job permissions to access the OSS file.
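A minimal sketch of such a modified WordCount class follows. It is based on the official Hadoop WordCount example; the package and class name match the main class used in the job commands later in this topic (cn.apache.hadoop.onaliyun.examples.EmrWordCount), and the OSS settings are the parameters described above. Treat this as a reference sketch rather than the exact original sample.

package cn.apache.hadoop.onaliyun.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class EmrWordCount {

    // Emits (word, 1) for each whitespace-separated token in a line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums the counts for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Grant the job access to OSS. Replace the placeholders with your
        // AccessKey pair and the endpoint of the region where the bucket resides.
        conf.set("fs.oss.accessKeyId", "${accessKeyId}");
        conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
        conf.set("fs.oss.endpoint", "${endpoint}");

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(EmrWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // The input and output paths are passed as OSS URIs, for example:
        // oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}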
After you edit the Java code, build a JAR package from the project (for example, by running mvn package). In this example, a JAR package named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar is generated.
Procedure
On the EMR MR node editing page, perform the following development operations:
Develop an EMR MR task
Select a method based on your scenario:
Method 1: Upload a resource and then reference the EMR JAR resource
DataWorks lets you upload a resource from your on-premises machine to DataStudio and then reference the resource. If the EMR MR node depends on a large resource that cannot be uploaded on the DataWorks page, you can store the resource in HDFS and then reference it in the code.
Create an EMR JAR resource.
For more information, see Resource management. Store the JAR package generated in the Prepare initial data and a JAR resource package section in the emr/jars directory. Click Click To Upload to upload the JAR resource.
Select a Storage Path, Data Source, and Resource Group.
Click Save.

Reference the EMR JAR resource.
Open the EMR MR node that you created to go to the code editor page.
In the Resource Management section of the left navigation pane, find the resource that you want to reference. In this example, the resource is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar. Right-click the resource and select Reference Resource.
After you select the resource, a success message confirming the reference is displayed on the code editor page of the EMR MR node. Then, run the following code. Replace the resource package, bucket name, and path in the sample code with your actual information.

##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

Note: Comment statements are not supported when you edit the code for an EMR MR node.
Method 2: Directly reference an OSS resource
This node can directly reference an OSS resource by using the OSS REF method. When you run the EMR node, DataWorks automatically loads the OSS resource that is referenced in the code to the on-premises machine. This method is commonly used in scenarios in which an EMR task must run JAR dependencies or depends on scripts.
Upload the JAR resource.
After you complete code development, log on to the OSS console and click Buckets in the navigation pane on the left.
Click the destination bucket name to go to the Files page.
In this example, the onaliyun-bucket-2 bucket is used.
Upload the JAR resource to the directory created for storing JAR resources.
Go to the emr/jars directory and click Upload. In the Files To Upload section, click Scan For Files, add the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar file to the bucket, and then click Upload.
Reference the JAR resource.
On the EMR MR node editing page, edit the code to reference the JAR resource.
hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs

Note: The preceding command uses the following format: hadoop jar <Path to the JAR file to be referenced and run> <Full name of the main class to be run> <Directory of the file to be read> <Directory for the results>.
The following table describes the path to the JAR file that is referenced and run.
Parameter: Path to the JAR file to be referenced and run
Description: The path is in the ossref://{endpoint}/{bucket}/{object} format.
endpoint: the endpoint of the OSS bucket. If the endpoint part is left empty, as in the preceding command, only an OSS bucket that resides in the same region as the current EMR cluster can be used.
bucket: a container that is used to store objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets within the current logon account.
object: a specific object, such as a file name or path, that is stored in a bucket.
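For example, a path that includes an explicit endpoint might look like the following. The endpoint shown here is illustrative; use the endpoint of the region in which your bucket resides.

ossref://oss-cn-hangzhou-internal.aliyuncs.com/onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar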
(Optional) Configure advanced parameters
On the Scheduling Configuration pane on the right side of the node, you can configure the advanced parameters that are described in the following tables.
Note: The available advanced parameters vary based on the EMR cluster type, as described in the following tables.
You can configure open source Spark properties in the Scheduling Configuration pane on the right side of the node.
DataLake cluster/custom cluster: EMR on ECS

queue: The scheduling queue to which jobs are submitted. Default value: default. For information about EMR YARN, see Basic queue configurations.

priority: The priority of the job. Default value: 1.

FLOW_SKIP_SQL_ANALYZE: The method that is used to execute SQL statements. Valid values:
true: Multiple SQL statements are executed at a time.
false (default): Only one SQL statement is executed at a time.
Note: This parameter is supported only for testing the running process in the data development environment.

Others: You can also directly append custom MR task parameters in the advanced settings. When you commit the code, DataWorks automatically adds these parameters to the command in the -D key=value format. For example, a parameter that you append as mapreduce.job.reduces=2 (an illustrative Hadoop property) is added to the command as -Dmapreduce.job.reduces=2.

Hadoop cluster: EMR on ECS
queue: The scheduling queue to which jobs are submitted. Default value: default. For information about EMR YARN, see Basic queue configurations.

priority: The priority of the job. Default value: 1.

USE_GATEWAY: Specifies whether to submit the jobs on this node by using a Gateway cluster. Valid values:
true: Jobs are submitted by using a Gateway cluster.
false (default): Jobs are not submitted by using a Gateway cluster. By default, jobs are submitted to the header node.
Note: If the cluster to which this node belongs is not associated with a Gateway cluster but you set this parameter to true, subsequent EMR job submissions fail.

Run the task
On the Debugging Configurations tab, in the Computing Resource section, configure the Computing Resource and DataWorks Resource Group parameters.
Note: You can also configure Scheduling CUs based on the resources required for task execution. The default CU value is 0.25.
To access a data source over the Internet or in a VPC, you must use an exclusive resource group for scheduling that has passed the connectivity test with the data source. For more information, see Network connectivity solutions.
Click Run on the toolbar. In the parameter dialog box, select your data source and run the task.
If you want to run the node task on a regular basis, you can configure its scheduling properties. For more information, see Node scheduling.
After the node is configured, you must publish it. For more information, see Publish a node or workflow.
After the task is published, you can view the running status of the auto-triggered task in Operation Center. For more information, see Getting started with Operation Center.
View the results
Log on to the OSS console. You can view the results in the output directory of the destination bucket. An example path is emr/datas/wordcount02/outputs.
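Given the sample input data in this topic, the word counts in the output file (for example, part-r-00000; the exact file name depends on the job) would be similar to the following, with each line containing a word and its count separated by a tab:

dw	2
emr	2
hadoop	3
hive	1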

Read the statistical results in DataWorks:
Create an EMR Hive node. For more information, see Create an auto-triggered task.
On the EMR Hive node, create a Hive external table that is mounted to OSS and read the table data. The following code provides an example.
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb (
    `word` STRING COMMENT 'word',
    `count` STRING COMMENT 'count'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
SELECT * FROM wordcount02_result_tb;

The following figure shows the execution results.
