本文介绍流式查询配置的相关概念及配置参数。

查询配置

说明 流式查询配置在EMR-3.23.0及之后版本不建议使用,最新的查询配置请参见SCAN语句STREAM语句
在使用Spark SQL进行流式查询前,您需要了解以下两个概念:
  • 数据源配置:即Table的定义。

    Table的定义只包含数据源的配置,例如,Kafka数据源的连接地址和Topic名等。因为您可以对一个Table同时做多个业务无关的查询,所以Table定义中不应该包含具体的查询实例的运行配置。

  • 查询实例配置:具体每个Stream Query运行时的参数配置。

    每一个查询实例均需要单独配置。通过queryName,可以减少对查询SQL进行不必要的修改。查询实例参数设置使用的是SET语法,详情请参见配置说明

查询实例包括:
  • INSERT INTO ...
  • CREATE TABLE ... AS SELECT ...
每个查询实例的queryName均为SQL上下文中最近的一个,查询示例说明如下:
  • 情况一
    SET streaming.query.name=one_test_job
    
    -- query 1
    INSERT INTO tb_test_1 SELECT ...
    
    -- query 2
    INSERT INTO tb_test_2 SELECT ...
    
    -- 以上query1和query2的queryName都是"one_test_job",当然这是一种非法情况,每个查询实例必须有唯一queryName。
  • 情况二
    SET streaming.query.name=one_test_job_1
    SET streaming.query.name=one_test_job_2
    
    -- query 1
    CREATE TABLE tb_test_1 AS SELECT ...
    
    -- query1的queryName是"one_test_job_2"。

配置说明

配置类别 对应于DataFrame API SQL配置格式 说明 是否必选
queryName writeStream.queryName(...) SET streaming.query.name=$queryName 每个Stream Query的名称,各个Query的配置项会根据名称来区分。
option writeStream.option(...) SET spark.sql.streaming.query.options.$queryName.$optionName=$optionValue checkpointLocation:checkpoint目录。
自定义。
outputMode writeStream.outputMode(...) SET spark.sql.streaming.query.outputMode.$queryName=$outputMode output模式,默认为append模式。
trigger writeStream.trigger(...) SET spark.sql.streaming.query.trigger.$queryName=$triggerType trigger模式,默认为ProcessingTime
SET spark.sql.streaming.query.trigger.intervalMs.$queryName=$intervalMs trigger间隔,单位毫秒,默认为0。