全部產品
Search
文件中心

AnalyticDB:Airflow調度Spark

更新時間:Jul 25, 2025

Airflow是比較流行的開源調度工具,可以實現各類工作負載的DAG編排與調度。您可以通過Spark Airflow Operator、Spark-Submit命令列工具來調度Spark任務。本文介紹如何通過Airflow調度AnalyticDB for MySQL Spark作業。

前提條件

調度Spark SQL作業

AnalyticDB for MySQL支援使用批處理互動式兩種方法執行Spark SQL。選擇的執行方式不同,調度的操作步驟也有所不同。詳細步驟如下:

批處理

Spark Airflow Operator命令列工具

  1. 安裝Airflow Spark外掛程式。執行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 建立Connection,樣本如下:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    參數說明:

    參數

    說明

    auth_type

    認證方式,固定填寫為AK,表示使用AK認證。

    access_key_id

    阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey ID。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    access_key_secret

    阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey Secret。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    region

    AnalyticDB for MySQL叢集的地區ID。

  3. 建立DAG聲明Spark工作流程,本文的DAG聲明檔案為spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
    

    參數說明如下:

    DAG配置參數

    參數

    是否必填

    說明

    dag_id

    DAG的名稱,您可以自訂。

    default_args

    • cluster_id:AnalyticDB for MySQL叢集ID。

    • rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。

    • region:AnalyticDB for MySQL叢集的地區ID。

    更多選填參數及說明,請參見DAG參數說明

    AnalyticDBSparkSQLOperator配置參數

    參數

    是否必填

    說明

    task_id

    任務ID。

    SQL

    Spark SQL語句。

    更多選填參數及說明,請參見Airflow參數說明

  4. spark_dags.py檔案存放至Airflow Configuration聲明dags_folder所在的檔案夾中。

  5. 執行DAG。具體操作請參見Airflow社區文檔

Spark-Submit命令列工具

說明

對於AnalyticDB for MySQL特有的配置項,例如clusterId、regionId、keyId和secretId,您可以在AnalyticDB for MySQL Spark工具包的設定檔conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。詳情請參見Spark應用配置參數

  1. 安裝Airflow Spark外掛程式。執行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3來安裝Airflow Spark外掛程式。

    • 安裝apache-airflow-providers-apache-spark會預設安裝社區版Pyspark,需要執行如下命令將pyspark卸載。

      pip3 uninstall pyspark
  2. 下載Spark-Submit命令列工具包並進行配置

  3. 配置PATH路徑。執行以下命令,將Spark-Submit命令列工具的地址加入Airflow執行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調度任務可能會找不到Spark-Submit命令。

  4. 準備DAG聲明檔案。本文以建立Airflow DAG的demo.py檔案為例。

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  5. 將編輯完成的demo.py檔案放至Airflow安裝目錄的dags目錄下。

  6. 執行DAG。具體操作請參見Airflow社區文檔

互動式

  1. 擷取Spark Interactive型資源群組的串連地址。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊集群管理 > 資源管理,單擊資源組管理頁簽。

    3. 單擊對應資源群組操作列的詳情,查看內網串連地址和公網串連地址。您可單擊連接埠號碼括弧內的image按鈕,複製串連地址。

      以下兩種情況,您需要單擊公網地址後的申請網路,手動申請公網串連地址。

      • 提交Spark SQL作業的用戶端工具部署在本地。

      • 提交Spark SQL作業的用戶端工具部署在ECS上,且ECSAnalyticDB for MySQL不屬於同一VPC。

  2. 安裝apache-airflow-providers-apache-hiveapache-airflow-providers-common-sql依賴。

  3. 訪問Airflow Web介面,在頂部導覽列單擊Admin > Connections

  4. 單擊image按鈕,在Add Connections頁面配置如下參數:

    參數

    說明

    Connection Id

    串連名稱。本文樣本為adb_spark_cluster

    Connection Type

    選擇Hive Server 2 Thrift

    Host

    請填寫步驟1中擷取的串連地址。串連地址中的default需替換為實際的資料庫名,並且需要刪除串連地址中的resource_group=<資源群組名稱>尾碼。

    例如:jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo

    Schema

    串連的資料庫。本文樣本為adb_demo

    Login

    AnalyticDB for MySQL的資料庫帳號及Interactive型資源群組名稱。格式為資源群組名稱/資料庫帳號名稱

    例如:本文樣本資源群組名稱為spark_interactive_prod,資料庫帳號名稱為spark_user,此處填寫為spark_interactive_prod/spark_user

    Password

    AnalyticDB for MySQL資料庫帳號的密碼。

    Port

    Spark Interactive型資源群組的連接埠號碼,固定為10000

    Extra

    認證方式,固定填寫以下內容,表示使用使用者名稱和密碼認證。

    {
      "auth_mechanism": "CUSTOM"
    }
  5. 編寫DAG檔案。

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query', 
        conn_id='adb_spark_cluster',  
        sql='show databases',  
        dag=dag
    )
    
    jdbc_query

    參數說明:

    參數

    是否必填

    說明

    task_id

    任務ID。您可以自訂。

    conn_id

    串連名稱。此處填寫步驟4建立的Connection ID。

    sql

    Spark SQL語句。

    更多選填參數及說明,請參見Airflow參數說明

  6. 在Airflow Web介面,單擊對應DAG右側image按鈕。

