全部產品
Search
文件中心

E-MapReduce:通過Apache Airflow提交任務

更新時間:Jul 18, 2025

Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發人員編排、計劃和監控資料管道的執行。EMR Serverless Spark為處理大規模資料處理任務提供了一個無伺服器計算環境。本文為您介紹如何通過Apache Airflow實現自動化地向EMR Serverless Spark提交任務,以實現作業調度和執行的自動化,協助您更有效地管理資料處理任務。

背景資訊

Apache Livy通過REST介面與Spark進行互動,極大簡化了Spark和應用程式伺服器之間的通訊複雜度。關於Livy API,請參見REST API

在使用Apache Airflow提交任務時,您可以通過使用Livy Operator或EmrServerlessSparkStartJobRunOperator兩種方式與Serverless Spark進行互動。請根據實際情況選擇最適合的方案。

方式

適用情境

方式一:使用Livy Operator提交任務

如果您希望使用Airflow的開源Operator提交任務到Serverless Spark,可以選擇這種方式。

方式二:使用EmrServerlessSparkStartJobRunOperator提交任務

EmrServerlessSparkStartJobRunOperator是由阿里雲EMR Serverless Spark提供的一個組件,專門用於通過Airflow提交EMR Serverless Spark任務,與Airflow的DAG機制深度整合,簡化任務編排和調度。

如果您是建立專案或完全基於EMR Serverless Spark,建議選擇方式二,以實現更高的效能和更低的營運複雜度。

前提條件

注意事項

當前EmrServerlessSparkStartJobRunOperator未輸出實際作業的日誌。如果您需要查看詳細的作業日誌,請登入EMR Serverless Spark控制台,通過任務運行ID找到對應的任務執行個體,然後,您可以在日誌探查頁簽或者Spark UI中進一步檢查和分析任務日誌。

方式一:使用Livy Operator提交任務

步驟一:建立Livy Gateway及訪問Token

  1. 建立並啟動Gateway。

    1. 進入Gateway頁面。

      1. 登入E-MapReduce控制台

      2. 在左側導覽列,選擇EMR Serverless > Spark

      3. Spark頁面,單擊目標工作空間名稱。

      4. EMR Serverless Spark頁面,單擊左側導覽列中的營運中心 > Gateway

    2. 單擊Livy Gateway頁簽。

    3. Livy Gateway頁面,單擊建立Livy Gateway

    4. 在建立Gateway頁面,輸入名稱(例如,Livy-gateway),單擊建立

      其餘參數請根據具體情況進行調整,更多參數資訊請參見Gateway管理

    5. Livy Gateway頁面,單擊已建立Gateway操作列的啟動

  2. 建立Token。

    1. Gateway頁面,單擊Livy-gateway操作列的Token管理

    2. 單擊建立Token

    3. 建立Token對話方塊中,輸入名稱(例如,Livy-token),單擊確定

    4. 複製Token資訊。

      重要

      Token建立完成後,請務必立即複製新Token的資訊,後續不支援查看。如果您的Token到期或遺失,請選擇建立Token或重設Token。

步驟二:配置Apache Airflow

  1. 執行以下命令,在Apache Airflow環境中安裝Apache Livy。

    pip install apache-airflow-providers-apache-livy
  2. 添加Connection。

    UI方式

    在Airflow中找到預設為livy_default的Connection,並對其資訊進行修改;或者您也可以在Airflow Web頁面手動添加Connection,詳情請參見建立Connection

    涉及以下資訊:

    • Host:填寫為Gateway中的Endpoint資訊。

    • Schema:填寫為https

    • Extra:填寫JSON字串,x-acs-spark-livy-token為您前一個步驟中複製的Token資訊。

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    CLI方式

    通過Airflow CLI執行相應命令來建立Connection,詳情請參見建立Connection

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint資訊。
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"  # 為您前一個步驟中複製的Token資訊。
            }
        }'

步驟三: 編寫DAG並提交任務

Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用Livy Operator執行Spark任務的樣本。

從阿里雲OSS擷取並執行Python指令檔。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # DAG的唯一識別碼。
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# define livy task with LivyOperator
# 請根據實際情況替換file內容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task", 
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

涉及參數如下表所示。

參數

描述

dag_id

DAG的唯一識別碼,用於區分不同的DAG。

schedule_interval

定義DAG的調度間隔。None表示需要手動觸發任務。

