
Tablestore: Use Function Compute to cleanse data

Last Updated: Nov 11, 2024

Tablestore provides highly concurrent write performance at a low storage cost and is suitable for storing IoT data, logs, and monitoring data. When you write data to a data table in Tablestore, you can use Function Compute to cleanse the data and write the cleansed data to another data table in Tablestore. You can access both the raw data and the cleansed data in Tablestore in real time.

Sample scenarios

You want to write log data that includes three fields to a data table named source_data in Tablestore. To query the logs efficiently, you must write the logs in which the value of the level field is greater than 1 to another data table named result in Tablestore. The following table describes the fields that are included in the logs.

Field      Type      Description
id         Integer   The ID of the log.
level      Integer   The level of the log. A larger value indicates a higher level.
message    String    The content of the log.
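
If you want to prepare the two data tables programmatically instead of in the console, you can use the Tablestore SDK for Python. The following minimal sketch assumes the table names source_data and result from this example and a single primary key column named id of the INTEGER type. The endpoint, AccessKey pair, and instance name are placeholders that you must replace.

    # -*- coding: utf-8 -*-
    # A minimal sketch that creates the source_data and result tables used in this example.
    # The endpoint, AccessKey pair, and instance name are placeholders.
    from tablestore import OTSClient, TableMeta, TableOptions, ReservedThroughput, CapacityUnit

    client = OTSClient('<endpoint>', '<access_key_id>', '<access_key_secret>', '<instance_name>')

    for table_name in ('source_data', 'result'):
        # Each table uses a single primary key column named id of the INTEGER type.
        table_meta = TableMeta(table_name, [('id', 'INTEGER')])
        # Use default table options and zero reserved read/write throughput.
        client.create_table(table_meta, TableOptions(), ReservedThroughput(CapacityUnit(0, 0)))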

Step 1: Enable the Stream feature for the data table

Before you create a trigger, you must enable the Stream feature for the data table in the Tablestore console to allow the function to process incremental data that is written to the table.

  1. Log on to the Tablestore console.

  2. In the top navigation bar, select a region.

  3. On the Overview page, click the name of the instance that you want to manage or click Manage Instance in the Actions column.

  4. On the Tables tab of the Instance Details tab, click the name of the data table that you want to manage and then click the Tunnels tab. Alternatively, you can click the icon next to the name of the data table and then click Tunnels.

  5. On the Tunnels tab, click Enable in the Stream Information section.

  6. In the Enable Stream dialog box, configure the Log Expiration Time parameter and click Enable.

    The value of the Log Expiration Time parameter must be a non-zero integer. Unit: hours. Maximum value: 168.

    Important

    The Log Expiration Time parameter cannot be modified after it is specified. Proceed with caution.

Step 2: Create a function and Tablestore trigger

  1. Create a function.

    1. Log on to the Function Compute console.

    2. Optional: In the upper-right corner of the page, click Go to Function Compute 3.0.

      Note
      • Function Compute 3.0 provides various enhanced features. In this example, Function Compute 3.0 is used.

      • If Back to Function Compute 2.0 is displayed in the upper-right corner of the page, skip this step because you are already in the Function Compute 3.0 console.

    3. In the left-side navigation pane, click Functions.

    4. In the top navigation bar, select a region. On the Functions page, click Create Function.

    5. On the Create Function page, select a method to create a function, configure the following parameters, and then click Create.

      In this example, Event Function is selected to show how to create a function that processes data modifications in Tablestore in real time.

      Note

      You can select Event Function, Web Function, or Task Function as the method that is used to create a function to process data in Tablestore. For more information, see Selection of methods to create functions.

      • If you want data processing to be automatically triggered by data modifications in Tablestore, select Event Function. For more information, see Create an event function.

      • If you want data processing to be automatically triggered by specific HTTP requests, select Web Function. For more information, see Create a web function.

      • If you want data processing to be periodically or asynchronously triggered, select Task Function. For more information, see Create a task function.

      • Basic Settings: Configure the Function Name parameter.

      • Code: Configure the runtime and code-related information of the function.

        • Runtime: Select a runtime, such as Python, Java, PHP, or Node.js, or Custom Container Image. In this example, Python 3.9 is selected.

        • Code Upload Method: Specify how you want code to be uploaded to Function Compute. In this example, Use Sample Code and the Hello, world! sample code are selected. Valid values:

          • Use Sample Code: Select the sample code provided by Function Compute to create a function based on your business requirements. This is the default method.

          • Upload ZIP: Select and upload a .zip file that contains your function code.

          • Upload Folder: Select and upload the folder that contains your function code.

          • OSS: Upload code from an Object Storage Service (OSS) bucket. In this case, you must specify the Bucket Name and Object Name parameters.

      • Advanced Settings: Configure instance information and the function execution timeout period.

        • Specifications: Configure instance specifications, such as vCPU Capacity and Memory Capacity, based on your business requirements. The ratio of vCPU capacity to memory capacity (in GB) must range from 1:1 to 1:4. For more information about the billing of resources, see Billing overview. In this example, 0.35 vCPUs and 512 MB of memory are used.

        • Size of Temporary Disk: Specify the size of the disk that is used to temporarily store files based on your business requirements. In this example, 512 MB is used. Valid values:

          • 512 MB: the default value. Function Compute provides you with 512 MB of free disk space, and you are not charged for a temporary disk of this size.

          • 10 GB: You are charged based on a disk size of 9.5 GB.

          Note
          Data shares the space of the temporary disk and can be written to all directories in the disk. The lifecycle of the temporary disk is consistent with the lifecycle of the underlying instance. After the instance is recycled by the system, the data on the disk is cleared. To persist files, use File Storage NAS or OSS. For more information, see Configure a NAS file system and Configure an OSS file system.

        • Execution Timeout Period: Specify the timeout period for function execution. The default timeout period is 180 seconds, and the maximum timeout period is 86,400 seconds. In this example, 180 is used.

        • Handler: Specify the handler of the function. The Function Compute runtime loads and invokes the handler to process requests. If you select Web Function to create a function, skip this parameter. In this example, index.handler is used (see the sketch after this list).

          Note
          If you set the Code Upload Method parameter to Use Sample Code, retain the default value of the Handler parameter. If you select another code upload method, modify the value of the Handler parameter based on your business requirements. Otherwise, an error is reported when the function runs.

        • Time Zone: Select the time zone of the function. After you specify the time zone, the TZ environment variable is automatically added to the function, and its value is the time zone that you specify. In this example, UTC is used.

        • Function Role: Specify the Resource Access Management (RAM) role of the function. Function Compute uses this role to generate a temporary AccessKey pair that is used to access your Alibaba Cloud resources and passes the AccessKey pair to your code. In this example, AliyunFCDefaultRole is used.

        • Access to VPC: Specify whether to allow the function to access virtual private cloud (VPC) resources. For more information, see Configure network settings. In this example, Yes is selected.

        • VPC: Required if you set Access to VPC to Yes. Create a VPC or select the ID of an existing VPC that you want the function to access from the drop-down list. Example: fc.auto.create.vpc.1632317****.

        • vSwitch: Required if you set Access to VPC to Yes. Create a vSwitch or select the ID of an existing vSwitch from the drop-down list. Example: fc.auto.create.vswitch.vpc-bp1p8248****.

        • Security Group: Required if you set Access to VPC to Yes. Create a security group or select an existing security group from the drop-down list. Example: fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****.

        • Allow Default NIC to Access Internet: Specify whether to allow the function to access the Internet by using the default network interface controller (NIC). If you select No, the function cannot access the Internet by using the default NIC of Function Compute. In this example, Yes is selected.

          Important
          If you use a static public IP address, you must set Allow Default NIC to Access Internet to No. Otherwise, the configured static public IP address does not take effect. For more information, see Configure static public IP addresses.

        • Logging: Specify whether to enable the logging feature. In this example, Enable is selected. Valid values:

          • Enable: Function Compute sends function execution logs, including logs that are printed to standard output (stdout), to Simple Log Service for persistent storage. You can use these logs to debug code, analyze failures, and analyze data.

          • Disable: You cannot use Simple Log Service to store or query function execution logs.

      • (Optional) Environment Variables: Configure the environment variables in the runtime environment of the function. For more information, see Configure environment variables.
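
      The Handler value index.handler used in this example maps to a function named handler in a file named index.py. The following minimal sketch only shows the shape of a Python event handler and prints the size of the raw event; the complete cleansing logic is written in Step 3.

        # index.py: the Handler value index.handler points to the handler function below.
        def handler(event, context):
            # For a Tablestore trigger, event contains the CBOR-encoded incremental data.
            # context provides runtime information, such as the temporary credentials of the function role.
            print('Received %d bytes.' % len(event))
            return 'OK'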

  2. Create a Tablestore trigger.

    1. On the Function Details tab, click the Configurations tab. In the left-side navigation pane, click Triggers and then click Create Trigger.

    2. In the Create Trigger panel, configure the parameters and click OK.

      • Trigger Type: The type of the trigger. Select Tablestore.

      • Name: The name of the trigger. Example: Tablestore-trigger.

      • Version or Alias: The version or alias of the trigger. Default value: LATEST. If you want to create a trigger for another version or alias, select the version or alias from the Version or Alias drop-down list on the function details page. For more information about versions and aliases, see Manage versions and Manage aliases.

      • Instance: The name of the existing Tablestore instance. Example: d00dd8xm****.

      • Table: The name of the existing data table. Example: mytable.

      • Role Name: Select AliyunTableStoreStreamNotificationRole.

        Note
        After you configure the preceding parameters, click OK. The first time you create a trigger of this type, click Authorize Now in the dialog box that appears.

      After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Trigger management.

Step 3: Verify data cleansing

After you create a trigger, you can write data to Tablestore and query the data to verify whether the data is cleansed as expected.

  1. On the Function Details tab, click the Code tab. On the Code tab, write the function code in the code editor.

    In this example, the function code is written in Python. You must modify the values of the INSTANCE_NAME, REGION, ENDPOINT, and RESULT_TABLENAME parameters in the sample code based on your business requirements.

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import cbor
    import json
    import tablestore as ots
    
    INSTANCE_NAME = 'distribute-test'
    REGION = 'cn-shanghai'
    ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
    RESULT_TABLENAME = 'result'
    
    
    def get_attribute_value(record, column):
        # Return the value of the specified attribute column from a stream record.
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']


    def get_pk_value(record, column):
        # Return the value of the specified primary key column from a stream record.
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    
    
    # The obtained credentials can be used to access Tablestore because the AliyunOTSFullAccess policy is attached to the RAM role. 
    def get_ots_client(context):
        creds = context.credentials
        client = ots.OTSClient(ENDPOINT, creds.access_key_id, creds.access_key_secret, INSTANCE_NAME,
                               sts_token=creds.security_token)
        return client
    
    
    def save_to_ots(client, record):
        # Copy the id, level, and message fields of the record to the result table.
        log_id = int(get_pk_value(record, 'id'))
        level = int(get_attribute_value(record, 'level'))
        msg = get_attribute_value(record, 'message')
        pk = [('id', log_id)]
        attr = [('level', level), ('message', msg)]
        row = ots.Row(pk, attr)
        client.put_row(RESULT_TABLENAME, row)
    
    
    def handler(event, context):
        # The Tablestore trigger passes incremental data as a CBOR-encoded event.
        records = cbor.loads(event)
        # records = json.loads(event)
        client = get_ots_client(context)
        for record in records['Records']:
            level = int(get_attribute_value(record, 'level'))
            if level > 1:
                save_to_ots(client, record)
            else:
                print("level <= 1, ignore.")
    
  2. Write data to the table named source_data. Enter the values of the id, level, and message fields in sequence. Then, query the cleansed data in the table named result. An SDK-based check is sketched after the following list.

    • When you write data in which the value of the level field is greater than 1 to the table named source_data, the data is synchronized to the table named result.

    • When you write data in which the value of the level field is less than or equal to 1 to the table named source_data, the data is not synchronized to the table named result.
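
    If you prefer to verify the behavior by using the Tablestore SDK for Python instead of the console, the following sketch writes one test log to source_data and then reads the same primary key from result. The endpoint, AccessKey pair, and instance name are placeholders, and the trigger needs a short time to run before the row appears in the result table.

    # -*- coding: utf-8 -*-
    # A minimal verification sketch. The endpoint, AccessKey pair, and instance name are placeholders.
    import time
    from tablestore import OTSClient, Row, Condition, RowExistenceExpectation

    client = OTSClient('<endpoint>', '<access_key_id>', '<access_key_secret>', '<instance_name>')

    # Write a log whose level is greater than 1. The trigger should copy it to the result table.
    pk = [('id', 1)]
    attrs = [('level', 2), ('message', 'error: disk full')]
    client.put_row('source_data', Row(pk, attrs), Condition(RowExistenceExpectation.IGNORE))

    # Give the trigger a moment to run, then read the same primary key from the result table.
    time.sleep(10)
    _, row, _ = client.get_row('result', pk, max_version=1)
    print(row.attribute_columns if row else 'The row has not been synchronized yet.')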

FAQ

  • If you cannot create a Tablestore trigger in a region, check whether the region supports Tablestore triggers. For more information, see Usage notes.

  • If you cannot find an existing Tablestore data table when you create a Tablestore trigger, check whether the data table resides in the same region as the associated service in Function Compute.

  • In most cases, if an error that indicates that the client canceled the invocation is repeatedly reported when you use a Tablestore trigger, the timeout period that is configured for function execution on the client is shorter than the actual function execution duration. In this case, we recommend that you increase the client timeout period. For more information, see What do I do if the client is disconnected and the message "Invocation canceled by client" is reported?

  • If data is written to a Tablestore data table but the associated Tablestore trigger is not triggered, you can troubleshoot the issue by performing the following steps. For more information about how to troubleshoot trigger failures, see What do I do if a trigger cannot trigger function execution?

    • Make sure that the Stream feature is enabled for the data table. For more information, see Step 1: Enable the Stream feature for the data table.

    • Check whether the role is correctly configured when you create the trigger. You can use the default role AliyunTableStoreStreamNotificationRole. For more information, see Create a Tablestore trigger.

    • View the function execution logs to check whether the function failed to be executed. If a function fails to be executed, the function is retried until the log data in Tablestore expires.

  • If the "access_key_id is None or empty." error message is returned when a function is executed, check whether the role configured for the function is granted the permissions to access Tablestore. For more information, see Appendix: Grant Function Compute the permissions to access Tablestore.