The Agent Memory SDK is a framework built on Tablestore that supports Memory and Knowledge scenarios. It provides persistent, high-performance memory storage and semantic retrieval for AI Agent applications, allowing you to quickly build intelligent applications with contextual understanding and long-term memory.
Core architecture
Architectural advantages
Lightweight design: The SDK abstracts common storage interfaces to reduce development complexity and balances technical depth with ease of use. This allows developers to focus on business logic and achieve results quickly without needing to handle low-level database API calls directly.
Scenario-driven design: The SDK provides complete solutions for two core scenarios: real-time memory storage (Memory) and long-term semantic retrieval (Knowledge). It meets basic storage needs and integrates features for business scenarios, such as summary recording, factual data extraction, and user persona tag mining. This design achieves a deep integration of storage and application.
Proven business value: The SDK is based on mature industry best practices. This allows developers to quickly validate and implement the business value of AI applications in their own scenarios without requiring complex technical research.
Quick integration
The following Python examples demonstrate how to integrate and use the SDK. For information about Java integration, see the Java instructions.
Prerequisites
Ensure that a Python runtime environment is installed. Run the python --version command to check the version.
Install the SDK
pip install tablestore-for-agent-memory
Configure environment variables
Set the following required environment variables:
TABLESTORE_ACCESS_KEY_ID: The AccessKey ID of your Alibaba Cloud account or RAM user.
TABLESTORE_ACCESS_KEY_SECRET: The AccessKey secret of your Alibaba Cloud account or RAM user.
TABLESTORE_INSTANCE_NAME: The name of the instance. Get it from the Tablestore console.
TABLESTORE_ENDPOINT: The endpoint of the instance. Get it from the Tablestore console.
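For example, on Linux or macOS you can set the variables in the current shell session. The values below are placeholders, and the endpoint format is only illustrative; substitute your own credentials and copy the real endpoint from the Tablestore console:

```shell
# Placeholder values -- replace with your own AccessKey pair and instance details.
export TABLESTORE_ACCESS_KEY_ID="your_access_key_id"
export TABLESTORE_ACCESS_KEY_SECRET="your_access_key_secret"
export TABLESTORE_INSTANCE_NAME="your_instance_name"
# Illustrative endpoint format; copy the actual endpoint from the Tablestore console.
export TABLESTORE_ENDPOINT="https://your_instance_name.cn-hangzhou.ots.aliyuncs.com"
```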
Example code: Memory scenario
The Memory scenario is used to manage the session memory of an AI Agent. It includes core features such as session management and message storage. The following example demonstrates how to create a session, record a conversation, and query the history.
Create a session and write conversation records
import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
# Check for required environment variables
required_env_vars = {
'TABLESTORE_ENDPOINT': endpoint,
'TABLESTORE_ACCESS_KEY_ID': access_key_id,
'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Set the following environment variables:")
for var in missing_vars:
print(f" export {var}=your_value")
exit(1)
# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
# Meta fields to return when list_recent_sessions queries sessions by update time
session_secondary_index_meta = {
"meta_string": MetaType.STRING,
"meta_long": MetaType.INTEGER,
"meta_double": MetaType.DOUBLE,
"meta_boolean": MetaType.BOOLEAN,
"meta_bytes": MetaType.BINARY,
}
# Metadata of the search index for the session table
session_search_index_schema = [
tablestore.FieldSchema(
"title",
tablestore.FieldType.TEXT,
analyzer=tablestore.AnalyzerType.FUZZY,
analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
),
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Metadata of the search index for the message table
message_search_index_schema = [
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Initialize MemoryStore
memory_store = MemoryStore(
tablestore_client=tablestore_client,
session_secondary_index_meta=session_secondary_index_meta,
session_search_index_schema=session_search_index_schema,
message_search_index_schema=message_search_index_schema,
)
print("Creating tables and indexes...")
# Create tables, including secondary indexes. This needs to be done only once.
try:
memory_store.init_table()
memory_store.init_search_index()
print("Tables and indexes created successfully.")
except Exception as e:
print(f"Tables and indexes may already exist, or creation failed: {e}")
# ====== Create a new session and have a two-round conversation ======
print("\n====== Create a new session ======")
# Create a Session
session = Session(user_id="test_user_1", session_id="session_001")
session.update_time = microseconds_timestamp()
session.title = "Tablestore Consultation"
session.metadata = {
"meta_string": "web_source",
"meta_long": 1,
"meta_double": 1.0,
"meta_boolean": True,
"model_name": "qwen-max"
}
# Save the session
memory_store.put_session(session)
print(f"Session created successfully: user_id={session.user_id}, session_id={session.session_id}")
# ====== First round of conversation ======
print("\n====== First round of conversation ======")
# User question
message_1 = Message(
session_id="session_001",
message_id="msg_001",
create_time=microseconds_timestamp()
)
message_1.content = "Hello, can you tell me what Tablestore is?"
message_1.metadata = {
"meta_string": "web",
"message_type": "user",
"meta_long": 1
}
memory_store.put_message(message_1)
print(f"User: {message_1.content}")
# Update the session update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)
# LLM response
message_2 = Message(
session_id="session_001",
message_id="msg_002",
create_time=microseconds_timestamp()
)
message_2.content = "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. It supports multiple data models, such as wide table, IM message, and time series models, to meet data storage needs in different scenarios."
message_2.metadata = {
"message_type": "assistant",
"model": "qwen-max"
}
memory_store.put_message(message_2)
print(f"Assistant: {message_2.content}")
# ====== Second round of conversation ======
print("\n====== Second round of conversation ======")
# User continues to ask
message_3 = Message(
session_id="session_001",
message_id="msg_003",
create_time=microseconds_timestamp()
)
message_3.content = "What are some typical application scenarios for Tablestore?"
message_3.metadata = {
"meta_string": "web",
"message_type": "user",
"meta_long": 2
}
memory_store.put_message(message_3)
print(f"User: {message_3.content}")
# Update the session update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)
# LLM response
message_4 = Message(
session_id="session_001",
message_id="msg_004",
create_time=microseconds_timestamp()
)
message_4.content = """Typical application scenarios for Tablestore include the following:
1. AI Agent memory storage: Store knowledge bases, long-term memory, AI session messages, and other information.
2. Metadata management: Store metadata for massive files, videos, and images.
3. Message data: Store IM chat messages, Feed stream messages, and more.
4. Trajectory tracing: Store time series data such as vehicle and logistics trajectories.
5. Scientific big data: Store massive data such as meteorological and genetic data.
6. Recommendation systems: Store data for user personas and item features.
7. Risk control systems: Store real-time risk control rules and historical behavioral data."""
message_4.metadata = {
"message_type": "assistant",
"model": "qwen-max"
}
memory_store.put_message(message_4)
print(f"Assistant: {message_4.content}")
print("\n====== Session creation and conversation complete ======")
print(f"Session ID: {session.session_id}")
print(f"User ID: {session.user_id}")
print(f"Completed 2 rounds of conversation with 4 messages")
Query the list of historical sessions
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
# Check for required environment variables
required_env_vars = {
'TABLESTORE_ENDPOINT': endpoint,
'TABLESTORE_ACCESS_KEY_ID': access_key_id,
'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Set the following environment variables:")
for var in missing_vars:
print(f" export {var}=your_value")
exit(1)
# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
# Meta fields to return when list_recent_sessions queries sessions by update time
session_secondary_index_meta = {
"meta_string": MetaType.STRING,
"meta_long": MetaType.INTEGER,
"meta_double": MetaType.DOUBLE,
"meta_boolean": MetaType.BOOLEAN,
"meta_bytes": MetaType.BINARY,
}
# Metadata of the search index for the session table
session_search_index_schema = [
tablestore.FieldSchema(
"title",
tablestore.FieldType.TEXT,
analyzer=tablestore.AnalyzerType.FUZZY,
analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
),
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Metadata of the search index for the message table
message_search_index_schema = [
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Initialize MemoryStore
memory_store = MemoryStore(
tablestore_client=tablestore_client,
session_secondary_index_meta=session_secondary_index_meta,
session_search_index_schema=session_search_index_schema,
message_search_index_schema=message_search_index_schema,
)
print("====== Querying list of historical sessions ======\n")
# Query the list of recent sessions for a specific user
user_id = "test_user_1"
max_count = 10 # Return a maximum of 10 sessions
print(f"Querying recent sessions for user {user_id}...")
try:
# Get the session list (sorted by update time in descending order)
sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))
if not sessions:
print(f"\nUser {user_id} has no historical sessions")
else:
print(f"\nFound {len(sessions)} sessions in total:\n")
for idx, session in enumerate(sessions, 1):
print(f"Session {idx}:")
print(f" - Session ID: {session.session_id}")
print(f" - User ID: {session.user_id}")
print(f" - Title: {session.title if hasattr(session, 'title') and session.title else 'No title'}")
print(f" - Creation time: {session.create_time if hasattr(session, 'create_time') else 'Unknown'}")
print(f" - Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
# Display metadata
if session.metadata:
print(f" - Metadata:")
for key, value in session.metadata.items():
print(f" {key}: {value}")
print()
except Exception as e:
print(f"Failed to query the session list: {e}")
print("====== Query complete ======")
Query the details of a specific session
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
# Check for required environment variables
required_env_vars = {
'TABLESTORE_ENDPOINT': endpoint,
'TABLESTORE_ACCESS_KEY_ID': access_key_id,
'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Set the following environment variables:")
for var in missing_vars:
print(f" export {var}=your_value")
exit(1)
# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
# Meta fields to return when list_recent_sessions queries sessions by update time
session_secondary_index_meta = {
"meta_string": MetaType.STRING,
"meta_long": MetaType.INTEGER,
"meta_double": MetaType.DOUBLE,
"meta_boolean": MetaType.BOOLEAN,
"meta_bytes": MetaType.BINARY,
}
# Metadata of the search index for the session table
session_search_index_schema = [
tablestore.FieldSchema(
"title",
tablestore.FieldType.TEXT,
analyzer=tablestore.AnalyzerType.FUZZY,
analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
),
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Metadata of the search index for the message table
message_search_index_schema = [
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Initialize MemoryStore
memory_store = MemoryStore(
tablestore_client=tablestore_client,
session_secondary_index_meta=session_secondary_index_meta,
session_search_index_schema=session_search_index_schema,
message_search_index_schema=message_search_index_schema,
)
print("====== Querying details of a specific session ======\n")
# Specify the session to query
user_id = "test_user_1"
session_id = "session_001"
print(f"Querying session details...")
print(f"User ID: {user_id}")
print(f"Session ID: {session_id}\n")
try:
# Get session details
session = memory_store.get_session(user_id=user_id, session_id=session_id)
if session:
print("Session details:")
print("=" * 50)
print(f"User ID: {session.user_id}")
print(f"Session ID: {session.session_id}")
print(f"Title: {session.title if hasattr(session, 'title') and session.title else 'No title'}")
print(f"Creation time: {session.create_time if hasattr(session, 'create_time') else 'Unknown'}")
print(f"Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
# Display complete metadata information
if session.metadata:
print("\nMetadata information:")
print("-" * 50)
for key, value in session.metadata.items():
print(f" {key}: {value}")
else:
print("\nMetadata: None")
print("=" * 50)
else:
print(f"The specified session was not found (user_id={user_id}, session_id={session_id})")
except Exception as e:
print(f"Failed to query session details: {e}")
import traceback
traceback.print_exc()
print("\n====== Query complete ======")
Query the complete conversation record of a specific session
import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
# Check for required environment variables
required_env_vars = {
'TABLESTORE_ENDPOINT': endpoint,
'TABLESTORE_ACCESS_KEY_ID': access_key_id,
'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Set the following environment variables:")
for var in missing_vars:
print(f" export {var}=your_value")
exit(1)
# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
# Meta fields to return when list_recent_sessions queries sessions by update time
session_secondary_index_meta = {
"meta_string": MetaType.STRING,
"meta_long": MetaType.INTEGER,
"meta_double": MetaType.DOUBLE,
"meta_boolean": MetaType.BOOLEAN,
"meta_bytes": MetaType.BINARY,
}
# Metadata of the search index for the session table
session_search_index_schema = [
tablestore.FieldSchema(
"title",
tablestore.FieldType.TEXT,
analyzer=tablestore.AnalyzerType.FUZZY,
analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
),
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Metadata of the search index for the message table
message_search_index_schema = [
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Initialize MemoryStore
memory_store = MemoryStore(
tablestore_client=tablestore_client,
session_secondary_index_meta=session_secondary_index_meta,
session_search_index_schema=session_search_index_schema,
message_search_index_schema=message_search_index_schema,
)
print("====== Querying the complete conversation record of a specific session ======\n")
# Specify the session to query
session_id = "session_001"
print(f"Querying session conversation record...")
print(f"Session ID: {session_id}\n")
try:
# Get all message records for the session
messages = list(memory_store.list_messages(session_id=session_id))
if not messages:
print(f"Session {session_id} has no conversation records")
else:
# Sort by creation time in ascending order (from oldest to newest)
messages.sort(key=lambda m: m.create_time)
print(f"Found {len(messages)} messages in total\n")
print("=" * 80)
# Display the conversation by round
round_num = 0
for idx, message in enumerate(messages):
# Check if it is a user message (a new round)
message_type = message.metadata.get("message_type", "unknown")
if message_type == "user":
round_num += 1
print(f"\nRound {round_num} of conversation:")
print("-" * 80)
# Display message details
role = "User" if message_type == "user" else "Assistant"
print(f"\n[{role}] (Message ID: {message.message_id})")
print(f"Content: {message.content}")
print(f"Creation time: {message.create_time}")
# Display metadata (if any)
if message.metadata and len(message.metadata) > 1: # Exclude cases with only message_type
print("Metadata:")
for key, value in message.metadata.items():
if key != "message_type": # message_type has already been displayed
print(f" - {key}: {value}")
print("\n" + "=" * 80)
print(f"\nConversation statistics: {round_num} rounds of conversation, {len(messages)} messages in total")
except Exception as e:
print(f"Failed to query conversation records: {e}")
import traceback
traceback.print_exc()
print("\n====== Query complete ======")
Example code: Knowledge scenario
The Knowledge scenario focuses on building AI knowledge bases. It supports vectorized storage and intelligent retrieval of large volumes of documents. The following example demonstrates how to create a knowledge base, import documents, and perform intelligent Q&A using methods such as vector search and full-text search.
The example code uses the text-embedding-v2 model from Alibaba Cloud Model Studio for vectorization. You must first install the relevant dependencies and set your API key as the OPENAI_API_KEY environment variable.
pip install openai
Create a knowledge base and write knowledge
After you write data, the search index takes a few seconds to sync. If you cannot query data using the following code examples, wait for the synchronization to complete.
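Because index synchronization is asynchronous, a query issued immediately after a write may return no results. A minimal polling helper (plain Python, not part of the SDK; the timeout and interval values are illustrative) can bridge that gap:

```python
import time

def wait_until(predicate, timeout=30.0, interval=2.0):
    """Poll `predicate` until it returns a truthy value, then return that value.

    Raises TimeoutError if the predicate stays falsy past `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = predicate()
        if result:
            return result
        time.sleep(interval)
    raise TimeoutError("search index did not catch up within the timeout")
```

You could then wrap any of the query calls shown below, for example wait_until(lambda: list(memory_store.list_messages(session_id="session_001"))), to block until the freshly written data becomes visible.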
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os
# General Embedding class (using the OpenAI protocol)
class OpenAIEmbedding:
"""Call the Embedding model using the OpenAI protocol (supports Model Studio, OpenAI, etc.)"""
def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
"""
Initialize the Embedding client
Args:
api_key: API key
base_url: API base URL (for Model Studio, use https://dashscope.aliyuncs.com/compatible-mode/v1)
model: Model name
dimension: Vector dimensions
"""
self.client = OpenAI(
api_key=api_key,
base_url=base_url
)
self.model = model
self.dimension = dimension
def embedding(self, text):
"""Convert text to a vector"""
try:
response = self.client.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Embedding call exception: {e}")
return None
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')
# Check for required environment variables
required_env_vars = {
'TABLESTORE_ENDPOINT': endpoint,
'TABLESTORE_ACCESS_KEY_ID': access_key_id,
'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
'TABLESTORE_INSTANCE_NAME': instance_name,
'OPENAI_API_KEY': openai_api_key
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Set the following environment variables:")
for var in missing_vars:
print(f" export {var}=your_value")
exit(1)
# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
# The meta fields to index
search_index_schema = [
tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]
# Use the Model Studio text-embedding-v2 model (called via the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
api_key=openai_api_key,
base_url=base_url,
model="text-embedding-v2",
dimension=1536
)
# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
tablestore_client=tablestore_client,
vector_dimension=1536, # The vector dimension of text-embedding-v2 is 1536
enable_multi_tenant=True, # Enable multitenancy
search_index_schema=search_index_schema,
)
print("Creating tables and indexes...")
# Create tables, including search indexes. This needs to be done only once.
try:
knowledge_store.init_table()
print("Tables and indexes created successfully.")
except Exception as e:
print(f"Tables and indexes may already exist, or creation failed: {e}")
print("\n====== Writing documents to the Tablestore knowledge base ======\n")
# Prepare multiple knowledge documents about Tablestore (with diverse topics)
documents_data = [
{
"id": "doc_001",
"text": "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. Tablestore's distributed storage and powerful index engine support petabyte-level storage for a single table, tens of millions of TPS, and millisecond-level latency.",
"category": "Product Introduction",
"meta_long": 1
},
{
"id": "doc_002",
"text": "Tablestore supports the wide table model, with a single table supporting petabyte-level data storage and tens of millions of QPS, suitable for storing user personas, order details, and other scenarios. It also supports the time series model, which can efficiently store and query time series data generated by IoT devices and monitoring systems.",
"category": "Data Model",
"meta_long": 2
},
{
"id": "doc_003",
"text": "Tablestore provides multiple index types: primary key index for fast point queries and range queries; global secondary index for queries based on non-primary key columns; search index for complex query combinations and full-text search; and vector search for similarity searches in AI scenarios.",
"category": "Index Features",
"meta_long": 3
},
{
"id": "doc_004",
"text": "Tablestore is suitable for various scenarios: metadata management for storing metadata of massive files, videos, and images; message data for storing IM chat messages and Feed stream messages; trajectory tracing for storing time series data such as vehicle and logistics trajectories; and recommendation systems for storing user personas and item features.",
"category": "Application Scenarios",
"meta_long": 4
},
{
"id": "doc_005",
"text": "Tablestore's search index supports rich query capabilities, including term query, range query, prefix query, wildcard query, full-text search, geo query, and nested query. It also supports advanced features such as sorting, aggregation, and statistical analysis.",
"category": "Query Capabilities",
"meta_long": 5
},
{
"id": "doc_006",
"text": "Tablestore provides Agent Memory capabilities, including Memory Store for storing sessions and message records, and Knowledge Store for storing knowledge base documents and supporting vector search. These capabilities help build AI applications such as intelligent Q&A and chatbots.",
"category": "AI Capabilities",
"meta_long": 6
},
{
"id": "doc_007",
"text": "Tablestore's vector search feature supports the storage and efficient retrieval of massive vector data. It can be applied to scenarios such as image search, semantic search, and recommendation systems. It supports multiple similarity algorithms, including L2 distance and cosine similarity.",
"category": "Vector Search",
"meta_long": 7
},
{
"id": "doc_008",
"text": "Tablestore provides multiple data protection mechanisms: it supports data backup and recovery; provides time to live management to automatically expire and delete old data; and supports data encryption at rest to ensure data security.",
"category": "Data Protection",
"meta_long": 8
}
]
# Write documents
tenant_id = "user_tablestore_001"
success_count = 0
for doc_data in documents_data:
try:
# Create a Document object
document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
document.text = doc_data["text"]
# Generate a vector
document.embedding = embedding_model.embedding(document.text)
if document.embedding is None:
print(f"✗ Failed to generate vector, skipping document {doc_data['id']}")
continue
# Set metadata
document.metadata["category"] = doc_data["category"]
document.metadata["meta_long"] = doc_data["meta_long"]
document.metadata["meta_boolean"] = True
document.metadata["user_id"] = tenant_id
# Write to the database
knowledge_store.put_document(document)
success_count += 1
print(f"✓ Wrote document {doc_data['id']}: {doc_data['category']}")
print(f" Content: {doc_data['text'][:60]}...")
print()
except Exception as e:
print(f"✗ Failed to write document {doc_data['id']}: {e}")
print("=" * 80)
print(f"\nWrite complete: Succeeded for {success_count}/{len(documents_data)} documents")
print(f"Tenant ID: {tenant_id}")
print(f"Document categories: {', '.join(set([d['category'] for d in documents_data]))}")
print("\nNote: After data is written, the search index may take a few seconds to synchronize.")
Vector search
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os
# General Embedding class (using the OpenAI protocol)
class OpenAIEmbedding:
"""Call the Embedding model using the OpenAI protocol (supports Model Studio, OpenAI, etc.)"""
def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
"""
Initialize the Embedding client
Args:
api_key: API key
base_url: API base URL (for Model Studio, use https://dashscope.aliyuncs.com/compatible-mode/v1)
model: Model name
dimension: Vector dimensions
"""
self.client = OpenAI(
api_key=api_key,
base_url=base_url
)
self.model = model
self.dimension = dimension
def embedding(self, text):
"""Convert text to a vector"""
try:
response = self.client.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Embedding call exception: {e}")
return None
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name,
    'OPENAI_API_KEY': openai_api_key
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f" export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Use the Model Studio text-embedding-v2 model (called via the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
    api_key=openai_api_key,
    base_url=base_url,
    model="text-embedding-v2",
    dimension=1536
)

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== Vector search test ======\n")

# Query question
query_text = "What index types does Tablestore support?"
tenant_id = "user_tablestore_001"
print(f"Query question: {query_text}")
print(f"Tenant ID: {tenant_id}")
print("Number of results to return: Top 3\n")

try:
    # Convert the query text to a vector
    print("Generating query vector...")
    query_vector = embedding_model.embedding(query_text)
    if query_vector is None:
        print("Failed to generate query vector")
    else:
        print(f"Query vector generated successfully, dimension: {len(query_vector)}\n")
        # Perform vector search
        response = knowledge_store.vector_search(
            query_vector=query_vector,
            tenant_id=tenant_id,
            limit=3  # Return only the top 3
        )
        if not response.hits:
            print("No relevant documents found")
        else:
            print("=" * 80)
            print(f"Found {len(response.hits)} relevant documents:\n")
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score
                print(f"[Result {idx}]")
                print(f"Document ID: {doc.document_id}")
                print(f"Similarity score: {score:.4f}")
                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"Category: {doc.metadata['category']}")
                print(f"Content: {doc.text}")
                print("-" * 80)
                print()
except Exception as e:
    print(f"Vector search failed: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Search complete ======")
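The embedding client above returns None when a call fails, so transient errors (rate limits, network timeouts) silently abort the search. A small retry wrapper with exponential backoff is one common mitigation. The sketch below is plain Python and not part of the SDK; the function name and backoff values are illustrative assumptions:

```python
import time

def embed_with_retry(embed_fn, text, attempts=3, base_delay=1.0):
    """Call embed_fn(text), retrying with exponential backoff.

    Retries when embed_fn raises or returns None; gives up after
    `attempts` tries and returns None.
    """
    for i in range(attempts):
        try:
            vector = embed_fn(text)
            if vector is not None:
                return vector
        except Exception as e:
            print(f"Embedding attempt {i + 1} failed: {e}")
        if i < attempts - 1:
            time.sleep(base_delay * (2 ** i))  # wait 1s, 2s, 4s, ...
    return None
```

You could then replace the direct call with `query_vector = embed_with_retry(embedding_model.embedding, query_text)` so that a single transient failure no longer aborts the query.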
Full-text search
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f" export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== Full-text search test ======\n")

# Query keyword
query_keyword = "vector search"
tenant_id = "user_tablestore_001"
print(f"Query keyword: {query_keyword}")
print(f"Tenant ID: {tenant_id}")
print("Number of results to return: Top 3\n")

try:
    # Perform a full-text search (use text_match for text matching)
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        metadata_filter=Filters.text_match("text", query_keyword),
        limit=3  # Return only the top 3
    )
    if not response.hits:
        print("No documents containing the keyword were found")
    else:
        print("=" * 80)
        print(f"Found {len(response.hits)} documents containing the keyword:\n")
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            score = hit.score
            print(f"[Result {idx}]")
            print(f"Document ID: {doc.document_id}")
            print(f"Match score: {score if score is not None else 'N/A'}")
            if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                print(f"Category: {doc.metadata['category']}")
            # Highlight the matched keyword
            content = doc.text
            if query_keyword in content:
                # Simple highlighting
                highlighted = content.replace(query_keyword, f"[{query_keyword}]")
                print(f"Content: {highlighted}")
            else:
                print(f"Content: {content}")
            print("-" * 80)
            print()
except Exception as e:
    print(f"Full-text search failed: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Search complete ======")
print("\nAdditional notes:")
print("- Full-text search looks for documents containing the query keyword in the text field.")
print("- You can use advanced syntax such as wildcards and phrase queries.")
print("- Supports Chinese tokenization and fuzzy matching.")
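The highlighting in the script above uses a plain str.replace, which is case-sensitive and therefore misses variants such as "Vector Search". A case-insensitive version using the standard re module (plain Python, independent of the SDK; the helper name is illustrative) could look like this:

```python
import re

def highlight(text, keyword, left="[", right="]"):
    """Wrap every case-insensitive occurrence of keyword in brackets.

    re.escape makes the keyword safe to embed in a regex, and the
    replacement callback preserves the original casing of each match.
    """
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    return pattern.sub(lambda m: f"{left}{m.group(0)}{right}", text)
```

In the result loop you could then print `highlight(doc.text, query_keyword)` instead of branching on `query_keyword in content`.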
General search
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os
# Read configurations from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}
missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f" export {var}=your_value")
    exit(1)

# Create a Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# The meta fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # The vector dimension of text-embedding-v2 is 1536
    enable_multi_tenant=True,  # Enable multitenancy
    search_index_schema=search_index_schema,
)

print("====== General search test ======\n")
tenant_id = "user_tablestore_001"
print("General search supports flexible filtered queries based on metadata, independent of vector or full-text search.")
print(f"Tenant ID: {tenant_id}")
print("Number of results to return: Top 3\n")

# Test scenario 1: Filter by category
print("[Scenario 1] Query for documents where the category is 'Application Scenarios'")
print("-" * 80)
try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.eq("category", "Application Scenarios"),
        meta_data_to_get=["text", "category", "meta_long"]  # Return only specified fields
    )
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f" Document ID: {doc.document_id}")
            print(f" Category: {doc.metadata.get('category', 'N/A')}")
            print(f" Content: {doc.text[:100]}...")
        print()
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 2: Filter by combined conditions
print("\n[Scenario 2] Query for documents where meta_long > 3 and meta_boolean = True")
print("-" * 80)
try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gt("meta_long", 3),
            Filters.eq("meta_boolean", True)
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f" Document ID: {doc.document_id}")
            print(f" Category: {doc.metadata.get('category', 'N/A')}")
            print(f" meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f" Content: {doc.text[:80]}...")
        print()
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 3: Range query
print("\n[Scenario 3] Query for documents where meta_long is between 2 and 5")
print("-" * 80)
try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gte("meta_long", 2),  # Greater than or equal to 2
            Filters.lte("meta_long", 5)   # Less than or equal to 5
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f" Document ID: {doc.document_id}")
            print(f" Category: {doc.metadata.get('category', 'N/A')}")
            print(f" meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f" Content: {doc.text[:80]}...")
        print()
except Exception as e:
    print(f"Search failed: {e}\n")

# Test scenario 4: Get all documents (without filter conditions)
print("\n[Scenario 4] Get all documents (without filter conditions)")
print("-" * 80)
try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        meta_data_to_get=["text", "category", "meta_long"]
    )
    if not response.hits:
        print("No documents found\n")
    else:
        print(f"\nFound {len(response.hits)} documents (showing at most 3):")
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f" Document ID: {doc.document_id}")
            print(f" Category: {doc.metadata.get('category', 'N/A')}")
            print(f" Content: {doc.text[:60]}...")
        if response.next_token:
            print("\nThere are more results. Use next_token for pagination.")
        print()
except Exception as e:
    print(f"Search failed: {e}\n")

print("\n" + "=" * 80)
print("\n====== Search complete ======")
print("\nGeneral search features:")
print("- Supports flexible filtering based on metadata fields.")
print("- Supports exact match, range query, logical combinations, and more.")
print("- Does not require vector or full-text search. Suitable for structured queries.")
print("- You can specify the fields to return to reduce data transfer.")
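Scenario 4 mentions using next_token to page through results, but does not show the loop. The exact parameter for passing the token back to search_documents is not shown in this example, so the following is a generic sketch: a helper that threads next_token through an arbitrary fetch callable until the token is exhausted. The fetch signature is an assumption for illustration, not the SDK's API:

```python
def collect_all_hits(fetch, page_size=100, max_pages=50):
    """Accumulate hits across pages by threading next_token through fetch.

    Assumption: fetch(limit=..., next_token=...) returns an object with
    .hits (a list) and .next_token (falsy when no more pages remain).
    """
    hits, token = [], None
    for _ in range(max_pages):  # hard cap to avoid an unbounded loop
        response = fetch(limit=page_size, next_token=token)
        hits.extend(response.hits)
        token = response.next_token
        if not token:
            break
    return hits
```

With a real KnowledgeStore you would wrap search_documents in a small lambda that forwards the tenant ID and filter, then pass that lambda as `fetch`.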
References
Project address: Tablestore for Agent Memory