This page describes how to develop a streaming job by using Spark SQL.

Job template

We recommend that you use this template in E-MapReduce V3.23.0 and later versions.
-- dbName: the name of the database where a table is to be created.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create a Log Service table.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: the AccessKey ID provided to you by Alibaba Cloud.
-- accessKeySecret: the AccessKey secret provided to you by Alibaba Cloud.
-- endpoint: the endpoint of the Logstore in Log Service. For more information, see Regions and endpoints.
-- Each field in the Log Service table must be defined. Each field in the table corresponds to a field in the Logstore. All custom fields must be of the String type.
-- The following system fields are reserved: `__logProject__` (String), `__logStore__` (String), `__shard__` (Int), `__time__` (Timestamp), `__topic__` (String), and `__source__` (String).
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
  sls.project = '${logProjectName}',
  sls.store = '${logStoreName}',
  access.key.id = '${accessKeyId}',
  access.key.secret = '${accessKeySecret}',
  endpoint = '${endpoint}');
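-- A minimal sketch for illustration only; every name and value below is hypothetical.
-- It defines a table over a Logstore whose entries carry two custom fields (which
-- must be String) plus the reserved __time__ system field.
-- CREATE TABLE IF NOT EXISTS sls_access_log (level String, message String, __time__ Timestamp)
-- USING loghub
-- OPTIONS (
--   sls.project = 'my-log-project',
--   sls.store = 'access-log-store',
--   access.key.id = '<yourAccessKeyId>',
--   access.key.secret = '<yourAccessKeySecret>',
--   endpoint = 'cn-hangzhou.log.aliyuncs.com');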

-- Create a Hadoop Distributed File System (HDFS) table and define the column fields in the table.
-- hdfsTableName: the name of the HDFS table.
-- location: the storage path of data. Both HDFS and OSS paths are supported.
-- Supported data formats: CSV, JSON, ORC, and Parquet. The default format is Parquet.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';
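-- For illustration only: a sketch of the same kind of sink table stored as JSON
-- in OSS instead of Parquet. The table name and bucket path are hypothetical.
-- CREATE TABLE IF NOT EXISTS oss_access_log (level String, message String)
-- USING JSON
-- LOCATION 'oss://my-bucket/access-log/';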

-- Configure the method for reading tables. Both the STREAM and BATCH methods are supported. The default method is BATCH.
CREATE SCAN tmp_read_sls_table
ON ${slsTableName}
USING STREAM;
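-- For comparison, a sketch of a one-off batch read over the same table; BATCH is
-- the default method. The scan name below is hypothetical.
-- CREATE SCAN tmp_read_sls_table_batch
-- ON ${slsTableName}
-- USING BATCH;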

-- Create a streaming query job.
CREATE STREAM ${queryName}
OPTIONS(
  outputMode='Append',
  triggerType='ProcessingTime',
  triggerInterval=30000,
  checkpointLocation='${checkpointLocation}')
INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM tmp_read_sls_table
WHERE ${condition};
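For reference, the following sketch shows the streaming query with the variables filled in. The table name, checkpoint path, and filter condition are hypothetical. triggerInterval is specified in milliseconds, so 30000 triggers the query roughly every 30 seconds.

CREATE STREAM job_sls_to_hdfs
OPTIONS(
  outputMode='Append',
  triggerType='ProcessingTime',
  triggerInterval=30000,
  checkpointLocation='/tmp/spark/job_sls_to_hdfs')
INSERT INTO hdfs_access_log
SELECT col1, col2
FROM tmp_read_sls_table
WHERE col1 IS NOT NULL;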