This document describes how to use Function Compute to cleanse the data in Table Store.
The highly concurrent write performance and low storage cost of Table Store make it suitable for Internet of Things (IoT) applications, including storing logs and monitoring data. You can write data to Table Store, simultaneously perform a simple cleansing of the newly added data in Function Compute, and write the cleansed data back to the Table Store result table. Meanwhile, you can access the original data and the result data in real time.
Assume that the data to be written is log data, which contains three fields:
|Field|Type|Description|
|---|---|---|
|id|Integer|Primary key|
|level|Integer|Log level (the greater the number, the higher the level)|
|message|String|Log content|
Logs with level > 1 must be written to a separate data table for dedicated querying.
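The cleansing rule itself can be sketched in a few lines of plain Python. The sample rows below are hypothetical; in the real workflow they arrive as incremental data from Table Store:

```python
# Hypothetical sample log rows; in practice these come from Table Store.
logs = [
    {"id": 1, "level": 1, "message": "heartbeat ok"},
    {"id": 2, "level": 2, "message": "retry count exceeded"},
    {"id": 3, "level": 3, "message": "disk almost full"},
]

# The cleansing rule: only rows with level > 1 go to the result table.
result_rows = [row for row in logs if row["level"] > 1]

print([row["id"] for row in result_rows])  # → [2, 3]
```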
Creating instances and data tables
Create a Table Store instance in the Table Store console (this example uses the instance distribute-test in East China 2), and create a source table (source_data) and a result table (result). The primary key of each table is id (Integer). Because Table Store uses a schema-free structure, the other attribute columns do not need to be predefined.
Taking source_data as an example, create the table as shown in the following figure:
Enabling the streaming function of the data source table
Before Function Compute can process the incremental data written to Table Store through a trigger, you must enable the Stream feature on the data table.
The expiration time of Stream records is the maximum period within which incremental data can be read through the Stream API.
Because a trigger can be bound only to an existing function, first create a service and a function in the same region in the Function Compute console.
Create a Function Compute service
The following procedure creates a service and the processing function in the Function Compute console, using East China 2 as an example.
- Create a service in East China Node 2.
- Create a function and configure it as follows:
- Function name: etl_test. Select the Python 2.7 runtime, and choose to edit the code online.
- Function entry: etl_test.handler.
- The code is edited in a later step; for now, click Next.
- Service authorization
Because Function Compute writes its run logs to Log Service and also reads and writes the Table Store data tables, it must be granted certain permissions. For convenience, add the AliyunOTSFullAccess and AliyunLogFullAccess permissions first. In actual usage, we recommend that you grant permissions according to the principle of least privilege.
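As a least-privilege alternative, you can attach a custom RAM policy instead of the two FullAccess policies. The fragment below is only a sketch: the action names, the resource format, and the instance name distribute-test are assumptions to verify against the RAM and Table Store policy documentation before use.

```json
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["ots:PutRow"],
      "Resource": ["acs:ots:*:*:instance/distribute-test/table/result"]
    },
    {
      "Effect": "Allow",
      "Action": ["log:PostLogStoreLogs"],
      "Resource": ["acs:log:*:*:project/*"]
    }
  ]
}
```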
- Click Complete authorization and create a function.
- Modify the function code.
After creating the function, click Run Code for the corresponding function, then edit the code and save it. Modify INSTANCE_NAME (the Table Store instance name) and REGION (the region in use) as appropriate:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots

INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com' % (INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'

def _utf8(input):
    return str(bytearray(input, "utf-8"))

def get_attribute_value(record, column):
    # Scan the attribute columns of one Stream record for the named column.
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']

def get_pk_value(record, column):
    # Scan the primary-key columns of one Stream record for the named column.
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

# Because the AliyunOTSFullAccess permission has been granted,
# the credentials obtained here are authorized to access Table Store.
def get_ots_client(context):
    creds = context.credentials
    client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret,
                           INSTANCE_NAME, sts_token=creds.securityToken)
    return client

def save_to_ots(client, record):
    # Write one cleansed row to the result table.
    id = int(get_pk_value(record, 'id'))
    level = int(get_attribute_value(record, 'level'))
    msg = get_attribute_value(record, 'message')
    pk = [(_utf8('id'), id)]
    attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg))]
    row = ots.Row(pk, attr)
    client.put_row(RESULT_TABLENAME, row)

def handler(event, context):
    records = cbor.loads(event)
    #records = json.loads(event)
    client = get_ots_client(context)
    for record in records['Records']:
        level = int(get_attribute_value(record, 'level'))
        if level > 1:
            save_to_ots(client, record)
        else:
            print "Level <= 1, ignore."
```
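To sanity-check the parsing logic locally, you can feed the helper functions a hand-built record. The record shape below (Records, PrimaryKey, Columns, ColumnName, Value) mirrors what the handler reads, but the concrete sample values are hypothetical, and a real event arrives CBOR-encoded rather than as a Python dict:

```python
# A hand-built event mimicking the structure the handler reads.
# (Sample values are hypothetical.)
sample_event = {
    "Records": [
        {
            "PrimaryKey": [{"ColumnName": "id", "Value": 2}],
            "Columns": [
                {"ColumnName": "level", "Value": 3},
                {"ColumnName": "message", "Value": "disk almost full"},
            ],
        }
    ]
}

def get_attribute_value(record, column):
    # Linear scan over the attribute columns of one Stream record.
    for col in record["Columns"]:
        if col["ColumnName"] == column:
            return col["Value"]

def get_pk_value(record, column):
    # Same scan, but over the primary-key columns.
    for pk in record["PrimaryKey"]:
        if pk["ColumnName"] == column:
            return pk["Value"]

record = sample_event["Records"][0]
print(get_pk_value(record, "id"))            # → 2
print(get_attribute_value(record, "level"))  # → 3
```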
Binding a Trigger
- Go back to the instance management page in the Table Store console and click the Use Trigger button next to the source_data table to open the trigger binding page. Click Use an existing function, select the newly created service and function, and select the Table Store Send event notification permission check box to confirm.
- After binding is successful, you will see the following information:
- Write data to the source_data table.
On the Data Editor page of source_data, click Insert, and enter the id, level, and message values in sequence, as follows.
- Query the cleansed data from the result table
Open the Data Editor page of the result table, where you can query the data newly written to source_data.
Data with level <= 1 written to source_data is not synchronized to the result table.