このトピックでは、DMSNotebookOperator 操作の構成情報について説明します。
機能説明
DMS によって管理される Notebook ファイル (.pynb) を実行します。
前提条件
ワークスペースを作成済みであること。
リソースグループを作成済みであること。
セッションを再利用する場合、Notebook セッションを作成済みであること。
セッションを再利用する必要がない場合 (新しい Notebook セッションを作成する場合) は、テンプレート (Notebook セッションの構成情報) が作成されていること。
パラメーター
パラメーター file_path、run_params、profile、session_id、profile_id、cluster_id、session_name、profile_name、cluster_name は Jinja テンプレートを使用できます。
パラメーター | タイプ | 必須 | 説明 |
file_path | 文字列 | はい | Notebook ファイル (.pynb) のパス。 |
profile | dict | いいえ | Notebook セッションの構成情報。
|
profile_id | 文字列 | いいえ 説明 セッションを再利用しない場合に必須です。 |
|
profile_name | 文字列 | ||
cluster_type | 文字列 | DMS ワークスペースの計算クラスターのタイプ。有効な値:
| |
cluster_id | 文字列 |
説明 2 つのうちの 1 つを選択します。 | |
cluster_name | 文字列 | ||
spec | 文字列 | ドライバーのリソース仕様。有効な値:
| |
runtime_name | 文字列 | イメージ名。 | |
session_id | 文字列 | いいえ 説明 セッションを再利用する場合に必須です。 | 再利用されるセッションの情報。
説明 2 つのうちの 1 つを選択します。 |
session_name | 文字列 | ||
run_params | dict | いいえ | Notebook ファイル内の変数を置き換えることができる実行時パラメーター。 |
timeout | int | いいえ | Notebook ファイルの実行期間 (タイムアウト期間) (秒単位)。 |
polling_interval | int | いいえ | 実行結果がリフレッシュされる間隔。単位は秒です。デフォルト値は 10 です。 |
例
task_id と dag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import json
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
with DAG(
"dms_notebook_test",
params={
"x":3
},
) as dag:
notebook_operator = DMSNotebookOperator(
task_id='notebook_test_hz_name',
profile_name='hansheng_profile.48',
profile={},
cluster_type='spark',
cluster_name='spark_general2.218',
spec='4C32G',
runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
file_path='/Workspace/code/default/test.ipynb',
run_params={
'a':"{{ params.x }}"
},
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
notebook_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)