基於Tablestore的Agent Memory SDK架構,主要支援Memory和Knowledge情境,為AI Agent應用提供持久化、高效能的記憶儲存和語義檢索能力,協助開發人員快速構建具有上下文理解和長期記憶能力的智能應用。
核心架構
架構優勢
輕量化設計:抽象通用儲存介面,降低業務開發複雜度,在技術深度與易用性之間實現平衡。開發人員無需直接處理底層資料庫介面調用,專注於業務邏輯開發即可快速產出結果。
情境驅動設計:針對Memory即時記憶儲存和Knowledge長期語義檢索兩大核心情境,提供完整的解決方案。在滿足基礎儲存需求的同時,整合摘要記錄、事實資料提取、使用者畫像標籤挖掘等業務情境功能,實現儲存與應用的深度融合。
業務價值驗證:基於成熟的業界最佳實務,開發人員無需進行複雜的技術調研,可直接在自有業務情境中快速驗證和落地AI應用的商業價值。
快速接入
以下通過Python樣本示範SDK的完整接入和使用流程,Java接入方式請參考使用說明。
環境準備
確保已安裝Python運行環境,可通過python3 --version命令查看版本資訊。
安裝SDK
pip3 install tablestore-for-agent-memory
配置環境變數
設定以下必需的環境變數:
TABLESTORE_ACCESS_KEY_ID:阿里雲帳號或RAM使用者的AccessKey ID。
TABLESTORE_ACCESS_KEY_SECRET:阿里雲帳號或RAM使用者的AccessKey Secret。
TABLESTORE_INSTANCE_NAME:執行個體名稱,可在Table Store控制台擷取。
TABLESTORE_ENDPOINT:執行個體訪問地址,可在Table Store控制台擷取。
範例程式碼:Memory情境
Memory情境主要用於管理AI Agent的會話記憶,包括會話管理和訊息儲存等核心功能。以下樣本示範了如何建立會話、記錄對話以及查詢記錄的完整流程。
建立會話和寫入對話記錄
import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os
def main() -> None:
    """Create a session and write a two-round conversation to the memory store.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields declared for the session secondary index.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    # Search index schema for sessions: a fuzzy-analyzed title plus metadata.
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    # Search index schema for messages: metadata fields only.
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("開始建立表和索引...")
    try:
        memory_store.init_table()
        memory_store.init_search_index()
        print("表和索引建立成功")
    except Exception as e:
        # Deliberately best-effort: a rerun is expected to land here when the
        # table and index already exist, so the script keeps going.
        print(f"表和索引已存在或建立失敗: {e}")

    print("\n====== 建立新會話 ======")
    session_id = "session_001"
    session = Session(user_id="test_user_1", session_id=session_id)
    session.update_time = microseconds_timestamp()
    session.title = "Table Store諮詢"
    session.metadata = {
        "meta_string": "web_source",
        "meta_long": 1,
        "meta_double": 1.0,
        "meta_boolean": True,
        "model_name": "qwen-max",
    }
    memory_store.put_session(session)
    print(f"建立會話成功: user_id={session.user_id}, session_id={session.session_id}")

    print("\n====== 第一輪對話 ======")
    message_1 = Message(
        session_id=session_id,
        message_id="msg_001",
        create_time=microseconds_timestamp(),
    )
    message_1.content = "你好,請幫我介紹一下Table Store(Tablestore)是什麼?"
    message_1.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 1,
    }
    memory_store.put_message(message_1)
    print(f"使用者: {message_1.content}")

    # Refresh the session's update time after each user message.
    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_2 = Message(
        session_id=session_id,
        message_id="msg_002",
        create_time=microseconds_timestamp(),
    )
    message_2.content = "Table Store(Tablestore)是阿里雲自研的第一代飛天產品,提供海量結構化資料存放區以及快速的查詢和分析服務。它支援多種資料模型,包括寬表模型、IM訊息模型、時序模型等,可以滿足不同情境的資料存放區需求。"
    message_2.metadata = {
        "message_type": "assistant",
        "model": "qwen-max",
    }
    memory_store.put_message(message_2)
    print(f"助手: {message_2.content}")

    print("\n====== 第二輪對話 ======")
    message_3 = Message(
        session_id=session_id,
        message_id="msg_003",
        create_time=microseconds_timestamp(),
    )
    message_3.content = "Table Store有哪些典型的應用情境?"
    message_3.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 2,
    }
    memory_store.put_message(message_3)
    print(f"使用者: {message_3.content}")

    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_4 = Message(
        session_id=session_id,
        message_id="msg_004",
        create_time=microseconds_timestamp(),
    )
    message_4.content = """Table Store的典型應用情境包括:
