MaxCompute memungkinkan Anda menggunakan Apache Airflow untuk menjadwalkan pekerjaan melalui antarmuka Python. Topik ini menjelaskan cara menggunakan operator Python dari Apache Airflow untuk menjadwalkan pekerjaan MaxCompute.
Informasi latar belakang
Apache Airflow adalah alat sumber terbuka yang dikembangkan oleh Airbnb. Alat ini ditulis dalam Python dan digunakan untuk menjadwalkan pekerjaan. Apache Airflow menggunakan grafik asiklik berarah (DAG) untuk mendefinisikan sekelompok pekerjaan dengan hubungan ketergantungan, serta menjadwalkan pekerjaan tersebut berdasarkan hubungan tersebut. Apache Airflow juga memungkinkan Anda mendefinisikan subpekerjaan menggunakan antarmuka Python. Alat ini mendukung berbagai macam operator untuk memenuhi kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang Apache Airflow, lihat Apache Airflow.
Prasyarat
Sebelum menggunakan Apache Airflow untuk menjadwalkan pekerjaan MaxCompute, pastikan kondisi berikut terpenuhi:
Apache Airflow telah diinstal dan dijalankan.
Untuk informasi lebih lanjut, lihat Mulai Cepat.
Versi Apache Airflow 1.10.7 digunakan dalam topik ini.
Langkah 1: Tulis skrip Python untuk penjadwalan pekerjaan dan simpan file ke direktori home Apache Airflow
Tulis skrip Python untuk penjadwalan pekerjaan dan simpan sebagai file .py. File skrip mencakup logika penjadwalan lengkap dan nama pekerjaan yang ingin dijadwalkan. Dalam langkah ini, file skrip Python bernama Airflow_MC.py dibuat. File ini berisi konten berikut:
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
# Ubah format pengkodean default.
# Pengaturan parameter MaxCompute
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
# Ganti variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dengan ID AccessKey akun pengguna.
# Ganti variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET dengan Rahasia AccessKey akun pengguna.
# Kami menyarankan agar Anda tidak langsung menggunakan string ID AccessKey dan Rahasia AccessKey Anda.
odps = ODPS(cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')),cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retry_delay': timedelta(minutes=5),
'start_date':datetime(2020,1,15)
# 'email': ['airflow@example.com'],
# 'email_on_failure': False,
# 'email_on_retry': False,
# 'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# Alur kerja penjadwalan
dag = DAG(
'Airflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
with io.open(sqlfile, encoding='utf-8', mode='r') as f:
sql=f.read()
f.closed
return sql
# Penjadwalan pekerjaan
def get_time():
print 'Waktu saat ini {}'.format(time.time())
return time.time()
# Penjadwalan pekerjaan
def mc_job ():
project = odps.get_project() # Dapatkan informasi proyek default.
instance=odps.run_sql("select * from long_chinese;")
print(instance.get_logview_address())
instance.wait_for_success()
with instance.open_reader() as reader:
count = reader.count
print("Jumlah catatan data dalam tabel: {}".format(count))
for record in reader:
print record
return count
t1 = PythonOperator (
task_id = 'get_time' ,
provide_context = False ,
python_callable = get_time,
dag = dag )
t2 = PythonOperator (
task_id = 'mc_job' ,
provide_context = False ,
python_callable = mc_job ,
dag = dag )
t2.set_upstream(t1)Langkah 2: Kirim skrip untuk penjadwalan pekerjaan
Di jendela baris perintah, jalankan perintah berikut untuk mengirimkan skrip Python yang ditulis di Langkah 1.
python Airflow_MC.pyDi jendela baris perintah, jalankan perintah berikut untuk menghasilkan alur kerja penjadwalan dan menjalankan pekerjaan uji.
# cetak daftar DAG aktif airflow list_dags # mencetak daftar tugas untuk "tutorial" dag_id airflow list_tasks Airflow_MC # mencetak hierarki tugas dalam DAG tutorial airflow list_tasks Airflow_MC --tree # Jalankan pekerjaan uji. airflow test Airflow_MC get_time 2010-01-16 airflow test Airflow_MC mc_job 2010-01-16
Langkah 3: Jalankan pekerjaan
Anda dapat masuk ke antarmuka web Apache Airflow. Di halaman DAGs, temukan alur kerja yang telah dikirimkan dan klik ikon
di kolom Links untuk menjalankan pekerjaan.

Langkah 4: Lihat hasil eksekusi pekerjaan
Anda dapat mengklik nama pekerjaan dan melihat alur kerja di tab Graph View. Kemudian, klik pekerjaan dalam alur kerja, seperti mc_job. Dalam kotak dialog yang muncul, klik View Log untuk melihat hasil eksekusi pekerjaan.
