AnalyticDB: Use Airflow to schedule Spark jobs

Last Updated: May 12, 2025

Airflow is a popular open source tool used to orchestrate and schedule various workflows as directed acyclic graphs (DAGs). You can schedule Spark jobs by using the Spark Airflow Operator or the spark-submit command-line tool. This topic describes how to use Airflow to schedule AnalyticDB for MySQL Spark jobs.

Prerequisites

Schedule Spark SQL jobs

AnalyticDB for MySQL allows you to execute Spark SQL statements in batch or interactive mode. The scheduling procedure varies based on the execution mode.

Batch mode

Spark Airflow Operator

  1. Run the following command to install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create a connection. Example:

    {
      "auth_type": "AK",
      "access_key_id": "",
      "access_key_secret": "",
      "region": ""
    }

    The following list describes the parameters:

    • auth_type: the authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.

    • access_key_id: the AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    • access_key_secret: the AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    • region: the region ID of the AnalyticDB for MySQL cluster.
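
    If you prefer to register this connection programmatically instead of through the Airflow web interface, the following minimal sketch adds it to the Airflow metadata database. The connection ID adb_spark_default and the generic connection type are assumptions; use the connection ID that your AnalyticDB Spark operators are configured to read.

    import json

    from airflow import settings
    from airflow.models import Connection

    # Hypothetical connection ID; match the ID that your operators or hooks expect.
    conn = Connection(
        conn_id="adb_spark_default",
        conn_type="generic",
        extra=json.dumps({
            "auth_type": "AK",
            "access_key_id": "<yourAccessKeyId>",
            "access_key_secret": "<yourAccessKeySecret>",
            "region": "<yourRegionId>",
        }),
    )

    # Persist the connection in the Airflow metadata database.
    session = settings.Session()
    session.add(conn)
    session.commit()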

  3. Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        default_args={"cluster_id": "", "rg_name": "", "region": ""},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
    

    The following lists describe the parameters.

    DAG configuration parameters

    • dag_id (required): the name of the DAG. You can enter a custom name.

    • default_args (required): the default arguments of the DAG.

      • cluster_id: the ID of the AnalyticDB for MySQL cluster.

      • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

      • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information about optional parameters, see DAG parameters.

    AnalyticDBSparkSQLOperator configuration parameters

    • task_id (required): the job ID.

    • sql (required): the Spark SQL statement.

    For more information about optional parameters, see Airflow parameters.

  4. Store the spark_dags.py file in the folder that is specified by the dags_folder configuration item of Airflow.

  5. Execute the DAG. For more information, see the Airflow documentation.
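
    For example, after Airflow has loaded the DAG file, you can start a run from the Airflow command line. The following commands are a sketch that assumes the dag_id my_dag_name from the preceding example:

    airflow dags unpause my_dag_name
    airflow dags trigger my_dag_name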

Spark-submit

Note

You can configure the AnalyticDB for MySQL-specific parameters in the spark-defaults.conf file that is stored in the conf folder of AnalyticDB for MySQL Spark, or pass them as Airflow parameters. These parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see the "Spark application configuration parameters" section of the Use spark-submit to develop Spark applications topic.

  1. Run the following command to install the Airflow Spark plug-in:

    pip3 install apache-airflow-providers-apache-spark

    Important

    • You must install Python 3 before you can install the Airflow Spark plug-in.

    • When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:

      pip3 uninstall pyspark
  2. Download AnalyticDB for MySQL spark-submit and configure the parameters.
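
    The following is a sketch of what these settings might look like in the conf/spark-defaults.conf file of AnalyticDB for MySQL spark-submit. The parameter names are the ones listed in the preceding note; the key = value layout and the placeholder values are assumptions, so follow the format that is described in the Use spark-submit to develop Spark applications topic.

    # AnalyticDB for MySQL-specific settings (placeholder values; layout is an assumption)
    clusterId = <yourClusterId>
    regionId = <yourRegionId>
    keyId = <yourAccessKeyId>
    secretId = <yourAccessKeySecret>
    ossUploadPath = oss://<yourBucket>/<path>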

  3. Run the following command to add the directory of AnalyticDB for MySQL spark-submit to the PATH environment variable of Airflow:

    export PATH=$PATH:

    Important

    Before you start Airflow, you must add the directory of AnalyticDB for MySQL spark-submit to the PATH environment variable. Otherwise, the system may fail to find spark-submit when it schedules jobs.

  4. Create a declaration file for an Airflow DAG to declare a Spark workflow. In this example, a file named demo.py is created.

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss:///jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  5. Store the demo.py file in the dags folder of the Airflow installation directory.

  6. Execute the DAG. For more information, see the Airflow documentation.

Interactive mode

  1. Obtain the connection URL of the Spark interactive resource group.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Enterprise Edition, Basic Edition, or Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Cluster Management > Resource Management. On the page that appears, click the Resource Groups tab.

    3. Find the Spark interactive resource group that you created and click Details in the Actions column to view the internal or public connection URL of the resource group. You can click the copy icon next to the corresponding port number to copy the connection URL.

      You must click Apply for Endpoint next to Public Endpoint to manually apply for a public endpoint in the following scenarios:

      • The client tool that is used to submit a Spark SQL job is deployed on an on-premises or external server.

      • The client tool that is used to submit a Spark SQL job is deployed on an Elastic Compute Service (ECS) instance that resides in a different virtual private cloud (VPC) from your AnalyticDB for MySQL cluster.

  2. Install the apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql dependencies.
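
    For example, you can install both providers with pip3, in the same way as the plug-ins earlier in this topic:

    pip3 install apache-airflow-providers-apache-hive apache-airflow-providers-common-sql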

  3. Access the Airflow web interface. In the top navigation bar, choose Admin > Connections.

  4. Click the + (add) button. On the Add Connections page, configure the parameters that are described in the following list.

    • Connection Id: the name of the connection. In this example, adb_spark_cluster is used.

    • Connection Type: the type of the connection. Select Hive Server 2 Thrift.

    • Host: the endpoint obtained in Step 1. Replace default in the endpoint with the actual name of the database and delete the resource_group=<resource group name> suffix from the endpoint. Example: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo.

    • Schema: the name of the database. In this example, adb_demo is used.

    • Login: the names of the database account and the resource group in the AnalyticDB for MySQL cluster. Format: resource_group_name/database_account_name. Example: spark_interactive_prod/spark_user.

    • Password: the password of the database account of the AnalyticDB for MySQL cluster.

    • Port: the port number for Spark interactive resource groups. Set the value to 10000.

    • Extra: the authentication method. Enter the following content, which specifies that username and password authentication is used:

      {
        "auth_mechanism": "CUSTOM"
      }
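
    If you prefer to create this connection programmatically rather than on the web interface, the following minimal sketch registers an equivalent connection in the Airflow metadata database. The hiveserver2 connection type string is an assumption about how the Hive provider names the Hive Server 2 Thrift type; the other values mirror the examples in the preceding list.

    from airflow import settings
    from airflow.models import Connection

    conn = Connection(
        conn_id="adb_spark_cluster",        # the connection name that the DAG below references
        conn_type="hiveserver2",            # assumed type string for Hive Server 2 Thrift
        host="jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo",  # endpoint from Step 1
        schema="adb_demo",
        login="spark_interactive_prod/spark_user",
        password="<yourDatabaseAccountPassword>",
        port=10000,
        extra='{"auth_mechanism": "CUSTOM"}',
    )

    # Persist the connection in the Airflow metadata database.
    session = settings.Session()
    session.add(conn)
    session.commit()
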
  5. Write a DAG file.

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query', 
        conn_id='adb_spark_cluster',  
        sql='show databases',  
        dag=dag
    )
    
    jdbc_query

    The following list describes the parameters.

    • task_id (required): the job ID. You can enter a custom ID.

    • conn_id (required): the name of the connection. Enter the ID of the connection that you created in Step 4.

    • sql (required): the Spark SQL statement.

    For more information about optional parameters, see Airflow parameters.

  6. On the Airflow web interface, click the trigger (play) button next to the DAG to run it.

Schedule Spark JAR jobs

Spark Airflow Operator

  1. Run the following command to install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create a connection. Example:

    {
      "auth_type": "AK",
      "access_key_id": "",
      "access_key_secret": "",
      "region": ""
    }

    The following list describes the parameters:

    • auth_type: the authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.

    • access_key_id: the AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    • access_key_secret: the AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    • region: the region ID of the AnalyticDB for MySQL cluster.

  3. Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.

    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    
    with DAG(
        dag_id="my_dag_name",
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    ) as dag:
        # Submit the SparkPi example as a Spark batch job.
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        # Submit the SparkLR example as a Spark batch job.
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        # Run task2 after task1 succeeds.
        spark_pi >> spark_lr
    

    The following lists describe the parameters.

    DAG configuration parameters

    • dag_id (required): the name of the DAG. You can enter a custom name.

    • default_args (required): the default arguments of the DAG.

      • cluster_id: the ID of the AnalyticDB for MySQL cluster.

      • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

      • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information about optional parameters, see DAG parameters.

    AnalyticDBSparkBatchOperator configuration parameters

    • task_id (required): the job ID.

    • file (required): the absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.

      Important

      The main files of Spark applications must be stored in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    • class_name (required for Java and Scala applications): the entry point of the Java or Scala application. Python applications do not require an entry point.

    For more information about optional parameters, see AnalyticDBSparkBatchOperator parameters.

  4. Store the spark_dags.py file in the folder that is specified by the dags_folder configuration item of Airflow.

  5. Execute the DAG. For more information, see the Airflow documentation.

Spark-submit

Note

You can configure the AnalyticDB for MySQL-specific parameters in the spark-defaults.conf file that is stored in the conf folder of AnalyticDB for MySQL Spark, or pass them as Airflow parameters. These parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see the "Spark application configuration parameters" section of the Use spark-submit to develop Spark applications topic.

  1. Run the following command to install the Airflow Spark plug-in:

    pip3 install apache-airflow-providers-apache-spark

    Important

    • You must install Python 3 before you can install the Airflow Spark plug-in.

    • When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:

      pip3 uninstall pyspark
  2. Download AnalyticDB for MySQL spark-submit and configure the parameters.

  3. Run the following command to add the directory of AnalyticDB for MySQL spark-submit to the PATH environment variable of Airflow:

    export PATH=$PATH:

    Important

    Before you start Airflow, you must add the directory of AnalyticDB for MySQL spark-submit to the PATH environment variable. Otherwise, the system may fail to find spark-submit when it schedules jobs.

  4. Create a declaration file for an Airflow DAG. In this example, a file named demo.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        # Submit the SparkPi example as a Spark batch job.
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        # Submit the SparkLR example as a Spark batch job.
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        # Run task2 after task1 succeeds.
        spark_pi >> spark_lr
    

    The following lists describe the parameters.

    DAG configuration parameters

    • dag_id (required): the name of the DAG. You can enter a custom name.

    • default_args (required): the default arguments of the DAG.

      • cluster_id: the ID of the AnalyticDB for MySQL cluster.

      • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

      • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information about optional parameters, see DAG parameters.

    AnalyticDBSparkBatchOperator configuration parameters

    • task_id (required): the job ID.

    • file (required): the absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.

      Important

      The main files of Spark applications must be stored in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    • class_name (required for Java and Scala applications): the entry point of the Java or Scala application. Python applications do not require an entry point.

    For more information about optional parameters, see AnalyticDBSparkBatchOperator parameters.

  5. Store the demo.py file in the dags folder of the Airflow installation directory.

  6. Execute the DAG. For more information, see the Airflow documentation.