1. AI Agent 記憶儲存:儲存知識庫、長期記憶、AI會話訊息等資訊
2. 中繼資料管理:儲存海量檔案、視頻、圖片的元資訊
3. 訊息資料:儲存IM聊天訊息、Feed流訊息等
4. 軌跡溯源:車連網軌跡、物流軌跡等時序資料
5. 科學巨量資料:氣象資料、基因資料等海量資料存放區
6. 推薦系統:使用者畫像、物品特徵等資料存放區
7. 風控系統:即時風控規則、歷史行為資料存放區"""
    message_4.metadata = {
        "message_type": "assistant",
        "model": "qwen-max",
    }
    memory_store.put_message(message_4)
    print(f"助手: {message_4.content}")

    print("\n====== 會話建立和對話完成 ======")
    print(f"會話ID: {session.session_id}")
    print(f"使用者ID: {session.user_id}")
    print("共完成 2 輪對話,4 條訊息")
if __name__ == "__main__":
    main()
查詢歷史會話列表
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
def main() -> None:
    """List a user's most recent sessions and print each one.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # The store is configured with the same index definitions as the script
    # that created the session data.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢歷史會話列表 ======\n")
    user_id = "test_user_1"
    max_count = 10
    print(f"查詢使用者 {user_id} 的最近會話...")
    try:
        # Materialise the result so it can be counted before printing.
        sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))
        if not sessions:
            print(f"\n使用者 {user_id} 暫無歷史會話")
        else:
            print(f"\n共找到 {len(sessions)} 個會話:\n")
            for idx, session in enumerate(sessions, 1):
                print(f"會話 {idx}:")
                print(f" - 會話ID: {session.session_id}")
                print(f" - 使用者ID: {session.user_id}")
                print(f" - 更新時間: {getattr(session, 'update_time', '未知')}")
                if session.metadata:
                    print(f" - 中繼資料:")
                    for key, value in session.metadata.items():
                        print(f" {key}: {value}")
                print()
    except Exception as e:
        print(f"查詢會話列表失敗: {e}")
    print("====== 查詢完成 ======")


if __name__ == "__main__":
    main()
