すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:チュートリアル:IPC デバイス向けのインテリジェントセマンティック検索システムの構築

最終更新日:Mar 01, 2026

Object Storage Service (OSS) のデータインデックス機能を使用して、インターネットプロトコル (IP) カメラで収集された動画向けのインテリジェントセマンティック検索システムを構築できます。このシステムにより、動画に対してセマンティック検索を実行でき、インテリジェントセキュリティなどのシナリオに最適です。

ソリューション概要

image

インテリジェントセマンティック検索システムを構築するには、次の 2 つのステップに従います:

  1. バケットの作成と動画のアップロード:IP カメラで収集された元の動画ファイルを保存するためのバケットを作成します。次に、処理する動画ファイルをアップロードします。これにより、後続の動画検索の基盤が提供されます。

  2. AISearch 機能の有効化:バケットの AISearch 機能を有効にして、自然言語の説明に基づいたインテリジェントな検索を可能にします。

メリット

  • セマンティック検索:自然言語の説明と複数の複合条件に基づいた正確な検索をサポートします。これにより、ターゲットフレームを迅速に特定し、複雑なシナリオでの検索要件を満たすことができます。

  • マルチモーダル検索:動画、画像、テキストなどのデータに対する統一された管理および検索機能を提供します。これにより、技術的な障壁と運用保守 (O&M) コストが削減されます。

  • 水平スケーリング:OSS は無制限の容量と弾性的なスケーラビリティを提供します。大量のデータ増加にも容易に対応できます。

1. バケットの作成と動画のアップロード

  1. OSS コンソールにログインします。

  2. [バケット] ページに移動し、[バケットの作成] をクリックします。

  3. [バケットの作成] ページで、ipc-videos-oss-metaquery-demo のように、ビジネスに関連するバケット名を入力します。他のパラメーターはデフォルト設定のままでかまいません。

  4. [作成] をクリックします。バケットが作成されたら、[バケットの表示] をクリックします。

  5. [ファイルリスト] ページで、[ファイルのアップロード] > [ファイルのスキャン] をクリックします。 アップロードしたい動画ファイル、例えば Video A.mp4Video B.mp4Video C.mp4 を選択します。他のパラメーターはデフォルト構成のままにし、[ファイルのアップロード] をクリックします。

  6. (オプション) アップロードした動画ファイルにタグを設定します。対象ファイルの [操作] 列で、more > [その他] > [タギング] を選択します。表示されるダイアログボックスで、タグのキーと値のペアを追加します。例えば、インデックス作成時のフィルター条件として使用するために、タグキーを need-seek、値を true に設定します。インデックスクエリのフィルター条件として使用するために、タグキーを camera、値を camera-a に設定します。[OK] をクリックします。動画にタグを追加することで、インデックス作成時や検索時に、より正確なフィルタリングが可能になります。

2. AISearch 機能の有効化

バケットの AISearch 機能を有効にすると、自然言語の説明と複数の複合条件に基づいて動画を正確に検索できるようになります。

  1. 左側のナビゲーションウィンドウで、[ファイル管理] > [データインデックス] を選択します。

  2. [データインデックス] ページで、データインデックス機能を初めて使用する場合は、指示に従って AliyunMetaQueryDefaultRole ロールに権限を付与します。これにより、OSS がバケット内のデータを管理できるようになります。権限を付与した後、[データインデックスの有効化] をクリックし、[AISearch] を選択します。

  3. (オプション) AI コンテンツ認識:必要に応じて、[画像コンテンツ認識] または [動画コンテンツ認識] を選択します。

  4. (オプション) ファイルフィルタリングルール:特定のルールに一致するファイルのみに AI 分析を実行するように、このオプションを設定します。最大 5 つのファイルフィルタリングルールを設定できます。プレフィックス、ファイルサイズ、タグ、または LastModifiedTime でフィルタリングできます。例えば、タグフィルタリングルールを追加し、キーを need-seek、値を true に設定します。設定後、システムはこのタグを持つファイルのみをインデックス化します。

    説明

    ファイルフィルタリングを有効にすると、データインデックス、AISearch、およびコンテンツ認識の料金は、フィルタリングされたファイルの数にのみ基づいて課金されます。

  5. [有効化] をクリックします。

