表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。将数据写入到表格存储时,您可以通过函数计算对新增的数据做简单的清洗,将清洗后的数据写回到表格存储的另一种数据表中。同时,您也可以实时访问表格存储中的原始数据和结果数据。
样例场景
假设写入表格存储的为日志数据,且日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到表格存储的另一张数据表result中。
字段名称 |
类型 |
说明 |
id |
整型 |
日志ID。 |
level |
整型 |
日志的等级,数值越大表示日志等级越高。 |
message |
字符串 |
日志的内容。 |
步骤一:为数据表开启Stream功能
使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。
- 登录表格存储控制台。
- 在概览页面,单击实例名称或在操作列单击实例管理。
- 在实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击
后选择实时消费通道。
- 在实时消费通道页签,单击Stream信息对应的开启。
- 在开启Stream功能对话框,设置日志过期时长,单击开启。
日志过期时长取值为非零整数,单位为小时,最长时长为168小时。
步骤二:配置Tablestore触发器
在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。
- 创建函数计算的服务。
- 登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域。
- 在服务列表页面,单击创建服务。
- 在创建服务面板,填写服务名称和描述,按需设置日志与链路追踪功能。
- 单击确定。
创建完成后,在服务列表页面,您可以查看到已创建的服务及其配置信息。
- 创建函数计算的函数。
- 在服务列表页面,单击目标服务名称。
- 在函数管理页面,单击创建函数。
- 在创建函数页面,选择创建函数的方式为从零开始创建。
- 在基本设置区域,根据下表说明填写函数基本信息。
参数 |
是否必填 |
说明 |
本文示例 |
函数名称 |
否 |
填写自定义的函数名称。必须以字母开头,可包含数字、字母(区分大小写)、下划线(_)和短划线(-),不超过64个字符。
|
Function |
运行环境 |
是 |
选择您熟悉的语言,例如Python、Java、PHP、Node.js等。函数计算支持的运行环境,请参见管理函数。
|
Python 3.6 |
请求处理程序类型 |
是 |
使用表格存储触发器时,必须设置为处理事件请求。
|
处理事件请求 |
实例类型 |
是 |
选择适合您的实例类型。取值范围如下:
更多信息,请参见实例类型及使用模式。关于各种实例类型的计费详情,请参见计费概述。
|
弹性实例 |
内存规格 |
是 |
设置函数执行内存。
- 选择输入:在下拉列表中选择所需内存。
- 手动输入:单击手动输入内存大小,可自定义函数执行内存。内存规格说明如下:
- 弹性实例:取值范围[128, 3072],单位为MB。
- 性能实例:取值范围[4, 32],单位为GB。
|
512 MB |
函数创建完成后,在函数管理页面,即可查看已创建的函数。
- 在配置触发器区域,根据下表说明填写触发器相关参数。
参数 |
操作 |
本文示例 |
触发器类型 |
选择表格存储Tablestore。
|
表格存储Tablestore |
名称 |
自定义填写触发器名称。 |
Tablestore-trigger |
实例 |
在下拉列表中选择已创建的Tablestore实例。 |
distribute-test |
表格 |
在下拉列表中选择已创建的数据表。 |
source_data |
角色名称 |
选择AliyunTableStoreStreamNotificationRole。
说明 如果您第一次创建该类型的触发器,则需要单击确定后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。
|
AliyunTableStoreStreamNotificationRole |
- 单击创建。
创建好的触发器会自动显示在
触发器管理页签。
说明 您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。
步骤三:验证测试
创建触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。
- 编写代码。
- 在函数管理页面,单击函数名称。
- 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
此处以Python函数代码为例介绍。其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)、ENDPOINT(服务地址)需要根据情况进行修改。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots
INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'
def _utf8(input):
return str(bytearray(input, "utf-8"))
def get_attrbute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
#由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
def get_ots_client(context):
creds = context.credentials
client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
return client
def save_to_ots(client, record):
id = int(get_pk_value(record, 'id'))
level = int(get_attrbute_value(record, 'level'))
msg = get_attrbute_value(record, 'message')
pk = [(_utf8('id'), id),]
attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
row = ots.Row(pk, attr)
client.put_row(RESULT_TABLENAME, row)
def handler(event, context):
records = cbor.loads(event)
#records = json.loads(event)
client = get_ots_client(context)
for record in records['Records']:
level = int(get_attrbute_value(record, 'level'))
if level > 1:
save_to_ots(client, record)
else:
print "Level <= 1, ignore."
- 向source_data数据表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据。
- 当向soure_data表中写入level>1的数据时,数据会同步到result表中。
- 当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。