
E-MapReduce: Submit tasks through Apache Airflow

Last Updated: Apr 17, 2025

Apache Airflow is a powerful workflow automation and scheduling tool that enables developers to orchestrate, schedule, and monitor the execution of data pipelines. EMR Serverless Spark provides a serverless computing environment for processing large-scale data tasks. This topic describes how to use Apache Airflow to automatically submit tasks to EMR Serverless Spark, which automates job scheduling and execution and helps you manage data processing tasks more effectively.

Background information

Apache Livy interacts with Spark through a REST interface, which greatly simplifies the communication complexity between Spark and application servers. For information about the Livy API, see REST API.
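
For illustration only, the following minimal sketch (not part of the official examples) shows what a direct call to the Livy REST interface might look like from Python. It assumes a placeholder gateway endpoint, token, and OSS bucket, and it assumes that the gateway accepts the token in the x-acs-spark-livy-token HTTP header, which is the same key used in the Airflow connection later in this topic.

import requests

# Placeholders: replace with your gateway endpoint and the token created in Method 1.
LIVY_ENDPOINT = "https://<YourGatewayEndpoint>/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx"
HEADERS = {
    "x-acs-spark-livy-token": "<YourToken>",
    "Content-Type": "application/json",
}

# POST /batches is the standard Livy REST API call for submitting a batch job.
response = requests.post(
    f"{LIVY_ENDPOINT}/batches",
    headers=HEADERS,
    json={
        "file": "oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "args": ["1000"],
    },
)
response.raise_for_status()
print(response.json())  # Returns the batch id and its current state.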

When submitting tasks using Apache Airflow, you can interact with Serverless Spark by using either the Livy Operator or the EmrServerlessSparkStartJobRunOperator. You can select a method based on your business requirements.

• Method 1: Submit tasks using the Livy Operator

  Use this method if the Livy service is already deployed in your environment and you need interactive query capabilities.

• Method 2: Submit tasks using the EmrServerlessSparkStartJobRunOperator

  EmrServerlessSparkStartJobRunOperator is a component provided by Alibaba Cloud EMR Serverless Spark specifically for submitting EMR Serverless Spark tasks through Airflow. It is deeply integrated with Airflow's DAG mechanism, which simplifies task orchestration and scheduling.

  Use this method if you are creating a new project or working entirely on EMR Serverless Spark, to achieve higher performance and lower maintenance complexity.

Prerequisites

Considerations

The EmrServerlessSparkStartJobRunOperator does not output the actual job logs. If you need to view detailed job logs, log on to the EMR Serverless Spark console, find the corresponding task instance by task run ID, and then check and analyze task logs in the Log Investigation tab or Spark UI.

Method 1: Submit tasks using the Livy Operator

Step 1: Create a Livy gateway and access token

  1. Create and start a gateway.

    1. Go to the Gateways page.

      1. Log on to the EMR console.

      2. In the left-side navigation pane, choose EMR Serverless > Spark.

      3. On the Spark page, find the desired workspace and click the name of the workspace.

      4. In the left-side navigation pane of the EMR Serverless Spark page, choose Operation Center > Gateways.

    2. Click the Livy Gateway tab.

    3. On the Livy Gateway page, click Create Livy Gateway.

    4. On the Create Gateway page, enter a Name (for example, Livy-gateway) and click Create.

      Adjust other parameters based on your specific requirements. For more information about parameters, see Manage gateways.

    5. On the Livy Gateway page, click Start in the Actions column of the created gateway.

  2. Create a token.

    1. On the Livy Gateway page, click Token Management in the Actions column of Livy-gateway.

    2. Click Create Token.

    3. In the Create Token dialog box, enter a Name (for example, Livy-token) and click OK.

    4. Copy the token.

      Important

      After the token is created, you must immediately copy the token. You can no longer view the token after you leave the page. If your token expires or is lost, reset the token or create a new token.

Step 2: Configure Apache Airflow

  1. Run the following command to install the Apache Livy provider package for Apache Airflow:

    pip install apache-airflow-providers-apache-livy
  2. Add a connection.

    UI method

    Find the connection named livy_default in Airflow and modify its information, or manually add a new connection in the Airflow web UI. For more information, see Creating a Connection.

    You can modify the following connection properties:

    • Host: Enter the Endpoint information from the gateway.

    • Schema: Enter https.

    • Extra: Enter a JSON string. x-acs-spark-livy-token is the token that you copied in the previous step.

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    CLI method

    Run the following command in the Airflow CLI to add the connection. Set host to the endpoint of the gateway and x-acs-spark-livy-token to the token that you copied in the previous step. For more information, see Creating a Connection.

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"
            }
        }'
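
    After the connection is added, you can optionally verify it from within the Airflow environment. The following is a minimal sketch, assuming it runs on a host where Airflow is installed and configured; it only reads the connection back and prints its fields.

    from airflow.hooks.base import BaseHook

    # Look up the connection that the LivyOperator will use.
    conn = BaseHook.get_connection("livy_default")
    print(conn.host)          # The gateway endpoint.
    print(conn.schema)        # https
    print(conn.extra_dejson)  # {'x-acs-spark-livy-token': '...'}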

Step 3: Write a DAG and submit tasks

Apache Airflow allows you to use Directed Acyclic Graphs (DAGs) to define how jobs are run. The following sample code shows how to use the LivyOperator of Apache Airflow to run a Spark job.

Run a JAR package that is stored in Object Storage Service (OSS).

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # The unique identifier of the DAG.
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# Define the Livy task with LivyOperator.
# Replace the value of file based on your business requirements.
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task", 
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

The following describes the parameters used in the example.

• dag_id: The unique identifier of the DAG, used to distinguish different DAGs.

• schedule_interval: The scheduling interval of the DAG. None indicates that the DAG must be triggered manually.

• start_date: The start date of the DAG, indicating the day from which scheduling begins.

• tags: Tags added to the DAG for categorization and search.

• catchup: Controls whether historical runs are backfilled. If set to False, past runs are not executed even if the start date is earlier than the current date.

• file: The file path of your Spark task. This example uses the path of a JAR package uploaded to Alibaba Cloud OSS. Replace it based on your business requirements. For information about upload operations, see Simple upload.

• class_name: The main class name in the JAR package.

• args: The command-line arguments passed to the Spark task.

• driver_memory and driver_cores: The memory size and number of cores of the driver, respectively.

• executor_memory and executor_cores: The memory size and number of cores of each executor, respectively.

• num_executors: The number of executors.

• name: The name of the Spark task.

• task_id: The unique identifier of the Airflow task.

• dag: Associates the task with the DAG.
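
Because schedule_interval is set to None in the preceding DAG, the task runs only when you trigger it manually. The following minimal sketch, with assumed values, shows the same DAG definition with a daily schedule instead; with catchup=False, intervals before the date on which the DAG is enabled are not backfilled.

from datetime import datetime, timedelta
from airflow import DAG

# Hypothetical variant of the DAG above, scheduled once per day.
scheduled_dag = DAG(
    dag_id="livy_operator_sparkpi_daily_dag",   # Hypothetical DAG ID for illustration.
    default_args={
        'owner': 'aliyun',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval="@daily",                 # Run once per day instead of manually.
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False,                              # Do not backfill past intervals.
)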

Method 2: Submit tasks using the EmrServerlessSparkStartJobRunOperator

Step 1: Configure Apache Airflow

  1. Download airflow_alibaba_provider-0.0.3-py3-none-any.whl.

  2. Install the airflow-alibaba-provider plug-in on each node of Airflow.

    The airflow-alibaba-provider plug-in is provided by the EMR Serverless Spark team and includes the EmrServerlessSparkStartJobRunOperator component specifically for submitting EMR Serverless Spark tasks.

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. Add a connection.

    CLI method

    Run the following command in the Airflow CLI to add the connection. Set auth_type to AK to use Alibaba Cloud AccessKey authentication, and replace the AccessKey ID, AccessKey secret, and region placeholders with your own values. For more information, see Creating a Connection.

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",
                "access_key_id": "<yourAccesskeyId>",
                "access_key_secret": "<yourAccesskeySecret>",
                "region": "<yourRegion>"
            }
        }'

    UI method

    Manually add a connection in the Airflow web UI. For more information, see Creating a Connection.

    On the Add Connection page, configure the following information.


    • Connection Id: In this example, emr-serverless-spark-id is used.

    • Connection Type: Select Generic. If this type is not available, you can also select Email.

    • Extra: Enter the following content. Set auth_type to AK to use Alibaba Cloud AccessKey authentication, and replace the AccessKey ID, AccessKey secret, and region placeholders with your own values.

      {
          "auth_type": "AK",
          "access_key_id": "<yourAccesskeyId>",
          "access_key_secret": "<yourAccesskeySecret>",
          "region": "<yourRegion>"
      }

Step 2: Write a DAG and submit tasks

Apache Airflow allows you to use Directed Acyclic Graphs (DAGs) to define how jobs are run. The following examples show how to use the EmrServerlessSparkStartJobRunOperator to run different types of Spark jobs.

Submit a JAR package

This scenario involves using an Airflow task to submit a precompiled Spark JAR job to Alibaba Cloud EMR Serverless Spark.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

Submit SQL statements

Execute SQL statements directly in an Airflow DAG.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e","show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

Submit an SQL file from OSS

Run an SQL script file that is stored in Object Storage Service (OSS).

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

Submit a Python script from OSS

Run a Python script file that is stored in Object Storage Service (OSS).

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

The following describes the parameters and their types.

• task_id (str): The unique identifier of the Airflow task.

• emr_serverless_spark_conn_id (str): The connection ID used by Airflow to connect to EMR Serverless Spark.

• region (str): The region in which EMR Serverless Spark resides.

• polling_interval (int): The interval, in seconds, at which Airflow polls the task status.

• workspace_id (str): The unique identifier of the EMR Serverless Spark workspace.

• resource_queue_id (str): The ID of the resource queue used by the EMR Serverless Spark task.

• code_type (str): The task type, which can be SQL, PYTHON, or JAR. The meaning of the entry_point parameter depends on this value.

• name (str): The name of the EMR Serverless Spark task.

• entry_point (str): The location of the file that starts the task, such as a JAR, SQL, or Python file. The meaning of this parameter depends on code_type.

• entry_point_args (list): The arguments passed to the Spark program.

• spark_submit_parameters (str): Additional parameters for the spark-submit command.

• is_prod (bool): The environment in which the task runs. If set to True, the task runs in the production environment, and resource_queue_id must specify a production resource queue, such as root_queue.

• engine_release_version (str): The version of the EMR Serverless Spark engine. The default value is "esr-2.1-native", which corresponds to Spark 3.3.1 and Scala 2.12 and uses the native runtime.
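
Because the operator is a regular Airflow operator, you can combine several EMR Serverless Spark jobs in one DAG by using Airflow's standard dependency syntax. The following is a minimal sketch that reuses the connection, workspace, queue, and resource paths from the preceding examples as assumed placeholder values; it chains an SQL job and a JAR job so that the JAR job starts only after the SQL job succeeds.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

with DAG(
    dag_id="emr_spark_chained",  # Hypothetical DAG ID for illustration.
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    # First task: run SQL statements, as in the "Submit SQL statements" example.
    emr_spark_sql_step = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_step",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-step",
        entry_point=None,
        entry_point_args=["-e", "show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    # Second task: run the SparkPi JAR, as in the "Submit a JAR package" example.
    emr_spark_jar_step = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar_step",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar-step",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    # The JAR job runs only after the SQL job completes successfully.
    emr_spark_sql_step >> emr_spark_jar_step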