全部产品
Search
文档中心

E-MapReduce:Gunakan Apache Airflow untuk mengirimkan pekerjaan

更新时间:Nov 10, 2025

Apache Airflow adalah alat otomatisasi dan penjadwalan alur kerja yang andal, memungkinkan pengembang mengatur, menjadwalkan, serta memantau eksekusi pipa data. EMR Serverless Spark menyediakan lingkungan komputasi tanpa server untuk memproses pekerjaan pemrosesan data skala besar. Topik ini menjelaskan cara menggunakan Apache Airflow untuk mengaktifkan pengiriman pekerjaan otomatis ke EMR Serverless Spark. Pendekatan ini membantu Anda mengotomatiskan penjadwalan dan eksekusi pekerjaan guna mengelola pekerjaan pemrosesan data secara lebih efisien.

Informasi latar belakang

Apache Livy berinteraksi dengan Spark melalui pemanggilan API RESTful, sehingga secara signifikan menyederhanakan kompleksitas komunikasi antara Spark dan server aplikasi. Untuk informasi mengenai API Livy, lihat REST API.

Apache Airflow memungkinkan Anda menggunakan Livy Operator atau EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan ke EMR Serverless Spark sesuai kebutuhan. Pilih metode yang paling sesuai dengan persyaratan Anda.

Metode

Skenario

Metode 1: Gunakan Livy Operator untuk mengirimkan pekerjaan

Jika Anda ingin menggunakan Operator open-source Apache Airflow untuk mengirimkan pekerjaan ke Serverless Spark, pilih metode ini.

Metode 2: Gunakan EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan

EmrServerlessSparkStartJobRunOperator adalah komponen khusus yang disediakan oleh EMR Serverless Spark untuk Apache Airflow guna mengirimkan pekerjaan ke EMR Serverless Spark. Operator ini terintegrasi secara mendalam dengan mekanisme Directed Acyclic Graph (DAG) Apache Airflow dan mempermudah pengaturan serta penjadwalan pekerjaan.

Jika Anda menggunakan proyek baru atau memproses data sepenuhnya berdasarkan EMR Serverless Spark, kami merekomendasikan metode ini untuk meningkatkan performa dan mengurangi kompleksitas operasional.

Prasyarat

  • Airflow telah diinstal dan dijalankan. Untuk informasi selengkapnya, lihat Instalasi Airflow.

  • Ruang kerja telah dibuat. Untuk informasi selengkapnya, lihat Buat ruang kerja.

Catatan penggunaan

Anda tidak dapat memanggil operasi EmrServerlessSparkStartJobRunOperator untuk mengkueri log pekerjaan. Untuk melihat log pekerjaan, buka halaman EMR Serverless Spark dan cari eksekusi pekerjaan berdasarkan ID eksekusi pekerjaan tersebut. Selanjutnya, Anda dapat memeriksa dan menganalisis log pekerjaan pada tab Logs di halaman detail pekerjaan atau pada halaman Spark Jobs di Spark UI.

Metode 1: Gunakan Livy Operator untuk mengirimkan pekerjaan

Langkah 1: Buat gateway Livy dan token

  1. Buat dan mulai gateway Livy.

    1. Buka halaman Gateways.

      1. Masuk ke Konsol EMR.

      2. Pada panel navigasi sebelah kiri, pilih EMR Serverless > Spark.

      3. Pada halaman Spark, klik nama ruang kerja yang dituju.

      4. Pada halaman EMR Serverless Spark, klik Operation Center > Gateway pada panel navigasi sebelah kiri.

    2. Klik tab Livy Gateway.

    3. Pada halaman Livy Gateways, klik Create Livy Gateway.

    4. Pada halaman Create Gateway, masukkan Name (misalnya, Livy-gateway), lalu klik Create.

      Anda dapat mengonfigurasi parameter lain sesuai kebutuhan. Untuk informasi selengkapnya, lihat Manajemen gateway.

    5. Pada halaman Livy Gateways, temukan gateway yang telah dibuat, lalu klik Start pada kolom Actions.

  2. Buat token.

    1. Pada halaman Gateway, temukan Livy-gateway, lalu klik Tokens pada kolom Actions.

    2. Klik Create Token.

    3. Pada kotak dialog Create Token, masukkan Name (misalnya, Livy-token), lalu klik OK.

    4. Salin token tersebut.

      Penting

      Setelah token dibuat, Anda harus segera menyalinnya. Token tidak dapat dilihat kembali setelah Anda meninggalkan halaman. Jika token kedaluwarsa atau hilang, atur ulang token atau buat token baru.

