Airflow is a popular open source tool for orchestrating and scheduling workflows as directed acyclic graphs (DAGs). Airflow can schedule Spark jobs through the spark-submit and Spark SQL command-line tools. The serverless Spark engine of Data Lake Analytics (DLA) provides a CLI package that supports both spark-submit and Spark SQL. To schedule DLA Spark jobs with Airflow, you only need to replace the Apache Spark CLI package with the DLA Spark CLI package and configure the required parameters.

Preparations

  • Install Airflow.
    1. Install Airflow and start it. For more information, see the documentation provided by the Apache Airflow community.
    2. Run the following command to install the Airflow Spark plug-in:
      pip3 install apache-airflow-providers-apache-spark
      Note
      • You must install Python 3 before you can install the Airflow Spark plug-in apache-airflow-providers-apache-spark.
      • When you install the apache-airflow-providers-apache-spark plug-in, PySpark developed by the Apache Spark community is automatically installed. If you need to uninstall PySpark, run the following command:
        pip3 uninstall pyspark
  • Download the Spark CLI package of DLA and configure the related parameters.
    1. Download the Spark CLI package of DLA and configure the related parameters. For more information, see Use the spark-submit CLI.
    2. Run the following command to configure the path where spark-submit and Spark SQL commands are saved:
      export PATH=$PATH:/your/dla/spark/path/bin
      Note You must add the spark-submit and spark-sql commands to PATH before you start the Airflow scheduler. Otherwise, the Airflow scheduler may fail to find these commands. A quick way to verify the PATH setting is shown after this list.
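
Before you start the Airflow scheduler, you can verify that the commands resolve from the updated PATH. The following Python sketch is only an illustration; it assumes the CLI package is unpacked at the placeholder path /your/dla/spark/path and that the executables are named spark-submit and spark-sql, which are the names the Airflow Spark operators invoke.

    import os
    import shutil

    # Placeholder installation path from the step above; replace it with your actual path.
    dla_cli_bin = "/your/dla/spark/path/bin"
    os.environ["PATH"] = os.environ["PATH"] + os.pathsep + dla_cli_bin

    # shutil.which returns None when a command cannot be found on PATH,
    # which is the same failure the Airflow scheduler would hit at run time.
    for cmd in ("spark-submit", "spark-sql"):
        print(cmd, "->", shutil.which(cmd) or "NOT FOUND")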

Procedure

  1. Write the following Airflow DAG code to a file named dla_spark_demo.py.
    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'Aliyun DLA',
    }
    
    with DAG(
        dag_id='example_dla_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        dla_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.sql.hive.metastore.version": "dla",
            "spark.dla.connectors": "oss",
            "spark.hadoop.job.oss.fileoutputcommitter.enable": "true"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=dla_spark_conf,
            application="oss://your-bucket/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
    
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
    
        submit_job >> sql_job
  2. Run the Airflow DAG on the serverless Spark engine of DLA.
    Place the dla_spark_demo.py file in the dags directory under the Airflow installation directory. Then, run the DAG. For more information, see the documentation provided by the Apache Airflow community. An optional check that verifies the DAG file loads correctly is shown after this procedure.
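
Before you trigger the DAG, you can check that the file parses and registers correctly. The following sketch is an optional check that uses Airflow's DagBag class; the dags directory path is a placeholder for your Airflow installation.

    from airflow.models import DagBag

    # Load DAG files from the dags directory that contains dla_spark_demo.py
    # (placeholder path; adjust it to your Airflow installation).
    dagbag = DagBag(dag_folder="/path/to/airflow/dags", include_examples=False)

    # Any syntax or import problems in the DAG file are reported here.
    print("Import errors:", dagbag.import_errors)
    print("DAG loaded:", "example_dla_spark_operator" in dagbag.dags)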

Usage notes

  • A container is the minimum unit of resource scheduling in the serverless Spark engine of DLA. You can use the spark.driver.resourceSpec and spark.executor.resourceSpec parameters to specify the container specifications of the driver and executors. Apache Spark allows you to request resources by specifying the number of CPU cores and the amount of memory for the driver and executors, and the Spark CLI package of DLA is compatible with this style of resource configuration. If you specify CPU cores and memory for the driver or executors, the values are automatically converted into the smallest container specification that provides at least the requested CPU cores and memory. For example, if the executor_cores parameter is set to 2 and the executor_memory parameter is set to 5 GB, the Spark CLI package automatically sets the spark.executor.resourceSpec parameter of DLA to medium. A sketch of this conversion is shown after this list.
  • You can configure DLA-specific parameters, such as vcName, regionId, keyId, secretId, and ossUploadPath, in the spark-defaults.conf file in the conf folder of the CLI package. You can also use Airflow parameters to configure them.
  • The serverless Spark engine can access DLA metadata only through external tables. Therefore, SparkJDBCOperator does not support the combination of cmd_type='jdbc_to_spark' and save_mode="overwrite".
    Note This issue does not exist when the serverless Spark engine of DLA accesses metadata of a self-managed Hive cluster. For more information about how to access metadata of a Hive cluster, see Hive.
  • If you use Airflow to schedule Spark jobs by calling the Livy API, you must migrate those calls to the DLA Spark CLI. To reduce migration costs, the DLA Spark team is developing a version that is compatible with the Livy API. For more information, contact Expert service.
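
The following sketch illustrates the conversion described in the first usage note: a requested number of CPU cores and amount of memory is mapped to the smallest container specification that covers both. The specification table in the sketch (small = 1 core/4 GB, medium = 2 cores/8 GB, large = 4 cores/16 GB, xlarge = 8 cores/32 GB) is an assumption for illustration only; check the DLA documentation for the specifications that are available to you.

    # Assumed container specifications (name, CPU cores, memory in GB), smallest first.
    # These values are for illustration only; verify them against the DLA documentation.
    RESOURCE_SPECS = [
        ("small", 1, 4),
        ("medium", 2, 8),
        ("large", 4, 16),
        ("xlarge", 8, 32),
    ]

    def to_resource_spec(cores, memory_gb):
        """Return the smallest specification whose cores and memory cover the request."""
        for name, spec_cores, spec_mem in RESOURCE_SPECS:
            if cores <= spec_cores and memory_gb <= spec_mem:
                return name
        raise ValueError("The request exceeds the largest available container specification")

    # executor_cores=2 and executor_memory=5 GB both fit into medium (2 cores, 8 GB).
    print(to_resource_spec(2, 5))  # prints: medium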