全部产品
Search
文档中心

大数据开发治理平台 DataWorks:Notebook 进阶开发

更新时间:Jan 06, 2026

本文将引导您掌握 DataWorks Notebook 的进阶用法,核心目标是帮助您打通交互式开发与生产调度执行之间的壁垒。您将学习如何通过代码复用、数据挂载、参数管理等工程化手段提升开发效率,并掌握连接 MaxCompute Spark、EMR Serverless Spark 及 AnalyticDB for Spark 等多种计算引擎的实践技巧与调试方法。

说明

推荐您优先阅读Notebook 基础开发

理解开发与生产环境的差异

DataWorks Notebook 的核心定位是可被调度执行的开发和分析工具。这意味着它存在两种运行环境:

  • 开发环境:在Data Studio的Notebook 节点编辑页面,直接点击运行单元格,代码在个人开发环境实例中执行。此环境为快速验证、调试代码逻辑而设计。

  • 生产环境:Notebook 节点被提交并发布后,通过周期调度或补数据等触发执行。代码在一个独立的、临时的任务实例中运行。此环境为稳定、可靠地执行生产任务而设计。

这两种环境在功能支持上存在显著差异,提前理解这些差异是高效开发的关键。

开发与生产环境能力差异速查表

功能点

开发环境(运行单元格)

生产环境(周期调度/补数据等)

引用项目资源 (.py)

  • 首次引用:自动下载并生效。

  • 更新后:您需要点击工具栏的重启按钮,以重新加载已更新的 .py 模块。

    重要

    需在Data Studio设置中,修改资源冲突处理策略Dataworks › Notebook › Resource Reference: Download StrategyautoOverwrite。

自动生效。

读写数据集 (OSS/NAS)

需在个人开发环境中挂载数据集。

需在调度配置中挂载数据集。

引用工作空间参数 (${...})

生效,在代码执行前自动进行文本替换。

生效,在任务执行前自动进行文本替换。

Spark 会话管理

Spark会话的默认闲置释放时间为2小时,2小时内无新的代码执行将自动释放。

任务实例级短会话,随任务实例执行而自动创建与销毁。

在生产环境复用代码与数据

引用项目资源(.py 文件)

将通用的函数或类封装在独立的 .py 文件中,通过##@resource_reference{"自定义名.py"}引用MaxCompute资源的方式,实现代码的模块化与复用,提升代码的可维护性。

  1. 创建并发布 Python 资源

    1. 在 DataWorks Data Studio左侧导航栏单击image,进入资源管理

    2. 在资源管理目录树下,右键单击目标目录或单击右上角的 + ,选择新建资源 > MaxCompute Python,命名为 my_utils.py

    3. 文件内容处,点击在线编辑,将您调试好的工具函数代码粘贴到代码编辑框并保存

      # my_utils.py
      def greet(name):
          return f"Hello, {name} from resource file!"
    4. 单击工具栏中的保存,然后发布该资源,使其对开发和生产任务可见

  2. 在 Notebook 中引用资源

    在 Notebook 的Python 单元格第一行,使用 ##@resource_reference 语法引用已发布的资源。

    ##@resource_reference{"my_utils.py"}
    # 如果资源在目录下,比如 my_folder/my_utils.py,也是通过##@resource_reference{"my_utils.py"},无需带目录名称。
    from my_utils import greet
    
    message = greet('DataWorks')
    print(message)
  3. 开发环境调试运行

    运行Python单元格,将打印如下结果:

    Hello, DataWorks from resource file!
    重要

    在开发环境调试运行时,如果识别到代码中包含##@resource_reference,系统将会把资源管理中的目标文件自动下载至个人目录的workspace/_dataworks/resource_references路径中,从而进行目标文件的引用。

    若报错ModuleNotFoundError,点击编辑器上方工具栏的重启按钮重新加载资源后再重试。

  4. 发布至生产环境并验证

    保存发布此 Notebook 节点后,前往运维中心 > 周期任务单击测试运行。任务成功后,您将在日志中看到输出 Hello, DataWorks from resource file!

    重要

    若报错There is no file with id ...,请先将Python资源发布至生产环境。

更多操作,请参见MaxCompute资源与函数

读写数据集(OSS/NAS)

在 Notebook 任务运行时,方便地读写存储在 OSS 或 NAS 上的大规模文件数据。

