The cloud-native data warehouse AnalyticDB for MySQL supports Airflow clusters. This topic describes how to purchase and manage Airflow clusters.

Background information

Airflow is an open source scheduling tool written in Python. Airflow was first developed at Airbnb in 2014, was open sourced in the spring of 2015, and became an Apache Incubator project in 2016. Airflow assembles workflows that have dependencies into directed acyclic graphs (DAGs) to schedule complex tasks. The following terms help you better understand Airflow:
  • DAG

    A DAG in Airflow defines an entire job. All tasks in the same DAG share the same schedule.

  • task

    A task must belong to a DAG. Task dependencies are configured within each DAG. You can also configure task dependencies across DAGs. However, we recommend that you do not do so, because cross-DAG dependencies are harder to visualize and manage.

  • DAG run

    When a DAG meets its schedule or is triggered, a DAG run is created. A DAG run represents an instance of a DAG.

  • task instance

    When a task is scheduled to start, a task instance is created. A task instance represents a specific run of a task.

AnalyticDB for MySQL V3.0 uses Airflow 1.10.12. For more information, see Apache Airflow Documentation.
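The following minimal sketch shows how these terms map to code in a generic Airflow 1.10 DAG. It is a conceptual illustration only: the standard DummyOperator is used to keep the example short, whereas a DAG submitted to an AnalyticDB for MySQL Airflow cluster must use the provided SparkJobOperator (see the template in the Create a DAG section).

    # Generic Airflow 1.10 example that illustrates the terms above: the DAG
    # defines the entire job, each operator call defines a task, and each
    # scheduled or triggered execution creates a DAG run with one task
    # instance per task. For concept illustration only; do not submit this
    # script to an AnalyticDB for MySQL Airflow cluster.
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # The DAG defines the entire job. All tasks in the DAG share its schedule.
    dag = DAG(
        dag_id='concept_demo',                 # must be unique in the Airflow cluster
        start_date=datetime(2021, 1, 1),
        schedule_interval=timedelta(hours=1),  # each tick creates one DAG run
    )

    # Two tasks that belong to the DAG. When a DAG run is created, one task
    # instance is created for each task.
    extract = DummyOperator(task_id='extract', dag=dag)
    load = DummyOperator(task_id='load', dag=dag)

    # Dependency within the same DAG: load runs only after extract succeeds.
    extract >> load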

Create an Airflow cluster

Note The Airflow feature is in canary release. To experience this feature, Submit a ticket.
  1. Log on to the AnalyticDB for MySQL console with your Alibaba Cloud account.
  2. In the upper-left corner of the page, select the region where clusters reside.
  3. In the left-side navigation pane, click Clusters.
  4. On the V3.0 Clusters tab, click the target Cluster ID.
  5. In the left-side navigation pane, choose Airflow > Database Accounts.
  6. In the Note message, click Create to go to the buy page.
    Note It takes about 10 minutes to create an Airflow cluster. While an Airflow cluster is being created, all features related to Airflow are disabled.

Manage database accounts

The first step to run a DAG task is to bind a database account in the cluster. You must configure a valid password for the account. Otherwise, the DAG task fails. You can bind multiple accounts that are created in the cluster, including the privileged account and standard accounts. This way, you can specify a username in an Airflow DAG script, and the system verifies whether the user who submits the Spark task is valid. If the specified account does not exist, or if the username and password do not match, the DAG task fails. A snippet that shows how a bound account is referenced in a DAG script follows the steps below.

  1. Log on to the AnalyticDB for MySQL console with your Alibaba Cloud account.
  2. In the upper-left corner of the page, select the region where clusters reside.
  3. In the left-side navigation pane, click Clusters.
  4. On the V3.0 Clusters tab, click the target Cluster ID.
  5. In the left-side navigation pane, choose Airflow > Database Accounts.
  6. On the page that appears, click Bind Database Account. In the Bind Database Account panel, specify Account and Password and click OK.

    The bound database account is displayed in the database account list. You can click Test Connectivity in the Actions column that corresponds to the database account to test whether the account can be connected. In the Actions column, you can also click Delete to unbind the database account or click Modify to reset the password of the account.
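In a DAG script, the bound database account is referenced through the spark.adb.userName configuration item of SparkJobOperator (see the template in the Create a DAG section). The following fragment is a minimal sketch; the account name etl_account is a placeholder for an account that you have already bound.

    # Fragment of a SparkJobOperator definition: the bound database account is
    # passed through the spark.adb.userName configuration item.
    # 'etl_account' is a placeholder account name used only for illustration.
    conf = {
        'spark.adb.userName': 'etl_account'
    }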

Manage DAG tasks

DAGs allow you to orchestrate tasks by writing scripts in Python. By default, the AnalyticDB for MySQL console provides a basic script template that you can use to write scripts. The SparkJobOperator that is encapsulated in AnalyticDB for MySQL seamlessly connects to Spark clusters in AnalyticDB for MySQL V3.0 to process offline data.

Create a DAG
AnalyticDB for MySQL provides a basic script template that you can use to write scripts in Python. Python has strict requirements on code formatting. We recommend that you use a Python integrated development environment (IDE) to write valid scripts before you submit tasks.
  1. In the AnalyticDB for MySQL console, click the name of a cluster for which an Airflow cluster is created.
  2. In the left-side navigation pane, choose Airflow > DAG Tasks.
  3. On the page that appears, click Create DAG. In the Create DAG panel, write a Python script to schedule tasks based on the basic script template that is provided. A sketch that chains multiple Spark tasks in one DAG follows these steps.
    Note The value of the dag_id parameter must be unique in an Airflow cluster. Multiple DAGs cannot use the same dag_id.
    Example:
    from datetime import datetime, timedelta
    from airflow import DAG
    from custom_operator.spark_operator import SparkJobOperator
    
    """
    Airflow Notes:
    1.Airflow 1.10.12 Version Official Doc: https://airflow.apache.org/docs/apache-airflow/1.10.12/index.html
    2.Don't delete the imported SparkJobOperator, and other airflow operators are not allowed;
    3.Due to the python formatter requirements, you'd better input this python code in the professional python IDE, and then paste the code here. Otherwise this code will not be resolved correctly by the python interpreter;
    """
    
    """
    DAG Notes:
    1.The dag_id is required, and should be unique for the airflow cluster;
    2.If you want the tasks to be re-run in regular , schedule_interval is needed;
    """
    default_args = {
        'owner': 'xxxx',
        'start_date': datetime(2021, 1, 21),
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
    }
    dag = DAG(dag_id='dag_id_xxxx', default_args=default_args, schedule_interval=timedelta(minutes=10))
    
    # Task Definition
    spark_submit_task = SparkJobOperator(
        task_id='task_xxx',           # unique ID of the task within the DAG
        name='',                      # name of the Spark job
        file='',                      # path to the Spark application file
        class_name='',                # main class of the Spark application
        num_executor=1,               # number of Spark executors
        args=[''],                    # arguments passed to the Spark application
        conf={
            'spark.adb.userName': ''  # bound database account used to verify the task submitter
        },
        dag=dag
    )
    
    # Run Task
    spark_submit_task
  4. Click OK.
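If a DAG contains multiple Spark tasks, you can declare the dependencies between them in the same script. The following fragment is a minimal sketch based on the template above and assumes the same imports and dag object; the task IDs, job names, and other parameter values are placeholders.

    # Sketch: two SparkJobOperator tasks in the same DAG. The second task runs
    # only after the first one succeeds. All parameter values are placeholders.
    etl_task = SparkJobOperator(
        task_id='etl_task',
        name='etl-job',
        file='',
        class_name='',
        num_executor=1,
        args=[''],
        conf={'spark.adb.userName': ''},
        dag=dag
    )

    report_task = SparkJobOperator(
        task_id='report_task',
        name='report-job',
        file='',
        class_name='',
        num_executor=1,
        args=[''],
        conf={'spark.adb.userName': ''},
        dag=dag
    )

    # Declare the dependency within the DAG: report_task waits for etl_task.
    etl_task >> report_task

This corresponds to the in-DAG task dependencies described in the Background information section.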
DAG list
The created DAG task is displayed in the DAG task list. The task list contains the following information:
  • Enabled
    1. If the DAG task is a scheduled task and is enabled, the script is run based on the specified time interval.
    2. If the DAG task is disabled, the DAG task is suspended.
  • DAG ID
    The ID specified by the dag_id parameter. The DAG ID must be unique in an Airflow cluster. If the DAG ID is not unique, the task cannot be created. If you do not specify the dag_id parameter by name, the first positional argument of the DAG constructor is used as the DAG ID. The following formats are supported:
    1. DAG(dag_id='dag_id_xxxx',
    2. DAG('dag_id_xxxx',
  • DAG Task Parsing Status

    AnalyticDB for MySQL Edition encapsulates the parsing process. After the DAG script is submitted, you can view the following parsing states in the console:

    1. Parsing: By default, the Airflow cluster parses the Python script at an interval of 5 minutes. If the Python file is being parsed, the task cannot be executed.
    2. Parsing Failed: The Airflow cluster detects an error when it parses the DAG script. Basic information about the error is provided to help you check the script that you wrote.
    3. Parsed: The DAG script is parsed. Basic parameters of the script are written to the Airflow cluster. After the parameters are written, the DAG task waits to be executed at a specific point in time or after a specific operation.
  • Recent Task

    The execution states of tasks and the number of tasks in each state for the most recent scheduled run of the DAG. The state of a task can be success, running, failed, none, upstream_failed, skipped, up_for_retry, up_for_reschedule, queued, scheduled, or sensing.

  • Last Running At

    The time when the DAG was last scheduled.

  • All Running Status of DAG

    The number of executions of the DAG task in each of the following states: success, running, and failed.

  • Details
    • If the DAG script is submitted and parsed, you can view the DAG task dependencies and the submitted source code.
    • If the DAG script is in the Parsing or Parsing Failed state, you can view only the submitted code.
  • Modify
    If the DAG script fails to be parsed or if you want to modify the submitted DAG script, you can modify the script. The DAG ID is the unique identifier that the system uses to execute a task. Take note of the following items when you modify the DAG script:
    • If you modify content other than the DAG ID, the existing DAG is retained. The Airflow cluster parses the modified script again within 5 minutes.
    • If you modify the DAG ID, the original DAG is deleted, and a new DAG that is identified by the new dag_id is created from the modified script. The Airflow cluster parses the modified script again within 5 minutes.
  • Delete

    You can delete the DAG script that is identified by a dag_id, together with the task metadata of the script.