This topic describes how to use Spark SQL to develop a streaming job.

Note We recommend that you do not use this template in E-MapReduce V3.23.0 or later.

Query statement block

Some job parameters such as streaming.query.name cannot be expressed in SQL statements. Therefore, you must first use the SET statement to specify these parameters.

The following code shows a valid query statement block:
SET streaming.query.name=${queryName};
queryStatement

Job template

-- Create a database.
-- dbName: the database name.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create a Log Service table.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: your AccessKey ID.
-- accessKeySecret: your AccessKey secret.
-- endpoint: the endpoint of the Logstore in Log Service.
-- When you explicitly define a Logstore field, you must specify a value of the STRING data type.
-- Reserve the following system fields: `__logProject__` (STRING), `__logStore__` (STRING), `__shard__` (INT), `__time__` (TIMESTAMP), `__topic__` (STRING), and `__source__` (STRING).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- Create an HDFS table and define the column fields in the table.
-- hdfsTableName: the name of the HDFS table.
-- location: the data storage path. You can store data in HDFS or Object Storage Service (OSS).
-- Supported data formats: delta, csv, json, orc, and parquet. Default value: delta.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING delta
LOCATION '${location}';

-- Define parameters for running a streaming query:
-- streaming.query.name: the name of the streaming query job.
-- spark.sql.streaming.checkpointLocation.${queryName}: the directory where the checkpoint file of the streaming query job is stored.
SET streaming.query.name=${queryName};
SET spark.sql.streaming.query.options.${queryName}.checkpointLocation=${checkpointLocation};
-- Optional parameters:
-- outputMode: the output mode of the query result. Default value: append.
-- trigger: the execution mode of the query. Default value: ProcessingTime. You can set this parameter only to ProcessingTime.
-- trigger.intervalMs: the interval between queries. Unit: milliseconds. Default value: 0.
-- SET spark.sql.streaming.query.outputMode.${queryName}=${outputMode};
SET spark.sql.streaming.query.trigger.${queryName}=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.${queryName}=30;

INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM ${slsTableName}
WHERE ${condition};
Note For more information about endpoints, see Service endpoint.

Parameters

The following table describes the key parameters.
Parameter Description Default value
streaming.query.name The name of the streaming query job. This parameter must be explicitly configured.
spark.sql.streaming.query.options.${queryName}.checkpointLocation The directory where the checkpoint file of the streaming query job is stored. This parameter must be explicitly configured.
spark.sql.streaming.query.outputMode.${queryName} The output mode of the query result. append
spark.sql.streaming.query.trigger.${queryName} The execution mode of the query. You can set this parameter only to ProcessingTime. ProcessingTime
spark.sql.streaming.query.trigger.intervalMs.${queryName} The interval between queries. Unit: milliseconds. 0