全部產品
Search
文件中心

Tablestore:Agent Memory SDK

更新時間:Jan 07, 2026

基於Tablestore的Agent Memory SDK架構,主要支援Memory和Knowledge情境,為AI Agent應用提供持久化、高效能的記憶儲存和語義檢索能力,協助開發人員快速構建具有上下文理解和長期記憶能力的智能應用。

核心架構

image

架構優勢

  • 輕量化設計:抽象通用儲存介面,降低業務開發複雜度,在技術深度與易用性之間實現平衡。開發人員無需直接處理底層資料庫介面調用,專注於業務邏輯開發即可快速產出結果。

  • 情境驅動設計:針對Memory即時記憶儲存和Knowledge長期語義檢索兩大核心情境,提供完整的解決方案。在滿足基礎儲存需求的同時,整合摘要記錄、事實資料提取、使用者畫像標籤挖掘等業務情境功能,實現儲存與應用的深度融合。

  • 業務價值驗證:基於成熟的業界最佳實務,開發人員無需進行複雜的技術調研,可直接在自有業務情境中快速驗證和落地AI應用的商業價值。

快速接入

以下通過Python樣本示範SDK的完整接入和使用流程,Java接入方式請參考使用說明。

環境準備

確保已安裝Python運行環境,可通過python3 --version命令查看版本資訊。

安裝SDK

pip3 install tablestore-for-agent-memory

配置環境變數

設定以下必需的環境變數:

  • TABLESTORE_ACCESS_KEY_ID:阿里雲帳號或RAM使用者的AccessKey ID。

  • TABLESTORE_ACCESS_KEY_SECRET:阿里雲帳號或RAM使用者的AccessKey Secret。

  • TABLESTORE_INSTANCE_NAME:執行個體名稱,可在Table Store控制台擷取。

  • TABLESTORE_ENDPOINT:執行個體訪問地址,可在Table Store控制台擷取。

範例程式碼:Memory情境

Memory情境主要用於管理AI Agent的會話記憶,包括會話管理和訊息儲存等核心功能。以下樣本示範了如何建立會話、記錄對話以及查詢記錄的完整流程。

建立會話和寫入對話記錄

import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os


def main():
    """Create a demo session and write a two-round conversation (four messages).

    Connection settings are read from the TABLESTORE_ENDPOINT,
    TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET and
    TABLESTORE_INSTANCE_NAME environment variables. If any of them is unset
    or empty, the missing names are printed and the process exits with
    status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields (and their types) declared for the session secondary index.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    # Fields declared for the session search index; "title" is a TEXT field
    # using the FUZZY analyzer.
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # Fields declared for the message search index.
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("開始建立表和索引...")
    # Best effort: any failure here is only reported and the sample continues,
    # so re-running the script does not stop at this step.
    try:
        memory_store.init_table()
        memory_store.init_search_index()
        print("表和索引建立成功")
    except Exception as e:
        print(f"表和索引已存在或建立失敗: {e}")

    print("\n====== 建立新會話 ======")

    session = Session(user_id="test_user_1", session_id="session_001")
    session.update_time = microseconds_timestamp()
    session.title = "Table Store諮詢"
    # NOTE(review): "model_name" is not listed in either session index
    # definition above — presumably it is stored without being indexed; confirm.
    session.metadata = {
        "meta_string": "web_source",
        "meta_long": 1,
        "meta_double": 1.0,
        "meta_boolean": True,
        "model_name": "qwen-max"
    }

    memory_store.put_session(session)
    print(f"建立會話成功: user_id={session.user_id}, session_id={session.session_id}")

    print("\n====== 第一輪對話 ======")

    message_1 = Message(
        session_id="session_001",
        message_id="msg_001",
        create_time=microseconds_timestamp()
    )
    message_1.content = "你好,請幫我介紹一下Table Store(Tablestore)是什麼?"
    message_1.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 1
    }
    memory_store.put_message(message_1)
    print(f"使用者: {message_1.content}")

    # Refresh the session's update time after each user message.
    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_2 = Message(
        session_id="session_001",
        message_id="msg_002",
        create_time=microseconds_timestamp()
    )
    message_2.content = "Table Store(Tablestore)是阿里雲自研的第一代飛天產品,提供海量結構化資料存放區以及快速的查詢和分析服務。它支援多種資料模型,包括寬表模型、IM訊息模型、時序模型等,可以滿足不同情境的資料存放區需求。"
    message_2.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_2)
    print(f"助手: {message_2.content}")

    print("\n====== 第二輪對話 ======")

    message_3 = Message(
        session_id="session_001",
        message_id="msg_003",
        create_time=microseconds_timestamp()
    )
    message_3.content = "Table Store有哪些典型的應用情境?"
    message_3.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 2
    }
    memory_store.put_message(message_3)
    print(f"使用者: {message_3.content}")

    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_4 = Message(
        session_id="session_001",
        message_id="msg_004",
        create_time=microseconds_timestamp()
    )
    message_4.content = """Table Store的典型應用情境包括:
