すべてのプロダクト
Search
ドキュメントセンター

Tablestore:AgentScope Runtime と Tablestore の連携

最終更新日:Jun 22, 2026

AgentScope Runtime は、メモリサービスおよびセッション履歴サービスの永続的なバックエンドとして Tablestore をサポートしています。これにより、AI エージェントアプリケーションのセッション履歴と長期記憶を、高性能かつ高可用性のストレージに保存できます。

主な機能

サービス

機能

ユースケース

TablestoreMemoryService

長期記憶を保存し、全文検索をサポート

ユーザー設定、ナレッジベース、セッションをまたぐメモリ

TablestoreSessionHistoryService

セッション履歴を保存

会話コンテキストとメッセージ記録

(.venv) kitkat@U-HCR97YPR-2059 agentscope-runtime %

前提条件

開始する前に、環境を準備し、必要な依存関係をインストールします。

  1. 環境の準備Python ランタイム環境がインストールされていることを確認します。python3 --version コマンドを実行してバージョンを確認します。

  2. プロジェクトのダウンロード: AgentScope Runtime 1.0.5 のリリースパッケージ agentscope-runtime.zip をダウンロードして解凍します。解凍後、プロジェクトディレクトリに移動します。

    重要

    Tablestore 連携機能は AgentScope Runtime 1.0.5 およびそれ以前のバージョンでのみ利用できます。指定されたリリースパッケージを使用してください。

    cd agentscope-runtime
  3. 仮想環境の作成: Python の仮想環境を作成して有効化します。

    python3 -m venv .venv && source .venv/bin/activate
  4. 依存関係のインストール: プロジェクトのコア依存関係と Tablestore 拡張をインストールします。

    pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community
  5. 環境変数の設定: Tablestore への接続と LLM の呼び出しに必要な、次の環境変数を設定します。

    パラメーター

    説明

    TABLESTORE_ENDPOINT

    Tablestore インスタンスのエンドポイント。 Tablestore コンソールで確認できます。

    TABLESTORE_INSTANCE_NAME

    Tablestore インスタンス名。

    TABLESTORE_ACCESS_KEY_ID

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

    TABLESTORE_ACCESS_KEY_SECRET

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    DASHSCOPE_API_KEY

    LLM 呼び出しに使用する Model Studio の API キー

    export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com"
    export TABLESTORE_INSTANCE_NAME="<instance-name>"
    export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>"
    export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>"
    export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

サンプルコード:メモリ

メモリサービスは、AI エージェントの長期記憶を管理し、セッションをまたぐ情報の保存とセマンティック検索をサポートします。

長期記憶の追加

以下の例では、ユーザー情報を長期記憶として Tablestore に書き込む方法を示します。コードをexamples/tablestore_memory_add.pyという名前のファイルに保存します。

import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)
async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")
    user_id = "user_001"
    session_id = "session_001"
    messages = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="I like Sichuan food")],
        ),
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="My birthday is March 15th")],
        ),
    ]
    print(f"Preparing to write {len(messages)} memories:")
    for msg in messages:
        print(f"  - [{msg.role}] {msg.get_text_content()}")
    await memory_service.add_memory(
        user_id=user_id,
        messages=messages,
        session_id=session_id,
    )
    print(f"Write successful, user_id={user_id}, session_id={session_id}")
    await memory_service.stop()
if __name__ == "__main__":
    asyncio.run(main())

サンプルを実行します:

python examples/tablestore_memory_add.py

長期記憶の検索

次の例では、セマンティック検索を使用してユーザーの長期記憶をクエリする方法を示します。コードを examples/tablestore_memory_search.py という名前のファイルに保存します。

説明

データの書き込み後、検索インデックスの同期に数秒かかります。サンプルコードでデータが見つからない場合は、同期が完了するまで待ってください。

