本文为您介绍如何使用DataWorks的PyODPS类型节点,借助开源结巴中文分词包实现对中文字段的分词并写入新的表,以及如何通过闭包函数使用自定义词典进行分词。
背景信息
注意 本文的操作仅作为代码示例,不建议用于实际的生产环境。
操作步骤
- 创建业务流程。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的进入数据开发。
- 鼠标悬停至
图标,单击业务流程。
- 在新建业务流程对话框中,输入业务名称和描述。注意 业务名称必须是大小写字母、中文、数字、下划线(_)以及小数点(.),且不能超过128个字符。
- 单击新建。
- 上传jieba-master.zip包。
- 展开新建的业务流程下的MaxCompute,右键单击资源,选择新建 > Archive。
- 在新建资源对话框中,配置各项参数。
参数 描述 资源名称 资源的名称,无需和上传的文件名保持一致,但需要符合以下规范: - 资源名称仅包含中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。
- 资源类型为Archive时,资源名称和文件名的后缀必须一致,且后缀名包含.zip、.tgz、.tar.gz或.tar。
目标文件夹 默认当前所在文件夹的路径,您可以进行修改。 资源类型 此处选择Archive类型。 说明 如果该资源包已经在MaxCompute(ODPS)客户端上传过,请取消勾选上传为ODPS资源,否则上传会报错。MaxCompute引擎实例 从下拉列表中选择该资源所在的计算引擎。 如果您所在的工作空间仅绑定一个实例,则不会显示该参数。
上传文件 单击点击上传,在本地选择已下载的文件jieba-master.zip后,单击打开。 - 在新建资源对话框中,单击确定。
- 单击工具栏中的
图标,在提交新版本对话框中,输入变更描述。
- 单击确认。
- 创建测试数据表。
- 展开新建的业务流程下的MaxCompute,右键单击表,选择新建表。
- 在新建表对话框中,输入表名(示例为jieba_test),并选择MaxCompute引擎实例。
- 单击提交。
- 单击DDL模式,输入以下建表DDL语句。本教程准备了两列测试数据,您在后续开发过程中可以选择一列进行分词。
CREATE TABLE jieba_test ( `chinese` string, `content` string );
- 单击生成表结构。
- 在确认操作对话框中,单击确认。
- 在基本属性区域,输入表的中文名。
- 单击提交到生产环境。
- 在提交生产确认对话框中,选中我已悉知风险,确认提交,单击确认。
- 以同样的方式创建存放测试结果的数据表。本例仅对测试数据的chinese列进行分词处理,因此结果表仅有一列。DDL语句如下所示。
CREATE TABLE jieba_result ( `chinese` string ) ;
- 下载分词测试数据。
- 上传测试数据:
- 在数据开发页面,单击
图标。
- 在数据导入向导对话框中,输入需要导入数据的测试表jieba_test并选中,单击下一步。
- 单击浏览,上传您下载至本地的jieba_test.csv文件,单击下一步。
- 选中按名称匹配,单击导入数据。
- 在数据开发页面,单击
- 创建PyODPS 2节点。
- 展开业务流程中的MaxCompute,右键单击数据开发,选择新建 > PyODPS 2。
- 在新建节点对话框中,输入节点名称(示例为word_split),并选择目标文件夹。说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 单击提交。
- 在节点的编辑页面,选中MaxCompute引擎实例,并输入下述PyODPS代码。
def test(input_var): import jieba import sys reload(sys) sys.setdefaultencoding('utf-8') result=jieba.cut(input_var, cut_all=False) return "/ ".join(result) hints = { 'odps.isolation.session.enable': True } libraries =['jieba-master.zip'] #引用您的jieba-master.zip压缩包。 iris = o.get_table('jieba_test').to_df() #引用您的jieba_test表中的数据。 example = iris.chinese.map(test).execute(hints=hints, libraries=libraries) print(example) #查看分词结果,分词结构为MAP类型数据。 abci=list(example) #将分词结果转为list类型的数据。 i = 0 for i in range(i,len(abci)): pq=str(abci[i]) o.write_table('jieba_result',[pq]) #通过循环,逐条写入数据至结果表jieba_result中。 i+=1 else: print("done")
- 单击工具栏中的
图标,保存输入的代码。
- 单击工具栏中的
图标,在参数对话框中,从调度资源组下拉列表选择需要使用的资源组。
- 单击确定,测试PyODPS 2节点。
- 在页面下方的运行日志区域,查看结巴分词程序的运行结果。
- 创建和运行ODPS SQL节点。
- 展开业务流程中的MaxCompute,右键单击数据开发,选择新建 > ODPS SQL。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹。说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 单击提交。
- 在节点的编辑页面,输入SQL语句
select * from jieba_result;
。 - 单击工具栏中的
图标,保存输入的查询语句。
- 单击工具栏中的
图标,在参数对话框中,从调度资源组下拉列表选择需要使用的资源组。
- 单击确定,运行查询语句。
- 在成本估计对话框中,确认预估费用后,单击运行。
- 在页面下方的运行日志区域,查看运行结果。
- 如果开源结巴分词的词库无法满足您的需求,需要使用自定义的词典。PyODPS自定义函数可以读取上传至MaxCompute的资源(表资源或文件资源)。此时,自定义函数需要写为闭包函数或Callable类。
本文以使用闭包函数的方式,引用上传至MaxCompute的资源文件(即自定义词典)key_words.txt:
- 展开业务流程中的MaxCompute,右键单击资源,选择新建 > File。
- 在新建资源对话框中,配置各项参数。
参数 描述 资源名称 资源的名称,仅支持中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。 目标文件夹 默认当前所在文件夹的路径,您可以进行修改。 资源类型 此处选择File类型。 本文中的资源文件,在新建时请务必选中上传为ODPS资源。
MaxCompute引擎实例 从下拉列表中选择该资源所在的引擎实例。 - 单击确定。
- 在资源的编辑页面,选择MaxCompute引擎实例,并输入自定义词典的内容。词典格式:
- 一个词占一行。
- 每一行包括词、词频(可省略)和词性(可省略),使用空格分隔,且不可以颠倒顺序。
如果您从本地上传词典文件至DataWorks,则文件必须为UTF-8编码格式。
- 单击工具栏中的
图标提交资源。
- 创建一个PyODPS 2节点,并输入如下代码。
def test(resources): import jieba import sys reload(sys) sys.setdefaultencoding('utf-8') fileobj = resources[0] def h(input_var):#在嵌套函数h()中,执行词典加载和分词。 import jieba jieba.load_userdict(fileobj) result=jieba.cut(input_var, cut_all=False) return "/ ".join(result) return h hints = { 'odps.isolation.session.enable': True } libraries =['jieba-master.zip'] #引用您的jieba-master.zip压缩包。 iris = o.get_table('jieba_test').to_df() #引用您的jieba_test表中的数据。 file_object = o.get_resource('key_words.txt') #get_resource()引用odps资源。 example = iris.chinese.map(test, resources=[file_object]).execute(hints=hints, libraries=libraries) #map调用函数,并传递resources参数。 print(example) #查看分词结果,分词结构为MAP类型数据。 abci=list(example) #将分词结果转为list类型数据。 i = 0 for i in range(i,len(abci)): pq=str(abci[i]) o.write_table('jieba_result',[pq]) #通过循环,逐条写入数据至结果表jieba_result中。 i+=1 else: print("done")
- 运行代码,对比引用自定义词典前后的结果。