AgentScope Runtime 支援使用阿里雲Table Store(Tablestore)作為 Memory Service 和 Session History Service 的持久化後端,為 AI Agent 應用提供高效能、高可用的會話歷史和長期記憶儲存能力。

核心功能
服務 | 功能 | 適用情境 |
TablestoreMemoryService | 長期記憶儲存,支援全文檢索索引 | 使用者偏好、知識庫、跨會話記憶 |
TablestoreSessionHistoryService | 會話歷史儲存 | 對話上下文、訊息記錄 |
準備工作
在開始使用 Tablestore 儲存服務之前,需要完成以下環境配置和依賴安裝。
環境準備:確保已安裝Python運行環境,可通過
python3 --version命令查看版本資訊。下載專案:下載 AgentScope Runtime 1.0.5 Release 包agentscope-runtime.zip並解壓,解壓後進入專案目錄。
重要Tablestore 整合功能僅在 AgentScope Runtime 1.0.5 及之前的版本中可用,請確保使用指定版本的 Release 包。
cd agentscope-runtime建立虛擬環境:建立 Python 虛擬環境並啟用。
python3 -m venv .venv && source .venv/bin/activate安裝依賴:安裝專案核心依賴 + Tablestore 擴充。
pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community配置環境變數:設定以下必需的環境變數,用於串連 Tablestore 和調用大模型。
環境變數
說明
TABLESTORE_ENDPOINTTablestore執行個體訪問地址,可在Table Store控制台擷取
TABLESTORE_INSTANCE_NAMETablestore執行個體名稱
TABLESTORE_ACCESS_KEY_ID阿里雲帳號或RAM使用者的AccessKey ID
TABLESTORE_ACCESS_KEY_SECRET阿里雲帳號或RAM使用者的AccessKey Secret
DASHSCOPE_API_KEY百鍊平台API Key(用於調用大模型)
export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com" export TABLESTORE_INSTANCE_NAME="<instance-name>" export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>" export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>" export DASHSCOPE_API_KEY="<your-dashscope-api-key>"
範例程式碼:Memory
Memory Service 用於管理 AI Agent 的長期記憶,支援跨會話的資訊儲存和語義檢索。
添加長期記憶
以下樣本示範如何將使用者資訊作為長期記憶寫入 Tablestore。將代碼儲存到 examples/tablestore_memory_add.py 檔案。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name="agentscope_runtime_memory",
)
await memory_service.start()
print("Memory Service 啟動成功")
user_id = "user_001"
session_id = "session_001"
messages = [
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我喜歡吃川菜")],
),
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我的生日是 3 月 15 日")],
),
]
print(f"準備寫入 {len(messages)} 條記憶:")
for msg in messages:
print(f" - [{msg.role}] {msg.get_text_content()}")
await memory_service.add_memory(
user_id=user_id,
messages=messages,
session_id=session_id,
)
print(f"寫入成功, user_id={user_id}, session_id={session_id}")
await memory_service.stop()
if __name__ == "__main__":
asyncio.run(main())運行樣本:
python examples/tablestore_memory_add.py搜尋長期記憶
以下樣本示範如何通過語義檢索查詢使用者的長期記憶。將代碼儲存到 examples/tablestore_memory_search.py 檔案。
資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name="agentscope_runtime_memory",
)
await memory_service.start()
print("Memory Service 啟動成功")
user_id = "user_001"
query = [
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我喜歡什麼口味的菜")],
)
]
print(f"搜尋查詢: {query[0].get_text_content()}")
results = await memory_service.search_memory(user_id=user_id, messages=query)
print(f"搜尋結果, 找到 {len(results)} 條相關記憶:")
for mem in results:
print(f" - [{mem.role}] {mem.get_text_content()}")
all_memories = await memory_service.list_memory(user_id=user_id)
print(f"使用者 {user_id} 共有 {len(all_memories)} 條長期記憶")
await memory_service.stop()
if __name__ == "__main__":
asyncio.run(main())運行樣本:
python examples/tablestore_memory_search.py範例程式碼:Session
Session History Service 用於管理會話和訊息記錄,儲存對話上下文。
建立會話並添加訊息
以下樣本示範如何建立會話並寫入對話訊息。將代碼儲存到 examples/tablestore_session_create.py 檔案。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name="agentscope_runtime_session",
message_table_name="agentscope_runtime_message",
)
await session_service.start()
print("Session History Service 啟動成功")
user_id = "user_001"
session_id = "session_003"
session = await session_service.create_session(
user_id=user_id,
session_id=session_id,
)
print(f"建立會話成功: session_id={session.id}, user_id={user_id}")
user_msg = Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="你好,我叫張三")],
)
await session_service.append_message(session, user_msg)
print(f"寫入訊息: [{user_msg.role}] {user_msg.get_text_content()}")
assistant_msg = Message(
type=MessageType.MESSAGE,
role=Role.ASSISTANT,
content=[TextContent(type=ContentType.TEXT, text="你好張三!有什麼可以協助你的?")],
)
await session_service.append_message(session, assistant_msg)
print(f"寫入訊息: [{assistant_msg.role}] {assistant_msg.get_text_content()}")
print(f"會話 {session_id} 已寫入 2 條訊息")
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())運行樣本:
python examples/tablestore_session_create.py擷取會話歷史
以下樣本示範如何擷取指定會話的歷史訊息記錄。將代碼儲存到 examples/tablestore_session_get.py 檔案。
資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name="agentscope_runtime_session",
message_table_name="agentscope_runtime_message",
)
await session_service.start()
print("Session History Service 啟動成功")
user_id = "user_001"
session_id = "session_003"
session = await session_service.get_session(user_id, session_id)
print(f"擷取會話 {session_id}, 共 {len(session.messages)} 條訊息:")
for msg in session.messages:
text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
role = msg.role if hasattr(msg, "role") else "unknown"
print(f" - [{role}] {text}")
sessions = await session_service.list_sessions(user_id=user_id)
print(f"使用者 {user_id} 共有 {len(sessions)} 個會話")
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())運行樣本:
python examples/tablestore_session_get.py範例程式碼:互動式聊天 Demo
以下樣本展示如何構建一個完整的互動式聊天應用,整合大模型調用、會話歷史儲存和長期記憶檢索。將代碼儲存到 examples/tablestore_chat_demo.py 檔案。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 本樣本展示如何構建一個互動式聊天 Demo,使用 Tablestore 儲存會話歷史和長期記憶
# 使用前請配置環境變數:
# TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
# TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY
import os
import asyncio
import uuid
import sys
import io
import logging
from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner
console = Console()
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []
# Table and index names
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# Memory service uses a fixed index name in SDK
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# Session service uses custom index names to avoid conflicts with other demos
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"
async def delete_table_with_indexes(client, table_name):
"""Delete a table and all its indexes using native Tablestore SDK."""
import tablestore
try:
# Delete search indexes first
try:
search_indexes = await client.list_search_index(table_name)
for index_tuple in search_indexes:
await client.delete_search_index(index_tuple[0], index_tuple[1])
except Exception:
pass
# Delete the table
await client.delete_table(table_name)
print(f" Deleted table: {table_name}")
except tablestore.OTSServiceError as e:
if "OTSObjectNotExist" not in str(e):
print(f" Warning: Failed to delete {table_name}: {e}")
async def init_services():
global memory_service, session_service
client = AsyncOTSClient(
os.getenv("TABLESTORE_ENDPOINT"),
os.getenv("TABLESTORE_ACCESS_KEY_ID"),
os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
os.getenv("TABLESTORE_INSTANCE_NAME"),
)
# Check existing tables using native SDK
existing_tables = await client.list_table()
memory_exists = MEMORY_TABLE in existing_tables
session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
session_all_exist = len(session_existing) == 2
session_none_exist = len(session_existing) == 0
# Handle incomplete session tables - delete existing ones and recreate
if not session_all_exist and not session_none_exist:
print(f"Incomplete session tables detected: {session_existing}")
print("Cleaning up...")
for t in session_existing:
await delete_table_with_indexes(client, t)
# Wait for deletion to complete
await asyncio.sleep(3)
session_all_exist = False
# Create service instances
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name=MEMORY_TABLE,
)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
)
# Suppress SDK logs during init
logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
original_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
# Init memory service
if memory_exists:
_init_memory_service_without_create(memory_service)
else:
await memory_service.start()
# Init session service - always use custom init to avoid index name conflicts
if session_all_exist:
_init_session_service_without_create(session_service)
else:
# Create tables and indexes using SDK with custom index names
await _create_session_tables(client)
_init_session_service_without_create(session_service)
finally:
sys.stderr = original_stderr
logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)
print("Services initialized")
async def _create_session_tables(client):
"""Create session and message tables with custom index names."""
from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
# Create the memory store with custom index names
store = AsyncMemoryStore(
tablestore_client=client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
session_secondary_index_name=SESSION_SECONDARY_INDEX,
session_search_index_name=SESSION_SEARCH_INDEX,
message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
message_search_index_name=MESSAGE_SEARCH_INDEX,
)
# Init tables and indexes
await store.init_table()
# Wait for table creation
await asyncio.sleep(1)
def _init_memory_service_without_create(service):
"""Initialize memory service store without creating tables."""
from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore
service._knowledge_store = AsyncKnowledgeStore(
tablestore_client=service._tablestore_client,
vector_dimension=service._vector_dimension,
enable_multi_tenant=False,
table_name=service._table_name,
search_index_name=MEMORY_SEARCH_INDEX,
search_index_schema=service._search_index_schema,
text_field=service._text_field,
embedding_field=service._embedding_field,
vector_metric_type=service._vector_metric_type,
)
def _init_session_service_without_create(service):
"""Initialize session service store without creating tables."""
from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
service._memory_store = AsyncMemoryStore(
tablestore_client=service._tablestore_client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
session_secondary_index_name=SESSION_SECONDARY_INDEX,
session_search_index_name=SESSION_SEARCH_INDEX,
message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
message_search_index_name=MESSAGE_SEARCH_INDEX,
session_secondary_index_meta=service._session_secondary_index_meta,
session_search_index_schema=service._session_search_index_schema,
message_search_index_schema=service._message_search_index_schema,
)
async def create_new_session():
global current_session, current_session_id, chat_history
current_session_id = f"session_{uuid.uuid4().hex[:8]}"
current_session = await session_service.create_session(
user_id=user_id,
session_id=current_session_id,
)
chat_history = []
print(f"New session created: {current_session_id}")
def create_message(role: str, text: str) -> Message:
return Message(
type=MessageType.MESSAGE,
role=role,
content=[TextContent(type=ContentType.TEXT, text=text)],
)
async def save_message(role: str, text: str):
if current_session is None:
return
msg = create_message(role, text)
await session_service.append_message(current_session, msg)
async def search_memory(query: str) -> str:
msg = create_message(Role.USER, query)
results = await memory_service.search_memory(user_id=user_id, messages=[msg])
if not results:
return ""
texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
return "\n".join([f" - {t}" for t in texts])
async def add_memory(text: str):
msg = create_message(Role.USER, text)
await memory_service.add_memory(
user_id=user_id,
messages=[msg],
session_id=current_session_id,
)
print(f"Memory saved: {text}")
def call_llm(messages: list, memory_context: str = "") -> str:
system_prompt = "You are a helpful AI assistant."
if memory_context:
system_prompt += f"\n\nUser's historical memory:\n{memory_context}"
dashscope_messages = [{"role": "system", "content": system_prompt}]
for role, content in messages:
dashscope_messages.append({"role": role, "content": content})
response = Generation.call(
model="qwen-turbo",
messages=dashscope_messages,
api_key=os.getenv("DASHSCOPE_API_KEY"),
result_format="message",
)
if response.status_code == 200:
return response.output.choices[0].message.content
return f"Error: {response.code} - {response.message}"
async def chat(user_input: str):
global chat_history
memory_context = await search_memory(user_input)
if memory_context:
print(f"[Memory] Found related memories:\n{memory_context}")
messages = chat_history + [("user", user_input)]
with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
response = call_llm(messages, memory_context)
await save_message(Role.USER, user_input)
await save_message(Role.ASSISTANT, response)
chat_history.append(("user", user_input))
chat_history.append(("assistant", response))
console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")
async def list_sessions():
sessions = await session_service.list_sessions(user_id=user_id)
print(f"Total {len(sessions)} sessions:")
for s in sessions[:10]:
print(f" - {s.id}")
async def list_memories():
memories = await memory_service.list_memory(user_id=user_id)
print(f"Total {len(memories)} memories:")
for m in memories[:10]:
text = m.get_text_content()
if text:
print(f" - {text[:60]}...")
async def load_session(session_id: str):
global current_session, current_session_id, chat_history
current_session_id = session_id
current_session = await session_service.get_session(user_id, session_id)
chat_history = []
for msg in current_session.messages:
text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
role = msg.role if hasattr(msg, "role") else "unknown"
chat_history.append((role, text))
print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")
def print_help():
print("""
Commands:
/new - Create new session
/sessions - List all sessions
/load <id> - Load session by ID
/remember <x> - Save memory
/memories - List all memories
/history - Show current chat history
/help - Show this help
/quit - Exit
Otherwise, just type your message to chat.
""")
async def main():
await init_services()
await create_new_session()
print_help()
while True:
try:
user_input = input("You: ").strip()
except (EOFError, KeyboardInterrupt):
print("\nBye!")
break
if not user_input:
continue
if user_input == "/quit":
print("Bye!")
break
elif user_input == "/help":
print_help()
elif user_input == "/new":
await create_new_session()
elif user_input == "/sessions":
await list_sessions()
elif user_input == "/memories":
await list_memories()
elif user_input == "/history":
print(f"Current session: {current_session_id}")
for role, text in chat_history:
print(f" [{role}] {text[:80]}...")
elif user_input.startswith("/load "):
session_id = user_input[6:].strip()
await load_session(session_id)
elif user_input.startswith("/remember "):
text = user_input[10:].strip()
await add_memory(text)
else:
await chat(user_input)
await memory_service.stop()
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())
運行樣本:
python examples/tablestore_chat_demo.py互動命令:
命令
功能
/new建立新會話
/sessions列出所有會話
/load <id>載入指定會話
/remember <內容>儲存長期記憶
/memories列出所有記憶
/history顯示當前會話歷史
/help顯示協助資訊
/quit退出