通過Function Compute訪問Tablestore,對錶格儲存增量資料進行Realtime Compute。
背景信息
Function Compute(Function Compute)是一個事件驅動的服務,通過Function Compute,無需管理伺服器等運行情況,只需編寫代碼並上傳。Function
Compute準備計算資源,並以Auto Scaling的方式運行使用者代碼,而您只需根據實際代碼運行所消耗的資源進行付費。更多資訊,請參見t1880953.html#concept_2259850。
Tablestore Stream是用於擷取Tablestore表中增量資料的一個資料通道,通過建立Tablestore觸發器,能夠實現Stream和Function
Compute的自動對接,讓計算函數中自訂的程式邏輯自動處理Tablestore表中發生的資料修改。
使用情境
如下圖所示,使用Function Compute可以實現如下任務。
- 資料同步:同步Tablestore中的即時資料到資料緩衝、搜尋引擎或者其他資料庫執行個體中。
- 資料歸檔:增量歸檔Tablestore中的資料到OSS等做冷備份。
- 事件驅動:利用觸發器觸發函數調用IoT套件、雲應用API或者做訊息通知等。
注意事項
請確保Tablestore資料表與Function Compute服務處於同一地區。
步驟一:為資料表開啟Stream功能
使用觸發器功能需要先在Tablestore控制台開啟資料表的Stream功能,才能在Function Compute中處理寫入Tablestore中的增量資料。
- 登入Tablestore控制台。
- 在概覽頁面,單擊執行個體名稱或在操作列單擊執行個體管理。
- 在執行個體詳情頁簽的資料表列表地區,單擊資料表名稱後選擇即時消費通道頁簽或單擊後選擇即時消費通道。
- 在即時消費通道頁簽,單擊Stream資訊對應的開啟。
- 在開啟Stream功能對話方塊,設定日誌到期時間長度,單擊開啟。
日誌到期時間長度取值為非零整數,單位為小時,最長時間長度為168小時。
说明 日誌到期時間長度設定後不能修改,請謹慎設定。
步驟二:配置Tablestore觸發器
在Function Compute控制台建立Tablestore觸發器來處理Tablestore資料表的即時資料流。
- 建立Function Compute的服務。
- 登入Function Compute控制台。
- 在左側導覽列,單擊服務及函數。
- 在頂部功能表列,選擇地區。
- 在服務列表頁面,單擊建立服務。
- 在建立服務面板,填寫服務名稱和描述,按需設定日誌與鏈路追蹤功能。
- 單擊確定。
建立完成後,在服務列表頁面,您可以查看到已建立的服務及其配置資訊。
- 建立Function Compute的函數。
- 在服務列表頁面,單擊目標服務名稱。
- 在函數管理頁面,單擊建立函數。
- 在建立函數頁面,選擇建立函數的方式為使用標準Runtime從零建立。
- 在基本設定地區,根據下表說明填寫函數基本資料。
參數 |
是否必填 |
說明 |
本文樣本 |
函數名稱 |
否 |
填寫自訂的函數名稱。
说明 如果不填寫名稱,Function Compute會自動為您建立。
|
Function |
運行環境 |
是 |
選擇您熟悉的語言,例如Python、Java、PHP、Node.js等。Function Compute支援的運行環境,請參見t1881006.html#multiTask3514。
|
Python 3.6 |
代碼上傳方式 |
否 |
預設選擇使用範例程式碼建立函數,函數建立完成後內建Function Compute平台為其配置的範例程式碼。您也可以選擇以下三種方式上傳您自己的代碼。
- 通過ZIP包上傳代碼:選擇函數代碼ZIP包。
- 通過檔案夾上傳代碼:選擇包含函數代碼的檔案夾。
- 通過OSS上傳代碼:選擇上傳函數代碼的Bucket名稱和檔案名稱。
|
使用範例程式碼 |
啟動命令 |
否 |
程式的啟動命令。如果不配置啟動命令,您需要在代碼的根目錄手動建立一個名稱為bootstrap的啟動指令碼,您的程式通過此指令碼來啟動。
说明 僅當您選擇使用自訂運行時平滑遷移Web Server時,需配置此參數。
|
npm run start |
監聽連接埠 |
是 |
您的代碼中的HTTP Server所監聽的連接埠。
说明 僅當您選擇使用自訂運行時平滑遷移Web Server時,需配置此參數。
|
9000 |
請求處理常式類型 |
是 |
選擇請求處理常式類型。取值範圍如下:
- 處理事件請求:通過定時器、調用API/SDK或其他阿里雲服務的觸發器來觸發函數執行。
- 處理HTTP請求:用於處理HTTP請求或Websocekt請求的函數。如果您的使用情境是Web情境,建議您使用自訂運行時平滑遷移Web Server。具體資訊,請參見。
使用Tablestore觸發器時,此參數必須設定為處理事件請求。
|
處理事件請求 |
執行個體類型 |
是 |
選擇適合您的執行個體類型。取值範圍如下:
更多資訊,請參見t1911640.html#concept_2542889。關於各種執行個體類型的計費詳情,請參見t1880954.html#concept_2557114。
|
彈性執行個體 |
記憶體規格 |
是 |
設定函數執行記憶體。
- 選擇輸入:在下拉式清單中選擇所需記憶體。
- 手動輸入:僅適用於彈性執行個體,單擊手動輸入記憶體大小,可自訂函數執行記憶體。取值範圍[128, 3072],單位為MB。
|
512 MB |
執行個體並發度 |
是 |
設定函數執行個體的並發度,具體資訊,請參見t1912490.html#task_2552850。
|
1 |
請求處理常式 |
是 |
佈建要求處理常式,Function Compute的運行時會載入並調用您的請求處理常式處理請求。 |
index.handler |
函數建立完成後,在函數管理頁面,即可查看已建立的函數。
- 在配置觸發器地區,根據下表說明填寫觸發器相關參數。
參數 |
操作 |
本文樣本 |
觸發器類型 |
選擇Tablestore。
说明 當請求處理常式類型選擇處理HTTP請求時,此參數預設為HTTP觸發器。
|
|
名稱 |
自訂填寫觸發器名稱。 |
Tablestore-trigger |
執行個體 |
在下拉式清單中選擇已建立的Tablestore執行個體。 |
distribute-test |
表格 |
在下拉式清單中選擇已建立的資料表。 |
source_data |
角色名稱 |
選擇AliyunTableStoreStreamNotificationRole。
说明 如果您第一次建立該類型的觸發器,則需要單擊確定後,在彈出的對話方塊中選擇立即授權,並根據系統提示完成角色建立和授權。
|
AliyunTableStoreStreamNotificationRole |
- 單擊建立。
建立好的觸發器會自動顯示在
觸發器管理頁簽。
说明 您也可以在Tablestore控制台中資料表的觸發器管理頁簽,查看和建立Tablestore觸發器。
步驟三:驗證測試
Function Compute支援函數的線上調試功能,您可以構建觸發的Event,並測試代碼邏輯是否符合期望。
由於Tablestore觸發函數服務的Event是CBOR格式,該資料格式是一種類似JSON的二進位格式,可以按照如下方法進行線上調試。
- 編寫代碼。
- 在函數管理頁面,單擊函數名稱。
- 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼。
在代碼中使用records = json.loads(event)
處理自訂的測試觸發事件。
import logging
import cbor
import json
def get_attribute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
def handler(event, context):
logger = logging.getLogger()
logger.info("Begin to handle event")
#records = cbor.loads(event)
records = json.loads(event)
for record in records['Records']:
logger.info("Handle record: %s", record)
pk_0 = get_pk_value(record, "pk_0")
attr_0 = get_attribute_value(record, "attr_0")
return 'OK'
- 驗證測試。
- 在函數代碼頁簽,單擊測試函數右側的表徵圖後,從下拉式清單中,選擇配置測試參數。
- 在配置測試參數對話方塊,選擇建立新測試事件或編輯已有測試事件頁簽,選擇事件模板為Tablestore,並填寫事件名稱和事件內容。單擊確定。
- 單擊測試函數,查看是否符合期望。
當使用records=json.loads(event)
測試OK後,可以修改為records = cbor.loads(event)
並單擊部署代碼,當Tablestore中有資料寫入時,相關的函數邏輯就會觸發。
附錄:資料處理
配置測試參數時,需要按照特定的資料格式設定事件內容。
- 資料格式
Tablestore觸發器使用CBOR格式對增量資料進行編碼構成Function Compute的Event。增量資料的具體資料格式如下:
{
"Version": "string",
"Records": [
{
"Type": "string",
"Info": {
"Timestamp": int64
},
"PrimaryKey": [
{
"ColumnName": "string",
"Value": formated_value
}
],
"Columns": [
{
"Type": "string",
"ColumnName": "string",
"Value": formated_value,
"Timestamp": int64
}
]
}
]
}
- 成員定義
成員定義請參見下表。
參數 |
說明 |
Version |
payload版本號碼,目前為Sync-v1。類型為string。 |
Records |
資料表中的增量資料行數組。包含如下內部成員:
- Type:資料行類型,包含PutRow、UpdateRow和DeleteRow。類型為string。
- Info:包含Timestamp內部成員。Timestamp表示該行的最後修改UTC時間。類型為int64。
|
PrimaryKey |
主鍵列數組。包含如下內部成員:
- ColumnName:主鍵列名稱。類型為string。
- Value:主鍵列內容。類型為formated_value,支援integer、string和blob。
|
Columns |
屬性列數組。包括如下內部成員:
- Type:屬性列類型,包含Put、DeleteOneVersion和DeleteAllVersions。類型為string。
- ColumnName:屬性列名稱。類型為string。
- Value:屬性列內容。類型為formated_value,支援integer、boolean、double、string和blob。
- Timestamp:屬性列最後修改UTC時間。類型為int64。
|
- 資料樣本
{
"Version": "Sync-v1",
"Records": [
{
"Type": "PutRow",
"Info": {
"Timestamp": 1506416585740836
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_1",
"Value": "2017-09-26 17:03:05.8815909 +0800 CST"
},
{
"ColumnName": "pk_2",
"Value": 1506416585741000
}
],
"Columns": [
{
"Type": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
"Timestamp": 1506416585741
},
{
"Type": "Put",
"ColumnName": "attr_1",
"Value": 1506416585881590900,
"Timestamp": 1506416585741
}
]
}
]
}
附錄:使用已有Function Compute建立Tablestore觸發器
在Tablestore控制台,使用已有Function Compute的服務以及服務下的函數建立Tablestore觸發器。
- 登入Tablestore控制台。
- 在概覽頁面,單擊執行個體名稱或在操作列單擊執行個體管理。
- 在執行個體詳情頁簽的資料表列表地區,單擊資料表名稱。
- 在觸發器管理頁簽,單擊使用已有Function Compute。
- 在建立觸發器對話方塊,選擇Function Compute服務以及服務下的函數,並填寫觸發器名稱,單擊確定。