
Spark SQL

Last Updated: Apr 30, 2021

The serverless Spark and Presto engines of Data Lake Analytics (DLA) and Data Lake Formation share metadata with each other.

Access the metadata service of DLA

The serverless Spark engine of DLA supports multiple metadata services: it can access the metadata service of self-managed Hive databases as well as the metadata service of DLA. The DLA metadata service can be accessed by multiple engines, which allows these engines to share metadata with each other. When you use DLA to execute SQL statements, trigger a metadata discovery task, or create a data warehouse by synchronizing full data within T+1 days, the serverless Spark engine can read and use the database and table schemas that these operations create. Likewise, metadata that is created or modified by using Spark SQL can be accessed by other engines. The following figure shows the relationship between Spark SQL, DLA SQL, and the metadata service of DLA.
You can log on to the DLA console. In the left-side navigation pane, choose Serverless Presto > Execute to view all the database and table schemas in DLA, analyze and query table data online, and manage the permissions of RAM users on the databases and tables. The following figure shows the Execute page.

Submit a Spark SQL job

To access the metadata service of DLA, you only need to add the configuration item "spark.sql.hive.metastore.version": "dla" when you submit a job in the DLA console. The following example demonstrates how to access the table0 table in the 1k_tables database of the DLA metadata service:
{
    "sqls": [
        "select * from `1k_tables`.`table0` limit 100",
        "insert into `1k_tables`.`table0` values(1, 'test')"
    ],
    "name": "sql test",
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.dla.connectors": "oss",
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.executor.instances": 10,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
Note
  • The sqls parameter allows you to submit SQL jobs without the need to submit JAR packages. This makes the operation easier for developers who are familiar with SQL. The value of this parameter is an array. This allows you to execute multiple SQL statements in a job. Multiple SQL statements are separated by commas (,).

  • You can also submit Spark SQL jobs by using open APIs. The method to submit Spark SQL jobs is the same as the method to submit Spark jobs of other types.

  • DLA also provides a Spark-SQL command-line tool to submit Spark SQL jobs. For more information about the tool, see Use the Spark-SQL command-line tool.

Syntax of the sqls parameter

You can call SparkSession.sql to execute SQL statements by using one of the following methods:

  • Specify SQL statements in the sqls parameter. After you submit a job, DLA extracts the statements from the sqls parameter and provides these statements to the template class of SQL statements. Then, the template class calls SparkSession.sql to execute these statements.

    Because the SQL statements are embedded in a JSON character string, the characters in the statements must comply with JSON syntax. Therefore, you must escape the following symbols before you enter the statements in the sqls parameter:

    Symbol                                                            Symbol after escaping
    Double quotation mark (")                                         \"
    Line feed                                                         \n
    Carriage return                                                   \r
    Tab character                                                     \t
    Backslash (\) followed by unescaped characters such as d and w    \\\\

    Note

    If you want to write the regular expression REGEX '\d+' in an SQL statement, enter REGEX '\\\\d+' in the sqls parameter. The backslashes in the JSON character string are unescaped once, so the statement that is submitted to SparkSession.sql contains REGEX '\\d+'. SparkSession.sql then unescapes the backslashes in the string literal, and the regular expression that the serverless Spark engine finally obtains is REGEX '\d+'. A programmatic way to generate this escaping is shown in the sketch after this list.

    Example:

    // The source SQL file.
    select * from t where 
    col  regex  "\d+"
    
    // The SQL file after escaping.
    {
      "sqls":["select * from t where \n col regex \"\\\\d+\""],
      "conf":{}
    }
  • Specify the OSS path of an SQL file in the sqls parameter. We recommend that you use this method. The SQL file stores the SQL statements that you want to execute, separated by semicolons (;). The template class of SQL statements downloads the file from the specified path and submits the statements to SparkSession.sql. Then, SparkSession.sql executes these statements. A sketch of uploading such a file to OSS is provided after this list.

    SQL statements are directly stored in OSS and can be downloaded by the template class of SQL statements. Data transmission in the JSON format is not required. Therefore, you do not need to perform special processing on double quotation marks ("), line feeds, carriage returns, or tab characters. However, regular expressions must comply with the specifications of Spark SQL statements.

    Example:

    // The source SQL file.
    select * from t where 
    col  regex  "\d+"
    
    // The SQL file after escaping.
    select * from t where 
    col  regex  "\\d+"
    
    // The job configuration that references the SQL file in OSS.
    {
      "sqls":["oss://path/to/your.sql"],
      "conf":{}
    }
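
If you build the JSON job specification programmatically, a JSON library can perform the JSON-level escaping that the first method requires. The following is a minimal Python sketch that reuses the table t from the examples above; json.dumps escapes the double quotation marks, the line feed, and the backslashes, while the SQL-level backslash escaping (\\d+) is still written in the SQL text:

import json

# SQL text as it should reach SparkSession.sql: the regular expression is
# written as \\d+ because Spark SQL unescapes backslashes in string literals.
sql_text = 'select * from t where\ncol  regex  "\\\\d+"'

# json.dumps escapes the quotation marks, the line feed, and the backslashes,
# which yields the same payload as the manually escaped example above.
print(json.dumps({"sqls": [sql_text], "conf": {}}, indent=2))

For the second method, the SQL file is plain text in OSS and can be uploaded with any OSS client. The following is a minimal sketch that assumes the oss2 Python SDK; the endpoint, bucket name, credentials, and object path are placeholders:

import oss2

# Placeholder credentials, endpoint, and bucket name; replace them with your own values.
auth = oss2.Auth("<access_key_id>", "<access_key_secret>")
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "test")

# The file contains plain Spark SQL. Only the SQL-level backslash escaping is
# needed, and multiple statements are separated by semicolons.
bucket.put_object("path/to/your.sql", 'select * from t where\ncol  regex  "\\\\d+";')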

Use Spark SQL in code

You can execute SQL statements, manage metadata, and read data from and write data to tables in Spark SQL. The following code uses PySpark as an example. The method of using Spark SQL in other languages is similar. Before you use Spark SQL in code, you must create the example.py file and upload the file to OSS.
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Initialize the Spark session.
    spark = SparkSession \
        .builder \
        .appName("Python SQL Test") \
        .getOrCreate()
    
    # Create a database with an explicit OSS LOCATION.
    spark.sql(
            "create database if not exists dlatest comment 'c' location 'oss://test/warehouse/' WITH DBPROPERTIES(k1='v1', k2='v2')")
    # Create a partitioned table under the database LOCATION.
    spark.sql(
            "create table dlatest.tp(col1 INT)  PARTITIONED BY (p1 STRING, p2 STRING) location 'oss://test/warehouse/tp' STORED AS parquet TBLPROPERTIES ('parquet.compress'='SNAPPY')")
    # Switch to the destination database so that unqualified table names resolve.
    spark.sql("use dlatest")

    # Show the table structure.
    print(spark.sql("show create table tp").collect()[0])

    # Insert data into a partition.
    spark.sql("INSERT into tp partition(p1='a',p2='a') values(1)")

    # Query the data.
    spark.sql("select * from tp").show()
Run the following JSON code to submit a job in the DLA console:
{
    "name": "DLA SQL Test",
    "file": "oss://path/to/example.py"
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
After the job is submitted, you can find the dlatest database and the tp table in the database on the Execute page in the DLA console.
Notice

The DLA metadata service is not case-sensitive. Case is ignored for the database and table names that this service manages.
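
For example, in the PySpark job above, the following two queries resolve to the same dlatest.tp table; this snippet only illustrates the case-insensitive behavior:

# Database and table names are matched without regard to case.
spark.sql("select * from dlatest.tp").show()
spark.sql("select * from DLATEST.TP").show()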

Precautions for reading data from and writing data to tables stored in Hive formats

The related JAR packages are required when you submit a Spark job that reads data from or writes data to tables stored in a Hive format, such as JSON or CSV, in the following scenarios: 1. You use the one-click data warehousing service. 2. You manually execute SQL statements in the DLA console to create the tables.
Notice

We recommend that you download the Hive Serde JAR package from the official Maven repository.

If the table that you created is saved in a specific Hive format, you must submit the required JAR package as a third-party JAR package when you submit a Spark job. The following code is an example:
{
    "name": "DLA Meta Test",
    "sqls": ["SELECT * FROM HiveDB.JsonTable"],
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.sql.catalogImplementation": "hive",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}

Limits and precautions

1. Spark SQL allows you to create only external databases and external tables and perform read and write operations on these tables.
If you connect Spark SQL to the DLA metadata service, you can create only external tables and perform read and write operations on these tables.
When you create a database, you must explicitly specify LOCATION for the database. Sample statement:
CREATE DATABASE db1 LOCATION 'oss://test/db1/'
Similarly, when you create a table, you must explicitly specify LOCATION for the table. Sample statement:
CREATE TABLE table1(col1 INT) LOCATION 'oss://test/db1/table1/'
Take note of the following items:
  • When you use Spark SQL to drop a DLA table or partition of a DLA table, the OSS table that is mapped to the DLA table is not deleted.

  • When you create a table, LOCATION specified for the table must be a subfolder in the directory specified by LOCATION of the database.

  • When you add a partition to a table, LOCATION specified for the partition must be a subfolder in the directory specified by LOCATION of the table, as shown in the sketch after this list.

  • When you execute the RENAME PARTITION statement, the OSS directory structure does not change.
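
The following is a minimal PySpark sketch of the LOCATION rules above, reusing the dlatest.tp table from the earlier example; the partition values and the OSS path are illustrative only:

# The table LOCATION oss://test/warehouse/tp is a subfolder of the database
# LOCATION oss://test/warehouse/. A partition LOCATION must in turn be a
# subfolder of the table LOCATION.
spark.sql("use dlatest")
spark.sql(
    "ALTER TABLE tp ADD PARTITION (p1='b', p2='b') "
    "LOCATION 'oss://test/warehouse/tp/p1=b/p2=b/'")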

2. Spark SQL supports only external tables that are stored in OSS.

The SQL statements of DLA support data from multiple data store services, such as ApsaraDB RDS and Tablestore. However, when you use the metadata service in Spark SQL, you can read data from and write data to only the external tables that are stored in OSS.
If you use Spark SQL to create databases and tables, you must specify a valid OSS directory for LOCATION.
More data stores will be supported in the future.
3. You cannot create a database named DEFAULT.
When this rule applies, take note of the following items:
  • You cannot create or perform operations on a database named DEFAULT in Spark SQL.

  • Before you execute SQL statements in Spark SQL, you must execute the USE DatabaseName statement to switch to the destination database. You can also explicitly specify the database to which a table belongs, for example, SELECT * FROM db1.table1, as shown in the sketch below.
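
The following is a minimal PySpark sketch of both options, using the db1.table1 table from the sample statements above:

# Option 1: switch to the destination database, then use unqualified table names.
spark.sql("USE db1")
spark.sql("SELECT * FROM table1").show()

# Option 2: qualify the table name with its database.
spark.sql("SELECT * FROM db1.table1").show()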

4. Spark SQL has some limits on ALTER statements.
You can modify the metadata of databases and tables by using ALTER statements such as ALTER DATABASE. Spark SQL has the following limits on ALTER statements:
  • ALTER DATABASE statements can modify only the comment of a database. They cannot modify the location or properties of the database.

  • ALTER TABLE statements can modify only the columns or properties of a table, for example, to add a column or modify a property. The modified columns cannot be partition key columns.

5. Spark SQL does not allow you to store user-defined functions (UDFs) in the DLA metadata service.
You can create and use UDFs in the Spark code. However, these UDFs cannot be registered with the DLA metadata service as general functions. You can create a temporary function for a Spark job and then use this function in the job. Sample statement used to create a temporary function:
CREATE TEMPORARY FUNCTION IF NOT EXISTS func1 as
'com.aliyun.dla.SimpleUDFExample' USING JAR '/path/to/jar1',
JAR '/path/to/jar2'
Persistent functions will be supported in later versions.
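
Alternatively, you can register a session-scoped UDF through the Spark API instead of SQL. The following is a minimal PySpark sketch that assumes the db1.table1 table from the earlier sample statements; the function name plus_one is illustrative only:

from pyspark.sql.types import IntegerType

# The UDF exists only for the lifetime of this Spark session; it is not
# registered in the DLA metadata service.
spark.udf.register("plus_one", lambda x: None if x is None else x + 1, IntegerType())
spark.sql("SELECT plus_one(col1) FROM db1.table1").show()
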
6. Some limits are imposed when you execute SQL statements on Spark tables in the DLA console.
After you create the db1 database and the table1 table in Spark SQL, you must explicitly check whether the database or table exists when you execute SQL statements on it in the DLA console. For example, if you want to drop the database, you must execute the following statement:
DROP DATABASE IF EXISTS db1
7. Spark SQL does not support GRANT statements.
You cannot execute a GRANT statement in the serverless Spark engine to modify the permissions of RAM users. This rule is defined by the Apache Spark community. To modify the permissions of RAM users, you can execute GRANT or REVOKE statements on the Execute page in the DLA console.