Spark SQL

Last Updated: Nov 03, 2020

The Spark engine, Data Lake Analytics (DLA) SQL engine, and data lake formation service share metadata.

Access the DLA metadata service by using the Spark engine

The Spark engine supports multiple metadata services. It can access a self-managed Hive metastore (for more information, see Hive data source connection) as well as the DLA data lake metadata service. The data lake metadata service can be accessed by multiple engines, which allows metadata to be shared among them. The databases and table schemas created by DLA SQL statements, metadata crawling, and one-click data warehousing can be read and used by the Spark engine. Likewise, metadata created or modified by using Spark SQL can be accessed by other engines. The following figure shows the relationship between Spark SQL, DLA SQL, and the metadata service.
Figure: Metadata service accessed by multiple engines of data lakes
You can log on to the DLA console and click Execute in the left-side navigation pane to view all database and table schemas in the data lake, run online queries and analysis on the tables, and manage the permissions of RAM users on the databases and tables. The following figure shows the Execute page.
Figure: Execute page

Submit a Spark SQL job

To connect to the DLA metadata service, add the configuration item "spark.sql.hive.metastore.version": "dla" to the job configuration when you submit a Spark SQL job in the DLA console. The following example shows how to query and write data to the table table0 in the database 1k_tables of the data lake metadata:
{
    "sqls": [
        "select * from `1k_tables`.`table0` limit 100",
        "insert into `1k_tables`.`table0` values(1, 'test')"
    ],
    "name": "sql test",
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.dla.connectors": "oss",
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.executor.instances": 10,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
Notes:
  1. The "sqls" keyword allows you to submit SQL jobs without the need to submit JAR packages. This makes the operation easier and is suitable for developers who are familiar with SQL. The parameter value is an array, which allows you to execute multiple SQL statements in a job. In this example, multiple SQL statements are separated by commas (,).

  2. You can submit Spark SQL jobs in the DLA console or by calling API operations. The method for submitting Spark SQL jobs by using the API is the same as the method for submitting other types of Spark jobs. For an illustrative SDK-based sketch, see the example after this list.

  3. You can also submit Spark SQL jobs by using the Spark-SQL toolkit. For more information, see Spark-SQL tool.
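For illustration only, the following sketch shows how such a job might be submitted by calling the SubmitSparkJob API operation through the SDK for Python instead of the console. It assumes the aliyun-python-sdk-core and aliyun-python-sdk-openanalytics-open packages, the cn-hangzhou region, and a virtual cluster named your-vc-name; the request class path, API version, and parameter setters may differ in your SDK version, so verify them against the SubmitSparkJob API reference.
import json

from aliyunsdkcore.client import AcsClient
# Assumption: the request class for SubmitSparkJob is exposed at this path.
from aliyunsdkopenanalytics_open.request.v20180619.SubmitSparkJobRequest import SubmitSparkJobRequest

# The same job description that would be pasted into the console.
job_config = {
    "sqls": ["select * from `1k_tables`.`table0` limit 100"],
    "name": "sql test",
    "conf": {
        "spark.sql.hive.metastore.version": "dla",
        "spark.dla.connectors": "oss",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small"
    }
}

client = AcsClient("<your-access-key-id>", "<your-access-key-secret>", "cn-hangzhou")

request = SubmitSparkJobRequest()
request.set_VcName("your-vc-name")              # name of your Spark virtual cluster (assumption)
request.set_ConfigJson(json.dumps(job_config))  # job description serialized as a JSON string

response = client.do_action_with_exception(request)
print(response)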

Use Spark SQL in code

You can execute SQL statements in your Spark code to manage metadata and to read and write tables. The following example uses PySpark; the method of using Spark SQL in other languages is similar. Create a Python file with the following content, save it as example.py, and upload it to OSS.
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession \
        .builder \
        .appName("Python SQL Test") \
        .getOrCreate()

    # create a database
    spark.sql(
        "create database if not exists dlatest comment 'c' location 'oss://test/warehouse/' WITH DBPROPERTIES(k1='v1', k2='v2')")
    # create a partitioned table in the database
    spark.sql(
        "create table dlatest.tp(col1 INT) PARTITIONED BY (p1 STRING, p2 STRING) location 'oss://test/warehouse/tp' STORED AS parquet TBLPROPERTIES ('parquet.compress'='SNAPPY')")

    # switch to the new database so that unqualified table names resolve to it
    spark.sql("use dlatest")

    # show the table structure
    print(spark.sql("show create table tp").collect()[0])

    # insert data into a partition
    spark.sql("INSERT into tp partition(p1='a',p2='a') values(1)")

    # query the data
    spark.sql("select * from tp").show()
Use the following JSON configuration to submit the job in the DLA console:
{
    "name": "DLA SQL Test",
    "file": "oss://path/to/example.py"
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
After the execution succeeds, you can find the dlatest database and its tp table on the Execute page in the DLA console.
Notice

Database and table names are not case-sensitive in the DLA metadata service. Case is ignored when the service resolves these names.
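For example, continuing the PySpark example above, the following two queries resolve to the same dlatest.tp table because case is ignored:
# Both statements resolve to the same table: database and table names
# are matched without regard to case.
spark.sql("select * from dlatest.tp").show()
spark.sql("select * from DLATEST.TP").show()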

Precautions for reading and writing tables stored in Hive formats

If you create a table by using the one-click data warehousing service or by manually executing SQL statements in the DLA console, and the read/write format of the table is a Hive SerDe format such as JSON or CSV, you must include the required JAR package when you submit a Spark job that accesses the table.
Notice

We recommend that you download the Hive SerDe JAR package, such as hive-serde, from the official Maven repository.

If the created table is saved in a Hive format, you must submit the required JAR package as a third-party JAR package when you submit a Spark job. The following code shows an example:
{
    "name": "DLA Meta Test",
    "sqls": ["SELECT * FROM HiveDB.JsonTable"],
    "jars": [
        "oss://test/hive-serde-3.1.2.jar"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.sql.hive.metastore.version": "dla",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.executor.resourceSpec": "small"
    }
}
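For reference, the following is a minimal sketch of how a table in a Hive SerDe format might be created from Spark SQL, continuing the PySpark example above. The table name CsvTable, the OSS path, and the choice of org.apache.hadoop.hive.serde2.OpenCSVSerde are illustrative assumptions, and the HiveDB database is assumed to already exist; the SerDe class depends on your data format, and the JAR that provides it must be submitted with the job as shown in the preceding example.
# A CSV table that depends on a Hive SerDe class. Jobs that read or write
# this table must include the SerDe JAR in the "jars" field, as shown above.
spark.sql("""
    CREATE TABLE HiveDB.CsvTable (col1 STRING, col2 STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'oss://test/HiveDB/CsvTable/'
""")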

Limits and precautions

1. When the Spark engine is connected to the data lake metadata service, you can create only external databases and external tables, and you can read and write only these tables in Spark.
When you create a database, you must explicitly specify LOCATION for the database. The following SQL statement is an example:
CREATE DATABASE db1 LOCATION 'oss://test/db1/'
Similarly, when you create a table, you must explicitly specify LOCATION for the table. The following SQL statement is an example:
CREATE TABLE table1(col1 INT) LOCATION 'oss://test/db1/table1/'
Note the following items:
  • When you execute the DROP statement to delete a table or a partition of a table in the Spark engine, the files are not deleted from the OSS bucket where the table is stored.

  • The LOCATION that you specify when you create a table must be a subfolder of the LOCATION of the database.

  • The PARTITION LOCATION that you specify when you add a partition to a table must be a subfolder of the LOCATION of the table, as shown in the example after this list.

  • The RENAME PARTITION statement does not change the directory structure of OSS to which the partition belongs.
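The following sketch illustrates the location rules above, continuing the PySpark example. It uses a partitioned variant of the table1 example; the partition value and OSS paths are assumptions for illustration:
# The table location is a subfolder of the database location, and the
# partition location is a subfolder of the table location.
spark.sql("CREATE DATABASE IF NOT EXISTS db1 LOCATION 'oss://test/db1/'")
spark.sql("CREATE TABLE IF NOT EXISTS db1.table1 (col1 INT) PARTITIONED BY (p1 STRING) LOCATION 'oss://test/db1/table1/'")
spark.sql("ALTER TABLE db1.table1 ADD PARTITION (p1='a') LOCATION 'oss://test/db1/table1/p1=a/'")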

2. The Spark engine supports only external tables that are stored in OSS.

DLA supports various data stores such as ApsaraDB RDS and Tablestore. The Spark engine allows you to use the data lake metadata service to read and write external tables that are stored in OSS.
If you use Spark to create databases and tables, you must specify a valid OSS directory for LOCATION.
More data stores will be supported in the future.
3. You are not allowed to create a database named DEFAULT.
Note the following items:
  • Do not create or perform operations on a database named DEFAULT in the Spark engine.

  • Before you perform operations on a table, execute the USE DatabaseName statement to switch to the required database. Alternatively, explicitly specify the database to which the table belongs, such as SELECT * FROM db1.table1.
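As a minimal illustration of this rule, continuing the PySpark example with the db1.table1 objects from above, both of the following approaches work:
# Option 1: switch the current database, then use unqualified table names.
spark.sql("USE db1")
spark.sql("SELECT * FROM table1").show()

# Option 2: qualify the table name with its database explicitly.
spark.sql("SELECT * FROM db1.table1").show()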

4. The Spark engine has some limits on ALTER statements.
You can modify the metadata of databases and tables by using statements such as ALTER DATABASE. The Spark engine has the following limits on ALTER statements (see the example after this list):
  • ALTER DATABASE statements can modify only the comment of a database. They cannot modify the location or properties of the database.

  • ALTER TABLE statements can modify only the columns or properties of a table. For example, you can add a column or modify table properties. The column cannot be a partition key column.
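The following sketch shows ALTER statements that stay within these limits, applied to the dlatest.tp table created earlier in the PySpark example; the new column name and property value are illustrative assumptions:
# Allowed: add a column that is not a partition key column.
spark.sql("ALTER TABLE dlatest.tp ADD COLUMNS (col2 STRING)")
# Allowed: modify table properties.
spark.sql("ALTER TABLE dlatest.tp SET TBLPROPERTIES ('comment'='test table')")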

5. The Spark engine does not allow you to store user-defined functions (UDFs) in the DLA metadata service.
You can create and use UDFs in Spark code. However, these UDFs cannot be registered with the metadata service as permanent functions. You can create a temporary function in a Spark job and then use this function within that job. The following statement is an example:
CREATE TEMPORARY FUNCTION IF NOT EXISTS func1
AS 'com.aliyun.dla.SimpleUDFExample'
USING JAR '/path/to/jar1', JAR '/path/to/jar2'
Persistent functions will be supported in later versions.
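As a complementary sketch of the temporary-function approach, a job-scoped UDF can also be registered directly from PySpark code and used in the same job; the function name and logic below are illustrative assumptions:
from pyspark.sql.types import StringType

# Register a temporary, job-scoped UDF. It is not persisted to the DLA
# metadata service and disappears when the Spark job ends.
spark.udf.register(
    "to_upper",
    lambda s: s.upper() if s is not None else None,
    StringType())

spark.sql("SELECT to_upper(p1) FROM dlatest.tp").show()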
6. Some limits are imposed when you perform operations on Spark tables on the Execute page in the DLA console.
After you create database db1 and table table1 in Spark, you must explicitly check whether the database or table exists when you perform operations on them on the Execute page in the DLA console. For example, to delete the database, execute the following statement:
DROP DATABASE IF EXISTS db1
7. Spark SQL does not support GRANT statements.
You cannot execute GRANT statements in the Spark engine to modify the permissions of RAM users. This is consistent with open source Spark, which does not support GRANT statements either. To modify the permissions of a RAM user, execute the GRANT or REVOKE statement on the Execute page in the DLA console.