すべてのプロダクト
Search
ドキュメントセンター

Data Management:REST API と CLI を使用したワークフローの自動管理

最終更新日:Nov 09, 2025

REST API と CLI は、Airflow で一般的に使用される 2 つの自動管理ツールです。REST API を使用すると、HTTP リクエストを通じて Airflow ワークフロー (DAG) とタスクの管理を自動化できます。一方、CLI を使用すると、コマンドを通じて直接 DAG をトリガーおよび管理できます。

詳細については、「Airflow 公式ドキュメント」をご参照ください。

REST API の使用

  1. Airflow インスタンスへのログインに必要な認証情報 (トークン) を取得します。

    CreateAirflowLoginToken 操作を呼び出してトークンを取得します。

    説明

    ログイン トークンは 2 時間有効です。2 時間以内に Airflow インスタンスにログインしない場合は、新しいトークンを取得する必要があります。

  2. Airflow インスタンスにログインします。

    cURL コマンドを使用してインスタンスにログインする例:

    curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
    説明

    リクエスト例の次の内容を置き換えます: data-cn-beijing-dms.aliyuncs.com (Airflow インスタンスが配置されているリージョン)、airflow (インスタンス名)、33*** (DMS テナント ID)、8691522017**** (ワークスペース ID)、af-b3a7797**** (Airflow インスタンス ID)、および Token。

    応答の例:

    HTTP/2 302
    date: Thu, 05 Jun 2025 09:07:18 GMT
    content-type: text/html; charset=utf-8
    content-length: 309
    vary: Origin
    vary: Access-Control-Request-Method
    vary: Access-Control-Request-Headers
    server: envoy
    location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
    cache-control: no-store
    x-robots-tag: noindex, nofollow
    x-envoy-upstream-service-time: 248
    set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
    x-content-type-options: nosniff
    x-xss-protection: 0
    referrer-policy: no-referrer
    eagleeye-traceid: 0a032a1517491144384047056ec81f
    strict-transport-security: max-age=31536000
    timing-allow-origin: *
    
    <!doctype html>
    <html lang=en>
    <title>Redirecting...</title>
    <h1>Redirecting...</h1>
    <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.

    インスタンスにログインすると、DMS は自動的にセッション情報を返します。このセッション情報は、REST API をリクエストするために使用されます。

  3. REST API を呼び出します。

    コミュニティ Airflow REST API ドキュメント を参照して API を呼び出すことができます。

    リクエストの例:

    curl 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797y****/api/v1/health' -b 'session=xxx' 

    応答の例:

    {
      "dag_processor": {
        "latest_dag_processor_heartbeat": null,
        "status": null
      },
      "metadatabase": {
        "status": "healthy"
      },
      "scheduler": {
        "latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00",
        "status": "healthy"
      },
      "triggerer": {
        "latest_triggerer_heartbeat": null,
        "status": null
      }
    }

CLI コマンドラインの使用

  1. Airflow インスタンスへのログインに必要な認証情報 (トークン) を取得します。

    CreateAirflowLoginToken 操作を呼び出してトークンを取得します。

    説明

    ログイン トークンは 2 時間有効です。2 時間以内に Airflow インスタンスにログインしない場合は、新しいトークンを取得する必要があります。

  2. Airflow インスタンスにログインします。

    インスタンスにログインすると、DMS は自動的にセッション情報を返します。このセッション情報は、CLI を使用して Airflow コマンドを実行するために使用されます。

    次の例は、cURL コマンドを使用してインスタンスにログインするためのリクエストを送信する方法を示しています:

    curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
    説明

    リクエスト例の次の内容を置き換えます: data-cn-beijing-dms.aliyuncs.com (Airflow インスタンスが配置されているリージョン)、airflow (インスタンス名)、33*** (DMS テナント ID)、8691522017**** (ワークスペース ID)、af-b3a7797**** (Airflow インスタンス ID)、および Token。

    応答の例:

    HTTP/2 302
    date: Thu, 05 Jun 2025 09:07:18 GMT
    content-type: text/html; charset=utf-8
    content-length: 309
    vary: Origin
    vary: Access-Control-Request-Method
    vary: Access-Control-Request-Headers
    server: envoy
    location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
    cache-control: no-store
    x-robots-tag: noindex, nofollow
    x-envoy-upstream-service-time: 248
    set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
    x-content-type-options: nosniff
    x-xss-protection: 0
    referrer-policy: no-referrer
    eagleeye-traceid: 0a032a1517491144384047056ec81f
    strict-transport-security: max-age=31536000
    timing-allow-origin: *
    
    <!doctype html>
    <html lang=en>
    <title>Redirecting...</title>
    <h1>Redirecting...</h1>
    <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.
  3. CLI インターフェイスを呼び出して Airflow コマンドを実行します。

    リクエストの例:

    curl  -X 'POST' 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/86915220175****/af-b3a7f110a6vmvn797y****/api/v1/command?command=version' -b 'session=faf9009b-exxx' 

    応答の例:

    {
      "stderr": "",
      "stdout": "2.10.4\n"
    }

