This topic describes how to use Spark SQL to develop a streaming job.

Job template

Note: We recommend that you use this template in E-MapReduce V3.23.0 and later.
-- Create the target database if it does not already exist (idempotent), then
-- switch to it so that all tables below are created inside it.
-- dbName: the name of the database where a table is to be created.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create the streaming source table over a Log Service (SLS) Logstore via the
-- loghub data source.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: the AccessKey ID provided to you by Alibaba Cloud.
-- accessKeySecret: the AccessKey secret provided to you by Alibaba Cloud.
--   NOTE(review): avoid hard-coding credentials in the job script; inject the
--   AccessKey pair through job configuration or a secrets mechanism instead.
-- endpoint: the endpoint of the Logstore in Log Service. For more information,
--   see Regions and endpoints.
-- Define the Logstore-related fields explicitly as the STRING type.
-- The following system fields are reserved and must not be redeclared:
-- `__logProject__` (String), `__logStore__` (String), `__shard__` (Int),
-- `__time__` (Timestamp), `__topic__` (String), and `__source__` (String).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- Create the sink table and define its column fields; rows are written as
-- files under the given storage path.
-- hdfsTableName: the name of the HDFS table.
-- location: the storage path of data. You can store data in HDFS or Object
--   Storage Service (OSS).
-- Supported data formats include CSV, JSON, ORC, and Parquet. The default
-- format is Parquet; this template pins it explicitly with USING PARQUET.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';

-- Configure the method for reading the source table. Both the STREAM and
-- BATCH methods are supported; the default method is BATCH. STREAM makes the
-- query below consume the Logstore as a continuous stream.
-- tmp_read_sls_table is the named scan that the streaming query reads from.
CREATE SCAN tmp_read_sls_table 
ON ${slsTableName} 
USING STREAM;

-- Create and start the streaming query job: continuously read from the scan,
-- filter, and insert the selected columns into the sink table.
-- queryName: the name of the streaming query job.
-- outputMode='Append': only newly arrived rows are written on each trigger.
-- triggerType/triggerInterval: fire on a processing-time schedule.
--   NOTE(review): '30000' is presumably the interval in milliseconds (30 s) —
--   confirm against the E-MapReduce Streaming SQL documentation.
-- checkpointLocation: directory where query progress is checkpointed so the
--   job can resume after a restart instead of reprocessing from scratch.
-- condition: the row filter applied to the source stream.
-- The selected columns (col1, col2) must match the sink table's schema.
CREATE STREAM ${queryName}
OPTIONS(
outputMode='Append',
triggerType='ProcessingTime',
triggerInterval='30000',
checkpointLocation='${checkpointLocation}')
INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM tmp_read_sls_table
WHERE ${condition};