This topic describes how to use Spark SQL to develop a streaming job.

Note: We recommend that you do not use this template in E-MapReduce V3.23.0 and later, although the template is still supported.

Query statement blocks

Some job-related parameters are difficult to express properly in a SQL query statement. To resolve this issue, add SET statements before the SQL query statement to configure the required parameters. The most important parameter is streaming.query.name, which specifies the name of the SQL query. Each SQL query must have a unique name. Based on the query name, you can configure other parameters, such as the checkpoint directory, for each SQL query. By convention, a SET streaming.query.name statement must precede each SQL query statement. Otherwise, an error may be returned when the query runs. A valid query statement block is in the following format:
SET streaming.query.name=${queryName};

queryStatement
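
For example, the following block sets a query name and a checkpoint directory before the query statement to which they apply. This is only a sketch: the query name example_query, the checkpoint path, and the table names example_source and example_sink are hypothetical placeholders.

-- Hypothetical example. Replace the query name, checkpoint path, and table names with your own.
SET streaming.query.name=example_query;
SET spark.sql.streaming.query.options.example_query.checkpointLocation=/tmp/spark/example_query;

INSERT INTO example_sink
SELECT col1, col2
FROM example_source
WHERE col1 IS NOT NULL;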

Job template

-- dbName: the name of the database where a table is to be created.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create a Log Service table.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: the AccessKey ID provided to you by Alibaba Cloud.
-- accessKeySecret: the AccessKey secret provided to you by Alibaba Cloud.
-- endpoint: the endpoint of the Logstore in Log Service. For more information, see Service endpoint.
-- Define the Logstore-related fields explicitly as the STRING type.
-- The following system fields are reserved: `__logProject__` (String), `__logStore__` (String), `__shard__` (Int), `__time__` (Timestamp), `__topic__` (String), and `__source__` (String).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- Create a Hadoop Distributed File System (HDFS) table and define the column fields in the table.
-- hdfsTableName: the name of the HDFS table.
-- location: the storage path of data. You can store data in HDFS or Object Storage Service (OSS).
-- Supported data formats include CSV, JSON, ORC, and Parquet. The default format is Parquet.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';

-- Define some parameters for running each streaming query. Such parameters include:
-- streaming.query.name: the name of the streaming query job.
-- spark.sql.streaming.query.options.${queryName}.checkpointLocation: the directory where the checkpoint file of the streaming query job is stored.
SET streaming.query.name=${queryName};
SET spark.sql.streaming.query.options.${queryName}.checkpointLocation=${checkpointLocation};
-- The following parameters are optional and can be defined as required:
-- outputMode: the output mode of the query result. Default value: append.
-- trigger: the execution mode of the query. Default value: ProcessingTime. Currently, you can only set this parameter to ProcessingTime.
-- trigger.intervalMs: the interval between queries. Unit: milliseconds. Default value: 0.
-- SET spark.sql.streaming.query.outputMode.${queryName}=${outputMode};
SET spark.sql.streaming.query.trigger.${queryName}=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.${queryName}=30;

INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM ${slsTableName}
WHERE ${condition};
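
For reference, the following sketch shows the preceding template with its placeholders filled in. All values, including the database name, the Log Service project and Logstore, the AccessKey pair, the endpoint, the OSS paths, the column names, and the query name, are hypothetical and must be replaced with your own values.

-- Hypothetical example based on the template above. All names, credentials, and paths are placeholders.
CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;

-- Source table that reads from a Log Service Logstore.
CREATE TABLE IF NOT EXISTS sls_source_table (request_method STRING, status_code STRING)
USING loghub
OPTIONS (
sls.project = 'test-project',
sls.store = 'test-logstore',
access.key.id = 'yourAccessKeyId',
access.key.secret = 'yourAccessKeySecret',
endpoint = 'cn-hangzhou.log.aliyuncs.com');

-- Sink table that writes Parquet files to OSS.
CREATE TABLE IF NOT EXISTS hdfs_sink_table (request_method STRING, status_code STRING)
USING PARQUET
LOCATION 'oss://your-bucket/spark-sql-streaming/data';

-- Name the streaming query and set its checkpoint directory.
SET streaming.query.name=sls_to_hdfs_query;
SET spark.sql.streaming.query.options.sls_to_hdfs_query.checkpointLocation=oss://your-bucket/spark-sql-streaming/checkpoint;

-- Start the streaming query that copies filtered rows from the source to the sink.
INSERT INTO hdfs_sink_table
SELECT request_method, status_code
FROM sls_source_table
WHERE status_code IS NOT NULL;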

Parameters

The following table describes several key parameters.

Parameter | Description | Default value
streaming.query.name | The name of the streaming query job. | None. This parameter must be explicitly configured.
spark.sql.streaming.query.options.${queryName}.checkpointLocation | The directory where the checkpoint file of the streaming query job is stored. | None. This parameter must be explicitly configured.
spark.sql.streaming.query.outputMode.${queryName} | The output mode of the query result. | append
spark.sql.streaming.query.trigger.${queryName} | The execution mode of the query. Currently, you can only set this parameter to ProcessingTime. | ProcessingTime
spark.sql.streaming.query.trigger.intervalMs.${queryName} | The interval between queries. Unit: milliseconds. | 0
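
For example, the optional parameters in the preceding table can be set for a specific query name as shown in the following sketch. The query name example_query and the 10000-millisecond trigger interval are placeholder values used only for illustration.

-- Hypothetical example. Replace example_query and the interval value with your own.
SET spark.sql.streaming.query.outputMode.example_query=append;
SET spark.sql.streaming.query.trigger.example_query=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.example_query=10000;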