Tablestore高並發的寫入效能以及低廉的儲存成本非常適合物聯網、日誌、監控資料的儲存。將資料寫入到Tablestore時,您可以通過Function Compute對新增的資料做簡單的清洗,將清洗後的資料寫回到Tablestore的另一種資料表中。同時,您也可以即時訪問Tablestore中的未經處理資料和結果資料。

範例情境

假設寫入Tablestore的為日誌資料,且日誌資料包括如下三個欄位。為了便於日誌查詢,您需要將level>1的日誌寫入到Tablestore的另一張資料表result中。

欄位名稱 類型 說明
id 整型 日誌ID。
level 整型 日誌的等級,數值越大表示日誌等級越高。
message 字串 日誌的內容。

步驟一:為資料表開啟Stream功能

使用觸發器功能需要先在Tablestore控制台開啟資料表的Stream功能,才能在Function Compute中處理寫入Tablestore中的增量資料。

  1. 登入Tablestore控制台
  2. 概覽頁面,單擊執行個體名稱或在操作列單擊執行個體管理
  3. 執行個體詳情頁簽的資料表列表地區,單擊資料表名稱後選擇即時消費通道頁簽或單擊fig_001後選擇即時消費通道
  4. 即時消費通道頁簽,單擊Stream資訊對應的開啟
  5. 開啟Stream功能對話方塊,設定日誌到期時間長度,單擊開啟

    日誌到期時間長度取值為非零整數,單位為小時,最長時間長度為168小時。

    说明 日誌到期時間長度設定後不能修改,請謹慎設定。

步驟二:配置Tablestore觸發器

在Function Compute控制台建立Tablestore觸發器來處理Tablestore資料表的即時資料流。

  1. 建立Function Compute的服務。
    1. 登入Function Compute控制台
    2. 在左側導覽列,單擊服務及函數
    3. 在頂部功能表列,選擇地區。
    4. 服務列表頁面,單擊建立服務
    5. 建立服務面板,填寫服務名稱和描述,按需設定日誌與鏈路追蹤功能。
      關於參數說明的更多資訊,請參見t1881003.html#multiTask427
    6. 單擊確定
      建立完成後,在服務列表頁面,您可以查看到已建立的服務及其配置資訊。
  2. 建立Function Compute的函數。
    说明 系統支援從零開始建立、使用容器鏡像建立、使用模板建立等方式建立函數。此處以從零開始建立方式為例介紹,其他建立方式,請分別參見使用容器鏡像建立函數t1881125.html#multiTask1250
    1. 服務列表頁面,單擊目標服務名稱。
    2. 函數管理頁面,單擊建立函數
    3. 建立函數頁面,選擇建立函數的方式為從零開始建立
    4. 基本設定地區,根據下表說明填寫函數基本資料。
      參數 是否必填 說明 本文樣本
      函數名稱 填寫自訂的函數名稱。必須以字母開頭,可包含數字、字母(區分大小寫)、底線(_)和短劃線(-),不超過64個字元。
      说明 如果不填寫名稱,Function Compute會自動為您建立。
      Function
      運行環境 選擇您熟悉的語言,例如Python、Java、PHP、Node.js等。Function Compute支援的運行環境,請參見t1881006.html#multiTask3514 Python 3.6
      請求處理常式類型 使用Tablestore觸發器時,必須設定為處理事件請求 處理事件請求
      執行個體類型 選擇適合您的執行個體類型。取值範圍如下:
      • 彈性執行個體
      • 效能執行個體

      更多資訊,請參見t1911640.html#concept_2542889。關於各種執行個體類型的計費詳情,請參見t1880954.html#concept_2557114

      彈性執行個體
      記憶體規格 設定函數執行記憶體。
      • 選擇輸入:在下拉式清單中選擇所需記憶體。
      • 手動輸入:單擊手動輸入記憶體大小,可自訂函數執行記憶體。記憶體規格說明如下:
        • 彈性執行個體:取值範圍[128, 3072],單位為MB。
        • 效能執行個體:取值範圍[4, 32],單位為GB。
        说明 輸入的記憶體必須為64 MB的倍數。
      512 MB

      函數建立完成後,在函數管理頁面,即可查看已建立的函數。

    5. 配置觸發器地區,根據下表說明填寫觸發器相關參數。
      參數 操作 本文樣本
      觸發器類型 選擇Tablestore
      名稱 自訂填寫觸發器名稱。 Tablestore-trigger
      執行個體 在下拉式清單中選擇已建立的Tablestore執行個體。 distribute-test
      表格 在下拉式清單中選擇已建立的資料表。 source_data
      角色名稱 選擇AliyunTableStoreStreamNotificationRole
      说明 如果您第一次建立該類型的觸發器,則需要單擊確定後,在彈出的對話方塊中選擇立即授權,並根據系統提示完成角色建立和授權。
      AliyunTableStoreStreamNotificationRole
    6. 單擊建立
      建立好的觸發器會自動顯示在觸發器管理頁簽。
      说明 您也可以在Tablestore控制台中資料表的觸發器管理頁簽,查看和建立Tablestore觸發器。

步驟三:驗證測試

建立觸發器後,通過在Tablestore中寫入和查詢資料驗證資料清洗是否成功。

  1. 編寫代碼。
    1. 函數管理頁面,單擊函數名稱。
    2. 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼。
      此處以Python函數代碼為例介紹。其中INSTANCE_NAME(Tablestore的執行個體名稱)、REGION(使用的地區)、ENDPOINT(服務地址)需要根據情況進行修改。
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'
      def _utf8(input):
          return str(bytearray(input, "utf-8"))
      def get_attrbute_value(record, column):
          attrs = record[u'Columns']
          for x in attrs:
              if x[u'ColumnName'] == column:
                  return x['Value']
      def get_pk_value(record, column):
          attrs = record[u'PrimaryKey']
          for x in attrs:
              if x['ColumnName'] == column:
                  return x['Value']
      #由於已經授權了AliyunOTSFullAccess許可權,此處擷取的credentials具有訪問Tablestore的許可權。
      def get_ots_client(context):
          creds = context.credentials
          client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
          return client
      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attrbute_value(record, 'level'))
          msg = get_attrbute_value(record, 'message')
          pk = [(_utf8('id'), id),]
          attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)
      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attrbute_value(record, 'level'))
              if level > 1:
                  save_to_ots(client, record)
              else:
                  print "Level <= 1, ignore."
  2. 向source_data資料表中寫入資料,依次填入id、level和message資訊,並在result表中查詢清洗後的資料。
    • 當向soure_data表中寫入level>1的資料時,資料會同步到result表中。
    • 當向soure_data表中寫入level<=1的資料時,資料不會同步到result表中。