All Products
Search
Document Center

Tablestore:Agent Memory SDK

Last Updated:Nov 29, 2025

The Agent Memory SDK is a framework built on Tablestore that supports Memory and Knowledge scenarios. It provides persistent, high-performance memory storage and semantic retrieval for AI Agent applications, allowing you to quickly build intelligent applications with contextual understanding and long-term memory.

Core architecture

image

Architectural advantages

  • Lightweight design: The SDK abstracts common storage interfaces to reduce development complexity and balances technical depth with ease of use. This allows developers to focus on business logic and achieve results quickly without needing to handle low-level database API calls directly.

  • Scenario-driven design: The SDK provides complete solutions for two core scenarios: real-time memory storage (Memory) and long-term semantic retrieval (Knowledge). It meets basic storage needs and integrates features for business scenarios, such as summary recording, factual data extraction, and user persona tag mining. This design achieves a deep integration of storage and application.

  • Proven business value: The SDK is based on mature industry best practices. This allows developers to quickly validate and implement the business value of AI applications in their own scenarios without requiring complex technical research.

Quick integration

The following Python examples demonstrate how to integrate and use the SDK. For more information about Java integration, see the instructions.

Prerequisites

Ensure that a Python runtime environment is installed. Run the python -version command to check the version.

Install the SDK

pip install tablestore-for-agent-memory

Configure environment variables

Set the following required environment variables:

  • TABLESTORE_ACCESS_KEY_ID: The AccessKey ID of your Alibaba Cloud account or RAM user.

  • TABLESTORE_ACCESS_KEY_SECRET: The AccessKey secret of your Alibaba Cloud account or RAM user.

  • TABLESTORE_INSTANCE_NAME: The name of the instance. Get it from the Tablestore console.

  • TABLESTORE_ENDPOINT: The endpoint of the instance. Get it from the Tablestore console.

Example code: Memory scenario

The Memory scenario is used to manage the session memory of an AI Agent. It includes core features such as session management and message storage. The following example demonstrates how to create a session, record a conversation, and query the history.

Create a session and write conversation records

import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os

# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The fields to return when list_recent_sessions is called based on the update time of the session
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata of the search index for the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata of the search index for the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("Creating tables and indexes...")
# Create tables, including secondary indexes. This needs to be done only once.
try:
    memory_store.init_table()
    memory_store.init_search_index()
    print("Tables and indexes created successfully.")
except Exception as e:
    print(f"Tables and indexes already exist or failed to be created: {e}")

# ====== Create a new session and have a two-round conversation ======
print("\n====== Create a new session ======")

# Create a Session
session = Session(user_id="test_user_1", session_id="session_001")
session.update_time = microseconds_timestamp()
session.title = "Tablestore Consultation"
session.metadata = {
    "meta_string": "web_source",
    "meta_long": 1,
    "meta_double": 1.0,
    "meta_boolean": True,
    "model_name": "qwen-max"
}

# Save the session
memory_store.put_session(session)
print(f"Session created successfully: user_id={session.user_id}, session_id={session.session_id}")

# ====== First round of conversation ======
print("\n====== First round of conversation ======")

# User question
message_1 = Message(
    session_id="session_001",
    message_id="msg_001",
    create_time=microseconds_timestamp()
)
message_1.content = "Hello, can you tell me what Tablestore is?"
message_1.metadata = {
    "meta_string": "web",
    "message_type": "user",
    "meta_long": 1
}
memory_store.put_message(message_1)
print(f"User: {message_1.content}")

# Update the session update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)

# LLM response
message_2 = Message(
    session_id="session_001",
    message_id="msg_002",
    create_time=microseconds_timestamp()
)
message_2.content = "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. It supports multiple data models, such as wide table, IM message, and time series models, to meet data storage needs in different scenarios."
message_2.metadata = {
    "message_type": "assistant",
    "model": "qwen-max"
}
memory_store.put_message(message_2)
print(f"Assistant: {message_2.content}")

# ====== Second round of conversation ======
print("\n====== Second round of conversation ======")

