Airflowは一般的なオープンソーススケジューラで、さまざまなコマンドラインツールと使いやすいwebインターフェイスを提供します。 気流は、さまざまなワークフローを有向非巡回グラフ (DAG) として調整およびスケジュールできます。 Airflowを使用して、AnalyticDB for MySQLでのオフラインデータおよびリアルタイムデータ処理の抽出、変換、読み込み (ETL) ジョブをインテリジェントに調整できます。 これにより、データ処理が自動化され、処理効率が向上します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
気流がインストールされています。 詳細については、「エアフローのインストール」をご参照ください。
Airflowを実行するサーバーのIPアドレスが、AnalyticDB for MySQLクラスターのIPアドレスホワイトリストに追加されます。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。
手順
apache-airflow-providers-mysqlパッケージが表示されているかどうかを確認します。
Airflow webインターフェイスにアクセスします。 上部のナビゲーションバーで、 を選択します。
[プロバイダー] ページで、apache-airflow-Providers-mysqlパッケージが表示されているかどうかを確認します。
(条件付き) apache-airflow-providers-mysqlパッケージが表示されない場合は、次のコマンドを実行してapache-airflow-providers-mysqlパッケージを手動でインストールします。
pip install apache-airflow-providers-mysql
重要上記のコマンドの実行後に
OSError: mysql_config not found
エラーメッセージが返された場合は、yum install mysql-devel
コマンドを実行してMySQLをインストールし、上記のコマンドを再実行してapache-airflow-providers-mysqlパッケージをインストールします。
AnalyticDB for MySQLクラスターへの接続を確立します。
上部のナビゲーションバーで、 を選択します。
アイコンをクリックし
ます。 [接続の追加] ページで、次の表に示すパラメーターを設定します。
パラメーター
説明
接続id
接続 の名前です。
接続タイプ
接続のドライバーです。 [MySQL] を選択します。
ホスト
AnalyticDB for MySQLクラスターへの接続に使用されるエンドポイント。 AnalyticDB for MySQLコンソールの [クラスター情報] ページでエンドポイントを取得できます。
ログイン
AnalyticDB for MySQLクラスターのデータベースアカウントの名前。
Password
AnalyticDB for MySQLクラスターのデータベースアカウントのパスワード。
ポート
AnalyticDB for MySQLクラスターのポート番号。 値を3306に設定します。
説明その他のパラメータはオプションです。 ビジネス要件に基づいてパラメーターを設定します。
Airflowインストールディレクトリに移動し、
airflow.cfg
ファイルのdags_folderパラメーターを確認します。Airflowインストールディレクトリに移動します。
cd /root/airflow
airflow.cfg
ファイルのdags_folderパラメーターを確認します。cat file.cfg
(条件付きで必要) dags_folderパラメーターで指定したパスにフォルダが存在しない場合は、
mkdir
コマンドを実行してフォルダを作成します。説明たとえば、dags_folderパラメーターに
/root/airflow/dags
パスを指定し、/root/airflow
パスにdags
という名前のフォルダーが含まれていない場合、/root/airflowパスにdagsフォルダーを作成できます。
を書くDAGファイル。 この例では、
mysql_dags.py
という名前のDAGファイルが使用されています。気流の輸入DAGからの
from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', } dag = DAG( 'example_mysql', default_args=default_args, start_date=days_ago(2), tags=['example'], ) mysql_test = MySqlOperator( task_id='mysql_test', mysql_conn_id='test', sql='SHOW DATABASES;', dag=dag, ) mysql_test_task = MySqlOperator( task_id='mysql_test_task', mysql_conn_id='test', sql='SELECT * FROM test;', dag=dag, ) mysql_test >> mysql_test_task if __name__ == "__main__": dag.cli()
パラメーター:
mysql_conn_id
: ステップ2で確立された接続の名前。sql
: ビジネス固有のSQL文。
その他のパラメーターについては、「DAG」をご参照ください。
Airflow webインターフェイスで、[DAG] タブをクリックし、管理するDAGをクリックします。 表示されるページの右上隅で、
アイコン.
実行が成功した後、DAG名の右側にある緑色の円をクリックすると、DAGの実行の詳細が表示されます。
重要AirflowスケジューラがUTCタイムゾーンを使用するため、表示されるDAGの実行時間はUTC + 8タイムゾーンの対応する時間よりも8時間前です。