This topic describes the Data Definition Language (DDL) syntax in Spark SQL.

Syntax

CREATE TABLE tbName [(columnName dataType [, columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue [, propertyName=propertyValue]*);

-- CTAS
CREATE TABLE tbName [(columnName dataType [, columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue [, propertyName=propertyValue]*)
AS
queryStatement;

The CTAS (CREATE TABLE AS SELECT) statement combines two operations in one: creating a table and writing query results to it. When the statement runs, the table is created, a StreamQuery instance is started, and the query results are continuously written to the result table.
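For instance, a CTAS statement can create a Kafka result table and start writing another table's query results into it in a single step. The following is an illustrative sketch only: the table names, column names, and topic placeholders are hypothetical, and the options mirror the Kafka examples shown later in this topic.

```sql
-- Hypothetical CTAS example: create a Kafka result table and populate it
-- from an existing source table (kafka_source_table) in one statement.
CREATE TABLE kafka_result_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${RESULT_TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}")
AS
SELECT col1, col2 FROM kafka_source_table;
```

Because no column list is given before USING, the schema of kafka_result_table is taken from the query results of the SELECT clause.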

Note

As the syntax shows, the column definitions of the table are optional. Each data source implementation imposes its own requirements on the table schema. Specifically:
  • If no schema is specified during table creation, the schema of the data source is automatically identified.
  • If a schema is specified during table creation, it must be a subset of the schema of the data source, and the column types must match those in the data source.
For example:
  • CREATE TABLE kafka_table 
    USING kafka 
    OPTIONS (
    kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
    subscribe = "${TOPIC_NAME}",
    output.mode = "${OUTPUT_MODE}",
    kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
    kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
    kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
    For the Kafka data source, if the schema of a table is not specified, the system automatically retrieves the schema definition of the corresponding topic from Kafka Schema Registry.
  • CREATE TABLE kafka_table(col1 string, col2 double)
    USING kafka 
    OPTIONS (
    kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
    subscribe = "${TOPIC_NAME}",
    output.mode = "${OUTPUT_MODE}",
    kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
    kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
    kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

For the Kafka data source, if the schema of a table is specified, the system still retrieves the schema definition of the corresponding topic from Kafka Schema Registry. It then checks that the table schema, including the field names and types, matches the topic schema. We recommend that the schema you declare in SQL match the schema of the external data store: keep the names and types of the fields in the declared table identical to those in the external table.