All Products
Search
Document Center

Tablestore:Integrasikan AgentScope Runtime dengan Tablestore

Last Updated:Jan 31, 2026

AgentScope Runtime mendukung penggunaan Alibaba Cloud Tablestore sebagai backend persistensi untuk Memory Service dan Session History Service, menyediakan penyimpanan berkinerja tinggi dan highly available (HA) untuk riwayat sesi dan memori jangka panjang dalam aplikasi AI Agent.

2026-01-30_17-03-06 (1)

Fitur utama

Service

Feature

Skenario

TablestoreMemoryService

Menyimpan memori jangka panjang dan mendukung full-text index

Preferensi pengguna, basis pengetahuan, dan memori lintas sesi

TablestoreSessionHistoryService

Menyimpan riwayat sesi

Konteks percakapan dan catatan pesan

Persiapan

Sebelum menggunakan layanan penyimpanan Tablestore, lengkapi konfigurasi lingkungan dan instalasi dependensi berikut.

  1. Persiapkan lingkungan: Pastikan lingkungan runtime Python telah terinstal. Jalankan perintah python3 --version untuk memeriksa versinya.

  2. Unduh proyek: Unduh dan ekstrak paket rilis AgentScope Runtime 1.0.5 agentscope-runtime.zip. Setelah diekstrak, navigasikan ke folder proyek.

    Penting

    Fitur integrasi Tablestore hanya tersedia di AgentScope Runtime 1.0.5 dan versi sebelumnya. Pastikan Anda menggunakan paket rilis yang ditentukan.

    cd agentscope-runtime
  3. Buat lingkungan virtual: Buat dan aktifkan lingkungan virtual Python.

    python3 -m venv .venv && source .venv/bin/activate
  4. Instal dependensi: Instal dependensi inti proyek dan ekstensi Tablestore.

    pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community
  5. Konfigurasikan variabel lingkungan: Tetapkan variabel lingkungan berikut untuk menghubungkan ke Tablestore dan memanggil Large Language Model (LLM).

    Variabel lingkungan

    Deskripsi

    TABLESTORE_ENDPOINT

    Titik akhir instans Tablestore. Anda dapat menemukannya di Konsol Tablestore.

    TABLESTORE_INSTANCE_NAME

    Nama instans Tablestore.

    TABLESTORE_ACCESS_KEY_ID

    AccessKey ID dari Akun Alibaba Cloud atau Pengguna Resource Access Management (RAM) Anda.

    TABLESTORE_ACCESS_KEY_SECRET

    Rahasia AccessKey dari Akun Alibaba Cloud atau Pengguna RAM Anda.

    DASHSCOPE_API_KEY

    Kunci API dari platform Alibaba Cloud Model Studio. Ini digunakan untuk memanggil LLM.

    export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com"
    export TABLESTORE_INSTANCE_NAME="<instance-name>"
    export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>"
    export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>"
    export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

Kode contoh: Memori

Memory Service mengelola memori jangka panjang AI Agent dan mendukung penyimpanan informasi lintas sesi serta pengambilan semantik.

Tambahkan memori jangka panjang

Contoh berikut menunjukkan cara menulis informasi pengguna ke Tablestore sebagai memori jangka panjang. Simpan kode ke file examples/tablestore_memory_add.py.

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")

    user_id = "user_001"
    session_id = "session_001"

    messages = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="I like Sichuan food")],
        ),
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="My birthday is March 15th")],
        ),
    ]

    print(f"Preparing to write {len(messages)} memories:")
    for msg in messages:
        print(f"  - [{msg.role}] {msg.get_text_content()}")

    await memory_service.add_memory(
        user_id=user_id,
        messages=messages,
        session_id=session_id,
    )
    print(f"Write successful, user_id={user_id}, session_id={session_id}")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

Jalankan contoh:

python examples/tablestore_memory_add.py

Cari memori jangka panjang

Contoh berikut menunjukkan cara mengkueri memori jangka panjang pengguna menggunakan pengambilan semantik. Simpan kode ke file examples/tablestore_memory_search.py.

Catatan

Setelah data ditulis, indeks pencarian memerlukan beberapa detik untuk disinkronkan. Jika Anda tidak dapat menemukan data saat menjalankan contoh berikut, tunggu hingga sinkronisasi indeks pencarian selesai.

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service started successfully")

    user_id = "user_001"

    query = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="What kind of food do I like?")],
        )
    ]
    print(f"Search query: {query[0].get_text_content()}")

    results = await memory_service.search_memory(user_id=user_id, messages=query)
    print(f"Search results, found {len(results)} related memories:")
    for mem in results:
        print(f"  - [{mem.role}] {mem.get_text_content()}")

    all_memories = await memory_service.list_memory(user_id=user_id)
    print(f"User {user_id} has a total of {len(all_memories)} long-term memories")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

Jalankan contoh:

python examples/tablestore_memory_search.py

Kode contoh: Sesi

Session History Service mengelola sesi dan catatan pesan untuk menyimpan konteks percakapan.

Buat sesi dan tambahkan pesan