説明

メタデータインデックスの構築には時間がかかります。正確な所要時間は、バケット内のオブジェクト数によって異なります。時間がかかりすぎる場合は、ページをリフレッシュして有効化のステータスを確認できます。

image

image

結果の検証

a yard with a parked car (駐車された車のある庭) のような説明的なテキストを入力するだけで、システムは説明に一致する関連動画を返します。

  1. [バケット] ページで、ご利用のバケットの名前をクリックします。

  2. [ファイル] ページで、動画がアップロードされていることを確認します。

  3. 左側のナビゲーションウィンドウで、[ファイル管理] > [データインデックス] を選択します。

  4. [データインデックス] ページで、検索ボックスに a yard with a parked car と入力します。[メディアタイプ] で、Video を選択します。

  5. (オプション) [オブジェクトタギング] で、キーを camera、値を camera-a に設定します。システムは、タグ camera=camera-a を持つ動画のみを返し、他のすべての動画をフィルタリングします。

  6. [検索] をクリックします。

  7. 検索結果からファイルパスをコピーします。オブジェクト ページに戻り、コピーしたファイルパスを検索ボックスに貼り付けて検索し、説明に一致する動画を表示します。

2025-05-23_16-41-02 (1)

本番環境への適用

この機能を本番環境に統合する必要がある場合は、次の点を考慮してください:

本番データのインジェスト

実際のビジネスシナリオでは、IP カメラなどの監視デバイスは、大量の動画データを継続的に生成します。OSS Software Development Kit (SDK) を統合して、録画された動画セグメントを指定されたバケットにリアルタイムでアップロードすることを推奨します。これにより、データアップロードの安定性と適時性が確保され、システム全体の可用性とリアルタイム処理能力が向上します。

次の例は、OSS Python SDK を使用してファイルアップロードマネージャーを呼び出し、動画をアップロードする方法を示しています:

コードサンプル

import argparse
import alibabacloud_oss_v2 as oss
from alibabacloud_oss_v2.models import ListObjectsRequest, PutObjectTaggingRequest, Tagging, TagSet, Tag, PutObjectRequest
import os

def upload_video(client, bucket, video_config):
    """# 動画をアップロード"""
    try:
        # ファイルが存在するかどうかを確認
        if not os.path.exists(video_config['path']):
            print(f'File does not exist: {video_config["path"]}')
            return False

        # ファイルをアップロードするためのオブジェクトを作成
        uploader = client.uploader()

        # アップロードリクエストを実行 - filepath パラメーターを正しく渡す
        result = uploader.upload_file(
            filepath=video_config['path'],  # filepath パラメーターを追加
            request=PutObjectRequest(
                bucket=bucket,
                key=video_config['key']
            )
        )

        print(f'Upload {video_config["key"]} successful:')
        print(f'status code: {result.status_code},'
              f' request id: {result.request_id}')
        return True
    except Exception as e:
        print(f'Upload {video_config["key"]} failed: {str(e)}')
        return False

def main():
    # OSS の構成
    args = argparse.Namespace(
        region='cn-beijing',
        bucket='ipc-videos-oss-metaquery-demo',
        endpoint='https://oss-cn-beijing.aliyuncs.com'
    )

    # OSS クライアントを構成
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    cfg.endpoint = args.endpoint
    client = oss.Client(cfg)

    # 動画ファイルの設定 (異なるタグを含む)
    videos = [
        {
            'path': '<Your-Path>/VideoA.mp4',
            'key': 'VideoA.mp4'
        },
        {
            'path': '<Your-Path>/VideoB.mp4',
            'key': 'VideoB.mp4'
        },
        {
            'path': '<Your-Path>/VideoC.mp4',
            'key': 'VideoC.mp4'
        }
    ]

    # すべての動画をアップロード
    for video in videos:
        print(f"\nStart uploading video: {video['key']}")
        upload_video(
            client=client,
            bucket=args.bucket,
            video_config=video
        )