开发环境调试

  1. 挂载数据集:进入个人开发环境详情页,在存储配置 > 数据集中配置数据集。

  2. 在代码中访问:数据集将被挂载到个人开发环境的挂载路径下,在代码中直接读写该路径。

    # 假设已在个人开发环境设置了挂载路径为/mnt/data/dataset的数据集
    import pandas as pd
    
    # 直接使用挂载路径
    file_path = '/mnt/data/dataset/testfile.csv'
    df = pd.read_csv(file_path)
    
    # 使用 PyODPS 将数据写入 MaxCompute
    o = %odps
    o.write_table('mc_test_table', df, overwrite=True)
    print(f"成功将数据写入 MaxCompute 表 mc_test_table。")

生产环境部署

  1. 挂载数据集:在 Notebook 节点编辑页面的右侧导航栏,进入调度配置 > 调度策略,添加上述数据集。

  2. 在代码中访问:提交发布之后,数据集将被挂载到生产环境的挂载路径下,在代码中直接读写该路径。

    # 假设已在个人开发环境设置了挂载路径为/mnt/data/dataset的数据集
    import pandas as pd
    
    # 直接使用挂载路径
    file_path = '/mnt/data/dataset/testfile.csv'
    df = pd.read_csv(file_path)
    
    # 使用 PyODPS 将数据写入 MaxCompute
    o = %odps
    o.write_table('mc_test_table', df, overwrite=True)
    print(f"成功将数据写入 MaxCompute 表 mc_test_table。")
更多操作,请参见在个人开发环境中使用数据集

引用工作空间参数

重要

仅支持DataWorks专业版及以上。

DataWorks 在已有调度参数的基础上引入工作空间参数,主要是为了解决跨任务、跨节点的全局配置复用与环境隔离问题。可在SQL单元格和Python单元格内以${workspace.param}格式来引用工作空间参数,其中param是您创建的工作空间参数名称。

1、创建工作空间参数:使用前,需前往 DataWorks 运维中心 > 调度设置 > 工作空间参数 页面创建所需参数。

2、引用工作空间参数

  • SQL单元格引用工作空间参数。

    SELECT '${workspace.param}';

    查询工作空间参数,运行成功后,将打印输出工作空间参数的具体赋值。

  • Python单元格引用工作空间参数。

    print('${workspace.param}')

    输出工作空间参数,运行成功后,将打印输出工作空间参数的具体赋值。

更多详情,请参见使用工作空间参数

Magic Command与计算引擎交互

Magic Command 是以 %%% 开头的特殊命令,用于简化Python单元格与各类计算资源的交互。

连接 MaxCompute

说明

在建立MaxCompute计算资源连接前,请确保已绑定MaxCompute计算资源

  • %odps:获取 PyODPS 入口对象

    此命令返回一个与当前MaxCompute项目绑定、已认证的 PyODPS 对象,避免在代码中硬编码 AccessKey,是与 MaxCompute 交互的推荐方式。

    1. 使用Magic Command创建MaxCompute连接。 输入%odps,右下角会出现MaxCompute的计算资源选择器入口(并自动选择计算资源)。点击右下角的MaxCompute项目名称,可更换MaxCompute项目。

      o=%odps 
    2. 使用获取到的MaxCompute计算资源运行PyODPS脚本。

      例如,获取当前项目下的所有表:

      with o.execute_sql('show tables').open_reader() as reader:
          print(reader.raw)
  • %maxframe:建立 MaxFrame 连接

    此命令用于创建 MaxFrame 会话,为 MaxCompute 提供类似 Pandas 的分布式数据处理能力。

    # 连接并访问MaxCompute MaxFrame Session
    mf_session = %maxframe
    
    df = mf_session.read_odps_table('your_mc_table')
    print(df.head())
    
    # 开发调试结束后,应手动销毁会话以释放资源
    mf_session.destroy()

连接 Spark 计算资源

DataWorks Notebook 支持连接多种 Spark 引擎。不同引擎在连接方式、执行上下文和资源管理上存在差异。

重要

同一个Notebook节点,仅支持使用Magic Command连接一种计算资源。

引擎对比

特性

MaxCompute Spark

EMR Serverless Spark

AnalyticDB for Spark

连接命令

%maxcompute_spark

%emr_serverless_spark

%adb_spark add

说明

执行后,整个 Notebook 内核的执行上下文会切换至远端的 PySpark 环境。后续单元格可直接编写 PySpark 代码。

前提条件

绑定 MaxCompute 资源

绑定 EMR 计算资源 并创建 Livy Gateway。

绑定 ADB Spark 计算资源。

开发环境模式

自动创建/复用 Livy 会话。

连接已有的 Livy Gateway,创建会话。

自动创建/复用 Spark Connect Server。

生产环境模式

Livy模式:通过 Livy 服务提交 Spark 作业。

spark-submit 批处理模式:纯批处理,不保留会话状态。

Spark Connect Server 模式:通过 Spark 连接服务进行交互。

生产资源释放

任务实例结束后自动释放会话。