Contoh berikut menunjukkan cara membuat sesi dan menulis pesan percakapan. Simpan kode ke file examples/tablestore_session_create.py.

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.create_session(
        user_id=user_id,
        session_id=session_id,
    )
    print(f"Session created successfully: session_id={session.id}, user_id={user_id}")

    user_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.USER,
        content=[TextContent(type=ContentType.TEXT, text="Hello, my name is John Doe")],
    )
    await session_service.append_message(session, user_msg)
    print(f"Writing message: [{user_msg.role}] {user_msg.get_text_content()}")

    assistant_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.ASSISTANT,
        content=[TextContent(type=ContentType.TEXT, text="Hello John Doe! How can I help you?")],
    )
    await session_service.append_message(session, assistant_msg)
    print(f"Writing message: [{assistant_msg.role}] {assistant_msg.get_text_content()}")

    print(f"Wrote 2 messages to session {session_id}")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

Jalankan contoh:

python examples/tablestore_session_create.py

Ambil riwayat sesi

Contoh berikut menunjukkan cara mengambil catatan pesan historis untuk sesi tertentu. Simpan kode ke file examples/tablestore_session_get.py.

Catatan

Setelah data ditulis, indeks pencarian memerlukan beberapa detik untuk disinkronkan. Jika Anda tidak dapat menemukan data saat menjalankan contoh berikut, tunggu hingga sinkronisasi indeks pencarian selesai.

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service started successfully")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.get_session(user_id, session_id)
    print(f"Getting session {session_id}, found {len(session.messages)} messages:")
    for msg in session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        print(f"  - [{role}] {text}")

    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"User {user_id} has a total of {len(sessions)} sessions")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

Jalankan contoh:

python examples/tablestore_session_get.py

Kode contoh: Demo chat interaktif

Contoh berikut menunjukkan cara membangun aplikasi chat interaktif lengkap yang mengintegrasikan pemanggilan LLM, penyimpanan riwayat sesi, dan pengambilan memori jangka panjang. Simpan kode ke file examples/tablestore_chat_demo.py.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This example shows how to build an interactive chat demo that uses Tablestore to store session history and long-term memory
# Before running, configure the following environment variables:
#   TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
#   TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY

import os
import asyncio
import uuid
import sys
import io
import logging

from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner

console = Console()

from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)

memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []

# Table and index names
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# Memory service uses a fixed index name in SDK
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# Session service uses custom index names to avoid conflicts with other demos
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"


async def delete_table_with_indexes(client, table_name):
    """Delete a table and all its indexes using native Tablestore SDK."""
    import tablestore

    try:
        # Delete search indexes first
        try:
            search_indexes = await client.list_search_index(table_name)
            for index_tuple in search_indexes:
                await client.delete_search_index(index_tuple[0], index_tuple[1])
        except Exception:
            pass

        # Delete the table
        await client.delete_table(table_name)
        print(f"  Deleted table: {table_name}")
    except tablestore.OTSServiceError as e:
        if "OTSObjectNotExist" not in str(e):
            print(f"  Warning: Failed to delete {table_name}: {e}")


async def init_services():
    global memory_service, session_service

    client = AsyncOTSClient(
        os.getenv("TABLESTORE_ENDPOINT"),
        os.getenv("TABLESTORE_ACCESS_KEY_ID"),
        os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
        os.getenv("TABLESTORE_INSTANCE_NAME"),
    )

    # Check existing tables using native SDK
    existing_tables = await client.list_table()

    memory_exists = MEMORY_TABLE in existing_tables
    session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
    session_all_exist = len(session_existing) == 2
    session_none_exist = len(session_existing) == 0

    # Handle incomplete session tables - delete existing ones and recreate
    if not session_all_exist and not session_none_exist:
        print(f"Incomplete session tables detected: {session_existing}")
        print("Cleaning up...")
        for t in session_existing:
            await delete_table_with_indexes(client, t)
        # Wait for deletion to complete
        await asyncio.sleep(3)
        session_all_exist = False

    # Create service instances
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name=MEMORY_TABLE,
    )
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
    )

    # Suppress SDK logs during init
    logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()

    try:
        # Init memory service
        if memory_exists:
            _init_memory_service_without_create(memory_service)
        else:
            await memory_service.start()

        # Init session service - always use custom init to avoid index name conflicts
        if session_all_exist:
            _init_session_service_without_create(session_service)
        else:
            # Create tables and indexes using SDK with custom index names
            await _create_session_tables(client)
            _init_session_service_without_create(session_service)

    finally:
        sys.stderr = original_stderr
        logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)

    print("Services initialized")


