本文为您介绍如何使用DataWorks的PyODPS类型节点,借助开源结巴中文分词包实现对中文字段的分词并写入新的表,以及如何通过闭包函数使用自定义词典进行分词。

前提条件

  • 请首先确保您已经完成DataWorks工作空间的创建,本示例使用绑定多个MaxCompute计算引擎的简单模式工作空间,详情请参见创建工作空间
  • 请在GitHub下载开源结巴分词中文包clone

背景信息

PyODPS集成了MaxCompute的Python SDK。您可以在DataWorks的PyODPS节点上直接编辑Python代码,并使用MaxCompute的Python SDK。关于PyODPS节点的详情请参见创建PyODPS 2节点
注意 本文的操作仅作为代码示例,不建议用于实际的生产环境。

操作步骤

  1. 创建业务流程。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
    4. 鼠标悬停至新建图标,单击业务流程
    5. 新建业务流程对话框中,输入业务名称描述
      注意 业务名称必须是大小写字母、中文、数字、下划线(_)以及小数点(.),且不能超过128个字符。
    6. 单击新建
  2. 上传jieba-master.zip包。
    1. 展开新建的业务流程下的MaxCompute,右键单击资源,选择新建 > Archive
    2. 新建资源对话框中,配置各项参数。
      新建资源
      参数 描述
      资源名称 资源的名称,无需和上传的文件名保持一致,但需要符合以下规范:
      • 资源名称仅包含中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。
      • 资源类型为Archive时,资源名称和文件名的后缀必须一致,且后缀名包含.zip.tgz.tar.gz.tar
      目标文件夹 默认当前所在文件夹的路径,您可以进行修改。
      资源类型 此处选择Archive类型。
      说明 如果该资源包已经在MaxCompute(ODPS)客户端上传过,请取消勾选上传为ODPS资源,否则上传会报错。
      MaxCompute引擎实例 从下拉列表中选择该资源所在的计算引擎。

      如果您所在的工作空间仅绑定一个实例,则不会显示该参数。

      上传文件 单击点击上传,在本地选择已下载的文件jieba-master.zip后,单击打开
    3. 新建资源对话框中,单击确定
    4. 单击工具栏中的提交图标,在提交新版本对话框中,输入变更描述
    5. 单击确认
  3. 创建测试数据表。
    1. 展开新建的业务流程下的MaxCompute,右键单击,选择新建表
    2. 新建表对话框中,输入表名(示例为jieba_test),并选择MaxCompute引擎实例
    3. 单击提交
    4. 单击DDL模式,输入以下建表DDL语句。
      本教程准备了两列测试数据,您在后续开发过程中可以选择一列进行分词。
      CREATE TABLE jieba_test (
          `chinese` string,
          `content` string
      );
    5. 单击生成表结构
    6. 确认操作对话框中,单击确认
    7. 基本属性区域,输入表的中文名
    8. 单击提交到生产环境
    9. 提交生产确认对话框中,选中我已悉知风险,确认提交,单击确认
  4. 以同样的方式创建存放测试结果的数据表。
    本例仅对测试数据的chinese列进行分词处理,因此结果表仅有一列。DDL语句如下所示。
    CREATE TABLE jieba_result (
        `chinese` string
    ) ;
  5. 下载分词测试数据
  6. 上传测试数据:
    1. 数据开发页面,单击导入图标。
    2. 数据导入向导对话框中,输入需要导入数据的测试表jieba_test并选中,单击下一步
    3. 单击浏览,上传您下载至本地的jieba_test.csv文件,单击下一步
    4. 选中按名称匹配,单击导入数据
  7. 创建PyODPS 2节点。
    1. 展开业务流程中的MaxCompute,右键单击数据开发,选择新建 > PyODPS 2
    2. 新建节点对话框中,输入节点名称(示例为word_split),并选择目标文件夹
      说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
    3. 单击提交
    4. 在节点的编辑页面,选中MaxCompute引擎实例,并输入下述PyODPS代码。
      def test(input_var):
          import jieba
          import sys
          reload(sys)
          sys.setdefaultencoding('utf-8')
          result=jieba.cut(input_var, cut_all=False)
          return "/ ".join(result)
      hints = {
          'odps.isolation.session.enable': True
      }
      libraries =['jieba-master.zip']  #引用您的jieba-master.zip压缩包。
      iris = o.get_table('jieba_test').to_df()  #引用您的jieba_test表中的数据。
      example = iris.chinese.map(test).execute(hints=hints, libraries=libraries)
      print(example)  #查看分词结果,分词结构为MAP类型数据。
      abci=list(example)   #将分词结果转为list类型的数据。
      i = 0
      for i in range(i,len(abci)):
          pq=str(abci[i])
          o.write_table('jieba_result',[pq])  #通过循环,逐条写入数据至结果表jieba_result中。
          i+=1
      else:
          print("done")
    5. 单击工具栏中的保存图标,保存输入的代码。
    6. 单击工具栏中的运行图标,在参数对话框中,从调度资源组下拉列表选择需要使用的资源组。
    7. 单击确定,测试PyODPS 2节点。
    8. 在页面下方的运行日志区域,查看结巴分词程序的运行结果。
  8. 创建和运行ODPS SQL节点。
    1. 展开业务流程中的MaxCompute,右键单击数据开发,选择新建 > ODPS SQL
    2. 新建节点对话框中,输入节点名称,并选择目标文件夹
      说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
    3. 单击提交
    4. 在节点的编辑页面,输入SQL语句select * from jieba_result;
    5. 单击工具栏中的保存图标,保存输入的查询语句。
    6. 单击工具栏中的运行图标,在参数对话框中,从调度资源组下拉列表选择需要使用的资源组。
    7. 单击确定,运行查询语句。
    8. 成本估计对话框中,确认预估费用后,单击运行
    9. 在页面下方的运行日志区域,查看运行结果。
  9. 如果开源结巴分词的词库无法满足您的需求,需要使用自定义的词典。
    PyODPS自定义函数可以读取上传至MaxCompute的资源(表资源或文件资源)。此时,自定义函数需要写为闭包函数或Callable类。

    本文以使用闭包函数的方式,引用上传至MaxCompute的资源文件(即自定义词典)key_words.txt

    1. 展开业务流程中的MaxCompute,右键单击资源,选择新建 > File
    2. 新建资源对话框中,配置各项参数。
      资源
      参数 描述
      资源名称 资源的名称,仅支持中文、字母、数字、英文句号(.)、下划线(_)和短划线(-)。
      目标文件夹 默认当前所在文件夹的路径,您可以进行修改。
      资源类型 此处选择File类型。

      本文中的资源文件,在新建时请务必选中上传为ODPS资源

      MaxCompute引擎实例 从下拉列表中选择该资源所在的引擎实例。
    3. 单击确定
    4. 在资源的编辑页面,选择MaxCompute引擎实例,并输入自定义词典的内容。
      词典格式:
      • 一个词占一行。
      • 每一行包括词、词频(可省略)和词性(可省略),使用空格分隔,且不可以颠倒顺序。

      如果您从本地上传词典文件至DataWorks,则文件必须为UTF-8编码格式。

    5. 单击工具栏中的提交图标提交资源。
    6. 创建一个PyODPS 2节点,并输入如下代码。
      def test(resources):
          import jieba
          import sys
          reload(sys)
          sys.setdefaultencoding('utf-8')
          fileobj = resources[0]
      
          def h(input_var):#在嵌套函数h()中,执行词典加载和分词。
              import jieba
              jieba.load_userdict(fileobj)
              result=jieba.cut(input_var, cut_all=False)
              return "/ ".join(result)
          return h
      hints = {
          'odps.isolation.session.enable': True
      }
      libraries =['jieba-master.zip']  #引用您的jieba-master.zip压缩包。
      iris = o.get_table('jieba_test').to_df()  #引用您的jieba_test表中的数据。
      
      file_object = o.get_resource('key_words.txt') #get_resource()引用odps资源。
      example = iris.chinese.map(test, resources=[file_object]).execute(hints=hints, libraries=libraries) #map调用函数,并传递resources参数。
      
      print(example)  #查看分词结果,分词结构为MAP类型数据。
      abci=list(example)   #将分词结果转为list类型数据。
      i = 0
      for i in range(i,len(abci)):
          pq=str(abci[i])
          o.write_table('jieba_result',[pq])  #通过循环,逐条写入数据至结果表jieba_result中。
          i+=1
      else:
          print("done")
    7. 运行代码,对比引用自定义词典前后的结果。