import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)
async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")
    user_id = "user_001"
    query = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="What kind of food do I like?")],
        )
    ]
    print(f"Search query: {query[0].get_text_content()}")
    results = await memory_service.search_memory(user_id=user_id, messages=query)
    print(f"Search results, found {len(results)} related memories:")
    for mem in results:
        print(f"  - [{mem.role}] {mem.get_text_content()}")
    all_memories = await memory_service.list_memory(user_id=user_id)
    print(f"User {user_id} has a total of {len(all_memories)} long-term memories")
    await memory_service.stop()
if __name__ == "__main__":
    asyncio.run(main())

サンプルを実行します:

python examples/tablestore_memory_search.py

サンプルコード:セッション

セッション履歴サービスは、会話コンテキストを保持するために、セッションとメッセージ記録を管理します。

セッションの作成とメッセージの追加

次の例では、セッションを作成し、会話メッセージを書き込む方法を示します。コードを examples/tablestore_session_create.py という名前のファイルに保存します。

import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)
async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")
    user_id = "user_001"
    session_id = "session_003"
    session = await session_service.create_session(
        user_id=user_id,
        session_id=session_id,
    )
    print(f"Session created successfully: session_id={session.id}, user_id={user_id}")
    user_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.USER,
        content=[TextContent(type=ContentType.TEXT, text="Hello, my name is John Doe")],
    )
    await session_service.append_message(session, user_msg)
    print(f"Writing message: [{user_msg.role}] {user_msg.get_text_content()}")
    assistant_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.ASSISTANT,
        content=[TextContent(type=ContentType.TEXT, text="Hello John Doe! How can I help you?")],
    )
    await session_service.append_message(session, assistant_msg)
    print(f"Writing message: [{assistant_msg.role}] {assistant_msg.get_text_content()}")
    print(f"Wrote 2 messages to session {session_id}")
    await session_service.stop()
if __name__ == "__main__":
    asyncio.run(main())

サンプルを実行します:

python examples/tablestore_session_create.py

セッション履歴の取得

次の例では、特定のセッションのメッセージレコードを取得する方法を示します。コードを examples/tablestore_session_get.py という名前のファイルに保存します。

説明

データの書き込み後、検索インデックスの同期に数秒かかります。サンプルコードでデータが見つからない場合は、同期が完了するまで待ってください。

import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")
    user_id = "user_001"
    session_id = "session_003"
    session = await session_service.get_session(user_id, session_id)
    print(f"Getting session {session_id}, found {len(session.messages)} messages:")
    for msg in session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        print(f"  - [{role}] {text}")
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"User {user_id} has a total of {len(sessions)} sessions")
    await session_service.stop()
if __name__ == "__main__":
    asyncio.run(main())

サンプルを実行します:

python examples/tablestore_session_get.py

サンプルコード:対話型チャットデモ

以下の例では、LLM 呼び出し、セッション履歴の保存、長期記憶の取得を統合した完全な対話型チャットアプリケーションを構築する方法を説明します。コードを examples/tablestore_chat_demo.py という名前のファイルに保存します。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# この例では、Tablestore を使用してセッション履歴と長期記憶を保存する対話型チャットデモを構築する方法を説明します。
# 実行前に、次の環境変数を設定してください:
#   TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
#   TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY
import os
import asyncio
import uuid
import sys
import io
import logging
from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner
console = Console()
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)
memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []
# テーブル名とインデックス名
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# メモリサービスは SDK 内で固定のインデックス名を使用します
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# セッションサービスは、他のデモとの競合を避けるためにカスタムインデックス名を使用します
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"
async def delete_table_with_indexes(client, table_name):
    """ネイティブの Tablestore SDK を使用して、テーブルとすべてのインデックスを削除します。"""
    import tablestore
    try:
        # 先に検索インデックスを削除します
        try:
            search_indexes = await client.list_search_index(table_name)
            for index_tuple in search_indexes:
                await client.delete_search_index(index_tuple[0], index_tuple[1])
        except Exception:
            pass
        # テーブルを削除します
        await client.delete_table(table_name)
        print(f"  Deleted table: {table_name}")
    except tablestore.OTSServiceError as e:
        if "OTSObjectNotExist" not in str(e):
            print(f"  Warning: Failed to delete {table_name}: {e}")
