Airflow は、さまざまなワークロードを有向非循環グラフ (DAG) としてオーケストレーションおよびスケジュールする、人気のオープンソーススケジューリングツールです。Spark ジョブは、Spark Airflow Operator または spark-submit コマンドラインツールを使用してスケジュールできます。このトピックでは、Airflow を使用して AnalyticDB for MySQL Spark ジョブをスケジュールする方法について説明します。
前提条件
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition クラスターが作成されていること。
AnalyticDB for MySQL クラスター用にジョブリソースグループまたは Spark 対話型リソースグループが作成されていること。
Python 3.7 以降がインストールされていること。
Airflow を実行するサーバーの IP アドレスが、AnalyticDB for MySQL クラスターのホワイトリストに追加されていること。
Spark SQL ジョブのスケジュール
AnalyticDB for MySQL では、バッチモードまたは対話モードで Spark SQL を実行できます。スケジュール手順は実行モードによって異なります。
バッチモード
Spark Airflow Operator
次のコマンドを実行して、Airflow Spark プラグインをインストールします。
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl接続を作成します。例:
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }次の表にパラメーターを示します。
パラメーター
説明
auth_type
認証方式。値を AK に設定します。これは、AccessKey ペアが認証に使用されることを指定します。
access_key_id
Alibaba Cloud アカウントの AccessKey ID、または AnalyticDB for MySQL へのアクセス権限を持つ Resource Access Management (RAM) ユーザー。
AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。
access_key_secret
Alibaba Cloud アカウントの AccessKey Secret、または AnalyticDB for MySQL へのアクセス権限を持つ RAM ユーザー。
AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。
region
AnalyticDB for MySQL クラスターのリージョン ID。
Airflow DAG の宣言ファイルを作成します。この例では、
spark_dags.pyという名前のファイルが作成されます。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id="my_dag_name", default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"}, ) as dag: spark_sql = AnalyticDBSparkSQLOperator( task_id="task2", sql="SHOW DATABASES;" ) spark_sql次の表にパラメーターを示します。
DAG 構成パラメーター
パラメーター
必須
説明
dag_id
はい
DAG の名前。カスタム名を入力できます。
default_args
はい
cluster_id: AnalyticDB for MySQL クラスターの ID。
rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。
region: AnalyticDB for MySQL クラスターのリージョン ID。
詳細については、「DAG パラメーター」をご参照ください。
AnalyticDBSparkSQLOperator 構成パラメーター
パラメーター
必須
説明
task_id
はい
ジョブ ID。
SQL
はい
Spark SQL 文。
詳細については、「Airflow パラメーター」をご参照ください。
Airflow 構成宣言ファイル dags_folder があるフォルダーに
spark_dags.pyファイルを保存します。DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。
Spark-submit
AnalyticDB for MySQL Spark conf/spark-defaults.conf 構成ファイルまたは Airflow パラメーターで、AnalyticDB for MySQL の特定のパラメーターを構成できます。特定のパラメーターには、clusterId、regionId、keyId、secretId が含まれます。詳細については、「Spark アプリケーション構成パラメーター」をご参照ください。
次のコマンドを実行して、Airflow Spark プラグインをインストールします。
pip3 install apache-airflow-providers-apache-spark重要Airflow Spark プラグインをインストールする前に、Python 3 をインストールする必要があります。
apache-airflow-providers-apache-spark プラグインをインストールすると、Apache Spark コミュニティによって開発された PySpark が自動的にインストールされます。PySpark をアンインストールする場合は、次のコマンドを実行します。
pip3 uninstall pyspark
次のコマンドを実行して、spark-submit のアドレスを Airflow のパスに追加します。
export PATH=$PATH:</your/adb/spark/path/bin>重要Airflow を開始する前に、spark-submit のアドレスを Airflow のパスに追加する必要があります。そうしないと、ジョブのスケジューリング時にシステムが spark-submit コマンドを見つけられない可能性があります。
Airflow DAG の宣言ファイルを作成します。この例では、demo.py という名前のファイルが作成されます。
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun ADB Spark', } with DAG( dag_id='example_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: adb_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf=adb_spark_conf, application="oss://<bucket_name>/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_jobAirflow インストールディレクトリの dags フォルダーに demo.py ファイルを保存します。
DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。
対話モード
Spark 対話型リソースグループのエンドポイントを取得します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけて、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択し、リソースグループ管理 タブをクリックします。
目的のリソースグループを探し、詳細 を 操作 列でクリックすると、内部エンドポイントと public endpoint が表示されます。エンドポイントの横にある
アイコンをクリックしてコピーできます。また、[ポート] の横にある括弧内の
アイコンをクリックして、JDBC 接続文字列をコピーすることもできます。次の場合、[パブリックエンドポイント] の横にある [ネットワークをリクエスト] をクリックして、パブリックエンドポイントを手動でリクエストする必要があります。
Spark SQL ジョブを送信するために使用されるクライアントツールが、ローカルマシンまたは外部サーバーにデプロイされている。
Spark SQL ジョブを送信するために使用されるクライアントツールが ECS インスタンスにデプロイされており、ECS インスタンスと AnalyticDB for MySQL クラスターが同じ VPC にない。

