Tablestore provides highly concurrent write performance at a low storage cost, which makes it suitable for storing IoT data, logs, and monitoring data. When you write data to a Tablestore data table, you can use Function Compute to cleanse the data and write the cleansed data to another Tablestore data table. You can access both the raw data and the cleansed data in Tablestore in real time.
Sample scenarios
You want to write log data that consists of three fields to Tablestore. To query the logs efficiently, the logs in which the value of the level field is greater than 1 must be written to another data table named result. The following table describes the fields that are included in the logs.
| Field | Type | Description |
| --- | --- | --- |
| id | Integer | The ID of the log. |
| level | Integer | The level of the log. A larger value indicates a higher level. |
| message | String | The content of the log. |
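You can create the source_data and result data tables in the Tablestore console. If you prefer to create them from code, the following minimal sketch uses the Tablestore Python SDK (tablestore); the endpoint, credentials, and instance name are placeholders that you must replace with your own values.

# -*- coding: utf-8 -*-
import tablestore as ots

# Placeholder endpoint, credentials, and instance name: replace with your own.
client = ots.OTSClient(
    'https://your-instance.cn-shanghai.ots.aliyuncs.com',
    '<access-key-id>',
    '<access-key-secret>',
    'your-instance')

# Both tables use a single integer primary key column named id. The level and
# message fields are attribute columns and do not need a schema definition.
for table_name in ('source_data', 'result'):
    table_meta = ots.TableMeta(table_name, [('id', 'INTEGER')])
    table_options = ots.TableOptions()
    # 0 reserved read/write capacity units: all throughput is pay-as-you-go.
    reserved_throughput = ots.ReservedThroughput(ots.CapacityUnit(0, 0))
    client.create_table(table_meta, table_options, reserved_throughput)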
Step 1: Enable the Stream feature for the data table
Before you create a trigger, you must enable the Stream feature for the data table
in the Tablestore console to allow the function to process incremental data that is
written to the table.
- Log on to the Tablestore console.
- On the Overview page, click the name of the instance that you want to manage or click Manage Instance in the Actions column of the instance that you want to manage.
- In the Tables section of the Instance details tab, click the name of the required data table and then click the Tunnels tab. Alternatively, you can click the icon and then click Tunnels.
- On the Tunnels tab, click Enabled in the Stream Information section.
- In the Enable Stream dialog box, configure the Log Expiration Time parameter and click Enabled.
The value of the Log Expiration Time parameter must be a positive integer and cannot be changed after it is specified. Unit: hours. Maximum value: 168.
Note Specify a value for the Log Expiration Time parameter based on your business requirements.
Step 2: Configure a Tablestore trigger
You can create a Tablestore trigger in the Function Compute console to process the
real-time data stream in a Tablestore data table.
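When new data is written to the bound table, the trigger invokes the function with a CBOR-encoded event that contains the incremental records. For orientation, a decoded event has roughly the following shape. This sketch is simplified and omits metadata such as timestamps; the field names match the ones that the sample code in Step 3 reads.

# Simplified sketch of a decoded trigger event. Real events contain
# additional metadata, such as the record type and timestamps.
{
    'Records': [
        {
            'Type': 'PutRow',
            'PrimaryKey': [
                {'ColumnName': 'id', 'Value': 1}
            ],
            'Columns': [
                {'ColumnName': 'level', 'Value': 2},
                {'ColumnName': 'message', 'Value': 'read timeout'}
            ]
        }
    ]
}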
- Create a Function Compute service.
- Log on to the Function Compute console.
- In the left-side navigation pane, click Services & Functions.
- In the top navigation bar, select a region.
- On the Services page, click Create Service.
- In the Create Service panel, configure the Name and Description parameters and configure the Logging and
Tracing Analysis parameters based on your business requirements.
- Click OK.
After you create the service, you can view the service and service configurations
on the Services page.
- Create a Function Compute function.
- On the Services page, click the name of the service that you want to manage.
- In the left-side navigation pane, click Functions and then click Create Function.
- On the Create Function page, select Start from Scratch.
- In the Basic Settings section, configure the parameters. The following table describes the parameters.
| Parameter | Required | Description | Example |
| --- | --- | --- | --- |
| Function Name | No | Enter a name for the function. The name can be up to 64 characters in length and can contain digits, letters, underscores (_), and hyphens (-). The name is case-sensitive and must start with a letter. Note: If you leave this parameter empty, Function Compute automatically generates a name for your function. | Function |
| Runtime Environments | Yes | Select a language, such as Python, Java, PHP, or Node.js. For information about the runtime environments that are supported by Function Compute, see Manage functions. | Python 3.6 |
| Request Type | Yes | If you want to use a Tablestore trigger, select Event Requests. | Event Requests |
| Instance Category | Yes | Select an instance category for the function. Valid values: Elastic Instance and Performance Instance. For more information, see Instance types and instance modes. For information about the billing of each instance category, see Overview. | Elastic Instance |
| Memory Capacity | Yes | Specify the size of the memory that is required to execute the function. You can select a value from the drop-down list, or click Enter Memory Size and enter a custom value. Valid values: [128, 3072] in MB for Elastic Instance and [4, 32] in GB for Performance Instance. Note: The value must be a multiple of 64 MB. | 512 MB |
- In the Configure Trigger section, configure the parameters. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Trigger Type | Select Tablestore. | Tablestore |
| Name | Enter a name for the trigger. | Tablestore-trigger |
| Instance | Select a Tablestore instance from the drop-down list. | distribute-test |
| Table | Select a data table from the drop-down list. | source_data |
| Role Name | Select AliyunTableStoreStreamNotificationRole. Note: After you configure the preceding parameters, click OK. The first time you create a trigger of this type, click Authorize Now in the message that appears, and then create the role and grant it permissions as prompted. | AliyunTableStoreStreamNotificationRole |
- Click Create.
After you create the function, you can view it on the Functions page. The trigger that you created is displayed on the Triggers tab.
Note You can also view and create Tablestore triggers on the Trigger tab of the table in the Tablestore console.
Step 3: Verify data cleansing
After you create a trigger, you can write data to Tablestore and query the data to
verify whether the data is cleansed as expected.
- Write code.
- On the Functions page, click the name of the required function.
- On the function details page, click the Code tab to write code in the code editor.
In this example, the function code is written in Python 3. Set the INSTANCE_NAME, REGION, and ENDPOINT parameters to actual values.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots

INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'

def get_attribute_value(record, column):
    attrs = record['Columns']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

def get_pk_value(record, column):
    attrs = record['PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

# The obtained credentials can be used to access Tablestore because the
# AliyunOTSFullAccess policy is attached to the role.
def get_ots_client(context):
    creds = context.credentials
    client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token=creds.securityToken)
    return client

def save_to_ots(client, record):
    id = int(get_pk_value(record, 'id'))
    level = int(get_attribute_value(record, 'level'))
    msg = get_attribute_value(record, 'message')
    pk = [('id', id)]
    attr = [('level', level), ('message', msg)]
    row = ots.Row(pk, attr)
    client.put_row(RESULT_TABLENAME, row)

def handler(event, context):
    records = cbor.loads(event)
    # records = json.loads(event)  # Use this line instead for JSON test events.
    client = get_ots_client(context)
    for record in records['Records']:
        level = int(get_attribute_value(record, 'level'))
        if level > 1:
            save_to_ots(client, record)
        else:
            print("Level <= 1, ignore.")
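Note: Events delivered by a Tablestore trigger are CBOR-encoded. If you debug the function in the Function Compute console with a manually entered JSON test event, use the commented json.loads line instead of cbor.loads.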
- Write data to the data table named source_data by specifying values for the id, level, and message fields. Then, query the cleansed data in the table named result.
- When you write a log in which the value of the level field is greater than 1 to the source_data table, the log is synchronized to the result table.
- When you write a log in which the value of the level field is less than or equal to 1 to the source_data table, the log is not synchronized to the result table.
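If you prefer to verify the behavior from code rather than in the console, the following minimal sketch writes one log of each kind and then reads the result table. It assumes a client configured as in the table-creation sketch in the sample scenario, and the row values are illustrative.

import time
import tablestore as ots

# Assumes `client` is an ots.OTSClient configured for your instance,
# as in the table-creation sketch above.

# level > 1: the function should copy this log to the result table.
client.put_row('source_data', ots.Row([('id', 1)], [('level', 3), ('message', 'disk almost full')]))

# level <= 1: the function should ignore this log.
client.put_row('source_data', ots.Row([('id', 2)], [('level', 1), ('message', 'heartbeat ok')]))

# Give the trigger a moment to invoke the function.
time.sleep(10)

# Only the row with id=1 should appear in the result table.
consumed, return_row, next_token = client.get_row('result', [('id', 1)])
print(return_row.attribute_columns if return_row else 'row not found')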