Langkah 2: Konfigurasi Apache Airflow

  1. Jalankan perintah berikut untuk menginstal Apache Livy di Apache Airflow:

    pip install apache-airflow-providers-apache-livy
  2. Tambahkan koneksi.

    Gunakan UI

    Temukan koneksi default bernama livy_default di Apache Airflow dan ubah koneksi tersebut. Atau, Anda juga dapat menambahkan koneksi secara manual melalui antarmuka web Apache Airflow. Untuk informasi selengkapnya, lihat Creating a Connection with the UI.

    Isi informasi berikut:

    • Host: Masukkan Endpoint gateway.

    • Schema: Masukkan https.

    • Extra: Masukkan string JSON. x-acs-spark-livy-token menunjukkan token yang telah Anda salin pada langkah sebelumnya.

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    Gunakan CLI

    Gunakan antarmuka baris perintah (CLI) Airflow untuk menjalankan perintah guna menambahkan koneksi. Untuk informasi selengkapnya, lihat Creating a Connection.

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # The endpoint of the gateway.
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"  # The token that you copied in the previous step.
            }
        }'

Langkah 3: Konfigurasi DAG dan kirimkan pekerjaan

Apache Airflow memungkinkan Anda menggunakan Directed Acyclic Graphs (DAG) untuk mendeklarasikan mode eksekusi pekerjaan. Kode contoh berikut menunjukkan cara menggunakan LivyOperator Apache Airflow untuk menjalankan pekerjaan Spark.

Jalankan file skrip Python yang diperoleh dari Object Storage Service (OSS).

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",  # The unique ID of the DAG.
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# define livy task with LivyOperator
# Replace the value of file as needed.
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task", 
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task

Tabel berikut menjelaskan parameter-parameter tersebut:

Parameter

Deskripsi

dag_id

ID unik DAG. Anda dapat menggunakan ID ini untuk membedakan DAG yang berbeda.

schedule_interval

Interval penjadwalan DAG. None berarti Anda perlu menjadwalkan pekerjaan dalam DAG secara manual.

start_date

Tanggal dimulainya penjadwalan DAG.

tags

Tag yang ditambahkan ke DAG. Ini membantu Anda mengkategorikan dan mencari DAG.

catchup

Menentukan apakah pekerjaan historis yang belum dijalankan akan dijalankan. Jika parameter ini diatur ke False, pekerjaan historis tidak akan dijalankan meskipun tanggal mulai lebih awal.

file

Jalur file yang sesuai dengan pekerjaan Spark. Dalam contoh ini, parameter ini diatur ke jalur paket JAR yang diunggah ke OSS. Anda dapat mengonfigurasi parameter ini sesuai kebutuhan. Untuk informasi tentang cara mengunggah file ke OSS, lihat Simple upload.

class_name

Nama kelas utama dalam paket JAR.

args

Argumen baris perintah untuk pekerjaan Spark.

driver_memory dan driver_cores

Ukuran memori dan jumlah core untuk driver.

executor_memory dan executor_cores

Ukuran memori dan jumlah core untuk setiap executor.

num_executors

Jumlah executor.

name

Nama pekerjaan Spark.

task_id

Pengidentifikasi unik tugas Airflow.

dag

Mengaitkan tugas dengan DAG.

Metode 2: Gunakan EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan

