全部產品
Search
文件中心

Tablestore:AgentScope Runtime整合Tablestore

更新時間:Feb 01, 2026

AgentScope Runtime 支援使用阿里雲Table Store(Tablestore)作為 Memory Service 和 Session History Service 的持久化後端,為 AI Agent 應用提供高效能、高可用的會話歷史和長期記憶儲存能力。

2026-01-30_17-03-06 (1)

核心功能

服務

功能

適用情境

TablestoreMemoryService

長期記憶儲存,支援全文檢索索引

使用者偏好、知識庫、跨會話記憶

TablestoreSessionHistoryService

會話歷史儲存

對話上下文、訊息記錄

準備工作

在開始使用 Tablestore 儲存服務之前,需要完成以下環境配置和依賴安裝。

  1. 環境準備:確保已安裝Python運行環境,可通過python3 --version命令查看版本資訊。

  2. 下載專案:下載 AgentScope Runtime 1.0.5 Release 包agentscope-runtime.zip並解壓,解壓後進入專案目錄。

    重要

    Tablestore 整合功能僅在 AgentScope Runtime 1.0.5 及之前的版本中可用,請確保使用指定版本的 Release 包。

    cd agentscope-runtime
  3. 建立虛擬環境:建立 Python 虛擬環境並啟用。

    python3 -m venv .venv && source .venv/bin/activate
  4. 安裝依賴:安裝專案核心依賴 + Tablestore 擴充。

    pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community
  5. 配置環境變數:設定以下必需的環境變數,用於串連 Tablestore 和調用大模型。

    環境變數

    說明

    TABLESTORE_ENDPOINT

    Tablestore執行個體訪問地址,可在Table Store控制台擷取

    TABLESTORE_INSTANCE_NAME

    Tablestore執行個體名稱

    TABLESTORE_ACCESS_KEY_ID

    阿里雲帳號或RAM使用者的AccessKey ID

    TABLESTORE_ACCESS_KEY_SECRET

    阿里雲帳號或RAM使用者的AccessKey Secret

    DASHSCOPE_API_KEY

    百鍊平台API Key(用於調用大模型)

    export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com"
    export TABLESTORE_INSTANCE_NAME="<instance-name>"
    export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>"
    export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>"
    export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

範例程式碼:Memory

Memory Service 用於管理 AI Agent 的長期記憶,支援跨會話的資訊儲存和語義檢索。

添加長期記憶

以下樣本示範如何將使用者資訊作為長期記憶寫入 Tablestore。將代碼儲存到 examples/tablestore_memory_add.py 檔案。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service 啟動成功")

    user_id = "user_001"
    session_id = "session_001"

    messages = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我喜歡吃川菜")],
        ),
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我的生日是 3 月 15 日")],
        ),
    ]

    print(f"準備寫入 {len(messages)} 條記憶:")
    for msg in messages:
        print(f"  - [{msg.role}] {msg.get_text_content()}")

    await memory_service.add_memory(
        user_id=user_id,
        messages=messages,
        session_id=session_id,
    )
    print(f"寫入成功, user_id={user_id}, session_id={session_id}")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

運行樣本:

python examples/tablestore_memory_add.py

搜尋長期記憶

以下樣本示範如何通過語義檢索查詢使用者的長期記憶。將代碼儲存到 examples/tablestore_memory_search.py 檔案。

說明

資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service 啟動成功")

    user_id = "user_001"

    query = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我喜歡什麼口味的菜")],
        )
    ]
    print(f"搜尋查詢: {query[0].get_text_content()}")

    results = await memory_service.search_memory(user_id=user_id, messages=query)
    print(f"搜尋結果, 找到 {len(results)} 條相關記憶:")
    for mem in results:
        print(f"  - [{mem.role}] {mem.get_text_content()}")

    all_memories = await memory_service.list_memory(user_id=user_id)
    print(f"使用者 {user_id} 共有 {len(all_memories)} 條長期記憶")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

運行樣本:

python examples/tablestore_memory_search.py

範例程式碼:Session

Session History Service 用於管理會話和訊息記錄,儲存對話上下文。

建立會話並添加訊息

以下樣本示範如何建立會話並寫入對話訊息。將代碼儲存到 examples/tablestore_session_create.py 檔案。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service 啟動成功")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.create_session(
        user_id=user_id,
        session_id=session_id,
    )
    print(f"建立會話成功: session_id={session.id}, user_id={user_id}")

    user_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.USER,
        content=[TextContent(type=ContentType.TEXT, text="你好,我叫張三")],
    )
    await session_service.append_message(session, user_msg)
    print(f"寫入訊息: [{user_msg.role}] {user_msg.get_text_content()}")

    assistant_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.ASSISTANT,
        content=[TextContent(type=ContentType.TEXT, text="你好張三!有什麼可以協助你的?")],
    )
    await session_service.append_message(session, assistant_msg)
    print(f"寫入訊息: [{assistant_msg.role}] {assistant_msg.get_text_content()}")

    print(f"會話 {session_id} 已寫入 2 條訊息")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

