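Agent Memory SDK is a framework built on Tablestore that supports the Memory and Knowledge scenarios. It provides persistent, high-performance memory storage and semantic search for AI agent applications, so you can quickly build intelligent applications with contextual understanding and long-term memory.
Core architecture
Architecture benefits
Lightweight design: The SDK abstracts a common storage interface to reduce development complexity, balancing technical depth with ease of use. Developers can focus on business logic and deliver results quickly without calling low-level database APIs directly.
Scenario-driven design: The SDK provides complete solutions for two core scenarios: real-time memory storage (Memory) and long-term semantic search (Knowledge). Beyond basic storage, it integrates features for business scenarios, such as summary recording, factual data extraction, and user persona tag mining. This design integrates storage closely with the application.
Proven business value: The SDK is based on mature industry best practices. Developers can quickly validate and realize the business value of AI applications in their own scenarios without complex technical research.
Quick integration
The following Python examples show how to integrate and use the SDK. For details about Java integration, see "Java integration steps".
Prerequisites
Make sure that a Python runtime environment is installed. Run python3 --version to check the version.
Install the SDK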
pip3 install tablestore-for-agent-memory
Configure environment variables
Set the required environment variables as follows:
TABLESTORE_ACCESS_KEY_ID: The AccessKey ID of your Alibaba Cloud account or Resource Access Management (RAM) user.
TABLESTORE_ACCESS_KEY_SECRET: The AccessKey secret of your Alibaba Cloud account or RAM user.
TABLESTORE_INSTANCE_NAME: The instance name. Obtain it from the Tablestore console.
TABLESTORE_ENDPOINT: The endpoint of the instance. Obtain it from the Tablestore console.
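Every script on this page starts by reading these four variables and stopping if any is missing. As a minimal sketch, that check could be factored into one helper. The names REQUIRED_VARS and load_tablestore_config are hypothetical and are not part of the SDK.

```python
import os

# The four variables listed above; the tuple name is an illustrative choice.
REQUIRED_VARS = (
    "TABLESTORE_ENDPOINT",
    "TABLESTORE_ACCESS_KEY_ID",
    "TABLESTORE_ACCESS_KEY_SECRET",
    "TABLESTORE_INSTANCE_NAME",
)


def load_tablestore_config(env=None):
    """Return the required settings as a dict, or raise if any is unset or empty."""
    env = os.environ if env is None else env
    config = {name: env.get(name) for name in REQUIRED_VARS}
    missing = [name for name, value in config.items() if not value]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return config
```

Passing a plain dict as env makes the helper easy to exercise without touching the real environment.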
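Code example: Memory scenario
The Memory scenario manages session memory for AI agents. It includes core features such as session management and message storage. The following examples show how to create a session, record a conversation, and query the history.
Create a session and write conversation records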
import os
import sys

import tablestore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields for the session secondary index and the search indexes.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("Creating tables and indexes...")
    try:
        memory_store.init_table()
        memory_store.init_search_index()
        print("Tables and indexes created successfully.")
    except Exception as e:
        print(f"Tables and indexes already exist or failed to be created: {e}")

    print("\n====== Create a new session ======")
    session = Session(user_id="test_user_1", session_id="session_001")
    session.update_time = microseconds_timestamp()
    session.title = "Tablestore Consultation"
    session.metadata = {
        "meta_string": "web_source",
        "meta_long": 1,
        "meta_double": 1.0,
        "meta_boolean": True,
        "model_name": "qwen-max"
    }
    memory_store.put_session(session)
    print(f"Session created successfully: user_id={session.user_id}, session_id={session.session_id}")

    print("\n====== First round of conversation ======")
    message_1 = Message(
        session_id="session_001",
        message_id="msg_001",
        create_time=microseconds_timestamp()
    )
    message_1.content = "Hello, can you tell me what Tablestore is?"
    message_1.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 1
    }
    memory_store.put_message(message_1)
    print(f"User: {message_1.content}")

    # Refresh the session's update time after each user message.
    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_2 = Message(
        session_id="session_001",
        message_id="msg_002",
        create_time=microseconds_timestamp()
    )
    message_2.content = "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. It supports multiple data models, such as wide table, IM message, and time series models, to meet data storage needs in different scenarios."
    message_2.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_2)
    print(f"Assistant: {message_2.content}")

    print("\n====== Second round of conversation ======")
    message_3 = Message(
        session_id="session_001",
        message_id="msg_003",
        create_time=microseconds_timestamp()
    )
    message_3.content = "What are some typical application scenarios for Tablestore?"
    message_3.metadata = {
        "meta_string": "web",
        "message_type": "user",
        "meta_long": 2
    }
    memory_store.put_message(message_3)
    print(f"User: {message_3.content}")

    session.update_time = microseconds_timestamp()
    memory_store.update_session(session)

    message_4 = Message(
        session_id="session_001",
        message_id="msg_004",
        create_time=microseconds_timestamp()
    )
    message_4.content = """Typical application scenarios for Tablestore include the following:
1. AI Agent memory storage: Store knowledge bases, long-term memory, AI session messages, and other information.
2. Metadata management: Store metadata for massive files, videos, and images.
3. Message data: Store IM chat messages, Feed stream messages, and more.
4. Trajectory tracing: Store time series data such as vehicle and logistics trajectories.
5. Scientific big data: Store massive data such as meteorological and genetic data.
6. Recommendation systems: Store data for user personas and item features.
7. Risk control systems: Store real-time risk control rules and historical behavioral data."""
    message_4.metadata = {
        "message_type": "assistant",
        "model": "qwen-max"
    }
    memory_store.put_message(message_4)
    print(f"Assistant: {message_4.content}")

    print("\n====== Session creation and conversation complete ======")
    print(f"Session ID: {session.session_id}")
    print(f"User ID: {session.user_id}")
    print("Completed 2 rounds of conversation with 4 messages")


if __name__ == "__main__":
    main()

Query the list of historical sessions
import os
import sys

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Use the same index definitions as the script that created the tables.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying list of historical sessions ======\n")
    user_id = "test_user_1"
    max_count = 10
    print(f"Querying recent sessions for user {user_id}...")
    try:
        sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))
        if not sessions:
            print(f"\nUser {user_id} has no historical sessions")
        else:
            print(f"\nFound {len(sessions)} sessions in total:\n")
            for idx, session in enumerate(sessions, 1):
                print(f"Session {idx}:")
                print(f"  - Session ID: {session.session_id}")
                print(f"  - User ID: {session.user_id}")
                print(f"  - Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
                if session.metadata:
                    print("  - Metadata:")
                    for key, value in session.metadata.items():
                        print(f"      {key}: {value}")
                print()
    except Exception as e:
        print(f"Failed to query the session list: {e}")
    print("====== Query complete ======")


if __name__ == "__main__":
    main()
Query the details of a specific session
import os
import sys
import traceback

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Use the same index definitions as the script that created the tables.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying details of a specific session ======\n")
    user_id = "test_user_1"
    session_id = "session_001"
    print("Querying session details...")
    print(f"User ID: {user_id}")
    print(f"Session ID: {session_id}\n")
    try:
        session = memory_store.get_session(user_id=user_id, session_id=session_id)
        if session:
            print("Session details:")
            print("=" * 50)
            print(f"User ID: {session.user_id}")
            print(f"Session ID: {session.session_id}")
            print(f"Update time: {session.update_time if hasattr(session, 'update_time') else 'Unknown'}")
            if session.metadata:
                print("\nMetadata information:")
                print("-" * 50)
                for key, value in session.metadata.items():
                    print(f"  {key}: {value}")
            else:
                print("\nMetadata: None")
            print("=" * 50)
        else:
            print(f"The specified session was not found (user_id={user_id}, session_id={session_id})")
    except Exception as e:
        print(f"Failed to query session details: {e}")
        traceback.print_exc()
    print("\n====== Query complete ======")


if __name__ == "__main__":
    main()
Query the complete conversation record of a specific session
import os
import sys
import traceback

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Use the same index definitions as the script that created the tables.
    session_secondary_index_meta = {
        "meta_string": MetaType.STRING,
        "meta_long": MetaType.INTEGER,
        "meta_double": MetaType.DOUBLE,
        "meta_boolean": MetaType.BOOLEAN,
        "meta_bytes": MetaType.BINARY,
    }
    session_search_index_schema = [
        tablestore.FieldSchema(
            "title",
            tablestore.FieldType.TEXT,
            analyzer=tablestore.AnalyzerType.FUZZY,
            analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
        ),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    message_search_index_schema = [
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    memory_store = MemoryStore(
        tablestore_client=tablestore_client,
        session_secondary_index_meta=session_secondary_index_meta,
        session_search_index_schema=session_search_index_schema,
        message_search_index_schema=message_search_index_schema,
    )

    print("====== Querying the complete conversation record of a specific session ======\n")
    session_id = "session_001"
    print("Querying session conversation record...")
    print(f"Session ID: {session_id}\n")
    try:
        messages = list(memory_store.list_messages(session_id=session_id))
        if not messages:
            print(f"Session {session_id} has no conversation records")
        else:
            # Show messages in chronological order; each user message starts a new round.
            messages.sort(key=lambda m: m.create_time)
            print(f"Found {len(messages)} messages in total\n")
            print("=" * 80)
            round_num = 0
            for message in messages:
                message_type = message.metadata.get("message_type", "unknown")
                if message_type == "user":
                    round_num += 1
                    print(f"\nRound {round_num} of conversation:")
                    print("-" * 80)
                role = "User" if message_type == "user" else "Assistant"
                print(f"\n[{role}] (Message ID: {message.message_id})")
                print(f"Content: {message.content}")
                print(f"Creation time: {message.create_time}")
                if message.metadata and len(message.metadata) > 1:
                    print("Metadata:")
                    for key, value in message.metadata.items():
                        if key != "message_type":
                            print(f"  - {key}: {value}")
            print("\n" + "=" * 80)
            print(f"\nConversation statistics: {round_num} rounds of conversation, {len(messages)} messages in total")
    except Exception as e:
        print(f"Failed to query conversation records: {e}")
        traceback.print_exc()
    print("\n====== Query complete ======")


if __name__ == "__main__":
    main()
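The script above counts a new round each time it meets a user message. As a rough sketch, the same idea can be written as a pure function that returns the rounds, which is easier to reuse when building a prompt from history. Msg is a hypothetical stand-in for the SDK's Message, and group_into_rounds is an illustrative name. Unlike the script, this version also opens a round when the history starts with a non-user message.

```python
from collections import namedtuple

# Hypothetical stand-in for the SDK's Message; only the fields used below.
Msg = namedtuple("Msg", ["message_id", "create_time", "metadata"])


def group_into_rounds(messages):
    """Sort messages by create_time and start a new round at each user message."""
    rounds = []
    for message in sorted(messages, key=lambda m: m.create_time):
        is_user = (message.metadata or {}).get("message_type") == "user"
        # A leading non-user message still needs a round to live in.
        if is_user or not rounds:
            rounds.append([])
        rounds[-1].append(message)
    return rounds
```

For the four messages written earlier, this yields two rounds of two messages each.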
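Code example: Knowledge scenario
The Knowledge scenario focuses on building AI knowledge bases. It supports vectorized storage and intelligent retrieval of large numbers of documents. The following examples show how to create a knowledge base, import documents, and perform intelligent Q&A using methods such as vector search and full-text search.
This code example uses the text-embedding-v2 model from Alibaba Cloud Model Studio for vectorization. First install the required dependency and set your API key as the OPENAI_API_KEY environment variable.
pip3 install openai
Create a knowledge base and write knowledge
After you write data, the search index takes a few seconds to synchronize. If the following code examples cannot find the data, wait for synchronization to complete.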
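Because of this delay, a query issued right after a write may come back empty. One way to handle that, sketched below under stated assumptions, is to poll until results appear or a timeout passes. The name wait_for_hits and the default timeout and interval are hypothetical and are not part of the SDK.

```python
import time


def wait_for_hits(query_fn, timeout_s=30.0, interval_s=2.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Call query_fn until it returns a non-empty result or timeout_s elapses.

    Returns the last result, which is empty if the timeout was reached.
    """
    deadline = clock() + timeout_s
    while True:
        result = query_fn()
        if result:
            return result
        if clock() >= deadline:
            return result
        sleep(interval_s)
```

query_fn can wrap any of the search calls shown later on this page, for example a lambda that returns the hits of a search response.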
import os
import sys

import tablestore
from openai import OpenAI
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore


class OpenAIEmbedding:
    """Thin wrapper around an OpenAI-compatible embeddings endpoint."""

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        # Return the embedding vector, or None if the call fails.
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields that the search index makes filterable.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=1536
    )

    # vector_dimension matches the dimension passed to the embedding wrapper.
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("Creating tables and indexes...")
    try:
        knowledge_store.init_table()
        print("Tables and indexes created successfully.")
    except Exception as e:
        print(f"Tables and indexes already exist or failed to be created: {e}")

    print("\n====== Writing documents to the Tablestore knowledge base ======\n")
    documents_data = [
        {
            "id": "doc_001",
            "text": "Tablestore is a first-generation Apsara product developed by Alibaba Cloud. It provides storage for massive amounts of structured data and offers fast query and analysis services. Tablestore's distributed storage and powerful index engine support petabyte-level storage for a single table, tens of millions of TPS, and millisecond-level latency.",
            "category": "Product Introduction",
            "meta_long": 1
        },
        {
            "id": "doc_002",
            "text": "Tablestore supports the wide table model, with a single table supporting petabyte-level data storage and tens of millions of QPS, suitable for storing user personas, order details, and other scenarios. It also supports the time series model, which can efficiently store and query time series data generated by IoT devices and monitoring systems.",
            "category": "Data Model",
            "meta_long": 2
        },
        {
            "id": "doc_003",
            "text": "Tablestore provides multiple index types: primary key index for fast point queries and range queries; global secondary index for queries based on non-primary key columns; search index for complex query combinations and full-text search; and vector search for similarity searches in AI scenarios.",
            "category": "Index Features",
            "meta_long": 3
        },
        {
            "id": "doc_004",
            "text": "Tablestore is suitable for various scenarios: metadata management for storing metadata of massive files, videos, and images; message data for storing IM chat messages and Feed stream messages; trajectory tracing for storing time series data such as vehicle and logistics trajectories; and recommendation systems for storing user personas and item features.",
            "category": "Application Scenarios",
            "meta_long": 4
        },
        {
            "id": "doc_005",
            "text": "Tablestore's search index supports rich query capabilities, including term query, range query, prefix query, wildcard query, full-text search, geo query, and nested query. It also supports advanced features such as sorting, aggregation, and statistical analysis.",
            "category": "Query Capabilities",
            "meta_long": 5
        },
        {
            "id": "doc_006",
            "text": "Tablestore provides Agent Memory capabilities, including Memory Store for storing sessions and message records, and Knowledge Store for storing knowledge base documents and supporting vector search. These capabilities help build AI applications such as intelligent Q&A and chatbots.",
            "category": "AI Capabilities",
            "meta_long": 6
        },
        {
            "id": "doc_007",
            "text": "Tablestore's vector search feature supports the storage and efficient retrieval of massive vector data. It can be applied to scenarios such as image search, semantic search, and recommendation systems. It supports multiple similarity algorithms, including L2 distance and cosine similarity.",
            "category": "Vector Search",
            "meta_long": 7
        },
        {
            "id": "doc_008",
            "text": "Tablestore provides multiple data protection mechanisms: it supports data backup and recovery; provides time to live management to automatically expire and delete old data; and supports data encryption at rest to ensure data security.",
            "category": "Data Protection",
            "meta_long": 8
        }
    ]

    tenant_id = "user_tablestore_001"
    success_count = 0
    for doc_data in documents_data:
        try:
            document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
            document.text = doc_data["text"]
            document.embedding = embedding_model.embedding(document.text)
            if document.embedding is None:
                print(f"✗ Failed to generate vector, skipping document {doc_data['id']}")
                continue
            document.metadata["category"] = doc_data["category"]
            document.metadata["meta_long"] = doc_data["meta_long"]
            document.metadata["meta_boolean"] = True
            document.metadata["user_id"] = tenant_id
            knowledge_store.put_document(document)
            success_count += 1
            print(f"✓ Wrote document {doc_data['id']}: {doc_data['category']}")
            print(f"  Content: {doc_data['text'][:60]}...")
            print()
        except Exception as e:
            print(f"✗ Failed to write document {doc_data['id']}: {e}")

    print("=" * 80)
    print(f"\nWrite complete: Succeeded for {success_count}/{len(documents_data)} documents")
    print(f"Tenant ID: {tenant_id}")
    print(f"Document categories: {', '.join(sorted({d['category'] for d in documents_data}))}")
    print("\nNote: After data is written, the search index may take a few seconds to synchronize.")


if __name__ == "__main__":
    main()

Vector search
import os
import sys
import traceback

import tablestore
from openai import OpenAI
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore


class OpenAIEmbedding:
    """Thin wrapper around an OpenAI-compatible embeddings endpoint."""

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        # Return the embedding vector, or None if the call fails.
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call exception: {e}")
            return None


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    openai_api_key = os.getenv('OPENAI_API_KEY')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name,
        'OPENAI_API_KEY': openai_api_key
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Use the same index definition as the script that wrote the documents.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]

    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    embedding_model = OpenAIEmbedding(
        api_key=openai_api_key,
        base_url=base_url,
        model="text-embedding-v2",
        dimension=1536
    )
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== Vector search test ======\n")
    query_text = "What index types does Tablestore support?"
    tenant_id = "user_tablestore_001"
    print(f"Query question: {query_text}")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")
    try:
        # Embed the question with the same model that embedded the documents.
        print("Generating query vector...")
        query_vector = embedding_model.embedding(query_text)
        if query_vector is None:
            print("Failed to generate query vector")
        else:
            print(f"Query vector generated successfully, dimension: {len(query_vector)}\n")
            response = knowledge_store.vector_search(
                query_vector=query_vector,
                tenant_id=tenant_id,
                limit=3
            )
            if not response.hits:
                print("No relevant documents found")
            else:
                print("=" * 80)
                print(f"Found {len(response.hits)} relevant documents:\n")
                for idx, hit in enumerate(response.hits, 1):
                    doc = hit.document
                    score = hit.score
                    print(f"[Result {idx}]")
                    print(f"Document ID: {doc.document_id}")
                    print(f"Similarity score: {score:.4f}")
                    if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                        print(f"Category: {doc.metadata['category']}")
                    print(f"Content: {doc.text}")
                    print("-" * 80)
                    print()
    except Exception as e:
        print(f"Vector search failed: {e}")
        traceback.print_exc()
    print("\n====== Search complete ======")


if __name__ == "__main__":
    main()
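To build intuition for what a top-3 vector search returns, the following brute-force sketch ranks a handful of in-memory vectors by cosine similarity. It is only a conceptual illustration: it does not reflect how Tablestore's index is implemented, and which metric the SDK's index uses is not stated on this page. The function names are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vector, documents, k=3):
    """documents: iterable of (document_id, vector). Returns [(document_id, score)], best first."""
    scored = [(doc_id, cosine_similarity(query_vector, vec)) for doc_id, vec in documents]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```

A scan like this costs time proportional to the number of documents per query, which is why a dedicated vector index is used for large collections.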
Full-text search
import os
import sys
import traceback

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters


def main():
    # Read connection settings from the environment and stop if any is missing.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Use the same index definition as the script that wrote the documents.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== Full-text search test ======\n")
    query_keyword = "vector search"
    tenant_id = "user_tablestore_001"
    print(f"Query keyword: {query_keyword}")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")
    try:
        # Match the keyword against the document text field.
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            metadata_filter=Filters.text_match("text", query_keyword),
            limit=3
        )
        if not response.hits:
            print("No documents containing the keyword were found")
        else:
            print("=" * 80)
            print(f"Found {len(response.hits)} documents containing the keyword:\n")
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score
                print(f"[Result {idx}]")
                print(f"Document ID: {doc.document_id}")
                print(f"Match score: {score if score is not None else 'N/A'}")
                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"Category: {doc.metadata['category']}")
                content = doc.text
                # Bracket exact, case-sensitive occurrences of the keyword.
                if query_keyword in content:
                    highlighted = content.replace(query_keyword, f"[{query_keyword}]")
                    print(f"Content: {highlighted}")
                else:
                    print(f"Content: {content}")
                print("-" * 80)
                print()
    except Exception as e:
        print(f"Full-text search failed: {e}")
        traceback.print_exc()
    print("\n====== Search complete ======")
    print("\nAdditional notes:")
    print("- Full-text search looks for documents containing the query keyword in the text field.")
    print("- You can use advanced syntax such as wildcards and phrase queries.")
    print("- Supports Chinese tokenization and fuzzy matching.")


if __name__ == "__main__":
    main()
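The highlighting in the script above brackets only exact, case-sensitive occurrences of the keyword, so a hit such as "Vector Search" is printed without brackets. A small sketch of a case-insensitive variant using the standard re module follows. The helper name highlight and the bracket markers are illustrative choices, and the helper only affects how results are displayed, not how the search matches.

```python
import re


def highlight(text, keyword, left="[", right="]"):
    """Wrap every case-insensitive occurrence of keyword in text with markers."""
    if not keyword:
        return text
    # re.escape keeps characters such as "." or "*" in the keyword literal.
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    return pattern.sub(lambda m: f"{left}{m.group(0)}{right}", text)
```

Each match keeps its original casing inside the markers.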
General search
import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os
import sys


def main():
    # Read the connection settings from environment variables.
    endpoint = os.getenv('TABLESTORE_ENDPOINT')
    access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
    access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
    instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
    required_env_vars = {
        'TABLESTORE_ENDPOINT': endpoint,
        'TABLESTORE_ACCESS_KEY_ID': access_key_id,
        'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
        'TABLESTORE_INSTANCE_NAME': instance_name
    }
    # Stop early with a clear message if any variable is unset or empty.
    missing_vars = [var for var, value in required_env_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Set the following environment variables:")
        for var in missing_vars:
            print(f"  export {var}=your_value")
        sys.exit(1)

    tablestore_client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )

    # Metadata fields that the search index exposes for filtering.
    search_index_schema = [
        tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
        tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
        tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
        tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
    ]
    knowledge_store = KnowledgeStore(
        tablestore_client=tablestore_client,
        vector_dimension=1536,
        enable_multi_tenant=True,
        search_index_schema=search_index_schema,
    )

    print("====== General search test ======\n")
    tenant_id = "user_tablestore_001"
    print("General search supports flexible filtered queries based on metadata, independent of vector or full-text search.")
    print(f"Tenant ID: {tenant_id}")
    print("Number of results to return: Top 3\n")

    # Scenario 1: exact match on a single metadata field.
    print("[Scenario 1] Query for documents where the category is 'Application Scenarios'")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.eq("category", "Application Scenarios"),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  Content: {doc.text[:100]}...")
            print()
    except Exception as e:
        print(f"Search failed: {e}\n")

    # Scenario 2: combine a numeric comparison and a boolean match with AND.
    print("\n[Scenario 2] Query for documents where meta_long > 3 and meta_boolean = True")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.logical_and([
                Filters.gt("meta_long", 3),
                Filters.eq("meta_boolean", True)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
                print(f"  Content: {doc.text[:80]}...")
            print()
    except Exception as e:
        print(f"Search failed: {e}\n")

    # Scenario 3: inclusive range query built from gte and lte.
    print("\n[Scenario 3] Query for documents where meta_long is between 2 and 5")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            metadata_filter=Filters.logical_and([
                Filters.gte("meta_long", 2),
                Filters.lte("meta_long", 5)
            ]),
            meta_data_to_get=["text", "category", "meta_long"]
        )
        if not response.hits:
            print("No matching documents found\n")
        else:
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
                print(f"  Content: {doc.text[:80]}...")
            print()
    except Exception as e:
        print(f"Search failed: {e}\n")

    # Scenario 4: no filter; returns the first page of the tenant's documents.
    print("\n[Scenario 4] Get all documents (without filter conditions)")
    print("-" * 80)
    try:
        response = knowledge_store.search_documents(
            tenant_id=tenant_id,
            limit=3,
            meta_data_to_get=["text", "category", "meta_long"]
        )
        if not response.hits:
            print("No documents found\n")
        else:
            # len(response.hits) is capped by limit, so it is a page size, not a total.
            print(f"\nReturned {len(response.hits)} documents (limit=3):")
            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                print(f"\nResult {idx}:")
                print(f"  Document ID: {doc.document_id}")
                print(f"  Category: {doc.metadata.get('category', 'N/A')}")
                print(f"  Content: {doc.text[:60]}...")
            if response.next_token:
                print("\nThere are more results. Use next_token for pagination.")
            print()
    except Exception as e:
        print(f"Search failed: {e}\n")

    print("\n" + "=" * 80)
    print("\n====== Search complete ======")
    print("\nGeneral search features:")
    print("- Supports flexible filtering based on metadata fields.")
    print("- Supports exact match, range query, logical combinations, and more.")
    print("- Does not require vector or full-text search. Suitable for structured queries.")
    print("- You can specify the fields to return to reduce data transfer.")


if __name__ == "__main__":
    main()
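Scenario 4 only reports that `response.next_token` is set; it does not fetch the remaining pages. The following is a minimal sketch of a pagination loop. It assumes that the search method accepts a `next_token` keyword argument alongside `limit`. The example above only reads `response.next_token`, so verify the request-side parameter name against the SDK before relying on it. `iter_all_hits` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Callable, Iterator, Optional


def iter_all_hits(search_page: Callable[..., Any],
                  page_size: int = 100,
                  max_pages: Optional[int] = None,
                  **search_kwargs: Any) -> Iterator[Any]:
    """Yield hits from every page by following next_token.

    search_page is any callable that takes limit and next_token keyword
    arguments (an assumption about the SDK method) and returns an object
    with .hits and .next_token attributes.
    """
    next_token = None
    pages = 0
    while True:
        response = search_page(limit=page_size, next_token=next_token, **search_kwargs)
        yield from response.hits
        pages += 1
        next_token = response.next_token
        # Stop when the server reports no further page or the page cap is reached.
        if not next_token or (max_pages is not None and pages >= max_pages):
            break
```

Under that assumption, a call such as `iter_all_hits(knowledge_store.search_documents, page_size=100, tenant_id=tenant_id, meta_data_to_get=["text", "category"])` would walk through all of a tenant's documents. The `max_pages` cap is a safeguard against unbounded scans on large tables.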
Related documents
Project page: Tablestore for Agent Memory