すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Airflow を使用して Spark ジョブをスケジュールする

最終更新日:Nov 09, 2025

Airflow は、さまざまなワークロードを有向非循環グラフ (DAG) としてオーケストレーションおよびスケジュールする、人気のオープンソーススケジューリングツールです。Spark ジョブは、Spark Airflow Operator または spark-submit コマンドラインツールを使用してスケジュールできます。このトピックでは、Airflow を使用して AnalyticDB for MySQL Spark ジョブをスケジュールする方法について説明します。

前提条件

Spark SQL ジョブのスケジュール

AnalyticDB for MySQL では、バッチモードまたは対話モードで Spark SQL を実行できます。スケジュール手順は実行モードによって異なります。

バッチモード

Spark Airflow Operator

  1. 次のコマンドを実行して、Airflow Spark プラグインをインストールします。

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 接続を作成します。例:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    次の表にパラメーターを示します。

    パラメーター

    説明

    auth_type

    認証方式。値を AK に設定します。これは、AccessKey ペアが認証に使用されることを指定します。

    access_key_id

    Alibaba Cloud アカウントの AccessKey ID、または AnalyticDB for MySQL へのアクセス権限を持つ Resource Access Management (RAM) ユーザー。

    AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。

    access_key_secret

    Alibaba Cloud アカウントの AccessKey Secret、または AnalyticDB for MySQL へのアクセス権限を持つ RAM ユーザー。

    AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。

    region

    AnalyticDB for MySQL クラスターのリージョン ID。

  3. Airflow DAG の宣言ファイルを作成します。この例では、spark_dags.py という名前のファイルが作成されます。

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
    ) as dag:
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_sql
    

    次の表にパラメーターを示します。

    DAG 構成パラメーター

    パラメーター

    必須

    説明

    dag_id

    はい

    DAG の名前。カスタム名を入力できます。

    default_args

    はい

    • cluster_id: AnalyticDB for MySQL クラスターの ID。

    • rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。

    • region: AnalyticDB for MySQL クラスターのリージョン ID。

    詳細については、「DAG パラメーター」をご参照ください。

    AnalyticDBSparkSQLOperator 構成パラメーター

    パラメーター

    必須

    説明

    task_id

    はい

    ジョブ ID。

    SQL

    はい

    Spark SQL 文。

    詳細については、「Airflow パラメーター」をご参照ください。

  4. Airflow 構成宣言ファイル dags_folder があるフォルダーに spark_dags.py ファイルを保存します。

  5. DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。

Spark-submit

説明

AnalyticDB for MySQL Spark conf/spark-defaults.conf 構成ファイルまたは Airflow パラメーターで、AnalyticDB for MySQL の特定のパラメーターを構成できます。特定のパラメーターには、clusterId、regionId、keyId、secretId が含まれます。詳細については、「Spark アプリケーション構成パラメーター」をご参照ください。

  1. 次のコマンドを実行して、Airflow Spark プラグインをインストールします。

    pip3 install apache-airflow-providers-apache-spark
    重要
    • Airflow Spark プラグインをインストールする前に、Python 3 をインストールする必要があります。

    • apache-airflow-providers-apache-spark プラグインをインストールすると、Apache Spark コミュニティによって開発された PySpark が自動的にインストールされます。PySpark をアンインストールする場合は、次のコマンドを実行します。

      pip3 uninstall pyspark
  2. spark-submit パッケージをダウンロードしてパラメーターを構成します

  3. 次のコマンドを実行して、spark-submit のアドレスを Airflow のパスに追加します。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    Airflow を開始する前に、spark-submit のアドレスを Airflow のパスに追加する必要があります。そうしないと、ジョブのスケジューリング時にシステムが spark-submit コマンドを見つけられない可能性があります。

  4. Airflow DAG の宣言ファイルを作成します。この例では、demo.py という名前のファイルが作成されます。

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  5. Airflow インストールディレクトリの dags フォルダーに demo.py ファイルを保存します。

  6. DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。

