全部產品
Search
文件中心

Tablestore:使用Function Compute清洗資料

更新時間:Nov 12, 2024

Table Store高並發的寫入效能以及低廉的儲存成本非常適合物聯網、日誌、監控資料的儲存。將資料寫入到Table Store時,您可以通過Function Compute對新增的資料做簡單的清洗,將清洗後的資料寫回到Table Store的另一種資料表中。同時,您也可以即時訪問Table Store中的未經處理資料和結果資料。

範例情境

假設寫入Table Store的為日誌資料,且日誌資料包括如下三個欄位。為了便於日誌查詢,您需要將level>1的日誌寫入到Table Store的另一張資料表result中。

欄位名稱

類型

說明

id

整型

日誌ID。

level

整型

日誌的等級,數值越大表示日誌等級越高。

message

字串

日誌的內容。

步驟一:為資料表開啟Stream功能

使用觸發器功能需要先在Table Store控制台開啟資料表的Stream功能,才能在Function Compute中處理寫入Table Store中的增量資料。

  1. 登入Table Store控制台

  2. 在頁面上方,選擇地區。

  3. 概覽頁面,單擊執行個體名稱或在執行個體操作列單擊執行個體管理

  4. 執行個體詳情頁簽的資料表列表頁簽,單擊資料表名稱後選擇即時消費通道頁簽或單擊fig_001後選擇即時消費通道

  5. 即時消費通道頁簽,單擊Stream資訊對應的開啟

  6. 開啟Stream功能對話方塊,設定日誌到期時間長度,單擊開啟

    日誌到期時間長度取值為非零整數,單位為小時,最長時間長度為168小時。

    重要

    日誌到期時間長度設定後不能修改,請謹慎設定。

