Tablestore高並發的寫入效能以及低廉的儲存成本非常適合物聯網、日誌、監控資料的儲存。將資料寫入到Tablestore時,您可以通過Function Compute對新增的資料做簡單的清洗,將清洗後的資料寫回到Tablestore的另一種資料表中。同時,您也可以即時訪問Tablestore中的未經處理資料和結果資料。
範例情境
假設寫入Tablestore的為日誌資料,且日誌資料包括如下三個欄位。為了便於日誌查詢,您需要將level>1的日誌寫入到Tablestore的另一張資料表result中。
欄位名稱 |
類型 |
說明 |
id |
整型 |
日誌ID。 |
level |
整型 |
日誌的等級,數值越大表示日誌等級越高。 |
message |
字串 |
日誌的內容。 |
步驟一:為資料表開啟Stream功能
使用觸發器功能需要先在Tablestore控制台開啟資料表的Stream功能,才能在Function Compute中處理寫入Tablestore中的增量資料。
- 登入Tablestore控制台。
- 在概覽頁面,單擊執行個體名稱或在操作列單擊執行個體管理。
- 在執行個體詳情頁簽的資料表列表地區,單擊資料表名稱後選擇即時消費通道頁簽或單擊後選擇即時消費通道。
- 在即時消費通道頁簽,單擊Stream資訊對應的開啟。
- 在開啟Stream功能對話方塊,設定日誌到期時間長度,單擊開啟。
日誌到期時間長度取值為非零整數,單位為小時,最長時間長度為168小時。
说明 日誌到期時間長度設定後不能修改,請謹慎設定。
步驟二:配置Tablestore觸發器
在Function Compute控制台建立Tablestore觸發器來處理Tablestore資料表的即時資料流。
- 建立Function Compute的服務。
- 登入Function Compute控制台。
- 在左側導覽列,單擊服務及函數。
- 在頂部功能表列,選擇地區。
- 在服務列表頁面,單擊建立服務。
- 在建立服務面板,填寫服務名稱和描述,按需設定日誌與鏈路追蹤功能。
- 單擊確定。
建立完成後,在服務列表頁面,您可以查看到已建立的服務及其配置資訊。
- 建立Function Compute的函數。
- 在服務列表頁面,單擊目標服務名稱。
- 在函數管理頁面,單擊建立函數。
- 在建立函數頁面,選擇建立函數的方式為從零開始建立。
- 在基本設定地區,根據下表說明填寫函數基本資料。
參數 |
是否必填 |
說明 |
本文樣本 |
函數名稱 |
否 |
填寫自訂的函數名稱。必須以字母開頭,可包含數字、字母(區分大小寫)、底線(_)和短劃線(-),不超過64個字元。
说明 如果不填寫名稱,Function Compute會自動為您建立。
|
Function |
運行環境 |
是 |
選擇您熟悉的語言,例如Python、Java、PHP、Node.js等。Function Compute支援的運行環境,請參見t1881006.html#multiTask3514。
|
Python 3.6 |
請求處理常式類型 |
是 |
使用Tablestore觸發器時,必須設定為處理事件請求。
|
處理事件請求 |
執行個體類型 |
是 |
選擇適合您的執行個體類型。取值範圍如下:
更多資訊,請參見t1911640.html#concept_2542889。關於各種執行個體類型的計費詳情,請參見t1880954.html#concept_2557114。
|
彈性執行個體 |
記憶體規格 |
是 |
設定函數執行記憶體。
- 選擇輸入:在下拉式清單中選擇所需記憶體。
- 手動輸入:單擊手動輸入記憶體大小,可自訂函數執行記憶體。記憶體規格說明如下:
- 彈性執行個體:取值範圍[128, 3072],單位為MB。
- 效能執行個體:取值範圍[4, 32],單位為GB。
|
512 MB |
函數建立完成後,在函數管理頁面,即可查看已建立的函數。
- 在配置觸發器地區,根據下表說明填寫觸發器相關參數。
參數 |
操作 |
本文樣本 |
觸發器類型 |
選擇Tablestore。
|
|
名稱 |
自訂填寫觸發器名稱。 |
Tablestore-trigger |
執行個體 |
在下拉式清單中選擇已建立的Tablestore執行個體。 |
distribute-test |
表格 |
在下拉式清單中選擇已建立的資料表。 |
source_data |
角色名稱 |
選擇AliyunTableStoreStreamNotificationRole。
说明 如果您第一次建立該類型的觸發器,則需要單擊確定後,在彈出的對話方塊中選擇立即授權,並根據系統提示完成角色建立和授權。
|
AliyunTableStoreStreamNotificationRole |
- 單擊建立。
建立好的觸發器會自動顯示在
觸發器管理頁簽。
说明 您也可以在Tablestore控制台中資料表的觸發器管理頁簽,查看和建立Tablestore觸發器。
步驟三:驗證測試
建立觸發器後,通過在Tablestore中寫入和查詢資料驗證資料清洗是否成功。
- 編寫代碼。
- 在函數管理頁面,單擊函數名稱。
- 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼。
此處以Python函數代碼為例介紹。其中INSTANCE_NAME(Tablestore的執行個體名稱)、REGION(使用的地區)、ENDPOINT(服務地址)需要根據情況進行修改。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots
INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'
def _utf8(input):
return str(bytearray(input, "utf-8"))
def get_attrbute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
#由於已經授權了AliyunOTSFullAccess許可權,此處擷取的credentials具有訪問Tablestore的許可權。
def get_ots_client(context):
creds = context.credentials
client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
return client
def save_to_ots(client, record):
id = int(get_pk_value(record, 'id'))
level = int(get_attrbute_value(record, 'level'))
msg = get_attrbute_value(record, 'message')
pk = [(_utf8('id'), id),]
attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
row = ots.Row(pk, attr)
client.put_row(RESULT_TABLENAME, row)
def handler(event, context):
records = cbor.loads(event)
#records = json.loads(event)
client = get_ots_client(context)
for record in records['Records']:
level = int(get_attrbute_value(record, 'level'))
if level > 1:
save_to_ots(client, record)
else:
print "Level <= 1, ignore."
- 向source_data資料表中寫入資料,依次填入id、level和message資訊,並在result表中查詢清洗後的資料。
- 當向soure_data表中寫入level>1的資料時,資料會同步到result表中。
- 當向soure_data表中寫入level<=1的資料時,資料不會同步到result表中。