查詢指定會話詳情
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
def main() -> None:
    """Fetch one session by user ID and session ID and print its details.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # The store is configured with the same index definitions as the script
    # that created the session data.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢指定會話的詳情 ======\n")
    user_id = "test_user_1"
    session_id = "session_001"
    print("查詢會話詳情...")
    print(f"使用者ID: {user_id}")
    print(f"會話ID: {session_id}\n")
    try:
        session = memory_store.get_session(user_id=user_id, session_id=session_id)
        if session:
            print("會話詳細資料:")
            print("=" * 50)
            print(f"使用者ID: {session.user_id}")
            print(f"會話ID: {session.session_id}")
            print(f"更新時間: {getattr(session, 'update_time', '未知')}")
            if session.metadata:
                print("\n中繼資料資訊:")
                print("-" * 50)
                for key, value in session.metadata.items():
                    print(f" {key}: {value}")
            else:
                print("\n中繼資料: 無")
            print("=" * 50)
        else:
            print(f"未找到指定的會話 (user_id={user_id}, session_id={session_id})")
    except Exception as e:
        print(f"查詢會話詳情失敗: {e}")
        # Print the full stack trace as well; the message alone may not
        # identify the failing call.
        import traceback
        traceback.print_exc()
    print("\n====== 查詢完成 ======")


if __name__ == "__main__":
    main()
查詢指定會話完整對話記錄
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
def main() -> None:
    """Print the full conversation of one session, grouped into rounds.

    Messages are sorted by ``create_time``; every message whose metadata has
    ``message_type == "user"`` starts a new round.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # The store is configured with the same index definitions as the script
    # that created the session data.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== 查詢指定會話的完整對話記錄 ======\n")
    session_id = "session_001"
    print("查詢會話對話記錄...")
    print(f"會話ID: {session_id}\n")
    try:
        messages = list(memory_store.list_messages(session_id=session_id))
        if not messages:
            print(f"會話 {session_id} 暫無對話記錄")
        else:
            # Order chronologically so rounds are printed in sequence.
            messages.sort(key=lambda m: m.create_time)
            print(f"共找到 {len(messages)} 條訊息\n")
            print("=" * 80)
            round_num = 0
            for message in messages:
                # Normalise once: metadata may be empty or None, and calling
                # .get() on None would abort the whole listing.
                metadata = message.metadata or {}
                message_type = metadata.get("message_type", "unknown")
                if message_type == "user":
                    round_num += 1
                    print(f"\n第 {round_num} 輪對話:")
                    print("-" * 80)
                # Anything that is not a user message is labelled assistant.
                role = "使用者" if message_type == "user" else "助手"
                print(f"\n[{role}] (訊息ID: {message.message_id})")
                print(f"內容: {message.content}")
                print(f"建立時間: {message.create_time}")
                # Only print a metadata section when more than one entry
                # exists; message_type itself is skipped in the listing.
                if len(metadata) > 1:
                    print("中繼資料:")
                    for key, value in metadata.items():
                        if key != "message_type":
                            print(f" - {key}: {value}")
            print("\n" + "=" * 80)
            print(f"\n對話統計: 共 {round_num} 輪對話,{len(messages)} 條訊息")
    except Exception as e:
        print(f"查詢對話記錄失敗: {e}")
        import traceback
        traceback.print_exc()
    print("\n====== 查詢完成 ======")


if __name__ == "__main__":
    main()
