Build a multimodal image retrieval system by using the vector retrieval capabilities of Tablestore and the multimodal embedding model from Alibaba Cloud Model Studio. The system supports search by natural language and search by image, and is suitable for scenarios such as e-commerce product search, smart photo album management, and media asset retrieval.
Solution overview
The process of building a multimodal image retrieval system includes the following core steps:
Create a table and an index: Create a Tablestore data table to store image data and a search index to enable vector retrieval.
Vectorize images: Use the Alibaba Cloud Model Studio multimodal embedding model to convert images into high-dimensional vector representations.
Write vector data: Write the generated image vector data and related metadata to Tablestore in batches.
Perform multimodal retrieval: Convert a query image or natural language text into a vector. Then, perform a similarity search in the search index. You can also use metadata conditions for precise filtering.

Preparations
Before you build the retrieval system, you must configure the environment, set up credentials, and prepare the data.
1. Install the SDKs
Make sure that Python 3.12 or later is installed.
Run the following commands to install the Tablestore Python SDK, the Alibaba Cloud Model Studio SDK (DashScope), and the Pillow image library.
pip install tablestore
pip install dashscope
pip install Pillow
2. Configure environment variables
Configure your access credentials as environment variables to ensure code security and cross-environment portability.
Before you start, obtain an API key for Alibaba Cloud Model Studio and an Alibaba Cloud AccessKey pair. Then, go to the Tablestore console to create an instance and obtain its name and endpoint.
For security reasons, public network access is disabled by default for new Tablestore instances. To use a public endpoint, go to the Network Management page of the instance to enable public network access.
export DASHSCOPE_API_KEY=<Your Alibaba Cloud Model Studio API key>
export tablestore_end_point=<Your Tablestore instance endpoint>
export tablestore_instance_name=<Your Tablestore instance name>
export tablestore_access_key_id=<Your AccessKey ID>
export tablestore_access_key_secret=<Your AccessKey secret>
3. Prepare the image data
Use your own image data or the demo dataset provided in the tutorial.
git clone https://github.com/aliyun/alibabacloud-tablestore-ai-demo.git
You can also download the demo project file directly: alibabacloud-tablestore-ai-demo-main
Step 1: Create a table and an index
Create a data table to store image vector data and a search index to support vector retrieval. Customize the table schema and index configuration based on your data characteristics and requirements. To quickly test the demo, use the following sample configuration.
1. Create a data table
# -*- coding: utf-8 -*-
"""
Create a Tablestore data table.
"""
import os

import tablestore

def main():
    # Initialize the Tablestore client.
    client = tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    # Create a data table and define the primary key.
    table_name = "multi_modal_retrieval"
    table_meta = tablestore.TableMeta(table_name, [("image_id", "STRING")])
    table_options = tablestore.TableOptions()
    reserved_throughput = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
    try:
        client.create_table(table_meta, table_options, reserved_throughput)
        print(f"Data table '{table_name}' created successfully.")
    except tablestore.OTSServiceError as e:
        if "OTSObjectAlreadyExist" in str(e):
            print(f"Data table '{table_name}' already exists.")
        else:
            raise

if __name__ == "__main__":
    main()
2. Create a search index
Vector data is stored as strings in the Tablestore data table. To enable vector retrieval, you must create a search index and configure the vector field type. This enables similarity calculation and fast retrieval of high-dimensional vectors.
# -*- coding: utf-8 -*-
"""
Create a Tablestore search index that contains a vector field.
"""
import os

import tablestore

def main():
    # Initialize the Tablestore client.
    client = tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    table_name = "multi_modal_retrieval"
    index_name = "index"
    # Define index fields.
    field_schemas = [
        tablestore.FieldSchema("image_id", tablestore.FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("city", tablestore.FieldType.KEYWORD, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("height", tablestore.FieldType.LONG, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema("width", tablestore.FieldType.LONG, index=True, enable_sort_and_agg=True),
        tablestore.FieldSchema(
            "vector",
            tablestore.FieldType.VECTOR,
            vector_options=tablestore.VectorOptions(
                data_type=tablestore.VectorDataType.VD_FLOAT_32,
                dimension=1024,
                metric_type=tablestore.VectorMetricType.VM_COSINE,
            ),
        ),
    ]
    try:
        index_meta = tablestore.SearchIndexMeta(field_schemas)
        client.create_search_index(table_name, index_name, index_meta)
        print(f"Search index '{index_name}' created successfully.")
    except tablestore.OTSServiceError as e:
        if "OTSObjectAlreadyExist" in str(e):
            print(f"Search index '{index_name}' already exists.")
        else:
            raise

if __name__ == "__main__":
    main()
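The index above uses the VM_COSINE metric, so search results are ranked by the cosine similarity between the query vector and each stored vector. As a minimal illustration of the metric itself (standard-library Python only, not part of the demo project):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot(a, b) / (|a| * |b|), in the range [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing in the same direction score 1.0; orthogonal vectors score 0.0.
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
```

Because cosine similarity ignores vector magnitude, it is a common choice for embedding retrieval, where only the direction of the vector carries semantic meaning.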
Step 2: Vectorize images
Call the Alibaba Cloud Model Studio multimodal vectorization model to process and vectorize images. The following example shows how to vectorize local images. For more information, see Multimodal embeddings.
Vectorizing many images can be time-consuming. The demo project provides a pre-processed vector data file named data.json that you can use directly in Step 3.
# -*- coding: utf-8 -*-
"""
Demo of local image vectorization.
This shows how to use the Alibaba Cloud Model Studio multimodal vectorization model to vectorize local images.
It outputs key information such as original image details, vector dimensions, and the first few elements of the vector.
"""
import base64
import math
import os
from pathlib import Path

import dashscope
from PIL import Image

def image_to_base64(image_path):
    """Convert an image file to a base64 string."""
    with open(image_path, "rb") as f:
        image_data = f.read()
    return base64.b64encode(image_data).decode("utf-8")

def get_image_embedding(image_path):
    """
    Call the Alibaba Cloud Model Studio multimodal vectorization model to vectorize a local image.
    """
    # Convert the local image to base64.
    base64_image = image_to_base64(image_path)
    # Get the image format.
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # Use jpeg by default.
    # Construct the data URI.
    data_uri = f"data:{mime_type};base64,{base64_image}"
    # Call the multimodal vectorization API.
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )
    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Vectorization failed: {resp.code} - {resp.message}")

def get_image_info(image_path):
    """Get basic information about the image."""
    with Image.open(image_path) as img:
        return {
            "filename": os.path.basename(image_path),
            "format": img.format,
            "mode": img.mode,
            "width": img.width,
            "height": img.height,
            "size_bytes": os.path.getsize(image_path),
        }

def main():
    # Configure paths.
    current_dir = Path(__file__).parent
    project_root = current_dir
    image_dir = project_root / "data" / "photograph"
    print("=" * 60)
    print("Local image vectorization demo")
    print("=" * 60)
    # Get the list of images.
    image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp'))]
    if not image_files:
        print("No image files found.")
        return
    # Select the first image for the demo.
    demo_image = image_files[0]
    image_path = image_dir / demo_image
    print("\n[1/3] Reading image information")
    print("-" * 60)
    # Get image information.
    image_info = get_image_info(image_path)
    print(f"Filename: {image_info['filename']}")
    print(f"Format: {image_info['format']}")
    print(f"Mode: {image_info['mode']}")
    print(f"Width: {image_info['width']} px")
    print(f"Height: {image_info['height']} px")
    print(f"File size: {image_info['size_bytes']:,} bytes")
    print("\n[2/3] Calling vectorization API")
    print("-" * 60)
    print("Calling the Alibaba Cloud Model Studio multimodal vectorization model...")
    # Vectorize.
    vector = get_image_embedding(str(image_path))
    print("\n[3/3] Vectorization result")
    print("-" * 60)
    print(f"Vector dimension: {len(vector)}")
    print(f"Vector type: {type(vector[0]).__name__}")
    print("First 10 elements of the vector:")
    for i, v in enumerate(vector[:10]):
        print(f"  [{i}] {v:.8f}")
    print("  ...")
    print("Last 5 elements of the vector:")
    for i, v in enumerate(vector[-5:], start=len(vector) - 5):
        print(f"  [{i}] {v:.8f}")
    # Calculate the vector norm.
    norm = math.sqrt(sum(v * v for v in vector))
    print(f"\nVector L2 norm: {norm:.8f}")
    print("\n" + "=" * 60)
    print("Vectorization demo complete!")
    print("=" * 60)

if __name__ == "__main__":
    main()
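If you vectorize your own dataset instead of using the provided data.json, you can wrap the embedding call in a loop over an image folder and save the results in the same JSON format that Step 3 reads. The following is a minimal sketch, not part of the demo project: build_dataset and the embed_fn parameter are illustrative names, and in practice you would pass the get_image_embedding function from the example above and add metadata fields such as city, width, and height.

```python
import json
from pathlib import Path

def build_dataset(image_dir: str, embed_fn, out_path: str) -> int:
    """Vectorize every .jpg in image_dir with embed_fn and write a data.json-style file.

    embed_fn is any callable that maps an image path to a list of floats.
    Returns the number of records written.
    """
    records = []
    for path in sorted(Path(image_dir).glob("*.jpg")):
        records.append({
            "image_id": path.stem,          # primary key for the Tablestore row
            "vector": embed_fn(str(path)),  # the image embedding
        })
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(records, f)
    return len(records)
```

Keep in mind that each embedding call is a network request, so for large datasets you may want to add retries and persist intermediate results.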
Step 3: Write vector data
Import image vector data into the Tablestore data table in batches. The following example reads pre-processed vector data from the demo project and writes it in batches. If you use your own business data, combine the image vectorization and data writing operations.
# -*- coding: utf-8 -*-
"""
Batch write image data to Tablestore.
"""
import json
import os
from pathlib import Path

import tablestore

def main():
    # Initialize the Tablestore client.
    client = tablestore.OTSClient(
        os.getenv("tablestore_end_point"),
        os.getenv("tablestore_access_key_id"),
        os.getenv("tablestore_access_key_secret"),
        os.getenv("tablestore_instance_name"),
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    table_name = "multi_modal_retrieval"
    batch_size = 100
    # Load data from the JSON file.
    data_path = Path(__file__).parent / "data" / "data.json"
    with open(data_path, "r", encoding="utf-8") as f:
        data_array = json.load(f)
    print(f"Loaded {len(data_array)} records.")
    # Batch write to Tablestore.
    put_row_items = []
    success_count = 0
    for idx, item in enumerate(data_array):
        primary_key = [("image_id", item["image_id"])]
        attribute_columns = [
            ("city", item.get("city", "unknown")),
            ("vector", json.dumps(item["vector"])),
            ("width", item.get("width", 0)),
            ("height", item.get("height", 0)),
        ]
        row = tablestore.Row(primary_key, attribute_columns)
        condition = tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)
        put_row_items.append(tablestore.PutRowItem(row, condition))
        # Batch write.
        if len(put_row_items) >= batch_size or idx == len(data_array) - 1:
            request = tablestore.BatchWriteRowRequest()
            request.add(tablestore.TableInBatchWriteRowItem(table_name, put_row_items))
            result = client.batch_write_row(request)
            if result.is_all_succeed():
                success_count += len(put_row_items)
                print(f"Progress: {idx + 1}/{len(data_array)} - Wrote {len(put_row_items)} rows successfully.")
            put_row_items = []
    print(f"Complete: Successfully wrote {success_count} rows.")

if __name__ == "__main__":
    main()
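Note that the vector column is written as a JSON string (json.dumps(item["vector"])): the attribute column itself holds plain text, and the search index's VECTOR field is what interprets that string as a float array. The serialization round trip can be illustrated with the standard library alone:

```python
import json

# A toy 3-dimensional embedding; the demo uses 1024 dimensions.
vector = [0.12, -0.56, 0.33]

# Serialize for the attribute column, exactly as the write script does.
stored = json.dumps(vector)
print(type(stored).__name__)  # str

# Reading the row back yields the same string; parse it to recover the floats.
restored = json.loads(stored)
print(restored == vector)  # True
```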
Step 4: Perform multimodal retrieval
The multimodal image retrieval system supports two retrieval modes: search by natural language and search by image. The system converts the query content into a vector representation, performs a similarity calculation in the vector index, and returns the images that best match the query semantics. You can also use metadata conditions, such as city and image size, for precise filtering.
Search by natural language
# -*- coding: utf-8 -*-
"""
Semantic search example.
Includes multiple query scenarios:
1. Semantic search using only query text.
2. Semantic search using query text and filter conditions (city, height, width).
"""
import os

import dashscope
import tablestore
from dashscope import MultiModalEmbeddingItemText

def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")
    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client

def text_to_embedding(text: str) -> list[float]:
    """Convert text to an embedding."""
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[MultiModalEmbeddingItemText(text=text, factor=1.0)]
    )
    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Text vectorization failed: {resp.code} - {resp.message}")

def search_by_text_only(client, table_name, index_name, query_text: str, top_k: int = 10):
    """
    Scenario 1: Semantic search using only query text.
    """
    print(f"\n{'='*60}")
    print("Scenario 1: Search using only query text")
    print(f"Query text: '{query_text}'")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    # Convert text to vector.
    query_vector = text_to_embedding(query_text)
    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )
    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )
    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def search_with_city_filter(client, table_name, index_name, query_text: str, city: str, top_k: int = 10):
    """
    Scenario 2: Search using query text and a city filter.
    """
    print(f"\n{'='*60}")
    print("Scenario 2: Query text + City filter")
    print(f"Query text: '{query_text}'")
    print(f"City filter: {city}")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    query_vector = text_to_embedding(query_text)
    # Build the vector query with a city filter.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=tablestore.TermQuery(field_name='city', column_value=city)
    )
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def search_with_size_filter(client, table_name, index_name, query_text: str,
                            height_range: tuple = None, width_range: tuple = None, top_k: int = 10):
    """
    Scenario 3: Search using query text and size filters (height, width).
    """
    print(f"\n{'='*60}")
    print("Scenario 3: Query text + Size filter")
    print(f"Query text: '{query_text}'")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    query_vector = text_to_embedding(query_text)
    # Build filter conditions.
    must_queries = []
    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))
    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))
    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def search_with_combined_filters(client, table_name, index_name, query_text: str,
                                 cities: list = None, height_range: tuple = None,
                                 width_range: tuple = None, top_k: int = 10):
    """
    Scenario 4: Search using query text and combined filters (city list, height, width).
    """
    print(f"\n{'='*60}")
    print("Scenario 4: Query text + Combined filters")
    print(f"Query text: '{query_text}'")
    print(f"City list: {cities}")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    query_vector = text_to_embedding(query_text)
    # Build combined filter conditions.
    must_queries = []
    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))
    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))
    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))
    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def parse_search_hit(hit):
    """Parse search results."""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item

def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    index_name = "index"
    print("=" * 60)
    print("Tablestore multimodal semantic search demo")
    print("=" * 60)
    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")
    # Scenario 1: Semantic search using only a natural language description.
    # Use complete natural language sentences, not just simple keywords.
    search_by_text_only(
        client, table_name, index_name,
        "A fluffy puppy running on the grass",
        top_k=5
    )
    # Scenario 2: Natural language description + City filter.
    search_with_city_filter(
        client, table_name, index_name,
        "A willow tree by the lake with a mountain range in the distance",
        city="hangzhou",
        top_k=5
    )
    # Scenario 3: Natural language description + Size filter.
    # Find high-resolution landscape images.
    search_with_size_filter(
        client, table_name, index_name,
        "A modern city skyline brightly lit at night",
        height_range=(500, 1024),
        width_range=(800, 1024),
        top_k=5
    )
    # Scenario 4: Natural language description + Combined filters.
    search_with_combined_filters(
        client, table_name, index_name,
        "Snow-capped mountains in the distance, with sunlight glistening on the snow",
        cities=["hangzhou", "shanghai", "beijing"],
        height_range=(0, 1024),
        width_range=(0, 1024),
        top_k=5
    )
    print("\n" + "=" * 60)
    print("All search scenario demos are complete!")
    print("=" * 60)

if __name__ == "__main__":
    main()
Search by image
# -*- coding: utf-8 -*-
"""
Search by image example.
Vectorizes a local image and then retrieves similar images from Tablestore.
"""
import base64
import os
from pathlib import Path

import dashscope
import tablestore

def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")
    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client

def image_to_embedding(image_path: str) -> list[float]:
    """
    Convert a local image to an embedding.
    """
    # Read the image and convert it to base64.
    with open(image_path, "rb") as f:
        image_data = f.read()
    base64_image = base64.b64encode(image_data).decode("utf-8")
    # Determine the MIME type based on the file extension.
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # Use jpeg by default.
    # Construct the data URI.
    data_uri = f"data:{mime_type};base64,{base64_image}"
    # Call the multimodal vectorization API.
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )
    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Image vectorization failed: {resp.code} - {resp.message}")

def search_by_image(client, table_name, index_name, image_path: str, top_k: int = 10):
    """
    Search by image: Perform a semantic search using a local image.
    """
    print(f"\n{'='*60}")
    print("Search by image")
    print(f"Query image: {image_path}")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    # Vectorize the image.
    print("Vectorizing the query image...")
    query_vector = image_to_embedding(image_path)
    print(f"Vectorization complete. Dimension: {len(query_vector)}")
    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )
    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )
    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def search_by_image_with_filter(client, table_name, index_name, image_path: str,
                                cities: list = None, height_range: tuple = None,
                                width_range: tuple = None, top_k: int = 10):
    """
    Search by image with filters: Perform a semantic search using a local image and apply filter conditions.
    """
    print(f"\n{'='*60}")
    print("Search by image + Filters")
    print(f"Query image: {image_path}")
    print(f"City list: {cities}")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("=" * 60)
    # Vectorize the image.
    print("Vectorizing the query image...")
    query_vector = image_to_embedding(image_path)
    print(f"Vectorization complete. Dimension: {len(query_vector)}")
    # Build filter conditions.
    must_queries = []
    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))
    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))
    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))
    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None
    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )
    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)
    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )
    print(f"\nRequest ID: {search_response.request_id}")
    print("\nSearch results:")
    print("-" * 60)
    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")
    return search_response.search_hits

def parse_search_hit(hit):
    """Parse search results."""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item

def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    index_name = "index"
    print("=" * 60)
    print("Tablestore search by image demo")
    print("=" * 60)
    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")
    # Get the project root directory.
    current_dir = Path(__file__).parent
    data_dir = current_dir / "data" / "photograph"
    # Get a sample image to use as the query image.
    sample_images = list(data_dir.glob("*.jpg"))
    if not sample_images:
        print("Error: No sample image found. Make sure there are .jpg images in the data/photograph folder.")
        return
    # Use the first image as the query example.
    query_image_path = str(sample_images[0])
    print(f"\nUsing sample image: {query_image_path}")
    # Scenario 1: Search by image using only an image.
    search_by_image(client, table_name, index_name, query_image_path, top_k=5)
    # Scenario 2: Search by image + Filters.
    # Search for similar images only in specific cities.
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        cities=["hangzhou", "shanghai"],
        top_k=5
    )
    # Scenario 3: Search by image + Size filter.
    # Search for similar landscape images (width greater than height).
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        width_range=(800, 1024),
        top_k=5
    )
    print("\n" + "=" * 60)
    print("Search by image demo complete!")
    print("=" * 60)

if __name__ == "__main__":
    main()
Visualization search interface
Build an interactive search interface based on Gradio to provide an intuitive graphical user experience. This interface depends on the local image folder in the demo project and is suitable for quick tests and demonstrations. If you use your own data, refer to the code to implement a similar interface.
Install Gradio and its dependencies.
pip install gradio gradio_rangeslider
Start the visualization interface.
python src/gradio_app.py
After the application starts, open the application URL (such as http://localhost:7860) to access the search interface. The interface provides the following features:
Search by image: Upload a local image to query for similar images.
Search by natural language: Enter a natural language description, such as "snow-capped mountains in the distance" or "a fluffy puppy running on the grass".
Top K: Set the number of results to return (1-30).
Height/Width range: Filter by image dimensions.
City filter: Filter by city (multiple selections are supported).
References
For the complete tutorial project, see the GitHub repository.