if __name__ == "__main__":
    main()

検索機能の統合

本番環境では、検索機能をバックエンドサービスに統合することを推奨します。OSS SDK を使用して呼び出しを自動化し、コンソールでの手動操作を回避します。

次のコードサンプルは、OSS MetaQuery 仕様に準拠した XML リクエストを構築して検索結果を取得する方法を示しています:

コードサンプル

# -*- coding: utf-8 -*-
import argparse
import alibabacloud_oss_v2 as oss
# XML 応答を解析
import xml.etree.ElementTree as ET
import json 
from datetime import datetime 

def get_search_conditions():
    """# ユーザーから複数条件の入力を取得"""
    print("Enter a semantic description (example: a yard with a parked car)")
    query = input("> Semantic keywords: ").strip()
    while not query:
        print("Semantic keywords cannot be empty!")
        query = input("> Semantic keywords: ").strip()
    return query  

def build_metaquery_xml(query):
    """# OSS 仕様に準拠した MetaQuery XML を構築"""
    xml_parts = [f'<Query>{query}</Query>']
    # MediaTypes タグを追加 (ここでは video にハードコーディング)
    xml_parts.append('<MediaTypes><MediaType>video</MediaType></MediaTypes>')

    meta_query_xml = f'''<MetaQuery>
    {"".join(xml_parts)}
</MetaQuery>'''
    return meta_query_xml  # UTF-8 バイトストリームとしてエンコード


def format_result(key, pre_url):
    """# 単一の検索結果をフォーマット"""
    return f""" File path: {key}
 File address: {pre_url}
-----------------------"""

def semantic_search():
    # コマンドライン引数に直接値を割り当て
    args = argparse.Namespace(
        region = 'cn-beijing',  # ご利用のリージョンに置き換えてください
        bucket = 'ipc-videos-oss-metaquery-demo',  # ご利用のバケット名に置き換えてください
        endpoint = 'https://oss-cn-beijing.aliyuncs.com',  # ご利用のエンドポイントに置き換えてください。不要な場合は、空のままにするか削除できます。
    )

    # OSS クライアントを初期化
    credentials = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials
    cfg.region = args.region
    if args.endpoint:
        cfg.endpoint = args.endpoint
    client = oss.Client(cfg)

    # ユーザー入力を取得
    query = get_search_conditions()  # クエリのみを取得
    # リクエストを構築
    try:
        # リクエストボディ (XML データ) を構築
        data_str = build_metaquery_xml(query)

        # 操作の入力を定義
        req = oss.OperationInput(
            op_name='DoMetaQuery',  # カスタムアクション名
            method='POST',            # HTTP メソッド
            parameters={              # クエリパラメーター
                'metaQuery': '',
                'mode': 'semantic',
                'comp': 'query',
            },
            headers=None,             # カスタムリクエストヘッダー (オプション)
            body=data_str.encode("utf-8"),  # リクエストボディ (UTF-8 バイトストリームとしてエンコード)
            bucket=args.bucket,       # ターゲットバケット名
        )

        # 汎用インターフェイスを呼び出して操作を実行
        resp = client.invoke_operation(req)

    except oss.exceptions.ServiceError as e:
        print(f" Server error: {e.message}")
        return
    
    root = ET.fromstring(resp.http_response.content.decode('utf-8'))
     # すべての File 要素を検索
    files = root.findall('.//File')
    
    print(f"\n Found {len(files)} matching results:")
    


    for i, file in enumerate(files, 1):
        print(f"\nFile {i}:")

        # さまざまなプロパティを取得して出力
        uri_element = file.find('URI')
        uri = uri_element.text if uri_element is not None else 'N/A'
        print(f"  URI: {uri}")

        key_element = file.find('Filename')
        key = key_element.text if key_element is not None else 'N/A'
        print(f"  File name: {key}")

        size_element = file.find('Size')
        size = size_element.text if size_element is not None else 'N/A'
        print(f"  Size: {size}")

        modified_time_element = file.find('FileModifiedTime')
        modified_time = modified_time_element.text if modified_time_element is not None else 'N/A'
        print(f"  Modified time: {modified_time}")

        content_type_element = file.find('ContentType')
        content_type = content_type_element.text if content_type_element is not None else 'N/A'
        print(f"  ContentType: {content_type}")

        media_type_element = file.find('MediaType')
        media_type = media_type_element.text if media_type_element is not None else 'N/A'
        print(f"  MediaType: {media_type}")

        # 必要に応じて、ImageHeight、ImageWidth、OSSStorageClass など、さらに多くのプロパティを取得して出力できます。
        # image_height_element = file.find('ImageHeight')
        # image_height = image_height_element.text if image_height_element is not None else 'N/A'
        # print(f"  ImageHeight: {image_height}")

        # 署名付き URL を生成 (必要な場合)
        if key != 'N/A': # ファイル名が存在する場合にのみ URL を生成
             try:
                pre_url = client.presign(
                    oss.GetObjectRequest(
                        bucket=args.bucket,  # バケット名を指定
                        key=key,        # オブジェクトキーを指定
                    )
                )
                print(f"  File address (signed URL): {pre_url.url}")
             except Exception as e:
                 print(f"  Failed to generate signed URL: {e}")


        print("-" * 20) # 区切り文字
        