apache-airflow-providers-apache-hive および apache-airflow-providers-common-sql の依存関係をインストールします。
Airflow Web インターフェイスにアクセスします。上部のナビゲーションバーで、 を選択します。
ボタンをクリックします。[接続の追加] ページで、次の表で説明されているパラメーターを構成します。パラメーター
説明
接続 ID
接続の名前。この例では、
adb_spark_clusterが使用されます。接続タイプ
[Hive Server 2 Thrift] を選択します。
ホスト
ステップ 1 で取得したエンドポイント。エンドポイントの
defaultを実際のデータベース名に置き換え、エンドポイントからresource_group=<resource group name>サフィックスを削除します。例:
jdbc:hive2://amv-t4naxpqk****sparkwho.ads.aliyuncs.com:10000/adb_demo。スキーマ
データベースの名前。この例では、
adb_demoが使用されます。ログイン
AnalyticDB for MySQL クラスターのデータベースアカウントと対話型リソースグループの名前。フォーマット:
resource_group_name/database_account_name。例:
spark_interactive_prod/spark_user。パスワード
AnalyticDB for MySQL データベースアカウントのパスワード。
ポート
Spark 対話型リソースグループのポート番号。値を 10000 に設定します。
追加
認証方式。ユーザー名とパスワード認証が使用されることを指定する次の内容を入力します。
{ "auth_mechanism": "CUSTOM" }DAG ファイルを記述します。
from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from datetime import datetime default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 2, 10), 'retries': 1, } dag = DAG( 'adb_spark_sql_test', default_args=default_args, schedule_interval='@daily', ) jdbc_query = SQLExecuteQueryOperator( task_id='execute_spark_sql_query', conn_id='adb_spark_cluster', sql='show databases', dag=dag ) jdbc_query次の表にパラメーターを示します。
パラメーター
必須
説明
task_id
はい
ジョブ ID。カスタム ID を入力できます。
conn_id
はい
接続の名前。ステップ 4 で作成した接続の ID を入力します。
sql
はい
Spark SQL 文。
詳細については、「Airflow パラメーター」をご参照ください。
Airflow Web インターフェイスで、DAG の横にある
ボタンをクリックします。
Spark JAR ジョブのスケジュール
Spark Airflow Operator
次のコマンドを実行して、Airflow Spark プラグインをインストールします。
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl接続を作成します。例:
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }次の表にパラメーターを示します。
パラメーター
説明
auth_type
認証方式。値を AK に設定します。これは、AccessKey ペアが認証に使用されることを指定します。
access_key_id
Alibaba Cloud アカウントの AccessKey ID、または AnalyticDB for MySQL へのアクセス権限を持つ Resource Access Management (RAM) ユーザー。
AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。
access_key_secret
Alibaba Cloud アカウントの AccessKey Secret、または AnalyticDB for MySQL へのアクセス権限を持つ RAM ユーザー。
AccessKey ID と AccessKey Secret の取得方法の詳細については、「アカウントと権限」をご参照ください。
region
AnalyticDB for MySQL クラスターのリージョン ID。
Airflow DAG の宣言ファイルを作成します。この例では、
spark_dags.pyという名前のファイルが作成されます。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # このテストでは、成功/失敗を正しくマークするために watcher が必要です # "tearDown" タスクがトリガールールとともに DAG の一部である場合 list(dag.tasks) >> watcher()DAG 構成パラメーター
パラメーター
必須
説明
dag_id
はい
DAG の名前。カスタム名を入力できます。
default_args
はい
cluster_id: AnalyticDB for MySQL クラスターの ID。
rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。
region: AnalyticDB for MySQL クラスターのリージョン ID。
詳細については、「DAG パラメーター」をご参照ください。
AnalyticDBSparkBatchOperator 構成パラメーター
パラメーター
必須
説明
task_id
はい
ジョブ ID。
file
はい
Spark アプリケーションのメインファイルの絶対パス。メインファイルは、エントリポイントを含む JAR パッケージ、または Python アプリケーションのエントリポイントとして機能する実行可能ファイルです。
重要Spark アプリケーションのメインファイルは OSS に保存する必要があります。
OSS バケットと AnalyticDB for MySQL クラスターは同じリージョンに存在する必要があります。
class_name
特定の条件が満たされた場合ははい
Java または Scala アプリケーションのエントリポイント。
Python アプリケーションにはエントリポイントは必要ありません。
詳細については、「AnalyticDBSparkBatchOperator パラメーター」をご参照ください。
Airflow 構成宣言ファイル dags_folder があるフォルダーに
spark_dags.pyファイルを保存します。DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。
Spark-submit
AnalyticDB for MySQL Spark conf/spark-defaults.conf 構成ファイルまたは Airflow パラメーターで、AnalyticDB for MySQL の特定のパラメーターを構成できます。特定のパラメーターには、clusterId、regionId、keyId、secretId、ossUploadPath が含まれます。詳細については、「Spark アプリケーション構成パラメーター」をご参照ください。
次のコマンドを実行して、Airflow Spark プラグインをインストールします。
pip3 install apache-airflow-providers-apache-spark重要Airflow Spark プラグインをインストールする前に、Python 3 をインストールする必要があります。
apache-airflow-providers-apache-spark プラグインをインストールすると、Apache Spark コミュニティによって開発された PySpark が自動的にインストールされます。PySpark をアンインストールする場合は、次のコマンドを実行します。
pip3 uninstall pyspark
次のコマンドを実行して、spark-submit のアドレスを Airflow のパスに追加します。
export PATH=$PATH:</your/adb/spark/path/bin>重要Airflow を開始する前に、spark-submit のアドレスを Airflow のパスに追加する必要があります。そうしないと、ジョブのスケジューリング時にシステムが spark-submit コマンドを見つけられない可能性があります。
Airflow DAG の宣言ファイルを作成します。この例では、demo.py という名前のファイルが作成されます。
from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule=None, default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}, max_active_runs=1, catchup=False, ) as dag: spark_pi = AnalyticDBSparkBatchOperator( task_id="task1", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi", ) spark_lr = AnalyticDBSparkBatchOperator( task_id="task2", file="local:///tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkLR", ) spark_pi >> spark_lr from tests_common.test_utils.watcher import watcher # このテストでは、成功/失敗を正しくマークするために watcher が必要です # "tearDown" タスクがトリガールールとともに DAG の一部である場合 list(dag.tasks) >> watcher()次の表にパラメーターを示します。
DAG 構成パラメーター
パラメーター
必須
説明
dag_id
はい
DAG の名前。カスタム名を入力できます。
default_args
はい
cluster_id: AnalyticDB for MySQL クラスターの ID。
rg_name: AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。
region: AnalyticDB for MySQL クラスターのリージョン ID。
詳細については、「DAG パラメーター」をご参照ください。
AnalyticDBSparkBatchOperator 構成パラメーター
パラメーター
必須
説明
task_id
はい
ジョブ ID。
file
はい
Spark アプリケーションのメインファイルの絶対パス。メインファイルは、エントリポイントを含む JAR パッケージ、または Python アプリケーションのエントリポイントとして機能する実行可能ファイルです。
重要Spark アプリケーションのメインファイルは OSS に保存する必要があります。
OSS バケットと AnalyticDB for MySQL クラスターは同じリージョンに存在する必要があります。
class_name
特定の条件が満たされた場合ははい
Java または Scala アプリケーションのエントリポイント。
Python アプリケーションにはエントリポイントは必要ありません。
詳細については、「AnalyticDBSparkBatchOperator パラメーター」をご参照ください。
Airflow インストールディレクトリの dags フォルダーに demo.py ファイルを保存します。
DAG を実行します。詳細については、「Airflow ドキュメント」をご参照ください。