対話モード

  1. Spark 対話型リソースグループのエンドポイントを取得します。

    1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけて、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、クラスター管理 > リソース管理 を選択し、リソースグループ管理 タブをクリックします。

    3. 目的のリソースグループを探し、詳細操作 列でクリックすると、内部エンドポイントと public endpoint が表示されます。エンドポイントの横にある image アイコンをクリックしてコピーできます。また、[ポート] の横にある括弧内の image アイコンをクリックして、JDBC 接続文字列をコピーすることもできます。

      次の場合、[パブリックエンドポイント] の横にある [ネットワークをリクエスト] をクリックして、パブリックエンドポイントを手動でリクエストする必要があります。

      • Spark SQL ジョブを送信するために使用されるクライアントツールが、ローカルマシンまたは外部サーバーにデプロイされている。

      • Spark SQL ジョブを送信するために使用されるクライアントツールが ECS インスタンスにデプロイされており、ECS インスタンスと AnalyticDB for MySQL クラスターが同じ VPC にない。

      image

  2. apache-airflow-providers-apache-hive および apache-airflow-providers-common-sql の依存関係をインストールします。

  3. Airflow Web インターフェイスにアクセスします。上部のナビゲーションバーで、[管理] > [接続] を選択します。

  4. image ボタンをクリックします。[接続の追加] ページで、次の表で説明されているパラメーターを構成します。

    パラメーター

    説明

    接続 ID

    接続の名前。この例では、adb_spark_cluster が使用されます。

    接続タイプ

    [Hive Server 2 Thrift] を選択します。

    ホスト

    ステップ 1 で取得したエンドポイント。エンドポイントの default を実際のデータベース名に置き換え、エンドポイントから resource_group=<resource group name> サフィックスを削除します。

    例: jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo

    スキーマ

    データベースの名前。この例では、adb_demo が使用されます。

    ログイン

    AnalyticDB for MySQL クラスターのデータベースアカウントと対話型リソースグループの名前。フォーマット: resource_group_name/database_account_name

    例: spark_interactive_prod/spark_user

    パスワード

    AnalyticDB for MySQL データベースアカウントのパスワード。

    ポート

    Spark 対話型リソースグループのポート番号。値を 10000 に設定します。

    追加

    認証方式。ユーザー名とパスワード認証が使用されることを指定する次の内容を入力します。

    {
      "auth_mechanism": "CUSTOM"
    }
  5. DAG ファイルを記述します。

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 2, 10),
        'retries': 1,
    }
    
    dag = DAG(
        'adb_spark_sql_test',
        default_args=default_args,
        schedule_interval='@daily',
    )
    
    
    jdbc_query = SQLExecuteQueryOperator(
        task_id='execute_spark_sql_query', 
        conn_id='adb_spark_cluster',  
        sql='show databases',  
        dag=dag
    )
    
    jdbc_query

    次の表にパラメーターを示します。

    パラメーター

    必須

    説明

    task_id

    はい

    ジョブ ID。カスタム ID を入力できます。

    conn_id

    はい

    接続の名前。ステップ 4 で作成した接続の ID を入力します。

    sql

    はい

    Spark SQL 文。

    詳細については、「Airflow パラメーター」をご参照ください。

  6. Airflow Web インターフェイスで、DAG の横にある image ボタンをクリックします。

Spark JAR ジョブのスケジュール

Spark Airflow Operator

  1. 次のコマンドを実行して、Airflow Spark プラグインをインストールします。

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
  2. 接続を作成します。例:

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }

    次の表にパラメーターを示します。

    パラメーター

    説明

    auth_type

    認証方式。値を AK に設定します。これは、AccessKey ペアが認証に使用されることを指定します。

    access_key_id

    Alibaba Cloud アカウントの AccessKey ID、または AnalyticDB for MySQL へのアクセス権限を持つ Resource Access Management (RAM) ユーザー。

    AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。

    access_key_secret

    Alibaba Cloud アカウントの AccessKey Secret、または AnalyticDB for MySQL へのアクセス権限を持つ RAM ユーザー。

    AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。

    region

    AnalyticDB for MySQL クラスターのリージョン ID。

  3. Airflow DAG の宣言ファイルを作成します。この例では、spark_dags.py という名前のファイルが作成されます。

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # このテストでは、成功/失敗を正しくマークするために watcher が必要です
        # "tearDown" タスクがトリガールールとともに DAG の一部である場合
        list(dag.tasks) >> watcher()
        

    DAG 構成パラメーター

    パラメーター

    必須

    説明

    dag_id

    はい

    DAG の名前。カスタム名を入力できます。

    default_args

    はい

    • cluster_id: AnalyticDB for MySQL クラスターの ID。

    • rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。

    • region: AnalyticDB for MySQL クラスターのリージョン ID。

    詳細については、「DAG パラメーター」をご参照ください。

    AnalyticDBSparkBatchOperator 構成パラメーター

    パラメーター

    必須

    説明

    task_id

    はい

    ジョブ ID。

    file

    はい

    Spark アプリケーションのメインファイルの絶対パス。メインファイルは、エントリポイントを含む JAR パッケージ、または Python アプリケーションのエントリポイントとして機能する実行可能ファイルです。

    重要

    Spark アプリケーションのメインファイルは OSS に保存する必要があります。

    OSS バケットと AnalyticDB for MySQL クラスターは同じリージョンに存在する必要があります。

    class_name

    特定の条件が満たされた場合ははい

    • Java または Scala アプリケーションのエントリポイント。

    • Python アプリケーションにはエントリポイントは必要ありません。

    詳細については、「AnalyticDBSparkBatchOperator パラメーター」をご参照ください。

  4. Airflow 構成宣言ファイル dags_folder があるフォルダーに spark_dags.py ファイルを保存します。

  5. DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。

