すべてのプロダクト
Search
ドキュメントセンター

Tablestore:AgentScope Runtime と Tablestore の連携

最終更新日:Feb 01, 2026

AgentScope Runtime は、メモリサービスとセッション履歴サービスの永続化バックエンドとして、Alibaba Cloud Tablestore の使用をサポートしています。これにより、AI エージェントアプリケーションにおけるセッション履歴と長期メモリのために、高性能かつ高可用性 (HA) のストレージが提供されます。

2026-01-30_17-03-06 (1)

主な特徴

サービス

特徴

シナリオ

TablestoreMemoryService

長期メモリを格納し、フルテキストインデックスをサポート

ユーザープリファレンス、ナレッジベース、セッション横断メモリ

TablestoreSessionHistoryService

セッション履歴を格納

会話コンテキストとメッセージレコード

事前準備

Tablestore ストレージサービスを使用する前に、以下の環境設定と依存関係のインストールを完了してください。

  1. 環境の準備Python ランタイム環境がインストールされていることを確認してください。python3 --version コマンドを実行してバージョンを確認します。

  2. プロジェクトのダウンロード: AgentScope Runtime 1.0.5 のリリースパッケージ agentscope-runtime.zip をダウンロードして解凍します。解凍後、プロジェクトフォルダに移動します。

    重要

    Tablestore 連携機能は AgentScope Runtime 1.0.5 以前のバージョンでのみ利用可能です。必ず指定されたリリースパッケージを使用してください。

    cd agentscope-runtime
  3. 仮想環境の作成: Python 仮想環境を作成してアクティベートします。

    python3 -m venv .venv && source .venv/bin/activate
  4. 依存関係のインストール: コアプロジェクトの依存関係と Tablestore 拡張機能をインストールします。

    pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community
  5. 環境変数の設定: Tablestore への接続と大規模言語モデル (LLM) の呼び出しに必要な以下の環境変数を設定します。

    環境変数

    説明

    TABLESTORE_ENDPOINT

    Tablestore インスタンスのエンドポイント。Tablestore コンソールで確認できます。

    TABLESTORE_INSTANCE_NAME

    Tablestore インスタンスの名前。

    TABLESTORE_ACCESS_KEY_ID

    ご利用の Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID。

    TABLESTORE_ACCESS_KEY_SECRET

    ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    DASHSCOPE_API_KEY

    Alibaba Cloud Model Studio プラットフォームの API キー。大規模言語モデル (LLM) の呼び出しに使用されます。

    export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com"
    export TABLESTORE_INSTANCE_NAME="<instance-name>"
    export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>"
    export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>"
    export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

コード例:メモリ

メモリサービスは、AI エージェントの長期メモリを管理します。セッションをまたいだ情報の格納とセマンティック検索をサポートします。

長期メモリの追加

以下の例は、ユーザー情報を長期メモリとして Tablestore に書き込む方法を示しています。コードを examples/tablestore_memory_add.py ファイルに保存してください。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")

    user_id = "user_001"
    session_id = "session_001"

    messages = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="I like Sichuan food")],
        ),
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="My birthday is March 15th")],
        ),
    ]

    print(f"Preparing to write {len(messages)} memories:")
    for msg in messages:
        print(f"  - [{msg.role}] {msg.get_text_content()}")

    await memory_service.add_memory(
        user_id=user_id,
        messages=messages,
        session_id=session_id,
    )
    print(f"Write successful, user_id={user_id}, session_id={session_id}")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

例を実行します:

python examples/tablestore_memory_add.py

長期メモリの検索

以下の例は、セマンティック検索を使用してユーザーの長期メモリをクエリする方法を示しています。コードを examples/tablestore_memory_search.py ファイルに保存してください。

説明