運行樣本:

python examples/tablestore_session_create.py

擷取會話歷史

以下樣本示範如何擷取指定會話的歷史訊息記錄。將代碼儲存到 examples/tablestore_session_get.py 檔案。

說明

資料寫入後,多元索引需要幾秒鐘完成同步。若使用後續範例程式碼查詢不到資料,需等待多元索引同步完成。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service 啟動成功")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.get_session(user_id, session_id)
    print(f"擷取會話 {session_id}, 共 {len(session.messages)} 條訊息:")
    for msg in session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        print(f"  - [{role}] {text}")

    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"使用者 {user_id} 共有 {len(sessions)} 個會話")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

運行樣本:

python examples/tablestore_session_get.py

範例程式碼:互動式聊天 Demo

以下樣本展示如何構建一個完整的互動式聊天應用,整合大模型調用、會話歷史儲存和長期記憶檢索。將代碼儲存到 examples/tablestore_chat_demo.py 檔案。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 本樣本展示如何構建一個互動式聊天 Demo,使用 Tablestore 儲存會話歷史和長期記憶
# 使用前請配置環境變數:
#   TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
#   TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY

import os
import asyncio
import uuid
import sys
import io
import logging

from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner

console = Console()

from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)

memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []

# Table and index names
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# Memory service uses a fixed index name in SDK
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# Session service uses custom index names to avoid conflicts with other demos
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"


async def delete_table_with_indexes(client, table_name):
    """Delete a table and all its indexes using native Tablestore SDK."""
    import tablestore

    try:
        # Delete search indexes first
        try:
            search_indexes = await client.list_search_index(table_name)
            for index_tuple in search_indexes:
                await client.delete_search_index(index_tuple[0], index_tuple[1])
        except Exception:
            pass

        # Delete the table
        await client.delete_table(table_name)
        print(f"  Deleted table: {table_name}")
    except tablestore.OTSServiceError as e:
        if "OTSObjectNotExist" not in str(e):
            print(f"  Warning: Failed to delete {table_name}: {e}")


async def init_services():
    global memory_service, session_service

    client = AsyncOTSClient(
        os.getenv("TABLESTORE_ENDPOINT"),
        os.getenv("TABLESTORE_ACCESS_KEY_ID"),
        os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
        os.getenv("TABLESTORE_INSTANCE_NAME"),
    )

    # Check existing tables using native SDK
    existing_tables = await client.list_table()

    memory_exists = MEMORY_TABLE in existing_tables
    session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
    session_all_exist = len(session_existing) == 2
    session_none_exist = len(session_existing) == 0

    # Handle incomplete session tables - delete existing ones and recreate
    if not session_all_exist and not session_none_exist:
        print(f"Incomplete session tables detected: {session_existing}")
        print("Cleaning up...")
        for t in session_existing:
            await delete_table_with_indexes(client, t)
        # Wait for deletion to complete
        await asyncio.sleep(3)
        session_all_exist = False

    # Create service instances
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name=MEMORY_TABLE,
    )
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
    )

    # Suppress SDK logs during init
    logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()

    try:
        # Init memory service
        if memory_exists:
            _init_memory_service_without_create(memory_service)
        else:
            await memory_service.start()

        # Init session service - always use custom init to avoid index name conflicts
        if session_all_exist:
            _init_session_service_without_create(session_service)
        else:
            # Create tables and indexes using SDK with custom index names
            await _create_session_tables(client)
            _init_session_service_without_create(session_service)

    finally:
        sys.stderr = original_stderr
        logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)

    print("Services initialized")


async def _create_session_tables(client):
    """Create session and message tables with custom index names."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    # Create the memory store with custom index names
    store = AsyncMemoryStore(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
    )
    # Init tables and indexes
    await store.init_table()
    # Wait for table creation
    await asyncio.sleep(1)


def _init_memory_service_without_create(service):
    """Initialize memory service store without creating tables."""
    from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore

    service._knowledge_store = AsyncKnowledgeStore(
        tablestore_client=service._tablestore_client,
        vector_dimension=service._vector_dimension,
        enable_multi_tenant=False,
        table_name=service._table_name,
        search_index_name=MEMORY_SEARCH_INDEX,
        search_index_schema=service._search_index_schema,
        text_field=service._text_field,
        embedding_field=service._embedding_field,
        vector_metric_type=service._vector_metric_type,
    )


def _init_session_service_without_create(service):
    """Initialize session service store without creating tables."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    service._memory_store = AsyncMemoryStore(
        tablestore_client=service._tablestore_client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
        session_secondary_index_meta=service._session_secondary_index_meta,
        session_search_index_schema=service._session_search_index_schema,
        message_search_index_schema=service._message_search_index_schema,
    )


