本文为您介绍如何创建PyODPS 3节点,以及在DataWorks使用PyODPS 3的限制。
背景信息
- DataWorks支持可视化创建Python资源,如果您需要在PyODPS节点中调用第三方包,请使用独享调度资源组并通过运维助手进行安装。
- DataWorks运维助手中安装的Python第三方包,仅支持在DataWorks独享调度资源组本地运行PyODPS任务代码时引用,如果需要在MaxCompute的Python UDF中引用Python第三方包,详情请参见Python UDF使用第三方包。
- 如果您的PyODPS任务需要访问特殊的网络环境(如VPC网络或IDC网络中的数据源或服务等),请使用独享调度资源组,并参考网络连通解决方案打通独享资源组与目标环境的网络连通。
- PyODPS语法及更多信息请参见PyODPS文档。
- PyODPS节点分为PyODPS 2和PyODPS 3两种,二者的区别在于底层Python语言版本不同。PyODPS 2底层Python语言版本为Python 2,PyODPS 3底层Python语言版本为Python 3,请您根据实际使用的Python语言版本创建PyODPS节点。
使用限制
- DataWorks建议您在PyODPS节点内获取到本地处理的数据不超过50 MB,该操作受限于DataWorks执行资源的不同规格(包括公共调度资源组和独享调度资源组),处理的本地数据过多并超出操作系统阈值时可能发生OOM(Got killed)错误。请避免在PyODPS节点中写入过多的数据处理代码。详情请参见高效使用PyODPS最佳实践。
- 如果您发现有Got killed报错,即表明内存使用超限,进程被中止。因此,请尽量避免本地的数据操作。通过PyODPS发起的SQL和DataFrame任务(除to_pandas外)不受此限制。
- 非自定义函数代码可以使用平台预装的Numpy和Pandas。不支持其他带有二进制代码的三方包。
- 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默认设置为False。如果需要全局开启instance tunnel,需要手动将该值设置为True。
- 当Python 3的子版本号不同(例如Python 3.8和Python 3.7)时,字节码的定义有所不同。
目前MaxCompute使用的Python 3版本为3.7,当使用其它版本Python 3中的部分语法(例如Python 3.8中的finally block)时,执行会报错,建议您选择Python 3.7。
- PyODPS 3支持运行在公共资源组和2020年4月之后购买的独享调度资源组上。如果您的独享调度资源组的创建时间较早,请通过DataWorks交流群联系值班同学升级资源组。
使用流程
更多关于PyODPS语法说明,请参见PyODPS文档。操作 | 说明 |
---|---|
新建PyODPS 3节点 | DataWorks提供PyODPS 3节点类型,集成了MaxCompute的Python SDK。您可以创建PyODPS节点直接编辑Python代码。 |
ODPS入口 | DataWorks的PyODPS 3节点中,将会包含一个全局的变量odps或o,即ODPS入口,您无需手动定义ODPS入口。 |
执行SQL | PyODPS 3支持ODPS SQL的查询,您可以在PyODPS节点中执行SQL。 |
设置运行参数 | 您可以通过设置hints参数,来设置运行时的参数。参数类型是dict。 |
读取运行结果 | 您可以读取SQL执行结果。 |
DataFrame | 您还可以通过DataFrame的方式处理数据。 |
配置调度属性 | 若节点需要周期性调度,您需要定义节点调度时的相关属性。 |
提交节点 | 您可以提交节点,标准模式工作空间提交任务后,任务仅在开发环境生效,开发环境中的任务不进行自动调度,您需要通过任务发布流程,将任务发布至生产环境后,任务才会周期调度运行。 |
新建PyODPS 3节点
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的数据开发。
- 鼠标悬停至
图标,单击 。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹。
- 单击提交。
ODPS入口
DataWorks的PyODPS 3节点中,将会包含一个全局的变量odps或o,即ODPS入口,您无需手动定义ODPS入口。
print(odps.exist_table('PyODPS_iris'))
执行SQL
您可以在PyODPS节点中执行SQL,详情请参见SQL。
Dataworks上默认未开启instance tunnel,即instance.open_reader默认使用Result接口(最多一万条记录)。您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要关闭
limit
限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit
限制。 options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # 关闭limit限制,读取全部数据。
with instance.open_reader() as reader:
# 通过Instance Tunnel可读取全部数据。
您也可以通过在tunnel=True
,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False
,实现仅对本次关闭limit
限制。
with instance.open_reader(tunnel=True, limit=False) as reader:
# 本次open_reader使用Instance Tunnel接口,且能读取全部数据。
设置运行参数
您可以通过设置hints参数,来设置运行时的参数,参数类型是dict。 Hints参数的详情请参见SET操作。o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
对全局配置设置sql.settings后,每次运行时,都需要添加相关的运行时的参数。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from PyODPS_iris') # 根据全局配置添加hints。
读取运行结果
运行SQL的实例能够直接执行open_reader的操作,有以下两种情况:- SQL返回了结构化的数据。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 处理每一个record。
- 可能执行的是desc等SQL语句,通过reader.raw属性,获取到原始的SQL执行结果。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
说明 如果使用了自定义调度参数,页面上直接触发运行PyODPS 3节点时,需要写死时间,PyODPS节点无法直接替换。
DataFrame
- 执行
在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 调用立即执行的方法,处理每条Record。
如果您需要在Print时调用立即执行,需要开启
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在开始处打开开关。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print时会立即执行。
- 打印详细信息
通过设置
options.verbose
选项。在DataWorks上,默认已经处于打开状态,运行过程会打印Logview等详细过程。
配置调度属性
单击节点编辑区域右侧的调度配置,在参数区域配置自定义参数,PyODPS节点与SQL节点定义变量的方式不同,详情请参见调度参数配置。
与DataWorks中的SQL节点不同,为了避免影响代码,PyODPS节点不会在代码中替换 ${param_name}这样的字符串,而是在执行代码前,在全局变量中增加一个名为args
的dict,调度参数可以在此获取。例如,在参数中设置ds=${yyyymmdd}
,则可以通过以下方式在代码中获取该参数。
print('ds=' + args['ds'])
ds=20161116
说明 如果您需要获取名为
ds=${yyyymmdd}
的分区,则可以使用如下方法。o.get_table('table_name').get_partition('ds=' + args['ds'])
提交节点
重要 提交节点前,您需要设置重跑属性和依赖的上游节点。
- 单击工具栏中的
图标。
- 在提交新版本对话框中,输入备注。
- 单击确认。
任务运维,详情请参见:周期任务运维概述
如何判断Python自定义脚本任务的成功完成
判断逻辑和Shell节点一致,详情请参见: 如何判断Shell自定义脚本任务的成功完成。