すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Airflowを使用してXIHE SQLジョブをスケジュールする

最終更新日:Aug 08, 2024

Airflowは一般的なオープンソーススケジューラで、さまざまなコマンドラインツールと使いやすいwebインターフェイスを提供します。 気流は、さまざまなワークフローを有向非巡回グラフ (DAG) として調整およびスケジュールできます。 Airflowを使用して、AnalyticDB for MySQLでのオフラインデータおよびリアルタイムデータ処理の抽出、変換、読み込み (ETL) ジョブをインテリジェントに調整できます。 これにより、データ処理が自動化され、処理効率が向上します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。

  • 気流がインストールされています。 詳細については、「エアフローのインストール」をご参照ください。

  • Airflowを実行するサーバーのIPアドレスが、AnalyticDB for MySQLクラスターのIPアドレスホワイトリストに追加されます。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。

手順

  1. apache-airflow-providers-mysqlパッケージが表示されているかどうかを確認します。

    1. Airflow webインターフェイスにアクセスします。 上部のナビゲーションバーで、 [管理者] > [プロバイダー] を選択します。

    2. [プロバイダー] ページで、apache-airflow-Providers-mysqlパッケージが表示されているかどうかを確認します。

    3. (条件付き) apache-airflow-providers-mysqlパッケージが表示されない場合は、次のコマンドを実行してapache-airflow-providers-mysqlパッケージを手動でインストールします。

      pip install apache-airflow-providers-mysql
      重要

      上記のコマンドの実行後にOSError: mysql_config not foundエラーメッセージが返された場合は、yum install mysql-develコマンドを実行してMySQLをインストールし、上記のコマンドを再実行してapache-airflow-providers-mysqlパッケージをインストールします。

  2. AnalyticDB for MySQLクラスターへの接続を確立します。

    1. 上部のナビゲーションバーで、 [管理] > [接続] を選択します。

    2. アイコンをクリックしimageます。 [接続の追加] ページで、次の表に示すパラメーターを設定します。

      パラメーター

      説明

      接続id

      接続 の名前です。

      接続タイプ

      接続のドライバーです。 [MySQL] を選択します。

      ホスト

      AnalyticDB for MySQLクラスターへの接続に使用されるエンドポイント。 AnalyticDB for MySQLコンソール[クラスター情報] ページでエンドポイントを取得できます。

      ログイン

      AnalyticDB for MySQLクラスターのデータベースアカウントの名前。

      Password

      AnalyticDB for MySQLクラスターのデータベースアカウントのパスワード。

      ポート

      AnalyticDB for MySQLクラスターのポート番号。 値を3306に設定します。

      説明

      その他のパラメータはオプションです。 ビジネス要件に基づいてパラメーターを設定します。

  3. Airflowインストールディレクトリに移動し、airflow.cfgファイルのdags_folderパラメーターを確認します。

    1. Airflowインストールディレクトリに移動します。

      cd /root/airflow
    2. airflow.cfgファイルのdags_folderパラメーターを確認します。

      cat file.cfg
    3. (条件付きで必要) dags_folderパラメーターで指定したパスにフォルダが存在しない場合は、mkdirコマンドを実行してフォルダを作成します。

      説明

      たとえば、dags_folderパラメーターに /root/airflow/dagsパスを指定し、/root/airflowパスにdagsという名前のフォルダーが含まれていない場合、/root/airflowパスにdagsフォルダーを作成できます。

  4. を書くDAGファイル。 この例では、mysql_dags.pyという名前のDAGファイルが使用されています

    気流の輸入DAGからの

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )
    mysql_test >> mysql_test_task
    if __name__ == "__main__":
        dag.cli()
    

    パラメーター:

    • mysql_conn_id: ステップ2で確立された接続の名前。

    • sql: ビジネス固有のSQL文。

    その他のパラメーターについては、「DAG」をご参照ください。

  5. Airflow webインターフェイス、[DAG] タブをクリックし、管理するDAGをクリックします。 表示されるページの右上隅で、imageアイコン.

    実行が成功した後、DAG名の右側にある緑色の円をクリックすると、DAGの実行の詳細が表示されます。

    image

    image

    重要

    AirflowスケジューラがUTCタイムゾーンを使用するため、表示されるDAGの実行時間はUTC + 8タイムゾーンの対応する時間よりも8時間前です。