A Tablestore trigger connects Tablestore — a distributed NoSQL data storage service built on the Apsara system — as an event source of Function Compute. When rows in a Tablestore data table are inserted, updated, or deleted, the trigger invokes your function with the incremental data encoded as an event — letting you build real-time data pipelines without managing polling infrastructure.
Common use case: A data source writes to Table A. The row change triggers a function that cleanses or transforms the data, then writes the result to Table B for downstream reads. The entire pipeline is serverless and scales automatically.
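As a rough illustration of this pattern, the sketch below cleanses the rows in an event and hands each result to a writer for Table B. The helper names (`cleanse_row`, `process_event`) and the `write_row` callback are hypothetical stand-ins, not part of the Function Compute or Tablestore SDKs. In a real function the callback would wrap a Tablestore client call that targets Table B.

```python
def cleanse_row(record):
    """Flatten one change record into a plain dict, trimming string values."""
    row = {pk["ColumnName"]: pk["Value"] for pk in record["PrimaryKey"]}
    for col in record.get("Columns", []):
        # Only "Put" columns carry a value to copy downstream.
        if col.get("Type") == "Put":
            value = col.get("Value")
            row[col["ColumnName"]] = value.strip() if isinstance(value, str) else value
    return row


def process_event(payload, write_row):
    """Cleanse every inserted or updated row and pass it to write_row.

    write_row is a hypothetical callback that writes to Table B.
    Returns the number of rows written.
    """
    written = 0
    for record in payload["Records"]:
        if record["Type"] in ("PutRow", "UpdateRow"):
            write_row(cleanse_row(record))
            written += 1
    return written


# Example: collect the output in a list instead of writing to Table B.
sample = {
    "Version": "Sync-v1",
    "Records": [
        {
            "Type": "PutRow",
            "Info": {"Timestamp": 1506416585740836},
            "PrimaryKey": [{"ColumnName": "pk_0", "Value": 1}],
            "Columns": [
                {"Type": "Put", "ColumnName": "attr_0",
                 "Value": "  hello_table_store ", "Timestamp": 1506416585741}
            ],
        }
    ],
}
table_b = []
count = process_event(sample, table_b.append)
```

Passing the writer in as a callback keeps the transformation testable without a Tablestore connection.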
If a function invocation fails, Function Compute retries the invocation continuously until the Stream log data in Tablestore expires. To avoid reprocessing already-handled records, build idempotent functions and track processed record versions in your code.
How it works
Enable the Stream feature on a Tablestore data table to capture row-level changes as a stream of incremental records.
Create a Tablestore trigger in Function Compute and bind it to the table.
When a row changes, Tablestore encodes the incremental data in Concise Binary Object Representation (CBOR) format and passes it as the event parameter to your function.
Your function parses the event, inspects the operation type (PutRow, UpdateRow, or DeleteRow), and processes the record accordingly.
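The last two steps can be sketched as a small decoding helper. This is a minimal sketch under assumptions: it tries JSON first, which suits test events typed by hand, and falls back to a CBOR decoder only if the caller supplies one. The `cbor_loads` parameter is a hypothetical hook for whichever CBOR library your runtime provides; no specific library is assumed here.

```python
import json


def decode_event(event, cbor_loads=None):
    """Return the event as a dict.

    event: bytes or str passed to the handler.
    cbor_loads: optional callable that decodes CBOR bytes (assumption:
        supplied by whichever CBOR library your runtime provides).
    """
    try:
        # JSON test events parse here.
        return json.loads(event)
    except ValueError:
        # Covers both malformed JSON and bytes that are not valid text.
        if cbor_loads is None:
            raise ValueError("event is not JSON and no CBOR decoder was supplied")
        return cbor_loads(event)


def operation_types(payload):
    """List the operation type of each record, in order."""
    return [record["Type"] for record in payload["Records"]]


payload = decode_event(b'{"Version": "Sync-v1", "Records": [{"Type": "PutRow"}]}')
types = operation_types(payload)
```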
Limits
Tablestore triggers are supported in the following regions: China (Beijing), China (Hangzhou), China (Shanghai), China (Shenzhen), China (Hong Kong), Japan (Tokyo), Singapore, and Germany (Frankfurt).
The Tablestore data table and the Function Compute service must be in the same region.
To access a function associated with a Tablestore trigger over an internal network, use the VPC endpoint format {instance}.{region}.vpc.tablestore.aliyuncs.com. Do not use the Tablestore internal endpoint in this case.
Function execution duration cannot exceed one minute.
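To make the VPC endpoint format concrete, here is a small sketch that fills in the two placeholders. The function name `vpc_endpoint` and the sample instance and region values are illustrative assumptions; substitute your own instance name and region ID.

```python
def vpc_endpoint(instance, region):
    """Build the VPC endpoint host for a Tablestore instance."""
    if not instance or not region:
        raise ValueError("instance and region are both required")
    return f"{instance}.{region}.vpc.tablestore.aliyuncs.com"


# Placeholder values for illustration only.
endpoint = vpc_endpoint("myinstance", "cn-hangzhou")
```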
Usage notes
Avoid invocation loops. If your function writes back to the same table that triggers it, the write creates another change event, which triggers the function again. Design your data flow so that function output goes to a different table.
Retry behavior. When a function invocation fails, the trigger retries indefinitely until the Stream log data expires. The log expiration period is set when you enable the Stream feature and cannot be changed afterward.
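Because retries can redeliver records that were already handled, one way to keep a function idempotent is to skip records it has seen before. The sketch below is a simplified illustration, assuming that the primary key plus Records[].Info.Timestamp is enough to identify a change. The names `record_key`, `process_once`, and `apply_change` are hypothetical, and the in-memory set stands in for a durable store, which a real function would need because memory does not persist reliably across invocations.

```python
import json


def record_key(record):
    """Build a stable identifier from the primary key and the change timestamp."""
    pk = [(col["ColumnName"], col["Value"]) for col in record["PrimaryKey"]]
    # default=repr lets non-JSON values such as bytes still produce a key.
    return json.dumps([pk, record["Info"]["Timestamp"]], default=repr)


def process_once(payload, seen, apply_change):
    """Apply each record at most once.

    seen: set-like store of processed keys (in-memory here; durable in practice).
    apply_change: hypothetical callback holding your business logic.
    Returns the number of records applied on this invocation.
    """
    applied = 0
    for record in payload["Records"]:
        key = record_key(record)
        if key in seen:
            continue  # Already handled on an earlier attempt.
        apply_change(record)
        seen.add(key)  # Mark as done only after the change succeeds.
        applied += 1
    return applied


event = {
    "Version": "Sync-v1",
    "Records": [
        {
            "Type": "PutRow",
            "Info": {"Timestamp": 1506416585740836},
            "PrimaryKey": [{"ColumnName": "pk_0", "Value": 1}],
            "Columns": [],
        }
    ],
}
seen_keys = set()
handled = []
first = process_once(event, seen_keys, handled.append)
retry = process_once(event, seen_keys, handled.append)  # Simulated retry.
```

Recording the key only after `apply_change` returns means a failure partway through leaves the record eligible for the next retry.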
Prerequisites
Before you begin, make sure you have:
A Function Compute service and function. See Create a service and Create a function.
A Tablestore instance and data table in the same region as your Function Compute service. See Create instances and Create tables.
Step 1: Enable the Stream feature for the data table
The Stream feature captures row-level changes to a Tablestore table as incremental data. Enable it before creating the trigger.
Log on to the Tablestore console.
In the top navigation bar, select the region.
On the Overview page, click the instance name or click Manage Instance in the Actions column.
On the Instance Details tab, click the Tables tab. Click the name of the data table, then click the Tunnels tab. Alternatively, click the icon and select Tunnels.
On the Tunnels tab, click Enable in the Stream Information section.
In the Enable Stream dialog box, set the Log Expiration Time and click Enable. The value must be a non-zero integer in hours. Maximum: 168 hours.
The Log Expiration Time cannot be changed after you set it. Choose carefully.
Step 2: Create a Tablestore trigger
Log on to the Function Compute console.
In the left-side navigation pane, click Services & Functions.
In the top navigation bar, select the region.
On the Services page, find the service and click Functions in the Actions column.
Click the function name to open the function details page.
On the Triggers tab, select a version or alias from the Version or Alias drop-down list, then click Create Trigger.
In the Create Trigger panel, set the following parameters and click OK.
| Parameter | Description | Example |
|---|---|---|
| Trigger type | Select Tablestore. | Tablestore |
| Name | Enter a name for the trigger. | Tablestore-trigger |
| Version or alias | Defaults to LATEST. To bind to a specific version or alias, select it from the Version or Alias drop-down on the function details page. See Manage versions and Manage aliases. | LATEST |
| Instance | Select the Tablestore instance from the drop-down list. | d00dd8xm**** |
| Table | Select the data table from the drop-down list. | mytable |
| Role name | Select AliyunTableStoreStreamNotificationRole. | AliyunTableStoreStreamNotificationRole |
If this is your first time creating a Tablestore trigger, click Authorize Now in the prompt that appears to grant the required permissions.
After the trigger is created, it appears on the Triggers tab. To modify or delete a trigger, see Manage triggers.
Step 3: Understand the event format
The trigger encodes incremental data in CBOR format and passes it as the event parameter to your function. The Records[].Type field identifies the operation (PutRow, UpdateRow, or DeleteRow) — use this field to branch your processing logic for inserts, updates, and deletes.
The following examples show events for all three operation types.
PutRow (row inserted):
{
    "Version": "Sync-v1",
    "Records": [
        {
            "Type": "PutRow",
            "Info": {
                "Timestamp": 1506416585740836
            },
            "PrimaryKey": [
                {
                    "ColumnName": "pk_0",
                    "Value": 1506416585881590900
                },
                {
                    "ColumnName": "pk_1",
                    "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                },
                {
                    "ColumnName": "pk_2",
                    "Value": 1506416585741000
                }
            ],
            "Columns": [
                {
                    "Type": "Put",
                    "ColumnName": "attr_0",
                    "Value": "hello_table_store",
                    "Timestamp": 1506416585741
                },
                {
                    "Type": "Put",
                    "ColumnName": "attr_1",
                    "Value": 1506416585881590900,
                    "Timestamp": 1506416585741
                }
            ]
        }
    ]
}
UpdateRow (row updated) and DeleteRow (row deleted):
{
    "Version": "Sync-v1",
    "Records": [
        {
            "Type": "UpdateRow",
            "Info": {
                "Timestamp": 1506416600000000
            },
            "PrimaryKey": [
                {
                    "ColumnName": "pk_0",
                    "Value": 1506416585881590900
                }
            ],
            "Columns": [
                {
                    "Type": "Put",
                    "ColumnName": "attr_0",
                    "Value": "updated_value",
                    "Timestamp": 1506416600000
                },
                {
                    "Type": "DeleteOneVersion",
                    "ColumnName": "attr_1",
                    "Timestamp": 1506416585741
                }
            ]
        },
        {
            "Type": "DeleteRow",
            "Info": {
                "Timestamp": 1506416610000000
            },
            "PrimaryKey": [
                {
                    "ColumnName": "pk_0",
                    "Value": 9999999999999999
                }
            ],
            "Columns": [
                {
                    "Type": "DeleteAllVersions",
                    "ColumnName": "attr_0",
                    "Timestamp": 1506416610000
                }
            ]
        }
    ]
}
Event fields
| Field | Type | Description |
|---|---|---|
| Version | String | Payload version. Example: Sync-v1. |
| Records | Array | Incremental data rows captured from the table. |
| Records[].Type | String | Operation type: PutRow, UpdateRow, or DeleteRow. Use this field to branch your processing logic. |
| Records[].Info.Timestamp | Int64 | UTC timestamp of when the row was last modified. |
| PrimaryKey[].ColumnName | String | Name of the primary key column. |
| PrimaryKey[].Value | formatted_value | Primary key value. Can be Integer, String, or Blob. |
| Columns[].Type | String | Attribute column operation: Put, DeleteOneVersion, or DeleteAllVersions. |
| Columns[].ColumnName | String | Name of the attribute column. |
| Columns[].Value | formatted_value | Attribute column value. Can be Integer, Boolean, Double, String, or Blob. |
| Columns[].Timestamp | Int64 | UTC timestamp of when the attribute column was last modified. |
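To show how Records[].Type and Columns[].Type fit together, the sketch below replays change records onto an in-memory copy of a row. It is a simplified model under stated assumptions: it keeps a single version per column, treats PutRow as replacing the whole row, and removes a column on DeleteOneVersion only when the stored timestamp matches. The function name `apply_record` is hypothetical.

```python
def apply_record(row, record):
    """Apply one change record to row, a dict of {column: (value, timestamp)}.

    Returns the updated row, or None when the row is deleted.
    Simplification: only one version per column is kept.
    """
    if record["Type"] == "DeleteRow":
        return None
    if record["Type"] == "PutRow":
        row = {}  # Assumption: PutRow replaces the whole row.
    else:
        row = dict(row or {})
    for col in record.get("Columns", []):
        name, op = col["ColumnName"], col["Type"]
        if op == "Put":
            row[name] = (col["Value"], col["Timestamp"])
        elif op == "DeleteAllVersions":
            row.pop(name, None)
        elif op == "DeleteOneVersion":
            # Remove the stored value only if it is the version being deleted.
            if name in row and row[name][1] == col["Timestamp"]:
                del row[name]
    return row


# Replay the column operations from the sample events above.
row = apply_record(None, {
    "Type": "PutRow",
    "Columns": [
        {"Type": "Put", "ColumnName": "attr_0",
         "Value": "hello_table_store", "Timestamp": 1506416585741},
        {"Type": "Put", "ColumnName": "attr_1",
         "Value": 1506416585881590900, "Timestamp": 1506416585741},
    ],
})
row = apply_record(row, {
    "Type": "UpdateRow",
    "Columns": [
        {"Type": "Put", "ColumnName": "attr_0",
         "Value": "updated_value", "Timestamp": 1506416600000},
        {"Type": "DeleteOneVersion", "ColumnName": "attr_1",
         "Timestamp": 1506416585741},
    ],
})
```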
Step 4: Write and test the function
Configure test parameters
Before testing, configure a test event that simulates the Tablestore trigger payload.
On the function details page, click the Code tab.
Click the icon next to Test Function and select Configure Test Parameters.
In the Configure Test Parameters panel, select Create New Test Event or Modify Existing Test Event, set the Event Name, paste a sample event payload in the editor, and click OK.
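If you would rather generate a payload than edit one by hand, a short script can produce the JSON to paste into the editor. The sketch below is one possible approach; `make_test_event` is a hypothetical helper, and the fixed Info.Timestamp is simply the value from the sample in Step 3.

```python
import json


def make_test_event(op_type, primary_key, columns=()):
    """Build a single-record event dict shaped like the samples in Step 3.

    primary_key: dict of {column name: value}.
    columns: iterable of (column op, name, value, timestamp) tuples;
        value is ignored for delete operations.
    """
    record = {
        "Type": op_type,
        "Info": {"Timestamp": 1506416585740836},  # Fixed sample value.
        "PrimaryKey": [
            {"ColumnName": name, "Value": value}
            for name, value in primary_key.items()
        ],
        "Columns": [],
    }
    for op, name, value, ts in columns:
        column = {"Type": op, "ColumnName": name, "Timestamp": ts}
        if op == "Put":
            column["Value"] = value
        record["Columns"].append(column)
    return {"Version": "Sync-v1", "Records": [record]}


event = make_test_event(
    "PutRow",
    {"pk_0": 1},
    [("Put", "attr_0", "hello_table_store", 1506416585741)],
)
print(json.dumps(event, indent=2))  # Paste the printed JSON into the editor.
```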
Write the function code
The following examples parse the event, branch on the operation type, and extract primary key and attribute column values. Use Records[].Type to handle inserts, updates, and deletes differently.
Python:
import logging
import json

def get_attribute_value(record, column):
    # Return the value of the named attribute column, or None if the column
    # is absent or carries no value (for example, a delete operation).
    for x in record['Columns']:
        if x['ColumnName'] == column:
            return x.get('Value')
    return None

def get_pk_value(record, column):
    # Return the value of the named primary key column, or None if absent.
    for x in record['PrimaryKey']:
        if x['ColumnName'] == column:
            return x['Value']
    return None

def handler(event, context):
    logger = logging.getLogger()
    logger.info("Begin to handle event")
    # This example parses a JSON test event. If your function receives the
    # CBOR-encoded payload described in Step 3, decode it with a CBOR library instead.
    records = json.loads(event)
    for record in records['Records']:
        logger.info("Handle record: %s", record)
        operation_type = record['Type']  # PutRow, UpdateRow, or DeleteRow
        pk_0 = get_pk_value(record, "pk_0")
        if operation_type == 'PutRow':
            attr_0 = get_attribute_value(record, "attr_0")
            logger.info("Insert: pk_0=%s, attr_0=%s", pk_0, attr_0)
        elif operation_type == 'UpdateRow':
            attr_0 = get_attribute_value(record, "attr_0")
            logger.info("Update: pk_0=%s, attr_0=%s", pk_0, attr_0)
        elif operation_type == 'DeleteRow':
            logger.info("Delete: pk_0=%s", pk_0)
    return 'OK'
Java:
import com.aliyun.fc.runtime.Context;
import com.aliyun.fc.runtime.StreamRequestHandler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.*;
import java.nio.charset.StandardCharsets;

public class TablestoreTriggerHandler implements StreamRequestHandler {
    @Override
    public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
            throws IOException {
        // Read the whole event as text. readAllBytes() requires Java 9 or later.
        String event = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        // This example parses a JSON test event.
        JsonObject payload = new Gson().fromJson(event, JsonObject.class);
        JsonArray records = payload.getAsJsonArray("Records");
        for (int i = 0; i < records.size(); i++) {
            JsonObject record = records.get(i).getAsJsonObject();
            String type = record.get("Type").getAsString();
            context.getLogger().info("Operation type: " + type);
            // Add your per-operation logic here: PutRow, UpdateRow, or DeleteRow
        }
        outputStream.write("OK".getBytes(StandardCharsets.UTF_8));
    }
}
Node.js:
const handler = async (event, context) => {
    const payload = JSON.parse(event.toString());
    for (const record of payload.Records) {
        const type = record.Type; // PutRow, UpdateRow, or DeleteRow
        context.logger.info(`Operation type: ${type}`);
        if (type === 'PutRow') {
            // Handle insert
        } else if (type === 'UpdateRow') {
            // Handle update
        } else if (type === 'DeleteRow') {
            // Handle delete
        }
    }
    return 'OK';
};
module.exports.handler = handler;
Deploy and test
Enter or paste your function code in the code editor, then click Deploy.
Click Test Function to run the function with your configured test event.
View the execution result and logs on the Code tab.
Troubleshooting
The trigger is not created in a specific region. Check whether the region supports Tablestore triggers. For the supported region list, see Limits.
The Tablestore table does not appear when creating the trigger. The table must be in the same region as your Function Compute service. Verify that both resources are in the same region and try again.
The error "Invocation canceled by client" appears repeatedly. The client-side timeout is shorter than the function execution duration. Increase the client timeout. For details, see What do I do if the client is disconnected and the message "Invocation canceled by client" is reported?
Data is written to the table but the trigger does not fire. Work through the following checks:
Verify that the Stream feature is enabled on the table. See Step 1: Enable the Stream feature for the data table.
Verify that the trigger was created with the correct role (AliyunTableStoreStreamNotificationRole). See Step 2: Create a Tablestore trigger.
Check the function run logs to see whether the function is executing but failing. If the function fails, it retries until the Stream log data expires.
What's next
Manage triggers — modify or delete existing triggers
Manage versions — manage function versions and aliases