範例程式碼:Knowledge情境
Knowledge情境專注於構建AI知識庫,支援海量文檔的向量化儲存和智能檢索。以下樣本展示如何建立知識庫、匯入文件,並通過向量檢索、全文檢索索引等方式實現智能問答。
範例程式碼使用阿里雲百鍊的text-embedding-v2模型進行向量化,需要先安裝相關依賴並將API Key配置為環境變數OPENAI_API_KEY。
pip3 install openai
建立知識庫和寫入知識
資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os
class OpenAIEmbedding:
    """Small wrapper around an OpenAI-compatible embeddings endpoint.

    ``dimension`` is stored on the instance for reference only; it is not
    sent with the embedding request.
    """

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """Create the API client and remember the model settings.

        Args:
            api_key: API key passed to the ``OpenAI`` client.
            base_url: Optional endpoint override passed to the client.
            model: Embedding model name used for every request.
            dimension: Vector size recorded on the instance.
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        """Return the embedding vector for ``text``, or ``None`` on failure.

        Any exception raised while making the request or reading the response
        is printed and swallowed, so callers detect failure by checking for
        ``None``.
        """
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text,
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding 調用異常: {e}")
            return None
def main() -> None:
    """Create the knowledge store and write eight sample documents into it.

    Each document's text is embedded first; a document whose embedding comes
    back as ``None`` is skipped. A failure on one document is reported and
    does not stop the remaining ones.

    Connection settings are read from the TABLESTORE_* and OPENAI_API_KEY
    environment variables. When any of them is missing, the missing names are
    printed and the process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields declared in the search index schema.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # One value feeds both the embedding wrapper and the store so the two
    # cannot drift apart.
    vector_dimension = 1536
    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=vector_dimension,
    )
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=vector_dimension,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("開始建立表和索引...")
    try:
        knowledge_store.init_table()
        print("表和索引建立成功")
    except Exception as e:
        # Deliberately best-effort: a rerun is expected to land here when the
        # table already exists, so the script keeps going.
        print(f"表和索引已存在或建立失敗: {e}")

    print("\n====== 寫入 Tablestore 知識庫文檔 ======\n")
    documents_data = [
        {
            "id": "doc_001",
            "text": "Table Store(Tablestore)是阿里雲自研的第一代飛天產品,提供海量結構化資料存放區以及快速的查詢和分析服務。Table Store的分布式儲存和強大的索引引擎能夠支援單表PB級儲存、千萬TPS以及毫秒級延遲的服務能力。",
            "category": "產品介紹",
            "meta_long": 1,
        },
        {
            "id": "doc_002",
            "text": "Table Store支援寬表模型,單表支援PB級資料存放區和千萬QPS,適合儲存使用者畫像、訂單詳情等情境。同時支援時序模型,可以高效儲存和查詢物聯網裝置、監控系統產生的時序資料。",
            "category": "資料模型",
            "meta_long": 2,
        },
        {
            "id": "doc_003",
            "text": "Table Store提供多種索引類型:主鍵索引支援快速的點查詢和範圍查詢;全域二級索引可以基於非主鍵列進行查詢;多元索引支援複雜的查詢條件組合和全文檢索索引;向量檢索支援 AI 情境的相似性搜尋。",
            "category": "索引功能",
            "meta_long": 3,
        },
        {
            "id": "doc_004",
            "text": "Table Store適用於多種情境:中繼資料管理可以儲存海量檔案、視頻、圖片的元資訊;訊息資料用於儲存 IM 聊天訊息、Feed 流訊息;軌跡溯源儲存車連網軌跡、物流軌跡等時序資料;推薦系統儲存使用者畫像和物品特徵。",
            "category": "應用情境",
            "meta_long": 4,
        },
        {
            "id": "doc_005",
            "text": "Table Store的多元索引支援豐富的查詢能力,包括精確查詢、範圍查詢、首碼查詢、萬用字元查詢、全文檢索索引、地理位置查詢、巢狀查詢等。同時支援排序、彙總、統計分析等進階功能。",
            "category": "查詢能力",
            "meta_long": 5,
        },
        {
            "id": "doc_006",
            "text": "Table Store提供 Agent Memory 能力,包括 Memory Store 用於儲存會話和訊息記錄,Knowledge Store 用於儲存知識庫文檔並支援向量檢索。這些能力可以協助構建智能問答、對話機器人等 AI 應用。",
            "category": "AI 能力",
            "meta_long": 6,
        },
        {
            "id": "doc_007",
            "text": "Table Store的向量檢索功能支援海量向量資料的儲存和高效檢索,可以應用於Image Search、語義搜尋、推薦系統等情境。支援 L2 距離、餘弦相似性等多種相似性演算法。",
            "category": "向量檢索",
            "meta_long": 7,
        },
        {
            "id": "doc_008",
            "text": "Table Store提供多種資料保護機制:支援資料備份和恢複;提供資料生命週期管理,可以自動到期和刪除舊資料;支援資料加密儲存,保障資料安全。",
            "category": "資料保護",
            "meta_long": 8,
        },
    ]

    tenant_id = "user_tablestore_001"
    success_count = 0
    for doc_data in documents_data:
        try:
            document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
            document.text = doc_data["text"]
            document.embedding = embedding_model.embedding(document.text)
            if document.embedding is None:
                print(f"✗ 產生向量失敗,跳過文檔 {doc_data['id']}")
                continue
            document.metadata["category"] = doc_data["category"]
            document.metadata["meta_long"] = doc_data["meta_long"]
            document.metadata["meta_boolean"] = True
            document.metadata["user_id"] = tenant_id
            knowledge_store.put_document(document)
            success_count += 1
            print(f"✓ 寫入文檔 {doc_data['id']}: {doc_data['category']}")
            print(f" 內容: {doc_data['text'][:60]}...")
            print()
        except Exception as e:
            # Report and move on so one bad document does not stop the rest.
            print(f"✗ 寫入文檔 {doc_data['id']} 失敗: {e}")

    # De-duplicate while keeping first-seen order so the summary line is
    # stable between runs (a plain set has no defined order).
    categories = list(dict.fromkeys(d['category'] for d in documents_data))
    print("=" * 80)
    print(f"\n寫入完成: 成功 {success_count}/{len(documents_data)} 條文檔")
    print(f"租戶ID: {tenant_id}")
    print(f"文檔類別: {', '.join(categories)}")
    print("\n提示: 資料寫入後,多元索引可能需要幾秒鐘時間完成同步")
if __name__ == "__main__":
    main()
