E-MapReduce: Submit jobs using Apache Airflow

Last Updated: Jun 20, 2026

Apache Airflow is a powerful workflow automation and scheduling tool that allows you to orchestrate, schedule, and monitor data pipelines. EMR Serverless Spark provides a serverless computing environment for large-scale data processing jobs. This topic describes how to use Apache Airflow to automate job submission to EMR Serverless Spark, enabling automated scheduling and more efficient management of data processing jobs.

Background information

Apache Livy uses a REST API to interact with Spark, simplifying communication between Spark and application servers. For more information about the Livy API, see REST API.

To submit jobs from Airflow, you can interact with EMR Serverless Spark using either the LivyOperator or the EmrServerlessSparkStartJobRunOperator. Choose the method that best suits your scenario.

| Method | Scenarios |
| --- | --- |
| Method 1: Submit jobs using the LivyOperator | Choose this method if you want to use an open-source Airflow operator to submit jobs to EMR Serverless Spark. |
| Method 2: Submit jobs using the EmrServerlessSparkStartJobRunOperator | The EmrServerlessSparkStartJobRunOperator is a component provided by Alibaba Cloud EMR Serverless Spark. It is specifically designed to submit EMR Serverless Spark jobs through Airflow and is deeply integrated with Airflow's Directed Acyclic Graph (DAG) to simplify job orchestration and scheduling. |

If you are starting a new project or building exclusively on EMR Serverless Spark, use Method 2 for higher performance and lower operational complexity.

Prerequisites

Usage notes

The EmrServerlessSparkStartJobRunOperator does not currently output job logs. To view detailed logs, log on to the EMR console, find the job run by its ID, and inspect the logs on the Log Exploration tab or in the Spark UI.

Method 1: Submit jobs using the LivyOperator

Step 1: Create a Livy Gateway and token

  1. Create and start a gateway.

    1. Go to the Gateway page.

      1. Log on to the EMR console.

      2. In the navigation pane on the left, choose Spark.

      3. On the Spark page, click the name of the target workspace.

      4. On the EMR Serverless Spark page, in the navigation pane on the left, click O&M Center > Gateway.

    2. Click the Livy Gateway tab.

    3. On the Livy Gateway page, click Create Livy Gateway.

    4. On the Create Gateway page, enter a Name (for example, Livy-gateway) and click Create.

      Adjust other parameters as needed. For more information, see Manage gateways.

    5. On the Livy Gateway page, find the gateway that you created and click Start in the Actions column.

  2. Create a token.

    1. On the Gateway page, find Livy-gateway and click Token Management in the Actions column.

    2. Click Create Token.

    3. In the Create Token dialog box, enter a Name (for example, Livy-token) and click OK.

    4. Copy the token information.

      Important

      Copy the token immediately after creation, as it cannot be viewed later. If the token expires or is lost, create or reset it.

Step 2: Configure Apache Airflow

  1. Run the following command to install the Apache Livy provider in your Airflow environment:

    pip install apache-airflow-providers-apache-livy
  2. Add a connection.

    UI

    In Airflow, find the default connection named livy_default and modify its details. Alternatively, you can manually add a connection on the Airflow web UI. For more information, see Create a connection.

    Configure the following parameters:

    • Host: Enter the Endpoint of the gateway.

    • Schema: Enter https.

    • Extra: Enter a JSON string. x-acs-spark-livy-token is the token that you copied in the previous step.

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    CLI

    Run the corresponding command using the Airflow CLI to create a connection. For more information, see Create a connection.

    # Replace the host with the endpoint of your gateway and the token with the one that you copied in the preceding step.
    # The value of --conn-json must be valid JSON, so it cannot contain inline comments.
    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"
            }
        }'
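
Because the value passed to --conn-json has to be strict JSON, one option is to generate the command from a Python dictionary instead of editing the JSON by hand. The following is a minimal sketch under that assumption. The helper name livy_conn_command and the placeholder values are hypothetical; the connection fields are the ones shown above.

```python
import json
import shlex


def livy_conn_command(conn_id: str, endpoint: str, token: str) -> str:
    """Return an `airflow connections add` command with a valid JSON payload."""
    conn = {
        "conn_type": "livy",
        "host": endpoint,  # The endpoint of the gateway.
        "schema": "https",
        "extra": {"x-acs-spark-livy-token": token},  # The token created in Step 1.
    }
    # json.dumps guarantees strict JSON; shlex.quote protects it from the shell.
    return " ".join(
        [
            "airflow", "connections", "add", shlex.quote(conn_id),
            "--conn-json", shlex.quote(json.dumps(conn)),
        ]
    )


if __name__ == "__main__":
    # Placeholder values; replace them with your own endpoint and token.
    print(livy_conn_command("livy_default", "<YourGatewayEndpoint>", "<YourLivyToken>"))
```

The printed command can be pasted into a shell on an Airflow node. Keeping the token out of shell history (for example, by reading it from a secrets store) is left to your environment.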

Step 3: Write a DAG and submit the job

An Airflow Directed Acyclic Graph (DAG) defines how tasks are executed. The following example shows how to use the LivyOperator in Airflow to run a Spark job.

Fetch and run a JAR file from Alibaba Cloud Object Storage Service (OSS).

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # The unique identifier for the DAG.
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# Define the Livy task with LivyOperator.
# Replace the file path with your own.
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task", 
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

The following table describes the parameters.

| Parameter | Description |
| --- | --- |
| dag_id | The unique identifier for the DAG. |
| schedule_interval | Specifies the scheduling interval for the DAG. None indicates that the task must be triggered manually. |
| start_date | The date from which the DAG can be scheduled. |
| tags | Adds tags to the DAG for easy categorization and searching. |
| catchup | Controls whether to backfill historical tasks. If set to False, unexecuted tasks are not backfilled, even if the start date is in the past. |
| file | The path to the file for your Spark job. In this example, it is the path to a JAR package uploaded to Alibaba Cloud OSS. Replace the path as required. For information about how to upload files, see Simple upload. |
| class_name | The main class in the JAR package. |
| args | The command-line arguments passed to the Spark job. |
| driver_memory and driver_cores | The memory size and number of cores for the driver, respectively. |
| executor_memory and executor_cores | The memory size and number of cores for each executor, respectively. |
| num_executors | The number of executors. |
| name | The name of the Spark job. |
| task_id | The unique identifier for the Airflow task. |
| dag | Associates the task with the DAG. |
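
To make the relationship between the operator arguments and Livy more concrete, the following sketch builds (but does not send) the kind of batch request that a Livy client issues. The field names follow the Apache Livy REST API for POST /batches. The helper name build_batch_request is hypothetical, and the assumption that the batch path sits directly under the gateway endpoint has not been verified against the gateway itself.

```python
import json
import urllib.request

# LivyOperator argument -> field name in the Livy REST API body for POST /batches.
FIELD_MAP = {
    "file": "file",
    "class_name": "className",
    "args": "args",
    "driver_memory": "driverMemory",
    "driver_cores": "driverCores",
    "executor_memory": "executorMemory",
    "executor_cores": "executorCores",
    "num_executors": "numExecutors",
    "name": "name",
}


def build_batch_request(endpoint: str, token: str, **operator_args) -> urllib.request.Request:
    """Build, without sending, the HTTP request for a Livy batch submission."""
    # Airflow-only arguments such as task_id and dag are not part of the Livy body.
    body = {
        FIELD_MAP[key]: value
        for key, value in operator_args.items()
        if key in FIELD_MAP and value is not None
    }
    return urllib.request.Request(
        url=f"https://{endpoint}/batches",  # Assumption: batches live under the endpoint path.
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "x-acs-spark-livy-token": token,  # Same header as in the connection Extra.
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_batch_request(
        "<YourGatewayEndpoint>",
        "<YourLivyToken>",
        file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
        class_name="org.apache.spark.examples.SparkPi",
        args=["1000"],
        num_executors=1,
    )
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
```

In normal use the LivyOperator handles this exchange for you; the sketch is only meant to help when inspecting or debugging what reaches the gateway.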

Method 2: Submit jobs using the EmrServerlessSparkStartJobRunOperator

Step 1: Configure Apache Airflow

  1. Download airflow_alibaba_provider-0.0.3-py3-none-any.whl.

  2. Install the airflow-alibaba-provider plugin on each Airflow node.

    The airflow-alibaba-provider plugin, provided by the EMR Serverless Spark team, includes the EmrServerlessSparkStartJobRunOperator component, which is designed for submitting EMR Serverless Spark jobs.

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. Add a connection.

    CLI

    Run the corresponding command using the Airflow CLI to create a connection. For more information, see Create a connection.

    # auth_type AK specifies Alibaba Cloud AccessKey (AK) authentication.
    # Replace the placeholders with your AccessKey ID, AccessKey secret, and region.
    # The value of --conn-json must be valid JSON, so it cannot contain inline comments.
    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",
                "access_key_id": "<yourAccesskeyId>",
                "access_key_secret": "<yourAccesskeyKey>",
                "region": "<yourRegion>"
            }
        }'

    UI

    Manually add a connection on the Airflow web UI. For more information, see Create a connection.

    On the Add Connection page, configure the following parameters.

    Parameter

    Description

    Connection Id

    In this example, the value is emr-serverless-spark-id.

    Connection Type

    Select Generic. If this type is not available, you can select Email.

    Extra

    Enter the following JSON content. auth_type is set to AK to use Alibaba Cloud AccessKey (AK) authentication. Replace the placeholders with your AccessKey ID, AccessKey secret, and region. The field accepts JSON only, so do not add comments.

    {
        "auth_type": "AK",
        "access_key_id": "<yourAccesskeyId>",
        "access_key_secret": "<yourAccesskeyKey>",
        "region": "<yourRegion>"
    }
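
If you prefer not to type the AccessKey pair by hand, the Extra value can be generated and checked with a few lines of Python. This is a sketch only: the helper name build_extra is hypothetical, and the environment variable names (ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, ALIBABA_CLOUD_REGION) are assumptions for illustration, not names that the provider reads itself.

```python
from __future__ import annotations

import json
import os
from collections.abc import Mapping

REQUIRED_KEYS = ("auth_type", "access_key_id", "access_key_secret", "region")


def build_extra(env: Mapping[str, str] | None = None) -> str:
    """Return the JSON string for the connection's Extra field."""
    env = os.environ if env is None else env
    extra = {
        "auth_type": "AK",  # AccessKey authentication, as in the example above.
        "access_key_id": env.get("ALIBABA_CLOUD_ACCESS_KEY_ID", ""),
        "access_key_secret": env.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", ""),
        "region": env.get("ALIBABA_CLOUD_REGION", ""),
    }
    # Fail early instead of saving a connection that cannot authenticate.
    missing = [key for key in REQUIRED_KEYS if not extra[key]]
    if missing:
        raise ValueError(f"missing values for: {', '.join(missing)}")
    return json.dumps(extra, indent=4)
```

Calling print(build_extra()) with the three variables set produces a JSON document that can be pasted into the Extra field or passed as the "extra" object of --conn-json.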

Step 2: Write a DAG and submit the job

The following examples show how to use the EmrServerlessSparkStartJobRunOperator in Airflow to run different types of Spark jobs.

JAR job

This scenario shows how to use an Airflow task to submit a pre-compiled Spark JAR job to Alibaba Cloud EMR Serverless Spark.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

SQL command

Run SQL commands directly in an Airflow DAG.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e", "show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

SQL file from OSS

Fetch and run an SQL script file from Alibaba Cloud Object Storage Service (OSS).

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

Python file from OSS

Fetch and run a Python script file from Alibaba Cloud Object Storage Service (OSS).

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python
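
The four examples differ mainly in code_type, entry_point, and entry_point_args. The following sketch collects those differences in one helper so that they can be compared side by side. The function name job_kwargs and the kind labels ("jar", "sql", "sql_file", "python") are hypothetical; the returned values mirror the examples above.

```python
from __future__ import annotations


def job_kwargs(kind: str, target: str, args: list[str] | None = None) -> dict:
    """Return the code_type-specific operator arguments for one kind of job.

    target is the OSS path of the JAR, SQL, or Python file, or the SQL text
    itself when kind is "sql".
    """
    args = list(args or [])
    if kind == "jar":
        return {"code_type": "JAR", "entry_point": target, "entry_point_args": args}
    if kind == "sql":
        # Inline SQL: no entry file, the statements are passed with -e.
        return {"code_type": "SQL", "entry_point": None, "entry_point_args": ["-e", target]}
    if kind == "sql_file":
        # SQL script on OSS: the file path is passed with -f.
        return {"code_type": "SQL", "entry_point": "", "entry_point_args": ["-f", target]}
    if kind == "python":
        return {"code_type": "PYTHON", "entry_point": target, "entry_point_args": args}
    raise ValueError(f"unknown job kind: {kind!r}")


if __name__ == "__main__":
    print(job_kwargs("sql", "show tables;"))
    print(job_kwargs("python", "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py", ["1"]))
```

The returned dictionary can be unpacked into the operator call, for example EmrServerlessSparkStartJobRunOperator(task_id=..., **job_kwargs("sql", "show tables;"), ...). The helper does not cover spark_submit_parameters: in the examples above, the JAR job passes its main class with --class, and both SQL jobs pass --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.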

The following table describes the parameters.

| Parameter | Type | Description |
| --- | --- | --- |
| task_id | str | The unique identifier of the Airflow task. |
| emr_serverless_spark_conn_id | str | The ID of the Airflow connection for EMR Serverless Spark. |
| region | str | The region of the EMR Spark workspace. |
| polling_interval | int | The interval, in seconds, at which Airflow polls the job status. |
| workspace_id | str | The unique identifier of the EMR Spark workspace. |
| resource_queue_id | str | The ID of the resource queue used by the EMR Spark job. |
| code_type | str | The job type. Valid values are SQL, PYTHON, and JAR. The meaning of the entry_point parameter varies based on the job type. |
| name | str | The name of the EMR Spark job. |
| entry_point | str | The location of the file that starts the job, such as a JAR, SQL, or Python file. The meaning of this parameter varies with code_type. |
| entry_point_args | list | A list of arguments passed to the Spark application. |
| spark_submit_parameters | str | Additional parameters for the spark-submit command. |
| is_prod | bool | Specifies the environment in which the task runs. If set to True, the task runs in the production environment, and resource_queue_id must specify a resource queue in the production environment, such as root_queue. |
| engine_release_version | str | The EMR Spark engine version. Defaults to "esr-2.1-native" (Spark 3.3.1, Scala 2.12, native runtime). |
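
The spark_submit_parameters strings in the examples are long and easy to mistype. One possible way to assemble them is from a dictionary of Spark properties, as sketched below. The helper name spark_submit_parameters is hypothetical, and the sketch rejects values that contain whitespace because how the operator splits the string is not documented here.

```python
from __future__ import annotations


def spark_submit_parameters(conf: dict[str, object], main_class: str | None = None) -> str:
    """Join an optional --class and a set of --conf options into one string."""
    parts: list[str] = []
    if main_class:
        parts += ["--class", main_class]
    for key, value in conf.items():
        option = f"{key}={value}"
        # Whitespace would make the option ambiguous once joined into a single string.
        if any(ch.isspace() for ch in option):
            raise ValueError(f"whitespace is not supported in {option!r}")
        parts += ["--conf", option]
    return " ".join(parts)


if __name__ == "__main__":
    print(
        spark_submit_parameters(
            {
                "spark.executor.cores": 4,
                "spark.executor.memory": "20g",
                "spark.driver.cores": 4,
                "spark.driver.memory": "8g",
                "spark.executor.instances": 1,
            },
            main_class="org.apache.spark.examples.SparkPi",
        )
    )
```

With these inputs, the helper reproduces the string used in the JAR job example, so the result can be passed directly as spark_submit_parameters.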