このトピックでは、Airflow を使用して Notebook ファイルをスケジュールし、Notebook の実行進捗をモニターする方法について説明します。
前提条件
Airflow を使用した Notebook のスケジュール
- DMS コンソール V5.0 にログインします。
[ワークスペース管理] ページに移動します。
DMS は、ワークスペースにアクセスするための 2 つのパスを提供します。必要に応じて選択できます。
パス 1
左上隅の
アイコンにポインターを移動し、 を選択します。説明DMS コンソールを通常モードで使用する場合は、上部のナビゲーションバーで を選択します。

パス 2
ページの左側にあるデータインテリジェンスファクトリ
アイコンをクリックし、[ワークスペース管理] をクリックします。説明簡略化されていないコンソールを使用している場合は、上部のメニューバーから を選択します。
(リソースマネージャー) ページの WORKSPACE エリアで
をクリックし、[Notebook ファイルの作成] を選択します。Notebook ファイルに任意のコード (例:
print(1)) を構成します。REPOS (リポジトリ) エリアで Python コードを記述して、Notebook のスケジュールに必要なパラメーターを構成します。以下はサンプルコードです。
from doctest import debug from airflow import DAG from airflow.decorators import task from airflow.models.param import Param from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator with DAG( "dms_notebook_sy_hz_name", params={ }, ) as dag: notebook_operator = DMSNotebookOperator( task_id='dms_notebook_sy_hz_name', profile_name='test', profile={}, cluster_type='spark', cluster_name='spark_cluster_855298', spec='4C16G', runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9', file_path='/Workspace/code/default/test.ipynb', run_params={'a':10}, polling_interval=5, debug=True, dag=dag ) run_this_last = EmptyOperator( task_id="run_this_last22", dag=dag, ) notebook_operator >> run_this_last if __name__ == "__main__": dag.test( run_conf={} )コード内の一部のパラメーターの説明は次のとおりです。説明されていないパラメーターについては、デフォルト値を維持できます。
パラメーター
型
必須
説明
task_id
string
はい
定義するタスクの一意の識別子です。
profile_name
string
はい
プロファイル名です。
右側のサイドバーにある
(構成管理) アイコンをクリックして、新しいプロファイルを構成できます。cluster_type
string
はい
Notebook セッションインスタンスで構成されたクラスタータイプです。
現在、CPU クラスターと Spark クラスターの 2 種類のクラスターがサポートされています。CPU クラスターは DMS によってデフォルトで作成されますが、Spark クラスターは手動で作成する必要があります。詳細については、「Spark クラスターの作成」をご参照ください。
cluster_name
string
はい
クラスター名です。
spec
string
はい
クラスターの仕様です。
現在、デフォルト仕様の 4C16G のみがサポートされています。
runtime_name
string
はい
実行時環境です。
現在、Spark 実行時環境は
Spark3.5_Scala2.12_Python3.9_General:1.0.9とSpark3.3_Scala2.12_Python3.9_General:1.0.9のみをサポートしています。file_path
string
はい
ファイルパスです。
ファイルパスを表示します。パスのフォーマットは
/Workspace/code/defaultです。例:/Workspace/code/default/test.ipynb。run_params
dict
いいえ
Notebook ファイル内の変数を置き換えることができる実行時パラメーターです。
timeout
int
いいえ
Notebook Cell の最大実行時間です。単位は秒です。
Cell の実行時間がタイムアウト値を超えると、ファイル全体のスケジュールが停止します。
polling_interval
int
いいえ
実行結果をリフレッシュする間隔です。単位は秒です。デフォルト値は 10 です。
リポジトリ名をクリックし、構成環境ページで、後で公開するために環境をワークフローインスタンスにバインドします。ワークフローインスタンスは、1 種類の環境にのみバインドできます。
ターゲットリポジトリ名にカーソルを合わせ、
をクリックし、[公開] と [この公開の環境] (リポジトリにバインドされた環境) を選択して、[OK] をクリックします。説明公開操作には 10 秒の遅延があります。
Notebook を実行します。
ワークスペースの左側にある
ワークフローアイコンをクリックし、ターゲットの Airflow インスタンス名をクリックして Airflow スペースに入ります。[コード] ページで、公開されたコードが現在のページに同期されていることを確認します。
現在のページへの同期を確認した後、右上隅にある
実行ボタンをクリックします。[グラフ] タブをクリックし、対応するタスクを見つけてクリックします。
[ログ] タブをクリックして、タスク実行ログを表示します。
タスクの実行中は、いつでも Notebook の実行進捗を表示できます。
Notebook の実行進捗の表示
DAG ページで実行進捗を表示する
[ログ] タブで現在のタスク実行進捗を表示します。たとえば、[progress] が [2/15] と表示されている場合、合計 15 個の Cell があり、現在 2 番目の Cell が実行されていることを示します。
説明notebook run successが表示されると、タスクが完了したことを示します。Notebook ファイルページで実行進捗を表示する
[ログ] ページで、ログ内の Notebook ページの [url] の後にあるリンクをクリックして Notebook ファイルに入り、実行ステータスをモニターします。右上隅のリフレッシュボタンをクリックすると、実行進捗をリアルタイムで更新できます。
実行結果が表示されると、Notebook タスクが完了したことを示します。