
AnalyticDB: Use Airflow to schedule Spark jobs

Last Updated: Nov 24, 2025

Airflow is a popular open source scheduling tool that orchestrates and schedules various workloads as directed acyclic graphs (DAGs). You can schedule Spark jobs using the Spark Airflow Operator or the spark-submit command-line tool. This topic describes how to use Airflow to schedule AnalyticDB for MySQL Spark jobs.

Prerequisites

Schedule Spark SQL jobs

AnalyticDB for MySQL allows you to execute Spark SQL in batch or interactive mode. The scheduling procedure varies based on the execution mode.

Batch mode

Spark Airflow Operator

  1. Run the following command to install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create a connection. Example:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    The parameters are described as follows.

    auth_type: The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.

    access_key_id: The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    access_key_secret: The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    region: The region ID of the AnalyticDB for MySQL cluster.
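
    The plug-in reads these values from an Airflow connection. You can create the connection in the Airflow web interface or programmatically. The following is a minimal programmatic sketch: the connection ID adb_spark_default and the connection type generic are assumptions, so check the plug-in documentation for the values that it actually expects.

    # A hedged sketch: store the JSON above in the Extra field of an Airflow connection.
    # The conn_id "adb_spark_default" and conn_type "generic" are assumptions; verify them
    # against the plug-in documentation before use.
    import json

    from airflow import settings
    from airflow.models import Connection

    conn = Connection(
        conn_id="adb_spark_default",
        conn_type="generic",
        extra=json.dumps({
            "auth_type": "AK",
            "access_key_id": "<your_access_key_ID>",
            "access_key_secret": "<your_access_key_secret>",
            "region": "<your_region>",
        }),
    )
    session = settings.Session()
    session.add(conn)
    session.commit()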

  3. Create an Airflow DAG definition file. In this example, a file named spark_dags.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
    

    The parameters are described as follows.

    DAG configuration parameters:

    dag_id (required): The name of the DAG. You can enter a custom name.

    default_args (required):

    • cluster_id: the ID of the AnalyticDB for MySQL cluster.

    • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

    • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information, see DAG parameters.

    AnalyticDBSparkSQLOperator configuration parameters:

    task_id (required): The job ID.

    sql (required): The Spark SQL statement.

    For more information, see Airflow parameters.

  4. Store the spark_dags.py file in the directory that is specified by the dags_folder parameter in the Airflow configuration file.

  5. Execute the DAG. For more information, see Airflow documentation.
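
    Optionally, you can first run the DAG once in-process as a quick check. The following is a minimal sketch that assumes Airflow 2.5 or later, where DAG.test() is available; append it to the end of spark_dags.py and run the file with python.

    # Optional quick check (assumes Airflow 2.5 or later): running "python spark_dags.py"
    # executes the DAG once in the current process, without the scheduler.
    if __name__ == "__main__":
        dag.test()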

Spark-submit

Note

You can configure the AnalyticDB for MySQL-specific parameters in the conf/spark-defaults.conf configuration file of AnalyticDB for MySQL Spark or by using Airflow parameters. These parameters include clusterId, regionId, keyId, and secretId. For more information, see Spark application configuration parameters.
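
If you choose the Airflow parameter route, one option is to merge these values into the conf dictionary that is passed to SparkSubmitOperator in the DAG example below. This is only a sketch: the key names are the ones listed in this note, and you should confirm their exact spelling in Spark application configuration parameters.

    # A hedged sketch: supply the AnalyticDB for MySQL parameters from Airflow instead of
    # editing conf/spark-defaults.conf. The key names follow this note; confirm the exact
    # names in Spark application configuration parameters.
    adb_auth_conf = {
        "clusterId": "<your_cluster_ID>",
        "regionId": "<your_region>",
        "keyId": "<your_access_key_ID>",
        "secretId": "<your_access_key_secret>",
    }

    # Merge with the job-level settings used in the demo.py example below.
    adb_spark_conf = {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium",
        **adb_auth_conf,
    }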

  1. Run the following command to install the Airflow Spark plug-in:

    pip3 install apache-airflow-providers-apache-spark
    Important
    • You must install Python 3 before you can install the Airflow Spark plug-in.

    • When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:

      pip3 uninstall pyspark
  2. Download the spark-submit package and configure the parameters.

  3. Run the following command to add the directory that contains the spark-submit command to the PATH environment variable of Airflow:

    export PATH=$PATH:</your/adb/spark/path/bin>
    Important

    Before you start Airflow, you must add the directory that contains the spark-submit command to the PATH environment variable. Otherwise, the system may fail to find the spark-submit command when scheduling jobs.

  4. Create an Airflow DAG definition file. In this example, a file named demo.py is created.

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  5. Store the demo.py file in the dags folder of the Airflow installation directory.

  6. Execute the DAG. For more information, see Airflow documentation.

Interactive mode

  1. Obtain the endpoint of the Spark Interactive resource group.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.

    2. In the navigation pane on the left, choose Cluster Management > Resource Management, and click the Resource Groups tab.

    3. Find the target resource group and click Details in the Actions column to view the internal endpoint and public endpoint. You can click the copy icon next to an endpoint to copy it. You can also click the copy icon in the parentheses next to Port to copy the JDBC connection string.

      In the following cases, you must click Apply for Endpoint next to Public Endpoint to manually request a public endpoint.

      • The client tool used to submit Spark SQL jobs is deployed on a local machine or an external server.

      • The client tool used to submit Spark SQL jobs is deployed on an ECS instance, and the ECS instance and the AnalyticDB for MySQL cluster are not in the same VPC.


  2. Install the apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql dependencies.

  3. Access the Airflow web interface. In the top navigation bar, choose Admin > Connections.

  4. Click the add connection (+) button. On the Add Connection page, configure the following parameters.

    Connection Id: The name of the connection. In this example, adb_spark_cluster is used.

    Connection Type: Select Hive Server 2 Thrift.

    Host: The endpoint obtained in Step 1. Replace default in the endpoint with the actual name of the database and delete the resource_group=<resource group name> suffix from the endpoint. Example: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo.

    Schema: The name of the database. In this example, adb_demo is used.

    Login: The names of the database account and the interactive resource group in the AnalyticDB for MySQL cluster. Format: resource_group_name/database_account_name. Example: spark_interactive_prod/spark_user.

    Password: The password of the AnalyticDB for MySQL database account.

    Port: The port number for Spark interactive resource groups. Set the value to 10000.

    Extra: The authentication method. Enter the following content, which specifies that username and password authentication is used:

    {
      "auth_mechanism": "CUSTOM"
    }
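
    As an alternative to the web interface, the same connection can be registered from Python. This is a minimal sketch: the conn_type value hiveserver2 is assumed to correspond to the Hive Server 2 Thrift connection type installed by apache-airflow-providers-apache-hive, and the host, login, and password values are the placeholders from the descriptions above.

    from airflow import settings
    from airflow.models import Connection

    # A hedged sketch: register the connection described above from Python instead of the
    # web interface. conn_type "hiveserver2" is an assumption; verify it in your installation.
    conn = Connection(
        conn_id="adb_spark_cluster",
        conn_type="hiveserver2",
        host="jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo",
        schema="adb_demo",
        login="spark_interactive_prod/spark_user",
        password="<your_password>",
        port=10000,
        extra='{"auth_mechanism": "CUSTOM"}',
    )
    session = settings.Session()
    session.add(conn)
    session.commit()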
  5. Write a DAG file.

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query', 
        conn_id='adb_spark_cluster',  
        sql='show databases',  
        dag=dag
    )
    
    jdbc_query

    The parameters are described as follows.

    task_id (required): The job ID. You can enter a custom ID.

    conn_id (required): The name of the connection. Enter the ID of the connection that you created in Step 4.

    sql (required): The Spark SQL statement.

    For more information, see Airflow parameters.
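
    If you want to inspect the rows that the query returns, SQLExecuteQueryOperator pushes its fetched results to XCom by default, so a follow-up task can read them. The following is a minimal sketch that can be appended to the DAG file above; the print_databases helper is only for illustration.

    from airflow.operators.python import PythonOperator

    def print_databases(ti):
        # Pull the rows that SQLExecuteQueryOperator pushed to XCom.
        rows = ti.xcom_pull(task_ids="execute_spark_sql_query")
        for row in rows or []:
            print(row)

    show_result = PythonOperator(
        task_id="print_query_result",
        python_callable=print_databases,
        dag=dag,
    )

    jdbc_query >> show_result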

  6. On the Airflow web interface, click the trigger button next to the DAG to run it.

Schedule Spark JAR jobs

Spark Airflow Operator

  1. Run the following command to install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create a connection. Example:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    The parameters are described as follows.

    auth_type: The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.

    access_key_id: The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    access_key_secret: The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL. For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.

    region: The region ID of the AnalyticDB for MySQL cluster.

  3. Create an Airflow DAG definition file. In this example, a file named spark_dags.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    
    with DAG(
        dag_id="spark_jar_dag",
        start_date=datetime(2021, 1, 1),
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
        

    DAG configuration parameters:

    dag_id (required): The name of the DAG. You can enter a custom name.

    default_args (required):

    • cluster_id: the ID of the AnalyticDB for MySQL cluster.

    • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

    • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information, see DAG parameters.

    AnalyticDBSparkBatchOperator configuration parameters:

    task_id (required): The job ID.

    file (required): The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.

    Important: The main files of Spark applications must be stored in OSS, and the OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    class_name (required for Java and Scala applications): The entry point of the Java or Scala application. Python applications do not require entry points.

    For more information, see AnalyticDBSparkBatchOperator parameters.
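
    For a Python application, the descriptions above imply that only task_id and file are needed. The following is a minimal sketch that could be added inside the with DAG block of spark_dags.py; the OSS path is a placeholder.

        # A Python (PySpark) application: point file at the entry script in OSS and omit
        # class_name. The OSS path below is a placeholder.
        spark_py = AnalyticDBSparkBatchOperator(
            task_id="task3",
            file="oss://<bucket_name>/python/example.py",
        )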

  4. Store the spark_dags.py file in the directory that is specified by the dags_folder parameter in the Airflow configuration file.

  5. Execute the DAG. For more information, see Airflow documentation.

Spark-submit

Note

You can configure the AnalyticDB for MySQL-specific parameters in the conf/spark-defaults.conf configuration file of AnalyticDB for MySQL Spark or by using Airflow parameters. These parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see Spark application configuration parameters.

  1. Run the following command to install the Airflow Spark plug-in:

    pip3 install apache-airflow-providers-apache-spark
    Important
    • You must install Python 3 before you can install the Airflow Spark plug-in.

    • When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:

      pip3 uninstall pyspark
  2. Download the spark-submit package and configure the parameters.

  3. Run the following command to add the directory that contains the spark-submit command to the PATH environment variable of Airflow:

    export PATH=$PATH:</your/adb/spark/path/bin>
    Important

    Before you start Airflow, you must add the directory that contains the spark-submit command to the PATH environment variable. Otherwise, the system may fail to find the spark-submit command when scheduling jobs.

  4. Create an Airflow DAG definition file. In this example, a file named demo.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    
    with DAG(
        dag_id="spark_jar_submit_dag",
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
        

    The parameters are described as follows.

    DAG configuration parameters:

    dag_id (required): The name of the DAG. You can enter a custom name.

    default_args (required):

    • cluster_id: the ID of the AnalyticDB for MySQL cluster.

    • rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.

    • region: the region ID of the AnalyticDB for MySQL cluster.

    For more information, see DAG parameters.

    AnalyticDBSparkBatchOperator configuration parameters:

    task_id (required): The job ID.

    file (required): The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.

    Important: The main files of Spark applications must be stored in OSS, and the OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    class_name (required for Java and Scala applications): The entry point of the Java or Scala application. Python applications do not require entry points.

    For more information, see AnalyticDBSparkBatchOperator parameters.
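
    If you prefer to route the job through the spark-submit command that you configured in steps 1 to 3, apache-airflow-providers-apache-spark also provides SparkSubmitOperator, as used in the Spark SQL section of this topic. The following is a minimal sketch; the OSS path, class name, and resource settings are placeholders borrowed from the other examples in this topic.

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago

    with DAG(
        dag_id="example_spark_submit_jar",
        schedule_interval=None,
        start_date=days_ago(2),
    ) as dag:
        # Calls the spark-submit command on PATH; conf values follow the earlier examples.
        submit_jar_job = SparkSubmitOperator(
            task_id="submit_jar_job",
            conf={
                "spark.driver.resourceSpec": "medium",
                "spark.executor.resourceSpec": "medium",
            },
            application="oss://<bucket_name>/jar/spark-examples.jar",
            java_class="org.apache.spark.examples.SparkPi",
            verbose=True,
        )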

  5. Store the demo.py file in the dags folder of the Airflow installation directory.

  6. Execute the DAG. For more information, see Airflow documentation.