if __name__ == "__main__":
    semantic_search()

プログラムを実行した後、a yard with a parked car のような説明的なテキストを入力して検索を実行できます。システムはデータインデックスを使用して、説明に一致する検索結果を返します。URL を使用して、動画の詳細を直接表示できます。

Found 1 matching result:

File 1:
  URI: oss://ipc-videos-oss-metaquery-demo/VideoA.mp4
  File name: VideoA.mp4
  Size: 2311252
  Modified time: 2025-05-23T17:38:10+08:00
  ContentType: video/mp4
  MediaType: video
  File address (signed URL): https://ipc-videos-oss-metaquery-demo.oss-cn-beijing.aliyuncs.com/VideoA.mp4?x-oss-signature-version=OSS4-HMAC-SHA256&x-oss-date=20250523T094511Z&x-oss-expires=900&x-oss-credential=LTAI********************%2F20250523%2Fcn-beijing%2Foss%2Faliyun_v4_request&x-oss-signature=0bf38092c42a179ff0e8334c8bea3fd92f5a78599038e816e2ed3e02755542af
--------------------

タグフィルターの設定

大量の動画データを扱う場合、パスだけでファイルを管理することは、検索や分類において非効率なことがよくあります。OSS のタギング機能を使用して、ファイルにキーと値のタグを追加することを推奨します。これにより、カメラ ID や地理的リージョンによるフィルタリングなど、ビジネスニーズに基づいてデータを迅速にフィルタリングおよび分類できます。

システムに分析対象の 3 つの動画ファイル (VideoA.mp4VideoB.mp4VideoC.mp4) が次のようにあると仮定します:

VideoA.mp4

VideoB.mp4

VideoC.mp4

example

example (1)

example

裏庭の動画、camera-a で撮影されたものとしてマーク

自動販売機の動画、camera-b で撮影されたものとしてマーク

裏庭の動画、内容は Video A に似ており、camera-c で撮影されたものとしてマーク

ファイルをアップロードする際に直接タグを設定できます。また、アップロード後に動的に管理して、さまざまなビジネスニーズに対応することもできます。

アップロード時のタグ設定