步驟二:建立函數和Tablestore觸發器

  1. 建立函數。

    1. 登入Function Compute控制台

    2. 可選:在頁面右上方,單擊體驗Function Compute3.0

      說明
      • Function Compute3.0進行了多項功能改進,本文採用Function Compute3.0進行Function Compute的使用介紹。

      • 若您已進入新版控制台頁面(頁面右上方的按鈕為返回Function Compute2.0),則無需執行此操作。

    3. 在左側導覽列,單擊函數

    4. 在頂部功能表列,選擇地區,然後在函數頁面,單擊建立函數

    5. 建立函數頁面,按需選擇建立函數的方式,配置以下配置項,然後單擊建立

      此處以建立事件函數為例,介紹對錶格儲存中資料修改進行Realtime Compute的操作。

      說明

      使用Function Compute時,您可以通過建立事件函數Web函數任務函數實現對錶格儲存中資料的處理。更多資訊,請參見函數類型選型

      • 如果要Tablestore中的資料變更自動觸發資料處理,請建立事件函數。具體操作,請參見建立事件函數

      • 如果要通過特定HTTP請求觸發資料處理,請建立Web函數。具體操作,請參見建立Web函數

      • 如果要定時或非同步觸發資料處理,請建立任務函數。具體操作,請參見建立任務函數

      • 基本設定:設定函數名稱

      • 函數代碼:配置函數的運行環境和代碼相關資訊。

        配置項

        說明

        樣本

        運行環境

        選擇您熟悉的語言,例如Python、Java、PHP、Node.js或自訂容器等。

        自訂容器鏡像。

        此處選擇Python 3.9

        代碼上傳方式

        選擇代碼上傳到Function Compute的方式。

        • 使用範例程式碼:預設,您可以根據業務需要選擇Function Compute為您提供的建立函數的範例程式碼。

        • 通過 ZIP 包上傳代碼:選擇函數代碼ZIP包並上傳。

        • 通過檔案夾上傳代碼:選擇包含函數代碼的檔案夾並上傳。

        • 通過 OSS 上傳代碼:選擇上傳函數代碼的Bucket 名稱檔案名稱

        此處請選擇使用範例程式碼後,在範例程式碼列表中選擇Hello, world! 樣本

      • 進階配置:配置函數的執行個體相關資訊和函數執行逾時時間等。

        配置項

        說明

        樣本

        規格方案

        根據您的業務情況,選擇或手動輸入合理的vCPU規格記憶體規格組合。關於各資源使用的計費詳情,請參見計費概述

        說明

        vCPU大小(單位為核)與記憶體大小(單位為GB)的比例必須設定在1:1到1:4之間。

        0.35核,512 MB

        臨時硬碟大小

        根據您的業務情況,選擇臨時隱藏檔的硬碟大小。

        取值說明如下。

        • 512 MB:預設值。不計費,Function Compute為您提供512 MB以內的硬碟免費使用額度。

        • 10 GB:按9.5 GB進行計費。

        說明

        臨時硬碟中所有目錄可寫,共用臨時硬碟的空間。

        臨時硬碟大小與底層執行函數的執行個體生命週期一致,執行個體被系統回收後,硬碟上的資料也會消失。如您需要對檔案進行持久化儲存,可以選擇掛載NAS或OSS。具體操作,請參見配置NAS檔案系統配置OSSObject Storage Service

        512 MB

        執行逾時時間

        設定逾時時間。執行逾時時間預設為180秒,最長為86400秒。

        180

        請求處理常式

        佈建要求處理常式,Function Compute的運行時會載入並調用您的請求處理常式處理請求。建立函數的方式選擇Web函數時,無需設定此配置項。

        說明

        代碼上傳方式選擇使用範例程式碼時,不需要修改請求處理常式。當選擇其他代碼上傳方式時,則需要根據實際情況修改請求處理常式,否則函數執行時會報錯。

        index.handler

        時區

        選擇函數的時區。此處設定函數的時區後,將自動為函數添加一條環境變數TZ,其值為您設定的目標時區。

        UTC

        函數角色

        Function Compute平台會使用這個RAM角色來產生訪問您的阿里雲資源的臨時密鑰,並傳遞給您的代碼。

        重要

        需授予函數角色訪問Table Store服務的許可權。更多資訊,請參見附錄:授予Function Compute訪問Table Store的許可權

        AliyunFCDefaultRole

        允許訪問 VPC

        是否允許函數訪問VPC內資源。更多資訊,請參見配置網路

        專用網路

        允許訪問 VPC選擇時必填。建立新的VPC或在下拉式清單中選擇要訪問的VPC ID。

        fc.auto.create.vpc.1632317****

        交換器

        允許訪問 VPC選擇時必填。建立新的交換器或在下拉式清單中選擇交換器ID。

        fc.auto.create.vswitch.vpc-bp1p8248****

        安全性群組

        允許訪問 VPC選擇時必填。建立新的安全性群組或在下拉式清單中選擇安全性群組。

        fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****

        允許函數預設網卡訪問公網

        是否允許函數可以通過預設網卡訪問公網。關閉後,當前服務中的函數將無法通過Function Compute的預設網卡訪問公網。

        重要

        使用固定公網IP地址功能時,您必須關閉允許函數預設網卡訪問公網,否則配置的固定公網IP地址不生效。更多資訊,請參見配置固定公網IP地址

        日誌功能

        是否啟用阿里雲Log Service。取值說明如下:

        • 啟用:函數的執行日誌被持久化儲存到Log Service,方便您代碼調試、故障分析和資料分析等。

          說明

          啟用日誌功能後,函數中列印到 stdout 的內容就會被阿里雲Log Service採集到。然後您可以查看函數的執行日誌,從而方便您的代碼調試、故障分析、資料分析等操作。

        • 禁用:函數的執行日誌將無法通過Log Service儲存和查詢。

        啟用

      • (可選)環境變數:設定函數運行環境中的環境變數。更多資訊,請參見配置環境變數

  2. 建立Tablestore觸發器。

    1. 函數詳情頁簽,選擇配置頁簽,在左側導覽列,單擊觸發器,然後單擊建立觸發器

    2. 在建立觸發程序面板,填寫相關資訊,然後單擊確定

      配置項

      操作

      本文樣本

      觸發器類型

      選擇Table Store Tablestore

      Table StoreTablestore

      名稱

      自訂填寫觸發器名稱。

      Tablestore-trigger

      版本或別名

      預設值為LATEST,如果您需要建立其他版本或別名的觸發器,需先在函數詳情頁的版本或別名下拉式清單選擇該版本或別名。關於版本和別名的簡介,請參見版本管理別名管理

      LATEST

      執行個體

      在列表中選擇已建立的Tablestore執行個體。

      d00dd8xm****

      表格

      在列表中選擇已建立的表格。

      mytable

      角色名稱

      選擇AliyunTableStoreStreamNotificationRole

      說明

      如果您第一次建立該類型的觸發器,則需要在單擊確定後,在彈出的對話方塊中選擇立即授權

      AliyunTableStoreStreamNotificationRole

      建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟三:驗證測試

