Airflow is a popular open source tool used to orchestrate and schedule workloads as directed acyclic graphs (DAGs). You can use spark-submit and the Spark SQL CLI to enable Airflow to schedule Spark jobs. The serverless Spark engine of Data Lake Analytics (DLA) provides a CLI package that allows you to submit Spark jobs by using spark-submit or the Spark SQL CLI. To use Airflow to schedule DLA Spark jobs, you only need to replace the Apache Spark CLI package with the DLA Spark CLI package and configure the required parameters.
DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use Airflow to schedule AnalyticDB for MySQL Spark jobs, see Use Airflow to schedule Spark jobs.
Prerequisites
Install Airflow.
Install and start Airflow. For more information, see Installation of Airflow.
Run the following command to install the Airflow Spark plug-in:
pip3 install apache-airflow-providers-apache-spark
Note: You must install Python 3 before you can install the Airflow Spark plug-in.
When you install the apache-airflow-providers-apache-spark plug-in, PySpark that is developed by the Apache Spark community is automatically installed. If you want to uninstall PySpark, run the following command:
pip3 uninstall pyspark
Download the Spark CLI package of DLA and configure the parameters.
Download the Spark CLI package of DLA and configure the parameters. For more information, see Use the spark-submit CLI.
Run the following command to add the directory that contains the spark-submit and Spark SQL commands to the PATH environment variable:
export PATH=$PATH:/your/dla/spark/path/bin
Note: Before you start the Airflow scheduler, you must add the directory that contains the spark-submit and Spark SQL commands to the PATH environment variable of the Airflow scheduler. Otherwise, the scheduler may fail to find these commands.
Procedure
Write the following sample code to the dla_spark_demo.py file of the Airflow DAG:
from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Aliyun DLA',
}

with DAG(
    dag_id='example_dla_spark_operator',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    dla_spark_conf = {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium",
        "spark.sql.hive.metastore.version": "dla",
        "spark.dla.connectors": "oss",
        "spark.hadoop.job.oss.fileoutputcommitter.enable": "true"
    }

    # [START howto_operator_spark_submit]
    submit_job = SparkSubmitOperator(
        conf=dla_spark_conf,
        application="oss://your-bucket/jar/pi.py",
        task_id="submit_job",
        verbose=True
    )
    # [END howto_operator_spark_submit]

    # [START howto_operator_spark_sql]
    sql_job = SparkSqlOperator(
        conn_id="spark_default",
        sql="SELECT * FROM yourdb.yourtable",
        conf=",".join([k + "=" + v for k, v in dla_spark_conf.items()]),
        task_id="sql_job",
        verbose=True
    )
    # [END howto_operator_spark_sql]

    submit_job >> sql_job
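Note that SparkSqlOperator receives its conf as a single comma-separated "key=value" string rather than a dict, which is why the DAG joins the dla_spark_conf entries. The conversion can be checked in isolation:

```python
# Reproduce the dict-to-string conversion the DAG passes to SparkSqlOperator.
# A subset of the conf is used here purely to keep the output short.
dla_spark_conf = {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
}

conf_str = ",".join(k + "=" + v for k, v in dla_spark_conf.items())
print(conf_str)
# spark.driver.resourceSpec=medium,spark.executor.resourceSpec=medium
```

Because Python dicts preserve insertion order, the resulting string is deterministic for a given conf definition.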
Run the Airflow DAG of the serverless Spark engine in DLA.
Place the dla_spark_demo.py file in the dags directory in the Airflow installation directory. Then, run the DAG. For more information, see Tutorials of Airflow.
Usage notes
A container is the minimum unit for scheduling resources in the serverless Spark engine of DLA. You can use the spark.driver.resourceSpec and spark.executor.resourceSpec parameters to specify the container specifications of the driver and executors. The Apache Hadoop community allows you to apply for resources by specifying the CPU cores and memory of the driver and executors, and the Spark CLI package of DLA is compatible with this resource configuration method. If you specify the number of CPU cores and the memory of the driver and executors, these values are automatically converted into the minimum container specifications that provide at least the specified CPU cores and memory. For example, if you set the executor_cores parameter to 2 and the executor_memory parameter to 5, the Spark CLI package automatically sets the spark.executor.resourceSpec parameter of DLA to medium.

You can configure DLA-specific parameters in the spark-defaults.conf file in the conf folder, or by using Airflow parameters. DLA-specific parameters include vcName, regionId, keyId, secretId, and ossUploadPath.

The serverless Spark engine can use only external tables to access metadata of DLA. Therefore, SparkJDBCOperator does not support the settings of cmd_type='jdbc_to_spark' and save_mode="overwrite".

Note: This issue does not exist when the serverless Spark engine of DLA accesses metadata of a self-managed Hive cluster. For more information about how to access metadata of a Hive cluster, see Access Hive clusters.

If you use Airflow to schedule jobs by calling the Livy API, you need to migrate those calls to the DLA Spark CLI. The DLA Spark team is developing a DLA version that is compatible with the Livy API to reduce migration costs. For more information, see Expert service.