async def init_services():
    global memory_service, session_service
    client = AsyncOTSClient(
        os.getenv("TABLESTORE_ENDPOINT"),
        os.getenv("TABLESTORE_ACCESS_KEY_ID"),
        os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
        os.getenv("TABLESTORE_INSTANCE_NAME"),
    )
    # ネイティブ SDK を使用して既存のテーブルを確認します
    existing_tables = await client.list_table()
    memory_exists = MEMORY_TABLE in existing_tables
    session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
    session_all_exist = len(session_existing) == 2
    session_none_exist = len(session_existing) == 0
    # セッションテーブルが不完全な場合は、既存のものを削除して再作成します
    if not session_all_exist and not session_none_exist:
        print(f"Incomplete session tables detected: {session_existing}")
        print("Cleaning up...")
        for t in session_existing:
            await delete_table_with_indexes(client, t)
        # 削除が完了するまで待機します
        await asyncio.sleep(3)
        session_all_exist = False
    # サービスインスタンスを作成します
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name=MEMORY_TABLE,
    )
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
    )
    # 初期化中は SDK ログを抑制します
    logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()
    try:
        # メモリサービスを初期化します
        if memory_exists:
            _init_memory_service_without_create(memory_service)
        else:
            await memory_service.start()
        # セッションサービスを初期化します。インデックス名の競合を避けるため、常にカスタム初期化を使用します
        if session_all_exist:
            _init_session_service_without_create(session_service)
        else:
            # カスタムインデックス名を使用して、SDK でテーブルとインデックスを作成します
            await _create_session_tables(client)
            _init_session_service_without_create(session_service)
    finally:
        sys.stderr = original_stderr
        logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)
    print("Services initialized")
async def _create_session_tables(client):
    """カスタムインデックス名でセッションテーブルとメッセージテーブルを作成します。"""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
    # カスタムインデックス名でメモリストアを作成します
    store = AsyncMemoryStore(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
    )
    # テーブルとインデックスを初期化します
    await store.init_table()
    # テーブル作成を待機します
    await asyncio.sleep(1)
def _init_memory_service_without_create(service):
    """テーブルを作成せずにメモリサービスストアを初期化します。"""
    from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore
    service._knowledge_store = AsyncKnowledgeStore(
        tablestore_client=service._tablestore_client,
        vector_dimension=service._vector_dimension,
        enable_multi_tenant=False,
        table_name=service._table_name,
        search_index_name=MEMORY_SEARCH_INDEX,
        search_index_schema=service._search_index_schema,
        text_field=service._text_field,
        embedding_field=service._embedding_field,
        vector_metric_type=service._vector_metric_type,
    )
def _init_session_service_without_create(service):
    """テーブルを作成せずにセッションサービスストアを初期化します。"""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
    service._memory_store = AsyncMemoryStore(
        tablestore_client=service._tablestore_client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
        session_secondary_index_meta=service._session_secondary_index_meta,
        session_search_index_schema=service._session_search_index_schema,
        message_search_index_schema=service._message_search_index_schema,
    )
async def create_new_session():
    global current_session, current_session_id, chat_history
    current_session_id = f"session_{uuid.uuid4().hex[:8]}"
    current_session = await session_service.create_session(
        user_id=user_id,
        session_id=current_session_id,
    )
    chat_history = []
    print(f"New session created: {current_session_id}")
def create_message(role: str, text: str) -> Message:
    return Message(
        type=MessageType.MESSAGE,
        role=role,
        content=[TextContent(type=ContentType.TEXT, text=text)],
    )
async def save_message(role: str, text: str):
    if current_session is None:
        return
    msg = create_message(role, text)
    await session_service.append_message(current_session, msg)
async def search_memory(query: str) -> str:
    msg = create_message(Role.USER, query)
    results = await memory_service.search_memory(user_id=user_id, messages=[msg])
    if not results:
        return ""
    texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
    return "\n".join([f"  - {t}" for t in texts])
