Airflow is a popular open source scheduling tool that orchestrates and schedules various workloads as directed acyclic graphs (DAGs). You can schedule Spark jobs using the Spark Airflow Operator or the spark-submit command-line tool. This topic describes how to use Airflow to schedule AnalyticDB for MySQL Spark jobs.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A job resource group or a Spark interactive resource group is created for the AnalyticDB for MySQL cluster.
Python 3.7 or later is installed.
The IP address of the server that runs Airflow is added to a whitelist of the AnalyticDB for MySQL cluster.
Schedule Spark SQL jobs
AnalyticDB for MySQL allows you to execute Spark SQL statements in batch or interactive mode. The scheduling procedure varies based on the execution mode.
Batch mode
Spark Airflow Operator
Run the following command to install the Airflow Spark plug-in:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
Create a connection. A Python sketch for registering the connection appears after the parameter table. Example:
{
  "auth_type": "AK",
  "access_key_id": "<your_access_key_ID>",
  "access_key_secret": "<your_access_key_secret>",
  "region": "<your_region>"
}
The following table describes the parameters.
Parameter
Description
auth_type
The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.
access_key_id
The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
access_key_secret
The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
region
The region ID of the AnalyticDB for MySQL cluster.
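The following is a minimal Python sketch for registering a connection that carries the JSON above in its Extra field. The connection ID adb_spark_default and the generic connection type are assumptions, so use the ID and type that your deployment and operators expect. You can also create the connection on the Connections page of the Airflow web interface.

# A hedged sketch for registering the connection programmatically.
# Assumptions: the connection ID "adb_spark_default" and the "generic"
# connection type; adjust them to match your setup.
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="adb_spark_default",   # assumed ID; use the one that your operators reference
    conn_type="generic",           # assumed type
    extra=json.dumps({
        "auth_type": "AK",
        "access_key_id": "<your_access_key_ID>",
        "access_key_secret": "<your_access_key_secret>",
        "region": "<your_region>",
    }),
)

session = settings.Session()
session.add(conn)
session.commit()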
Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

with DAG(
    dag_id="my_dag_name",
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
) as dag:
    spark_sql = AnalyticDBSparkSQLOperator(
        task_id="task2",
        sql="SHOW DATABASES;",
    )

    spark_sql

The following tables describe the parameters.
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information, see DAG parameters.
AnalyticDBSparkSQLOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The job ID.
sql
Yes
The Spark SQL statement.
For more information, see Airflow parameters.
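If a workflow contains multiple SQL steps, you can instantiate AnalyticDBSparkSQLOperator several times and chain the tasks with >>. The following lines are a sketch that goes inside the with DAG(...) block shown above; the SQL statements are placeholders.

    create_db = AnalyticDBSparkSQLOperator(
        task_id="create_db",
        sql="CREATE DATABASE IF NOT EXISTS demo_db;",
    )
    show_db = AnalyticDBSparkSQLOperator(
        task_id="show_db",
        sql="SHOW DATABASES;",
    )

    # Run the statements in order.
    create_db >> show_db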
Store the spark_dags.py file in the directory that is specified by the dags_folder option in the Airflow configuration file.
Execute the DAG. For more information, see the Airflow documentation.
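If you want to debug the DAG locally before you schedule it, you can append a test entry point to spark_dags.py. This is a sketch that assumes Airflow 2.5 or later, which provides the dag.test() method.

# Optional local debug run. Execute the file directly with Python to run
# the DAG once without the scheduler.
if __name__ == "__main__":
    dag.test()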
Spark-submit
You can configure specific parameters of AnalyticDB for MySQL in the AnalyticDB for MySQL Spark conf/spark-defaults.conf configuration file or using Airflow parameters. The specific parameters include clusterId, regionId, keyId, and secretId. For more information, see Spark application configuration parameters.
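For reference, the following is a hedged sketch of what the conf/spark-defaults.conf file might contain. The values are placeholders, and the exact key names and file layout are described in Spark application configuration parameters.

# conf/spark-defaults.conf (placeholder values)
clusterId    amv-bp1234567890****
regionId     cn-hangzhou
keyId        <your_access_key_ID>
secretId     <your_access_key_secret>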
Run the following command to install the Airflow Spark plug-in:
pip3 install apache-airflow-providers-apache-spark
Important
You must install Python 3 before you can install the Airflow Spark plug-in.
When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:
pip3 uninstall pyspark
Download the spark-submit package and configure the parameters.
Run the following command to add the directory that contains spark-submit to the PATH environment variable of Airflow:
export PATH=$PATH:</your/adb/spark/path/bin>
Important
You must add the directory that contains spark-submit to the PATH environment variable before you start Airflow. Otherwise, the system may fail to find the spark-submit command when scheduling jobs.
Create a declaration file for an Airflow DAG. In this example, a file named demo.py is created.
from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Aliyun ADB Spark',
}

with DAG(
    dag_id='example_spark_operator',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    adb_spark_conf = {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium"
    }

    # [START howto_operator_spark_submit]
    submit_job = SparkSubmitOperator(
        conf=adb_spark_conf,
        application="oss://<bucket_name>/jar/pi.py",
        task_id="submit_job",
        verbose=True
    )
    # [END howto_operator_spark_submit]

    # [START howto_operator_spark_sql]
    sql_job = SparkSqlOperator(
        conn_id="spark_default",
        sql="SELECT * FROM yourdb.yourtable",
        conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
        task_id="sql_job",
        verbose=True
    )
    # [END howto_operator_spark_sql]

    submit_job >> sql_job

Store the demo.py file in the dags folder of the Airflow installation directory.
Execute the DAG. For more information, see Airflow documentation.
Interactive mode
Obtain the endpoint of the Spark Interactive resource group.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
In the navigation pane on the left, choose , and click the Resource Groups tab.
Find the target resource group and click Details in the Actions column to view the internal endpoint and public endpoint. You can click the copy icon next to an endpoint to copy it, or click the copy icon in the parentheses next to Port to copy the JDBC connection string.
In the following cases, you must click Apply for Endpoint next to Public Endpoint to manually request a public endpoint:
The client tool used to submit Spark SQL jobs is deployed on a local machine or an external server.
The client tool used to submit Spark SQL jobs is deployed on an ECS instance, and the ECS instance and the AnalyticDB for MySQL cluster are not in the same VPC.

Install the apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql dependencies.
Access the Airflow web interface. In the top navigation bar, choose Admin > Connections.
Click the + icon. On the Add Connections page, configure the parameters that are described in the following table.
Parameter
Description
Connection Id
The name of the connection. In this example, adb_spark_cluster is used.
Connection Type
Select Hive Server 2 Thrift.
Host
The endpoint obtained in Step 1. Replace default in the endpoint with the actual name of the database and delete the resource_group=<resource group name> suffix from the endpoint.
Example: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo.
The name of the database. In this example, adb_demo is used.
Login
The names of the interactive resource group and the database account of the AnalyticDB for MySQL cluster. Format: resource_group_name/database_account_name.
Example: spark_interactive_prod/spark_user.
Password
The password of the AnalyticDB for MySQL database account.
Port
The port number for Spark interactive resource groups. Set the value to 10000.
Extra
The authentication method. Enter the following content, which specifies that username and password authentication is used:
{ "auth_mechanism": "CUSTOM" }Write a DAG file.
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 2, 10),
    'retries': 1,
}

dag = DAG(
    'adb_spark_sql_test',
    default_args=default_args,
    schedule_interval='@daily',
)

jdbc_query = SQLExecuteQueryOperator(
    task_id='execute_spark_sql_query',
    conn_id='adb_spark_cluster',
    sql='show databases',
    dag=dag
)

jdbc_query

The following table describes the parameters.
Parameter
Required
Description
task_id
Yes
The job ID. You can enter a custom ID.
conn_id
Yes
The name of the connection. Enter the ID of the connection that you created in Step 4.
sql
Yes
The Spark SQL statement.
For more information, see Airflow parameters.
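Optionally, before you trigger the DAG, you can verify the endpoint, account, and authentication settings outside of Airflow by using PyHive, which the Hive provider package depends on. The following is a sketch; the host, account, password, and database values are placeholders taken from the examples above.

# A quick connectivity check against the Spark interactive resource group.
from pyhive import hive

conn = hive.connect(
    host="amv-t4naxpqk****sparkwho.ads.aliyuncs.com",  # endpoint host from Step 1
    port=10000,
    username="spark_interactive_prod/spark_user",      # resource_group/database_account
    password="<your_password>",
    database="adb_demo",
    auth="CUSTOM",                                      # username and password authentication
)
cursor = conn.cursor()
cursor.execute("SHOW DATABASES")
print(cursor.fetchall())
cursor.close()
conn.close()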
On the Airflow web interface, click the trigger icon next to the DAG to run it.
Schedule Spark JAR jobs
Spark Airflow Operator
Run the following command to install the Airflow Spark plug-in:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
Create a connection. Example:
{
  "auth_type": "AK",
  "access_key_id": "<your_access_key_ID>",
  "access_key_secret": "<your_access_key_secret>",
  "region": "<your_region>"
}
The following table describes the parameters.
Parameter
Description
auth_type
The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.
access_key_id
The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
access_key_secret
The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
region
The region ID of the AnalyticDB for MySQL cluster.
Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(
    dag_id="my_dag_name",
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr

The following tables describe the parameters.
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information, see DAG parameters.
AnalyticDBSparkBatchOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The job ID.
file
Yes
The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.
Important
The main files of Spark applications must be stored in OSS.
The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.
class_name
Yes for Java and Scala applications; no for Python applications.
The entry point of the Java or Scala application.
Python applications do not require entry points.
For more information, see AnalyticDBSparkBatchOperator parameters.
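For a Python application, no class_name is required and the OSS path of the script is passed as the file parameter. The following sketch assumes that the operator, like the upstream Alibaba Cloud provider operator that it mirrors, also accepts args and conf parameters; treat those two parameters as assumptions.

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(
    dag_id="spark_python_example",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
) as dag:
    # No class_name is required for a Python application.
    spark_py = AnalyticDBSparkBatchOperator(
        task_id="python_task",
        file="oss://<bucket_name>/jar/pi.py",  # main file stored in OSS
        args=["100"],                          # assumed: application arguments
        conf={
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
        },                                     # assumed: Spark configuration passthrough
    )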
Store the spark_dags.py file in the directory that is specified by the dags_folder option in the Airflow configuration file.
Execute the DAG. For more information, see the Airflow documentation.
Spark-submit
You can configure specific parameters of AnalyticDB for MySQL in the AnalyticDB for MySQL Spark conf/spark-defaults.conf configuration file or using Airflow parameters. The specific parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see Spark application configuration parameters.
Run the following command to install the Airflow Spark plug-in:
pip3 install apache-airflow-providers-apache-spark
Important
You must install Python 3 before you can install the Airflow Spark plug-in.
When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:
pip3 uninstall pyspark
Download the spark-submit package and configure the parameters.
Run the following command to add the directory that contains spark-submit to the PATH environment variable of Airflow:
export PATH=$PATH:</your/adb/spark/path/bin>
Important
You must add the directory that contains spark-submit to the PATH environment variable before you start Airflow. Otherwise, the system may fail to find the spark-submit command when scheduling jobs.
Create a declaration file for an Airflow DAG. In this example, a file named demo.py is created.
from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={
        "cluster_id": "<your_cluster_ID>",
        "rg_name": "<your_resource_group>",
        "region": "<your_region>",
    },
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr

The following tables describe the parameters.
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information, see DAG parameters.
AnalyticDBSparkBatchOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The job ID.
file
Yes
The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.
Important
The main files of Spark applications must be stored in OSS.
The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.
class_name
Yes for Java and Scala applications; no for Python applications.
The entry point of the Java or Scala application.
Python applications do not require entry points.
For more information, see AnalyticDBSparkBatchOperator parameters.
Store the demo.py file in the dags folder of the Airflow installation directory.
Execute the DAG. For more information, see Airflow documentation.