向量檢索
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os
class OpenAIEmbedding:
    """Small wrapper around an OpenAI-compatible embeddings endpoint.

    ``dimension`` is stored on the instance for reference only; it is not
    sent with the embedding request.
    """

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """Create the API client and remember the model settings.

        Args:
            api_key: API key passed to the ``OpenAI`` client.
            base_url: Optional endpoint override passed to the client.
            model: Embedding model name used for every request.
            dimension: Vector size recorded on the instance.
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        """Return the embedding vector for ``text``, or ``None`` on failure.

        Any exception raised while making the request or reading the response
        is printed and swallowed, so callers detect failure by checking for
        ``None``.
        """
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text,
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding 調用異常: {e}")
            return None
def main() -> None:
    """Embed a sample question and print the top vector-search hits.

    Connection settings are read from the TABLESTORE_* and OPENAI_API_KEY
    environment variables. When any of them is missing, the missing names are
    printed and the process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    # One value feeds both the embedding wrapper and the store so the two
    # cannot drift apart.
    vector_dimension = 1536
    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=vector_dimension,
    )
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=vector_dimension,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 向量檢索測試 ======\n")
    query_text = "Table Store支援哪些索引類型?"
    tenant_id = "user_tablestore_001"
    # Shared by the banner and the search call so they always agree.
    top_k = 3
    print(f"查詢問題: {query_text}")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {top_k}\n")
    try:
        print("正在產生查詢向量...")
        query_vector = embedding_model.embedding(query_text)
        if query_vector is None:
            print("產生查詢向量失敗")
        else:
            print(f"查詢向量產生成功,維度: {len(query_vector)}\n")
            response = knowledge_store.vector_search(
                query_vector=query_vector,
                tenant_id=tenant_id,
                limit=top_k,
            )
            if not response.hits:
                print("未找到相關文檔")
            else:
                print("=" * 80)
                print(f"找到 {len(response.hits)} 個相關文檔:\n")
                for idx, hit in enumerate(response.hits, 1):
                    doc = hit.document
                    score = hit.score
                    print(f"【結果 {idx}】")
                    print(f"文檔ID: {doc.document_id}")
                    print(f"相似性分數: {score:.4f}")
                    # Tolerate a missing or None metadata attribute.
                    metadata = getattr(doc, 'metadata', None) or {}
                    if 'category' in metadata:
                        print(f"類別: {metadata['category']}")
                    print(f"內容: {doc.text}")
                    print("-" * 80)
                    print()
    except Exception as e:
        print(f"向量檢索失敗: {e}")
        import traceback
        traceback.print_exc()
    print("\n====== 檢索完成 ======")


if __name__ == "__main__":
    main()
