すべてのプロダクト
Search
ドキュメントセンター

Object Storage Service:Python 向け OSS SDK を使用したデータレプリケーション

最終更新日:Mar 27, 2025

データレプリケーションは、オブジェクトとオブジェクト操作 (作成、上書き、削除など) をソースバケットからデスティネーションバケットに自動的にレプリケートします。Object Storage Service (OSS) は、クロスリージョンレプリケーション (CRR) と同一リージョンレプリケーション (SRR) をサポートしています。

使用上の注意

  • このトピックのサンプルコードでは、中国 (杭州) リージョンのリージョン ID cn-hangzhou を使用しています。デフォルトでは、パブリックエンドポイントを使用してバケット内のリソースにアクセスします。バケットが配置されているのと同じ Alibaba Cloud サービスのリージョンからバケット内のリソースにアクセスする場合は、内部エンドポイントを使用します。OSS のリージョンとエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。

  • デフォルトでは、Alibaba Cloud アカウントには、データレプリケーションに必要な権限があります。RAM ユーザーとして、または Security Token Service (STS) によって提供される一時的なアクセス認証情報を使用してデータをレプリケートする場合は、必要な権限を持っている必要があります。

    • データレプリケーションを有効にするには、oss:PutBucketReplication 権限が必要です。

    • レプリケーション時間制御 (RTC) 機能を有効または無効にするには、oss:PutBucketRtc 権限が必要です。

    • データレプリケーションルールを照会するには、oss:GetBucketReplication 権限が必要です。

    • データレプリケーションに使用可能なデスティネーションリージョンを照会するには、oss:GetBucketReplicationLocation 権限が必要です。

    • データレプリケーショントピックの進捗状況を照会するには、oss:GetBucketReplicationProgress 権限が必要です。

    • データレプリケーションを無効にするには、oss:DeleteBucketReplication 権限が必要です。

サンプルコード

データレプリケーションを有効にする

重要

データレプリケーションを有効にする前に、ソースバケットとデスティネーションバケットの両方がバージョン管理なしまたはバージョン管理有効の状態であることを確認してください。

次のサンプルコードは、データレプリケーションを有効にし、同じリージョンまたは異なるリージョンのソースバケットからデスティネーションバケットにデータをレプリケートするデータレプリケーショントピックを作成します。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="put bucket replication sample")
# コマンドライン引数を指定します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')
parser.add_argument('--sync_role', help='データをレプリケートするために OSS が使用することを承認するロール', required=True)
parser.add_argument('--target_bucket', help='データがレプリケートされるデスティネーションバケット', required=True)
parser.add_argument('--target_location', help='デスティネーションバケットが配置されているリージョン', required=True)

def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 認証のために環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()
    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider
    # リージョンを指定します。
    cfg.region = args.region
    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # PutBucketReplication リクエストを構成して実行します。
    result = client.put_bucket_replication(oss.PutBucketReplicationRequest(
            bucket=args.bucket,  # ソースバケットの名前。
            replication_configuration=oss.ReplicationConfiguration(
                rules=[oss.ReplicationRule(
                    source_selection_criteria=oss.ReplicationSourceSelectionCriteria(
                        sse_kms_encrypted_objects=oss.SseKmsEncryptedObjects(
                            status=oss.StatusType.ENABLED,
                        ),
                    ),
                    rtc=oss.ReplicationTimeControl(
                        status='disabled',  # RTC を無効にします。
                    ),
                    destination=oss.ReplicationDestination(
                        bucket=args.target_bucket,  # デスティネーションバケットの名前。
                        location=args.target_location,  # デスティネーションバケットのリージョン。
                        transfer_type=oss.TransferType.INTERNAL,  # 転送タイプ。
                    ),
                    historical_object_replication=oss.HistoricalObjectReplicationType.DISABLED,  # 既存データのレプリケーションを禁止します。
                    sync_role=args.sync_role,  # データレプリケーションのロール。
                    status='Disabled',  # ルールを無効にします。
                    prefix_set=oss.ReplicationPrefixSet(
                        prefixs=['aaa/', 'bbb/'],  # レプリケートされるオブジェクトの名前のプレフィックス。
                    ),
                    action='ALL',  # すべての操作。
                )],
            ),
    ))

    # HTTP ステータスコードとリクエスト ID を表示します。
    print(f'status code: {result.status_code}, request id: {result.request_id}')

if __name__ == "__main__":
    main()

データレプリケーションルールを照会する

次のコードは、バケットのデータレプリケーションルールを照会します。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="get bucket replication sample")
# コマンドライン引数を追加します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')

def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()
    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider
    # リージョンを指定します。
    cfg.region = args.region
    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # バケットのレプリケーションルールを照会します。
    result = client.get_bucket_replication(oss.GetBucketReplicationRequest(
            bucket=args.bucket,
    ))

    # 基本的なレスポンス情報を表示します。
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication configuration: {result.replication_configuration}')

    # データレプリケーションルールがある場合は、ルールの詳細を表示します。
    if result.replication_configuration.rules:
        for r in result.replication_configuration.rules:
            print(f'result: source selection criteria: {r.source_selection_criteria}, '
                  f'rtc: {r.rtc}, destination: {r.destination}, '
                  f'historical object replication: {r.historical_object_replication}, '
                  f'sync role: {r.sync_role}, status: {r.status}, '
                  f'encryption configuration: {r.encryption_configuration}, '
                  f'id: {r.id}, prefix set: {r.prefix_set}, action: {r.action}')

if __name__ == "__main__":
    main()

RTC 機能を有効または無効にする

