
E-MapReduce:Use Apache Airflow to submit a job

Last Updated: Mar 26, 2026

If you orchestrate data pipelines with Apache Airflow and want to trigger Spark jobs on EMR Serverless Spark automatically, this guide covers two methods: the open-source LivyOperator (via a Livy gateway) and the EMR-native EmrServerlessSparkStartJobRunOperator (via the airflow-alibaba-provider plugin).

Choose a method

Method When to use
Method 1: LivyOperator Submits jobs through the standard Apache Airflow Livy provider and a Livy gateway. Works with any Airflow setup.
Method 2: EmrServerlessSparkStartJobRunOperator Submits jobs through the EMR-native operator, which integrates directly with Airflow's Directed Acyclic Graph (DAG) mechanism. Recommended for new projects built entirely on EMR Serverless Spark.

Prerequisites

Before you begin, ensure that you have:

  An EMR Serverless Spark workspace.
  An Apache Airflow environment in which you can install Python packages.

Usage notes

The EmrServerlessSparkStartJobRunOperator cannot be used to query job logs. To view the logs of a job, go to the EMR Serverless Spark console and find the job run by its job run ID. You can then check and analyze the logs on the Logs Exploration tab of the job details page, or on the Spark Jobs page in the Spark UI.

Method 1: Use LivyOperator

Apache Livy provides a REST API that abstracts Spark cluster communication. For information about the Livy API, see REST API. The LivyOperator in Apache Airflow uses this API to submit jobs to EMR Serverless Spark through a Livy gateway.
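To make the abstraction concrete, the Livy batches API accepts a JSON body whose fields map one-to-one to the LivyOperator arguments used later in this guide. The following sketch builds such a body; the bucket path and the commented endpoint and token are illustrative placeholders, not real values.

```python
import json

# Sketch of the JSON body that a Livy batch submission carries.
# All values below are illustrative placeholders.
batch_payload = {
    "file": "oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    "className": "org.apache.spark.examples.SparkPi",
    "args": ["1000"],
    "driverMemory": "1g",
    "driverCores": 1,
    "executorMemory": "1g",
    "executorCores": 2,
    "numExecutors": 1,
    "name": "LivyOperator SparkPi",
}

# The actual request would be an HTTP POST to the gateway, e.g.:
#   POST https://<gateway-endpoint>/batches
#   Headers: {"Content-Type": "application/json",
#             "x-acs-spark-livy-token": "<your-token>"}
body = json.dumps(batch_payload)
```

LivyOperator assembles and sends an equivalent body for you, so you normally never construct it by hand.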

Step 1: Create a Livy gateway and token

Create a Livy gateway:

  1. Log on to the EMR console.

  2. In the left-side navigation pane, choose EMR Serverless > Spark.

  3. Click the name of the target workspace.

  4. In the left-side navigation pane, click Operation Center > Gateway.

  5. Click the Livy Gateway tab.

  6. Click Create Livy Gateway.

  7. Enter a Name (for example, Livy-gateway) and click Create. For other parameters, see Manage gateways.

  8. On the Livy Gateway page, find the gateway you created and click Start in the Actions column.

Create a token:

  1. On the Gateway page, find Livy-gateway and click Tokens in the Actions column.

  2. Click Create Token.

  3. In the Create Token dialog box, enter a Name (for example, Livy-token) and click OK.

  4. Copy the token immediately.

Important

The token is only visible once. After you leave the page, you cannot retrieve it. If the token is lost or expires, reset it or create a new one.

Step 2: Configure Airflow

  1. Install the Apache Livy provider:

    pip install apache-airflow-providers-apache-livy
  2. Add a connection to Airflow. Use either the UI or the CLI.

    UI: Find the default connection named livy_default and edit it, or create a new connection. See Creating a connection with the UI. Set the following fields:

    Field Value
    Host The endpoint of the Livy gateway
    Schema https
    Extra {"x-acs-spark-livy-token": "<your-token>"}

    CLI: Run the following command. Replace the host value with your gateway endpoint and x-acs-spark-livy-token with the token you copied.

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"
            }
        }'

    See Creating a connection with the CLI for more options.
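If you script your Airflow setup, the same connection definition can be assembled in Python and serialized for the --conn-json flag. This is a minimal sketch; the host path segments and token are placeholders.

```python
import json

# Illustrative placeholders for the gateway endpoint and token.
livy_conn = {
    "conn_type": "livy",
    "host": "<gateway-endpoint>/api/v1/workspace/<workspace-id>/livycompute/<compute-id>",
    "schema": "https",
    "extra": {"x-acs-spark-livy-token": "<your-token>"},
}

conn_json = json.dumps(livy_conn)
# Pass the serialized string to:
#   airflow connections add 'livy_default' --conn-json "$conn_json"
```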

Step 3: Define a DAG and submit a job

The following example uses LivyOperator to run a SparkPi job from a JAR stored in Object Storage Service (OSS).

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initialize the DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,   # Trigger manually
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# Define the Livy task
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",  # Replace with your OSS path
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

Replace the file value with the OSS path of your JAR. For instructions on uploading files to OSS, see Simple upload.

DAG parameters:

Parameter Description
dag_id Unique identifier of the DAG.
schedule_interval Schedule for the DAG. None means manual trigger only.
start_date Date from which the DAG is eligible to run.
tags Labels for categorizing and searching DAGs in the UI.
catchup Whether to run skipped historical runs. False skips them even if start_date is in the past.

LivyOperator parameters:

Parameter Description
file Path to the Spark job file. Supports OSS JAR paths.
class_name Main class in the JAR package.
args Command-line arguments passed to the Spark job.
driver_memory, driver_cores Memory and CPU cores for the driver.
executor_memory, executor_cores Memory and CPU cores per executor.
num_executors Number of executors.
name Display name of the Spark job.
task_id Unique identifier of the Airflow task within the DAG.
dag Associates the task with its parent DAG.

Method 2: Use EmrServerlessSparkStartJobRunOperator

EmrServerlessSparkStartJobRunOperator is an EMR Serverless Spark-native operator provided through the airflow-alibaba-provider plugin. It integrates directly with Airflow's DAG mechanism and supports JAR, SQL, and Python job types.

EmrServerlessSparkStartJobRunOperator does not support querying job logs. To view logs, go to the EMR Serverless Spark console, find the job run by its job run ID, then check the Logs Exploration tab on the job details page or the Spark Jobs page in the Spark UI.

Step 1: Configure Airflow

  1. Download airflow_alibaba_provider-0.0.3-py3-none-any.whl.

  2. Install the plugin on each Airflow node:

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. Add a connection to Airflow. Use either the CLI or the UI.

    CLI: Run the following command. Replace the placeholders with your AccessKey ID, AccessKey secret, and region.

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",
                "access_key_id": "<yourAccesskeyId>",
                "access_key_secret": "<yourAccesskeyKey>",
                "region": "<yourRegion>"
            }
        }'

    See Creating a connection with the CLI for more options.

    UI: Go to Admin > Connections > Add Connection and set the following fields. See Creating a connection with the UI.

    Field Value
    Connection Id emr-serverless-spark-id
    Connection Type Generic (or Email if Generic is unavailable)
    Extra {"auth_type": "AK", "access_key_id": "<yourAccesskeyId>", "access_key_secret": "<yourAccesskeyKey>", "region": "<yourRegion>"}

Step 2: Define DAGs and submit jobs

All examples use EmrServerlessSparkStartJobRunOperator to submit jobs to EMR Serverless Spark. Replace the workspace_id, region, and entry_point values with your own.
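The four example DAGs below repeat the same connection, region, queue, and environment settings. One option, sketched here with the same placeholder IDs the examples use, is to collect those shared settings in a dict and unpack it into each operator call.

```python
# Shared operator settings used by all four example DAGs below.
# The workspace ID is a placeholder; replace it with your own.
common_kwargs = {
    "emr_serverless_spark_conn_id": "emr-serverless-spark-id",
    "region": "cn-hangzhou",
    "polling_interval": 5,
    "workspace_id": "w-7e2f1750c6b3****",
    "resource_queue_id": "root_queue",
    "is_prod": True,
}

# Usage inside a DAG:
#   EmrServerlessSparkStartJobRunOperator(
#       task_id="emr_spark_jar", code_type="JAR", ..., **common_kwargs)
```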

Submit a JAR job

from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# mypy: disable-error-code="call-arg"

with DAG(
    dag_id="emr_spark_jar",
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar
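The long spark_submit_parameters string is easy to mistype by hand. One option is to build it from a dict of Spark configs; the helper below is an illustrative sketch, not part of the plugin.

```python
def build_spark_submit_parameters(confs, main_class=None):
    """Assemble a spark-submit parameter string from a dict of Spark confs.

    Illustrative helper: the operator simply receives the resulting
    string as its spark_submit_parameters value.
    """
    parts = []
    if main_class:
        parts.append(f"--class {main_class}")
    parts.extend(f"--conf {key}={value}" for key, value in confs.items())
    return " ".join(parts)

params = build_spark_submit_parameters(
    {
        "spark.executor.cores": 4,
        "spark.executor.memory": "20g",
        "spark.driver.cores": 4,
        "spark.driver.memory": "8g",
        "spark.executor.instances": 1,
    },
    main_class="org.apache.spark.examples.SparkPi",
)
# params now matches the string used in the JAR example above.
```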

Submit an inline SQL job

from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# mypy: disable-error-code="call-arg"

with DAG(
    dag_id="emr_spark_sql",
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e", "show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql
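For inline SQL, entry_point_args is simply ["-e", "<statements>"], with multiple statements separated by semicolons. If you keep the statements as a Python list, you can join them before passing, as this small sketch shows.

```python
# Keep SQL statements as a list and join them for the "-e" flag.
statements = ["show tables", "show tables"]
entry_point_args = ["-e", ";".join(statements) + ";"]
# Yields the same value used in the inline SQL example above.
```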

Submit an SQL file from OSS

from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# mypy: disable-error-code="call-arg"

with DAG(
    dag_id="emr_spark_sql_2",
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

Submit a Python script from OSS

from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# mypy: disable-error-code="call-arg"

with DAG(
    dag_id="emr_spark_python",
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

EmrServerlessSparkStartJobRunOperator parameters:

Parameter Type Description
task_id str Unique identifier of the Airflow task within the DAG.
emr_serverless_spark_conn_id str ID of the Airflow connection to EMR Serverless Spark.
region str Region where the EMR Serverless Spark workspace is located.
polling_interval int Interval in seconds at which Airflow polls for job status.
workspace_id str ID of the EMR Serverless Spark workspace.
resource_queue_id str ID of the resource queue for the job.
code_type str Job type: JAR, SQL, or PYTHON. Determines how entry_point is interpreted.
name str Display name of the Spark job.
entry_point str Location of the file used to start the job. JAR, SQL, and Python files are supported. The meaning of this parameter varies based on code_type.
entry_point_args List Arguments passed to the Spark application.
spark_submit_parameters str Additional spark-submit parameters, such as class name and resource configuration.
is_prod bool True runs the job in the production environment. The resource_queue_id must match the production resource queue (for example, root_queue).
engine_release_version str EMR Spark engine version. Default is esr-2.1-native (Spark 3.3.1, Scala 2.12, native runtime).
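To clarify what polling_interval controls: the operator polls the job run state at that interval until the run reaches a terminal state. The behavior can be sketched as a generic polling loop; the state names and the get_state callable here are illustrative, not the plugin's actual API.

```python
import time

# Illustrative terminal state names, not the plugin's actual values.
TERMINAL_STATES = {"Success", "Failed", "Cancelled"}

def wait_for_job(get_state, polling_interval=5, sleep=time.sleep):
    """Poll get_state() every polling_interval seconds until the job
    reaches a terminal state; return the final state."""
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(polling_interval)

# Usage with a fake state source (sleep disabled for illustration):
states = iter(["Pending", "Running", "Success"])
final = wait_for_job(lambda: next(states), polling_interval=5, sleep=lambda s: None)
```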