Airflow is a popular open source tool used to orchestrate and schedule various workflows as directed acyclic graphs (DAGs). You can schedule Spark jobs by using the Spark Airflow Operator or the spark-submit command-line tool. This topic describes how to use Airflow to schedule AnalyticDB for MySQL Spark jobs.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A job resource group or a Spark interactive resource group is created for the AnalyticDB for MySQL cluster.
Python 3.7 or later is installed.
The IP address of the server that runs Airflow is added to an IP address whitelist of the AnalyticDB for MySQL cluster.
Schedule Spark SQL jobs
AnalyticDB for MySQL allows you to execute Spark SQL in batch or interactive mode. The scheduling procedure varies based on the execution mode.
Batch mode
Spark Airflow Operator
Run the following command to install the Airflow Spark plug-in:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
Create a connection. Example (for a programmatic sketch, see the code after the parameter table):
{ "auth_type": "AK", "access_key_id": "", "access_key_secret": "", "region": "" }
The following table describes the parameters.
Parameter
Description
auth_type
The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.
access_key_id
The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
access_key_secret
The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
region
The region ID of the AnalyticDB for MySQL cluster.
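The steps above do not show a concrete way to register this configuration in Airflow. The following is a minimal, hedged sketch that creates the connection programmatically through the Airflow metadata database; the connection ID adb_spark_default and the generic connection type are assumptions, so use whichever ID and type your operators and Airflow version expect. You can also create the same connection on the Connections page of the Airflow web interface.
# Hedged sketch: register the AnalyticDB for MySQL Spark connection in Airflow.
# The connection ID "adb_spark_default" is an assumption, not from the original topic.
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="adb_spark_default",  # hypothetical ID; match what your DAGs reference
    conn_type="generic",
    extra=json.dumps(
        {
            "auth_type": "AK",
            "access_key_id": "<your AccessKey ID>",
            "access_key_secret": "<your AccessKey secret>",
            "region": "<region ID, for example cn-hangzhou>",
        }
    ),
)

session = settings.Session()
session.add(conn)
session.commit()
session.close()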
Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.
from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

with DAG(
    dag_id="my_dag_name",
    # Replace the empty values with your cluster ID, resource group name, and region ID.
    default_args={"cluster_id": "", "rg_name": "", "region": ""},
) as dag:
    spark_sql = AnalyticDBSparkSQLOperator(
        task_id="task2",
        sql="SHOW DATABASES;",
    )

    spark_sql
The following tables describe the parameters.
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information about optional parameters, see DAG parameters.
AnalyticDBSparkSQLOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The task ID. You can enter a custom ID.
sql
Yes
The Spark SQL statement.
For more information about optional parameters, see Airflow parameters.
Store the spark_dags.py file in the directory specified by the dags_folder configuration item of Airflow.
Execute the DAG. For more information, see the Airflow documentation.
Spark-submit
You can configure the parameters that are specific to AnalyticDB for MySQL in the spark-defaults.conf file that is stored in the conf folder of AnalyticDB for MySQL Spark, or by using Airflow parameters. The AnalyticDB for MySQL-specific parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see the "Spark application configuration parameters" section of the Use spark-submit to develop Spark applications topic.
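For reference, the following is a hedged sketch of what these entries might look like in spark-defaults.conf. All values are placeholders, and the exact keys and file format should be verified against the spark-submit topic referenced above.
clusterId      amv-bp1234567890****
regionId       cn-hangzhou
keyId          <your AccessKey ID>
secretId       <your AccessKey secret>
ossUploadPath  oss://<your-bucket>/spark-upload/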
Run the following command to install the Airflow Spark plug-in:
pip3 install apache-airflow-providers-apache-spark
Important: You must install Python 3 before you install the Airflow Spark plug-in.
When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:
pip3 uninstall pyspark
Download AnalyticDB for MySQL spark-submit and configure the parameters.
Run the following command to add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable that is used by Airflow:
export PATH=$PATH:
Important: Before you start Airflow, you must add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable. Otherwise, the system may fail to find spark-submit when it schedules jobs.
Create a declaration file for an Airflow DAG to declare a Spark workflow. In this example, a file named demo.py is created.
from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Aliyun ADB Spark',
}

with DAG(
    dag_id='example_spark_operator',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    adb_spark_conf = {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.resourceSpec": "medium"
    }

    # [START howto_operator_spark_submit]
    submit_job = SparkSubmitOperator(
        conf=adb_spark_conf,
        application="oss:///jar/pi.py",
        task_id="submit_job",
        verbose=True
    )
    # [END howto_operator_spark_submit]

    # [START howto_operator_spark_sql]
    sql_job = SparkSqlOperator(
        conn_id="spark_default",
        sql="SELECT * FROM yourdb.yourtable",
        conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
        task_id="sql_job",
        verbose=True
    )
    # [END howto_operator_spark_sql]

    submit_job >> sql_job
Store the demo.py file in the dags folder of the Airflow installation directory.
Execute the DAG. For more information, see Airflow documentation.
Interactive mode
Obtain the connection URL of the Spark interactive resource group.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Enterprise Edition, Basic Edition, or Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, open the resource management page, and then click the Resource Groups tab.
Find the Spark interactive resource group that you created and click Details in the Actions column to view the internal or public connection URL of the resource group. You can click the copy icon within the parentheses next to the corresponding port number to copy the connection URL.
You must click Apply for Endpoint next to Public Endpoint to manually apply for a public endpoint in the following scenarios:
The client tool that is used to submit a Spark SQL job is deployed on an on-premises or external server.
The client tool that is used to submit a Spark SQL job is deployed on an Elastic Compute Service (ECS) instance that resides in a different virtual private cloud (VPC) from your AnalyticDB for MySQL cluster.
Install the apache-airflow-providers-apache-hive and apache-airflow-providers-common-sql dependencies. For example, run the following command:
pip3 install apache-airflow-providers-apache-hive apache-airflow-providers-common-sql
Access the Airflow web interface. In the top navigation bar, choose Admin > Connections. Click the + button. On the Add Connections page, configure the parameters that are described in the following table.
Parameter
Description
Connection Id
The name of the connection. In this example, adb_spark_cluster is used.
Connection Type
The type of the connection. Select Hive Server 2 Thrift.
Host
The endpoint that you obtained in Step 1. Replace default in the endpoint with the actual name of the database and delete the resource_group=<resource group name> suffix from the endpoint. Example: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo.
Schema
The name of the database. In this example, adb_demo is used.
Login
The names of the resource group and the database account of the AnalyticDB for MySQL cluster, in the resource_group_name/database_account_name format. Example: spark_interactive_prod/spark_user.
Password
The password of the database account of the AnalyticDB for MySQL cluster.
Port
The port number for Spark interactive resource groups. Set the value to 10000.
Extra
The authentication method. Enter the following content, which specifies that username and password authentication is used:
{ "auth_mechanism": "CUSTOM" }
Write a DAG file.
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 2, 10),
    'retries': 1,
}

dag = DAG(
    'adb_spark_sql_test',
    default_args=default_args,
    schedule_interval='@daily',
)

jdbc_query = SQLExecuteQueryOperator(
    task_id='execute_spark_sql_query',
    conn_id='adb_spark_cluster',
    sql='show databases',
    dag=dag
)

jdbc_query
The following table describes the parameters.
Parameter
Required
Description
task_id
Yes
The task ID. You can enter a custom ID.
conn_id
Yes
The connection ID. Enter the Connection Id of the connection that you created in Step 4.
sql
Yes
The Spark SQL statement.
For more information about optional parameters, see Airflow parameters.
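Optionally, before you trigger the DAG from the web interface, you can run it once locally to verify the connection and the SQL statement. The following is a hedged snippet that you can append to the end of the DAG file; dag.test() requires Airflow 2.5 or later.
# Hedged snippet (Airflow 2.5 or later): run the DAG once locally for a quick check.
if __name__ == "__main__":
    dag.test()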
On the Airflow web interface, click the trigger (run) button next to the DAG to execute it.
Schedule Spark JAR jobs
Spark Airflow Operator
Run the following command to install the Airflow Spark plug-in:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
Create a connection. Example:
{ "auth_type": "AK", "access_key_id": "", "access_key_secret": "", "region": "" }
The following table describes the parameters.
Parameter
Description
auth_type
The authentication method. Set the value to AK, which specifies that AccessKey pairs are used for authentication.
access_key_id
The AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
access_key_secret
The AccessKey secret of your Alibaba Cloud account or a RAM user that has access permissions on AnalyticDB for MySQL.
For information about how to obtain an AccessKey ID and an AccessKey secret, see Accounts and permissions.
region
The region ID of the AnalyticDB for MySQL cluster.
Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.
from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

DAG_ID = "example_adb_spark_batch"  # custom DAG name

with DAG(
    dag_id=DAG_ID,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information about optional parameters, see DAG parameters.
AnalyticDBSparkBatchOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The task ID. You can enter a custom ID.
file
Yes
The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.
Important: The main files of Spark applications must be stored in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.
class_name
Yes if specific conditions are met
The entry point of the Java or Scala application.
Python applications do not require entry points.
For more information about optional parameters, see AnalyticDBSparkBatchOperator parameters.
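For a Python application, class_name is not required. The following is a minimal, hedged sketch of a task that you could add inside the with DAG(...) block shown above; the OSS path and the task ID are placeholders, not from the original example.
# Hedged sketch: submit a Python application with AnalyticDBSparkBatchOperator.
# Place this task inside the "with DAG(...)" block. The OSS path is a placeholder.
spark_py = AnalyticDBSparkBatchOperator(
    task_id="python_task",
    file="oss://<your-bucket>/scripts/pi.py",  # main file stored in OSS
    # class_name is omitted because Python applications do not require an entry class.
)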
Store the spark_dags.py file in the directory specified by the dags_folder configuration item of Airflow.
Execute the DAG. For more information, see the Airflow documentation.
Spark-submit
You can configure the parameters that are specific to AnalyticDB for MySQL in the spark-defaults.conf file that is stored in the conf folder of AnalyticDB for MySQL Spark, or by using Airflow parameters. The AnalyticDB for MySQL-specific parameters include clusterId, regionId, keyId, secretId, and ossUploadPath. For more information, see the "Spark application configuration parameters" section of the Use spark-submit to develop Spark applications topic.
Run the following command to install the Airflow Spark plug-in:
pip3 install apache-airflow-providers-apache-spark
Important: You must install Python 3 before you install the Airflow Spark plug-in.
When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:
pip3 uninstall pyspark
Download AnalyticDB for MySQL spark-submit and configure the parameters.
Run the following command to add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable that is used by Airflow:
export PATH=$PATH:
Important: Before you start Airflow, you must add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable. Otherwise, the system may fail to find spark-submit when it schedules jobs.
Create a declaration file for an Airflow DAG. In this example, a file named demo.py is created.
from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

DAG_ID = "example_adb_spark_batch_submit"  # custom DAG name

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    spark_pi >> spark_lr
Parameters:
DAG configuration parameters
Parameter
Required
Description
dag_id
Yes
The name of the DAG. You can enter a custom name.
default_args
Yes
cluster_id: the ID of the AnalyticDB for MySQL cluster.
rg_name: the name of the job resource group in the AnalyticDB for MySQL cluster.
region: the region ID of the AnalyticDB for MySQL cluster.
For more information about optional parameters, see DAG parameters.
AnalyticDBSparkBatchOperator configuration parameters
Parameter
Required
Description
task_id
Yes
The task ID. You can enter a custom ID.
file
Yes
The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.
Important: The main files of Spark applications must be stored in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.
class_name
Yes if specific conditions are met
The entry point of the Java or Scala application.
Python applications do not require entry points.
For more information about optional parameters, see AnalyticDBSparkBatchOperator parameters.
Store the demo.py file in the dags folder of the Airflow installation directory.
Execute the DAG. For more information, see Airflow documentation.