Spark-submit

説明

AnalyticDB for MySQL Spark conf/spark-defaults.conf 構成ファイルまたは Airflow パラメーターで、AnalyticDB for MySQL の特定のパラメーターを構成できます。特定のパラメーターには、clusterId、regionId、keyId、secretId、ossUploadPath が含まれます。詳細については、「Spark アプリケーション構成パラメーター」をご参照ください。

  1. 次のコマンドを実行して、Airflow Spark プラグインをインストールします。

    pip3 install apache-airflow-providers-apache-spark
    重要
    • Airflow Spark プラグインをインストールする前に、Python 3 をインストールする必要があります。

    • apache-airflow-providers-apache-spark プラグインをインストールすると、Apache Spark コミュニティによって開発された PySpark が自動的にインストールされます。PySpark をアンインストールする場合は、次のコマンドを実行します。

      pip3 uninstall pyspark
  2. spark-submit パッケージをダウンロードしてパラメーターを構成します

  3. 次のコマンドを実行して、spark-submit のアドレスを Airflow のパスに追加します。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    Airflow を開始する前に、spark-submit のアドレスを Airflow のパスに追加する必要があります。そうしないと、ジョブのスケジューリング時にシステムが spark-submit コマンドを見つけられない可能性があります。

  4. Airflow DAG の宣言ファイルを作成します。この例では、demo.py という名前のファイルが作成されます。

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id=DAG_ID,
        start_date=datetime(2021, 1, 1),
        schedule=None,
        default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
        spark_pi = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi",
        )
    
        spark_lr = AnalyticDBSparkBatchOperator(
            task_id="task2",
            file="local:///tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkLR",
        )
    
        spark_pi >> spark_lr
    
        from tests_common.test_utils.watcher import watcher
    
        # このテストでは、成功/失敗を正しくマークするために watcher が必要です
        # "tearDown" タスクがトリガールールとともに DAG の一部である場合
        list(dag.tasks) >> watcher()
        

    次の表にパラメーターを示します。

    DAG 構成パラメーター

    パラメーター

    必須

    説明

    dag_id

    はい

    DAG の名前。カスタム名を入力できます。

    default_args

    はい

    • cluster_id: AnalyticDB for MySQL クラスターの ID。

    • rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。

    • region: AnalyticDB for MySQL クラスターのリージョン ID。

    詳細については、「DAG パラメーター」をご参照ください。

    AnalyticDBSparkBatchOperator 構成パラメーター

    パラメーター

    必須

    説明

    task_id

    はい

    ジョブ ID。

    file

    はい

    Spark アプリケーションのメインファイルの絶対パス。メインファイルは、エントリポイントを含む JAR パッケージ、または Python アプリケーションのエントリポイントとして機能する実行可能ファイルです。

    重要

    Spark アプリケーションのメインファイルは OSS に保存する必要があります。

    OSS バケットと AnalyticDB for MySQL クラスターは同じリージョンに存在する必要があります。

    class_name

    特定の条件が満たされた場合ははい

    • Java または Scala アプリケーションのエントリポイント。

    • Python アプリケーションにはエントリポイントは必要ありません。

    詳細については、「AnalyticDBSparkBatchOperator パラメーター」をご参照ください。

  5. Airflow インストールディレクトリの dags フォルダーに demo.py ファイルを保存します。

  6. DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。