AnalyticDB: Use Airflow to schedule XIHE SQL jobs

Last Updated: Aug 07, 2024

Airflow is a popular open source scheduler that provides a variety of command-line tools and an easy-to-use web interface. Airflow can orchestrate and schedule workflows as directed acyclic graphs (DAGs). You can use Airflow to orchestrate extract, transform, load (ETL) jobs for offline and real-time data processing in AnalyticDB for MySQL. This automates data processing and improves processing efficiency.

Prerequisites

  • An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.

  • Airflow is installed. For more information, see Installation of Airflow.

  • The IP address of the server that runs Airflow is added to an IP address whitelist of the AnalyticDB for MySQL cluster. For more information, see Configure an IP address whitelist.

Procedure

  1. Check whether the apache-airflow-providers-mysql package is installed.

    1. Access the Airflow web interface. In the top navigation bar, choose Admin > Providers.

    2. On the Providers page, check whether the apache-airflow-providers-mysql package is displayed.

    3. (Conditionally required) If the apache-airflow-providers-mysql package is not displayed, run the following command to install it:

      pip install apache-airflow-providers-mysql
      Important

      If the OSError: mysql_config not found error message is returned after you run the preceding command, run the yum install mysql-devel command to install the MySQL development libraries, and then rerun the preceding command to install the apache-airflow-providers-mysql package.
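
    You can also confirm that the provider package is importable from the Python environment that runs Airflow. The following check is an optional sketch, not part of the required procedure:

      # Optional sanity check: this import succeeds only if the
      # apache-airflow-providers-mysql package is installed in the current
      # Python environment.
      from airflow.providers.mysql.operators.mysql import MySqlOperator
      print("apache-airflow-providers-mysql is available")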

  2. Establish a connection to the AnalyticDB for MySQL cluster.

    1. In the top navigation bar, choose Admin > Connections.

    2. Click the + icon to add a connection. On the Add Connections page, configure the following parameters.

      • Connection id: the name of the connection.

      • Connection Type: the type of the connection. Select MySQL.

      • Host: the endpoint that is used to connect to the AnalyticDB for MySQL cluster. You can obtain the endpoint on the Cluster Information page of the AnalyticDB for MySQL console.

      • Login: the name of the database account of the AnalyticDB for MySQL cluster.

      • Password: the password of the database account of the AnalyticDB for MySQL cluster.

      • Port: the port number of the AnalyticDB for MySQL cluster. Set the value to 3306.

      Note

      Other parameters are optional. Configure the parameters based on your business requirements.
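
    After you save the connection, you can optionally verify it from the server that runs Airflow. The following sketch assumes that the Connection id is set to test, the value that the DAG file in Step 4 uses, and relies on the MySqlHook class from the apache-airflow-providers-mysql package:

      # Optional connectivity check: run a lightweight query through the
      # saved Airflow connection. 'test' is the example Connection id.
      from airflow.providers.mysql.hooks.mysql import MySqlHook

      hook = MySqlHook(mysql_conn_id='test')
      # SHOW DATABASES confirms that the endpoint, account, and password
      # are configured correctly.
      for row in hook.get_records('SHOW DATABASES;'):
          print(row)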

  3. Go to the Airflow installation directory and check the dags_folder parameter in the airflow.cfg file.

    1. Go to the Airflow installation directory.

      cd /root/airflow
    2. Check the dags_folder parameter in the airflow.cfg file.

      cat airflow.cfg
    3. (Conditionally required) If the folder specified by the dags_folder parameter does not exist, run the mkdir command to create it.

      Note

      For example, if the dags_folder parameter specifies the /root/airflow/dags path and the /root/airflow path does not contain a folder named dags, you can create the dags folder in the /root/airflow path.
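
    You can also read the effective dags_folder value and create the folder from Python. The following optional sketch uses the Airflow configuration API:

      # Optional: print the configured dags_folder and create it if it does
      # not exist.
      import os

      from airflow.configuration import conf

      dags_folder = conf.get("core", "dags_folder")
      print(dags_folder)
      os.makedirs(dags_folder, exist_ok=True)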

  4. Write a DAG file. In this example, a DAG file named mysql_dags.py is used.

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago

    default_args = {
        'owner': 'airflow',
    }

    # Define a DAG named example_mysql whose start date is two days ago.
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )

    # Execute the SHOW DATABASES statement by using the connection that is
    # established in Step 2.
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )

    # Query the test table by using the same connection.
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )

    # Run mysql_test before mysql_test_task.
    mysql_test >> mysql_test_task

    if __name__ == "__main__":
        dag.cli()
    

    Parameters:

    • mysql_conn_id: the name of the connection established in Step 2.

    • sql: the business-specific SQL statement.

    For information about other parameters, see DAGs.
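
    Before you run the DAG from the web interface, you can optionally confirm that the file parses without errors. The following sketch assumes that mysql_dags.py is stored in the /root/airflow/dags path that is used in this example:

      # Optional parse check: load the DAG folder with DagBag and verify that
      # the example_mysql DAG is registered without import errors.
      from airflow.models import DagBag

      dag_bag = DagBag(dag_folder="/root/airflow/dags", include_examples=False)
      print(dag_bag.import_errors)              # expected: {}
      print("example_mysql" in dag_bag.dags)    # expected: True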

  5. On the Airflow web interface, click the DAGs tab and click the DAG that you want to manage. In the upper-right corner of the page that appears, click the trigger (play) icon to run the DAG.

    After the execution is successful, you can click the green circle to the right of the DAG name to view the execution details of the DAG.

    Important

    The displayed execution time of the DAG is 8 hours earlier than the corresponding time in the UTC+8 time zone because the Airflow scheduler uses the UTC time zone.
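
    If you want the web interface to display times in UTC+8, you can optionally set the default_ui_timezone option in the [webserver] section of the airflow.cfg file, for example to Asia/Shanghai. This setting changes only how Airflow displays times and does not affect the AnalyticDB for MySQL cluster.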