start_date

定義DAG的起始日期,表示從哪一天開始調度。

tags

為DAG添加標籤,便於分類和搜尋。

catchup

控制是否補跑歷史任務。如果為False,即使起始日期較早,也不會補跑未執行的任務。

file

為您的Spark任務對應的檔案路徑,本文樣本為上傳至阿里雲OSS上的JAR包的路徑,請您根據實際情況替換。上傳操作可參見簡單上傳

class_name

指定JAR包中的主類名。

args

傳遞給Spark任務的命令列參數。

driver_memorydriver_cores

分別指定Driver的記憶體大小和核心數。

executor_memoryexecutor_cores

分別指定每個Executor的記憶體大小和核心數。

num_executors

指定Executor的數量。

name

Spark任務的名稱。

task_id

Airflow任務的唯一識別碼。

dag

將任務與DAG關聯起來。

方式二:使用EmrServerlessSparkStartJobRunOperator提交任務

步驟一:配置Apache Airflow

  1. 下載airflow_alibaba_provider-0.0.3-py3-none-any.whl

  2. 在Airflow的每個節點上安裝airflow-alibaba-provider外掛程式。

    airflow-alibaba-provider外掛程式是由EMR Serverless Spark團隊提供,包含了一個專門用於提交EMR Serverless Spark任務的EmrServerlessSparkStartJobRunOperator組件。

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. 添加Connection。

    CLI方式

    通過Airflow CLI執行相應命令來建立Connection,詳情請參見建立Connection

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",  #指定使用阿里雲的AccessKey(AK)方式認證。
                "access_key_id": "<yourAccesskeyId>",  # 阿里雲帳號的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里雲帳號的AccessKey Secret。
                "region": "<yourRegion>"
            }
        }'

    UI方式

    通過在Airflow Web頁面手動添加Connection,詳情請參見建立Connection

    Add Connection頁面,配置以下資訊。

    image

    涉及參數如下表所示。

    參數

    說明

    Connection Id

    本文樣本為emr-serverless-spark-id。

    Connection Type

    選擇Generic。如果沒有該類型,您也可以選擇Email

    Extra

    填寫內容如下。

    {
                "auth_type": "AK",  #指定使用阿里雲的AccessKey(AK)方式認證。
                "access_key_id": "<yourAccesskeyId>",  # 阿里雲帳號的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里雲帳號的AccessKey Secret。
                "region": "<yourRegion>"
            }

步驟二:編寫DAG並提交任務

Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用EmrServerlessSparkStartJobRunOperator執行不同類型的Spark作業的樣本。

提交JAR包

此情境涉及使用Airflow任務提交一個先行編譯的Spark JAR作業到阿里雲EMR Serverless Spark。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

提交SQL檔案

在Airflow DAG中直接執行SQL命令。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e","show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

從OSS提交SQL檔案

從阿里雲OSS擷取並執行SQL指令檔。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

從OSS提交Python指令碼

從阿里雲OSS擷取並執行Python指令檔。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

涉及參數如下表所示。

參數

參數類型

描述

task_id

str

指定Airflow任務的唯一識別碼。

emr_serverless_spark_conn_id

str

指定Airflow用於串連EMR Serverless Spark的Connection ID。

region

str

指定EMR Spark所處的地區。

polling_interval

int

設定Airflow輪詢任務狀態的時間間隔,單位為秒。

workspace_id

str

EMR Spark工作區的唯一識別碼。

resource_queue_id

str

用於指定EMR Spark任務所使用資源隊列的ID。

code_type

str

任務類型,可以是SQL、Python或JAR,根據任務類型,entry_point參數將有不同的含義。

name

str

EMR Spark任務的名稱。

entry_point

str

指定啟動任務的檔案位置,例如JAR、SQL或Python檔案。根據code_type的不同,此參數代表的含義不同。

entry_point_args

List

傳遞給Spark程式的參數列表。

spark_submit_parameters

str

包含用於spark-submit命令的額外參數。

is_prod

bool

指定任務啟動並執行環境。當設定為True時,則表明任務將在生產環境中執行,resource_queue_id應指定對應生產環境的資源隊列ID,例如root_queue。

engine_release_version

str

設定EMR Spark引擎的版本。預設值是"esr-2.1-native",對應Spark版本為3.3.1和Scala版本為2.12,使用原生運行時。