# User continues to ask
message_3 = Message(
    session_id="session_001",
    message_id="msg_003",
    create_time=microseconds_timestamp()
)
message_3.content = "What are some typical application scenarios for Tablestore?"
message_3.metadata = {
    "meta_string": "web",
    "message_type": "user",
    "meta_long": 2
}
memory_store.put_message(message_3)
print(f"User: {message_3.content}")

# Update the session update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)

# LLM response
message_4 = Message(
    session_id="session_001",
    message_id="msg_004",
    create_time=microseconds_timestamp()
)
message_4.content = """Typical application scenarios for Tablestore include the following:
1. AI Agent memory storage: Store knowledge bases, long-term memory, AI session messages, and other information.
2. Metadata management: Store metadata for massive files, videos, and images.
3. Message data: Store IM chat messages, Feed stream messages, and more.
4. Trajectory tracing: Store time series data such as vehicle and logistics trajectories.
5. Scientific big data: Store massive data such as meteorological and genetic data.
6. Recommendation systems: Store data for user personas and item features.
7. Risk control systems: Store real-time risk control rules and historical behavioral data."""
message_4.metadata = {
    "message_type": "assistant",
    "model": "qwen-max"
}
memory_store.put_message(message_4)
print(f"Assistant: {message_4.content}")

print("\n====== Session creation and conversation complete ======")
print(f"Session ID: {session.session_id}")
print(f"User ID: {session.user_id}")
print(f"Completed 2 rounds of conversation with 4 messages")

Query the list of historical sessions

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The fields to return when list_recent_sessions is called based on the update time of the session
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata of the search index for the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata of the search index for the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Querying list of historical sessions ======\n")

# Query the list of recent sessions for a specific user
user_id = "test_user_1"
max_count = 10  # Return a maximum of 10 sessions

print(f"Querying recent sessions for user {user_id}...")