任务实例结束后自动清理资源。

任务实例结束后自动释放资源。

适用场景

与 MaxCompute 生态紧密集成的通用批处理、ETL 任务。

需要灵活配置、与开源大数据生态(如 Hudi, Iceberg)交互的复杂分析任务。

针对 ADB for MySQL C-Store 表的高性能交互式查询与分析。

MaxCompute Spark

说明

在建立MaxCompute计算资源连接前,请确保已绑定MaxCompute计算资源

通过 Livy 连接到 MaxCompute 项目内置的 Spark 引擎。

  1. 建立连接:在 Python 单元格中运行以下命令。系统将自动创建或复用 Spark 会话。

    # 创建Spark Session,并停止Livy。
    %maxcompute_spark
  2. 执行 PySpark 代码:连接成功后,在新的 Python 单元格中,使用 %%spark Cell Magic 执行 PySpark 代码。

    # 在使用MaxCompute Spark时,Python单元格中必须以%%spark作为首行代码
    %%spark
    
    df = spark.sql("SELECT * FROM your_mc_table LIMIT 10")
    df.show()
  3. 手动释放连接:在开发调试结束后,可手动停止或删除会话。在生产环境运行时,系统将自动停止并删除当前任务实例的Livy,无需手动处理。

    # 清理Spark Session,并停止Livy。
    %maxcompute_spark stop
    
    # 清理Spark Session,并停止Livy,最后删除Livy
    %maxcompute_spark delete

EMR Serverless Spark

说明

在建立计算资源连接前,请先在工作空间绑定EMR Serverless Spark计算资源,并创建Livy Gateway

通过连接您预先创建的 Livy Gateway 与 EMR Serverless Spark 交互。

  1. 建立连接:在执行命令前,需在单元格右下角选择要连接的 EMR 计算资源Livy Gateway

    # 基础连接
    %emr_serverless_spark
    
    # 或在连接时传入自定义 Spark 参数,注意需要自定义Spark参数时,需要写两个百分号
    %%emr_serverless_spark
    {
      "spark_conf": {
        "spark.emr.serverless.environmentId": "<EMR Serverless Spark 运行环境ID>",
        "spark.emr.serverless.network.service.name": "<EMR Serverless Spark 网络连接ID>",
        "spark.driver.cores": "1",
        "spark.driver.memory": "8g",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g",
        "spark.driver.maxResultSize": "32g"
      }
    }
    说明

    自定义参数与全局配置的关系

    • 默认行为:您在此处定义的自定义参数仅对本次连接(Session)生效,是一次性的。如果您未提供自定义参数,系统将自动使用在管理中心中配置的全局参数。

    • 推荐用法:对于需要在多个任务或被多人复用的配置,建议在管理中心 > Serverless Spark > SPARK参数中进行全局配置,以保证一致性并方便统一管理。

    • 优先级规则:当同一个参数在自定义参数和全局配置中都被设置时,哪个最终生效,取决于管理中心中的局配置是否优先选项:

      • 勾选:全局配置将覆盖本次的自定义参数。

      • 不勾选:本次的自定义参数将覆盖全局配置。

  2. (可选)重新连接:若Livy gateway页面的token被管理员误删后可通过该命令重新创建。

    # 重新连接,刷新当前个人开发环境的livy token
    %emr_serverless_spark refresh_token
  3. 执行 PySpark 或 SQL 代码:连接成功后,内核已切换。您可以在 Python 单元格 中直接编写 PySpark 代码,或在 EMR Spark SQL 单元格 中编写 SQL。

    1. 通过EMR Spark SQL向计算资源提交并执行SQL代码

      经过%emr_serverless_spark成功建立连接后,可在EMR Spark SQL Cell中,直接写SQL语句,无需在单元格中选择计算资源。

      EMR Spark SQL Cell将复用%emr_serverless_spark的连接,提交至目标计算资源中运行。

      image

    2. 通过Python向计算资源提交并执行PySpark代码

      通过 %emr_serverless_spark 成功建立连接后,可以在新的Python单元格中提交和执行PySpark代码,无需在单元格中添加%%spark前缀。

      image

  4. 手动释放连接

    重要

    在多人共享一个 Livy Gateway 的情况下,`stop` 或 `delete` 命令会影响所有正在使用该网关的用户,请谨慎操作。

    # 清理Spark Session,并停止Livy。
    %emr_serverless_spark stop
    
    # 清理Spark Session,并停止Livy,最后删除Livy
    %emr_serverless_spark delete

AnalyticDB for Spark

说明

在建立计算资源连接前,请先在工作空间绑定AnalyticDB for Spark计算资源