例: Python 環境での REST API の呼び出し

  1. 次のコマンドを実行して DMS SDK をインストールします。

    pip install alibabacloud_dms20250414
  2. REST API を呼び出します。

    1. ファイル名 dms_rest_api.py で、次の Python スクリプト例を作成します。

      # -*- coding: utf-8 -*-
      # このファイルは自動生成されたものです。編集しないでください。ありがとうございます。
      import os
      import sys
      
      from typing import List
      
      from alibabacloud_dms20250414.client import Client as Dms20250414Client
      from alibabacloud_credentials.client import Client as CredentialClient
      from alibabacloud_dms20250414.models import CreateAirflowLoginTokenResponseBodyData
      from alibabacloud_tea_openapi import models as open_api_models
      from alibabacloud_dms20250414 import models as dms_20250414_models
      from alibabacloud_tea_util import models as util_models
      from alibabacloud_tea_util.client import Client as UtilClient
      import requests
      
      endpoints = {
          "cn-beijing": "dms.cn-beijing.aliyuncs.com",
          "cn-hangzhou": "dms.cn-hangzhou.aliyuncs.com",
          "cn-shanghai": "dms.cn-shanghai.aliyuncs.com",
          "cn-shenzhen": "dms.cn-shenzhen.aliyuncs.com",
          "ap-southeast-1": "dms.ap-southeast-1.aliyuncs.com"
      }
      
      
      class DmsAirflowRestApi:
      
          def __init__(self, endpoint: str):
              """
              # 認証情報を使用してアカウント Client を初期化します
              @return: Client
              @throws Exception
              """
              # 本番コードでは、AccessKey を使用しない、より安全なアプローチを推奨します。認証情報の設定方法については、https://www.alibabacloud.com/help/document_detail/378659.html をご参照ください。
              credential = CredentialClient()
              config = open_api_models.Config(
                  credential=credential
              )
              # エンドポイントについては、https://api.aliyun.com/product/Dms をご参照ください
              config.endpoint = endpoint
              self.client = Dms20250414Client(config)
      
          def get_login_token(self, airflowId: str) -> CreateAirflowLoginTokenResponseBodyData:
              create_airflow_login_token_request = dms_20250414_models.CreateAirflowLoginTokenRequest(
                  airflow_id=airflowId
              )
              runtime = util_models.RuntimeOptions()
              try:
                  # 必要に応じて、API 操作の応答を表示するコードを記述します。
                  response = self.client.create_airflow_login_token_with_options(create_airflow_login_token_request, runtime)
                  data = response.body.data
                  return data
              except Exception as error:
                  # 実際のビジネスシナリオでは例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、エラーメッセージが出力されます。
                  # エラーメッセージを出力します。
                  print(error.message)
                  # トラブルシューティングのための情報を表示します。
                  print(error.data.get("Recommend"))
                  UtilClient.assert_as_string(error.message)
      
          def get_session_cookie(self, login_token: CreateAirflowLoginTokenResponseBodyData):
              login_url = f'{login_token.host}/login?token={login_token.token}'
              try:
                  # リクエストを送信します。
                  response = requests.get(login_url)
                  response.raise_for_status()
      
                  print(response.headers)
                  # セッションクッキーを取得します
                  if 'session' in response.cookies:
                      return response.cookies['session']
      
                  return None
      
              except Exception as e:
                  print(f"Error: {e}")
                  return None
      
          def list_dags(self, login_token: CreateAirflowLoginTokenResponseBodyData, session_cookie: str):
              login_url = f'{login_token.host}/api/v1/dags'
              try:
                  cookies = {'session': session_cookie}
                  response = requests.get(login_url, cookies=cookies)
                  response.raise_for_status()
      
                  print(response.json())
      
              except Exception as e:
                  print(f"Error: {e}")
      
      
      if __name__ == '__main__':
          region = sys.argv[1]
          airflowId = sys.argv[2]
      
          endpoint = endpoints.get(region)
          restApi = DmsAirflowRestApi(endpoint)
      
          login_token = restApi.get_login_token(airflowId)
          session_cookie = restApi.get_session_cookie(login_token)
          restApi.list_dags(login_token, session_cookie)
      
      説明

      CreateAirflowLoginToken 操作を呼び出して AirflowID を取得できます。

    2. AccessKey ID と AccessKey Secret を設定します。

      export ALIBABA_CLOUD_ACCESS_KEY_ID=xxx
      export ALIBABA_CLOUD_ACCESS_KEY_SECRET=xxx
    3. Python スクリプトを実行します。

      python3 dms_rest_api.py ${region_id} ${airflow_id}

      例: python3 dms_rest_api.py cn-hangzhou af-sxsssxx

サポートされている CLI コマンド

CLI コマンドの使用方法の詳細については、「公式 CLI ドキュメント」をご参照ください。

cheat-sheet connections add
connections delete
dags backfill
dags delete
dags list
dags list-jobs
dags list-import-errors
dags list-runs
dags next-execution
dags pause
dags report
dags reserialize
dags show
dags state
dags test
dags trigger
dags unpause
db clean
providers behaviours
providers get
providers hooks
providers list
providers links
providers notifications
providers secrets
providers triggerer
providers widgets
tasks clear
tasks failed-deps
tasks list
tasks render
tasks state
tasks states-for-dag-run
tasks test
variables delete
variables get
variables set
variables list
version