動画ファイルをアップロードする際にタグを設定して、アップロードとタグ管理の操作を統合できます。これにより、データ管理の効率が向上します。

次の例は、OSS Python SDK のファイルアップロードマネージャーを使用して動画ファイルをアップロードし、同時にタグを設定する方法を示しています:

コードサンプル

import argparse
import alibabacloud_oss_v2 as oss
from alibabacloud_oss_v2.models import PutObjectRequest
import os

def upload_video_with_tags(client, bucket, video_config):
    """# タグ付きで動画をアップロード"""
    try:
        # ファイルが存在するかどうかを確認
        if not os.path.exists(video_config['path']):
            print(f'File does not exist: {video_config["path"]}')
            return False

        # ファイルをアップロードするためのオブジェクトを作成
        uploader = client.uploader()

        # タグ文字列を直接使用 (キーと値が単純な文字列の場合、エンコーディングは不要)
        tagging_str = f"camera={video_config['camera_tag']}"

        # アップロードリクエストを実行 - タグを直接付与
        result = uploader.upload_file(
            filepath=video_config['path'],
            request=PutObjectRequest(
                bucket=bucket,
                key=video_config['key'],
                tagging=tagging_str
            )
        )

        print(f'Upload {video_config["key"]} successful:')
        print(f'status code: {result.status_code},'
              f' request id: {result.request_id}')
        print(f'Tag added: {tagging_str}')
        return True
    except Exception as e:
        print(f'Upload {video_config["key"]} failed: {str(e)}')
        return False


def main():
    # OSS の構成
    args = argparse.Namespace(
        region='cn-beijing',
        bucket='ipc-videos-oss-metaquery-demo',
        endpoint='https://oss-cn-beijing.aliyuncs.com'
    )

    # OSS クライアントを構成
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    cfg.endpoint = args.endpoint
    client = oss.Client(cfg)

    # 動画ファイルの設定 (異なるタグを含む)
    videos = [
        {
            'path': '<Your-Path>/VideoA.mp4',
            'key': 'VideoA.mp4',
            'camera_tag': 'camera-a'
        },
        {
            'path': '<Your-Path>/VideoB.mp4',
            'key': 'VideoB.mp4',
            'camera_tag': 'camera-b'
        },
        {
            'path': '<Your-Path>/VideoC.mp4',
            'key': 'VideoC.mp4',
            'camera_tag': 'camera-c'
        }
    ]

    # すべての動画をアップロードしてタグを追加
    for video in videos:
        print(f"\nStart uploading video: {video['key']}")
        upload_video_with_tags(
            client=client,
            bucket=args.bucket,
            video_config=video
        )


if __name__ == "__main__":
    main()
アップロード後のタグ管理

ファイルが既にアップロードされている場合でも、いつでもタグを追加または変更できます。これにより、データタグの動的なメンテナンスと正確性が確保されます。

次の例は、Python SDK を使用して API 操作を呼び出し、タグを追加する方法を示しています:

コードサンプル

import argparse
import alibabacloud_oss_v2 as oss
from alibabacloud_oss_v2.models import GetObjectTaggingRequest, PutObjectTaggingRequest, Tagging, TagSet, Tag


def apply_tags_to_frame(client, bucket, frame_key, tags):
    """# 動画にタグを追加"""
    try:
        # タギングオブジェクトを構築
        tagging = Tagging(
            version=1,
            tag_set=TagSet(tags=tags)
        )
        
        # タグを更新するリクエストを作成
        put_tag_request = PutObjectTaggingRequest(
            bucket=bucket,
            key=frame_key,
            tagging=tagging
        )
        
        # オブジェクトのタグを更新
        result = client.put_object_tagging(put_tag_request)
        
        # 出力用にタグを文字列に変換
        tags_str = '&'.join([f"{tag.key}={tag.value}" for tag in tags])
        print(f"Successfully added tags to {frame_key}: {tags_str}")
        return True
    except Exception as e:
        print(f"Failed to add tags: {str(e)}")
        return False

