Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發人員編排、計劃和監控資料管道的執行。EMR Serverless Spark為處理大規模資料處理任務提供了一個無伺服器計算環境。本文為您介紹如何通過Apache Airflow實現自動化地向EMR Serverless Spark提交任務,以實現作業調度和執行的自動化,協助您更有效地管理資料處理任務。
背景資訊
Apache Livy通過REST介面與Spark進行互動,極大簡化了Spark和應用程式伺服器之間的通訊複雜度。關於Livy API,請參見REST API。
在使用Apache Airflow提交任務時,您可以通過使用Livy Operator或EmrServerlessSparkStartJobRunOperator兩種方式與Serverless Spark進行互動。請根據實際情況選擇最適合的方案。
方式 | 適用情境 |
如果您希望使用Airflow的開源Operator提交任務到Serverless Spark,可以選擇這種方式。 | |
EmrServerlessSparkStartJobRunOperator是由阿里雲EMR Serverless Spark提供的一個組件,專門用於通過Airflow提交EMR Serverless Spark任務,與Airflow的DAG機制深度整合,簡化任務編排和調度。 如果您是建立專案或完全基於EMR Serverless Spark,建議選擇方式二,以實現更高的效能和更低的營運複雜度。 |
前提條件
已安裝並啟動Airflow服務,詳情請參見Installation of Airflow。
已建立工作空間,詳情請參見建立工作空間。
注意事項
當前EmrServerlessSparkStartJobRunOperator未輸出實際作業的日誌。如果您需要查看詳細的作業日誌,請登入EMR Serverless Spark控制台,通過任務運行ID找到對應的任務執行個體,然後,您可以在日誌探查頁簽或者Spark UI中進一步檢查和分析任務日誌。
方式一:使用Livy Operator提交任務
步驟一:建立Livy Gateway及訪問Token
建立並啟動Gateway。
進入Gateway頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的。
單擊Livy Gateway頁簽。
在Livy Gateway頁面,單擊建立Livy Gateway。
在建立Gateway頁面,輸入名稱(例如,Livy-gateway),單擊建立。
其餘參數請根據具體情況進行調整,更多參數資訊請參見Gateway管理。
在Livy Gateway頁面,單擊已建立Gateway操作列的啟動。
建立Token。
在Gateway頁面,單擊Livy-gateway操作列的Token管理。
單擊建立Token。
在建立Token對話方塊中,輸入名稱(例如,Livy-token),單擊確定。
複製Token資訊。
重要Token建立完成後,請務必立即複製新Token的資訊,後續不支援查看。如果您的Token到期或遺失,請選擇建立Token或重設Token。
步驟二:配置Apache Airflow
執行以下命令,在Apache Airflow環境中安裝Apache Livy。
pip install apache-airflow-providers-apache-livy添加Connection。
UI方式
在Airflow中找到預設為livy_default的Connection,並對其資訊進行修改;或者您也可以在Airflow Web頁面手動添加Connection,詳情請參見建立Connection。
涉及以下資訊:
Host:填寫為Gateway中的Endpoint資訊。
Schema:填寫為https。
Extra:填寫JSON字串,
x-acs-spark-livy-token為您前一個步驟中複製的Token資訊。{ "x-acs-spark-livy-token": "6ac**********kfu" }
CLI方式
通過Airflow CLI執行相應命令來建立Connection,詳情請參見建立Connection。
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # Gateway中的Endpoint資訊。 "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # 為您前一個步驟中複製的Token資訊。 } }'
步驟三: 編寫DAG並提交任務
Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用Livy Operator執行Spark任務的樣本。
從阿里雲OSS擷取並執行Python指令檔。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag", # DAG的唯一識別碼。
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# 請根據實際情況替換file內容。
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
涉及參數如下表所示。
參數 | 描述 |
| DAG的唯一識別碼,用於區分不同的DAG。 |
| 定義DAG的調度間隔。 |
| 定義DAG的起始日期,表示從哪一天開始調度。 |
| 為DAG添加標籤,便於分類和搜尋。 |
| 控制是否補跑歷史任務。如果為 |
| 為您的Spark任務對應的檔案路徑,本文樣本為上傳至阿里雲OSS上的JAR包的路徑,請您根據實際情況替換。上傳操作可參見簡單上傳。 |
| 指定JAR包中的主類名。 |
| 傳遞給Spark任務的命令列參數。 |
| 分別指定Driver的記憶體大小和核心數。 |
| 分別指定每個Executor的記憶體大小和核心數。 |
| 指定Executor的數量。 |
| Spark任務的名稱。 |
| Airflow任務的唯一識別碼。 |
| 將任務與DAG關聯起來。 |
方式二:使用EmrServerlessSparkStartJobRunOperator提交任務
步驟一:配置Apache Airflow
在Airflow的每個節點上安裝airflow-alibaba-provider外掛程式。
airflow-alibaba-provider外掛程式是由EMR Serverless Spark團隊提供,包含了一個專門用於提交EMR Serverless Spark任務的EmrServerlessSparkStartJobRunOperator組件。
pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl添加Connection。
CLI方式
通過Airflow CLI執行相應命令來建立Connection,詳情請參見建立Connection。
airflow connections add 'emr-serverless-spark-id' \ --conn-json '{ "conn_type": "emr_serverless_spark", "extra": { "auth_type": "AK", #指定使用阿里雲的AccessKey(AK)方式認證。 "access_key_id": "<yourAccesskeyId>", # 阿里雲帳號的AccessKey ID。 "access_key_secret": "<yourAccesskeyKey>", # 阿里雲帳號的AccessKey Secret。 "region": "<yourRegion>" } }'UI方式
通過在Airflow Web頁面手動添加Connection,詳情請參見建立Connection。
在Add Connection頁面,配置以下資訊。