1. AI Agent 記憶儲存:儲存知識庫、長期記憶、AI會話訊息等資訊
2. 中繼資料管理:儲存海量檔案、視頻、圖片的元資訊
3. 訊息資料:儲存IM聊天訊息、Feed流訊息等
4. 軌跡溯源:車連網軌跡、物流軌跡等時序資料
5. 科學巨量資料:氣象資料、基因資料等海量資料存放區
6. 推薦系統:使用者畫像、物品特徵等資料存放區
7. 風控系統:即時風控規則、歷史行為資料存放區"""
    message_4.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_4)
    print(f"助手: {message_4.content}")

    print("\n====== 會話建立和對話完成 ======")
    print(f"會話ID: {session.session_id}")
    print(f"使用者ID: {session.user_id}")
    print("共完成 2 輪對話,4 條訊息")


if __name__ == "__main__":
    main()

查詢歷史會話列表

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    """List the most recent sessions of a demo user and print their details.

    Connection settings are read from the TABLESTORE_ENDPOINT,
    TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET and
    TABLESTORE_INSTANCE_NAME environment variables. If any of them is unset
    or empty, the missing names are printed and the process exits with
    status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields (and their types) declared for the session secondary index.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    # Fields declared for the session search index.
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # Fields declared for the message search index.
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢歷史會話列表 ======\n")

    user_id = "test_user_1"
    max_count = 10

    print(f"查詢使用者 {user_id} 的最近會話...")

    # Any failure during the query or printing is reported, not raised.
    try:
        # Materialize the result so it can be counted before printing.
        sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))

        if not sessions:
            print(f"\n使用者 {user_id} 暫無歷史會話")
        else:
            print(f"\n共找到 {len(sessions)} 個會話:\n")

            for idx, session in enumerate(sessions, 1):
                print(f"會話 {idx}:")
                print(f"  - 會話ID: {session.session_id}")
                print(f"  - 使用者ID: {session.user_id}")
                print(f"  - 更新時間: {session.update_time if hasattr(session, 'update_time') else '未知'}")

                if session.metadata:
                    print("  - 中繼資料:")
                    for key, value in session.metadata.items():
                        print(f"      {key}: {value}")
                print()

    except Exception as e:
        print(f"查詢會話列表失敗: {e}")

    print("====== 查詢完成 ======")


if __name__ == "__main__":
    main()