Langkah 1: Konfigurasi Apache Airflow

  1. Unduh airflow_alibaba_provider-0.0.3-py3-none-any.whl.

  2. Instal plugin airflow-alibaba-provider pada setiap node Airflow.

    Plugin airflow-alibaba-provider disediakan oleh EMR Serverless Spark dan berisi komponen EmrServerlessSparkStartJobRunOperator yang digunakan untuk mengirimkan pekerjaan ke EMR Serverless Spark.

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. Tambahkan koneksi.

    Gunakan CLI

    Gunakan CLI Airflow untuk menjalankan perintah guna membuat koneksi. Untuk informasi selengkapnya, lihat Creating a Connection.

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",  # The AccessKey pair is used for authentication.
                "access_key_id": "<yourAccesskeyId>",  # The AccessKey ID of your Alibaba Cloud account.
                "access_key_secret": "<yourAccesskeyKey>",  # The AccessKey secret of your Alibaba Cloud account.
                "region": "<yourRegion>"
            }
        }'

    Gunakan UI

    Anda dapat membuat koneksi secara manual melalui antarmuka web Airflow. Untuk informasi selengkapnya, lihat Creating a Connection with the UI.

    Pada halaman Add Connection, konfigurasikan parameter berikut.

    image

    Tabel berikut menjelaskan parameter-parameter tersebut:

    Parameter

    Deskripsi

    Connection Id

    ID koneksi. Dalam contoh ini, masukkan emr-serverless-spark-id.

    Connection Type

    Jenis koneksi. Dalam contoh ini, pilih Generic. Jika Generic tidak tersedia, Anda juga dapat memilih Email.

    Extra

    Konfigurasi tambahan. Dalam contoh ini, masukkan konten berikut:

    {
                "auth_type": "AK",  # The AccessKey pair is used for authentication.
                "access_key_id": "<yourAccesskeyId>",  # The AccessKey ID of your Alibaba Cloud account.
                "access_key_secret": "<yourAccesskeyKey>",  # The AccessKey secret of your Alibaba Cloud account.
                "region": "<yourRegion>"
            }

Langkah 2: Konfigurasi DAG

Apache Airflow memungkinkan Anda menggunakan Directed Acyclic Graphs (DAG) untuk mendeklarasikan mode eksekusi pekerjaan. Kode contoh berikut memberikan contoh cara menggunakan EmrServerlessSparkStartJobRunOperator untuk menjalankan berbagai jenis pekerjaan Spark di Apache Airflow.

Kirimkan paket JAR

Gunakan tugas Airflow untuk mengirimkan pekerjaan Spark JAR yang telah dikompilasi sebelumnya ke EMR Serverless Spark.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

Kirimkan file SQL

Jalankan perintah SQL dalam DAG Airflow.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e","show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

Kirimkan file SQL dari OSS

Jalankan file skrip SQL yang diperoleh dari OSS.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

Kirimkan skrip Python dari OSS

Jalankan file skrip Python yang diperoleh dari OSS.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

Tabel berikut menjelaskan parameter-parameter tersebut:

Parameter

Tipe

Deskripsi

task_id

str

Pengidentifikasi unik tugas Airflow.

emr_serverless_spark_conn_id

str

ID koneksi antara Airflow dan EMR Serverless Spark.

region

str

Wilayah tempat pekerjaan EMR Spark dibuat.

polling_interval

int

Interval waktu Airflow mengkueri status pekerjaan. Satuan: detik.

workspace_id

str

Pengidentifikasi unik ruang kerja tempat pekerjaan EMR Spark berada.

resource_queue_id

str

ID antrian sumber daya yang digunakan oleh pekerjaan EMR Spark.

code_type

str

Jenis pekerjaan. Pekerjaan SQL, Python, dan JAR didukung. Makna parameter entry_point bervariasi tergantung jenis pekerjaan.

name

str

Nama pekerjaan EMR Spark.

entry_point

str

Lokasi file yang digunakan untuk memulai pekerjaan. File JAR, SQL, dan Python didukung. Makna parameter ini bervariasi tergantung pada code_type.

entry_point_args

List

Parameter yang diteruskan ke aplikasi Spark.

spark_submit_parameters

str

Parameter tambahan yang digunakan untuk perintah spark-submit.

is_prod

bool

Lingkungan tempat pekerjaan dijalankan. Jika parameter ini diatur ke True, pekerjaan dijalankan di lingkungan produksi. Dalam hal ini, parameter resource_queue_id harus diatur ke ID antrian sumber daya yang sesuai di lingkungan produksi, seperti root_queue.

engine_release_version

str

Versi mesin EMR Spark. Nilai default: esr-2.1-native, yang menunjukkan mesin yang menjalankan Spark 3.3.1 dan Scala 2.12 dalam runtime native.