async def create_new_session():
    global current_session, current_session_id, chat_history
    current_session_id = f"session_{uuid.uuid4().hex[:8]}"
    current_session = await session_service.create_session(
        user_id=user_id,
        session_id=current_session_id,
    )
    chat_history = []
    print(f"New session created: {current_session_id}")


def create_message(role: str, text: str) -> Message:
    return Message(
        type=MessageType.MESSAGE,
        role=role,
        content=[TextContent(type=ContentType.TEXT, text=text)],
    )


async def save_message(role: str, text: str):
    if current_session is None:
        return
    msg = create_message(role, text)
    await session_service.append_message(current_session, msg)


async def search_memory(query: str) -> str:
    msg = create_message(Role.USER, query)
    results = await memory_service.search_memory(user_id=user_id, messages=[msg])
    if not results:
        return ""
    texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
    return "\n".join([f"  - {t}" for t in texts])


async def add_memory(text: str):
    msg = create_message(Role.USER, text)
    await memory_service.add_memory(
        user_id=user_id,
        messages=[msg],
        session_id=current_session_id,
    )
    print(f"Memory saved: {text}")


def call_llm(messages: list, memory_context: str = "") -> str:
    system_prompt = "You are a helpful AI assistant."
    if memory_context:
        system_prompt += f"\n\nUser's historical memory:\n{memory_context}"

    dashscope_messages = [{"role": "system", "content": system_prompt}]
    for role, content in messages:
        dashscope_messages.append({"role": role, "content": content})

    response = Generation.call(
        model="qwen-turbo",
        messages=dashscope_messages,
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        result_format="message",
    )
    if response.status_code == 200:
        return response.output.choices[0].message.content
    return f"Error: {response.code} - {response.message}"


async def chat(user_input: str):
    global chat_history
    memory_context = await search_memory(user_input)
    if memory_context:
        print(f"[Memory] Found related memories:\n{memory_context}")

    messages = chat_history + [("user", user_input)]

    with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
        response = call_llm(messages, memory_context)

    await save_message(Role.USER, user_input)
    await save_message(Role.ASSISTANT, response)
    chat_history.append(("user", user_input))
    chat_history.append(("assistant", response))

    console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")


async def list_sessions():
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"Total {len(sessions)} sessions:")
    for s in sessions[:10]:
        print(f"  - {s.id}")


async def list_memories():
    memories = await memory_service.list_memory(user_id=user_id)
    print(f"Total {len(memories)} memories:")
    for m in memories[:10]:
        text = m.get_text_content()
        if text:
            print(f"  - {text[:60]}...")


async def load_session(session_id: str):
    global current_session, current_session_id, chat_history
    current_session_id = session_id
    current_session = await session_service.get_session(user_id, session_id)
    chat_history = []
    for msg in current_session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        chat_history.append((role, text))
    print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")


def print_help():
    print("""
Commands:
  /new          - Create new session
  /sessions     - List all sessions
  /load <id>    - Load session by ID
  /remember <x> - Save memory
  /memories     - List all memories
  /history      - Show current chat history
  /help         - Show this help
  /quit         - Exit

Otherwise, just type your message to chat.
""")


async def main():
    await init_services()
    await create_new_session()
    print_help()

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye!")
            break

        if not user_input:
            continue

        if user_input == "/quit":
            print("Bye!")
            break
        elif user_input == "/help":
            print_help()
        elif user_input == "/new":
            await create_new_session()
        elif user_input == "/sessions":
            await list_sessions()
        elif user_input == "/memories":
            await list_memories()
        elif user_input == "/history":
            print(f"Current session: {current_session_id}")
            for role, text in chat_history:
                print(f"  [{role}] {text[:80]}...")
        elif user_input.startswith("/load "):
            session_id = user_input[6:].strip()
            await load_session(session_id)
        elif user_input.startswith("/remember "):
            text = user_input[10:].strip()
            await add_memory(text)
        else:
            await chat(user_input)

    await memory_service.stop()
    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())
  • 運行樣本:

    python examples/tablestore_chat_demo.py
  • 互動命令:

    命令

    功能

    /new

    建立新會話

    /sessions

    列出所有會話

    /load <id>

    載入指定會話

    /remember <內容>

    儲存長期記憶

    /memories

    列出所有記憶

    /history

    顯示當前會話歷史

    /help

    顯示協助資訊

    /quit

    退出

相關文檔