涉及參數如下表所示。
參數
說明
Connection Id
本文樣本為emr-serverless-spark-id。
Connection Type
選擇Generic。如果沒有該類型,您也可以選擇Email。
Extra
填寫內容如下。
{ "auth_type": "AK", #指定使用阿里雲的AccessKey(AK)方式認證。 "access_key_id": "<yourAccesskeyId>", # 阿里雲帳號的AccessKey ID。 "access_key_secret": "<yourAccesskeyKey>", # 阿里雲帳號的AccessKey Secret。 "region": "<yourRegion>" }
步驟二:編寫DAG並提交任務
Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用EmrServerlessSparkStartJobRunOperator執行不同類型的Spark作業的樣本。
提交JAR包
此情境涉及使用Airflow任務提交一個先行編譯的Spark JAR作業到阿里雲EMR Serverless Spark。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_jar",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="JAR",
name="airflow-emr-spark-jar",
entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
entry_point_args=["1"],
spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_jar
提交SQL檔案
在Airflow DAG中直接執行SQL命令。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql",
entry_point=None,
entry_point_args=["-e","show tables;show tables;"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None,
)
emr_spark_sql
從OSS提交SQL檔案
從阿里雲OSS擷取並執行SQL指令檔。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql_2",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql-2",
entry_point="",
entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_sql_2
從OSS提交Python指令碼
從阿里雲OSS擷取並執行Python指令檔。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_python = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_python",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="PYTHON",
name="airflow-emr-spark-python",
entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
entry_point_args=["1"],
spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_python
涉及參數如下表所示。
參數 | 參數類型 | 描述 |
|
| 指定Airflow任務的唯一識別碼。 |
|
| 指定Airflow用於串連EMR Serverless Spark的Connection ID。 |
|
| 指定EMR Spark所處的地區。 |
|
| 設定Airflow輪詢任務狀態的時間間隔,單位為秒。 |
|
| EMR Spark工作區的唯一識別碼。 |
|
| 用於指定EMR Spark任務所使用資源隊列的ID。 |
|
| 任務類型,可以是SQL、Python或JAR,根據任務類型,entry_point參數將有不同的含義。 |
|
| EMR Spark任務的名稱。 |
|
| 指定啟動任務的檔案位置,例如JAR、SQL或Python檔案。根據 |
|
| 傳遞給Spark程式的參數列表。 |
|
| 包含用於 |
|
| 指定任務啟動並執行環境。當設定為True時,則表明任務將在生產環境中執行, |
|
| 設定EMR Spark引擎的版本。預設值是"esr-2.1-native",對應Spark版本為3.3.1和Scala版本為2.12,使用原生運行時。 |