async def add_memory(text: str):
    msg = create_message(Role.USER, text)
    await memory_service.add_memory(
        user_id=user_id,
        messages=[msg],
        session_id=current_session_id,
    )
    print(f"Memory saved: {text}")
def call_llm(messages: list, memory_context: str = "") -> str:
    system_prompt = "You are a helpful AI assistant."
    if memory_context:
        system_prompt += f"\n\nUser's long-term memory:\n{memory_context}"
    dashscope_messages = [{"role": "system", "content": system_prompt}]
    for role, content in messages:
        dashscope_messages.append({"role": role, "content": content})
    response = Generation.call(
        model="qwen-turbo",
        messages=dashscope_messages,
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        result_format="message",
    )
    if response.status_code == 200:
        return response.output.choices[0].message.content
    return f"Error: {response.code} - {response.message}"
async def chat(user_input: str):
    global chat_history
    memory_context = await search_memory(user_input)
    if memory_context:
        print(f"[Memory] Found related memories:\n{memory_context}")
    messages = chat_history + [("user", user_input)]
    with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
        response = call_llm(messages, memory_context)
    await save_message(Role.USER, user_input)
    await save_message(Role.ASSISTANT, response)
    chat_history.append(("user", user_input))
    chat_history.append(("assistant", response))
    console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")
async def list_sessions():
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"Total {len(sessions)} sessions:")
    for s in sessions[:10]:
        print(f"  - {s.id}")
async def list_memories():
    memories = await memory_service.list_memory(user_id=user_id)
    print(f"Total {len(memories)} memories:")
    for m in memories[:10]:
        text = m.get_text_content()
        if text:
            print(f"  - {text[:60]}...")
async def load_session(session_id: str):
    global current_session, current_session_id, chat_history
    current_session_id = session_id
    current_session = await session_service.get_session(user_id, session_id)
    chat_history = []
    for msg in current_session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        chat_history.append((role, text))
    print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")
def print_help():
    print("""
コマンド:
  /new          - 新しいセッションを作成
  /sessions     - すべてのセッションを一覧表示
  /load <id>    - ID を指定してセッションを読み込み
  /remember <content> - コンテンツを長期記憶として保存
  /memories     - すべてのメモリを一覧表示
  /history      - 現在のチャット履歴を表示
  /help         - このヘルプを表示
  /quit         - 終了
コマンド以外は、そのままメッセージを入力してチャットしてください。
""")
async def main():
    await init_services()
    await create_new_session()
    print_help()
    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye!")
            break
        if not user_input:
            continue
        if user_input == "/quit":
            print("Bye!")
            break
        elif user_input == "/help":
            print_help()
        elif user_input == "/new":
            await create_new_session()
        elif user_input == "/sessions":
            await list_sessions()
        elif user_input == "/memories":
            await list_memories()
        elif user_input == "/history":
            print(f"Current session: {current_session_id}")
            for role, text in chat_history:
                print(f"  [{role}] {text[:80]}...")
        elif user_input.startswith("/load "):
            session_id = user_input[6:].strip()
            await load_session(session_id)
        elif user_input.startswith("/remember "):
            text = user_input[10:].strip()
            await add_memory(text)
        else:
            await chat(user_input)
    await memory_service.stop()
    await session_service.stop()
if __name__ == "__main__":
    asyncio.run(main())
        
  • サンプルを実行します:

    python examples/tablestore_chat_demo.py
  • 対話コマンド:

    コマンド

    説明

    /new

    新しいセッションを作成

    /sessions

    すべてのセッションを一覧表示

    /load <id>

    ID を指定してセッションを読み込み

    /remember <content>

    コンテンツを長期記憶として保存

    /memories

    すべてのメモリを一覧表示

    /history

    現在のチャット履歴を表示

    /help

    このヘルプを表示

    /quit

    終了

関連ドキュメント