Tablestore provides highly concurrent write performance and low storage cost and is suitable for storing IoT data, logs, and monitoring data. When you write data to a Tablestore data table, you can cleanse the data by using Function Compute and write the cleansed data to another data table in Tablestore. You can access raw data or cleansed data in Tablestore in real time.

Sample scenarios

You want to write log data that includes three fields to Tablestore. To query the logs efficiently, you want only the logs whose level value is greater than 1 to be written to another data table named result. The following table describes the fields that are included in the logs.

Field Type Description
id Integer The ID of the log.
level Integer The level of the log. A larger value indicates a higher level.
message String The content of the log.
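The cleansing rule described above can be sketched in plain Python. This is a minimal illustration of the rule only, not the actual trigger code; the `should_forward` helper and the sample log dicts are hypothetical, but the field names (id, level, message) match the table in this section.

```python
# A minimal sketch of the cleansing rule: only logs whose "level" value
# is greater than 1 are copied to the result table.
def should_forward(log):
    """Return True if the log should be written to the result table."""
    return log["level"] > 1

logs = [
    {"id": 1, "level": 0, "message": "debug detail"},
    {"id": 2, "level": 2, "message": "disk almost full"},
]

forwarded = [log for log in logs if should_forward(log)]
print([log["id"] for log in forwarded])  # [2]: only the level-2 log remains
```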

Step 1: Enable the Stream feature for the data table

Before you create a trigger, you must enable the Stream feature for the data table in the Tablestore console to allow the function to process incremental data that is written to the table.

  1. Log on to the Tablestore console.
  2. On the Overview page, click the name of the instance that you want to manage or click Manage Instance in the Actions column of the instance that you want to manage.
  3. In the Tables section of the Instance details tab, click the name of the required data table and click the Tunnels tab. Alternatively, you can click the fig_001 icon and then click Tunnels.
  4. On the Tunnels tab, click Enabled in the Stream Information section.
  5. In the Enable Stream dialog box, configure the Log Expiration Time parameter and click Enabled.

    The value of the Log Expiration Time parameter must be a positive integer and cannot be changed after it is specified. Unit: hours. Maximum value: 168.

    Note Specify a value for the Log Expiration Time parameter based on your business requirements.

Step 2: Configure a Tablestore trigger

You can create a Tablestore trigger in the Function Compute console to process the real-time data stream in a Tablestore data table.
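When the trigger fires, it invokes the function with a CBOR-encoded batch of change records. The dict below is an illustrative sketch of the decoded shape that the sample code in Step 3 reads (Records, PrimaryKey, Columns); the field values and the `column_value` helper are assumptions for illustration, not a verbatim capture of a real payload.

```python
# Illustrative sketch (not a verbatim capture) of a decoded trigger event:
# a batch of change records, each with primary-key and attribute columns.
event = {
    "Records": [
        {
            "Type": "PutRow",
            "PrimaryKey": [{"ColumnName": "id", "Value": 1}],
            "Columns": [
                {"ColumnName": "level", "Value": 2},
                {"ColumnName": "message", "Value": "disk almost full"},
            ],
        }
    ],
}

def column_value(record, name):
    # Look up an attribute column by name, mirroring the lookup in Step 3.
    for col in record["Columns"]:
        if col["ColumnName"] == name:
            return col["Value"]

first = event["Records"][0]
print(column_value(first, "level"))  # 2
```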

  1. Create a Function Compute service.
    1. Log on to the Function Compute console.
    2. In the left-side navigation pane, click Services & Functions.
    3. In the top navigation bar, select a region.
    4. On the Services page, click Create Service.
    5. In the Create Service panel, configure the Name and Description parameters and configure the Logging and Tracing Analysis parameters based on your business requirements.
      For more information about the parameters, see Manage microservices.
    6. Click OK.
      After you create the service, you can view the service and service configurations on the Services page.
  2. Create a Function Compute function.
    Note You can create a function from scratch, by using a container image, or by using a template. The following procedure describes how to create a function from scratch. For information about how to create a function by using other methods, see Use a container image to create a function and Use function templates to create functions.
    1. On the Services page, click the name of the service that you want to manage.
    2. In the left-side navigation pane, click Functions and then click Create Function.
    3. On the Create Function page, select Start from Scratch.
    4. In the Basic Settings section, configure the parameters. The following table describes the parameters.
      • Function Name (optional): The name of the function. The name can be up to 64 characters in length and can contain digits, letters, underscores (_), and hyphens (-). The name is case-sensitive and must start with a letter. If you leave this parameter empty, Function Compute automatically generates a name for the function. Example: Function.
      • Runtime Environments (required): The language of the function, such as Python, Java, PHP, or Node.js. For information about the runtime environments that are supported by Function Compute, see Manage functions. Example: Python 3.6.
      • Request Type (required): To use a Tablestore trigger, select Event Requests. Example: Event Requests.
      • Instance Category (required): The instance category of the function. Valid values: Elastic Instance and Performance Instance. For more information, see Instance types and instance modes. For information about the billing of each instance category, see Overview. Example: Elastic Instance.
      • Memory Capacity (required): The memory size that is required to execute the function. Select a value from the drop-down list, or click Enter Memory Size and enter a custom value. Valid values: 128 MB to 3,072 MB for elastic instances and 4 GB to 32 GB for performance instances. The value must be a multiple of 64 MB. Example: 512 MB.

      After you create the function, you can view the function on the Functions page.

    5. In the Configure Trigger section, configure the parameters. The following table describes the parameters.
      • Trigger Type: Select Tablestore. Example: Tablestore.
      • Name: The name of the trigger. Example: Tablestore-trigger.
      • Instance: Select a Tablestore instance from the drop-down list. Example: distribute-test.
      • Table: Select a data table from the drop-down list. Example: source_data.
      • Role Name: Select AliyunTableStoreStreamNotificationRole. Example: AliyunTableStoreStreamNotificationRole.
        Note After you configure the preceding parameters, click OK. The first time you create a trigger of this type, click Authorize Now in the message that appears, then create the role and grant permissions to the role as prompted.
    6. Click Create.
      The trigger that you created is displayed on the Triggers tab.
      Note You can also view and create Tablestore triggers on the Trigger tab of the table in the Tablestore console.

Step 3: Verify data cleansing

After you create a trigger, you can write data to Tablestore and query the data to verify whether the data is cleansed as expected.

  1. Write code.
    1. On the Functions page, click the name of the required function.
    2. On the function details page, click the Code tab to write code in the code editor.
      In this example, the function code is written in Python 3. Replace the values of the INSTANCE_NAME, REGION, and ENDPOINT parameters with actual values.
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots

      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'

      def get_attribute_value(record, column):
          # Look up an attribute column of the change record by name.
          for x in record['Columns']:
              if x['ColumnName'] == column:
                  return x['Value']

      def get_pk_value(record, column):
          # Look up a primary-key column of the change record by name.
          for x in record['PrimaryKey']:
              if x['ColumnName'] == column:
                  return x['Value']

      # The obtained credentials can be used to access Tablestore because the
      # AliyunOTSFullAccess policy is attached to the role.
      def get_ots_client(context):
          creds = context.credentials
          return ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret,
                               INSTANCE_NAME, sts_token=creds.securityToken)

      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attribute_value(record, 'level'))
          msg = get_attribute_value(record, 'message')
          pk = [('id', id)]
          attr = [('level', level), ('message', msg)]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)

      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attribute_value(record, 'level'))
              if level > 1:
                  save_to_ots(record=record, client=client)
              else:
                  print("Level <= 1, ignore.")
  2. Write data to the data table named source_data. Enter the values of the id, level, and message fields and query the cleansed data in the table named result.
    • When you write a log in which the value of the level field is greater than 1 to the source_data table, the log is synchronized to the result table.
    • When you write a log in which the value of the level field is less than or equal to 1 to the source_data table, the log is not synchronized to the result table.
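The filtering behavior described above can also be sanity-checked locally before writing data to the real tables. This sketch replaces the Tablestore client with a hypothetical in-memory stub (FakeClient) and feeds a hand-built record batch through a loop that mirrors the handler, so no network access or credentials are needed; the record layout is the same shape the sample code expects.

```python
# Local sanity check for the level > 1 rule, with the Tablestore client
# replaced by a hypothetical in-memory stub (no network access required).
class FakeClient:
    def __init__(self):
        self.rows = []

    def put_row(self, table, row):
        # Record the write instead of calling Tablestore.
        self.rows.append((table, row))

def attribute_value(record, name):
    for col in record["Columns"]:
        if col["ColumnName"] == name:
            return col["Value"]

def process(records, client):
    # Mirrors the loop in the handler: forward only logs with level > 1.
    for record in records["Records"]:
        if int(attribute_value(record, "level")) > 1:
            client.put_row("result", record)

batch = {"Records": [
    {"PrimaryKey": [{"ColumnName": "id", "Value": 1}],
     "Columns": [{"ColumnName": "level", "Value": 3},
                 {"ColumnName": "message", "Value": "error"}]},
    {"PrimaryKey": [{"ColumnName": "id", "Value": 2}],
     "Columns": [{"ColumnName": "level", "Value": 1},
                 {"ColumnName": "message", "Value": "info"}]},
]}

client = FakeClient()
process(batch, client)
print(len(client.rows))  # 1: only the level-3 record was forwarded
```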