データが書き込まれた後、検索インデックスが同期されるまで数秒かかります。次の例を実行したときにデータが見つからない場合は、検索インデックスの同期が完了するまでお待ちください。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")

    user_id = "user_001"

    query = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="What kind of food do I like?")],
        )
    ]
    print(f"Search query: {query[0].get_text_content()}")

    results = await memory_service.search_memory(user_id=user_id, messages=query)
    print(f"Search results, found {len(results)} related memories:")
    for mem in results:
        print(f"  - [{mem.role}] {mem.get_text_content()}")

    all_memories = await memory_service.list_memory(user_id=user_id)
    print(f"User {user_id} has a total of {len(all_memories)} long-term memories")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

例を実行します:

python examples/tablestore_memory_search.py

コード例:セッション

セッション履歴サービスは、セッションとメッセージレコードを管理して、会話コンテキストを保存します。

セッションの作成とメッセージの追加

以下の例は、セッションを作成し、会話メッセージを書き込む方法を示しています。コードを examples/tablestore_session_create.py ファイルに保存してください。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.create_session(
        user_id=user_id,
        session_id=session_id,
    )
    print(f"Session created successfully: session_id={session.id}, user_id={user_id}")

    user_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.USER,
        content=[TextContent(type=ContentType.TEXT, text="こんにちは、私の名前は田中一郎です")],
    )
    await session_service.append_message(session, user_msg)
    print(f"Writing message: [{user_msg.role}] {user_msg.get_text_content()}")

    assistant_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.ASSISTANT,
        content=[TextContent(type=ContentType.TEXT, text="こんにちは、田中一郎さん!何かお手伝いできることはありますか?")],
    )
    await session_service.append_message(session, assistant_msg)
    print(f"Writing message: [{assistant_msg.role}] {assistant_msg.get_text_content()}")

    print(f"Wrote 2 messages to session {session_id}")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

例を実行します:

python examples/tablestore_session_create.py

セッション履歴の取得

以下の例は、特定のセッションの履歴メッセージレコードを取得する方法を示しています。コードを examples/tablestore_session_get.py ファイルに保存してください。

説明

データが書き込まれた後、検索インデックスが同期されるまで数秒かかります。次の例を実行したときにデータが見つからない場合は、検索インデックスの同期が完了するまでお待ちください。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.get_session(user_id, session_id)
    print(f"Getting session {session_id}, found {len(session.messages)} messages:")
    for msg in session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        print(f"  - [{role}] {text}")

    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"User {user_id} has a total of {len(sessions)} sessions")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

例を実行します:

python examples/tablestore_session_get.py

コード例:対話型チャットデモ

以下の例は、大規模言語モデル (LLM) の呼び出し、セッション履歴の格納、長期メモリの取得を統合した、完全な対話型チャットアプリケーションを構築する方法を示しています。コードを examples/tablestore_chat_demo.py ファイルに保存してください。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# この例は、Tablestore を使用してセッション履歴と長期メモリを格納する対話型チャットデモを構築する方法を示しています
# 実行する前に、次の環境変数を設定してください:
#   TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
#   TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY

import os
import asyncio
import uuid
import sys
import io
import logging

from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner

console = Console()

from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)

memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []

# テーブル名とインデックス名
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# メモリサービスは SDK で固定のインデックス名を使用します
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# セッションサービスは、他のデモとの競合を避けるためにカスタムインデックス名を使用します
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"


async def delete_table_with_indexes(client, table_name):
    """ネイティブの Tablestore SDK を使用して、テーブルとそのすべてのインデックスを削除します。"""
    import tablestore

    try:
        # まず検索インデックスを削除します
        try:
            search_indexes = await client.list_search_index(table_name)
            for index_tuple in search_indexes:
                await client.delete_search_index(index_tuple[0], index_tuple[1])
        except Exception:
            pass

        # テーブルを削除します
        await client.delete_table(table_name)
        print(f"  Deleted table: {table_name}")
    except tablestore.OTSServiceError as e:
        if "OTSObjectNotExist" not in str(e):
            print(f"  Warning: Failed to delete {table_name}: {e}")


