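The REST API and the CLI are two common tools for automating Airflow management. The REST API lets you manage Airflow workflows (DAGs) and tasks through HTTP requests, and the CLI lets you trigger and manage DAGs directly with commands.
For more information about the REST API and the CLI, see the official Airflow documentation.
Use the REST API
Obtain the credential (token) required to log on to the Airflow instance.
Call the CreateAirflowLoginToken operation to obtain a token.
Note: The logon token is valid for 2 hours. If you have not logged on to the Airflow instance within 2 hours, you must obtain a new token.
Log on to the Airflow instance.
Sample cURL command for logging on to the instance: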
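curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
Note: Replace the following parts of the sample request: data-cn-beijing-dms.aliyuncs.com (the region where the Airflow instance resides), airflow (the instance name), 33*** (the DMS tenant ID), 8691522017**** (the workspace ID), af-b3a7f110a6vmvn797**** (the Airflow instance ID), and the token.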
Sample response:
HTTP/2 302
date: Thu, 05 Jun 2025 09:07:18 GMT
content-type: text/html; charset=utf-8
content-length: 309
vary: Origin
vary: Access-Control-Request-Method
vary: Access-Control-Request-Headers
server: envoy
location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
cache-control: no-store
x-robots-tag: noindex, nofollow
x-envoy-upstream-service-time: 248
set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
x-content-type-options: nosniff
x-xss-protection: 0
referrer-policy: no-referrer
eagleeye-traceid: 0a032a1517491144384047056ec81f
strict-transport-security: max-age=31536000
timing-allow-origin: *

<!doctype html>
<html lang=en>
<title>Redirecting...</title>
<h1>Redirecting...</h1>
<p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.

After you log on to the instance, DMS automatically returns session information in the set-cookie header. Use this session information when you call the REST API.
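The logon step can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not DMS tooling: `build_login_url` and `extract_session_cookie` are hypothetical helper names, the URL layout is inferred from the sample request, and the cookie is read by hand from a single `Set-Cookie` header value shaped like the one in the sample response.

```python
from typing import Optional
from urllib.parse import urlencode


def build_login_url(host: str, tenant_id: str, workspace_id: str,
                    airflow_id: str, token: str,
                    instance_name: str = "airflow") -> str:
    """Assemble the logon URL (layout assumed from the sample request)."""
    base = f"https://{host}/{instance_name}/{tenant_id}/{workspace_id}/{airflow_id}/login"
    # urlencode escapes any characters in the token that are unsafe in a query string.
    return f"{base}?{urlencode({'token': token})}"


def extract_session_cookie(set_cookie: str) -> Optional[str]:
    """Return the value of the `session` cookie from one Set-Cookie header value."""
    # The first "name=value" pair is the cookie itself; everything after the
    # first ";" is an attribute such as Expires, Secure, or Path.
    name_value = set_cookie.split(";", 1)[0].strip()
    name, sep, value = name_value.partition("=")
    if sep and name.strip() == "session":
        return value.strip()
    return None


# Placeholder values, not real identifiers.
print(build_login_url("data-cn-beijing-dms.aliyuncs.com", "TENANT_ID",
                      "WORKSPACE_ID", "AIRFLOW_ID", "TOKEN"))
print(extract_session_cookie("session=xxxxxxx; Secure; HttpOnly; Path=/"))
```

The returned cookie value is what the later `-b 'session=...'` option of cURL expects.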
Call the REST API.
To call the API, refer to the community Airflow REST API documentation.
Sample request:
curl 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797y****/api/v1/health' -b 'session=xxx'
Sample response:
{
  "dag_processor": {
    "latest_dag_processor_heartbeat": null,
    "status": null
  },
  "metadatabase": {
    "status": "healthy"
  },
  "scheduler": {
    "latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00",
    "status": "healthy"
  },
  "triggerer": {
    "latest_triggerer_heartbeat": null,
    "status": null
  }
}
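A health response like the one above can be checked programmatically. The following sketch is only one possible interpretation: `unhealthy_components` is a hypothetical helper, and treating a null status as "component not deployed" rather than as a failure is an assumption, not documented DMS behavior.

```python
import json
from typing import List


def unhealthy_components(health_body: str) -> List[str]:
    """List components whose status is reported and is not "healthy"."""
    health = json.loads(health_body)
    problems = []
    for component, info in health.items():
        status = info.get("status")
        # Assumption: a null status means the component is not deployed,
        # so it is skipped instead of being flagged.
        if status is not None and status != "healthy":
            problems.append(component)
    return sorted(problems)


sample = '{"metadatabase": {"status": "healthy"}, "triggerer": {"status": null}}'
print(unhealthy_components(sample))
```

If your deployment requires a triggerer or a standalone DAG processor, a null status for that component should probably be treated as a problem instead.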
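Use the CLI
Obtain the credential (token) required to log on to the Airflow instance.
Call the CreateAirflowLoginToken operation to obtain a token.
Note: The logon token is valid for 2 hours. If you have not logged on to the Airflow instance within 2 hours, you must obtain a new token.
Log on to the Airflow instance.
After you log on to the instance, DMS automatically returns session information. Use this session information when you run Airflow commands through the CLI.
The following sample sends the logon request with cURL: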
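curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
Note: Replace the following parts of the sample request: data-cn-beijing-dms.aliyuncs.com (the region where the Airflow instance resides), airflow (the instance name), 33*** (the DMS tenant ID), 8691522017**** (the workspace ID), af-b3a7f110a6vmvn797**** (the Airflow instance ID), and the token.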
Sample response:
HTTP/2 302
date: Thu, 05 Jun 2025 09:07:18 GMT
content-type: text/html; charset=utf-8
content-length: 309
vary: Origin
vary: Access-Control-Request-Method
vary: Access-Control-Request-Headers
server: envoy
location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
cache-control: no-store
x-robots-tag: noindex, nofollow
x-envoy-upstream-service-time: 248
set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
x-content-type-options: nosniff
x-xss-protection: 0
referrer-policy: no-referrer
eagleeye-traceid: 0a032a1517491144384047056ec81f
strict-transport-security: max-age=31536000
timing-allow-origin: *

<!doctype html>
<html lang=en>
<title>Redirecting...</title>
<h1>Redirecting...</h1>
<p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.

Call the CLI endpoint to run Airflow commands.
Sample request:
curl -X 'POST' 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/86915220175****/af-b3a7f110a6vmvn797y****/api/v1/command?command=version' -b 'session=faf9009b-exxx'
Sample response:
{
  "stderr": "",
  "stdout": "2.10.4\n"
}
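The CLI call above could be wrapped in Python roughly as follows. This is a sketch, not an official client: `build_command_request` and `parse_command_result` are hypothetical names, the request is only built and not sent, and form-style encoding of multi-word commands (for example `dags list` becoming `dags+list`) is an assumption about how the endpoint decodes the `command` parameter.

```python
import json
import urllib.request
from typing import Tuple
from urllib.parse import urlencode


def build_command_request(base_url: str, command: str,
                          session: str) -> urllib.request.Request:
    """Build (without sending) the POST request for one Airflow CLI command."""
    # Assumption: the endpoint accepts a form-encoded `command` query parameter.
    url = f"{base_url}/api/v1/command?{urlencode({'command': command})}"
    return urllib.request.Request(
        url, method="POST", headers={"Cookie": f"session={session}"}
    )


def parse_command_result(body: str) -> Tuple[str, str]:
    """Split the JSON response into (stdout, stderr) without trailing newlines."""
    result = json.loads(body)
    stdout = result.get("stdout", "").rstrip("\n")
    stderr = result.get("stderr", "").rstrip("\n")
    return stdout, stderr


req = build_command_request("https://HOST/airflow/TENANT/WORKSPACE/AIRFLOW_ID",
                            "version", "SESSION")
print(req.get_method(), req.full_url)
```

To send the request, pass it to `urllib.request.urlopen(req)` and feed the decoded response body to `parse_command_result`. Both streams are returned so that the caller can decide whether non-empty stderr is a warning or a failure.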
Example: Call the REST API from Python
Run the following command to install the DMS SDK.
pip install alibabacloud_dms20250414
Call the REST API.
Create the following sample Python script and name the file dms_rest_api.py.
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import sys

import requests
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_dms20250414 import models as dms_20250414_models
from alibabacloud_dms20250414.client import Client as Dms20250414Client
from alibabacloud_dms20250414.models import CreateAirflowLoginTokenResponseBodyData
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

# DMS OpenAPI endpoints, keyed by region ID.
endpoints = {
    "cn-beijing": "dms.cn-beijing.aliyuncs.com",
    "cn-hangzhou": "dms.cn-hangzhou.aliyuncs.com",
    "cn-shanghai": "dms.cn-shanghai.aliyuncs.com",
    "cn-shenzhen": "dms.cn-shenzhen.aliyuncs.com",
    "ap-southeast-1": "dms.ap-southeast-1.aliyuncs.com"
}


class DmsAirflowRestApi:
    def __init__(self, endpoint: str):
        """
        Initialize the account client with credentials.
        @return: Client
        @throws Exception
        """
        # For production code, a more secure AccessKey-free method is recommended.
        # For credential configuration, see https://www.alibabacloud.com/help/document_detail/378659.html.
        credential = CredentialClient()
        config = open_api_models.Config(
            credential=credential
        )
        # For endpoints, see https://api.aliyun.com/product/Dms
        config.endpoint = endpoint
        self.client = Dms20250414Client(config)

    def get_login_token(self, airflowId: str) -> CreateAirflowLoginTokenResponseBodyData:
        create_airflow_login_token_request = dms_20250414_models.CreateAirflowLoginTokenRequest(
            airflow_id=airflowId
        )
        runtime = util_models.RuntimeOptions()
        try:
            # If you copy and run this code, print the API return value yourself.
            response = self.client.create_airflow_login_token_with_options(create_airflow_login_token_request, runtime)
            data = response.body.data
            return data
        except Exception as error:
            # Printed for demonstration only. Handle exceptions carefully and
            # never silently ignore them in production projects.
            # Error message
            print(error.message)
            # Diagnostic address
            print(error.data.get("Recommend"))
            UtilClient.assert_as_string(error.message)

    def get_session_cookie(self, login_token: CreateAirflowLoginTokenResponseBodyData):
        login_url = f'{login_token.host}/login?token={login_token.token}'
        try:
            # Send the logon request.
            response = requests.get(login_url)
            response.raise_for_status()
            print(response.headers)
            # Read the session cookie.
            if 'session' in response.cookies:
                return response.cookies['session']
            return None
        except Exception as e:
            print(f"Error: {e}")
            return None

    def list_dags(self, login_token: CreateAirflowLoginTokenResponseBodyData, session_cookie: str):
        dags_url = f'{login_token.host}/api/v1/dags'
        try:
            # Authenticate the REST API call with the session cookie.
            cookies = {'session': session_cookie}
            response = requests.get(dags_url, cookies=cookies)
            response.raise_for_status()
            print(response.json())
        except Exception as e:
            print(f"Error: {e}")


if __name__ == '__main__':
    region = sys.argv[1]
    airflowId = sys.argv[2]
    endpoint = endpoints.get(region)
    restApi = DmsAirflowRestApi(endpoint)
    login_token = restApi.get_login_token(airflowId)
    session_cookie = restApi.get_session_cookie(login_token)
    restApi.list_dags(login_token, session_cookie)

Note: You can call the CreateAirflowLoginToken operation to obtain the Airflow ID.
Configure the AccessKey ID and AccessKey secret.
export ALIBABA_CLOUD_ACCESS_KEY_ID=xxx
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=xxx
Run the Python script.
python3 dms_rest_api.py ${region_id} ${airflow_id}
Example:
python3 dms_rest_api.py cn-hangzhou af-sxsssxx
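Beyond listing DAGs, the same session cookie could in principle be used for other endpoints of the community Airflow REST API, such as triggering a DAG run with `POST /api/v1/dags/{dag_id}/dagRuns`. The sketch below only builds the request with the standard library and does not send it. `build_trigger_request` is a hypothetical helper, and whether the DMS gateway forwards this particular endpoint is an assumption that you should verify against your instance.

```python
import json
import urllib.request
from urllib.parse import quote


def build_trigger_request(host: str, dag_id: str, session_cookie: str,
                          conf: dict) -> urllib.request.Request:
    """Build (without sending) a request that triggers one DAG run."""
    # `host` plays the role of login_token.host in the script above.
    url = f"{host}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"
    # The community API accepts run parameters under the "conf" key.
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Cookie": f"session={session_cookie}",
        },
    )


req = build_trigger_request("https://HOST/airflow/TENANT/WORKSPACE/AIRFLOW_ID",
                            "example_dag", "SESSION", {"run_date": "2025-06-05"})
print(req.get_method(), req.full_url)
```

Sending the request with `urllib.request.urlopen(req)` returns the new DAG run as JSON if the call is accepted. The standard library is used here only to keep the sketch dependency-free; the `requests` calls from the script above work equally well.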
Supported CLI commands
For how to use the CLI commands, see the official Airflow CLI documentation.
cheat-sheet
connections add
connections delete
dags backfill
dags delete
dags list
dags list-jobs
dags list-import-errors
dags list-runs
dags next-execution
dags pause
dags report
dags reserialize
dags show
dags state
dags test
dags trigger
dags unpause
db clean
providers behaviours
providers get
providers hooks
providers links
providers list
providers notifications
providers secrets
providers triggers
providers widgets
tasks clear
tasks failed-deps
tasks list
tasks render
tasks state
tasks states-for-dag-run
tasks test
variables delete
variables get
variables set
variables list
version
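Because only the commands listed above are supported, a client may want to check a command locally before sending it. The following is a minimal sketch: `SUPPORTED_COMMANDS` is a hand-copied excerpt of the list, not the full set, and `is_supported` is a hypothetical helper that is not part of any DMS SDK.

```python
# Excerpt of the supported commands listed above (illustrative, not exhaustive).
SUPPORTED_COMMANDS = {
    "dags list",
    "dags trigger",
    "dags pause",
    "dags unpause",
    "tasks list",
    "variables get",
    "variables set",
    "version",
}


def is_supported(command_line: str) -> bool:
    """Check whether a command line starts with a supported command."""
    words = command_line.split()
    # Supported commands are one or two words long ("version", "dags list");
    # anything after that is treated as arguments.
    two_words = " ".join(words[:2])
    one_word = " ".join(words[:1])
    return two_words in SUPPORTED_COMMANDS or one_word in SUPPORTED_COMMANDS


print(is_supported("dags list -o json"))
print(is_supported("users create"))
```

A local check like this only saves a round trip; the server remains the authority on which commands it accepts.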