E-MapReduce (EMR) Streaming SQL nodes allow you to use SQL statements to develop streaming analytics jobs. This topic describes how to create an EMR Streaming SQL node and use the node to develop data.

Prerequisites

  • An Alibaba Cloud EMR cluster is created. The inbound rules of the security group to which the cluster belongs include the following rules:
    • Action: Allow
    • Protocol type: Custom TCP
    • Port range: 8898/8898
    • Authorization object: 100.104.0.0/16
  • An EMR compute engine instance is associated with the current workspace. The EMR folder is displayed on the DataStudio page only after you associate an EMR compute engine instance with the workspace on the Workspace Management page. For more information, see Configure a workspace.
  • An exclusive resource group for scheduling is created. For more information, see Create and use an exclusive resource group for scheduling.

  • The hadoop.http.authentication.simple.anonymous.allowed parameter in the Hadoop Distributed File System (HDFS) configurations of the EMR cluster is set to true in the EMR console, and the HDFS and YARN services are restarted.

Limits

  • An EMR Streaming SQL node can be run only on an exclusive resource group for scheduling.
  • If the exclusive resource group for scheduling and the EMR cluster that you use are created before June 10, 2021, you must submit a ticket to upgrade the resource group and EMR cluster.

Create an EMR Streaming SQL node and use the node to develop data

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where your workspace resides, find the workspace, and then click Data Analytics in the Actions column.
  2. Create a workflow.
    If you have a workflow, skip this step.
    1. Move the pointer over the Create icon and select Workflow.
    2. In the Create Workflow dialog box, set the Workflow Name parameter.
    3. Click Create.
  3. Create an EMR Streaming SQL node.
    1. On the DataStudio page, move the pointer over the Create icon and choose EMR > EMR Streaming SQL.
      Alternatively, you can find the required workflow, right-click the workflow name, and then choose Create > EMR > EMR Streaming SQL.
    2. In the Create Node dialog box, set the Node Name, Node Type, and Location parameters.
      Note The node name must be 1 to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
    3. Click Commit. Then, the configuration tab of the EMR Streaming SQL node appears.
  4. Use the EMR Streaming SQL node to develop data.
    1. Select the EMR compute engine instance.
      On the configuration tab of the EMR Streaming SQL node, select the EMR compute engine instance.
    2. Write code for the EMR Streaming SQL node.
      On the configuration tab of the EMR Streaming SQL node, write code for the node. The following code provides an example:
      -- dbName: the database name. 
      CREATE DATABASE IF NOT EXISTS ${dbName};
      USE ${dbName};
      
      -- Create a Log Service table. 
      -- slsTableName: the name of the Log Service table. 
      -- logProjectName: the name of the Log Service project. 
      -- logStoreName: the name of the Logstore in Log Service. 
      -- accessKeyId: the AccessKey ID of your Alibaba Cloud account. 
      -- accessKeySecret: the AccessKey secret of your Alibaba Cloud account. 
      -- endpoint: the endpoint of the Logstore in Log Service. 
      -- When you explicitly specify a field in the Logstore, the field must be of the STRING data type. 
      -- Reserve the following system fields: `__logProject__` (STRING), `__logStore__` (STRING), `__shard__` (INT), `__time__` (TIMESTAMP), `__topic__` (STRING), and `__source__` (STRING).
      CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType]) 
      USING loghub
      OPTIONS (
      sls.project = '${logProjectName}',
      sls.store = '${logStoreName}',
      access.key.id = '${accessKeyId}',
      access.key.secret = '${accessKeySecret}',
      endpoint = '${endpoint}');
      
      -- Create an HDFS table and define the column fields in the table. 
      -- hdfsTableName: the name of the HDFS table. 
      -- location: the path in which data is stored. The path can be an HDFS or Object Storage Service (OSS) path. 
      -- Supported data formats: delta, csv, json, orc, and parquet. Default value: delta. 
      CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
      USING delta
      LOCATION '${location}';
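      -- You can replace delta in the USING clause with any of the other supported formats. For example, to store data in the CSV format: 
      -- CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType]) USING csv LOCATION '${location}'; 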
      
      -- Create a scan to read the table. Both the STREAM and BATCH methods are supported. The default method is BATCH. A batch example is provided after this sample code. 
      CREATE SCAN tmp_read_sls_table 
      ON ${slsTableName} 
      USING STREAM;
      
      -- Create a streaming query job. 
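      -- queryName: the name of the streaming query job. 
      -- outputMode: the output mode of the query. In this example, the Append mode is used. 
      -- triggerType and triggerInterval: the query is triggered at a fixed processing-time interval, in milliseconds. In this example, the query is triggered every 30,000 milliseconds. 
      -- checkpointLocation: the path in which checkpoint data is stored. 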
      CREATE STREAM ${queryName}
      OPTIONS(
      outputMode='Append',
      triggerType='ProcessingTime',
      triggerInterval='30000',
      checkpointLocation='${checkpointLocation}')
      INSERT INTO ${hdfsTableName}
      SELECT col1, col2
      FROM tmp_read_sls_table
      WHERE ${condition};
      For more information about EMR Streaming SQL, see Common keywords.
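      If you want to run a one-time query against the table data instead of a streaming query, you can create a scan that uses the BATCH method. The following code provides a minimal example that is based on the preceding sample code. The scan name tmp_batch_read_sls_table is a hypothetical name, and __time__ is one of the reserved Logstore system fields that are described in the preceding sample code. 
      -- Create a scan that reads the Log Service table by using the BATCH method. 
      CREATE SCAN tmp_batch_read_sls_table 
      ON ${slsTableName} 
      USING BATCH;
      
      -- Query the scanned data. 
      SELECT col1, col2, __time__ 
      FROM tmp_batch_read_sls_table 
      LIMIT 10;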
    3. Configure a resource group for scheduling.
      • Click the Run with Parameters icon in the top toolbar. In the Parameters dialog box, select the desired resource group for scheduling. You can also configure custom parameters based on your business requirements.
      • Click OK.
    4. Save and run the EMR Streaming SQL node.
      In the top toolbar, click the Save icon to save the EMR Streaming SQL node. Then, click the Run icon to run the node.
  5. Click Advanced Settings in the right-side navigation pane. On the Advanced Settings tab, configure the following parameters based on your business requirements. An example that combines these parameters is provided after the parameter descriptions.
    • "USE_GATEWAY":true: If you set this parameter to true, the EMR Streaming SQL node is automatically committed to the master node of an EMR gateway cluster.
    • "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx": the parameters for running Spark jobs. You can configure multiple parameters in the --conf xxx=xxx format.
    • "queue": the scheduling queue to which jobs are committed. Default value: default.
    • "priority": the priority. Default value: 1.
    • "FLOW_SKIP_SQL_ANALYZE": specifies how SQL statements are executed. The value false indicates that only one SQL statement is executed at a time, and the value true indicates that multiple SQL statements are executed at a time.
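    The following example shows how these parameters can be configured together on the Advanced Settings tab. This is a minimal sketch: the values, such as the memory sizes and the queue name, are placeholders that you must replace based on your business requirements, and spark.executor.memory is only an example of a Spark property that you can pass in SPARK_CONF.
    "USE_GATEWAY": true
    "SPARK_CONF": "--conf spark.driver.memory=2g --conf spark.executor.memory=2g"
    "queue": "default"
    "priority": 1
    "FLOW_SKIP_SQL_ANALYZE": "true"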
  6. Configure properties for the EMR Streaming SQL node.
    If you want the system to periodically run the EMR Streaming SQL node, you can click Configure in the right-side navigation pane to configure properties for the node based on your business requirements.
    • Configure basic properties for the EMR Streaming SQL node. For more information, see Basic properties.
    • Configure time properties for the EMR Streaming SQL node.

      You can select the mode to start the node and the mode to rerun the node. For more information, see Configure time properties.

    • Configure resource properties for the EMR Streaming SQL node. For more information, see Configure the resource group.
  7. Commit and deploy the EMR Streaming SQL node.
    1. Click the Save icon in the top toolbar to save the node.
    2. Click the Submit icon in the top toolbar to commit the node.
    3. In the Commit Node dialog box, enter your comments in the Change description field.
    4. Click OK.
    If you use a workspace in standard mode, you must deploy the node in the production environment after you commit the node. Click Deploy in the upper-right corner. For more information, see Deploy nodes.
  8. View the EMR Streaming SQL node.
    1. Click Operation Center in the upper-right corner of the DataStudio page to go to Operation Center.
    2. View the EMR Streaming SQL node that is running. For more information, see Manage real-time computing nodes.