def frame_tags():
    # OSS の構成
    args = argparse.Namespace(
        region='cn-beijing',   
        frame_bucket='ipc-videos-oss-metaquery-demo',     
        endpoint='https://oss-cn-beijing.aliyuncs.com'
    )
    
    # OSS クライアントを構成
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    cfg.endpoint = args.endpoint
    client = oss.Client(cfg)

    # ご利用の動画名を設定
    videos = [
        'VideoA.mp4',    
        'VideoB.mp4',
        'VideoC.mp4'
    ]

    # 各動画を処理
    for video_filename in videos:
        print(f"\nStart processing tags for video {video_filename}") # 元の抽出されたファイル名を出力

        # ファイル名拡張子を削除、例:'VideoA.mp4' -> 'VideoA'
        # サンプルコードでは、動画ファイル名の最後の文字をカメラ識別子として抽出します。これは参考用です。実際のビジネス要件に基づいてタグを追加できます。
        video_name_base = video_filename.split('.')[0] if '.' in video_filename else video_filename
        tags = [
            Tag(key='camera', value=f'camera-{video_name_base[-1].lower()}' if video_name_base else 'camera-unknown'),
            # Tag(key='category', value='video_monitoring')       # 実際の業務カテゴリ情報を追加
        ]

        # 動画にタグを追加
        apply_tags_to_frame(client, args.frame_bucket, video_filename, tags)

        print(f"Finished processing tags for video {video_filename}\n") # 元の抽出されたファイル名を出力

if __name__ == "__main__":
    frame_tags()
タグフィルターを使用した検索

次の例は、OSS Python SDK を使用して、セマンティック理解とタグフィルタリングの両方を使用する複合クエリリクエストを送信する方法を示しています:

コードサンプル

# -*- coding: utf-8 -*-
import argparse
import alibabacloud_oss_v2 as oss
# XML 応答を解析
import xml.etree.ElementTree as ET
import json
from datetime import datetime
# --- 新規インポート ---
from alibabacloud_oss_v2.models import GetObjectTaggingRequest
# --- 新規インポートの終わり ---

def get_inputs():
    """# ユーザーのセマンティッククエリ (必須) とフィルター条件 (オプション) を取得"""
    # 1. 必須のセマンティッククエリを取得
    print(" Enter a semantic description (example: a yard with a parked car)")
    query = input("> Semantic keywords: ").strip()
    while not query:
        print(" Semantic keywords cannot be empty!")
        query = input("> Semantic keywords: ").strip()

    conditions = [] # フィルター条件のリストを初期化
    target_tag = None # --- ターゲットタグを格納するために使用 ---

    # 2. オプションのタグフィルターを取得
    print("\n (Optional) Enter tag filter conditions (used for client-side filtering, example: camera=camera-a, press Enter to skip)") 
    tag_input = input("> Tag (format key=value): ").strip()
    if tag_input:
        if '=' in tag_input:
            # conditions.append({
            #     'field': 'Tags',
            #     'value': tag_input,
            #     'op': 'eq'
            # })
            target_tag = tag_input
            print(f" Info: The tag '{target_tag}' will be used for client-side filtering after getting the results.")
        else:
             print(" Incorrect tag format, ignored. Use the 'key=value' format.")

    # --- target_tag を返す ---
    return query, conditions, target_tag