查詢指定會話詳情

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    """Fetch one demo session by user ID and session ID and print its details.

    Connection settings are read from the TABLESTORE_ENDPOINT,
    TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET and
    TABLESTORE_INSTANCE_NAME environment variables. If any of them is unset
    or empty, the missing names are printed and the process exits with
    status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields (and their types) declared for the session secondary index.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    # Fields declared for the session search index.
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # Fields declared for the message search index.
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢指定會話的詳情 ======\n")

    user_id = "test_user_1"
    session_id = "session_001"

    print("查詢會話詳情...")
    print(f"使用者ID: {user_id}")
    print(f"會話ID: {session_id}\n")

    # Any failure is reported together with a traceback, not raised.
    try:
        session = memory_store.get_session(user_id=user_id, session_id=session_id)

        # A falsy result is treated as "session not found".
        if session:
            print("會話詳細資料:")
            print("=" * 50)
            print(f"使用者ID: {session.user_id}")
            print(f"會話ID: {session.session_id}")
            print(f"更新時間: {session.update_time if hasattr(session, 'update_time') else '未知'}")

            if session.metadata:
                print("\n中繼資料資訊:")
                print("-" * 50)
                for key, value in session.metadata.items():
                    print(f"  {key}: {value}")
            else:
                print("\n中繼資料: 無")

            print("=" * 50)
        else:
            print(f"未找到指定的會話 (user_id={user_id}, session_id={session_id})")

    except Exception as e:
        print(f"查詢會話詳情失敗: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== 查詢完成 ======")


if __name__ == "__main__":
    main()

查詢指定會話完整對話記錄

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    """Print the full conversation of a demo session, grouped into rounds.

    Messages are sorted by ``create_time``; every message whose metadata
    ``message_type`` is ``"user"`` starts a new round.

    Connection settings are read from the TABLESTORE_ENDPOINT,
    TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET and
    TABLESTORE_INSTANCE_NAME environment variables. If any of them is unset
    or empty, the missing names are printed and the process exits with
    status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields (and their types) declared for the session secondary index.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    # Fields declared for the session search index.
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # Fields declared for the message search index.
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢指定會話的完整對話記錄 ======\n")

    session_id = "session_001"

    print("查詢會話對話記錄...")
    print(f"會話ID: {session_id}\n")

    # Any failure is reported together with a traceback, not raised.
    try:
        messages = list(memory_store.list_messages(session_id=session_id))

        if not messages:
            print(f"會話 {session_id} 暫無對話記錄")
        else:
            # Present the conversation in chronological order.
            messages.sort(key=lambda m: m.create_time)

            print(f"共找到 {len(messages)} 條訊息\n")
            print("=" * 80)

            round_num = 0
            for message in messages:
                # Fall back to an empty dict so that a message without
                # metadata is still printed instead of aborting the listing.
                metadata = message.metadata or {}
                message_type = metadata.get("message_type", "unknown")

                # Each user message opens a new conversation round.
                if message_type == "user":
                    round_num += 1
                    print(f"\n第 {round_num} 輪對話:")
                    print("-" * 80)

                # Anything that is not a user message is labelled as assistant.
                role = "使用者" if message_type == "user" else "助手"
                print(f"\n[{role}] (訊息ID: {message.message_id})")
                print(f"內容: {message.content}")
                print(f"建立時間: {message.create_time}")

                # Only print the metadata section when there is more than one
                # entry; "message_type" itself is never listed.
                if len(metadata) > 1:
                    print("中繼資料:")
                    for key, value in metadata.items():
                        if key != "message_type":
                            print(f"  - {key}: {value}")

            print("\n" + "=" * 80)
            print(f"\n對話統計: 共 {round_num} 輪對話,{len(messages)} 條訊息")

    except Exception as e:
        print(f"查詢對話記錄失敗: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== 查詢完成 ======")


if __name__ == "__main__":
    main()

範例程式碼:Knowledge情境

Knowledge情境專注於構建AI知識庫,支援海量文檔的向量化儲存和智能檢索。以下樣本展示如何建立知識庫、匯入文件,並通過向量檢索、全文檢索等方式實現智能問答。

範例程式碼使用阿里雲百鍊的text-embedding-v2模型進行向量化,需要先將API Key配置為環境變數OPENAI_API_KEY,並安裝相關依賴:
pip3 install openai

建立知識庫和寫入知識

資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os


class OpenAIEmbedding:
    """Small helper that turns text into an embedding vector.

    It wraps an ``OpenAI`` client built from the given API key and base URL
    and remembers which model to request. ``dimension`` is only stored on the
    instance; it is not sent with the request.
    """

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model, self.dimension = model, dimension

    def embedding(self, text):
        """Return the embedding of ``text``, or ``None`` if the call fails.

        Failures are reported on stdout rather than raised, so callers must
        check for ``None``.
        """
        try:
            result = self.client.embeddings.create(model=self.model, input=text)
            vector = result.data[0].embedding
        except Exception as e:
            print(f"Embedding 調用異常: {e}")
            return None
        return vector


