Data Lake Analytics - Deprecated: Use Airflow to schedule Spark jobs of DLA

Last Updated: Feb 07, 2024

Airflow is a popular open source tool that is used to orchestrate and schedule various workloads as directed acyclic graphs (DAGs). Airflow can schedule Spark jobs by using spark-submit and the Spark SQL CLI. The serverless Spark engine of Data Lake Analytics (DLA) provides a CLI package that allows you to submit Spark jobs by using spark-submit or the Spark SQL CLI. To schedule DLA Spark jobs by using Airflow, you only need to replace the Apache Spark CLI package with the DLA Spark CLI package and configure the required parameters.

Important

DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use Airflow to schedule AnalyticDB for MySQL Spark jobs, see Use Airflow to schedule Spark jobs.

Prerequisites

  • Install Airflow.

    1. Install Airflow and start it. For more information, see Installation of Airflow.

    2. Run the following command to install the Airflow Spark plug-in:

      pip3 install apache-airflow-providers-apache-spark
      Note
      • You must install Python 3 before you can install the Airflow Spark plug-in.

      • When you install the apache-airflow-providers-apache-spark plug-in, PySpark that is developed by the Apache Spark community is automatically installed. If you want to uninstall PySpark, run the following command:

        pip3 uninstall pyspark
  • Download the Spark CLI package of DLA and configure the parameters.

    1. Download the Spark CLI package of DLA and configure the parameters. For more information, see Use the spark-submit CLI.

    2. Run the following command to add the directory in which the spark-submit and Spark SQL commands are saved to the PATH environment variable:

      export PATH=$PATH:/your/dla/spark/path/bin
      Note

      Before you start the Airflow scheduler, you must add the directory of the spark-submit and Spark SQL commands to the PATH environment variable of the Airflow scheduler. Otherwise, the Airflow scheduler may fail to find the spark-submit and Spark SQL commands.

Procedure

  1. Write the following sample code for the Airflow DAG to a file named dla_spark_demo.py:

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'Aliyun DLA',
    }
    
    with DAG(
        dag_id='example_dla_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
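        # DLA-specific Spark settings: driver and executor container specifications,
        # the DLA metastore, and OSS access for the serverless Spark engine.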
        dla_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.sql.hive.metastore.version": "dla",
            "spark.dla.connectors": "oss",
            "spark.hadoop.job.oss.fileoutputcommitter.enable": "true"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=dla_spark_conf,
            application="oss://your-bucket/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
    
        # [START howto_operator_spark_sql]
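        # SparkSqlOperator takes conf as a comma-separated string of key=value pairs,
        # so the dla_spark_conf dictionary is flattened into that format here.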
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
    
        submit_job >> sql_job
  2. Run the Airflow DAG of the serverless Spark engine in DLA.

    Place the dla_spark_demo.py file in the dags directory under your Airflow home directory (AIRFLOW_HOME). Then, run the DAG. For more information, see Tutorials of Airflow.
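
    If you want to check the DAG file locally before the scheduler picks it up, you can append a small test harness to dla_spark_demo.py. This is a minimal sketch that assumes Airflow 2.5 or later (which provides dag.test()) and that the directory of the DLA Spark CLI is in PATH as described in the prerequisites:

    # Optional local test harness appended to the end of dla_spark_demo.py.
    # Assumes Airflow 2.5 or later (dag.test()) and that the directory of the
    # DLA spark-submit and Spark SQL commands is in PATH.
    if __name__ == "__main__":
        dag.test()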

Usage notes

  • A container is the minimum unit for scheduling resources in the serverless Spark engine of DLA. You can use the spark.driver.resourceSpec and spark.executor.resourceSpec parameters to specify the container specifications of the driver and executors. Apache Spark allows you to apply for resources by specifying the number of CPU cores and the memory size of the driver and executors, and the Spark CLI package of DLA is compatible with this resource configuration method. If you specify the number of CPU cores and the memory size of the driver and executors, these values are automatically converted into the smallest container specification that provides at least the specified CPU cores and memory. For example, if you set the executor_cores parameter to 2 and the executor_memory parameter to 5 GB, the Spark CLI package automatically sets the spark.executor.resourceSpec parameter of DLA to medium.
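
    For example, the following sketch uses the Apache Spark-style resource parameters of SparkSubmitOperator and is meant to be placed inside the with DAG(...) as dag: block from the Procedure section. The task ID submit_job_by_cores is a hypothetical name; the Spark CLI package of DLA converts the specified cores and memory into a container specification as described above:

      # Place inside the `with DAG(...) as dag:` block of dla_spark_demo.py.
      # The DLA Spark CLI package converts executor_cores=2 and executor_memory="5g"
      # into the smallest container specification that covers both values,
      # which is equivalent to "spark.executor.resourceSpec": "medium".
      submit_job_by_cores = SparkSubmitOperator(
          application="oss://your-bucket/jar/pi.py",  # same sample application as above
          executor_cores=2,        # CPU cores per executor
          executor_memory="5g",    # memory per executor (5 GB)
          task_id="submit_job_by_cores",
          verbose=True,
      )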

  • You can configure DLA-specific parameters, such as vcName, regionId, keyId, secretId, and ossUploadPath, in the spark-defaults.conf file in the conf folder of the DLA Spark CLI package. You can also use Airflow parameters to configure them.
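
    The following is a minimal sketch of what the DLA-specific entries in spark-defaults.conf might look like. The parameter names are the ones listed above; all values are placeholders that you must replace with your own, and the authoritative format is described in Use the spark-submit CLI:

      keyId          <your-AccessKey-ID>
      secretId       <your-AccessKey-secret>
      regionId       <your-region-ID>
      vcName         <your-virtual-cluster-name>
      ossUploadPath  oss://<your-bucket>/<upload-path>/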

  • The serverless Spark engine can use only external tables to access metadata of DLA. Therefore, SparkJDBCOperator does not support the combination of cmd_type='jdbc_to_spark' and save_mode="overwrite". For a supported configuration, see the sketch after the following note.

    Note

    This issue does not exist when the serverless Spark engine of DLA accesses metadata of a self-managed Hive cluster. For more information about how to access metadata of a Hive cluster, see Access Hive clusters.
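
    The following sketch shows SparkJDBCOperator in the spark_to_jdbc direction, which avoids the unsupported combination. Place it inside the with DAG(...) as dag: block of the sample DAG; the connection ID, table names, and JDBC driver class are placeholders for your own environment:

      # Sketch only: reads from a metastore table and writes to a JDBC table.
      # cmd_type='jdbc_to_spark' combined with save_mode="overwrite" is not
      # supported when the serverless Spark engine accesses DLA metadata.
      jdbc_job = SparkJDBCOperator(
          cmd_type="spark_to_jdbc",
          spark_conn_id="spark_default",
          jdbc_conn_id="your_jdbc_conn",           # hypothetical JDBC connection
          jdbc_table="your_target_table",          # placeholder JDBC table
          jdbc_driver="com.mysql.cj.jdbc.Driver",  # example driver class
          metastore_table="yourdb.yourtable",      # external table from the sample SQL
          save_mode="append",
          task_id="jdbc_job",
      )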

  • If you use Airflow to schedule jobs by calling the Livy API, you must migrate these jobs to the DLA Spark CLI. The DLA Spark team is developing a DLA version that is compatible with the Livy API to reduce migration costs. For more information, see Expert service.