E-MapReduce (EMR) Streaming SQL nodes allow you to use SQL statements to develop streaming
analytics jobs. This topic describes how to create an EMR Streaming SQL node and use
the node to develop data.
Limits
- An EMR Streaming SQL node can be run only on an exclusive resource group for scheduling.
- If the exclusive resource group for scheduling and the EMR cluster that you use were created
before June 10, 2021, you must submit a ticket to upgrade the resource group and the EMR cluster.
Create an EMR Streaming SQL node and use the node to develop data
- Go to the DataStudio page.
- Log on to the DataWorks console.
- In the left-side navigation pane, click Workspaces.
- In the top navigation bar, select the region where your workspace resides, find the
workspace, and then click Data Analytics in the Actions column.
- Create a workflow.
If you have a workflow, skip this step.
- Move the pointer over the Create icon and select Workflow.
- In the Create Workflow dialog box, set the Workflow Name parameter.
- Click Create.
- Create an EMR Streaming SQL node.
- On the DataStudio page, move the pointer over the Create icon and choose EMR > EMR Streaming SQL.
Alternatively, you can find the required workflow, right-click the workflow name,
and then choose Create Node > EMR > EMR Streaming SQL.
- In the Create Node dialog box, set the Node Name, Node Type, and Location parameters.
Note The node name must be 1 to 128 characters in length and can contain letters, digits,
underscores (_), and periods (.).
- Click Commit. Then, the configuration tab of the EMR Streaming SQL node appears.
- Use the EMR Streaming SQL node to develop data.
- Select the EMR compute engine instance.
On the configuration tab of the EMR Streaming SQL node, select the EMR compute engine
instance.
- Write code for the EMR Streaming SQL node.
On the configuration tab of the EMR Streaming SQL node, write code for the node. The
following code provides an example:
-- dbName: the database name.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};
-- Create a Log Service table.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: the AccessKey ID of your Alibaba Cloud account.
-- accessKeySecret: the AccessKey secret of your Alibaba Cloud account.
-- endpoint: the endpoint of the Logstore in Log Service.
-- When you explicitly specify a field in the Logstore, the field must be of the STRING data type.
-- The following system fields are reserved: `__logProject__` (STRING), `__logStore__` (STRING), `__shard__` (INT), `__time__` (TIMESTAMP), `__topic__` (STRING), and `__source__` (STRING).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');
-- Create an HDFS table and define the column fields in the table.
-- hdfsTableName: the name of the HDFS table.
-- location: the path in which data is stored. The path can be an HDFS or OSS path.
-- Supported data formats: delta, csv, json, orc, and parquet. Default value: delta.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING delta
LOCATION '${location}';
-- Create a scan to read the table. Both the STREAM and BATCH read methods are supported. The default method is BATCH.
CREATE SCAN tmp_read_sls_table
ON ${slsTableName}
USING STREAM;
-- Create a streaming query job.
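-- queryName: the name of the streaming query job.
-- outputMode: the output mode of the streaming query. In this example, Append mode writes only new rows to the sink.
-- triggerInterval: the trigger interval, in milliseconds. For example, 30000 indicates that the query is triggered every 30 seconds.
-- checkpointLocation: the path in which checkpoint data is stored.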
CREATE STREAM ${queryName}
OPTIONS(
outputMode='Append',
triggerType='ProcessingTime',
triggerInterval='30000',
checkpointLocation='${checkpointLocation}')
INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM tmp_read_sls_table
WHERE ${condition};
For more information about EMR Streaming SQL, see
Common keywords.
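The following snippet is a minimal, filled-in sketch of the preceding template. All names, paths, and the endpoint in it (such as test_db, sls_source_table, my_log_project, and oss://my-bucket/...) are hypothetical and must be replaced with values from your own environment:
-- Hypothetical example: read from a Log Service table and write to a Delta table on OSS.
CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;
CREATE TABLE IF NOT EXISTS sls_source_table (col1 STRING, col2 STRING)
USING loghub
OPTIONS (
sls.project = 'my_log_project',
sls.store = 'my_logstore',
access.key.id = '<yourAccessKeyId>',
access.key.secret = '<yourAccessKeySecret>',
endpoint = 'cn-hangzhou.log.aliyuncs.com');
CREATE TABLE IF NOT EXISTS delta_sink_table (col1 STRING, col2 STRING)
USING delta
LOCATION 'oss://my-bucket/warehouse/delta_sink_table';
CREATE SCAN tmp_read_sls_table
ON sls_source_table
USING STREAM;
CREATE STREAM sls_to_delta_job
OPTIONS(
outputMode='Append',
triggerType='ProcessingTime',
triggerInterval='30000',
checkpointLocation='oss://my-bucket/checkpoints/sls_to_delta_job')
INSERT INTO delta_sink_table
SELECT col1, col2
FROM tmp_read_sls_table
WHERE col1 IS NOT NULL;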
- Configure a resource group for scheduling.
- Click the Run with Parameters icon in the top toolbar. In the Parameters dialog box, select the desired resource
group for scheduling. You can also configure custom parameters based on your business
requirements, as in the example after these steps.
- Click OK.
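If the node code references variables such as ${dbName}, you can assign values to them as custom parameters. For example, the variables in the preceding sample code could be assigned values such as the following; all values here are hypothetical:
dbName=test_db
slsTableName=sls_source_table
hdfsTableName=delta_sink_table
queryName=sls_to_delta_job
checkpointLocation=oss://my-bucket/checkpoints/sls_to_delta_job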
- Save and run the EMR Streaming SQL node.
In the top toolbar, click the Save icon to save the EMR Streaming SQL node, and then click the Run icon to run the node.
- Click Advanced Settings in the right-side navigation pane. On the Advanced Settings tab, change the values
of the following parameters based on your business requirements. A sample configuration follows the parameter descriptions.
- "USE_GATEWAY":true: If you set this parameter to true, the EMR Streaming SQL node
is automatically committed to the master node of an EMR gateway cluster.
- "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx": the parameters for running
Spark jobs. You can configure multiple parameters in the --conf xxx=xxx format.
- "queue": the scheduling queue to which jobs are committed. Default value: default.
- "priority": the priority. Default value: 1.
- "FLOW_SKIP_SQL_ANALYZE": specifies how SQL statements are executed. The value false
indicates that only one SQL statement is executed at a time, and the value true indicates
that multiple SQL statements are executed at a time.
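For reference, a full Advanced Settings configuration might look like the following sketch; the driver memory, queue, and priority values are illustrative only:
{
"USE_GATEWAY": true,
"SPARK_CONF": "--conf spark.driver.memory=2g",
"queue": "default",
"priority": 1,
"FLOW_SKIP_SQL_ANALYZE": "false"
}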
- Configure properties for the EMR Streaming SQL node.
If you want the system to periodically run the EMR Streaming SQL node, you can click
Properties in the right-side navigation pane to configure properties for the node based on your
business requirements.
- Commit and deploy the EMR Streaming SQL node.
- Click the Save icon in the top toolbar to save the node.
- Click the Commit icon in the top toolbar to commit the node.
- In the Commit Node dialog box, enter your comments in the Change description field.
- Click OK.
If you use a workspace in standard mode, you must deploy the node in the production
environment after you commit the node. Click
Deploy in the upper-right corner. For more information, see
Deploy nodes.
- View the EMR Streaming SQL node.
- Click Operation Center in the upper-right corner of the DataStudio page to go to Operation Center.
- View the EMR Streaming SQL node that is running. For more information, see Manage real-time computing nodes.