def build_metaquery_xml(query, conditions):
    """# OSS 仕様に準拠した MetaQuery XML を構築 (セマンティッククエリとオプションのフィルターを含む)"""
    # 常にセマンティッククエリ部分を含める
    xml_parts = [f'<Query>{query}</Query>']
    
    # メディアタイプの制限を追加
    xml_parts.append('<MediaTypes><MediaType>video</MediaType></MediaTypes>')
    
    # オプションのフィルター条件を追加 - セマンティックモードでは、一部のフィールドのみがサポートされます (ここでは Filename など)
    for cond in conditions:
        # セマンティックモードでは、Tags フィールドの SimpleQuery の構築をスキップします (条件に Tags を追加しなくなりましたが、念のため残しておきます)
        if cond['field'] == 'Tags':
            continue

        json_query = json.dumps({
            "Field": cond['field'],
            "Value": cond['value'],
            "Operation": cond['op']
        }, ensure_ascii=False)
        xml_parts.append(f'<SimpleQuery>{json_query}</SimpleQuery>')

    # 完全な MetaQuery XML に結合
    meta_query_xml = f'''<MetaQuery>
    {"".join(xml_parts)}
</MetaQuery>'''
    return meta_query_xml

def format_result(key, pre_url):
    """# 単一の検索結果をフォーマット"""
    return f""" File address: {pre_url}
 File path: {key}
-----------------------"""

def perform_search():
    # コマンドライン引数に直接値を割り当て
    args = argparse.Namespace(
        region='cn-beijing',  # ご利用のリージョンに置き換えてください
        bucket='ipc-videos-oss-metaquery-demo',  # ご利用のバケット名に置き換えてください
        endpoint='https://oss-cn-beijing.aliyuncs.com',  # ご利用のエンドポイントに置き換えてください
    )

    # OSS クライアントを初期化
    credentials = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials
    cfg.region = args.region
    if args.endpoint:
        cfg.endpoint = args.endpoint
    client = oss.Client(cfg)

    # ユーザー入力 (セマンティッククエリ + オプション条件 + ターゲットタグ) を取得
    query, conditions, target_tag = get_inputs()

    # リクエストを構築
    try:
        # リクエストボディ (XML データ) を構築
        # --- 条件のみを渡す (Tags なし) ---
        data_str = build_metaquery_xml(query, conditions)

        # 操作の入力を定義
        req = oss.OperationInput(
            op_name='DoMetaQuery',
            method='POST',
            parameters={
                'metaQuery': '',
                'mode': 'semantic', # <-- セマンティックモードを再追加
                'comp': 'query',
            },
            headers=None,
            body=data_str.encode("utf-8"),
            bucket=args.bucket,
        )

        # 汎用インターフェイスを呼び出して操作を実行
        print("\n Sending DoMetaQuery request...")
        resp = client.invoke_operation(req)
        print(f"\n Request successful, HTTP status code: {resp.http_response.status_code}")
    except oss.exceptions.ServiceError as e:
        print(f" Server error (ServiceError): {e.message}")
        print(f"   - HTTP Status Code: {e.status_code}")
        print(f"   - Error Code: {e.error_code}")
        print(f"   - Request ID: {e.request_id}")
        return
    except oss.exceptions.ClientError as e: # クライアントエラーのキャッチを追加
        print(f" Client or network error (ClientError): {e.message}")
        return
    except Exception as e: # その他の考えられる例外をキャッチ
        print(f" Unknown error: {e}")
        import traceback
        traceback.print_exc() # 完全なトレースバックを出力
        return


    # 結果を解析して処理...
    final_results_count = 0 # --- 新規:最終的な一致結果を集計するためのカウンター ---
    try:
        root = ET.fromstring(resp.http_response.content.decode('utf-8'))
        # すべての File 要素を検索
        files = root.findall('.//File')

        print(f"\n Got {len(files)} initial matching results from OSS, starting client-side tag filtering...")

        if not files:
            # NextContinuationToken を確認、ページングが必要な場合があります
            next_token_elem = root.find('.//NextContinuationToken')
            if next_token_elem is not None and next_token_elem.text:
                print(" Note: There may be more results. The current implementation does not handle paging.")
            print("\n No initial matching results.")
            return # ファイルがない場合は早期にリターン


        for i, file in enumerate(files, 1):
            # ファイル名を取得
            key_element = file.find('Filename')
            if key_element is None:
                print(f" Warning: The {i}-th initial result is missing the Filename field, skipped.")
                continue
            key = key_element.text

            # --- クライアントサイドのタグフィルタリング ---
            if target_tag:
                try:
                    tagging_req = GetObjectTaggingRequest(bucket=args.bucket, key=key)
                    tagging_resp = client.get_object_tagging(tagging_req)
                    # 返されたタグセットにターゲットタグが含まれているか確認
                    tag_found = False
                    target_k, target_v = target_tag.split('=', 1)

                    if tagging_resp.tag_set and tagging_resp.tag_set.tags: # .tags を使用
                        for tag in tagging_resp.tag_set.tags: # .tags を使用

                            if tag.key == target_k and tag.value == target_v:
                                tag_found = True
                                break
                    if not tag_found:
                        continue # タグが一致しないため、このファイルをスキップ
                except oss.exceptions.ServiceError as tag_err:
                     if tag_err.status_code == 404 and tag_err.error_code == 'NoSuchTagSet':
                         continue
                     else:
                        print(f" Warning: Error getting tags for file '{key}': {tag_err.error_code} - {tag_err.message}, skipped.")
                        continue
                except Exception as tag_e:
                    print(f" Warning: Unknown error while getting or processing tags for file '{key}': {tag_e}, skipped.")
                    # --- デバッグ用のトレースバックを追加 ---
                    import traceback
                    traceback.print_exc()
                    # --- 追加の終わり ---
                    continue
            # --- クライアントサイドのタグフィルタリングの終わり ---

            # --- タグフィルターを通過した場合 (またはタグフィルターが設定されていない場合)、結果を処理して出力 ---
            final_results_count += 1 # 最終結果カウンターをインクリメント
            print(f"\n[{final_results_count}] File '{key}' matches all conditions:") # 最終結果番号を出力

            # 署名付き URL を生成
            try:
                pre_url = client.presign(
                    oss.GetObjectRequest(
                        bucket=args.bucket,
                        key=key,
                    )
                )
                print(format_result(key, pre_url.url))
            except Exception as presign_e:
                print(f" Warning: Error generating signed URL for file '{key}': {presign_e}")
                print(format_result(key, "[Could not generate URL]"))
        # --- ループ後に最終統計を出力 ---
        print(f"\n Client-side filtering complete, found {final_results_count} final matching results.")

    except ET.ParseError as xml_e:
        print(f" Error: Error parsing OSS response XML - {xml_e}")
    except Exception as parse_e:
        print(f" Error: Unexpected error while processing results - {parse_e}")


