
Tablestore:Multimodal image retrieval system based on Tablestore

Last Updated: Dec 21, 2025

Build a multimodal image retrieval system by using the vector retrieval capability of Tablestore and the multimodal embedding model from Alibaba Cloud Model Studio. The system supports search by natural language and search by image, and is suitable for scenarios such as e-commerce product search, smart photo album management, and media asset retrieval.

How it works

The process of building a multimodal image retrieval system includes the following core steps:

  1. Create a table and an index: Create a Tablestore data table to store image data and a search index to enable vector retrieval.

  2. Vectorize images: Use the Alibaba Cloud Model Studio multimodal embedding model to convert images into high-dimensional vector representations.

  3. Write vector data: Write the generated image vector data and related metadata to Tablestore in batches.

  4. Perform multimodal retrieval: Convert a query image or natural language text into a vector. Then, perform a similarity search in the search index. You can also use metadata conditions for precise filtering.

Preparations

Before you build the retrieval system, you must configure the environment, set up credentials, and prepare the data.

1. Install the SDKs

  1. Make sure that Python 3.12 or later is installed.

  2. Run the following commands to install the Tablestore Python SDK and the Alibaba Cloud Model Studio SDK.

    pip install tablestore
    pip install dashscope
    pip install Pillow

2. Configure environment variables

Configure your access credentials as environment variables so that they are not hard-coded in scripts and the code runs unchanged across environments.

Before you start, obtain an API key from the Alibaba Cloud Model Studio platform and an AccessKey pair for your Alibaba Cloud account. Then, go to the Tablestore console to create an instance and obtain its name and endpoint.
Note

For security reasons, public network access is disabled by default for new Tablestore instances. To use a public endpoint, go to the Network Management page of the instance to enable public network access.

export DASHSCOPE_API_KEY=<Your Alibaba Cloud Model Studio API key>
export tablestore_end_point=<Your Tablestore instance endpoint>
export tablestore_instance_name=<Your Tablestore instance name>
export tablestore_access_key_id=<Your AccessKey ID>
export tablestore_access_key_secret=<Your AccessKey secret>
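To fail fast when a credential is missing, you can optionally verify the variables before running any of the scripts in this tutorial. The following helper is not part of the demo project; it only checks that the five variables above are set:

```python
import os

# The five environment variables used throughout this tutorial.
REQUIRED_VARS = [
    "DASHSCOPE_API_KEY",
    "tablestore_end_point",
    "tablestore_instance_name",
    "tablestore_access_key_id",
    "tablestore_access_key_secret",
]


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        raise SystemExit("Missing environment variables: " + ", ".join(missing))
    print("All credentials are configured.")
```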

3. Prepare the image data

Use your own image data or the demo dataset provided in the tutorial.

git clone https://github.com/aliyun/alibabacloud-tablestore-ai-demo.git

You can also download the demo project file directly: alibabacloud-tablestore-ai-demo-main

Step 1: Create a table and an index

Create a data table to store image vector data and a search index to support vector retrieval. Customize the table schema and index configuration based on your data characteristics and requirements. To quickly test the demo, use the following sample configuration.

1. Create a data table

# -*- coding: utf-8 -*-
"""
Create a Tablestore data table.
"""

import os
import tablestore


def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def create_table(client, table_name):
    """Create a data table."""
    try:
        # Define the primary key.
        schema_of_primary_key = [("image_id", "STRING")]
        table_meta = tablestore.TableMeta(table_name, schema_of_primary_key)
        table_options = tablestore.TableOptions()
        reserved_throughput = tablestore.ReservedThroughput(tablestore.CapacityUnit(0, 0))
        client.create_table(table_meta, table_options, reserved_throughput)
        print(f"Table '{table_name}' created successfully.")
    except tablestore.OTSClientError as e:
        print(f"Failed to create table (client error): {e}")
    except tablestore.OTSServiceError as e:
        if "already exist" in str(e).lower() or "OTSObjectAlreadyExist" in str(e):
            print(f"Table '{table_name}' already exists. Skip creation.")
        else:
            print(f"Failed to create table (server-side error): {e}")


def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"

    print("=" * 50)
    print("Create a Tablestore data table")
    print("=" * 50)

    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")

    # Create the table.
    print("\nCreating data table...")
    create_table(client, table_name)

    print("\n" + "=" * 50)
    print("Data table created.")
    print("=" * 50)


if __name__ == "__main__":
    main()

2. Create a search index

Vector data is stored as strings in the Tablestore data table. To enable vector retrieval, you must create a search index and configure the vector field type. This enables similarity calculation and fast retrieval of high-dimensional vectors.
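As a minimal illustration of what "stored as strings" means here: the write script in Step 3 serializes each vector with json.dumps before putting it into the attribute column, and the serialized string round-trips back to the same list of floats. (The 3-dimensional vector below is a toy example; real vectors from the model have 1,024 dimensions.)

```python
import json

# A toy vector standing in for a real 1,024-dimensional embedding.
vector = [0.12, -0.34, 0.56]

stored = json.dumps(vector)    # the string written to the "vector" column
restored = json.loads(stored)  # parsing it back yields the original floats

assert restored == vector
print(stored)  # [0.12, -0.34, 0.56]
```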

# -*- coding: utf-8 -*-
"""
Create a Tablestore search index.
"""

import os
import tablestore


def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def create_search_index(client, table_name, index_name, dimension=1024):
    """Create a search index."""
    try:
        # Define index fields.
        metadata_mappings = [
            tablestore.FieldSchema(
                "image_id",
                tablestore.FieldType.KEYWORD,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "city",
                tablestore.FieldType.KEYWORD,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "height",
                tablestore.FieldType.LONG,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "width",
                tablestore.FieldType.LONG,
                index=True,
                enable_sort_and_agg=True,
            ),
            tablestore.FieldSchema(
                "vector",
                tablestore.FieldType.VECTOR,
                vector_options=tablestore.VectorOptions(
                    data_type=tablestore.VectorDataType.VD_FLOAT_32,
                    dimension=dimension,
                    metric_type=tablestore.VectorMetricType.VM_COSINE,
                ),
            ),
        ]

        index_meta = tablestore.SearchIndexMeta(metadata_mappings)
        client.create_search_index(table_name, index_name, index_meta)
        print(f"Search index '{index_name}' created successfully.")
    except tablestore.OTSClientError as e:
        print(f"Failed to create index (client error): {e}")
    except tablestore.OTSServiceError as e:
        if "already exist" in str(e).lower() or "OTSObjectAlreadyExist" in str(e):
            print(f"Search index '{index_name}' already exists. Skip creation.")
        else:
            print(f"Failed to create index (server-side error): {e}")


def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    index_name = "index"
    dimension = 1024

    print("=" * 50)
    print("Create a Tablestore search index")
    print("=" * 50)

    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")

    # Create the index.
    print("\nCreating search index...")
    create_search_index(client, table_name, index_name, dimension)

    print("\n" + "=" * 50)
    print("Search index created.")
    print("=" * 50)


if __name__ == "__main__":
    main()
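The index above uses the cosine metric (VM_COSINE), so a higher score means the query vector and a stored vector point in more similar directions. Tablestore computes this for you at query time; the following stdlib-only sketch only illustrates the underlying calculation:

```python
import math


def cosine_similarity(a, b):
    """dot(a, b) / (|a| * |b|); ranges over [-1, 1], 1 means identical direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (same direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
```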

Step 2: Vectorize images

Call the Alibaba Cloud Model Studio multimodal vectorization model to process and vectorize images. The following example shows how to vectorize local images. For more information, see Multimodal embeddings.

Vectorizing many images can be time-consuming. The demo project provides a pre-processed vector data file named data.json that you can use directly in Step 3.

# -*- coding: utf-8 -*-
"""
Demo of local image vectorization.
This shows how to use the Alibaba Cloud Model Studio multimodal vectorization model to vectorize local images.
It outputs key information such as original image details, vector dimensions, and the first few elements of the vector.
"""

import base64
import os
from pathlib import Path

import dashscope
from PIL import Image


def image_to_base64(image_path):
    """Convert an image file to a base64 string."""
    with open(image_path, "rb") as f:
        image_data = f.read()
    return base64.b64encode(image_data).decode("utf-8")


def get_image_embedding(image_path):
    """
    Call the Alibaba Cloud Model Studio multimodal vectorization model to vectorize a local image.
    """
    # Convert the local image to base64.
    base64_image = image_to_base64(image_path)

    # Get the image format.
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # Use jpeg by default.

    # Construct the data URI.
    data_uri = f"data:{mime_type};base64,{base64_image}"

    # Call the multimodal vectorization API.
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )

    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Vectorization failed: {resp.code} - {resp.message}")


def get_image_info(image_path):
    """Get basic information about the image."""
    with Image.open(image_path) as img:
        return {
            "filename": os.path.basename(image_path),
            "format": img.format,
            "mode": img.mode,
            "width": img.width,
            "height": img.height,
            "size_bytes": os.path.getsize(image_path),
        }


def main():
    # Configure paths.
    current_dir = Path(__file__).parent
    project_root = current_dir
    image_dir = project_root / "data" / "photograph"

    print("=" * 60)
    print("Local image vectorization demo")
    print("=" * 60)

    # Get the list of images.
    image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp'))]
    
    if not image_files:
        print("No image files found.")
        return

    # Select the first image for the demo.
    demo_image = image_files[0]
    image_path = image_dir / demo_image

    print(f"\n[1/3] Reading image information")
    print("-" * 60)
    
    # Get image information.
    image_info = get_image_info(image_path)
    print(f"Filename: {image_info['filename']}")
    print(f"Format: {image_info['format']}")
    print(f"Mode: {image_info['mode']}")
    print(f"Width: {image_info['width']} px")
    print(f"Height: {image_info['height']} px")
    print(f"File size: {image_info['size_bytes']:,} bytes")

    print(f"\n[2/3] Calling vectorization API")
    print("-" * 60)
    print("Calling the Alibaba Cloud Model Studio multimodal vectorization model...")
    
    # Vectorize.
    vector = get_image_embedding(str(image_path))

    print(f"\n[3/3] Vectorization result")
    print("-" * 60)
    print(f"Vector dimension: {len(vector)}")
    print(f"Vector type: {type(vector[0]).__name__}")
    print(f"First 10 elements of the vector:")
    for i, v in enumerate(vector[:10]):
        print(f"  [{i}] {v:.8f}")
    print("  ...")
    print(f"Last 5 elements of the vector:")
    for i, v in enumerate(vector[-5:], start=len(vector)-5):
        print(f"  [{i}] {v:.8f}")

    # Calculate the vector norm.
    import math
    norm = math.sqrt(sum(v * v for v in vector))
    print(f"\nVector L2 norm: {norm:.8f}")

    print("\n" + "=" * 60)
    print("Vectorization demo complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()

Step 3: Write vector data

Import image vector data into the Tablestore data table in batches. The following example reads pre-processed vector data from the demo project and writes it in batches. If you use your own business data, combine the image vectorization and data writing operations.
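If you do combine vectorization and writing, keep the batching pattern: BatchWriteRow accepts a limited number of rows per request (200 for Tablestore at the time of writing), so group your records first. Below is a small stdlib-only grouping helper; the commented pipeline uses hypothetical names (make_row, get_image_embedding) standing in for the functions shown in Step 2 and in the script that follows:

```python
def batched(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Combined pipeline sketch (hypothetical helpers; see Step 2 for embedding code):
#   for group in batched(image_paths, 100):
#       rows = [make_row(path, get_image_embedding(path)) for path in group]
#       batch_write_rows(client, "multi_modal_retrieval", rows)

print(list(batched(list(range(5)), 2)))  # [[0, 1], [2, 3], [4]]
```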

# -*- coding: utf-8 -*-
"""
Read data from data.json and write it to Tablestore.
This uses existing vector data from data.json without calling the vectorization API again.
"""

import json
import os
from pathlib import Path

import tablestore


def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def load_data(data_json_path):
    """Load all data from data.json."""
    with open(data_json_path, "r", encoding="utf-8") as f:
        return json.load(f)


def batch_write_rows(client, table_name, put_row_items):
    """Batch write data to Tablestore."""
    request = tablestore.BatchWriteRowRequest()
    request.add(tablestore.TableInBatchWriteRowItem(table_name, put_row_items))
    result = client.batch_write_row(request)
    return result.is_all_succeed()


def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    batch_size = 100  # Number of rows per batch.

    # Configure paths.
    current_dir = Path(__file__).parent
    project_root = current_dir
    data_json_path = project_root / "data" / "data.json"

    print("=" * 60)
    print("Read data from data.json and write to Tablestore")
    print("=" * 60)

    # Create a Tablestore client.
    client = get_client()
    print("Tablestore client created successfully.")

    # Load data.
    print(f"\n[1/2] Loading data: {data_json_path}")
    data_array = load_data(data_json_path)
    print(f"Loaded {len(data_array)} data records.")

    # Write data.
    print(f"\n[2/2] Starting to write data...")
    put_row_items = []
    success_count = 0
    error_count = 0
    total = len(data_array)

    for idx, item in enumerate(data_array):
        try:
            image_id = item["image_id"]
            vector = item["vector"]
            city = item.get("city", "unknown")
            width = item.get("width", 0)
            height = item.get("height", 0)

            # Construct the row data.
            primary_key = [("image_id", image_id)]
            attribute_columns = [
                ("city", city),
                ("vector", json.dumps(vector)),
                ("width", width),
                ("height", height),
            ]
            row = tablestore.Row(primary_key, attribute_columns)
            condition = tablestore.Condition(tablestore.RowExistenceExpectation.IGNORE)
            item_row = tablestore.PutRowItem(row, condition)
            put_row_items.append(item_row)

            # Write a full batch.
            if len(put_row_items) >= batch_size:
                is_success = batch_write_rows(client, table_name, put_row_items)
                if is_success:
                    success_count += len(put_row_items)
                    print(f"Progress: {idx + 1}/{total} - Batch write of {len(put_row_items)} rows successful.")
                else:
                    error_count += len(put_row_items)
                    print(f"Progress: {idx + 1}/{total} - Batch write of {len(put_row_items)} rows failed.")
                put_row_items = []

        except Exception as e:
            error_count += 1
            print(f"Failed to process data {item.get('image_id', 'unknown')}: {e}")
            continue

    # Flush the remaining rows. Doing this after the loop also covers the case
    # where an exception on the last record would leave a partial batch unwritten.
    if put_row_items:
        is_success = batch_write_rows(client, table_name, put_row_items)
        if is_success:
            success_count += len(put_row_items)
            print(f"Final batch write of {len(put_row_items)} rows successful.")
        else:
            error_count += len(put_row_items)
            print(f"Final batch write of {len(put_row_items)} rows failed.")

    print("\n" + "=" * 60)
    print(f"Data writing complete!")
    print(f"Successful: {success_count} rows")
    print(f"Failed: {error_count} rows")
    print("=" * 60)


if __name__ == "__main__":
    main()

Step 4: Perform multimodal retrieval

The multimodal image retrieval system supports two retrieval modes: search by natural language and search by image. The system converts the query content into a vector representation, performs a similarity calculation in the vector index, and returns the images that best match the query semantics. You can also use metadata conditions, such as city and image size, for precise filtering.

Search by natural language

# -*- coding: utf-8 -*-
"""
Semantic search example.
Includes multiple query scenarios:
1. Semantic search using only query text.
2. Semantic search using query text and filter conditions (city, height, width).
"""

import os

import dashscope
import tablestore
from dashscope import MultiModalEmbeddingItemText


def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def text_to_embedding(text: str) -> list[float]:
    """Convert text to an embedding."""
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[MultiModalEmbeddingItemText(text=text, factor=1.0)]
    )
    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Text vectorization failed: {resp.code} - {resp.message}")


def search_by_text_only(client, table_name, index_name, query_text: str, top_k: int = 10):
    """
    Scenario 1: Semantic search using only query text.
    """
    print(f"\n{'='*60}")
    print(f"Scenario 1: Search using only query text")
    print(f"Query text: '{query_text}'")
    print(f"Number of results: {top_k}")
    print("="*60)

    # Convert text to vector.
    query_vector = text_to_embedding(query_text)

    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )

    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )

    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_city_filter(client, table_name, index_name, query_text: str, city: str, top_k: int = 10):
    """
    Scenario 2: Search using query text and a city filter.
    """
    print(f"\n{'='*60}")
    print(f"Scenario 2: Query text + City filter")
    print(f"Query text: '{query_text}'")
    print(f"City filter: {city}")
    print(f"Number of results: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # Build the vector query with a city filter.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=tablestore.TermQuery(field_name='city', column_value=city)
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_size_filter(client, table_name, index_name, query_text: str,
                             height_range: tuple = None, width_range: tuple = None, top_k: int = 10):
    """
    Scenario 3: Search using query text and size filters (height, width).
    """
    print(f"\n{'='*60}")
    print(f"Scenario 3: Query text + Size filter")
    print(f"Query text: '{query_text}'")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # Build filter conditions.
    must_queries = []
    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))
    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_with_combined_filters(client, table_name, index_name, query_text: str,
                                  cities: list = None, height_range: tuple = None,
                                  width_range: tuple = None, top_k: int = 10):
    """
    Scenario 4: Search using query text and combined filters (city list, height, width).
    """
    print(f"\n{'='*60}")
    print(f"Scenario 4: Query text + Combined filters")
    print(f"Query text: '{query_text}'")
    print(f"City list: {cities}")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("="*60)

    query_vector = text_to_embedding(query_text)

    # Build combined filter conditions.
    must_queries = []

    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))

    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))

    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def parse_search_hit(hit):
    """Parse search results."""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item


def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    index_name = "index"

    print("=" * 60)
    print("Tablestore multimodal semantic search demo")
    print("=" * 60)

    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")

    # Scenario 1: Semantic search using only a natural language description.
    # Use complete natural language sentences, not just simple keywords.
    search_by_text_only(
        client, table_name, index_name,
        "A fluffy puppy running on the grass",
        top_k=5
    )

    # Scenario 2: Natural language description + City filter.
    search_with_city_filter(
        client, table_name, index_name,
        "A willow tree by the lake with a mountain range in the distance",
        city="hangzhou",
        top_k=5
    )

    # Scenario 3: Natural language description + Size filter.
    # Find high-resolution landscape images.
    search_with_size_filter(
        client, table_name, index_name,
        "A modern city skyline brightly lit at night",
        height_range=(500, 1024),
        width_range=(800, 1024),
        top_k=5
    )

    # Scenario 4: Natural language description + Combined filters.
    search_with_combined_filters(
        client, table_name, index_name,
        "Snow-capped mountains in the distance, with sunlight glistening on the snow",
        cities=["hangzhou", "shanghai", "beijing"],
        height_range=(0, 1024),
        width_range=(0, 1024),
        top_k=5
    )

    print("\n" + "=" * 60)
    print("All search scenario demos are complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()

Search by image

# -*- coding: utf-8 -*-
"""
Search by image example.
Vectorizes a local image and then retrieves similar images from Tablestore.
"""

import base64
import os
from pathlib import Path

import dashscope
import tablestore


def get_client():
    """Create a Tablestore client."""
    endpoint = os.getenv("tablestore_end_point")
    instance_name = os.getenv("tablestore_instance_name")
    access_key_id = os.getenv("tablestore_access_key_id")
    access_key_secret = os.getenv("tablestore_access_key_secret")

    client = tablestore.OTSClient(
        endpoint,
        access_key_id,
        access_key_secret,
        instance_name,
        retry_policy=tablestore.WriteRetryPolicy(),
    )
    return client


def image_to_embedding(image_path: str) -> list[float]:
    """
    Convert a local image to an embedding.
    """
    # Read the image and convert it to base64.
    with open(image_path, "rb") as f:
        image_data = f.read()
    base64_image = base64.b64encode(image_data).decode("utf-8")

    # Determine the MIME type based on the file extension.
    suffix = Path(image_path).suffix.lower()
    if suffix in [".jpg", ".jpeg"]:
        mime_type = "image/jpeg"
    elif suffix == ".png":
        mime_type = "image/png"
    elif suffix == ".gif":
        mime_type = "image/gif"
    elif suffix == ".webp":
        mime_type = "image/webp"
    else:
        mime_type = "image/jpeg"  # Use jpeg by default.

    # Construct the data URI.
    data_uri = f"data:{mime_type};base64,{base64_image}"

    # Call the multimodal vectorization API.
    resp = dashscope.MultiModalEmbedding.call(
        model="multimodal-embedding-v1",
        input=[{"image": data_uri, "factor": 1.0}]
    )

    if resp.status_code == 200:
        return resp.output["embeddings"][0]["embedding"]
    else:
        raise Exception(f"Image vectorization failed: {resp.code} - {resp.message}")


def search_by_image(client, table_name, index_name, image_path: str, top_k: int = 10):
    """
    Search by image: Perform a semantic search using a local image.
    """
    print(f"\n{'='*60}")
    print(f"Search by image")
    print(f"Query image: {image_path}")
    print(f"Number of results: {top_k}")
    print("="*60)

    # Vectorize the image.
    print("Vectorizing the query image...")
    query_vector = image_to_embedding(image_path)
    print(f"Vectorization complete. Dimension: {len(query_vector)}")

    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
    )

    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(
        query,
        limit=top_k,
        get_total_count=False,
        sort=sort
    )

    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def search_by_image_with_filter(client, table_name, index_name, image_path: str,
                                 cities: list = None, height_range: tuple = None,
                                 width_range: tuple = None, top_k: int = 10):
    """
    Search by image with filters: Perform a semantic search using a local image and apply filter conditions.
    """
    print(f"\n{'='*60}")
    print(f"Search by image + Filters")
    print(f"Query image: {image_path}")
    print(f"City list: {cities}")
    print(f"Height range: {height_range}")
    print(f"Width range: {width_range}")
    print(f"Number of results: {top_k}")
    print("="*60)

    # Vectorize the image.
    print("Vectorizing the query image...")
    query_vector = image_to_embedding(image_path)
    print(f"Vectorization complete. Dimension: {len(query_vector)}")

    # Build filter conditions.
    must_queries = []

    if cities and len(cities) > 0:
        must_queries.append(tablestore.TermsQuery(field_name='city', column_values=cities))

    if height_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='height',
            range_from=height_range[0],
            range_to=height_range[1],
            include_lower=True,
            include_upper=True
        ))

    if width_range:
        must_queries.append(tablestore.RangeQuery(
            field_name='width',
            range_from=width_range[0],
            range_to=width_range[1],
            include_lower=True,
            include_upper=True
        ))

    vector_filter = tablestore.BoolQuery(must_queries=must_queries) if must_queries else None

    # Build the vector query.
    query = tablestore.KnnVectorQuery(
        field_name='vector',
        top_k=top_k,
        float32_query_vector=query_vector,
        filter=vector_filter
    )

    # Sort by score.
    sort = tablestore.Sort(sorters=[tablestore.ScoreSort(sort_order=tablestore.SortOrder.DESC)])
    search_query = tablestore.SearchQuery(query, limit=top_k, get_total_count=False, sort=sort)

    # Execute the search.
    search_response = client.search(
        table_name=table_name,
        index_name=index_name,
        search_query=search_query,
        columns_to_get=tablestore.ColumnsToGet(
            column_names=["image_id", "city", "height", "width"],
            return_type=tablestore.ColumnReturnType.SPECIFIED
        )
    )

    print(f"\nRequest ID: {search_response.request_id}")
    print(f"\nSearch results:")
    print("-" * 60)

    for idx, hit in enumerate(search_response.search_hits):
        row_item = parse_search_hit(hit)
        print(f"{idx + 1}. Score: {hit.score:.4f} | {row_item}")

    return search_response.search_hits


def parse_search_hit(hit):
    """Parse search results."""
    row_item = {}
    primary_key = hit.row[0]
    row_item["image_id"] = primary_key[0][1]
    attribute_columns = hit.row[1]
    for col in attribute_columns:
        key = col[0]
        val = col[1]
        row_item[key] = val
    return row_item


def main():
    # Configure parameters.
    table_name = "multi_modal_retrieval"
    index_name = "index"

    print("=" * 60)
    print("Tablestore search by image demo")
    print("=" * 60)

    # Create a client.
    client = get_client()
    print("Tablestore client created successfully.")

    # Locate the sample data directory relative to this script.
    current_dir = Path(__file__).parent
    data_dir = current_dir / "data" / "photograph"

    # Get a sample image to use as the query image.
    sample_images = list(data_dir.glob("*.jpg"))
    if not sample_images:
        print("Error: No sample image found. Make sure there are .jpg images in the data/photograph folder.")
        return

    # Use the first image as the query example.
    query_image_path = str(sample_images[0])
    print(f"\nUsing sample image: {query_image_path}")

    # Scenario 1: Search by image using only an image.
    search_by_image(client, table_name, index_name, query_image_path, top_k=5)

    # Scenario 2: Search by image + Filters.
    # Search for similar images only in specific cities.
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        cities=["hangzhou", "shanghai"],
        top_k=5
    )

    # Scenario 3: Search by image + Size filter.
    # Search for similar images whose width is between 800 and 1024 pixels.
    search_by_image_with_filter(
        client, table_name, index_name,
        query_image_path,
        width_range=(800, 1024),
        top_k=5
    )

    print("\n" + "=" * 60)
    print("Search by image demo complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()
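The search hits parsed above follow the SDK's `(primary_key, attribute_columns)` row layout, where both parts are lists of `(name, value)` tuples. A minimal, self-contained sketch of the same flattening logic, using a mocked hit (the values are illustrative, not real Tablestore output):

```python
from types import SimpleNamespace

def parse_hit_row(row):
    # Same flattening logic as parse_search_hit above:
    # row = (primary_key_tuples, attribute_column_tuples).
    primary_key, attribute_columns = row
    item = {"image_id": primary_key[0][1]}
    for col in attribute_columns:
        item[col[0]] = col[1]
    return item

# Mocked hit shaped like an SDK SearchHit (values are illustrative).
hit = SimpleNamespace(row=(
    [("image_id", "img_0001")],
    [("city", "hangzhou"), ("height", 768), ("width", 1024)],
))
print(parse_hit_row(hit.row))
# {'image_id': 'img_0001', 'city': 'hangzhou', 'height': 768, 'width': 1024}
```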

Visualization search interface

Build an interactive search interface with Gradio for an intuitive, graphical experience. The interface depends on the local image folder in the demo project and is intended for quick tests and demonstrations. If you use your own data, refer to the code to implement a similar interface.

  1. Install Gradio and its dependencies.

    pip install gradio gradio_rangeslider
  2. Start the visualization interface.

    python src/gradio_app.py

    After the application starts, open the displayed URL (such as http://localhost:7860) in a browser to access the search interface.

    | Feature | Description |
    | --- | --- |
    | Search by image | Upload a local image to query for similar images. |
    | Search by natural language | Enter a natural language description, such as "snow-capped mountains in the distance" or "a fluffy puppy running on the grass". |
    | Top K | Set the number of results to return (1-30). |
    | Height/Width range | Filter by image dimensions. |
    | City filter | Filter by city (multiple selections are supported). |

References