def main():
    """Create the knowledge store and write eight demo documents with embeddings.

    Each document's text is embedded through ``OpenAIEmbedding`` and stored
    under a single tenant. Documents whose embedding cannot be generated are
    skipped; a write failure for one document does not stop the others.

    Settings are read from the TABLESTORE_ENDPOINT, TABLESTORE_ACCESS_KEY_ID,
    TABLESTORE_ACCESS_KEY_SECRET, TABLESTORE_INSTANCE_NAME and OPENAI_API_KEY
    environment variables. If any of them is unset or empty, the missing
    names are printed and the process exits with status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields declared for the knowledge search index.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # One value shared by the embedding helper and the store, so the two
    # cannot drift apart.
    vector_dimension = 1536

    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=vector_dimension
    )

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=vector_dimension,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("開始建立表和索引...")
    # Best effort: any failure here is only reported and the sample continues.
    try:
        knowledge_store.init_table()
        print("表和索引建立成功")
    except Exception as e:
        print(f"表和索引已存在或建立失敗: {e}")

    print("\n====== 寫入 Tablestore 知識庫文檔 ======\n")

    documents_data = [
        {
            "id": "doc_001",
            "text": "Table Store(Tablestore)是阿里雲自研的第一代飛天產品,提供海量結構化資料存放區以及快速的查詢和分析服務。Table Store的分布式儲存和強大的索引引擎能夠支援單表PB級儲存、千萬TPS以及毫秒級延遲的服務能力。",
            "category": "產品介紹",
            "meta_long": 1
        },
        {
            "id": "doc_002",
            "text": "Table Store支援寬表模型,單表支援PB級資料存放區和千萬QPS,適合儲存使用者畫像、訂單詳情等情境。同時支援時序模型,可以高效儲存和查詢物聯網裝置、監控系統產生的時序資料。",
            "category": "資料模型",
            "meta_long": 2
        },
        {
            "id": "doc_003",
            "text": "Table Store提供多種索引類型:主鍵索引支援快速的點查詢和範圍查詢;全域二級索引可以基於非主鍵列進行查詢;多元索引支援複雜的查詢條件組合和全文檢索索引;向量檢索支援 AI 情境的相似性搜尋。",
            "category": "索引功能",
            "meta_long": 3
        },
        {
            "id": "doc_004",
            "text": "Table Store適用於多種情境:中繼資料管理可以儲存海量檔案、視頻、圖片的元資訊;訊息資料用於儲存 IM 聊天訊息、Feed 流訊息;軌跡溯源儲存車連網軌跡、物流軌跡等時序資料;推薦系統儲存使用者畫像和物品特徵。",
            "category": "應用情境",
            "meta_long": 4
        },
        {
            "id": "doc_005",
            "text": "Table Store的多元索引支援豐富的查詢能力,包括精確查詢、範圍查詢、首碼查詢、萬用字元查詢、全文檢索索引、地理位置查詢、巢狀查詢等。同時支援排序、彙總、統計分析等進階功能。",
            "category": "查詢能力",
            "meta_long": 5
        },
        {
            "id": "doc_006",
            "text": "Table Store提供 Agent Memory 能力,包括 Memory Store 用於儲存會話和訊息記錄,Knowledge Store 用於儲存知識庫文檔並支援向量檢索。這些能力可以協助構建智能問答、對話機器人等 AI 應用。",
            "category": "AI 能力",
            "meta_long": 6
        },
        {
            "id": "doc_007",
            "text": "Table Store的向量檢索功能支援海量向量資料的儲存和高效檢索,可以應用於Image Search、語義搜尋、推薦系統等情境。支援 L2 距離、餘弦相似性等多種相似性演算法。",
            "category": "向量檢索",
            "meta_long": 7
        },
        {
            "id": "doc_008",
            "text": "Table Store提供多種資料保護機制:支援資料備份和恢複;提供資料生命週期管理,可以自動到期和刪除舊資料;支援資料加密儲存,保障資料安全。",
            "category": "資料保護",
            "meta_long": 8
        }
    ]

    tenant_id = "user_tablestore_001"
    success_count = 0

    for doc_data in documents_data:
        # One failing document must not stop the remaining writes.
        try:
            document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
            document.text = doc_data["text"]

            document.embedding = embedding_model.embedding(document.text)

            # The embedding helper returns None on failure instead of raising.
            if document.embedding is None:
                print(f"✗ 產生向量失敗,跳過文檔 {doc_data['id']}")
                continue

            document.metadata["category"] = doc_data["category"]
            document.metadata["meta_long"] = doc_data["meta_long"]
            document.metadata["meta_boolean"] = True
            document.metadata["user_id"] = tenant_id

            knowledge_store.put_document(document)

            success_count += 1
            print(f"✓ 寫入文檔 {doc_data['id']}: {doc_data['category']}")
            print(f"  內容: {doc_data['text'][:60]}...")
            print()

        except Exception as e:
            print(f"✗ 寫入文檔 {doc_data['id']} 失敗: {e}")

    # De-duplicate while keeping first-seen order so the summary line is the
    # same on every run (joining a set gives an arbitrary order).
    categories = dict.fromkeys(d['category'] for d in documents_data)

    print("=" * 80)
    print(f"\n寫入完成: 成功 {success_count}/{len(documents_data)} 條文檔")
    print(f"租戶ID: {tenant_id}")
    print(f"文檔類別: {', '.join(categories)}")
    print("\n提示: 資料寫入後,多元索引可能需要幾秒鐘時間完成同步")


if __name__ == "__main__":
    main()

向量檢索

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os


class OpenAIEmbedding:
    """Small helper that turns text into an embedding vector.

    It wraps an ``OpenAI`` client built from the given API key and base URL
    and remembers which model to request. ``dimension`` is only stored on the
    instance; it is not sent with the request.
    """

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model, self.dimension = model, dimension

    def embedding(self, text):
        """Return the embedding of ``text``, or ``None`` if the call fails.

        Failures are reported on stdout rather than raised, so callers must
        check for ``None``.
        """
        try:
            result = self.client.embeddings.create(model=self.model, input=text)
            vector = result.data[0].embedding
        except Exception as e:
            print(f"Embedding 調用異常: {e}")
            return None
        return vector


def main():
    """Embed a demo question and print the top vector-search hits for one tenant.

    Settings are read from the TABLESTORE_ENDPOINT, TABLESTORE_ACCESS_KEY_ID,
    TABLESTORE_ACCESS_KEY_SECRET, TABLESTORE_INSTANCE_NAME and OPENAI_API_KEY
    environment variables. If any of them is unset or empty, the missing
    names are printed and the process exits with status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields declared for the knowledge search index.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=1536
    )

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 向量檢索測試 ======\n")

    query_text = "Table Store支援哪些索引類型?"
    tenant_id = "user_tablestore_001"
    # Single source for the result limit so the printed value and the value
    # passed to the search call cannot diverge.
    top_k = 3

    print(f"查詢問題: {query_text}")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {top_k}\n")

    # Any failure is reported together with a traceback, not raised.
    try:
        print("正在產生查詢向量...")
        query_vector = embedding_model.embedding(query_text)

        # The embedding helper returns None on failure instead of raising.
        if query_vector is None:
            print("產生查詢向量失敗")
        else:
            print(f"查詢向量產生成功,維度: {len(query_vector)}\n")

            response = knowledge_store.vector_search(
                query_vector=query_vector,
                tenant_id=tenant_id,
                limit=top_k
            )

            if not response.hits:
                print("未找到相關文檔")
            else:
                print("=" * 80)
                print(f"找到 {len(response.hits)} 個相關文檔:\n")

                for idx, hit in enumerate(response.hits, 1):
                    doc = hit.document
                    score = hit.score

                    print(f"【結果 {idx}】")
                    print(f"文檔ID: {doc.document_id}")
                    print(f"相似性分數: {score:.4f}")

                    if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                        print(f"類別: {doc.metadata['category']}")

                    print(f"內容: {doc.text}")
                    print("-" * 80)

                print()

    except Exception as e:
        print(f"向量檢索失敗: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== 檢索完成 ======")