通过创建 Spark Connect Server 连接到 AnalyticDB for Spark 引擎。

  1. 建立连接:为确保网络连通,必须在连接参数中正确配置交换机 ID 和安全组 ID。 在执行命令前,需在单元格右下角选择 ADB Spark 计算资源

    # 必须配置交换机ID和安全组ID以建立网络连接
    %adb_spark add \
     --spark-conf spark.adb.version=3.5 \
     --spark-conf spark.adb.eni.enabled=true \
     --spark-conf spark.adb.eni.vswitchId=<ADB的交换机ID> \
     --spark-conf spark.adb.eni.securityGroupId=<个人开发环境的安全组ID>

    如何查找交换机ID和安全组ID?

    • 交换机ID (vswitchId):前往阿里云AnalyticDB MySQL控制台,在实例详情页查看网络信息交换机ID

    • 安全组ID (securityGroupId):进入个人开发环境详情页的网络设置中所选安全组,查看以sg-开头的安全组ID。

      重要

      为确保网络连通,创建个人开发环境时,建议选择与您的AnalyticDB for Spark实例相同的VPC和交换机。

  2. 执行 PySpark 代码:连接成功后,在新的 Python 单元格中,执行 PySpark 代码。

    # 只能对 C-Store 表执行操作
    df = spark.sql("SELECT * FROM my_adb_cstore_table LIMIT 10")
    df.show()
    说明:AnalyticDB for Spark 引擎当前只能处理具有 'storagePolicy'='COLD' 属性的 C-Store 表。
  3. 手动释放连接:在开发环境调试结束后,手动清理连接会话以节约资源。生产环境运行时,系统将自动清理资源。

    %adb_spark cleanup

附录:Magic Command 速查表

Magic Command

Magic Command含义

适用计算引擎

o = %odps

获取 PyODPS 入口对象

MaxCompute

mf_session = %maxframe

建立 MaxFrame 连接

%maxcompute_spark

创建Spark Session

MaxCompute Spark

%maxcompute_spark stop

清理Spark Session,并停止Livy。

%maxcompute_spark delete

清理Spark Session,停止并删除Livy。

%%spark

在Python单元格中,连接已创建的Spark计算资源。

%emr_serverless_spark

创建Spark Session

EMR Serverless Spark

%emr_serverless_spark info

查看Livy Gateway的详细信息。

%emr_serverless_spark stop

清理Spark Session,并停止Livy。

%emr_serverless_spark delete

清理Spark Session,停止并删除Livy。

%emr_serverless_spark refresh_token

刷新个人开发环境的Livy Token

%adb_spark add

创建并连接到一个可复用的ADB Spark会话(Session)。

AnalyticDB for Spark

%adb_spark info

查看Spark Session信息.

%adb_spark cleanup

停止并清理当前的Spark连接会话。

常见问题

  • Q:引用工作空间资源时,报错ModuleNotFoundErrorThere is no file with id ...

    请按照以下步骤检查:

    1. 请前往数据开发 > 资源管理,确保MaxCompute Python资源已保存;如果是生产环境报该错误,需确认资源已发布至生产环境。

    2. 请点击Notebook编辑器上方工具栏的重启按钮,尝试重新加载资源。

  • Q:当我更新工作空间资源后,为什么还是引用旧资源?

    A:当对资源修改重新发布时,需在Data Studio设置中,修改资源冲突处理策略Dataworks › Notebook › Resource Reference: Download StrategyautoOverwrite,并在Notebook顶部工具栏点击重启Kernel

  • Q:引用数据集时,开发环境报错FileNotFoundError

    A:请确保已在当前选择的个人开发环境中挂载数据集。

  • Q:引用数据集时,开发环境成功,但生产环境报错Execute mount dataset exception! Please check your dataset config

    A:请确保已在Notebook节点的调度配置挂载数据集,并给OSS数据集完成授权

    image

  • Q:如何查看个人开发环境的版本?

    A:进入个人开发环境后,通过快捷键CMD+SHIFT+P,并输入ABOUT查看当前版本。若随着产品功能更新与迭代,需要0.5.69及以上版本的个人开发环境实例,可通过界面升级提示弹窗执行一键升级

  • Q:连接 Spark 引擎失败?

    1. 通用检查:前往工作空间详情的计算资源列表,确认对应的计算资源(MaxCompute/EMR/ADB)是否已正确绑定到当前工作空间,且您的账号具备相应权限。

    2. EMR Serverless Spark:检查 Livy Gateway 是否已创建且状态正常。

    3. AnalyticDB for Spark:重点排查网络问题。确认 vswitchIdsecurityGroupId 配置正确,确保个人开发环境与 ADB Spark 实例之间网络互通。检查安全组规则是否允许必要的端口通信。