AnalyticDB: Use Airflow to schedule Spark jobs

Last Updated: Mar 28, 2026

This guide explains how to schedule AnalyticDB for MySQL Spark jobs using Apache Airflow. Airflow orchestrates workloads as directed acyclic graphs (DAGs). You can connect Airflow to AnalyticDB for MySQL using two methods:

Method | Best for
Spark Airflow Operator | Tighter integration with AnalyticDB for MySQL; uses AccessKey authentication; supports both SQL and JAR jobs
spark-submit | Standard Apache Airflow Spark provider; suitable if you already use the apache-airflow-providers-apache-spark package

Prerequisites

Before you begin, make sure you have:

  • An AnalyticDB for MySQL cluster with a Spark resource group (a job resource group for batch mode, or an interactive resource group for interactive mode).
  • A running Apache Airflow environment.
  • A RAM user with an AccessKey pair and permissions on AnalyticDB for MySQL (required for the Spark Airflow Operator method).

Schedule Spark SQL jobs

AnalyticDB for MySQL supports Spark SQL in batch mode and interactive mode. The setup differs between the two.

Batch mode

Spark Airflow Operator

  1. Install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create an Airflow connection. In the Airflow web UI, go to Admin > Connections and add a connection with the following JSON as the connection extra:

    Important

    Use a Resource Access Management (RAM) user with the minimum required permissions. Do not use your Alibaba Cloud root account credentials.

    Parameter | Description
    auth_type | Authentication method. Set to AK to use AccessKey pair authentication.
    access_key_id | The AccessKey ID of your RAM user with access to AnalyticDB for MySQL.
    access_key_secret | The AccessKey secret of your RAM user.
    region | The region ID of the AnalyticDB for MySQL cluster.
    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }
  3. Create a DAG file named spark_dags.py. The following example uses AnalyticDBSparkSQLOperator to run a SHOW DATABASES query:

    DAG parameters:

    Parameter | Required | Description
    dag_id | Yes | The DAG name.
    default_args | Yes | Cluster-level defaults: cluster_id (cluster ID), rg_name (job resource group name), region (region ID). For more information, see DAG parameters.

    AnalyticDBSparkSQLOperator parameters:

    Parameter | Required | Description
    task_id | Yes | The job ID.
    sql | Yes | The Spark SQL statement. For more information, see Airflow parameters.
    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
  4. Copy spark_dags.py to the dags_folder directory defined in your Airflow configuration.

  5. Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial. Alternatively, deploy and trigger the DAG from the command line, as sketched after this procedure.
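
The deployment and trigger steps above can also be performed with the standard Airflow CLI. The following is a minimal sketch that reuses the dag_id from the example. The connection from step 2 can likewise be created with airflow connections add, but the connection ID and type expected by the plug-in depend on the plug-in version, so verify them in the web UI first.

    # Find the DAG folder that Airflow scans, then copy the DAG file into it.
    DAGS_FOLDER=$(airflow config get-value core dags_folder)
    cp spark_dags.py "$DAGS_FOLDER/"

    # After the scheduler parses the file, confirm the DAG is registered.
    airflow dags list | grep my_dag_name

    # Trigger a run (equivalent to triggering the DAG in the web UI).
    airflow dags trigger my_dag_name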

spark-submit

Note You can set AnalyticDB for MySQL-specific parameters (clusterId, regionId, keyId, secretId) in the conf/spark-defaults.conf file or as Airflow parameters. For the full list, see Spark application configuration parameters.
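
For reference, the conf/spark-defaults.conf entries might look like the following sketch. The key names are the ones listed in the note above; the values and the configuration path are placeholders, and the exact syntax and full list of supported keys are described in Spark application configuration parameters.

    # Append AnalyticDB for MySQL-specific settings (placeholder values) to spark-defaults.conf.
    CONF=/your/adb/spark/path/conf/spark-defaults.conf
    printf '%s\n' \
      'clusterId   <your_cluster_ID>' \
      'regionId    <your_region>' \
      'keyId       <your_access_key_ID>' \
      'secretId    <your_access_key_secret>' >> "$CONF"
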
  1. Install the Airflow Spark plug-in:

    Important

    Installing apache-airflow-providers-apache-spark automatically installs PySpark. To remove PySpark, run pip3 uninstall pyspark.

    pip3 install apache-airflow-providers-apache-spark
  2. Download the spark-submit package and configure the parameters.

  3. Add the spark-submit binary to Airflow's PATH before starting Airflow:

    Important

    Set PATH before starting Airflow. If Airflow starts without the spark-submit path, it cannot locate the command at runtime.

    export PATH=$PATH:</your/adb/spark/path/bin>
  4. Create a DAG file named demo.py:

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # Submit a Spark application from an OSS path
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # Run a Spark SQL query
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        submit_job >> sql_job
  5. Copy demo.py to the dags folder in the Airflow installation directory.

  6. Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial. You can also validate and trigger the DAG from the command line, as sketched after this procedure.
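
A minimal command-line sketch for validating and running the DAG, assuming demo.py is already in the dags folder and spark-submit is on the PATH. The dag_id and task_id come from the example above, and the date is only a placeholder logical date.

    # Parse the DAG file directly to surface syntax or import errors.
    python demo.py

    # List any DAGs that failed to import.
    airflow dags list-import-errors

    # Run a single task in isolation, without recording state in the metadata database.
    airflow tasks test example_spark_operator submit_job 2024-01-01

    # Trigger a full DAG run.
    airflow dags trigger example_spark_operator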

Interactive mode

Interactive mode connects Airflow to the AnalyticDB for MySQL cluster through a JDBC endpoint using HiveServer2 Thrift.

  1. Obtain the endpoint of the Spark interactive resource group. You must first click Apply for Endpoint next to Public Endpoint to request a public endpoint in either of the following cases:

    • The client tool used to submit Spark SQL jobs is deployed on a local machine or an external server.

    • The client tool used to submit Spark SQL jobs is deployed on an ECS instance, and the ECS instance and the AnalyticDB for MySQL cluster are not in the same VPC.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, and then click your cluster ID.

    2. In the navigation pane, choose Cluster Management > Resource Management, and click the Resource Groups tab.

    3. Find the target resource group and click Details in the Actions column. Copy the internal or public endpoint. You can also copy the JDBC connection string from the Port field.

  2. Install the required dependencies:

    pip install apache-airflow-providers-apache-hive "apache-airflow-providers-common-sql==1.21.0"

    For package details, see apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql.

  3. In the Airflow web UI, go to Admin > Connections.

  4. Click the add (+) button to create a connection. Configure the following parameters (for an equivalent Airflow CLI command, see the sketch after this procedure):

    Parameter | Description
    Connection Id | The connection name. Example: adb_spark_cluster.
    Connection Type | Select Hive Server 2 Thrift.
    Host | The endpoint from step 1. Replace default with the actual database name and remove the resource_group=<resource group name> suffix. Example: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo.
    Schema | The database name. Example: adb_demo.
    Login | The resource group name and database account in resource_group_name/database_account_name format. Example: spark_interactive_prod/spark_user.
    Password | The AnalyticDB for MySQL database account password.
    Port | 10000
    Extra | {"auth_mechanism": "CUSTOM"}
  5. Create a DAG file. The following example runs show databases on a daily schedule:

    Parameter | Required | Description
    task_id | Yes | The job ID.
    conn_id | Yes | The connection name from step 4.
    sql | Yes | The Spark SQL statement.
    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query',
        conn_id='adb_spark_cluster',  # Connection created in step 4
        sql='show databases',
        dag=dag
    )
    
    jdbc_query
  6. In the Airflow web UI, click the trigger button next to the DAG to run it.
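
As an alternative to step 4, you can create the connection with the Airflow CLI. The following minimal sketch mirrors the values in the table above; the connection type identifier hiveserver2 is assumed to be the one registered by apache-airflow-providers-apache-hive for Hive Server 2 Thrift, so verify the exact name in your Airflow version.

    # Create the Hive Server 2 Thrift connection from the command line.
    # Values mirror the table in step 4; the conn-type name is an assumption.
    airflow connections add adb_spark_cluster \
        --conn-type hiveserver2 \
        --conn-host 'jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo' \
        --conn-schema adb_demo \
        --conn-login 'spark_interactive_prod/spark_user' \
        --conn-password '<your_password>' \
        --conn-port 10000 \
        --conn-extra '{"auth_mechanism": "CUSTOM"}'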

Schedule Spark JAR jobs

Spark Airflow Operator

  1. Install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. Create an Airflow connection. In the Airflow web UI, go to Admin > Connections and add a connection with the following JSON:

    Important

    Use a RAM user with the minimum required permissions. Do not use your root account credentials. For details, see Accounts and permissions.

    Parameter | Description
    auth_type | Authentication method. Set to AK.
    access_key_id | The AccessKey ID of your RAM user.
    access_key_secret | The AccessKey secret of your RAM user.
    region | The region ID of the AnalyticDB for MySQL cluster.
    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }
  3. Create a DAG file named spark_dags.py. The following example runs two JAR-based tasks in sequence using AnalyticDBSparkBatchOperator:

    Important

    Store all Spark application main files in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must be in the same region. An example upload command is sketched after this procedure.

    DAG parameters:

    Parameter | Required | Description
    dag_id | Yes | The DAG name.
    default_args | Yes | Cluster-level defaults: cluster_id, rg_name, region. For more information, see DAG parameters.

    AnalyticDBSparkBatchOperator parameters:

    Parameter | Required | Description
    task_id | Yes | The job ID.
    file | Yes | The absolute path to the Spark application's main file: a JAR package (Java/Scala) or a Python entry-point file. The file must be stored in OSS.
    class_name | Required for Java/Scala | The entry point class. Python applications do not require this. For more information, see AnalyticDBSparkBatchOperator parameters.
    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="spark_jar_demo",
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
  4. Copy spark_dags.py to the dags_folder directory defined in your Airflow configuration.

  5. Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial.
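
If your JAR is on a local disk, you can upload it to OSS with ossutil and then reference the resulting oss:// path in the file parameter. A minimal sketch follows; the bucket name and paths are placeholders.

    # Upload the application JAR to an OSS bucket in the same region as the cluster.
    ossutil cp /tmp/spark-examples.jar oss://<bucket_name>/jar/spark-examples.jar

    # In spark_dags.py, point the operator at the uploaded object, for example:
    #   file="oss://<bucket_name>/jar/spark-examples.jar"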

spark-submit

Note You can set AnalyticDB for MySQL-specific parameters (clusterId, regionId, keyId, secretId, ossUploadPath) in the conf/spark-defaults.conf file or as Airflow parameters. For the full list, see Spark application configuration parameters.
  1. Install the Airflow Spark plug-in:

    Important

    Installing apache-airflow-providers-apache-spark automatically installs PySpark. To remove PySpark, run pip3 uninstall pyspark.

    pip3 install apache-airflow-providers-apache-spark
  2. Download the spark-submit package and configure the parameters.

  3. Add the spark-submit binary to Airflow's PATH before starting Airflow:

    Important

    Set PATH before starting Airflow. If Airflow starts without the spark-submit path, it cannot locate the command at runtime.

    export PATH=$PATH:</your/adb/spark/path/bin>
  4. Create a DAG file named demo.py. The following example runs two JAR tasks in sequence:

    DAG parameters:

    Parameter | Required | Description
    dag_id | Yes | The DAG name.
    default_args | Yes | Cluster-level defaults: cluster_id, rg_name, region. For more information, see DAG parameters.

    AnalyticDBSparkBatchOperator parameters:

    Parameter | Required | Description
    task_id | Yes | The job ID.
    file | Yes | The absolute path to the Spark application's main file. The file must be stored in OSS.
    class_name | Required for Java/Scala | The entry point class. Python applications do not require this. For more information, see AnalyticDBSparkBatchOperator parameters.
    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="spark_jar_demo",
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
  5. Copy demo.py to the dags folder in the Airflow installation directory.

  6. Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial. A command-line alternative is sketched after this procedure.
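
For a quick check from the command line, you can confirm which spark-submit Airflow will pick up and then trigger and inspect the run. A minimal sketch; the dag_id matches the example above.

    # Confirm that the spark-submit on PATH is the AnalyticDB for MySQL version
    # configured in step 3.
    which spark-submit

    # Trigger the DAG, then list its runs and their states.
    airflow dags trigger spark_jar_demo
    airflow dags list-runs -d spark_jar_demo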