try:
    # Get the session list (sorted by update time in descending order)
    sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))
    
    if not sessions:
        print(f"\nUser {user_id} has no historical sessions")
    else:
        print(f"\nFound {len(sessions)} sessions in total:\n")
        
        for idx, session in enumerate(sessions, 1):
            print(f"Session {idx}:")
            print(f"  - Session ID: {session.session_id}")
            print(f"  - User ID: {session.user_id}")
            print(f"  - Title: {session.title if hasattr(session, 'title') and session.title else 'No title'}")
            print(f"  - Creation time: {session.create_time if hasattr(session, 'create_time') else 'Unknown'}")
            print(f"  - Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
            
            # Display metadata
            if session.metadata:
                print(f"  - Metadata:")
                for key, value in session.metadata.items():
                    print(f"      {key}: {value}")
            print()
            
except Exception as e:
    print(f"Failed to query the session list: {e}")

print("====== Query complete ======")

Query the details of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The fields to return when list_recent_sessions is called based on the update time of the session
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata of the search index for the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata of the search index for the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Querying details of a specific session ======\n")

# Specify the session to query
user_id = "test_user_1"
session_id = "session_001"

print(f"Querying session details...")
print(f"User ID: {user_id}")
print(f"Session ID: {session_id}\n")

try:
    # Get session details
    session = memory_store.get_session(user_id=user_id, session_id=session_id)
    
    if session:
        print("Session details:")
        print("=" * 50)
        print(f"User ID: {session.user_id}")
        print(f"Session ID: {session.session_id}")
        print(f"Title: {session.title if hasattr(session, 'title') and session.title else 'No title'}")
        print(f"Creation time: {session.create_time if hasattr(session, 'create_time') else 'Unknown'}")
        print(f"Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
        
        # Display complete metadata information
        if session.metadata:
            print("\nMetadata information:")
            print("-" * 50)
            for key, value in session.metadata.items():
                print(f"  {key}: {value}")
        else:
            print("\nMetadata: None")
            
        print("=" * 50)
    else:
        print(f"The specified session was not found (user_id={user_id}, session_id={session_id})")
        
except Exception as e:
    print(f"Failed to query session details: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Query complete ======")

Query the complete conversation record of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The fields to return when list_recent_sessions is called based on the update time of the session
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata of the search index for the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata of the search index for the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Querying the complete conversation record of a specific session ======\n")

# Specify the session to query
session_id = "session_001"

print(f"Querying session conversation record...")
print(f"Session ID: {session_id}\n")

try:
    # Get all message records for the session
    messages = list(memory_store.list_messages(session_id=session_id))
    
    if not messages:
        print(f"Session {session_id} has no conversation records")
    else:
        # Sort by creation time in ascending order (from oldest to newest)
        messages.sort(key=lambda m: m.create_time)
        
        print(f"Found {len(messages)} messages in total\n")
        print("=" * 80)
        
        # Display the conversation by round
        round_num = 0
        for idx, message in enumerate(messages):
            # Check if it is a user message (a new round)
            message_type = message.metadata.get("message_type", "unknown")
            
            if message_type == "user":
                round_num += 1
                print(f"\nRound {round_num} of conversation:")
                print("-" * 80)
            
            # Display message details
            role = "User" if message_type == "user" else "Assistant"
            print(f"\n[{role}] (Message ID: {message.message_id})")
            print(f"Content: {message.content}")
            print(f"Creation time: {message.create_time}")
            
            # Display metadata (if any)
            if message.metadata and len(message.metadata) > 1:  # Exclude cases with only message_type
                print("Metadata:")
                for key, value in message.metadata.items():
                    if key != "message_type":  # message_type has already been displayed
                        print(f"  - {key}: {value}")
        
        print("\n" + "=" * 80)
        print(f"\nConversation statistics: {round_num} rounds of conversation, {len(messages)} messages in total")
        
except Exception as e:
    print(f"Failed to query conversation records: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Query complete ======")

Example code: Knowledge scenario

The Knowledge scenario focuses on building AI knowledge bases. It supports vectorized storage and intelligent retrieval of large volumes of documents. The following example demonstrates how to create a knowledge base, import documents, and perform intelligent Q&A using methods such as vector search and full-text search.

The example code uses the text-embedding-v2 model from Alibaba Cloud Model Studio for vectorization. You must first install the relevant dependencies and set your API key as the OPENAI_API_KEY environment variable.
pip install openai

Create a knowledge base and write knowledge

After you write data, the search index takes a few seconds to sync. If you cannot query data using the following code examples, wait for the synchronization to complete.

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os

# General Embedding class (using the OpenAI protocol)
class OpenAIEmbedding:
    """Call the Embedding model using the OpenAI protocol (supports Model Studio, OpenAI, etc.)"""
    
    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """
        Initialize the Embedding client
        
        Args:
            api_key: API key
            base_url: API base URL (for Model Studio, use https://dashscope.aliyuncs.com/compatible-mode/v1)
            model: Model name
            dimension: Vector dimensions
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension
    
    def embedding(self, text):
        """Convert text to a vector"""
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name,
    'OPENAI_API_KEY': openai_api_key
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Use the Model Studio text-embedding-v2 model (called via the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
    api_key=openai_api_key,
    base_url=base_url,
    model="text-embedding-v2",
    dimension=1536
)

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("Creating tables and indexes...")
# Create tables, including search indexes. This needs to be done only once.
try:
    knowledge_store.init_table()
    print("Tables and indexes created successfully.")
except Exception as e:
    print(f"Tables and indexes already exist or failed to be created: {e}")

print("\n====== Writing documents to the Tablestore knowledge base ======\n")

# Prepare multiple knowledge documents about Tablestore (with diverse topics)
documents_data = [
    {
        "id": "doc_001",
        "text": "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. Tablestore's distributed storage and powerful index engine support petabyte-level storage for a single table, tens of millions of TPS, and millisecond-level latency.",
        "category": "Product Introduction",
        "meta_long": 1
    },
    {
        "id": "doc_002",
        "text": "Tablestore supports the wide table model, with a single table supporting petabyte-level data storage and tens of millions of QPS, suitable for storing user personas, order details, and other scenarios. It also supports the time series model, which can efficiently store and query time series data generated by IoT devices and monitoring systems.",
        "category": "Data Model",
        "meta_long": 2
    },
    {
        "id": "doc_003",
        "text": "Tablestore provides multiple index types: primary key index for fast point queries and range queries; global secondary index for queries based on non-primary key columns; search index for complex query combinations and full-text search; and vector search for similarity searches in AI scenarios.",
        "category": "Index Features",
        "meta_long": 3
    },
    {
        "id": "doc_004",
        "text": "Tablestore is suitable for various scenarios: metadata management for storing metadata of massive files, videos, and images; message data for storing IM chat messages and Feed stream messages; trajectory tracing for storing time series data such as vehicle and logistics trajectories; and recommendation systems for storing user personas and item features.",
        "category": "Application Scenarios",
        "meta_long": 4
    },
    {
        "id": "doc_005",
        "text": "Tablestore's search index supports rich query capabilities, including term query, range query, prefix query, wildcard query, full-text search, geo query, and nested query. It also supports advanced features such as sorting, aggregation, and statistical analysis.",
        "category": "Query Capabilities",
        "meta_long": 5
    },
    {
        "id": "doc_006",
        "text": "Tablestore provides Agent Memory capabilities, including Memory Store for storing sessions and message records, and Knowledge Store for storing knowledge base documents and supporting vector search. These capabilities help build AI applications such as intelligent Q&A and chatbots.",
        "category": "AI Capabilities",
        "meta_long": 6
    },
    {
        "id": "doc_007",
        "text": "Tablestore's vector search feature supports the storage and efficient retrieval of massive vector data. It can be applied to scenarios such as image search, semantic search, and recommendation systems. It supports multiple similarity algorithms, including L2 distance and cosine similarity.",
        "category": "Vector Search",
        "meta_long": 7
    },
    {
        "id": "doc_008",
        "text": "Tablestore provides multiple data protection mechanisms: it supports data backup and recovery; provides time to live management to automatically expire and delete old data; and supports data encryption at rest to ensure data security.",
        "category": "Data Protection",
        "meta_long": 8
    }
]

# Write documents
tenant_id = "user_tablestore_001"
success_count = 0

for doc_data in documents_data:
    try:
        # Create a Document object
        document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
        document.text = doc_data["text"]
        
        # Generate a vector
        document.embedding = embedding_model.embedding(document.text)
        
        if document.embedding is None:
            print(f"✗ Failed to generate vector, skipping document {doc_data['id']}")
            continue
        
        # Set metadata
        document.metadata["category"] = doc_data["category"]
        document.metadata["meta_long"] = doc_data["meta_long"]
        document.metadata["meta_boolean"] = True
        document.metadata["user_id"] = tenant_id
        
        # Write to the database
        knowledge_store.put_document(document)
        
        success_count += 1
        print(f"✓ Wrote document {doc_data['id']}: {doc_data['category']}")
        print(f"  Content: {doc_data['text'][:60]}...")
        print()
        
    except Exception as e:
        print(f"✗ Failed to write document {doc_data['id']}: {e}")

print("=" * 80)
print(f"\nWrite complete: Succeeded for {success_count}/{len(documents_data)} documents")
print(f"Tenant ID: {tenant_id}")
print(f"Document categories: {', '.join(set([d['category'] for d in documents_data]))}")
print("\nNote: After data is written, the search index may take a few seconds to synchronize.")

Vector search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os


# General Embedding class (using the OpenAI protocol)
class OpenAIEmbedding:
    """Call the Embedding model using the OpenAI protocol (supports Model Studio, OpenAI, etc.)"""
    
    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """
        Initialize the Embedding client
        
        Args:
            api_key: API key
            base_url: API base URL (for Model Studio, use https://dashscope.aliyuncs.com/compatible-mode/v1)
            model: Model name
            dimension: Vector dimensions
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension
    
    def embedding(self, text):
        """Convert text to a vector"""
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name,
    'OPENAI_API_KEY': openai_api_key
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Use the Model Studio text-embedding-v2 model (called via the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
    api_key=openai_api_key,
    base_url=base_url,
    model="text-embedding-v2",
    dimension=1536
)

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== Vector search test ======\n")

# Query question
query_text = "What index types does Tablestore support?"
tenant_id = "user_tablestore_001"

print(f"Query question: {query_text}")
print(f"Tenant ID: {tenant_id}")
print(f"Number of results to return: Top 3\n")

try:
    # Convert the query text to a vector
    print("Generating query vector...")
    query_vector = embedding_model.embedding(query_text)
    
    if query_vector is None:
        print("Failed to generate query vector")
    else:
        print(f"Query vector generated successfully, dimension: {len(query_vector)}\n")
        
        # Perform vector search
        response = knowledge_store.vector_search(
            query_vector=query_vector,
            tenant_id=tenant_id,
            limit=3  # Return only the top 3
        )
    
        if not response.hits:
            print("No relevant documents found")
        else:
            print("=" * 80)
            print(f"Found {len(response.hits)} relevant documents:\n")
            
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score
                
                print(f"[Result {idx}]")
                print(f"Document ID: {doc.document_id}")
                print(f"Similarity score: {score:.4f}")
                
                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"Category: {doc.metadata['category']}")
                
                print(f"Content: {doc.text}")
                print("-" * 80)
            
            print()
        
except Exception as e:
    print(f"Vector search failed: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Search complete ======")

Full-text search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== Full-text search test ======\n")

# Query keyword
query_keyword = "vector search"
tenant_id = "user_tablestore_001"

print(f"Query keyword: {query_keyword}")
print(f"Tenant ID: {tenant_id}")
print(f"Number of results to return: Top 3\n")

try:
    # Perform a full-text search (use text_match for text matching)
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        metadata_filter=Filters.text_match("text", query_keyword),
        limit=3  # Return only the top 3
    )
    
    if not response.hits:
        print("No documents containing the keyword were found")
    else:
        print("=" * 80)
        print(f"Found {len(response.hits)} documents containing the keyword:\n")
        
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            score = hit.score
            
            print(f"[Result {idx}]")
            print(f"Document ID: {doc.document_id}")
            print(f"Match score: {score if score is not None else 'N/A'}")
            
            if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                print(f"Category: {doc.metadata['category']}")
            
            # Highlight the matched keyword
            content = doc.text
            if query_keyword in content:
                # Simple highlighting
                highlighted = content.replace(query_keyword, f"[{query_keyword}]")
                print(f"Content: {highlighted}")
            else:
                print(f"Content: {content}")
            
            print("-" * 80)
        
        print()
        
except Exception as e:
    print(f"Full-text search failed: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Search complete ======")

print("\nAdditional notes:")
print("- Full-text search looks for documents containing the query keyword in the text field.")
print("- You can use advanced syntax such as wildcards and phrase queries.")
print("- Supports Chinese tokenization and fuzzy matching.")

General search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== General search test ======\n")

tenant_id = "user_tablestore_001"

print("General search supports flexible filtered queries based on metadata, independent of vector or full-text search.")
print(f"Tenant ID: {tenant_id}")
print(f"Number of results to return: Top 3\n")

# Test scenario 1: Filter by category
print("[Scenario 1] Query for documents where the category is 'Application Scenarios'")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.eq("category", "Application Scenarios"),
        meta_data_to_get=["text", "category", "meta_long"]  # Return only specified fields
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  Content: {doc.text[:100]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 2: Filter by combined conditions
print("\n[Scenario 2] Query for documents where meta_long > 3 and meta_boolean = True")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gt("meta_long", 3),
            Filters.eq("meta_boolean", True)
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f"  Content: {doc.text[:80]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 3: Range query
print("\n[Scenario 3] Query for documents where meta_long is between 2 and 5")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gte("meta_long", 2),  # Greater than or equal to 2
            Filters.lte("meta_long", 5)   # Less than or equal to 5
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f"  Content: {doc.text[:80]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 4: Get all documents (without filter conditions)
print("\n[Scenario 4] Get all documents (without filter conditions)")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
    if not response.hits:
        print("No documents found\n")
    else:
        print(f"\nFound {len(response.hits)} documents in total (displaying the first 3):")
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  Content: {doc.text[:60]}...")
        
        if response.next_token:
            print(f"\nThere are more results. Use next_token for pagination.")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

print("\n" + "=" * 80)
print("\n====== Search complete ======")

print("\nGeneral search features:")
print("- Supports flexible filtering based on metadata fields.")
print("- Supports exact match, range query, logical combinations, and more.")
print("- Does not require vector or full-text search. Suitable for structured queries.")
print("- You can specify the fields to return to reduce data transfer.")

References