Tablestore: Agent Memory SDK

Last Updated: Jan 15, 2026

The Agent Memory SDK is a framework built on Tablestore that supports Memory and Knowledge scenarios. It provides persistent, high-performance memory storage and semantic retrieval for AI Agent applications, allowing you to quickly build intelligent applications with contextual understanding and long-term memory.

Core architecture

[Figure: core architecture of the Agent Memory SDK]

Architectural advantages

  • Lightweight design: The SDK abstracts common storage interfaces to reduce development complexity and balances technical depth with ease of use. This allows developers to focus on business logic and achieve results quickly without directly handling low-level database API calls.

  • Scenario-driven design: The SDK provides complete solutions for two core scenarios: real-time memory storage (Memory) and long-term semantic retrieval (Knowledge). Beyond basic storage, it includes application-level features such as summary recording, factual data extraction, and user persona tag mining, so that storage and application logic are handled in one layer.

  • Proven business value: The SDK is based on mature industry best practices. This allows developers to quickly validate and implement the business value of AI applications in their own scenarios without requiring complex technical research.

Quick integration

The following Python examples demonstrate how to integrate and use the SDK. For more information about Java integration, see the Java integration instructions.

Prerequisites

Ensure that a Python runtime environment is installed. Run the python3 --version command to check the version.

Install the SDK

pip3 install tablestore-for-agent-memory

Configure environment variables

Set the following required environment variables:

  • TABLESTORE_ACCESS_KEY_ID: The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user.

  • TABLESTORE_ACCESS_KEY_SECRET: The AccessKey secret of your Alibaba Cloud account or RAM user.

  • TABLESTORE_INSTANCE_NAME: The name of the instance. Obtain it from the Tablestore console.

  • TABLESTORE_ENDPOINT: The endpoint of the instance. Obtain it from the Tablestore console.
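Each example below repeats the same check for these four variables. As a minimal sketch, the check can be factored into one helper. The names load_tablestore_config and REQUIRED_VARS are hypothetical and are not part of the SDK. Only the four variable names come from the list above.

```python
import os
from typing import Mapping, Optional

# The four variables listed above. The tuple name is an assumption for this sketch.
REQUIRED_VARS = (
    "TABLESTORE_ENDPOINT",
    "TABLESTORE_ACCESS_KEY_ID",
    "TABLESTORE_ACCESS_KEY_SECRET",
    "TABLESTORE_INSTANCE_NAME",
)


def load_tablestore_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return the required settings, or raise if any is missing or empty.

    Hypothetical helper: `env` defaults to os.environ and can be replaced
    with a plain dict in tests.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: env[name] for name in REQUIRED_VARS}
```

The returned values can then be passed to tablestore.OTSClient in the same order that the examples use: endpoint, AccessKey ID, AccessKey secret, and instance name.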

Example code: Memory scenario

The Memory scenario is used to manage the session memory of an AI Agent. It includes core features such as session management and message storage. The following example demonstrates how to create a session, record a conversation, and query the history.

Create a session and write conversation records

import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("Creating tables and indexes...")
    try:
        memory_store.init_table()
        memory_store.init_search_index()
        print("Tables and indexes created successfully.")
    except Exception as e:
        print(f"Tables and indexes already exist or failed to be created: {e}")

    print("\n====== Create a new session ======")

    session = Session(user_id="test_user_1", session_id="session_001")
    session.update_time = microseconds_timestamp()
    session.title = "Tablestore Consultation"
    session.metadata = {
        "meta_string": "web_source",
        "meta_long": 1,
        "meta_double": 1.0,
        "meta_boolean": True,
        "model_name": "qwen-max"
    }

    memory_store.put_session(session)
    print(f"Session created successfully: user_id={session.user_id}, session_id={session.session_id}")

    print("\n====== First round of conversation ======")

    message_1 = Message(
        session_id="session_001",
        message_id="msg_001",
        create_time=microseconds_timestamp()
    )
    message_1.content = "Hello, can you tell me what Tablestore is?"
    message_1.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 1
    }
    memory_store.put_message(message_1)
    print(f"User: {message_1.content}")

    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_2 = Message(
        session_id="session_001",
        message_id="msg_002",
        create_time=microseconds_timestamp()
    )
    message_2.content = "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. It supports multiple data models, such as wide table, IM message, and time series models, to meet data storage needs in different scenarios."
    message_2.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_2)
    print(f"Assistant: {message_2.content}")

    print("\n====== Second round of conversation ======")

    message_3 = Message(
        session_id="session_001",
        message_id="msg_003",
        create_time=microseconds_timestamp()
    )
    message_3.content = "What are some typical application scenarios for Tablestore?"
    message_3.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 2
    }
    memory_store.put_message(message_3)
    print(f"User: {message_3.content}")

    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_4 = Message(
        session_id="session_001",
        message_id="msg_004",
        create_time=microseconds_timestamp()
    )
    message_4.content = """Typical application scenarios for Tablestore include the following:
1. AI Agent memory storage: Store knowledge bases, long-term memory, AI session messages, and other information.
2. Metadata management: Store metadata for massive files, videos, and images.
3. Message data: Store IM chat messages, Feed stream messages, and more.
4. Trajectory tracing: Store time series data such as vehicle and logistics trajectories.
5. Scientific big data: Store massive data such as meteorological and genetic data.
6. Recommendation systems: Store data for user personas and item features.
7. Risk control systems: Store real-time risk control rules and historical behavioral data."""
    message_4.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_4)
    print(f"Assistant: {message_4.content}")

    print("\n====== Session creation and conversation complete ======")
    print(f"Session ID: {session.session_id}")
    print(f"User ID: {session.user_id}")
    print("Completed 2 rounds of conversation with 4 messages")


if __name__ == "__main__":
    main()

Query the list of historical sessions

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying list of historical sessions ======\n")

    user_id = "test_user_1"
    max_count = 10

    print(f"Querying recent sessions for user {user_id}...")

    try:
        sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))
        
        if not sessions:
            print(f"\nUser {user_id} has no historical sessions")
        else:
            print(f"\nFound {len(sessions)} sessions in total:\n")
            
            for idx, session in enumerate(sessions, 1):
                print(f"Session {idx}:")
                print(f"  - Session ID: {session.session_id}")
                print(f"  - User ID: {session.user_id}")
                print(f"  - Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
                
                if session.metadata:
                    print("  - Metadata:")
                    for key, value in session.metadata.items():
                        print(f"      {key}: {value}")
                print()
                
    except Exception as e:
        print(f"Failed to query the session list: {e}")

    print("====== Query complete ======")


if __name__ == "__main__":
    main()

Query the details of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying details of a specific session ======\n")

    user_id = "test_user_1"
    session_id = "session_001"

    print("Querying session details...")
    print(f"User ID: {user_id}")
    print(f"Session ID: {session_id}\n")

    try:
        session = memory_store.get_session(user_id=user_id, session_id=session_id)
        
        if session:
            print("Session details:")
            print("=" * 50)
            print(f"User ID: {session.user_id}")
            print(f"Session ID: {session.session_id}")
            print(f"Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
            
            if session.metadata:
                print("\nMetadata information:")
                print("-" * 50)
                for key, value in session.metadata.items():
                    print(f"  {key}: {value}")
            else:
                print("\nMetadata: None")
                
            print("=" * 50)
        else:
            print(f"The specified session was not found (user_id={user_id}, session_id={session_id})")
            
    except Exception as e:
        print(f"Failed to query session details: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== Query complete ======")


if __name__ == "__main__":
    main()

Query the complete conversation record of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }

    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying the complete conversation record of a specific session ======\n")

    session_id = "session_001"

    print("Querying session conversation record...")
    print(f"Session ID: {session_id}\n")

    try:
        messages = list(memory_store.list_messages(session_id=session_id))
        
        if not messages:
            print(f"Session {session_id} has no conversation records")
        else:
            messages.sort(key=lambda m: m.create_time)
            
            print(f"Found {len(messages)} messages in total\n")
            print("=" * 80)
            
            round_num = 0
            for message in messages:
                message_type = (message.metadata or {}).get("message_type", "unknown")
                
                if message_type == "user":
                    round_num += 1
                    print(f"\nRound {round_num} of conversation:")
                    print("-" * 80)
                
                role = "User" if message_type == "user" else "Assistant"
                print(f"\n[{role}] (Message ID: {message.message_id})")
                print(f"Content: {message.content}")
                print(f"Creation time: {message.create_time}")
                
                if message.metadata and len(message.metadata) > 1:
                    print("Metadata:")
                    for key, value in message.metadata.items():
                        if key != "message_type":
                            print(f"  - {key}: {value}")
            
            print("\n" + "=" * 80)
            print(f"\nConversation statistics: {round_num} rounds of conversation, {len(messages)} messages in total")
            
    except Exception as e:
        print(f"Failed to query conversation records: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== Query complete ======")


if __name__ == "__main__":
    main()
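The example above starts a new round each time it encounters a message whose message_type is "user". The following is a minimal sketch of the same grouping as a standalone function. It uses plain dictionaries as stand-ins for SDK Message objects. The function name group_into_rounds and the dictionary layout are assumptions for illustration only.

```python
from typing import Iterable, List


def group_into_rounds(messages: Iterable[dict]) -> List[List[dict]]:
    """Group messages into conversation rounds.

    Messages are sorted by create_time. Each message whose message_type
    is "user" opens a new round, and later messages join that round.
    A leading non-user message opens the first round by itself.
    """
    rounds: List[List[dict]] = []
    for message in sorted(messages, key=lambda m: m["create_time"]):
        if message.get("message_type") == "user" or not rounds:
            rounds.append([])
        rounds[-1].append(message)
    return rounds


if __name__ == "__main__":
    # Hypothetical sample data that mirrors the four messages written earlier.
    sample = [
        {"message_id": "msg_002", "create_time": 2, "message_type": "assistant"},
        {"message_id": "msg_001", "create_time": 1, "message_type": "user"},
        {"message_id": "msg_003", "create_time": 3, "message_type": "user"},
        {"message_id": "msg_004", "create_time": 4, "message_type": "assistant"},
    ]
    for number, group in enumerate(group_into_rounds(sample), 1):
        print(number, [m["message_id"] for m in group])
```

Returning the rounds as lists keeps the grouping separate from the printing, which makes the logic easier to reuse when you build a prompt from the conversation history.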

Example code: Knowledge scenario

The Knowledge scenario focuses on building AI knowledge bases. It supports vectorized storage and intelligent retrieval of large volumes of documents. The following example demonstrates how to create a knowledge base, import documents, and perform intelligent Q&A using methods such as vector search and full-text search.

The example code uses the text-embedding-v2 model from Alibaba Cloud Model Studio for vectorization, called through the OpenAI-compatible client. You must first install the openai package and set your Model Studio API key as the OPENAI_API_KEY environment variable.

pip3 install openai
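Vector search ranks documents by how close their embeddings are to the embedding of the query. The following sketch illustrates that idea with cosine similarity in plain Python. It is only a conceptual illustration: Tablestore performs the search on the server side, and the function names cosine_similarity and top_k are hypothetical and not part of the SDK.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # Treat a zero vector as having no similarity.
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    documents: Dict[str, Sequence[float]],
    k: int = 3,
) -> List[Tuple[str, float]]:
    """Return the k document IDs whose vectors are most similar to the query."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in documents.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


if __name__ == "__main__":
    # Toy two-dimensional vectors. Real embeddings in this topic have 1536 dimensions.
    docs = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
    print(top_k([1.0, 0.0], docs, k=2))
```

This brute-force comparison is practical only for small collections. It is shown here to make the ranking step concrete, not as a replacement for the search index.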

Create a knowledge base and write knowledge

After you write data, the search index takes a few seconds to sync. If you cannot query data using the following code examples, wait for the synchronization to complete.

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os


class OpenAIEmbedding:
    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension
    
    def embedding(self, text):
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=1536
    )

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("Creating tables and indexes...")
    try:
        knowledge_store.init_table()
        print("Tables and indexes created successfully.")
    except Exception as e:
        print(f"Tables and indexes already exist or failed to be created: {e}")

    print("\n====== Writing documents to the Tablestore knowledge base ======\n")

    documents_data = [
        {
            "id": "doc_001",
            "text": "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. Tablestore's distributed storage and powerful index engine support petabyte-level storage for a single table, tens of millions of TPS, and millisecond-level latency.",
            "category": "Product Introduction",
            "meta_long": 1
        },
        {
            "id": "doc_002",
            "text": "Tablestore supports the wide table model, with a single table supporting petabyte-level data storage and tens of millions of QPS, suitable for storing user personas, order details, and other scenarios. It also supports the time series model, which can efficiently store and query time series data generated by IoT devices and monitoring systems.",
            "category": "Data Model",
            "meta_long": 2
        },
        {
            "id": "doc_003",
            "text": "Tablestore provides multiple index types: primary key index for fast point queries and range queries; global secondary index for queries based on non-primary key columns; search index for complex query combinations and full-text search; and vector search for similarity searches in AI scenarios.",
            "category": "Index Features",
            "meta_long": 3
        },
        {
            "id": "doc_004",
            "text": "Tablestore is suitable for various scenarios: metadata management for storing metadata of massive files, videos, and images; message data for storing IM chat messages and Feed stream messages; trajectory tracing for storing time series data such as vehicle and logistics trajectories; and recommendation systems for storing user personas and item features.",
            "category": "Application Scenarios",
            "meta_long": 4
        },
        {
            "id": "doc_005",
            "text": "Tablestore's search index supports rich query capabilities, including term query, range query, prefix query, wildcard query, full-text search, geo query, and nested query. It also supports advanced features such as sorting, aggregation, and statistical analysis.",
            "category": "Query Capabilities",
            "meta_long": 5
        },
        {
            "id": "doc_006",
            "text": "Tablestore provides Agent Memory capabilities, including Memory Store for storing sessions and message records, and Knowledge Store for storing knowledge base documents and supporting vector search. These capabilities help build AI applications such as intelligent Q&A and chatbots.",
            "category": "AI Capabilities",
            "meta_long": 6
        },
        {
            "id": "doc_007",
            "text": "Tablestore's vector search feature supports the storage and efficient retrieval of massive vector data. It can be applied to scenarios such as image search, semantic search, and recommendation systems. It supports multiple similarity algorithms, including L2 distance and cosine similarity.",
            "category": "Vector Search",
            "meta_long": 7
        },
        {
            "id": "doc_008",
            "text": "Tablestore provides multiple data protection mechanisms: it supports data backup and recovery; provides time to live management to automatically expire and delete old data; and supports data encryption at rest to ensure data security.",
            "category": "Data Protection",
            "meta_long": 8
        }
    ]

    tenant_id = "user_tablestore_001"
    success_count = 0

    for doc_data in documents_data:
        try:
            document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
            document.text = doc_data["text"]
            
            document.embedding = embedding_model.embedding(document.text)
            
            if document.embedding is None:
                print(f"✗ Failed to generate vector, skipping document {doc_data['id']}")
                continue
            
            document.metadata["category"] = doc_data["category"]
            document.metadata["meta_long"] = doc_data["meta_long"]
            document.metadata["meta_boolean"] = True
            document.metadata["user_id"] = tenant_id
            
            knowledge_store.put_document(document)
            
            success_count += 1
            print(f"✓ Wrote document {doc_data['id']}: {doc_data['category']}")
            print(f"  Content: {doc_data['text'][:60]}...")
            print()
            
        except Exception as e:
            print(f"✗ Failed to write document {doc_data['id']}: {e}")

    print("=" * 80)
    print(f"\nWrite complete: Succeeded for {success_count}/{len(documents_data)} documents")
    print(f"Tenant ID: {tenant_id}")
    print(f"Document categories: {', '.join(sorted(set(d['category'] for d in documents_data)))}")
    print("\nNote: After data is written, the search index may take a few seconds to synchronize.")


if __name__ == "__main__":
    main()
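Because the search index takes a few seconds to sync after a write, a query that runs immediately may return no results. The following is a minimal polling sketch that retries a query until it returns data or a timeout elapses. The function name wait_for_results is hypothetical, and the query argument is a stand-in for whichever search call you use. It is not an SDK API.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def wait_for_results(
    query: Callable[[], List[T]],
    timeout: float = 10.0,
    interval: float = 1.0,
) -> List[T]:
    """Call query() until it returns a non-empty list or the timeout elapses.

    Returns the last result, which is empty if the timeout was reached.
    """
    deadline = time.monotonic() + timeout
    while True:
        results = query()
        if results or time.monotonic() >= deadline:
            return results
        time.sleep(interval)


if __name__ == "__main__":
    # Fake query that returns data on the third call, simulating index sync delay.
    state = {"calls": 0}

    def fake_query() -> List[str]:
        state["calls"] += 1
        return ["doc_001"] if state["calls"] >= 3 else []

    print(wait_for_results(fake_query, timeout=5.0, interval=0.1))
```

Wrapping the search call in a function with no arguments, for example a lambda, keeps the helper independent of the query type.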

Vector search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os


class OpenAIEmbedding:
    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension
    
    def embedding(self, text):
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)  # the bare exit() helper is intended for interactive sessions

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

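    # The client targets DashScope's OpenAI-compatible endpoint, so OPENAI_API_KEY
    # must hold an API key that is valid for that endpoint.
    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"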
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=1536
    )

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== Vector search test ======\n")

    query_text = "What index types does Tablestore support?"
    tenant_id = "user_tablestore_001"

    print(f"Query question: {query_text}")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")

    try:
        print("Generating query vector...")
        query_vector = embedding_model.embedding(query_text)
        
        if query_vector is None:
            print("Failed to generate query vector")
        else:
            print(f"Query vector generated successfully, dimension: {len(query_vector)}\n")
            
            response = knowledge_store.vector_search(
                query_vector=query_vector,
                tenant_id=tenant_id,
                limit=3
            )
        
            if not response.hits:
                print("No relevant documents found")
            else:
                print("=" * 80)
                print(f"Found {len(response.hits)} relevant documents:\n")
                
                for idx, hit in enumerate(response.hits, 1):
                    doc = hit.document
                    score = hit.score
                    
                    print(f"[Result {idx}]")
                    print(f"Document ID: {doc.document_id}")
                    print(f"Similarity score: {score:.4f}")
                    
                    if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                        print(f"Category: {doc.metadata['category']}")
                    
                    print(f"Content: {doc.text}")
                    print("-" * 80)
                
                print()
            
    except Exception as e:
        print(f"Vector search failed: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== Search complete ======")


if __name__ == "__main__":
    main()
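
The example above uses 1536 both for the embedding model's `dimension` and for the store's `vector_dimension`, so the query vector is expected to have that length. A small guard such as the following sketch can catch a mismatch, or a failed embedding call that returned `None`, before the search request is sent. `check_dimension` is a hypothetical helper, not an SDK function.

```python
def check_dimension(vector, expected=1536):
    """Return the vector unchanged if it has the expected length, else raise."""
    if vector is None:
        raise ValueError("no embedding was returned")
    if len(vector) != expected:
        raise ValueError(
            f"embedding has {len(vector)} dimensions, expected {expected}"
        )
    return vector


# Usage with a dummy vector in place of a real embedding.
query_vector = check_dimension([0.0] * 1536)
print(len(query_vector))
```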

Full-text search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)  # the bare exit() helper is intended for interactive sessions

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== Full-text search test ======\n")

    query_keyword = "vector search"
    tenant_id = "user_tablestore_001"

    print(f"Query keyword: {query_keyword}")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            metadata_filter=Filters.text_match("text", query_keyword),
            limit=3
        )
        
        if not response.hits:
            print("No documents containing the keyword were found")
        else:
            print("=" * 80)
            print(f"Found {len(response.hits)} documents containing the keyword:\n")
            
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score
                
                print(f"[Result {idx}]")
                print(f"Document ID: {doc.document_id}")
                print(f"Match score: {score if score is not None else 'N/A'}")
                
                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"Category: {doc.metadata['category']}")
                
                content = doc.text
                if query_keyword in content:
                    highlighted = content.replace(query_keyword, f"[{query_keyword}]")
                    print(f"Content: {highlighted}")
                else:
                    print(f"Content: {content}")
                
                print("-" * 80)
            
            print()
            
    except Exception as e:
        print(f"Full-text search failed: {e}")
        import traceback
        traceback.print_exc()

    print("\n====== Search complete ======")

    print("\nAdditional notes:")
    print("- Full-text search looks for documents containing the query keyword in the text field.")
    print("- You can use advanced syntax such as wildcards and phrase queries.")
    print("- Supports Chinese tokenization and fuzzy matching.")


if __name__ == "__main__":
    main()
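
The highlighting in the example above relies on `str.replace`, which is case-sensitive: with the keyword "vector search", an occurrence written as "Vector search" is left unmarked. The following is a minimal sketch of a case-insensitive alternative that uses only the standard library. `highlight` is a hypothetical helper name.

```python
import re


def highlight(text, keyword):
    """Wrap every case-insensitive occurrence of keyword in square brackets."""
    if not keyword:
        return text
    # re.escape keeps characters such as "." or "*" in the keyword literal.
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    return pattern.sub(lambda m: f"[{m.group(0)}]", text)


print(highlight("Vector search and vector SEARCH are supported.", "vector search"))
```

The replacement function reuses the matched text, so the original capitalization of each occurrence is preserved.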

General search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


def main():
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }

    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        raise SystemExit(1)  # the bare exit() helper is intended for interactive sessions

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== General search test ======\n")

    tenant_id = "user_tablestore_001"

    print("General search supports flexible filtered queries based on metadata, independent of vector or full-text search.")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")

    print("[Scenario 1] Query for documents where the category is 'Application Scenarios'")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.eq("category", "Application Scenarios"),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  Content: {doc.text[:100]}...")
            print()
            
    except Exception as e:
        print(f"Search failed: {e}\n")

    print("\n[Scenario 2] Query for documents where meta_long > 3 and meta_boolean = True")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.logical_and([
                Filters.gt("meta_long", 3),
                Filters.eq("meta_boolean", True)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
                print(f"  Content: {doc.text[:80]}...")
            print()
            
    except Exception as e:
        print(f"Search failed: {e}\n")

    print("\n[Scenario 3] Query for documents where meta_long is between 2 and 5 (inclusive)")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.logical_and([
                Filters.gte("meta_long", 2),
                Filters.lte("meta_long", 5)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
                print(f"  Content: {doc.text[:80]}...")
            print()
            
    except Exception as e:
        print(f"Search failed: {e}\n")

    print("\n[Scenario 4] Get all documents (without filter conditions)")
    print("-" * 80)

    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            meta_data_to_get=["text", "category", "meta_long"]
        )
        
        if not response.hits:
            print("No documents found\n")
        else:
            # len(response.hits) is capped by limit, so it is not the total match count.
            print(f"\nReturned {len(response.hits)} documents (limit: 3):")
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  Content: {doc.text[:60]}...")
            
            if response.next_token:
                print("\nThere are more results. Use next_token for pagination.")
            print()
            
    except Exception as e:
        print(f"Search failed: {e}\n")

    print("\n" + "=" * 80)
    print("\n====== Search complete ======")

    print("\nGeneral search features:")
    print("- Supports flexible filtering based on metadata fields.")
    print("- Supports exact match, range query, logical combinations, and more.")
    print("- Does not require vector or full-text search. Suitable for structured queries.")
    print("- You can specify the fields to return to reduce data transfer.")


if __name__ == "__main__":
    main()
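
Scenario 4 reports that more results are available through `next_token` but does not fetch them. The following is a minimal sketch of a pagination loop, written against a generic `fetch_page` callable so that it runs without a Tablestore instance. `iter_all_hits`, `fetch_page`, and the fake `PAGES` data are hypothetical. With the SDK, `fetch_page` would wrap `knowledge_store.search_documents`. Passing the token back through a `next_token` keyword argument is an assumption here, so check the SDK reference for the exact parameter name.

```python
from types import SimpleNamespace


def iter_all_hits(fetch_page):
    """Yield hits from every page.

    `fetch_page(next_token)` must return an object with `hits` and
    `next_token` attributes; a falsy `next_token` marks the last page.
    """
    token = None
    while True:
        page = fetch_page(token)
        yield from page.hits
        token = page.next_token
        if not token:
            break


# Stand-in for a paged search: three fake pages keyed by token.
PAGES = {
    None: SimpleNamespace(hits=["doc_1", "doc_2"], next_token="t1"),
    "t1": SimpleNamespace(hits=["doc_3", "doc_4"], next_token="t2"),
    "t2": SimpleNamespace(hits=["doc_5"], next_token=None),
}

all_hits = list(iter_all_hits(lambda token: PAGES[token]))
print(all_hits)
```

Writing the loop as a generator lets the caller stop early, for example after the first N hits, without requesting the remaining pages.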

References