This topic describes the SCAN syntax in Spark SQL. The SCAN syntax is available in E-MapReduce V3.23.0 and later versions.
Why is the SCAN syntax required?
The SCAN syntax defines how to read a table. Essentially, a SCAN statement creates a logical table whose parameters specify how the source table is read. Why is a separate statement needed to define how to read a table that is already defined? In the data model design of Spark DataSource V2, a table is not confined to being a batch data source or a streaming data source. In addition, Spark uses external catalogs to parse metadata from various data sources in its multi-catalog design. In this context:
- When you define a table based on Spark DataSource V2, you need to define only the basic information of the table, not the parameters that control how the table is read. For example, when you define a Kafka source table, you need to define only the Kafka connection address and topic. You do not need to specify parameters such as maxOffsetsPerTrigger (the maximum number of offsets processed per trigger interval) in the table definition. Such parameters are specified at runtime.
- When you use an external catalog to provide the data source metadata, you do not need to explicitly create a table in a Spark session.

Based on these considerations, the SCAN syntax is provided to define how to read a data source.
This syntax is an attempt to support the data model design of Spark DataSource V2 and to unify batch and streaming queries in Spark SQL. For example, you can define a single Kafka data source table and run two SCAN statements against it, one for batch reads and one for streaming reads.
- A SCAN statement creates a logical table that specifies how the source table is read. This logical table is registered as a temporary view in the current Spark session. After you exit the Spark session, the SCAN definition is automatically deleted.
- A view defined by the SCAN syntax can be used only as a data source table, not as a data output table.
- You can also query the source table directly, but in that case only batch reads are supported, not streaming reads. If the source table does not provide a batch read interface, an error is returned.
```sql
CREATE SCAN tbName_alias ON tbName USING queryType
OPTIONS (propertyName=propertyValue[,propertyName=propertyValue]*)
```
You can set queryType to either of the following values:
- BATCH: performs batch read on the source table tbName and defines the temporary view tbName_alias.
- STREAM: performs streaming read on the source table tbName and defines the temporary view tbName_alias.
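To make the two queryType values concrete, the following sketch defines a hypothetical Kafka source table and creates one batch scan and one streaming scan over it. The table name, connection address, topic, and column are illustrative placeholders, not part of the original examples.

```sql
-- Hypothetical Kafka source table; broker address and topic are placeholders.
spark-sql> CREATE TABLE kafka_events(content string)
         > USING kafka
         > OPTIONS (
         > "kafka.bootstrap.servers"="broker1:9092",
         > "subscribe"="events_topic");

-- Batch read: kafka_events_batch can be queried with ordinary SELECT statements.
spark-sql> CREATE SCAN kafka_events_batch
         > ON kafka_events
         > USING BATCH;

-- Streaming read: kafka_events_stream can be used only as the source of a
-- streaming query, for example in an INSERT INTO statement.
spark-sql> CREATE SCAN kafka_events_stream
         > ON kafka_events
         > USING STREAM;
```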
You can specify the runtime parameters for reading the data source in the OPTIONS clause. These parameters define how data is read and vary by data source. For example, maxOffsetsPerTrigger is a runtime parameter for Kafka data sources. For more information about the runtime parameters, see Data sources.
The following table lists the common parameters that you can specify for all data sources, in addition to the data source-specific parameters.
| Parameter | Description | Default value |
| --- | --- | --- |
| watermark.column | The event time column in the table. | None |
| watermark.delayThreshold | The maximum delay for late data, measured against the latest event time that has been processed. Data that arrives later than this threshold is discarded. For example, set this parameter to 1 minute or 5 hours. | None |
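As a sketch of how a data source-specific runtime parameter combines with the common parameters, the following hypothetical streaming scan sets maxOffsetsPerTrigger (a Kafka runtime parameter) together with the watermark options. The table name kafka_events and the column name event_time are illustrative assumptions, not from the original examples.

```sql
-- Hypothetical streaming scan mixing a Kafka-specific runtime parameter
-- with the common watermark parameters. Names are placeholders.
spark-sql> CREATE SCAN kafka_events_stream
         > ON kafka_events
         > USING STREAM
         > OPTIONS (
         > "maxOffsetsPerTrigger"="1000",          -- Kafka: at most 1000 offsets per trigger interval.
         > "watermark.column"="event_time",        -- Event time column of the table.
         > "watermark.delayThreshold"="2 minutes"  -- Data more than 2 minutes late is discarded.
         > );
```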
- Create a Log Service data source table.
```sql
spark-sql> CREATE TABLE loghub_table_intput_test(content string)
         > USING loghub
         > OPTIONS
         > (...);
```
- Process the Log Service table data offline by using the BATCH method, and count the number of data records generated up to the current time.
```sql
spark-sql> CREATE SCAN loghub_table_intput_test_batch
         > ON loghub_table_intput_test
         > USING BATCH;
spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_batch;
```
- Process the Log Service table data using the STREAM method.
```sql
spark-sql> CREATE TABLE loghub_table_output_test(content string)
         > USING loghub
         > OPTIONS
         > (...);
spark-sql> CREATE SCAN loghub_table_intput_test_stream
         > ON loghub_table_intput_test
         > USING STREAM
         > OPTIONS(
         > "watermark.column"="data_time",
         > "watermark.delayThreshold"="2 minutes");
-- A streaming view cannot be queried directly with SELECT. The following statement returns an error.
spark-sql> SELECT COUNT(*) FROM loghub_table_intput_test_stream;
Error in query: Queries with streaming sources must be executed with writeStream.start();;
spark-sql> INSERT INTO loghub_table_output_test SELECT content FROM loghub_table_intput_test_stream;
```