async def init_services():
    global memory_service, session_service

    client = AsyncOTSClient(
        os.getenv("TABLESTORE_ENDPOINT"),
        os.getenv("TABLESTORE_ACCESS_KEY_ID"),
        os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
        os.getenv("TABLESTORE_INSTANCE_NAME"),
    )

    # ネイティブ SDK を使用して既存のテーブルを確認します
    existing_tables = await client.list_table()

    memory_exists = MEMORY_TABLE in existing_tables
    session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
    session_all_exist = len(session_existing) == 2
    session_none_exist = len(session_existing) == 0

    # 不完全なセッションテーブルを処理します - 既存のものを削除して再作成します
    if not session_all_exist and not session_none_exist:
        print(f"Incomplete session tables detected: {session_existing}")
        print("Cleaning up...")
        for t in session_existing:
            await delete_table_with_indexes(client, t)
        # 削除が完了するのを待ちます
        await asyncio.sleep(3)
        session_all_exist = False

    # サービスインスタンスを作成します
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name=MEMORY_TABLE,
    )
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
    )

    # 初期化中に SDK ログを抑制します
    logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()

    try:
        # メモリサービスを初期化します
        if memory_exists:
            _init_memory_service_without_create(memory_service)
        else:
            await memory_service.start()

        # セッションサービスを初期化します - インデックス名の競合を避けるために常にカスタム初期化を使用します
        if session_all_exist:
            _init_session_service_without_create(session_service)
        else:
            # カスタムインデックス名を使用して SDK でテーブルとインデックスを作成します
            await _create_session_tables(client)
            _init_session_service_without_create(session_service)

    finally:
        sys.stderr = original_stderr
        logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)

    print("Services initialized")


async def _create_session_tables(client):
    """カスタムインデックス名でセッションテーブルとメッセージテーブルを作成します。"""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    # カスタムインデックス名でメモリストアを作成します
    store = AsyncMemoryStore(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
    )
    # テーブルとインデックスを初期化します
    await store.init_table()
    # テーブル作成を待ちます
    await asyncio.sleep(1)


def _init_memory_service_without_create(service):
    """テーブルを作成せずにメモリサービスのストアを初期化します。"""
    from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore

    service._knowledge_store = AsyncKnowledgeStore(
        tablestore_client=service._tablestore_client,
        vector_dimension=service._vector_dimension,
        enable_multi_tenant=False,
        table_name=service._table_name,
        search_index_name=MEMORY_SEARCH_INDEX,
        search_index_schema=service._search_index_schema,
        text_field=service._text_field,
        embedding_field=service._embedding_field,
        vector_metric_type=service._vector_metric_type,
    )


def _init_session_service_without_create(service):
    """テーブルを作成せずにセッションサービスのストアを初期化します。"""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    service._memory_store = AsyncMemoryStore(
        tablestore_client=service._tablestore_client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
        session_secondary_index_meta=service._session_secondary_index_meta,
        session_search_index_schema=service._session_search_index_schema,
        message_search_index_schema=service._message_search_index_schema,
    )


async def create_new_session():
    global current_session, current_session_id, chat_history
    current_session_id = f"session_{uuid.uuid4().hex[:8]}"
    current_session = await session_service.create_session(
        user_id=user_id,
        session_id=current_session_id,
    )
    chat_history = []
    print(f"New session created: {current_session_id}")


def create_message(role: str, text: str) -> Message:
    return Message(
        type=MessageType.MESSAGE,
        role=role,
        content=[TextContent(type=ContentType.TEXT, text=text)],
    )


async def save_message(role: str, text: str):
    if current_session is None:
        return
    msg = create_message(role, text)
    await session_service.append_message(current_session, msg)


async def search_memory(query: str) -> str:
    msg = create_message(Role.USER, query)
    results = await memory_service.search_memory(user_id=user_id, messages=[msg])
    if not results:
        return ""
    texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
    return "\n".join([f"  - {t}" for t in texts])


async def add_memory(text: str):
    msg = create_message(Role.USER, text)
    await memory_service.add_memory(
        user_id=user_id,
        messages=[msg],
        session_id=current_session_id,
    )
    print(f"Memory saved: {text}")