建立觸發器後,通過在Table Store中寫入和查詢資料驗證資料清洗是否成功。

  1. 函數詳情頁簽的代碼頁簽,使用代碼編輯器中編寫代碼。

    此處以Python函數代碼為例介紹。其中INSTANCE_NAME(Table Store的執行個體名稱)、REGION(使用的地區)、ENDPOINT(服務地址)和RESULT_TABLENAME(結果表)需要根據情況進行修改。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import cbor
    import json
    import tablestore as ots
    
    INSTANCE_NAME = 'distribute-test'
    REGION = 'cn-shanghai'
    ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
    RESULT_TABLENAME = 'result'
    
    
    def get_attrbute_value(record, column):
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']
    
    
    def get_pk_value(record, column):
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    
    
    # 由於已經授權了AliyunOTSFullAccess許可權,此處擷取的credentials具有訪問Table Store的許可權。
    def get_ots_client(context):
        creds = context.credentials
        client = ots.OTSClient(ENDPOINT, creds.access_key_id, creds.access_key_secret, INSTANCE_NAME,
                               sts_token=creds.security_token)
        return client
    
    
    def save_to_ots(client, record):
        id = int(get_pk_value(record, 'id'))
        level = int(get_attrbute_value(record, 'level'))
        msg = get_attrbute_value(record, 'message')
        pk = [('id', id), ]
        attr = [('level', level), ('message', msg), ]
        row = ots.Row(pk, attr)
        client.put_row(RESULT_TABLENAME, row)
    
    
    def handler(event, context):
        records = cbor.loads(event)
        # records = json.loads(event)
        client = get_ots_client(context)
        for record in records['Records']:
            level = int(get_attrbute_value(record, 'level'))
            if level > 1:
                save_to_ots(client, record)
            else:
                print("level <= 1, ignore.")
    
  2. 向source_data資料表中寫入資料,依次填入id、level和message資訊,並在result表中查詢清洗後的資料。

    • 當向source_data表中寫入level>1的資料時,資料會同步到result表中。

    • 當向source_data表中寫入level<=1的資料時,資料不會同步到result表中。

常見問題

  • 如果您無法在某一地區建立Tablestore觸發器,請確認支援建立Tablestore觸發器的地區,具體請參見注意事項

  • 如果您在建立Tablestore觸發器時無法找到已經建立好的Table Store資料表,請確認Table Store資料表與Function Compute服務是否處於同一地區。

  • 使用Tablestore觸發器時,總是會報用戶端取消的報錯,一般是由於用戶端調用函數時設定的逾時時間小於函數執行時間。建議您將用戶端逾時時間調大,具體請參見用戶端中斷連線,報錯Invocation canceled by client怎麼辦?

  • 如果Tablestore資料表中有新增的資料,但是Tablestore觸發器沒有被觸發,您可以從以下方面進行排查。關於觸發器不能正常觸發的詳細排查方案可參見觸發器不能正常觸發函數執行怎麼辦?

    • 確認資料表是否開啟了Stream功能,具體請參見為資料表開啟Stream功能

    • 確認在建立觸發器時配置的角色是否正確,您可以使用預設的觸發器角色AliyunTableStoreStreamNotificationRole,具體請參見建立Tablestore觸發器

    • 查看是否有函數作業記錄,可以根據日誌確認是否是函數執行失敗。函數執行失敗後,會一直重試直到Tablestore中的日誌資料到期。

  • 如果函數執行時報錯“access_key_id is None or empty.”,請確認配置的函數角色是否擁有訪問Table Store的許可權,具體請參見附錄:授予Function Compute訪問Table Store的許可權