MaxCompute: Use Apache Airflow to schedule MaxCompute jobs

Last Updated: Aug 15, 2023

MaxCompute allows you to use Apache Airflow to schedule jobs through its Python interfaces. This topic describes how to use the Python operators of Apache Airflow to schedule MaxCompute jobs.

Background information

Apache Airflow is an open source job scheduling tool that was originally developed by Airbnb and is written in Python. Apache Airflow uses directed acyclic graphs (DAGs) to define groups of jobs that have dependency relationships and schedules the jobs based on those relationships. Apache Airflow also allows you to define subjobs by using Python interfaces and provides a variety of operators to meet different business requirements. For more information, see Apache Airflow.

Prerequisites

Before you use Apache Airflow to schedule MaxCompute jobs, make sure that the following conditions are met:

  • Python on MaxCompute (PyODPS) is installed.

    For more information, see Installation guide and limits.

  • Apache Airflow is installed and started.

    For more information, see Quick Start.

    Apache Airflow 1.10.7 is used in this topic. Example installation commands are provided after this list.
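If you still need to install the prerequisites, the following commands are a minimal sketch of a typical pip-based setup. The pinned Airflow version matches the version used in this topic; Airflow 1.10.x installations usually also require the constraints file described in the Airflow Quick Start, so verify the exact commands against the linked guides.

    pip install pyodps
    pip install apache-airflow==1.10.7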

Step 1: Write a Python script for job scheduling and save the file to the home directory of Apache Airflow

Write a Python script for job scheduling and save it as a .py file. The script contains the complete scheduling logic and the names of the jobs that you want to schedule. In this example, a Python script file named Airiflow_MC.py is created with the following content:

# -*- coding: UTF-8 -*-
import io
import os
import time
from configparser import ConfigParser
from datetime import datetime, timedelta

from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# MaxCompute parameter settings: allow full table scans and do not limit the
# number of records that the instance tunnel can download.
options.sql.settings = {'odps.sql.allow.fullscan': True}
options.tunnel.limit_instance_tunnel = False
# Read the MaxCompute project name and endpoint from the odps.ini configuration file.
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items("odps"))
# Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to the AccessKey ID of your account.
# Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to the AccessKey secret of your account.
# We recommend that you do not hard-code your AccessKey ID and AccessKey secret in the script.
odps = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    cfg.get("odps", "project"),
    cfg.get("odps", "endpoint"),
)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2020, 1, 15),
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retries': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
# Scheduling workflow
dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
# Read an SQL statement from a file (helper function, not used by the tasks below).
def read_sql(sqlfile):
    with io.open(sqlfile, encoding='utf-8', mode='r') as f:
        sql = f.read()
    return sql
# Job scheduling task: print the current timestamp.
def get_time():
    print('Current time {}'.format(time.time()))
    return time.time()
# Job scheduling task: run a MaxCompute SQL job and read the result.
def mc_job():
    project = odps.get_project()  # Obtain information about the default project.
    instance = odps.run_sql("select * from long_chinese;")
    print(instance.get_logview_address())
    instance.wait_for_success()
    with instance.open_reader() as reader:
        count = reader.count
        print("Number of data records in the table: {}".format(count))
        for record in reader:
            print(record)
    return count
t1 = PythonOperator(
    task_id='get_time',
    provide_context=False,
    python_callable=get_time,
    dag=dag)

t2 = PythonOperator(
    task_id='mc_job',
    provide_context=False,
    python_callable=mc_job,
    dag=dag)
t2.set_upstream(t1)
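The script reads the MaxCompute project name and endpoint from a file named odps.ini in the working directory. The file is not shown in the original script, so the following is only a minimal sketch of what it might contain; the project name is a placeholder and the endpoint shown is the public endpoint format for the China (Hangzhou) region, which you must replace with the values of your own project:

    [odps]
    project=your_project_name
    endpoint=http://service.cn-hangzhou.maxcompute.aliyun.com/api

Also note that Apache Airflow discovers DAG files only in its dags_folder, which by default is the dags subdirectory of the Airflow home directory. If you saved Airiflow_MC.py elsewhere, copy it there, for example:

    mkdir -p ~/airflow/dags
    cp Airiflow_MC.py ~/airflow/dags/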

Step 2: Submit the script for job scheduling

  1. In the command line window of the system, run the following command to submit the Python script that you wrote in Step 1 and check it for errors.

    python Airiflow_MC.py
  2. In the command line window of the system, run the following commands to view the generated scheduling workflow and run test jobs.

    # Print the list of active DAGs.
    airflow list_dags
    
    # Print the list of tasks in the Airiflow_MC DAG.
    airflow list_tasks Airiflow_MC
    
    # Print the hierarchy of tasks in the Airiflow_MC DAG.
    airflow list_tasks Airiflow_MC --tree
    # Run test jobs.
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16
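The preceding commands use the CLI of Apache Airflow 1.10.x, which is the version used in this topic. If you run Apache Airflow 2.x instead, the CLI groups these operations under subcommands. The following is a rough mapping that you should verify against your installed version:

    airflow dags list
    airflow tasks list Airiflow_MC
    airflow tasks list Airiflow_MC --tree
    airflow tasks test Airiflow_MC get_time 2010-01-16
    airflow tasks test Airiflow_MC mc_job 2010-01-16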

Step 3: Run a job

You can log on to the web UI of Apache Airflow. On the DAGs page, find the workflow that you submitted and click the Run icon in the Links column to run a job.

Figure: Run the scheduled job
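If you prefer the command line to the web UI, Apache Airflow 1.10.x can also unpause and trigger the DAG directly. This is an optional alternative, not part of the original procedure:

    airflow unpause Airiflow_MC
    airflow trigger_dag Airiflow_MC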

Step 4: View the execution result of the job

You can click the name of the workflow and view it on the Graph View tab. Then, click a job in the workflow, such as mc_job. In the dialog box that appears, click View Log to view the execution result of the job.

Figure: Scheduling workflow