Apache Airflow adalah alat otomatisasi dan penjadwalan alur kerja yang andal, memungkinkan pengembang mengatur, menjadwalkan, serta memantau eksekusi pipa data. EMR Serverless Spark menyediakan lingkungan komputasi tanpa server untuk memproses pekerjaan pemrosesan data skala besar. Topik ini menjelaskan cara menggunakan Apache Airflow untuk mengaktifkan pengiriman pekerjaan otomatis ke EMR Serverless Spark. Pendekatan ini membantu Anda mengotomatiskan penjadwalan dan eksekusi pekerjaan guna mengelola pekerjaan pemrosesan data secara lebih efisien.
Informasi latar belakang
Apache Livy berinteraksi dengan Spark melalui pemanggilan API RESTful, sehingga secara signifikan menyederhanakan kompleksitas komunikasi antara Spark dan server aplikasi. Untuk informasi mengenai API Livy, lihat REST API.
Apache Airflow memungkinkan Anda menggunakan Livy Operator atau EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan ke EMR Serverless Spark sesuai kebutuhan. Pilih metode yang paling sesuai dengan persyaratan Anda.
Metode | Skenario |
Jika Anda ingin menggunakan Operator open-source Apache Airflow untuk mengirimkan pekerjaan ke Serverless Spark, pilih metode ini. | |
Metode 2: Gunakan EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan | EmrServerlessSparkStartJobRunOperator adalah komponen khusus yang disediakan oleh EMR Serverless Spark untuk Apache Airflow guna mengirimkan pekerjaan ke EMR Serverless Spark. Operator ini terintegrasi secara mendalam dengan mekanisme Directed Acyclic Graph (DAG) Apache Airflow dan mempermudah pengaturan serta penjadwalan pekerjaan. Jika Anda menggunakan proyek baru atau memproses data sepenuhnya berdasarkan EMR Serverless Spark, kami merekomendasikan metode ini untuk meningkatkan performa dan mengurangi kompleksitas operasional. |
Prasyarat
Airflow telah diinstal dan dijalankan. Untuk informasi selengkapnya, lihat Instalasi Airflow.
Ruang kerja telah dibuat. Untuk informasi selengkapnya, lihat Buat ruang kerja.
Catatan penggunaan
Anda tidak dapat memanggil operasi EmrServerlessSparkStartJobRunOperator untuk mengkueri log pekerjaan. Untuk melihat log pekerjaan, buka halaman EMR Serverless Spark dan cari eksekusi pekerjaan berdasarkan ID eksekusi pekerjaan tersebut. Selanjutnya, Anda dapat memeriksa dan menganalisis log pekerjaan pada tab Logs di halaman detail pekerjaan atau pada halaman Spark Jobs di Spark UI.
Metode 1: Gunakan Livy Operator untuk mengirimkan pekerjaan
Langkah 1: Buat gateway Livy dan token
Buat dan mulai gateway Livy.
Buka halaman Gateways.
Masuk ke Konsol EMR.
Pada panel navigasi sebelah kiri, pilih .
Pada halaman Spark, klik nama ruang kerja yang dituju.
Pada halaman EMR Serverless Spark, klik pada panel navigasi sebelah kiri.
Klik tab Livy Gateway.
Pada halaman Livy Gateways, klik Create Livy Gateway.
Pada halaman Create Gateway, masukkan Name (misalnya, Livy-gateway), lalu klik Create.
Anda dapat mengonfigurasi parameter lain sesuai kebutuhan. Untuk informasi selengkapnya, lihat Manajemen gateway.
Pada halaman Livy Gateways, temukan gateway yang telah dibuat, lalu klik Start pada kolom Actions.
Buat token.
Pada halaman Gateway, temukan Livy-gateway, lalu klik Tokens pada kolom Actions.
Klik Create Token.
Pada kotak dialog Create Token, masukkan Name (misalnya, Livy-token), lalu klik OK.
Salin token tersebut.
PentingSetelah token dibuat, Anda harus segera menyalinnya. Token tidak dapat dilihat kembali setelah Anda meninggalkan halaman. Jika token kedaluwarsa atau hilang, atur ulang token atau buat token baru.
Langkah 2: Konfigurasi Apache Airflow
Jalankan perintah berikut untuk menginstal Apache Livy di Apache Airflow:
pip install apache-airflow-providers-apache-livyTambahkan koneksi.
Gunakan UI
Temukan koneksi default bernama livy_default di Apache Airflow dan ubah koneksi tersebut. Atau, Anda juga dapat menambahkan koneksi secara manual melalui antarmuka web Apache Airflow. Untuk informasi selengkapnya, lihat Creating a Connection with the UI.
Isi informasi berikut:
Host: Masukkan Endpoint gateway.
Schema: Masukkan https.
Extra: Masukkan string JSON.
x-acs-spark-livy-tokenmenunjukkan token yang telah Anda salin pada langkah sebelumnya.{ "x-acs-spark-livy-token": "6ac**********kfu" }
Gunakan CLI
Gunakan antarmuka baris perintah (CLI) Airflow untuk menjalankan perintah guna menambahkan koneksi. Untuk informasi selengkapnya, lihat Creating a Connection.
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # The endpoint of the gateway. "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # The token that you copied in the previous step. } }'
Langkah 3: Konfigurasi DAG dan kirimkan pekerjaan
Apache Airflow memungkinkan Anda menggunakan Directed Acyclic Graphs (DAG) untuk mendeklarasikan mode eksekusi pekerjaan. Kode contoh berikut menunjukkan cara menggunakan LivyOperator Apache Airflow untuk menjalankan pekerjaan Spark.
Jalankan file skrip Python yang diperoleh dari Object Storage Service (OSS).
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag", # The unique ID of the DAG.
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# Replace the value of file as needed.
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
Tabel berikut menjelaskan parameter-parameter tersebut:
Parameter | Deskripsi |
| ID unik DAG. Anda dapat menggunakan ID ini untuk membedakan DAG yang berbeda. |
| Interval penjadwalan DAG. |
| Tanggal dimulainya penjadwalan DAG. |
| Tag yang ditambahkan ke DAG. Ini membantu Anda mengkategorikan dan mencari DAG. |
| Menentukan apakah pekerjaan historis yang belum dijalankan akan dijalankan. Jika parameter ini diatur ke |
| Jalur file yang sesuai dengan pekerjaan Spark. Dalam contoh ini, parameter ini diatur ke jalur paket JAR yang diunggah ke OSS. Anda dapat mengonfigurasi parameter ini sesuai kebutuhan. Untuk informasi tentang cara mengunggah file ke OSS, lihat Simple upload. |
| Nama kelas utama dalam paket JAR. |
| Argumen baris perintah untuk pekerjaan Spark. |
| Ukuran memori dan jumlah core untuk driver. |
| Ukuran memori dan jumlah core untuk setiap executor. |
| Jumlah executor. |
| Nama pekerjaan Spark. |
| Pengidentifikasi unik tugas Airflow. |
| Mengaitkan tugas dengan DAG. |
Metode 2: Gunakan EmrServerlessSparkStartJobRunOperator untuk mengirimkan pekerjaan
Langkah 1: Konfigurasi Apache Airflow
Instal plugin airflow-alibaba-provider pada setiap node Airflow.
Plugin airflow-alibaba-provider disediakan oleh EMR Serverless Spark dan berisi komponen EmrServerlessSparkStartJobRunOperator yang digunakan untuk mengirimkan pekerjaan ke EMR Serverless Spark.
pip install airflow_alibaba_provider-0.0.3-py3-none-any.whlTambahkan koneksi.
Gunakan CLI
Gunakan CLI Airflow untuk menjalankan perintah guna membuat koneksi. Untuk informasi selengkapnya, lihat Creating a Connection.
airflow connections add 'emr-serverless-spark-id' \ --conn-json '{ "conn_type": "emr_serverless_spark", "extra": { "auth_type": "AK", # The AccessKey pair is used for authentication. "access_key_id": "<yourAccesskeyId>", # The AccessKey ID of your Alibaba Cloud account. "access_key_secret": "<yourAccesskeyKey>", # The AccessKey secret of your Alibaba Cloud account. "region": "<yourRegion>" } }'Gunakan UI
Anda dapat membuat koneksi secara manual melalui antarmuka web Airflow. Untuk informasi selengkapnya, lihat Creating a Connection with the UI.
Pada halaman Add Connection, konfigurasikan parameter berikut.