全文檢索索引
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os
def main() -> None:
    """Run a keyword match on the ``text`` field and print the top hits.

    Occurrences of the keyword in each hit's text are wrapped in 【】 for
    display.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 全文檢索索引測試 ======\n")
    query_keyword = "向量檢索"
    tenant_id = "user_tablestore_001"
    # Shared by the banner and the search call so they always agree.
    top_k = 3
    print(f"查詢關鍵詞: {query_keyword}")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {top_k}\n")
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            metadata_filter=Filters.text_match("text", query_keyword),
            limit=top_k,
        )
        if not response.hits:
            print("未找到包含關鍵詞的文檔")
        else:
            print("=" * 80)
            print(f"找到 {len(response.hits)} 個包含關鍵詞的文檔:\n")
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score
                print(f"【結果 {idx}】")
                print(f"文檔ID: {doc.document_id}")
                print(f"匹配分數: {score if score is not None else 'N/A'}")
                # Tolerate a missing or None metadata attribute.
                metadata = getattr(doc, 'metadata', None) or {}
                if 'category' in metadata:
                    print(f"類別: {metadata['category']}")
                # str.replace is a no-op when the keyword is absent, so no
                # membership test is needed; a None text prints as empty.
                content = doc.text or ""
                highlighted = content.replace(query_keyword, f"【{query_keyword}】")
                print(f"內容: {highlighted}")
                print("-" * 80)
                print()
    except Exception as e:
        print(f"全文檢索索引失敗: {e}")
        import traceback
        traceback.print_exc()
    print("\n====== 檢索完成 ======")
    print("\n補充說明:")
    print("- 全文檢索索引會在文檔的 text 欄位中搜尋包含查詢關鍵詞的文檔")
    print("- 可以使用萬用字元、短語查詢等進階文法")
    print("- 支援中文分詞和模糊比對")


if __name__ == "__main__":
    main()
通用檢索
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os
def _print_hit(idx, doc, preview_len, show_meta_long=False):
    """Print one hit: document ID, category, optional meta_long, text preview."""
    print(f"\n結果 {idx}:")
    print(f" 文檔ID: {doc.document_id}")
    print(f" 類別: {doc.metadata.get('category', 'N/A')}")
    if show_meta_long:
        print(f" meta_long: {doc.metadata.get('meta_long', 'N/A')}")
    print(f" 內容: {doc.text[:preview_len]}...")


def main() -> None:
    """Run four metadata-filtered searches against the knowledge store.

    The scenarios are: equality on ``category``; ``meta_long > 3`` combined
    with ``meta_boolean == True``; ``meta_long`` between 2 and 5; and an
    unfiltered listing that also reports whether a ``next_token`` came back.
    Each scenario has its own try/except, so one failure does not stop the
    others.

    Connection settings are read from the TABLESTORE_* environment variables.
    When any of them is missing, the missing names are printed and the
    process exits with status 1.
    """
    # Local import: only the early-exit path needs it, and the file's import
    # header is left untouched.
    import sys

    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"錯誤: 缺少必需的環境變數: {', '.join(missing_vars)}")
        print("請設定以下環境變數:")
        for var in missing_vars:
            print(f" export {var}=your_value")
        # sys.exit() rather than the site-provided exit(), which is intended
        # for the interactive interpreter.
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== 通用檢索測試 ======\n")
    tenant_id = "user_tablestore_001"
    # Shared by the banner and every search call so they always agree.
    top_k = 3
    print("通用檢索支援基於中繼資料的靈活過濾查詢,不依賴向量或全文檢索索引")
    print(f"租戶ID: {tenant_id}")
    print(f"返回結果數: Top {top_k}\n")

    print("【情境 1】查詢類別為 '應用情境' 的文檔")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=top_k,
            metadata_filter=Filters.eq("category", "應用情境"),
            meta_data_to_get=["text", "category", "meta_long"],
        )
        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                _print_hit(idx, hit.document, 100)
            print()
    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n【情境 2】查詢 meta_long > 3 且 meta_boolean = True 的文檔")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=top_k,
            metadata_filter=Filters.logical_and([
                Filters.gt("meta_long", 3),
                Filters.eq("meta_boolean", True),
            ]),
            meta_data_to_get=["text", "category", "meta_long"],
        )
        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                _print_hit(idx, hit.document, 80, show_meta_long=True)
            print()
    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n【情境 3】查詢 meta_long 在 2-5 之間的文檔")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=top_k,
            metadata_filter=Filters.logical_and([
                Filters.gte("meta_long", 2),
                Filters.lte("meta_long", 5),
            ]),
            meta_data_to_get=["text", "category", "meta_long"],
        )
        if not response.hits:
            print("未找到匹配的文檔\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                _print_hit(idx, hit.document, 80, show_meta_long=True)
            print()
    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n【情境 4】擷取所有文檔(不帶過濾條件)")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=top_k,
            meta_data_to_get=["text", "category", "meta_long"],
        )
        if not response.hits:
            print("未找到任何文檔\n")
        else:
            print(f"\n共找到 {len(response.hits)} 個文檔(顯示前{top_k}個):")
            for idx, hit in enumerate(response.hits, 1):
                _print_hit(idx, hit.document, 60)
            # A truthy next_token is reported as "more results available".
            if response.next_token:
                print(f"\n還有更多結果,可使用 next_token 進行翻頁查詢")
            print()
    except Exception as e:
        print(f"檢索失敗: {e}\n")

    print("\n" + "=" * 80)
    print("\n====== 檢索完成 ======")
    print("\n通用檢索特點:")
    print("- 支援基於中繼資料欄位的靈活過濾")
    print("- 支援精確匹配、範圍查詢、邏輯組合等")
    print("- 不需要向量或全文檢索索引,適合結構化查詢")
    print("- 可以指定返回的欄位,減少資料轉送量")


if __name__ == "__main__":
    main()