async def _create_session_tables(client):
    """Create session and message tables with custom index names."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    # Create the memory store with custom index names
    store = AsyncMemoryStore(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
    )
    # Init tables and indexes
    await store.init_table()
    # Wait for table creation
    await asyncio.sleep(1)


def _init_memory_service_without_create(service):
    """Initialize memory service store without creating tables."""
    from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore

    service._knowledge_store = AsyncKnowledgeStore(
        tablestore_client=service._tablestore_client,
        vector_dimension=service._vector_dimension,
        enable_multi_tenant=False,
        table_name=service._table_name,
        search_index_name=MEMORY_SEARCH_INDEX,
        search_index_schema=service._search_index_schema,
        text_field=service._text_field,
        embedding_field=service._embedding_field,
        vector_metric_type=service._vector_metric_type,
    )


def _init_session_service_without_create(service):
    """Initialize session service store without creating tables."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    service._memory_store = AsyncMemoryStore(
        tablestore_client=service._tablestore_client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
        session_secondary_index_meta=service._session_secondary_index_meta,
        session_search_index_schema=service._session_search_index_schema,
        message_search_index_schema=service._message_search_index_schema,
    )


async def create_new_session():
    global current_session, current_session_id, chat_history
    current_session_id = f"session_{uuid.uuid4().hex[:8]}"
    current_session = await session_service.create_session(
        user_id=user_id,
        session_id=current_session_id,
    )
    chat_history = []
    print(f"New session created: {current_session_id}")


def create_message(role: str, text: str) -> Message:
    return Message(
        type=MessageType.MESSAGE,
        role=role,
        content=[TextContent(type=ContentType.TEXT, text=text)],
    )


async def save_message(role: str, text: str):
    if current_session is None:
        return
    msg = create_message(role, text)
    await session_service.append_message(current_session, msg)


async def search_memory(query: str) -> str:
    msg = create_message(Role.USER, query)
    results = await memory_service.search_memory(user_id=user_id, messages=[msg])
    if not results:
        return ""
    texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
    return "\n".join([f"  - {t}" for t in texts])


async def add_memory(text: str):
    msg = create_message(Role.USER, text)
    await memory_service.add_memory(
        user_id=user_id,
        messages=[msg],
        session_id=current_session_id,
    )
    print(f"Memory saved: {text}")


def call_llm(messages: list, memory_context: str = "") -> str:
    system_prompt = "You are a helpful AI assistant."
    if memory_context:
        system_prompt += f"\n\nUser's long-term memory:\n{memory_context}"

    dashscope_messages = [{"role": "system", "content": system_prompt}]
    for role, content in messages:
        dashscope_messages.append({"role": role, "content": content})

    response = Generation.call(
        model="qwen-turbo",
        messages=dashscope_messages,
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        result_format="message",
    )
    if response.status_code == 200:
        return response.output.choices[0].message.content
    return f"Error: {response.code} - {response.message}"


async def chat(user_input: str):
    global chat_history
    memory_context = await search_memory(user_input)
    if memory_context:
        print(f"[Memory] Found related memories:\n{memory_context}")

    messages = chat_history + [("user", user_input)]

    with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
        response = call_llm(messages, memory_context)

    await save_message(Role.USER, user_input)
    await save_message(Role.ASSISTANT, response)
    chat_history.append(("user", user_input))
    chat_history.append(("assistant", response))

    console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")


async def list_sessions():
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"Total {len(sessions)} sessions:")
    for s in sessions[:10]:
        print(f"  - {s.id}")


async def list_memories():
    memories = await memory_service.list_memory(user_id=user_id)
    print(f"Total {len(memories)} memories:")
    for m in memories[:10]:
        text = m.get_text_content()
        if text:
            print(f"  - {text[:60]}...")


async def load_session(session_id: str):
    global current_session, current_session_id, chat_history
    current_session_id = session_id
    current_session = await session_service.get_session(user_id, session_id)
    chat_history = []
    for msg in current_session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        chat_history.append((role, text))
    print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")


def print_help():
    print("""
Commands:
  /new          - Create new session
  /sessions     - List all sessions
  /load <id>    - Load session by ID
  /remember <content> - Save long-term memory
  /memories     - List all long-term memories
  /history      - Show current chat history
  /help         - Show this help
  /quit         - Exit

Otherwise, just type your message to chat.
""")


async def main():
    await init_services()
    await create_new_session()
    print_help()

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye!")
            break

        if not user_input:
            continue

        if user_input == "/quit":
            print("Bye!")
            break
        elif user_input == "/help":
            print_help()
        elif user_input == "/new":
            await create_new_session()
        elif user_input == "/sessions":
            await list_sessions()
        elif user_input == "/memories":
            await list_memories()
        elif user_input == "/history":
            print(f"Current session: {current_session_id}")
            for role, text in chat_history:
                print(f"  [{role}] {text[:80]}...")
        elif user_input.startswith("/load "):
            session_id = user_input[6:].strip()
            await load_session(session_id)
        elif user_input.startswith("/remember "):
            text = user_input[10:].strip()
            await add_memory(text)
        else:
            await chat(user_input)

    await memory_service.stop()
    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())
  • Jalankan contoh:

    python examples/tablestore_chat_demo.py
  • Perintah interaktif:

    Perintah

    Fitur

    /new

    Buat sesi baru

    /sessions

    Daftar semua sesi

    /load <id>

    Muat sesi tertentu

    /remember <content>

    Simpan memori jangka panjang

    /memories

    Daftar semua memori

    /history

    Tampilkan riwayat sesi saat ini

    /help

    Tampilkan informasi bantuan

    /quit

    Keluar

Referensi