Apache Airflow is a powerful workflow automation and scheduling tool that enables developers to orchestrate, schedule, and monitor data pipelines. EMR Serverless Spark provides a serverless computing environment for processing large-scale data tasks. This topic describes how to use Apache Airflow to automatically submit tasks to EMR Serverless Spark, which automates job scheduling and execution and helps you manage data processing tasks more effectively.
Background information
Apache Livy interacts with Spark through a REST interface, which greatly simplifies communication between Spark and application servers. For more information about the Livy API, see REST API.
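For reference, the following minimal Python sketch shows what a batch submission over the Livy REST API looks like when it goes through the gateway. It uses the third-party requests library; the endpoint, token, and OSS path are placeholders, and the x-acs-spark-livy-token header is the gateway token described later in this topic.
import requests

# Placeholder values: replace the endpoint, token, and OSS path with your own.
livy_endpoint = "https://<YourGatewayEndpoint>/batches"
headers = {
    "Content-Type": "application/json",
    "x-acs-spark-livy-token": "<YourToken>",
}
payload = {
    "file": "oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    "className": "org.apache.spark.examples.SparkPi",
    "args": ["1000"],
}

# POST /batches creates a batch session; the response contains its id and state.
response = requests.post(livy_endpoint, json=payload, headers=headers)
batch = response.json()
print(batch["id"], batch["state"])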
When submitting tasks using Apache Airflow, you can interact with Serverless Spark by using either the Livy Operator or the EmrServerlessSparkStartJobRunOperator. You can select a method based on your business requirements.
Method | Scenarios |
Method 1: Submit tasks using the Livy Operator | If the Livy service is already deployed in your environment and you need interactive query capabilities, we recommend that you use Method 1. |
Method 2: Submit tasks using the EmrServerlessSparkStartJobRunOperator | EmrServerlessSparkStartJobRunOperator is a component provided by Alibaba Cloud EMR Serverless Spark specifically for submitting EMR Serverless Spark tasks through Airflow. It is deeply integrated with Airflow's DAG mechanism, which simplifies task orchestration and scheduling. If you are creating a new project or working entirely on EMR Serverless Spark, we recommend that you use Method 2 for higher performance and lower maintenance complexity. |
Prerequisites
The Airflow service is installed and started. For more information, see Installation of Airflow.
A workspace is created. For more information, see Create a workspace.
Considerations
The EmrServerlessSparkStartJobRunOperator does not output the actual job logs. If you need to view detailed job logs, log on to the EMR Serverless Spark console, find the corresponding task instance by task run ID, and then check and analyze task logs in the Log Investigation tab or Spark UI.
Method 1: Submit tasks using the Livy Operator
Step 1: Create a Livy gateway and access token
Create and start a gateway.
Go to the Gateways page.
Log on to the EMR console.
In the left-side navigation pane, open the Spark page.
On the Spark page, find the desired workspace and click the name of the workspace.
In the left-side navigation pane of the EMR Serverless Spark page, open the Gateways page.
Click the Livy Gateway tab.
On the Livy Gateway page, click Create Livy Gateway.
On the Create Gateway page, enter a Name (for example, Livy-gateway) and click Create.
Adjust other parameters based on your specific requirements. For more information about parameters, see Manage gateways.
On the Livy Gateway page, click Start in the Actions column of the created gateway.
Create a token.
On the Gateway page, click Token Management in the Actions column of Livy-gateway.
Click Create Token.
In the Create Token dialog box, enter a Name (for example, Livy-token) and click OK.
Copy the token.
Important: After the token is created, you must immediately copy the token. You can no longer view the token after you leave the page. If your token expires or is lost, reset the token or create a new token.
Step 2: Configure Apache Airflow
Run the following command to install the Apache Livy provider package in Apache Airflow:
pip install apache-airflow-providers-apache-livy
Add a connection.
UI method
Find the connection named livy_default in Airflow and modify its information. Alternatively, you can manually add a connection on the Airflow Web page. For more information, see Creating a Connection.
You can modify the following connection properties:
Host: Enter the endpoint of the gateway.
Schema: Enter https.
Extra: Enter a JSON string in which x-acs-spark-livy-token is the token that you copied in the previous step. Example:
{
  "x-acs-spark-livy-token": "6ac**********kfu"
}
CLI method
Run the following command in the Airflow CLI to create the connection. For more information, see Creating a Connection.
airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"
        }
    }'
In the command, host is the endpoint of the gateway, and x-acs-spark-livy-token is the token that you copied in the previous step.
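As an alternative to the UI and CLI, Airflow 2.3 and later can also resolve connections from environment variables named AIRFLOW_CONN_<CONN_ID> in JSON format. The following is a minimal sketch under that assumption; the endpoint and token are placeholders and must match the gateway that you created.
import json
import os

# Minimal sketch: define the livy_default connection through an environment variable.
# Set this in the environment of the Airflow scheduler and workers before they start.
os.environ["AIRFLOW_CONN_LIVY_DEFAULT"] = json.dumps({
    "conn_type": "livy",
    "host": "<YourGatewayEndpoint>",                     # the endpoint of the gateway
    "schema": "https",
    "extra": {"x-acs-spark-livy-token": "<YourToken>"},  # the token that you copied
})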
Step 3: Write a DAG and submit tasks
Apache Airflow allows you to use Directed Acyclic Graphs (DAGs) to declare how jobs are run. The following sample code shows how to use the LivyOperator of Apache Airflow to run a Spark job.
Run a JAR package stored in Object Storage Service (OSS).
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag", # The unique identifier of the DAG.
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# Replace the value of file based on your business requirements.
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
The following table describes the parameters.
Parameter | Description |
dag_id | The unique identifier of the DAG, used to distinguish different DAGs. |
schedule_interval | Defines the scheduling interval of the DAG. |
start_date | Defines the start date of the DAG, indicating from which day the scheduling begins. |
tags | Adds tags to the DAG for categorization and search. |
catchup | Controls whether to run historical tasks. If set to False, missed historical runs are not backfilled. |
file | The file path for your Spark task. This example uses the path of a JAR package uploaded to Alibaba Cloud OSS. Replace it based on your business requirements. For information about upload operations, see Simple upload. |
class_name | Specifies the main class name in the JAR package. |
args | The command line arguments passed to the Spark task. |
driver_memory, driver_cores | Specify the memory size and number of cores for the driver, respectively. |
executor_memory, executor_cores | Specify the memory size and number of cores for each executor, respectively. |
num_executors | Specifies the number of executors. |
name | The name of the Spark task. |
task_id | The unique identifier of the Airflow task. |
dag | Associates the task with the DAG. |
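The DAG above contains a single task. If your pipeline has several steps, you can chain additional operators with Airflow's dependency syntax. The following is a minimal, hypothetical sketch that runs a second Livy job only after the SparkPi job succeeds; the JAR path and class name are placeholders.
# Hypothetical downstream task that reuses the DAG defined above.
livy_postprocess_task = LivyOperator(
    file="oss://<YourBucket>/jars/<YourPostprocessJar>.jar",  # placeholder JAR path
    class_name="<your.main.Class>",                           # placeholder main class
    name="LivyOperator PostProcess",
    task_id="livy_postprocess_task",
    dag=livy_operator_sparkpi_dag,
)

# Run the post-processing job only after the SparkPi job completes successfully.
livy_sparkpi_submit_task >> livy_postprocess_task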
Method 2: Submit tasks using the EmrServerlessSparkStartJobRunOperator
Step 1: Configure Apache Airflow
Install the airflow-alibaba-provider plug-in on each node of Airflow.
The airflow-alibaba-provider plug-in is provided by the EMR Serverless Spark team and includes the EmrServerlessSparkStartJobRunOperator component specifically for submitting EMR Serverless Spark tasks.
pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
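To quickly confirm that the plug-in is available in the Python environment used by Airflow, you can, for example, check on each node that the operator import used in the DAG examples below succeeds:
# Sanity check: this import should succeed on every Airflow node after installation.
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator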
Add a connection.
CLI method
Run the following command in the Airflow CLI to create the connection. For more information, see Creating a Connection.
airflow connections add 'emr-serverless-spark-id' \
    --conn-json '{
        "conn_type": "emr_serverless_spark",
        "extra": {
            "auth_type": "AK",
            "access_key_id": "<yourAccessKeyId>",
            "access_key_secret": "<yourAccessKeySecret>",
            "region": "<yourRegion>"
        }
    }'
In the command, auth_type specifies that Alibaba Cloud AccessKey (AK) authentication is used, access_key_id and access_key_secret are the AccessKey pair of your Alibaba Cloud account, and region is the ID of the region in which the workspace resides.
UI method
Manually add a connection on the Airflow Web page. For more information, see Creating a Connection.
On the Add Connection page, configure the following information.
The following table describes the required parameters.
Parameter | Description |
Connection Id | In this example, emr-serverless-spark-id is used. |
Connection Type | Select Generic. If this type is not available, you can also select Email. |
Extra | Enter the following content. |
{
  "auth_type": "AK",
  "access_key_id": "<yourAccessKeyId>",
  "access_key_secret": "<yourAccessKeySecret>",
  "region": "<yourRegion>"
}
In the JSON, auth_type specifies that Alibaba Cloud AccessKey (AK) authentication is used, access_key_id and access_key_secret are the AccessKey pair of your Alibaba Cloud account, and region is the ID of the region in which the workspace resides.
Step 2: Write a DAG and submit tasks
Apache Airflow allows you to use Directed Acyclic Graphs (DAGs) to declare how jobs are run. The following examples show how to use the EmrServerlessSparkStartJobRunOperator to run different types of Spark jobs.
Submit a JAR package
This scenario involves using an Airflow task to submit a precompiled Spark JAR job to Alibaba Cloud EMR Serverless Spark.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_jar",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="JAR",
name="airflow-emr-spark-jar",
entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
entry_point_args=["1"],
spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_jar
Submit SQL statements
Execute SQL commands directly in an Airflow DAG.
from __future__ import annotations
import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql",
entry_point=None,
entry_point_args=["-e","show tables;show tables;"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None,
)
emr_spark_sql
Submit an SQL file from OSS
Run the SQL script file obtained from Object Storage Service (OSS).
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql_2",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql-2",
entry_point="",
entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_sql_2
Submit a Python script from OSS
Run the Python script file obtained from Object Storage Service (OSS).
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_python = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_python",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="PYTHON",
name="airflow-emr-spark-python",
entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
entry_point_args=["1"],
spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_python
The following table describes the parameters.
Parameter | Parameter type | Description |
task_id | str | Specifies the unique identifier of the Airflow task. |
emr_serverless_spark_conn_id | str | Specifies the Connection ID used by Airflow to connect to EMR Serverless Spark. |
region | str | Specifies the region where EMR Serverless Spark is located. |
polling_interval | int | Sets the time interval at which Airflow polls the task status, in seconds. |
workspace_id | str | The unique identifier of the EMR Serverless Spark workspace. |
resource_queue_id | str | Specifies the ID of the resource queue used by the EMR Serverless Spark task. |
code_type | str | The task type, which can be SQL, PYTHON, or JAR. Depending on the task type, the entry_point parameter has different meanings. |
name | str | The name of the EMR Serverless Spark task. |
entry_point | str | Specifies the file that starts the task, such as a JAR, SQL, or Python file. The meaning of this parameter varies depending on the code_type. |
entry_point_args | list | A list of arguments passed to the Spark program. |
spark_submit_parameters | str | Contains additional parameters for the spark-submit command, such as --class and --conf settings. |
is_prod | bool | Specifies the environment in which the task runs. If set to True, the task is executed in the production environment. |
engine_release_version | str | Sets the version of the EMR Serverless Spark engine. The default value is "esr-2.1-native", which corresponds to Spark 3.3.1 and Scala 2.12 and uses the native runtime. |
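Because EmrServerlessSparkStartJobRunOperator is an ordinary Airflow operator, you can chain several EMR Serverless Spark jobs in one DAG with Airflow's dependency syntax. The following is a minimal sketch that reuses the placeholder connection ID and workspace ID from the examples above; the JAR path, main class, and SQL statement are hypothetical. A JAR job runs first, and an SQL job starts only after it succeeds.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

with DAG(
    dag_id="emr_spark_pipeline",
    start_date=datetime(2024, 5, 1),
    max_active_runs=1,
    catchup=False,
) as dag:
    # Step 1: a JAR job that prepares data. The JAR path and main class are placeholders.
    prepare_data = EmrServerlessSparkStartJobRunOperator(
        task_id="prepare_data",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-prepare",
        entry_point="oss://<YourBucket>/jars/<YourJob>.jar",
        entry_point_args=[],
        spark_submit_parameters="--class <your.main.Class> --conf spark.executor.instances=1",
        is_prod=True,
    )

    # Step 2: an SQL job that reports on the prepared data.
    report = EmrServerlessSparkStartJobRunOperator(
        task_id="report",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-report",
        entry_point=None,
        entry_point_args=["-e", "show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.instances=1",
        is_prod=True,
    )

    # The SQL job starts only after the JAR job completes successfully.
    prepare_data >> report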