This guide explains how to schedule AnalyticDB for MySQL Spark jobs using Apache Airflow. Airflow orchestrates workloads as directed acyclic graphs (DAGs). You can connect Airflow to AnalyticDB for MySQL using two methods:
| Method | Best for |
|---|---|
| Spark Airflow Operator | Tighter integration with AnalyticDB for MySQL; uses AccessKey authentication; supports both SQL and JAR jobs |
| spark-submit | Standard Apache Airflow Spark provider; suitable if you already use the apache-airflow-providers-apache-spark package |
Prerequisites
Before you begin, make sure you have:
- An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster
- A job resource group or Spark interactive resource group created for the cluster
- Python 3.7 or later
- The Airflow server's IP address added to the cluster whitelist (see the connectivity check below)
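If you are unsure whether the whitelist entry is in place, a quick reachability test from the Airflow server can save a failed first run. The following is a minimal sketch using only the Python standard library; the endpoint and port are placeholders for your cluster's values.

```python
import socket

# Placeholders: replace with your cluster endpoint and port.
HOST = "amv-****.ads.aliyuncs.com"
PORT = 10000

# If the Airflow server's IP is missing from the cluster whitelist, the
# connection attempt usually times out rather than succeeding.
try:
    with socket.create_connection((HOST, PORT), timeout=5):
        print("Endpoint reachable: whitelist and network settings look correct.")
except OSError as exc:
    print(f"Endpoint not reachable: {exc}")
```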
Schedule Spark SQL jobs
AnalyticDB for MySQL supports Spark SQL in batch mode and interactive mode. The setup differs between the two.
Batch mode
Spark Airflow Operator
Install the Airflow Spark plug-in:
```bash
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
```

Create an Airflow connection. In the Airflow web UI, go to Admin > Connections and add a connection with the following JSON as the connection extra:
**Important:** Use a Resource Access Management (RAM) user with the minimum required permissions. Do not use your Alibaba Cloud root account credentials.
| Parameter | Description |
|---|---|
| `auth_type` | Authentication method. Set to `AK` to use AccessKey pair authentication. |
| `access_key_id` | The AccessKey ID of your RAM user with access to AnalyticDB for MySQL. |
| `access_key_secret` | The AccessKey secret of your RAM user. |
| `region` | The region ID of the AnalyticDB for MySQL cluster. |

```json
{
  "auth_type": "AK",
  "access_key_id": "<your_access_key_ID>",
  "access_key_secret": "<your_access_key_secret>",
  "region": "<your_region>"
}
```
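If you prefer to manage connections in code rather than in the web UI, the same connection can also be created against Airflow's metadata database. A minimal sketch; the connection ID `adb_spark_default` is a hypothetical name, and the credential values are placeholders.

```python
import json

from airflow import settings
from airflow.models import Connection

# Build the connection with the same extra JSON as above.
conn = Connection(
    conn_id="adb_spark_default",  # hypothetical name; pick any unused ID
    conn_type="generic",
    extra=json.dumps({
        "auth_type": "AK",
        "access_key_id": "<your_access_key_ID>",
        "access_key_secret": "<your_access_key_secret>",
        "region": "<your_region>",
    }),
)

# Persist it in the Airflow metadata database.
session = settings.Session()
session.add(conn)
session.commit()
```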
Create a DAG file named `spark_dags.py`. The following example uses `AnalyticDBSparkSQLOperator` to run a `SHOW DATABASES` query:
DAG parameters:

| Parameter | Required | Description |
|---|---|---|
| `dag_id` | Yes | The DAG name. |
| `default_args` | Yes | Cluster-level defaults: `cluster_id` (cluster ID), `rg_name` (job resource group name), `region` (region ID). For more information, see DAG parameters. |

AnalyticDBSparkSQLOperator parameters:

| Parameter | Required | Description |
|---|---|---|
| `task_id` | Yes | The job ID. |
| `sql` | Yes | The Spark SQL statement. For more information, see Airflow parameters. |

```python
from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

with DAG(
    dag_id="my_dag_name",
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
) as dag:
    spark_sql = AnalyticDBSparkSQLOperator(
        task_id="task2",
        sql="SHOW DATABASES;",
    )

    spark_sql
```
Copy `spark_dags.py` to the `dags_folder` directory defined in your Airflow configuration.

Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial.
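A single DAG can also chain several SQL statements with Airflow's standard `>>` dependency operator. The following sketch builds on the example above; the DAG name, schedule, and SQL statements are illustrative placeholders.

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

with DAG(
    dag_id="adb_spark_sql_chain",  # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule=None,  # set "@daily" or a cron expression to run on a schedule
    catchup=False,
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
) as dag:
    create_db = AnalyticDBSparkSQLOperator(
        task_id="create_db",
        sql="CREATE DATABASE IF NOT EXISTS demo_db;",
    )
    show_dbs = AnalyticDBSparkSQLOperator(
        task_id="show_dbs",
        sql="SHOW DATABASES;",
    )

    # Run create_db first, then show_dbs.
    create_db >> show_dbs
```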
spark-submit
Specify the required parameters (`clusterId`, `regionId`, `keyId`, `secretId`) in the conf/spark-defaults.conf file or as Airflow parameters. For the full list, see Spark application configuration parameters.

Install the Airflow Spark plug-in:
**Important:** Installing `apache-airflow-providers-apache-spark` automatically installs PySpark. To remove PySpark, run `pip3 uninstall pyspark`.

```bash
pip3 install apache-airflow-providers-apache-spark
```

Download the spark-submit package and configure the parameters.
Add the spark-submit binary to Airflow's `PATH` before starting Airflow:

**Important:** Set `PATH` before starting Airflow. If Airflow starts without the spark-submit path, it cannot locate the command at runtime.

```bash
export PATH=$PATH:</your/adb/spark/path/bin>
```
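Because a missing `PATH` entry only surfaces when a task runs, it can be worth checking up front, in the same shell environment that will start Airflow. A minimal sketch using the standard library:

```python
import shutil

# Resolve spark-submit the same way Airflow will at task runtime.
path = shutil.which("spark-submit")
if path:
    print(f"spark-submit found at {path}")
else:
    print("spark-submit is not on PATH; export it before starting Airflow.")
```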
Create a DAG file named `demo.py`:

```python
from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Aliyun ADB Spark',
}

with DAG(
    dag_id='example_spark_operator',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    adb_spark_conf = {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium",
    }

    # Submit a Spark application from an OSS path
    submit_job = SparkSubmitOperator(
        conf=adb_spark_conf,
        application="oss://<bucket_name>/jar/pi.py",
        task_id="submit_job",
        verbose=True,
    )

    # Run a Spark SQL query
    sql_job = SparkSqlOperator(
        conn_id="spark_default",
        sql="SELECT * FROM yourdb.yourtable",
        conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
        task_id="sql_job",
        verbose=True,
    )

    submit_job >> sql_job
```
Copy `demo.py` to the `dags` folder in the Airflow installation directory.

Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial.
Interactive mode
Interactive mode connects Airflow to the AnalyticDB for MySQL cluster through a JDBC endpoint using HiveServer2 Thrift.
Get the Spark interactive resource group endpoint. You must click Apply for Endpoint next to Public Endpoint to request a public endpoint in either of the following cases:
- The client tool used to submit Spark SQL jobs is deployed on a local machine or an external server.
- The client tool used to submit Spark SQL jobs is deployed on an ECS instance, and the ECS instance and the AnalyticDB for MySQL cluster are not in the same VPC.
1. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, and then click your cluster ID.
2. In the navigation pane, choose Cluster Management > Resource Management, and click the Resource Groups tab.
3. Find the target resource group and click Details in the Actions column. Copy the internal or public endpoint. You can also copy the JDBC connection string from the Port field.

Install the required dependencies:
```bash
pip install apache-airflow-providers-apache-hive "apache-airflow-providers-common-sql==1.21.0"
```

For package details, see apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql.
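Before creating the Airflow connection, you can optionally confirm that the endpoint, account, and whitelist all work, using PyHive, which the Hive provider pulls in as a dependency. A minimal sketch; the host, database, resource group, account, and password are placeholders for the values from the previous steps.

```python
from pyhive import hive  # installed as a dependency of the Hive provider

conn = hive.Connection(
    host="amv-t4naxpqk****sparkwho.ads.aliyuncs.com",  # endpoint host only, no jdbc:hive2:// prefix
    port=10000,
    database="adb_demo",
    username="spark_interactive_prod/spark_user",  # resource_group/account format
    password="<your_password>",
    auth="CUSTOM",  # matches the {"auth_mechanism": "CUSTOM"} connection extra
)

cursor = conn.cursor()
cursor.execute("show databases")
print(cursor.fetchall())
conn.close()
```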
In the Airflow web UI, go to Admin > Connections.
Click the + icon to add a connection, and configure the following parameters:

| Parameter | Description |
|---|---|
| Connection Id | The connection name. Example: `adb_spark_cluster`. |
| Connection Type | Select Hive Server 2 Thrift. |
| Host | The endpoint from step 1. Replace `default` with the actual database name and remove the `resource_group=<resource group name>` suffix. Example: `jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo`. |
| Schema | The database name. Example: `adb_demo`. |
| Login | The resource group name and database account in `resource_group_name/database_account_name` format. Example: `spark_interactive_prod/spark_user`. |
| Password | The AnalyticDB for MySQL database account password. |
| Port | `10000` |
| Extra | `{"auth_mechanism": "CUSTOM"}` |
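To make the Host transformation concrete, here is a small sketch in Python. The raw string is a placeholder, and it assumes the `resource_group` suffix in the copied JDBC string is separated by a semicolon; adjust the split if your copied string differs.

```python
# Copied JDBC string (placeholder values):
raw = "jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/default;resource_group=spark_interactive_prod"

# Drop the resource_group suffix, then point at the real database:
host = raw.split(";")[0].replace("/default", "/adb_demo")
print(host)  # jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo
```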
Create a DAG file. The following example runs `show databases` on a daily schedule:

| Parameter | Required | Description |
|---|---|---|
| `task_id` | Yes | The job ID. |
| `conn_id` | Yes | The connection name from step 4. |
| `sql` | Yes | The Spark SQL statement. |

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 2, 10),
    'retries': 1,
}

dag = DAG(
    'adb_spark_sql_test',
    default_args=default_args,
    schedule_interval='@daily',
)

jdbc_query = SQLExecuteQueryOperator(
    task_id='execute_spark_sql_query',
    conn_id='adb_spark_cluster',  # Connection created in step 4
    sql='show databases',
    dag=dag,
)

jdbc_query
```

In the Airflow web UI, click the trigger (play) button next to the DAG to trigger it.
Schedule Spark JAR jobs
Spark Airflow Operator
Install the Airflow Spark plug-in:
```bash
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
```

Create an Airflow connection. In the Airflow web UI, go to Admin > Connections and add a connection with the following JSON:
**Important:** Use a RAM user with the minimum required permissions. Do not use your root account credentials. For details, see Accounts and permissions.
| Parameter | Description |
|---|---|
| `auth_type` | Authentication method. Set to `AK`. |
| `access_key_id` | The AccessKey ID of your RAM user. |
| `access_key_secret` | The AccessKey secret of your RAM user. |
| `region` | The region ID of the AnalyticDB for MySQL cluster. |

```json
{
  "auth_type": "AK",
  "access_key_id": "<your_access_key_ID>",
  "access_key_secret": "<your_access_key_secret>",
  "region": "<your_region>"
}
```

Create a DAG file named `spark_dags.py`. The following example runs two JAR-based tasks in sequence using `AnalyticDBSparkBatchOperator`:

**Important:** Store all Spark application main files in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must be in the same region.
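If your JAR is still on a local machine, one way to stage it in OSS is the `oss2` Python SDK (`pip install oss2`). A minimal sketch; the bucket name, endpoint, and paths are placeholders.

```python
import oss2

# Placeholders: credentials, region endpoint, and bucket name.
auth = oss2.Auth("<your_access_key_ID>", "<your_access_key_secret>")
bucket = oss2.Bucket(auth, "https://oss-<your_region>.aliyuncs.com", "<bucket_name>")

# Upload the local JAR so the cluster can read it from OSS.
bucket.put_object_from_file("jar/spark-examples.jar", "/tmp/spark-examples.jar")
```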
DAG parameters:

| Parameter | Required | Description |
|---|---|---|
| `dag_id` | Yes | The DAG name. |
| `default_args` | Yes | Cluster-level defaults: `cluster_id`, `rg_name`, `region`. For more information, see DAG parameters. |

AnalyticDBSparkBatchOperator parameters:

| Parameter | Required | Description |
|---|---|---|
| `task_id` | Yes | The job ID. |
| `file` | Yes | The absolute path to the Spark application's main file: a JAR package (Java/Scala) or a Python entry-point file. Must be stored in OSS. |
| `class_name` | Required for Java/Scala | The entry point class. Python applications do not require this. For more information, see AnalyticDBSparkBatchOperator parameters. |

```python
from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(
    dag_id="my_dag_name",
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr
```
Copy `spark_dags.py` to the `dags_folder` directory defined in your Airflow configuration.

Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial.
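The operator also accepts tuning options, for example resource specifications through `conf` and program arguments through `args` (see the AnalyticDBSparkBatchOperator parameters linked above). A sketch of one task, assuming those parameter names; it would replace a task definition inside the `with DAG(...)` block above.

```python
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

# Inside the `with DAG(...)` block from the example above:
spark_pi_tuned = AnalyticDBSparkBatchOperator(
    task_id="task1_tuned",
    file="local:///tmp/spark-examples.jar",
    class_name="org.apache.spark.examples.SparkPi",
    conf={
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium",
    },
    args=["1000"],  # program arguments passed to the main class
)
```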
spark-submit
Specify the required parameters (`clusterId`, `regionId`, `keyId`, `secretId`, `ossUploadPath`) in the conf/spark-defaults.conf file or as Airflow parameters. For the full list, see Spark application configuration parameters.

Install the Airflow Spark plug-in:
**Important:** Installing `apache-airflow-providers-apache-spark` automatically installs PySpark. To remove PySpark, run `pip3 uninstall pyspark`.

```bash
pip3 install apache-airflow-providers-apache-spark
```

Download the spark-submit package and configure the parameters.
Add the spark-submit binary to Airflow's `PATH` before starting Airflow:

**Important:** Set `PATH` before starting Airflow. If Airflow starts without the spark-submit path, it cannot locate the command at runtime.

```bash
export PATH=$PATH:</your/adb/spark/path/bin>
```
Create a DAG file named `demo.py`. The following example runs two JAR tasks in sequence:

DAG parameters:

| Parameter | Required | Description |
|---|---|---|
| `dag_id` | Yes | The DAG name. |
| `default_args` | Yes | Cluster-level defaults: `cluster_id`, `rg_name`, `region`. For more information, see DAG parameters. |

AnalyticDBSparkBatchOperator parameters:

| Parameter | Required | Description |
|---|---|---|
| `task_id` | Yes | The job ID. |
| `file` | Yes | The absolute path to the Spark application's main file. Must be stored in OSS. |
| `class_name` | Required for Java/Scala | The entry point class. Python applications do not require this. For more information, see AnalyticDBSparkBatchOperator parameters. |

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr
```
Copy `demo.py` to the `dags` folder in the Airflow installation directory.

Trigger the DAG from the Airflow web UI. For guidance, see the Airflow tutorial.
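For a quick local check before handing the DAG to the scheduler, recent Airflow releases (2.5 and later) let you run a DAG in-process with `DAG.test()`. A sketch, assuming `demo.py` is importable from the current directory:

```python
# Run the DAG defined in demo.py once, locally, without the scheduler.
from demo import dag  # demo.py must be on the Python path

if __name__ == "__main__":
    dag.test()
```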