if __name__ == "__main__":
    main()

全文檢索索引

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


def main():
    """Run a keyword match on the ``text`` field for one tenant and print the hits.

    Occurrences of the keyword in each hit's text are wrapped in 【】 when
    printed.

    Connection settings are read from the TABLESTORE_ENDPOINT,
    TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET and
    TABLESTORE_INSTANCE_NAME environment variables. If any of them is unset
    or empty, the missing names are printed and the process exits with
    status 1.
    """
    # sys.exit() is used instead of the bare exit() helper: exit() is injected
    # by the site module for interactive sessions and is not available when
    # the interpreter is started without site (e.g. ``python -S``).
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Empty strings count as missing, not only unset variables.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields declared for the knowledge search index.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 全文檢索索引測試 ======\n")

    query_keyword = "向量檢索"
    tenant_id = "user_tablestore_001"
    # Single source for the result limit so the printed value and the value
    # passed to the search call cannot diverge.
    top_k = 3

    print(f"查詢關鍵詞: {query_keyword}")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {top_k}\n")

    # Any failure is reported together with a traceback, not raised.
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            metadata_filter=Filters.text_match("text", query_keyword),
            limit=top_k
        )

        if not response.hits:
            print("未找到包含關鍵詞的文檔")
        else:
            print("=" * 80)
            print(f"找到 {len(response.hits)} 個包含關鍵詞的文檔:\n")

            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score

                print(f"【結果 {idx}】")
                print(f"文檔ID: {doc.document_id}")
                print(f"匹配分數: {score if score is not None else 'N/A'}")

                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"類別: {doc.metadata['category']}")

                # Highlight only when the keyword appears verbatim in the text.
                content = doc.text
                if query_keyword in content:
                    highlighted = content.replace(query_keyword, f"【{query_keyword}】")
                    print(f"內容: {highlighted}")
                else:
                    print(f"內容: {content}")

                print("-" * 80)

            print()

    except Exception as e:
        print(f"全文檢索索引失敗: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== 檢索完成 ======")

    print("\n補充說明:")
    print("- 全文檢索索引會在文檔的 text 欄位中搜尋包含查詢關鍵詞的文檔")
    print("- 可以使用萬用字元、短語查詢等進階文法")
    print("- 支援中文分詞和模糊比對")


