Apache Airflow is a powerful workflow automation and scheduling tool that allows you to orchestrate, schedule, and monitor data pipelines. EMR Serverless Spark provides a serverless computing environment for large-scale data processing jobs. This topic describes how to use Apache Airflow to automate job submission to EMR Serverless Spark, enabling automated scheduling and more efficient management of data processing jobs.
Background information
Apache Livy uses a REST API to interact with Spark, simplifying communication between Spark and application servers. For more information about the Livy API, see REST API.
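As a rough illustration of what a Livy batch submission looks like at the REST level, the following sketch builds (but does not send) a `POST /batches` request. The body fields follow the open-source Livy batch API. The endpoint, the token value, and the assumption that the gateway accepts the token as an `x-acs-spark-livy-token` HTTP header on the `/batches` path are illustrative placeholders, not confirmed behavior.

```python
import json
import urllib.request


def build_batch_request(endpoint, token, jar_path, main_class, args):
    """Build (without sending) a Livy-style POST /batches request."""
    payload = {
        "file": jar_path,          # Path to the application JAR
        "className": main_class,   # Main class of the application
        "args": args,              # Command-line arguments
        "driverMemory": "1g",
        "executorMemory": "1g",
        "numExecutors": 1,
    }
    return urllib.request.Request(
        url=f"https://{endpoint}/batches",  # Assumed path layout
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "x-acs-spark-livy-token": token,  # Assumed header-based token
        },
        method="POST",
    )


# Hypothetical endpoint and token, for illustration only.
request = build_batch_request(
    endpoint="example-gateway.invalid/livy",
    token="example-token",
    jar_path="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    main_class="org.apache.spark.examples.SparkPi",
    args=["1000"],
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

In practice the LivyOperator builds and sends an equivalent request for you, so this sketch is only meant to show what travels over the wire.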
To submit jobs from Airflow, you can interact with EMR Serverless Spark using either the LivyOperator or the EmrServerlessSparkStartJobRunOperator. Choose the method that best suits your scenario.
| Method | Scenarios |
| --- | --- |
| Method 1: Submit jobs using the LivyOperator | Choose this method if you want to use an open-source Airflow operator to submit jobs to EMR Serverless Spark. |
| Method 2: Submit jobs using the EmrServerlessSparkStartJobRunOperator | If you are starting a new project or building exclusively on EMR Serverless Spark, use Method 2 for higher performance and lower operational complexity. |
Prerequisites
- Airflow is installed and running. For more information, see Installation of Airflow.
- You have created a workspace. For more information, see Create a workspace.
Usage notes
The EmrServerlessSparkStartJobRunOperator does not currently output job logs. To view detailed logs, log on to the EMR console, find the job run by its ID, and inspect the logs on the Log Exploration tab or in the Spark UI.
Method 1: Submit jobs using the LivyOperator
Step 1: Create a Livy Gateway and token
- Create and start a gateway.
  - Go to the Gateway page.
    - Log on to the EMR console.
    - In the navigation pane on the left, go to the Spark page.
    - On the Spark page, click the name of the target workspace.
    - On the EMR Serverless Spark page, in the navigation pane on the left, go to the Gateway page.
  - Click the Livy Gateway tab.
  - On the Livy Gateway page, click Create Livy Gateway.
  - On the Create Gateway page, enter a Name (for example, Livy-gateway) and click Create.
    Adjust other parameters as needed. For more information, see Manage gateways.
  - On the Livy Gateway page, find the gateway that you created and click Start in the Actions column.
- Create a token.
  - On the Gateway page, find Livy-gateway and click Token Management in the Actions column.
  - Click Create Token.
  - In the Create Token dialog box, enter a Name (for example, Livy-token) and click OK.
  - Copy the token information.
    Important: Copy the token immediately after creation, because it cannot be viewed later. If the token expires or is lost, create a new token or reset the existing one.
Step 2: Configure Apache Airflow
- Run the following command to install the Apache Livy provider in your Airflow environment:
  pip install apache-airflow-providers-apache-livy
- Add a connection.
UI
In Airflow, find the default connection named livy_default and modify its details. Alternatively, you can manually add a connection on the Airflow web UI. For more information, see Create a connection.
Configure the following parameters:
- Host: Enter the Endpoint of the gateway.
- Schema: Enter https.
- Extra: Enter a JSON string in which x-acs-spark-livy-token is the token that you copied in the previous step.
  { "x-acs-spark-livy-token": "6ac**********kfu" }
CLI
Run the corresponding command using the Airflow CLI to create a connection. For more information, see Create a connection.
# Replace host with the endpoint of the gateway and x-acs-spark-livy-token with the token that you copied in the preceding step.
airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"
        }
    }'
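If you prefer to generate the connection command rather than hand-edit JSON inside shell quotes, a small helper like the following sketch can assemble it. The function name and the sample endpoint and token are hypothetical. Only the JSON keys and the `airflow connections add ... --conn-json` form come from the command above.

```python
import json
import shlex


def livy_connection_command(conn_id, endpoint, token):
    """Return an `airflow connections add` command for a Livy gateway."""
    conn = {
        "conn_type": "livy",
        "host": endpoint,
        "schema": "https",
        "extra": {"x-acs-spark-livy-token": token},
    }
    # json.dumps guarantees valid JSON; shlex.quote makes it shell-safe.
    return " ".join(
        [
            "airflow", "connections", "add", shlex.quote(conn_id),
            "--conn-json", shlex.quote(json.dumps(conn)),
        ]
    )


# Hypothetical values, for illustration only.
command = livy_connection_command(
    conn_id="livy_default",
    endpoint="example-gateway.invalid/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
    token="example-token",
)
print(command)
```

Generating the JSON programmatically avoids two easy mistakes: unbalanced quotes and comments inside the JSON body, neither of which the Airflow CLI accepts.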
Step 3: Write a DAG and submit the job
An Airflow Directed Acyclic Graph (DAG) defines how tasks are executed. The following example shows how to use the LivyOperator in Airflow to run a Spark job.
Fetch and run a JAR file from Alibaba Cloud Object Storage Service (OSS).
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # The unique identifier for the DAG.
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# Define the Livy task with LivyOperator.
# Replace the file path with your own.
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| dag_id | The unique identifier for the DAG. |
| schedule_interval | Specifies the scheduling interval for the DAG. In this example, None means that the DAG runs only when it is triggered manually. |
| start_date | The date from which the DAG can be scheduled. |
| tags | Adds tags to the DAG for easy categorization and searching. |
| catchup | Controls whether to backfill historical tasks. If set to False, Airflow does not backfill runs for past intervals, even if start_date is in the past. |
| file | The path to the file for your Spark job. In this example, it is the path to a JAR package uploaded to Alibaba Cloud OSS. Replace the path as required. For information about how to upload files, see Simple upload. |
| class_name | The main class in the JAR package. |
| args | The command-line arguments passed to the Spark job. |
| driver_memory, driver_cores | The memory size and number of cores for the driver, respectively. |
| executor_memory, executor_cores | The memory size and number of cores for each executor, respectively. |
| num_executors | The number of executors. |
| name | The name of the Spark job. |
| task_id | The unique identifier for the Airflow task. |
| dag | Associates the task with the DAG. |
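To see how the driver and executor settings in the table combine, the following sketch adds up the memory and cores that a job requests. It is a simplification: it counts only the values passed to the operator and ignores any memory overhead that Spark or the platform may add. The helper names are hypothetical.

```python
def parse_memory_gib(value):
    """Convert a Spark-style memory string such as '1g' or '512m' to GiB."""
    units = {"m": 1 / 1024, "g": 1.0, "t": 1024.0}
    number, unit = value[:-1], value[-1].lower()
    if unit not in units:
        raise ValueError(f"Unsupported memory unit in {value!r}")
    return float(number) * units[unit]


def total_resources(driver_memory, driver_cores,
                    executor_memory, executor_cores, num_executors):
    """Sum the requested memory (GiB) and cores for driver plus executors."""
    memory = (parse_memory_gib(driver_memory)
              + num_executors * parse_memory_gib(executor_memory))
    cores = driver_cores + num_executors * executor_cores
    return {"memory_gib": memory, "cores": cores}


# Values from the LivyOperator example above.
totals = total_resources(
    driver_memory="1g", driver_cores=1,
    executor_memory="1g", executor_cores=2, num_executors=1,
)
print(totals)  # {'memory_gib': 2.0, 'cores': 3}
```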
Method 2: Submit jobs using the EmrServerlessSparkStartJobRunOperator
Step 1: Configure Apache Airflow
- Install the airflow-alibaba-provider plugin on each Airflow node.
  The airflow-alibaba-provider plugin, provided by the EMR Serverless Spark team, includes the EmrServerlessSparkStartJobRunOperator component, which is designed for submitting EMR Serverless Spark jobs.
  pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
- Add a connection.
CLI
Run the corresponding command using the Airflow CLI to create a connection. For more information, see Create a connection.
# The auth_type value AK specifies Alibaba Cloud AccessKey authentication.
# Replace the placeholders with your AccessKey ID, AccessKey secret, and region.
airflow connections add 'emr-serverless-spark-id' \
    --conn-json '{
        "conn_type": "emr_serverless_spark",
        "extra": {
            "auth_type": "AK",
            "access_key_id": "<yourAccesskeyId>",
            "access_key_secret": "<yourAccesskeyKey>",
            "region": "<yourRegion>"
        }
    }'
UI
Manually add a connection on the Airflow web UI. For more information, see Create a connection.
On the Add Connection page, configure the following parameters.
| Parameter | Description |
| --- | --- |
| Connection Id | In this example, the value is emr-serverless-spark-id. |
| Connection Type | Select Generic. If this type is not available, you can select Email. |
| Extra | Enter the following JSON content. The auth_type value AK specifies Alibaba Cloud AccessKey authentication. Replace the placeholders with your AccessKey ID, AccessKey secret, and region. |
{
    "auth_type": "AK",
    "access_key_id": "<yourAccesskeyId>",
    "access_key_secret": "<yourAccesskeyKey>",
    "region": "<yourRegion>"
}
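To avoid pasting an AccessKey pair into files or shell history, one option is to assemble the Extra JSON from environment variables. The sketch below assumes hypothetical variable names (`ALIBABA_CLOUD_ACCESS_KEY_ID`, `ALIBABA_CLOUD_ACCESS_KEY_SECRET`, `ALIBABA_CLOUD_REGION`). Only the JSON keys are taken from the connection settings above.

```python
import json
import os

# Hypothetical environment variable names; adjust them to your own setup.
REQUIRED_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
    "ALIBABA_CLOUD_REGION",
)


def build_emr_extra(env=os.environ):
    """Build the Extra dictionary for the EMR Serverless Spark connection."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "auth_type": "AK",
        "access_key_id": env["ALIBABA_CLOUD_ACCESS_KEY_ID"],
        "access_key_secret": env["ALIBABA_CLOUD_ACCESS_KEY_SECRET"],
        "region": env["ALIBABA_CLOUD_REGION"],
    }


# A stand-in environment so that the example runs without real credentials.
example_env = {
    "ALIBABA_CLOUD_ACCESS_KEY_ID": "<yourAccesskeyId>",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET": "<yourAccesskeyKey>",
    "ALIBABA_CLOUD_REGION": "cn-hangzhou",
}
extra = build_emr_extra(example_env)
print(json.dumps(extra, indent=2))
```

The printed JSON can be pasted into the Extra field or embedded in the `--conn-json` argument.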
Step 2: Write a DAG and submit the job
The following examples show how to use the EmrServerlessSparkStartJobRunOperator in Airflow to run different types of Spark jobs.
JAR job
This scenario shows how to use an Airflow task to submit a pre-compiled Spark JAR job to Alibaba Cloud EMR Serverless Spark.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )
    emr_spark_jar
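The spark_submit_parameters string in the example above is long and easy to mistype. As a sketch, a small hypothetical helper can assemble it from a dictionary of Spark settings. The helper is not part of the provider package; it only produces the same string format that the example passes.

```python
def build_spark_submit_parameters(conf, main_class=None):
    """Join an optional --class flag and --conf pairs into one string."""
    parts = []
    if main_class:
        parts += ["--class", main_class]
    for key, value in conf.items():
        parts += ["--conf", f"{key}={value}"]
    return " ".join(parts)


params = build_spark_submit_parameters(
    {
        "spark.executor.cores": 4,
        "spark.executor.memory": "20g",
        "spark.driver.cores": 4,
        "spark.driver.memory": "8g",
        "spark.executor.instances": 1,
    },
    main_class="org.apache.spark.examples.SparkPi",
)
print(params)
```

The resulting string can be passed directly as `spark_submit_parameters=params`. For Python jobs, omit `main_class`.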
SQL command
Run SQL commands directly in an Airflow DAG.
from __future__ import annotations
import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e", "show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )
    emr_spark_sql
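In the example above, the SQL statements are passed as a single semicolon-terminated string after the -e flag. The following sketch shows one way to build that argument list from separate statements. The helper name is hypothetical, and the sketch assumes that statements do not themselves contain semicolons inside string literals.

```python
def build_inline_sql_args(statements):
    """Return ["-e", "<stmt1>;<stmt2>;..."] for inline SQL execution."""
    # Trim whitespace and trailing semicolons, then drop empty entries.
    stripped = (s.strip().rstrip(";").strip() for s in statements)
    cleaned = [s for s in stripped if s]
    if not cleaned:
        raise ValueError("At least one SQL statement is required")
    return ["-e", "".join(f"{s};" for s in cleaned)]


sql_args = build_inline_sql_args(["show databases", "show tables;"])
print(sql_args)  # ['-e', 'show databases;show tables;']
```

The result can be passed as `entry_point_args=sql_args`.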
SQL file from OSS
Fetch and run an SQL script file from Alibaba Cloud Object Storage Service (OSS).
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )
    emr_spark_sql_2
Python file from OSS
Fetch and run a Python script file from Alibaba Cloud Object Storage Service (OSS).
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )
    emr_spark_python
The following table describes the parameters.
| Parameter | Type | Description |
| --- | --- | --- |
| task_id | str | The unique identifier of the Airflow task. |
| emr_serverless_spark_conn_id | str | The ID of the Airflow connection used to connect to EMR Serverless Spark. |
| region | str | The region of the EMR Spark workspace. |
| polling_interval | int | The interval, in seconds, at which Airflow polls the job status. |
| workspace_id | str | The unique identifier of the EMR Spark workspace. |
| resource_queue_id | str | The ID of the resource queue used by the EMR Spark job. |
| code_type | str | The job type. Valid values are SQL, PYTHON, and JAR. The meaning of the entry_point parameter varies depending on the job type. |
| name | str | The name of the EMR Spark job. |
| entry_point | str | The location of the file that starts the job, such as a JAR, SQL, or Python file. The meaning of this parameter varies depending on the code_type value. |
| entry_point_args | list | A list of arguments passed to the Spark application. |
| spark_submit_parameters | str | Contains additional parameters for the spark-submit command. |
| is_prod | bool | Specifies the environment where the task runs. When set to True, the task is executed in the production environment. |
| engine_release_version | str | The EMR Spark engine version. Defaults to "esr-2.1-native" (Spark 3.3.1, Scala 2.12, native runtime). |
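To make the role of polling_interval more concrete, the following sketch shows a generic polling loop of the kind an operator might run while waiting for a job to finish. It is not the provider's actual implementation. The state names, the `get_state` callback, and the timeout handling are all assumptions made for illustration.

```python
import time

# Hypothetical terminal state names; the real service may use different ones.
TERMINAL_STATES = {"Success", "Failed", "Cancelled"}


def wait_for_job(get_state, polling_interval=5, timeout=3600, sleep=time.sleep):
    """Poll get_state() every polling_interval seconds until a terminal state."""
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError(f"Job still in state {state!r} after {waited}s")
        sleep(polling_interval)
        waited += polling_interval


# Simulated job that reaches a terminal state on the fourth poll.
states = iter(["Submitted", "Running", "Running", "Success"])
final_state = wait_for_job(
    lambda: next(states),
    polling_interval=5,
    sleep=lambda seconds: None,  # Skip real sleeping in this demonstration
)
print(final_state)  # Success
```

A shorter interval detects completion sooner at the cost of more status requests. A longer interval reduces request volume but delays downstream tasks.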