def call_llm(messages: list, memory_context: str = "") -> str:
    system_prompt = "あなたは役立つ AI アシスタントです。"
    if memory_context:
        system_prompt += f"\n\nユーザーの長期メモリ:\n{memory_context}"

    dashscope_messages = [{"role": "system", "content": system_prompt}]
    for role, content in messages:
        dashscope_messages.append({"role": role, "content": content})

    response = Generation.call(
        model="qwen-turbo",
        messages=dashscope_messages,
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        result_format="message",
    )
    if response.status_code == 200:
        return response.output.choices[0].message.content
    return f"エラー: {response.code} - {response.message}"


async def chat(user_input: str):
    global chat_history
    memory_context = await search_memory(user_input)
    if memory_context:
        print(f"[メモリ] 関連するメモリが見つかりました:\n{memory_context}")

    messages = chat_history + [("user", user_input)]

    with Live(Spinner("dots", text="考え中...", style="cyan"), console=console, refresh_per_second=10):
        response = call_llm(messages, memory_context)

    await save_message(Role.USER, user_input)
    await save_message(Role.ASSISTANT, response)
    chat_history.append(("user", user_input))
    chat_history.append(("assistant", response))

    console.print(f"\n[bold green]アシスタント:[/bold green] {response}\n")


async def list_sessions():
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"合計 {len(sessions)} 件のセッション:")
    for s in sessions[:10]:
        print(f"  - {s.id}")


async def list_memories():
    memories = await memory_service.list_memory(user_id=user_id)
    print(f"合計 {len(memories)} 件のメモリ:")
    for m in memories[:10]:
        text = m.get_text_content()
        if text:
            print(f"  - {text[:60]}...")


async def load_session(session_id: str):
    global current_session, current_session_id, chat_history
    current_session_id = session_id
    current_session = await session_service.get_session(user_id, session_id)
    chat_history = []
    for msg in current_session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        chat_history.append((role, text))
    print(f"セッションを読み込みました: {session_id}, {len(current_session.messages)} 件のメッセージ")


def print_help():
    print("""
コマンド:
  /new          - 新しいセッションを作成
  /sessions     - すべてのセッションを一覧表示
  /load <id>    - ID でセッションを読み込み
  /remember <content> - 長期メモリを保存
  /memories     - すべての長期メモリを一覧表示
  /history      - 現在のチャット履歴を表示
  /help         - このヘルプを表示
  /quit         - 終了

それ以外の場合は、メッセージを入力してチャットを開始してください。
""")


async def main():
    await init_services()
    await create_new_session()
    print_help()

    while True:
        try:
            user_input = input("あなた: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nさようなら!")
            break

        if not user_input:
            continue

        if user_input == "/quit":
            print("さようなら!")
            break
        elif user_input == "/help":
            print_help()
        elif user_input == "/new":
            await create_new_session()
        elif user_input == "/sessions":
            await list_sessions()
        elif user_input == "/memories":
            await list_memories()
        elif user_input == "/history":
            print(f"現在のセッション: {current_session_id}")
            for role, text in chat_history:
                print(f"  [{role}] {text[:80]}...")
        elif user_input.startswith("/load "):
            session_id = user_input[6:].strip()
            await load_session(session_id)
        elif user_input.startswith("/remember "):
            text = user_input[10:].strip()
            await add_memory(text)
        else:
            await chat(user_input)

    await memory_service.stop()
    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())
  • 例を実行します:

    python examples/tablestore_chat_demo.py
  • 対話型コマンド:

    コマンド

    機能

    /new

    新しいセッションを作成

    /sessions

    すべてのセッションを一覧表示

    /load <id>

    特定のセッションを読み込み

    /remember <content>

    長期メモリを保存

    /memories

    すべてのメモリを一覧表示

    /history

    現在のセッション履歴を表示

    /help

    ヘルプ情報を表示

    /quit

    終了

参考