You can create an E-MapReduce (EMR) MR node to process a large dataset by using multiple parallel map tasks. EMR MR nodes help significantly improve data processing efficiency. This topic describes how to develop and configure an EMR MR node. In this example, the node is used to read data from an Object Storage Service (OSS) object in an OSS bucket and count the number of words in the OSS object.
Prerequisites
An EMR cluster is created and the cluster is registered to DataWorks. For more information, see Register an EMR cluster to DataWorks.
(Required if you use a RAM user to develop tasks) The RAM user is added to your DataWorks workspace as a member and is assigned the Development or Workspace Manager role. The Workspace Manager role has more permissions than necessary. Exercise caution when you assign the Workspace Manager role. For more information about how to add a member, see Add workspace members and assign roles to them.
NoteIf you use an Alibaba Cloud account, you can skip this operation.
A workspace directory is created. For more information, see Workspace directories.
The open source code is uploaded as a resource or user-defined functions (UDFs) are uploaded as resources in the RESOURCE MANAGEMENT: ALL pane. This prerequisite must be met if you want to reference open source code or UDFs in your EMR MR node. For more information, see Resource management.
An OSS bucket is created. To use the sample code for task development in this topic, you must prepare an OSS bucket. For more information about how to create an OSS bucket, see Create buckets.
An EMR MR node is created.
Limits
This type of node can be run only on a serverless resource group or an exclusive resource group for scheduling. We recommend that you use a serverless resource group.
If you want to manage metadata for a DataLake or custom cluster in DataWorks, you must configure EMR-HOOK in the cluster first. For more information about how to configure EMR-HOOK, see Use the Hive extension feature to record data lineage and historical access information.
NoteIf you do not configure EMR-HOOK in your cluster, metadata cannot be displayed in real time, audit logs cannot be generated, and data lineages cannot be displayed in DataWorks. EMR governance tasks also cannot be run.
Prepare initial data and a JAR resource package
Prepare initial data
Create a file that is named input01.txt
and contains the following initial data:
hadoop emr hadoop dw
hive hadoop
dw emr
Upload the file that stores the initial data
Log on to the OSS console. In the left-side navigation pane, click Buckets.
On the Buckets page, find the desired bucket and click the bucket name to go to the Objects page.
In this example, the
onaliyun-bucket-2
bucket is used.On the Objects page, click Create Directory to create directories that are used to store initial data and JAR resources.
Set Directory Name to
emr/datas/wordcount02/inputs
to create a directory that is used to store initial data.Set Directory Name to
emr/jars
to create a directory that is used to store JAR resources.
Upload the file that stores the initial data to the emr/datas/wordcount02/inputs directory.
Go to the
/emr/datas/wordcount02/inputs
directory and click Upload Object.In the Files to Upload section, click Select Files and upload the
input01.txt
file to the bucket.
Use the EMR MR node to read the OSS object and generate a JAR package
Open an existing IntelliJ IDEA project and add Project Object Model (POM) dependencies.
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.8.5</version> <!--The version used by EMR MR is 2.8.5.--> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.8.5</version> </dependency>
Configure the following parameters to read data from and write data to the OSS object.
ImportantAn Alibaba Cloud account has permissions to call all API operations. If the AccessKey pair of your Alibaba Cloud account is leaked, all resources in your Alibaba Cloud account may be exposed to high security risks. We recommend that you do not save the AccessKey ID and AccessKey secret of your Alibaba Cloud account into the project code or positions that can be easily located. We recommend that you use a RAM user to call API operations or perform routine O&M. The following sample code is provided only for reference. Keep the AccessKey pair of your Alibaba Cloud account confidential.
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); conf.set("fs.oss.endpoint","${endpoint}");
Parameter description:
${accessKeyId}
: the AccessKey ID of your Alibaba Cloud account.${accessKeySecret}
: the AccessKey secret of your Alibaba Cloud account.${endpoint}
: the endpoint of OSS. The endpoint is determined by the region where your EMR cluster resides. You must activate OSS in the region where your EMR cluster resides. For more information, see Regions and endpoints.
In this topic, the Java code is used to modify the WordCount example on the Hadoop official website. The configuration of the AccessKey ID and AccessKey secret is added to the code. This grants the job the permissions to access OSS objects.
After you write the preceding code, compress the code into a JAR package. In this example, a package named
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
is generated.
Procedure
On the configuration tab of the EMR MR node, perform the following operations:
Develop an EMR MR task
You can use one of the following methods based on your business requirements to develop an EMR MR task:
Method 1: Upload and reference an EMR JAR resource
DataWorks allows you to upload a resource from your on-premises machine to Data Studio before you can reference the resource. If the EMR MR node depends on large amounts of resources, the resources cannot be uploaded by using the DataWorks console. In this case, you can store the resources in Hadoop Distributed File System (HDFS) and reference the resources in the code of the EMR MR node.
Create an EMR JAR resource.
For more information about how to create an EMR JAR resource, see Resource management. In this example, the JAR package that is generated in the Prepare initial data and a JAR resource package section in this topic is stored in the
emr/jars
directory. Click Upload to upload the JAR package.Configure the Storage Path, Data Source, and Resource Group parameters.
Click Save.
Reference the EMR JAR resource.
Open the EMR MR node. The configuration tab of the node appears.
Find the resource that you want to reference in the RESOURCE MANAGEMENT: ALL pane in the left-side navigation pane of the Data Studio page, right-click the resource name, and then select Reference Resources. In this example, the resource is
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
.If the information in the ##@resource_reference{""} format appears on the configuration tab of the EMR MR node, the code resource is referenced. Then, run the following code. You must replace the information in the following code with the actual information. The information includes the resource package name, bucket name, and path.
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"} onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
NoteYou cannot add comments when you write code for the EMR MR node.
Method 2: Reference an OSS resource
The current node can reference an OSS resource by using the OSS REF method. When you run the node, DataWorks automatically loads the OSS resource specified in the node code. This method is commonly used in scenarios in which JAR dependencies are required in EMR tasks or EMR tasks need to depend on scripts.
Upload the JAR file.
Log on to the OSS console. In the top navigation bar, select a desired region. Then, in the left-side navigation pane, click Buckets.
On the Buckets page, find the desired bucket and click the bucket name to go to the Objects page.
In this example, the
onaliyun-bucket-2
bucket is used.Upload the JAR file to the created directory.
Go to the
emr/jars
directory. Click Upload Object. In the Files to Upload section, click Select Files and add theonaliyun_mr_wordcount-1.0-SNAPSHOT.jar
file. Then, click Upload Object.
Reference the JAR file.
Write code that is used to reference the JAR file.
On the configuration tab of the EMR MR node, write code that is used to reference the JAR file.
hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
NoteThe preceding command is in the following format:
hadoop jar <Path where the JAR file to be referenced and executed is stored> <Full name of the main class to be executed> <Path where the file to be read is stored> <Path where the results are stored>
.Parameter description:
Parameter
Description
Path where the JAR file to be referenced and executed is stored
The path is in the
ossref://{endpoint}/{bucket}/{object}
format.endpoint: the endpoint of OSS. If the endpoint parameter is left empty, only a resource in an OSS bucket that resides in the same region as the current EMR cluster can be referenced.
bucket: a container that is used to store objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets within the current logon account.
object: a file name or path that is stored in a bucket.
(Optional) Configure advanced parameters
You can configure specific parameters in the EMR Node Parameters section of the Properties tab. For more information about how to configure the parameters, see Spark Configuration. The following table describes the advanced parameters that can be configured for different types of EMR clusters.
DataLake cluster or custom cluster: created on the EMR on ECS page
Advanced parameter
Description
queue
The scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN, see YARN schedulers.
priority
The priority. Default value: 1.
FLOW_SKIP_SQL_ANALYZE
The manner in which SQL statements are executed. Valid values:
true
: Multiple SQL statements are executed at a time.false
(default): Only one SQL statement is executed at a time.
NoteThis parameter is available only for testing in the development environment of a DataWorks workspace.
Others
You can add a custom parameter for the EMR MR node as an advanced parameter in advanced settings in the DataWorks console. When you commit the code for the EMR MR node in DataWorks, DataWorks automatically adds the custom parameter to a command in the
-D key=value
format.Hadoop cluster: created on the EMR on ECS page
Advanced parameter
Description
queue
The scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN, see YARN schedulers.
priority
The priority. Default value: 1.
USE_GATEWAY
Specifies whether to use a gateway cluster to commit jobs on the current node. Valid values:
true
: Use a gateway cluster to commit jobs.false
(default): Use no gateway cluster to commit jobs. Jobs are automatically committed to the master node.
NoteIf the EMR cluster to which the node belongs is not associated with a gateway cluster but the USE_GATEWAY parameter is set to
true
, jobs may fail to be committed.Execute SQL statements
On the Debugging Configurations tab in the right-side navigation pane of the configuration tab of the node, configure the Computing Resource parameter in the Computing Resource section and configure the Resource Group parameter in the DataWorks Configurations section.
NoteYou can also configure the CUs For Computing parameter based on the resources required for task execution. The default value of this parameter is
0.25
.If you want to access a data source over the Internet or a virtual private cloud (VPC), you must use the resource group for scheduling that is connected to the data source. For more information, see Network connectivity solutions.
In the top toolbar of the configuration tab of the node, click Run to execute SQL statements.
If you want to run a task on the node on a regular basis, configure the scheduling information based on your business requirements.
After you configure the node, deploy the node. For more information, see Node/workflow release.
After you deploy the node, view the status of the node in Operation Center. For more information, see Getting started with Operation Center.
View the results
Log on to the OSS console. Then, you can view the results in the emr/datas/wordcount02/outputs directory in which the initial data is stored.
View the statistical results in the DataWorks console.
Create an EMR Hive node. For more information, see Node development.
On the EMR Hive node, create a Hive external table that is mounted to OSS. Then, use the Hive external table to read data from Hive tables in OSS. Sample code:
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb ( `word` STRING COMMENT 'Word', `count` STRING COMMENT 'Count' ) ROW FORMAT delimited fields terminated by '\t' location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/'; SELECT * FROM wordcount02_result_tb;
The following figure shows the results.