表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。将数据写入到表格存储时,您可以通过函数计算对新增的数据做简单的清洗,将清洗后的数据写回到表格存储的另一种数据表中。同时,您也可以实时访问表格存储中的原始数据和结果数据。

样例场景

假设写入表格存储的为日志数据,且日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到表格存储的另一张数据表result中。

字段名称 类型 说明
id 整型 日志ID。
level 整型 日志的等级,数值越大表示日志等级越高。
message 字符串 日志的内容。

步骤一:为数据表开启Stream功能

使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

  1. 登录表格存储控制台
  2. 概览页面,单击实例名称或在操作列单击实例管理
  3. 实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道
  4. 实时消费通道页签,单击Stream信息对应的开启
  5. 开启Stream功能对话框,设置日志过期时长,单击开启

    日志过期时长取值为非零整数,单位为小时,最长时长为168小时。

    说明 日志过期时长设置后不能修改,请谨慎设置。

步骤二:配置Tablestore触发器

在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。

  1. 创建函数计算的服务。
    1. 登录函数计算控制台
    2. 在左侧导航栏,单击服务及函数
    3. 在顶部菜单栏,选择地域。
    4. 服务列表页面,单击创建服务
    5. 创建服务面板,填写服务名称和描述,按需设置日志与链路追踪功能。
      关于参数说明的更多信息,请参见管理服务
    6. 单击确定
      创建完成后,在服务列表页面,您可以查看到已创建的服务及其配置信息。
  2. 创建函数计算的函数。
    说明 系统支持从零开始创建、使用容器镜像创建、使用模板创建等方式创建函数。此处以从零开始创建方式为例介绍,其他创建方式,请分别参见使用容器镜像创建函数使用模板创建函数
    1. 服务列表页面,单击目标服务名称。
    2. 函数管理页面,单击创建函数
    3. 创建函数页面,选择创建函数的方式为从零开始创建
    4. 基本设置区域,根据下表说明填写函数基本信息。
      参数 是否必填 说明 本文示例
      函数名称 填写自定义的函数名称。必须以字母开头,可包含数字、字母(区分大小写)、下划线(_)和短划线(-),不超过64个字符。
      说明 如果不填写名称,函数计算会自动为您创建。
      Function
      运行环境 选择您熟悉的语言,例如Python、Java、PHP、Node.js等。函数计算支持的运行环境,请参见管理函数 Python 3.6
      请求处理程序类型 使用表格存储触发器时,必须设置为处理事件请求 处理事件请求
      实例类型 选择适合您的实例类型。取值范围如下:
      • 弹性实例
      • 性能实例

      更多信息,请参见实例类型及使用模式。关于各种实例类型的计费详情,请参见计费概述

      弹性实例
      内存规格 设置函数执行内存。
      • 选择输入:在下拉列表中选择所需内存。
      • 手动输入:单击手动输入内存大小,可自定义函数执行内存。内存规格说明如下:
        • 弹性实例:取值范围[128, 3072],单位为MB。
        • 性能实例:取值范围[4, 32],单位为GB。
        说明 输入的内存必须为64 MB的倍数。
      512 MB

      函数创建完成后,在函数管理页面,即可查看已创建的函数。

    5. 配置触发器区域,根据下表说明填写触发器相关参数。
      参数 操作 本文示例
      触发器类型 选择表格存储Tablestore 表格存储Tablestore
      名称 自定义填写触发器名称。 Tablestore-trigger
      实例 在下拉列表中选择已创建的Tablestore实例。 distribute-test
      表格 在下拉列表中选择已创建的数据表。 source_data
      角色名称 选择AliyunTableStoreStreamNotificationRole
      说明 如果您第一次创建该类型的触发器,则需要单击确定后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。
      AliyunTableStoreStreamNotificationRole
    6. 单击创建
      创建好的触发器会自动显示在触发器管理页签。
      说明 您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。

步骤三:验证测试

创建触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。

  1. 编写代码。
    1. 函数管理页面,单击函数名称。
    2. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
      此处以Python函数代码为例介绍。其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)、ENDPOINT(服务地址)需要根据情况进行修改。
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'
      def _utf8(input):
          return str(bytearray(input, "utf-8"))
      def get_attrbute_value(record, column):
          attrs = record[u'Columns']
          for x in attrs:
              if x[u'ColumnName'] == column:
                  return x['Value']
      def get_pk_value(record, column):
          attrs = record[u'PrimaryKey']
          for x in attrs:
              if x['ColumnName'] == column:
                  return x['Value']
      #由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
      def get_ots_client(context):
          creds = context.credentials
          client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
          return client
      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attrbute_value(record, 'level'))
          msg = get_attrbute_value(record, 'message')
          pk = [(_utf8('id'), id),]
          attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)
      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attrbute_value(record, 'level'))
              if level > 1:
                  save_to_ots(client, record)
              else:
                  print "Level <= 1, ignore."
  2. 向source_data数据表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据。
    • 当向soure_data表中写入level>1的数据时,数据会同步到result表中。
    • 当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。