if __name__ == "__main__":
    main()

通用檢索

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


def _print_hits(hits, preview_len, show_meta_long=False):
    """Print a numbered summary for each search hit.

    Args:
        hits: Iterable of search hits. Each hit must expose ``document`` with
            ``document_id``, a dict-like ``metadata`` and a sliceable ``text``.
        preview_len: Maximum number of leading characters of ``text`` to show.
        show_meta_long: When true, also print the ``meta_long`` metadata value.
    """
    for idx, hit in enumerate(hits, 1):
        doc = hit.document
        print(f"\n結果 {idx}:")
        print(f"  文檔ID: {doc.document_id}")
        print(f"  類別: {doc.metadata.get('category', 'N/A')}")
        if show_meta_long:
            print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
        print(f"  內容: {doc.text[:preview_len]}...")


def main():
    """Run the generic (metadata-filter) retrieval demo.

    Reads the connection settings from the ``TABLESTORE_*`` environment
    variables, creates a multi-tenant ``KnowledgeStore`` and issues four
    example ``search_documents`` queries (equality filter, AND of two
    conditions, range filter, no filter), printing the hits of each one.
    A failure in one scenario is reported and the next scenario still runs.

    Raises:
        SystemExit: With exit code 1 when any required environment variable
            is missing or empty.
    """
    # Local import: this sample's import header does not bring in ``sys``.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    # Unset and empty variables are both treated as missing.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        # sys.exit() rather than the builtin exit(): the latter is injected by
        # the ``site`` module for interactive use and is not always available.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 通用檢索測試 ======\n")

    tenant_id = "user_tablestore_001"
    # Single source of truth for the page size used by every query below and
    # by the messages that mention it.
    limit = 3

    print("通用檢索支援基於中繼資料的靈活過濾查詢,不依賴向量或全文檢索索引")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {limit}\n")

    print("【情境 1】查詢類別為 '應用情境' 的文檔")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=limit,
            metadata_filter=Filters.eq("category", "應用情境"),
            meta_data_to_get=["text", "category", "meta_long"]
        )

        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            _print_hits(response.hits, preview_len=100)
            print()

    except Exception as e:
        # Demo boundary: report the failure and continue with the next scenario.
        print(f"檢索失敗: {e}\n")

    print("\n【情境 2】查詢 meta_long > 3 且 meta_boolean = True 的文檔")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=limit,
            metadata_filter=Filters.logical_and([
                Filters.gt("meta_long", 3),
                Filters.eq("meta_boolean", True)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )

        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            _print_hits(response.hits, preview_len=80, show_meta_long=True)
            print()

    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n【情境 3】查詢 meta_long 在 2-5 之間的文檔")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=limit,
            metadata_filter=Filters.logical_and([
                Filters.gte("meta_long", 2),
                Filters.lte("meta_long", 5)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )

        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            _print_hits(response.hits, preview_len=80, show_meta_long=True)
            print()

    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n【情境 4】擷取所有文檔(不帶過濾條件)")
    print("-" * 80)

    try:
        # No metadata_filter here: the query is restricted by tenant only.
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=limit,
            meta_data_to_get=["text", "category", "meta_long"]
        )

        if not response.hits:
            print("未找到任何文檔\n")
        else:
            print(f"\n共找到 {len(response.hits)} 個文檔(顯示前{limit}個):")
            _print_hits(response.hits, preview_len=60)

            if response.next_token:
                print("\n還有更多結果,可使用 next_token 進行翻頁查詢")
            print()

    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n" + "=" * 80)
    print("\n====== 檢索完成 ======")

    print("\n通用檢索特點:")
    print("- 支援基於中繼資料欄位的靈活過濾")
    print("- 支援精確匹配、範圍查詢、邏輯組合等")
    print("- 不需要向量或全文檢索索引,適合結構化查詢")
    print("- 可以指定返回的欄位,減少資料轉送量")

# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

相關文檔