Tabel berikut menjelaskan parameter-parameter tersebut:
Parameter
Deskripsi
Connection Id
ID koneksi. Dalam contoh ini, masukkan emr-serverless-spark-id.
Connection Type
Jenis koneksi. Dalam contoh ini, pilih Generic. Jika Generic tidak tersedia, Anda juga dapat memilih Email.
Extra
Konfigurasi tambahan. Dalam contoh ini, masukkan konten berikut:
{ "auth_type": "AK", # The AccessKey pair is used for authentication. "access_key_id": "<yourAccesskeyId>", # The AccessKey ID of your Alibaba Cloud account. "access_key_secret": "<yourAccesskeyKey>", # The AccessKey secret of your Alibaba Cloud account. "region": "<yourRegion>" }
Langkah 2: Konfigurasi DAG
Apache Airflow memungkinkan Anda menggunakan Directed Acyclic Graphs (DAG) untuk mendeklarasikan mode eksekusi pekerjaan. Kode contoh berikut memberikan contoh cara menggunakan EmrServerlessSparkStartJobRunOperator untuk menjalankan berbagai jenis pekerjaan Spark di Apache Airflow.
Kirimkan paket JAR
Gunakan tugas Airflow untuk mengirimkan pekerjaan Spark JAR yang telah dikompilasi sebelumnya ke EMR Serverless Spark.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_jar",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="JAR",
name="airflow-emr-spark-jar",
entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
entry_point_args=["1"],
spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_jar
Kirimkan file SQL
Jalankan perintah SQL dalam DAG Airflow.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql",
entry_point=None,
entry_point_args=["-e","show tables;show tables;"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None,
)
emr_spark_sql
Kirimkan file SQL dari OSS
Jalankan file skrip SQL yang diperoleh dari OSS.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql_2",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql-2",
entry_point="",
entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_sql_2
Kirimkan skrip Python dari OSS
Jalankan file skrip Python yang diperoleh dari OSS.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_python = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_python",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="PYTHON",
name="airflow-emr-spark-python",
entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
entry_point_args=["1"],
spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_python
Tabel berikut menjelaskan parameter-parameter tersebut:
Parameter | Tipe | Deskripsi |
|
| Pengidentifikasi unik tugas Airflow. |
|
| ID koneksi antara Airflow dan EMR Serverless Spark. |
|
| Wilayah tempat pekerjaan EMR Spark dibuat. |
|
| Interval waktu Airflow mengkueri status pekerjaan. Satuan: detik. |
|
| Pengidentifikasi unik ruang kerja tempat pekerjaan EMR Spark berada. |
|
| ID antrian sumber daya yang digunakan oleh pekerjaan EMR Spark. |
|
| Jenis pekerjaan. Pekerjaan SQL, Python, dan JAR didukung. Makna parameter entry_point bervariasi tergantung jenis pekerjaan. |
|
| Nama pekerjaan EMR Spark. |
|
| Lokasi file yang digunakan untuk memulai pekerjaan. File JAR, SQL, dan Python didukung. Makna parameter ini bervariasi tergantung pada |
|
| Parameter yang diteruskan ke aplikasi Spark. |
|
| Parameter tambahan yang digunakan untuk perintah |
|
| Lingkungan tempat pekerjaan dijalankan. Jika parameter ini diatur ke True, pekerjaan dijalankan di lingkungan produksi. Dalam hal ini, parameter |
|
| Versi mesin EMR Spark. Nilai default: esr-2.1-native, yang menunjukkan mesin yang menjalankan Spark 3.3.1 dan Scala 2.12 dalam runtime native. |