Airflow是比較流行的開源調度工具,可以實現各類工作負載的DAG編排與調度。您可以通過Spark Airflow Operator、Spark-Submit命令列工具來調度Spark任務。本文介紹如何通過Airflow調度AnalyticDB for MySQL Spark作業。
前提條件
AnalyticDB for MySQL叢集的產品系列為企業版、基礎版或湖倉版。
AnalyticDB for MySQL叢集中已建立Job型資源群組或Spark引擎的Interactive型資源群組。
已安裝Python環境,且Python版本為3.7及以上版本。
已安裝Airflow,且Airflow的版本為2.9.0及以上版本。
已將運行Airflow的伺服器IP地址添加至AnalyticDB for MySQL叢集的白名單中。
調度Spark SQL作業
AnalyticDB for MySQL支援使用批處理和互動式兩種方法執行Spark SQL。選擇的執行方式不同,調度的操作步驟也有所不同。詳細步驟如下:
批處理
Spark Airflow Operator命令列工具
安裝Airflow Spark外掛程式。執行如下命令:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl建立Connection,樣本如下:
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }參數說明:
參數
說明
auth_type
認證方式,固定填寫為AK,表示使用AK認證。
access_key_id
阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey ID。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
access_key_secret
阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey Secret。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
region
AnalyticDB for MySQL叢集的地區ID。
建立DAG聲明Spark工作流程,本文的DAG聲明檔案為
spark_dags.py。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id="my_dag_name", default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"}, ) as dag: spark_sql = AnalyticDBSparkSQLOperator( task_id="task2", sql="SHOW DATABASES;" ) spark_sql參數說明如下:
DAG配置參數
參數
是否必填
說明
dag_id
是
DAG的名稱,您可以自訂。
default_args
是
cluster_id:AnalyticDB for MySQL叢集ID。
rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。
region:AnalyticDB for MySQL叢集的地區ID。
更多選填參數及說明,請參見DAG參數說明。
AnalyticDBSparkSQLOperator配置參數
參數
是否必填
說明
task_id
是
任務ID。
SQL
是
Spark SQL語句。
更多選填參數及說明,請參見Airflow參數說明。
將
spark_dags.py檔案存放至Airflow Configuration聲明dags_folder所在的檔案夾中。執行DAG。具體操作請參見Airflow社區文檔。
Spark-Submit命令列工具
對於AnalyticDB for MySQL特有的配置項,例如clusterId、regionId、keyId和secretId,您可以在AnalyticDB for MySQL Spark工具包的設定檔conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。詳情請參見Spark應用配置參數。
安裝Airflow Spark外掛程式。執行如下命令:
pip3 install apache-airflow-providers-apache-spark重要您需要使用Python3來安裝Airflow Spark外掛程式。
安裝apache-airflow-providers-apache-spark會預設安裝社區版Pyspark,需要執行如下命令將pyspark卸載。
pip3 uninstall pyspark
配置PATH路徑。執行以下命令,將Spark-Submit命令列工具的地址加入Airflow執行地址。
export PATH=$PATH:</your/adb/spark/path/bin>重要在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調度任務可能會找不到Spark-Submit命令。
準備DAG聲明檔案。本文以建立Airflow DAG的demo.py檔案為例。
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun ADB Spark', } with DAG( dag_id='example_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: adb_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf=adb_spark_conf, application="oss://<bucket_name>/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_job將編輯完成的demo.py檔案放至Airflow安裝目錄的dags目錄下。
執行DAG。具體操作請參見Airflow社區文檔。
互動式
擷取Spark Interactive型資源群組的串連地址。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。
在左側導覽列,單擊,單擊資源組管理頁簽。
單擊對應資源群組操作列的詳情,查看內網串連地址和公網串連地址。您可單擊連接埠號碼括弧內的
按鈕,複製串連地址。以下兩種情況,您需要單擊公網地址後的申請網路,手動申請公網串連地址。
提交Spark SQL作業的用戶端工具部署在本地。
提交Spark SQL作業的用戶端工具部署在ECS上,且ECS與AnalyticDB for MySQL不屬於同一VPC。
安裝apache-airflow-providers-apache-hive和apache-airflow-providers-common-sql依賴。
訪問Airflow Web介面,在頂部導覽列單擊。
單擊
按鈕,在Add Connections頁面配置如下參數:參數
說明
Connection Id
串連名稱。本文樣本為
adb_spark_cluster。Connection Type
選擇Hive Server 2 Thrift。
Host
請填寫步驟1中擷取的串連地址。串連地址中的
default需替換為實際的資料庫名,並且需要刪除串連地址中的resource_group=<資源群組名稱>尾碼。例如:
jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo。Schema
串連的資料庫。本文樣本為
adb_demo。Login
AnalyticDB for MySQL的資料庫帳號及Interactive型資源群組名稱。格式為
資源群組名稱/資料庫帳號名稱。例如:本文樣本資源群組名稱為spark_interactive_prod,資料庫帳號名稱為spark_user,此處填寫為
spark_interactive_prod/spark_user。Password
AnalyticDB for MySQL資料庫帳號的密碼。
Port
Spark Interactive型資源群組的連接埠號碼,固定為10000。
Extra
認證方式,固定填寫以下內容,表示使用使用者名稱和密碼認證。
{ "auth_mechanism": "CUSTOM" }編寫DAG檔案。
from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from datetime import datetime default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 2, 10), 'retries': 1, } dag = DAG( 'adb_spark_sql_test', default_args=default_args, schedule_interval='@daily', ) jdbc_query = SQLExecuteQueryOperator( task_id='execute_spark_sql_query', conn_id='adb_spark_cluster', sql='show databases', dag=dag ) jdbc_query參數說明:
參數
是否必填
說明
task_id
是
任務ID。您可以自訂。
conn_id
是
串連名稱。此處填寫步驟4建立的Connection ID。
sql
是
Spark SQL語句。
更多選填參數及說明,請參見Airflow參數說明。
在Airflow Web介面,單擊對應DAG右側
按鈕。
調度Spark Jar作業
Spark Airflow Operator命令列工具
安裝Airflow Spark外掛程式。執行如下命令:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl建立Connection,樣本如下:
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }參數說明:
參數
說明
auth_type
認證方式,固定填寫為AK,表示使用AK認證。
access_key_id
阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey ID。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
access_key_secret
阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey Secret。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
region
AnalyticDB for MySQL叢集的地區ID。
建立DAG聲明Spark工作流程,本文的DAG聲明檔案為
spark_dags.py。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher()DAG配置參數
參數
是否必填
說明
dag_id
是
DAG的名稱,您可以自訂。
default_args
是
cluster_id:AnalyticDB for MySQL叢集ID。
rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。
region:AnalyticDB for MySQL叢集的地區ID。
更多選填參數及說明,請參見DAG參數說明。
AnalyticDBSparkBatchOperator配置參數
參數
是否必填
說明
task_id
是
任務ID。
file
是
Spark應用主檔案的儲存路徑,檔案路徑需為絕對路徑。主檔案是入口類所在的JAR包或者Python的入口執行檔案。
重要Spark應用主檔案目前只支援儲存在OSS中。
OSS Bucket與AnalyticDB for MySQL叢集需要在同一地區。
class_name
條件必填
Java或Scala程式入口類名稱,必填參數。
Python不需要指定入口類,非必填參數。
更多選填參數及說明,請參見AnalyticDBSparkBatchOperator參數說明。
將
spark_dags.py檔案存放至Airflow Configuration聲明dags_folder所在的檔案夾中。執行DAG。具體操作請參見Airflow社區文檔。
Spark-Submit命令列工具
對於AnalyticDB for MySQL特有的配置項,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的設定檔conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。詳情請參見Spark應用配置參數。
安裝Airflow Spark外掛程式。執行如下命令:
pip3 install apache-airflow-providers-apache-spark重要您需要使用Python3來安裝Airflow Spark外掛程式。
安裝apache-airflow-providers-apache-spark會預設安裝社區版Pyspark,需要執行如下命令將pyspark卸載。
pip3 uninstall pyspark
配置PATH路徑。執行以下命令,將Spark-Submit命令列工具的地址加入Airflow執行地址。
export PATH=$PATH:</your/adb/spark/path/bin>重要在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調度任務可能會找不到Spark-Submit命令。
準備DAG聲明檔案。本文以建立Airflow DAG的demo.py檔案為例。
from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule=None, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, max_active_runs=1, catchup=False, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher()參數說明:
DAG配置參數
參數
是否必填
說明
dag_id
是
DAG的名稱,您可以自訂。
default_args
是
cluster_id:AnalyticDB for MySQL叢集ID。
rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。
region:AnalyticDB for MySQL叢集的地區ID。
更多選填參數及說明,請參見DAG參數說明。
AnalyticDBSparkBatchOperator配置參數
參數
是否必填
說明
task_id
是
任務ID。
file
是
Spark應用主檔案的儲存路徑,檔案路徑需為絕對路徑。主檔案是入口類所在的JAR包或者Python的入口執行檔案。
重要Spark應用主檔案目前只支援儲存在OSS中。
OSS Bucket與AnalyticDB for MySQL叢集需要在同一地區。
class_name
條件必填
Java或Scala程式入口類名稱,必填參數。
Python不需要指定入口類,非必填參數。
更多選填參數及說明,請參見AnalyticDBSparkBatchOperator參數說明。
將編輯完成的demo.py檔案放至Airflow安裝目錄的dags目錄下。
執行DAG。具體操作請參見Airflow社區文檔。