This topic describes how to use Spark SQL to develop a streaming query job in EMR V3.23.0 or later.

Job template

-- Create the working database (idempotent) and make it the current database
-- so the tables below are created inside it.
-- dbName: the name of the database that will hold the source and sink tables.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create the streaming source table backed by Log Service (the `loghub` data source).
-- This is a table definition only; no data is read until a scan is created on it.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: your AccessKey ID.
-- accessKeySecret: your AccessKey secret. NOTE(review): credentials are inlined
--   in the DDL; prefer injecting them from a secure configuration rather than
--   committing them in job text.
-- endpoint: the endpoint of the Logstore in Log Service.
-- When you explicitly define a Logstore field, you must declare it with the
-- STRING data type.
-- The following system field names are reserved and must not be redefined:
-- `__logProject__` (STRING), `__logStore__` (STRING), `__shard__` (INT),
-- `__time__` (TIMESTAMP), `__topic__` (STRING), and `__source__` (STRING).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- Create the sink table and define its column fields. The streaming query at the
-- end of this template writes into this table.
-- hdfsTableName: the name of the sink table.
-- location: the data storage path. You can store data in HDFS or
--   Object Storage Service (OSS).
-- Supported data formats (the USING clause): delta, csv, json, orc, and parquet.
-- Default value: delta. The column list here must match the columns selected by
-- the streaming INSERT below.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING delta
LOCATION '${location}';

-- Declare how the source table is read by creating a named scan over it.
-- Valid values for USING: STREAM (continuous/streaming read) and BATCH
-- (one-time read). Default value: BATCH. The scan name (tmp_read_sls_table)
-- is what the streaming query below selects from, not the table itself.
CREATE SCAN tmp_read_sls_table 
ON ${slsTableName} 
USING STREAM;

-- Create the streaming query job: continuously read from the STREAM scan,
-- filter, and append the selected columns into the sink table.
-- queryName: the name of the streaming query job.
-- outputMode: 'Append' emits only newly arrived rows each trigger.
-- triggerType / triggerInterval: fire on a processing-time schedule;
--   '30000' is the interval between triggers (presumably milliseconds, i.e.
--   every 30 seconds — confirm against the EMR release notes for your version).
-- checkpointLocation: durable path where streaming progress (offsets/state) is
--   stored so the job can recover after a restart.
-- condition: a WHERE predicate filtering the source rows; col1/col2 must match
--   the sink table's column definitions above.
CREATE STREAM ${queryName}
OPTIONS(
outputMode='Append',
triggerType='ProcessingTime',
triggerInterval='30000',
checkpointLocation='${checkpointLocation}')
INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM tmp_read_sls_table
WHERE ${condition};
Note: For more information about endpoints, see Regions and endpoints.