if __name__ == "__main__":
    perform_search()

プログラムを実行した後、駐車された車のある庭を含む動画をフィルタリングするには、次の手順を実行します:

  1. 説明フィールドに、検索キーワード a yard with a parked car を入力します。

  2. タグフィルター条件を camera = camera-a に設定します。

この例では、Video A と Video C の両方に、説明 a yard with a parked car に一致するシーンが含まれています。ただし、camera-a のタグが付いた動画の検索結果のみを保持するタグフィルターを設定したため、最終的な検索結果には Video A のみが含まれます。

Sending DoMetaQuery request...

Request successful, HTTP status code: 200

Got 2 initial matching results from OSS, starting client-side tag filtering...

[1] File 'VideoA.mp4' matches all conditions:
 File address: https://ipc-videos-oss-metaquery-demo.oss-cn-beijing.aliyuncs.com/VideoA.mp4?x-oss-signature-version=OSS4-HMAC-SHA256&x-oss-date=20250526T054908Z&x-oss-expires=900&x-oss-credential=LTAI********************%2Fcn-beijing%2Foss%2Faliyun_v4_request&x-oss-signature=01bbf29790763d8e0f177d4cb0469cb00ae1c69d565219edb3866f75110b37ab
 File path: VideoA.mp4
-----------------------

 Client-side filtering complete, found 1 final matching result.