次のサンプルコードは、CRR ルールの RTC 機能を有効または無効にします。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="put bucket rtc sample")

# コマンドライン引数を追加します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')
parser.add_argument('--status', help='RTC を有効にするかどうかを指定します。有効な値: disabled、enabled', default='disabled')
parser.add_argument('--rule_id', help='RTC を構成するデータレプリケーションルールの ID。', required=True)

def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()
    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider
    # リージョンを指定します。
    cfg.region = args.region
    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # RTS 状態構成を作成し、指定されたバケットに適用します。
    result = client.put_bucket_rtc(oss.PutBucketRtcRequest(
            bucket=args.bucket,
            rtc_configuration=oss.RtcConfiguration(
                rtc=oss.ReplicationTimeControl(
                    status=args.status,
                ),
                id=args.rule_id,
            ),
    ))

    # 基本的なレスポンス情報を表示します。
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')

if __name__ == "__main__":
    main()

データをレプリケートできるリージョンを照会する

次のサンプルコードは、指定されたソースバケットからデータをレプリケートできるリージョンを照会します。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="get bucket replication location sample")
# コマンドライン引数を追加します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')


def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()
    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider
    # リージョンを指定します。
    cfg.region = args.region
    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # 指定されたバケット内のデータをレプリケートできるリージョンを照会します。
    result = client.get_bucket_replication_location(oss.GetBucketReplicationLocationRequest(
            bucket=args.bucket,
    ))

    # 基本的なレスポンス情報を表示します。
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication location: {result.replication_location},'
          f' location transfer type constraint: {result.replication_location.location_transfer_type_constraint},'
          # f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].location},'
          # f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].transfer_types},'
          # f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].location},'
          # f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].transfer_types},'
          f' locationrtc constraint: {result.replication_location.locationrtc_constraint},'
    )

    # データレプリケーションのデスティネーションリージョンに制限がある場合は、リージョンの詳細と転送タイプを表示します。
    if result.replication_location.location_transfer_type_constraint.location_transfer_types:
        for r in result.replication_location.location_transfer_type_constraint.location_transfer_types:
            print(f'result: location: {r.location}, transfer types: {r.transfer_types}')

if __name__ == "__main__":
    main()

データレプリケーショントピックの進捗状況を照会する

説明

既存データレプリケーショントピックと増分データレプリケーショントピックの進捗状況を照会できます。

  • 既存データレプリケーショントピックの進捗状況はパーセンテージで表されます。既存データレプリケーションが有効になっているバケットについてのみ、既存データレプリケーショントピックの進捗状況を照会できます。

  • 増分データレプリケーショントピックの進捗状況は、特定の時点として表されます。その時点より前にソースバケットに保存されているデータがレプリケートされます。

次のサンプルコードは、バケットの特定のレプリケーションルール ID を持つデータレプリケーショントピックの進捗状況を照会します。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="get bucket replication progress sample")
# コマンドライン引数を追加します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')
parser.add_argument('--rule_id', help='RTC を構成するデータレプリケーションルールの ID。', required=True)

def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()
    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider
    # リージョンを指定します。
    cfg.region = args.region
    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # データレプリケーションの進捗状況を照会します。
    result = client.get_bucket_replication_progress(oss.GetBucketReplicationProgressRequest(
            bucket=args.bucket,
            rule_id=args.rule_id,
    ))

    # 基本的なレスポンス情報を表示します。
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication progress: {result.replication_progress}')

    # データレプリケーションルールが存在する場合は、ルールの詳細を表示します。
    if result.replication_progress.rules:
        for r in result.replication_progress.rules:
            print(f'result: historical object replication: {r.historical_object_replication}, '
                  f'progress: {r.progress}, '
                  f'id: {r.id}, '
                  f'prefix set: {r.prefix_set}, '
                  f'action: {r.action}, '
                  f'destination: {r.destination}, '
                  f'status: {r.status}')

if __name__ == "__main__":
    main()

データレプリケーションを無効にする

ソースバケットに構成されているレプリケーションルールを削除して、バケットのデータレプリケーションを無効にすることができます。

次のサンプルコードは、指定されたバケットから特定の ID を持つレプリケーションルールを削除します。

import argparse
import alibabacloud_oss_v2 as oss

# コマンドライン引数パーサーを作成します。
parser = argparse.ArgumentParser(description="delete bucket replication sample")

# コマンドライン引数を追加します。
parser.add_argument('--region', help='バケットが配置されているリージョン。', required=True)
parser.add_argument('--bucket', help='バケットの名前。', required=True)
parser.add_argument('--endpoint', help='他のサービスが OSS にアクセスするために使用できるドメイン名')
parser.add_argument('--rule_id', help='RTC を構成するデータレプリケーションルールの ID。', required=True)

def main():
    # コマンドライン引数を解析します。
    args = parser.parse_args()

    # 環境変数からアクセス認証情報を取得します。
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # SDK のデフォルト構成を使用します。
    cfg = oss.config.load_default()

    # 認証プロバイダーを指定します。
    cfg.credentials_provider = credentials_provider

    # リージョンを指定します。
    cfg.region = args.region

    # エンドポイントがコマンドラインから提供されている場合は、構成内のエンドポイントを提供されたエンドポイントで更新します。
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # OSS クライアントを作成します。
    client = oss.Client(cfg)

    # レプリケーションルールを削除します。
    result = client.delete_bucket_replication(oss.DeleteBucketReplicationRequest(
            bucket=args.bucket,
            replication_rules=oss.ReplicationRules(
                ids=[args.rule_id],
            ),
    ))

    # 基本的なレスポンス情報を表示します。
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},')

if __name__ == "__main__":
    main()