REST API と CLI は、Airflow で一般的に使用される 2 つの自動管理ツールです。REST API を使用すると、HTTP リクエストを通じて Airflow ワークフロー (DAG) とタスクの管理を自動化できます。一方、CLI を使用すると、コマンドを通じて直接 DAG をトリガーおよび管理できます。
詳細については、「Airflow 公式ドキュメント」をご参照ください。
REST API の使用
Airflow インスタンスへのログインに必要な認証情報 (トークン) を取得します。
CreateAirflowLoginToken 操作を呼び出してトークンを取得します。
説明ログイン トークンは 2 時間有効です。2 時間以内に Airflow インスタンスにログインしない場合は、新しいトークンを取得する必要があります。
Airflow インスタンスにログインします。
cURL コマンドを使用してインスタンスにログインする例:
curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"説明リクエスト例の次の内容を置き換えます: data-cn-beijing-dms.aliyuncs.com (Airflow インスタンスが配置されているリージョン)、airflow (インスタンス名)、33*** (DMS テナント ID)、8691522017**** (ワークスペース ID)、af-b3a7797**** (Airflow インスタンス ID)、および Token。
応答の例:
HTTP/2 302 date: Thu, 05 Jun 2025 09:07:18 GMT content-type: text/html; charset=utf-8 content-length: 309 vary: Origin vary: Access-Control-Request-Method vary: Access-Control-Request-Headers server: envoy location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home cache-control: no-store x-robots-tag: noindex, nofollow x-envoy-upstream-service-time: 248 set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned x-content-type-options: nosniff x-xss-protection: 0 referrer-policy: no-referrer eagleeye-traceid: 0a032a1517491144384047056ec81f strict-transport-security: max-age=31536000 timing-allow-origin: * <!doctype html> <html lang=en> <title>Redirecting...</title> <h1>Redirecting...</h1> <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.インスタンスにログインすると、DMS は自動的にセッション情報を返します。このセッション情報は、REST API をリクエストするために使用されます。
REST API を呼び出します。
コミュニティ Airflow REST API ドキュメント を参照して API を呼び出すことができます。
リクエストの例:
curl 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797y****/api/v1/health' -b 'session=xxx'応答の例:
{ "dag_processor": { "latest_dag_processor_heartbeat": null, "status": null }, "metadatabase": { "status": "healthy" }, "scheduler": { "latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00", "status": "healthy" }, "triggerer": { "latest_triggerer_heartbeat": null, "status": null } }
CLI コマンドラインの使用
Airflow インスタンスへのログインに必要な認証情報 (トークン) を取得します。
CreateAirflowLoginToken 操作を呼び出してトークンを取得します。
説明ログイン トークンは 2 時間有効です。2 時間以内に Airflow インスタンスにログインしない場合は、新しいトークンを取得する必要があります。
Airflow インスタンスにログインします。
インスタンスにログインすると、DMS は自動的にセッション情報を返します。このセッション情報は、CLI を使用して Airflow コマンドを実行するために使用されます。
次の例は、cURL コマンドを使用してインスタンスにログインするためのリクエストを送信する方法を示しています:
curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"説明リクエスト例の次の内容を置き換えます: data-cn-beijing-dms.aliyuncs.com (Airflow インスタンスが配置されているリージョン)、airflow (インスタンス名)、33*** (DMS テナント ID)、8691522017**** (ワークスペース ID)、af-b3a7797**** (Airflow インスタンス ID)、および Token。
応答の例:
HTTP/2 302 date: Thu, 05 Jun 2025 09:07:18 GMT content-type: text/html; charset=utf-8 content-length: 309 vary: Origin vary: Access-Control-Request-Method vary: Access-Control-Request-Headers server: envoy location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home cache-control: no-store x-robots-tag: noindex, nofollow x-envoy-upstream-service-time: 248 set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned x-content-type-options: nosniff x-xss-protection: 0 referrer-policy: no-referrer eagleeye-traceid: 0a032a1517491144384047056ec81f strict-transport-security: max-age=31536000 timing-allow-origin: * <!doctype html> <html lang=en> <title>Redirecting...</title> <h1>Redirecting...</h1> <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.CLI インターフェイスを呼び出して Airflow コマンドを実行します。
リクエストの例:
curl -X 'POST' 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/86915220175****/af-b3a7f110a6vmvn797y****/api/v1/command?command=version' -b 'session=faf9009b-exxx'応答の例:
{ "stderr": "", "stdout": "2.10.4\n" }
例: Python 環境での REST API の呼び出し
次のコマンドを実行して DMS SDK をインストールします。
pip install alibabacloud_dms20250414REST API を呼び出します。
ファイル名 dms_rest_api.py で、次の Python スクリプト例を作成します。
# -*- coding: utf-8 -*- # このファイルは自動生成されたものです。編集しないでください。ありがとうございます。 import os import sys from typing import List from alibabacloud_dms20250414.client import Client as Dms20250414Client from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_dms20250414.models import CreateAirflowLoginTokenResponseBodyData from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dms20250414 import models as dms_20250414_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient import requests endpoints = { "cn-beijing": "dms.cn-beijing.aliyuncs.com", "cn-hangzhou": "dms.cn-hangzhou.aliyuncs.com", "cn-shanghai": "dms.cn-shanghai.aliyuncs.com", "cn-shenzhen": "dms.cn-shenzhen.aliyuncs.com", "ap-southeast-1": "dms.ap-southeast-1.aliyuncs.com" } class DmsAirflowRestApi: def __init__(self, endpoint: str): """ # 認証情報を使用してアカウント Client を初期化します @return: Client @throws Exception """ # 本番コードでは、AccessKey を使用しない、より安全なアプローチを推奨します。認証情報の設定方法については、https://www.alibabacloud.com/help/document_detail/378659.html をご参照ください。 credential = CredentialClient() config = open_api_models.Config( credential=credential ) # エンドポイントについては、https://api.aliyun.com/product/Dms をご参照ください config.endpoint = endpoint self.client = Dms20250414Client(config) def get_login_token(self, airflowId: str) -> CreateAirflowLoginTokenResponseBodyData: create_airflow_login_token_request = dms_20250414_models.CreateAirflowLoginTokenRequest( airflow_id=airflowId ) runtime = util_models.RuntimeOptions() try: # 必要に応じて、API 操作の応答を表示するコードを記述します。 response = self.client.create_airflow_login_token_with_options(create_airflow_login_token_request, runtime) data = response.body.data return data except Exception as error: # 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージが出力されます。 # エラーメッセージを出力します。 print(error.message) # トラブルシューティングのための情報を表示します。 print(error.data.get("Recommend")) UtilClient.assert_as_string(error.message) def get_session_cookie(self, login_token: CreateAirflowLoginTokenResponseBodyData): login_url = f'{login_token.host}/login?token={login_token.token}' try: # リクエストを送信します。 response = requests.get(login_url) response.raise_for_status() print(response.headers) # セッションクッキーを取得します if 'session' in response.cookies: return response.cookies['session'] return None except Exception as e: print(f"Error: {e}") return None def list_dags(self, login_token: CreateAirflowLoginTokenResponseBodyData, session_cookie: str): login_url = f'{login_token.host}/api/v1/dags' try: cookies = {'session': session_cookie} response = requests.get(login_url, cookies=cookies) response.raise_for_status() print(response.json()) except Exception as e: print(f"Error: {e}") if __name__ == '__main__': region = sys.argv[1] airflowId = sys.argv[2] endpoint = endpoints.get(region) restApi = DmsAirflowRestApi(endpoint) login_token = restApi.get_login_token(airflowId) session_cookie = restApi.get_session_cookie(login_token) restApi.list_dags(login_token, session_cookie)説明CreateAirflowLoginToken 操作を呼び出して AirflowID を取得できます。
AccessKey ID と AccessKey Secret を設定します。
export ALIBABA_CLOUD_ACCESS_KEY_ID=xxx export ALIBABA_CLOUD_ACCESS_KEY_SECRET=xxxPython スクリプトを実行します。
python3 dms_rest_api.py ${region_id} ${airflow_id}例:
python3 dms_rest_api.py cn-hangzhou af-sxsssxx。
サポートされている CLI コマンド
CLI コマンドの使用方法の詳細については、「公式 CLI ドキュメント」をご参照ください。
cheat-sheet connections add
connections delete
dags backfill
dags delete
dags list
dags list-jobs
dags list-import-errors
dags list-runs
dags next-execution
dags pause
dags report
dags reserialize
dags show
dags state
dags test
dags trigger
dags unpause
db clean
providers behaviours
providers get
providers hooks
providers list
providers links
providers notifications
providers secrets
providers triggerer
providers widgets
tasks clear
tasks failed-deps
tasks list
tasks render
tasks state
tasks states-for-dag-run
tasks test
variables delete
variables get
variables set
variables list
version