This topic describes how to configure the DMSNotebookOperator operator.
Feature description
Runs a Notebook file (.ipynb) that is managed by DMS.
Prerequisites
- A workspace is created.
- A resource group is created.
- If you want to reuse a session, make sure that a Notebook session has been created.
- If you do not want to reuse a session (a new Notebook session is created for the run), make sure that a template, which stores the configuration information of the Notebook session, has been created.
Parameters
The following parameters support Jinja templating: file_path, run_params, profile, session_id, profile_id, cluster_id, session_name, profile_name, and cluster_name. A templating sketch follows the table below.
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| file_path | string | Yes | The path of the Notebook file (.ipynb). |
| profile | dict | No. Required if you do not reuse a session. | The configuration information of the Notebook session. |
| profile_id | string | No. Required if you do not reuse a session. | Specify either profile_id or profile_name. |
| profile_name | string | No. Required if you do not reuse a session. | Specify either profile_id or profile_name. |
| cluster_type | string | No. Required if you do not reuse a session. | The type of the compute cluster for the DMS workspace, such as spark in the example below. |
| cluster_id | string | No. Required if you do not reuse a session. | Specify either cluster_id or cluster_name. |
| cluster_name | string | No. Required if you do not reuse a session. | Specify either cluster_id or cluster_name. |
| spec | string | No. Required if you do not reuse a session. | The resource specification of the driver, such as 4C32G in the example below. |
| runtime_name | string | No. Required if you do not reuse a session. | The name of the image. |
| session_id | string | No. Required if you reuse a session. | The information of the session to reuse. Specify either session_id or session_name. |
| session_name | string | No. Required if you reuse a session. | The information of the session to reuse. Specify either session_id or session_name. |
| run_params | dict | No | The runtime parameters that replace variables in the Notebook file. |
| timeout | int | No | The timeout period for running the Notebook file, in seconds. |
| polling_interval | int | No | The interval, in seconds, at which the execution result is polled. Default value: 10. |
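Because the fields listed above are templated, their values can be rendered from the Airflow context at run time. The following is a minimal, hypothetical sketch: the template name and notebook path are placeholders, other session-template parameters are omitted for brevity, and Airflow's built-in logical-date macro {{ ds }} is passed into the notebook through run_params.

```python
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator

with DAG("dms_notebook_templating_sketch") as dag:
    # Hypothetical sketch: the template name and file path are placeholders,
    # and other session-template parameters are omitted for brevity.
    daily_report = DMSNotebookOperator(
        task_id="daily_report",
        profile_name="my_profile",  # assumed existing session template
        file_path="/Workspace/code/default/daily_report.ipynb",
        run_params={
            # "{{ ds }}" is Airflow's built-in logical-date macro (YYYY-MM-DD);
            # it is rendered before the notebook run starts.
            "report_date": "{{ ds }}",
        },
    )
```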
Example
The task_id and dag parameters are specific to Airflow. For more information, see the official Airflow documentation.
```python
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator

with DAG(
    "dms_notebook_test",
    # DAG-level params can be referenced from templated operator fields.
    params={
        "x": 3,
    },
) as dag:
    # Create a new Notebook session from the named template and run the file.
    notebook_operator = DMSNotebookOperator(
        task_id='notebook_test_hz_name',
        profile_name='hansheng_profile.48',
        profile={},
        cluster_type='spark',
        cluster_name='spark_general2.218',
        spec='4C32G',
        runtime_name='Spark3.5_Scala2.12_Python3.9_General:1.0.9',
        file_path='/Workspace/code/default/test.ipynb',
        run_params={
            # Rendered from the DAG-level param "x" at run time.
            'a': "{{ params.x }}",
        },
        # Poll the execution result every 5 seconds.
        polling_interval=5,
        dag=dag,
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    notebook_operator >> run_this_last

if __name__ == "__main__":
    # Run the DAG once locally for debugging.
    dag.test(run_conf={})
```
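The example above creates a new Notebook session from a template. To reuse an existing session instead, pass session_id or session_name and omit the template-related parameters, as described in the parameter table. The following is a minimal sketch; "my_session" is an assumed, already-created session (see Prerequisites):

```python
from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator

with DAG("dms_notebook_reuse_sketch") as dag:
    # Hypothetical sketch: "my_session" is an assumed existing Notebook session.
    reuse_session = DMSNotebookOperator(
        task_id="notebook_reuse_session",
        session_name="my_session",
        file_path="/Workspace/code/default/test.ipynb",
        timeout=600,          # fail the run if it takes longer than 10 minutes
        polling_interval=10,  # poll the execution result every 10 seconds (default)
    )
```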