調度Spark Jar作業

Spark Airflow Operator命令列工具

  1. 安裝Airflow Spark外掛程式。執行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 建立Connection,樣本如下:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    參數說明:

    參數

    說明

    auth_type

    認證方式,固定填寫為AK,表示使用AK認證。

    access_key_id

    阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey ID。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    access_key_secret

    阿里雲帳號或具備AnalyticDB for MySQL存取權限的RAM使用者的AccessKey Secret。

    如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權

    region

    AnalyticDB for MySQL叢集的地區ID。

  3. 建立DAG聲明Spark工作流程,本文的DAG聲明檔案為 spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # This test needs watcher in order to properly mark success/failure
        # when "tearDown" task with trigger rule is part of the DAG
        list(dag.tasks) >> watcher()
        

    DAG配置參數

    參數

    是否必填

    說明

    dag_id

    DAG的名稱,您可以自訂。

    default_args

    • cluster_id:AnalyticDB for MySQL叢集ID。

    • rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。

    • region:AnalyticDB for MySQL叢集的地區ID。

    更多選填參數及說明,請參見DAG參數說明

    AnalyticDBSparkBatchOperator配置參數

    參數

    是否必填

    說明

    task_id

    任務ID。

    file

    Spark應用主檔案的儲存路徑,檔案路徑需為絕對路徑。主檔案是入口類所在的JAR包或者Python的入口執行檔案。

    重要

    Spark應用主檔案目前只支援儲存在OSS中。

    OSS Bucket與AnalyticDB for MySQL叢集需要在同一地區。

    class_name

    條件必填

    • Java或Scala程式入口類名稱,必填參數。

    • Python不需要指定入口類,非必填參數。

    更多選填參數及說明,請參見AnalyticDBSparkBatchOperator參數說明

  4. spark_dags.py檔案存放至Airflow Configuration聲明dags_folder所在的檔案夾中。

  5. 執行DAG。具體操作請參見Airflow社區文檔

Spark-Submit命令列工具

說明

對於AnalyticDB for MySQL特有的配置項,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的設定檔conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。詳情請參見Spark應用配置參數

  1. 安裝Airflow Spark外掛程式。執行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3來安裝Airflow Spark外掛程式。

    • 安裝apache-airflow-providers-apache-spark會預設安裝社區版Pyspark,需要執行如下命令將pyspark卸載。

      pip3 uninstall pyspark
  2. 下載Spark-Submit命令列工具包並進行配置

  3. 配置PATH路徑。執行以下命令,將Spark-Submit命令列工具的地址加入Airflow執行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調度任務可能會找不到Spark-Submit命令。

  4. 準備DAG聲明檔案。本文以建立Airflow DAG的demo.py檔案為例。

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # This test needs watcher in order to properly mark success/failure
        # when "tearDown" task with trigger rule is part of the DAG
        list(dag.tasks) >> watcher()
        

    參數說明:

    DAG配置參數

    參數

    是否必填

    說明

    dag_id

    DAG的名稱,您可以自訂。

    default_args

    • cluster_id:AnalyticDB for MySQL叢集ID。

    • rg_name:AnalyticDB for MySQL叢集Job型資源群組名稱。

    • region:AnalyticDB for MySQL叢集的地區ID。

    更多選填參數及說明,請參見DAG參數說明

    AnalyticDBSparkBatchOperator配置參數

    參數

    是否必填

    說明

    task_id

    任務ID。

    file

    Spark應用主檔案的儲存路徑,檔案路徑需為絕對路徑。主檔案是入口類所在的JAR包或者Python的入口執行檔案。

    重要

    Spark應用主檔案目前只支援儲存在OSS中。

    OSS Bucket與AnalyticDB for MySQL叢集需要在同一地區。

    class_name

    條件必填

    • Java或Scala程式入口類名稱,必填參數。

    • Python不需要指定入口類,非必填參數。

    更多選填參數及說明,請參見AnalyticDBSparkBatchOperator參數說明

  5. 將編輯完成的demo.py檔案放至Airflow安裝目錄的dags目錄下。

  6. 執行DAG。具體操作請參見Airflow社區文檔