Object Storage Service: Data replication (Python SDK V2)

Last Updated: Mar 20, 2026

Data replication automatically copies objects and object operations, including creation, overwriting, and deletion, from a source bucket to a destination bucket. Object Storage Service (OSS) supports two replication modes: cross-region replication (CRR) and same-region replication (SRR).

Usage notes

  • The sample code in this topic uses cn-hangzhou (China (Hangzhou)) as the example region and connects through the public endpoint by default. To access OSS from other Alibaba Cloud services in the same region, use an internal endpoint instead. For the full list of regions and endpoints, see Regions and endpoints.

  • Alibaba Cloud accounts have all data replication permissions by default. RAM users and identities authenticated with Security Token Service (STS) temporary credentials must be granted the relevant permissions explicitly.

    Operation                     Required permission
    Enable data replication       oss:PutBucketReplication
    Enable or disable RTC         oss:PutBucketRtc
    Query replication rules       oss:GetBucketReplication
    Query destination regions     oss:GetBucketReplicationLocation
    Query replication progress    oss:GetBucketReplicationProgress
    Disable data replication      oss:DeleteBucketReplication
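
For a RAM user, the permissions in the table could be granted through a custom RAM policy. The following is a minimal sketch that builds such a policy document in Python. The helper name build_replication_policy and the bucket name examplebucket are illustrative assumptions, not part of the SDK, and the resource scope should be adjusted to your own buckets.

```python
import json

# Actions taken from the permission table above.
REPLICATION_ACTIONS = [
    "oss:PutBucketReplication",
    "oss:PutBucketRtc",
    "oss:GetBucketReplication",
    "oss:GetBucketReplicationLocation",
    "oss:GetBucketReplicationProgress",
    "oss:DeleteBucketReplication",
]


def build_replication_policy(bucket: str) -> dict:
    """Build a RAM policy document for one source bucket (hypothetical helper)."""
    return {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(REPLICATION_ACTIONS),
                # Bucket-level resource; "examplebucket" below is a placeholder name.
                "Resource": [f"acs:oss:*:*:{bucket}"],
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_replication_policy("examplebucket"), indent=2))
```

Grant only the actions a given identity actually needs. For example, a read-only monitoring user would need only the three Get actions.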

Prerequisites

Before you begin, ensure that you have:

  • A source bucket and a destination bucket that share the same versioning state: both unversioned or both versioning-enabled

  • The permissions listed in Usage notes for any operations your RAM user or STS identity needs to perform

Enable data replication

Important

The source bucket and the destination bucket must share the same versioning state, either both unversioned or both versioning-enabled, before you enable data replication.

The following code creates a replication rule that copies data from a source bucket to a destination bucket, either in the same region or across regions. It uses oss.PutBucketReplicationRequest to submit the configuration.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="put bucket replication sample")
# Specify the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--sync_role', help='The role that you want to authorize OSS to use to replicate data', required=True)
parser.add_argument('--target_bucket', help='The destination bucket to which data is replicated', required=True)
parser.add_argument('--target_location', help='The region in which the destination bucket is located', required=True)

def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables for authentication.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()
    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider
    # Specify the region.
    cfg.region = args.region
    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Configure and execute the PutBucketReplication request.
    result = client.put_bucket_replication(oss.PutBucketReplicationRequest(
            bucket=args.bucket,  # The name of the source bucket.
            replication_configuration=oss.ReplicationConfiguration(
                rules=[oss.ReplicationRule(
                    source_selection_criteria=oss.ReplicationSourceSelectionCriteria(
                        sse_kms_encrypted_objects=oss.SseKmsEncryptedObjects(
                            status=oss.StatusType.ENABLED,
                        ),
                    ),
                    rtc=oss.ReplicationTimeControl(
                        status='disabled',  # Disable RTC.
                    ),
                    destination=oss.ReplicationDestination(
                        bucket=args.target_bucket,  # The name of the destination bucket.
                        location=args.target_location,  # The region of the destination bucket.
                        transfer_type=oss.TransferType.INTERNAL,  # The transfer type.
                    ),
                    historical_object_replication=oss.HistoricalObjectReplicationType.DISABLED,  # Forbid replication of historical data.
                    sync_role=args.sync_role,  # The role for data replication.
                    status='Disabled',  # Disable the rule.
                    prefix_set=oss.ReplicationPrefixSet(
                        prefixs=['aaa/', 'bbb/'],  # The prefixes in the names of objects to be replicated.
                    ),
                    action='ALL',  # All operations.
                )],
            ),
    ))

    # Display the HTTP status code and request ID.
    print(f'status code: {result.status_code}, request id: {result.request_id}')

if __name__ == "__main__":
    main()
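
The samples in this topic accept an optional --endpoint argument, and the usage notes distinguish public from internal endpoints. The sketch below shows one way to derive either form from a region ID. The helper name oss_endpoint is hypothetical, and the host name pattern is an assumption based on the common oss-<region>.aliyuncs.com convention; confirm the exact value for your region in Regions and endpoints.

```python
def oss_endpoint(region: str, internal: bool = False) -> str:
    """Return an OSS endpoint URL for a region ID such as 'cn-hangzhou'.

    Assumes the conventional host name pattern; some regions may differ.
    """
    suffix = "-internal" if internal else ""
    return f"https://oss-{region}{suffix}.aliyuncs.com"


if __name__ == "__main__":
    # Public endpoint, reachable over the Internet.
    print(oss_endpoint("cn-hangzhou"))
    # Internal endpoint, for callers running in the same region.
    print(oss_endpoint("cn-hangzhou", internal=True))
```

The returned string can be passed as the value of --endpoint when running any of the samples.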

Query replication rules

The following code retrieves all replication rules configured for a bucket using oss.GetBucketReplicationRequest.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="get bucket replication sample")
# Add the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()
    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider
    # Specify the region.
    cfg.region = args.region
    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Query the replication rules for the bucket.
    result = client.get_bucket_replication(oss.GetBucketReplicationRequest(
            bucket=args.bucket,
    ))

    # Display the basic response information.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication configuration: {result.replication_configuration}')

    # If there are data replication rules, display rule details.
    if result.replication_configuration.rules:
        for r in result.replication_configuration.rules:
            print(f'result: source selection criteria: {r.source_selection_criteria}, '
                  f'rtc: {r.rtc}, destination: {r.destination}, '
                  f'historical object replication: {r.historical_object_replication}, '
                  f'sync role: {r.sync_role}, status: {r.status}, '
                  f'encryption configuration: {r.encryption_configuration}, '
                  f'id: {r.id}, prefix set: {r.prefix_set}, action: {r.action}')

if __name__ == "__main__":
    main()
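
The RTC, progress, and delete samples in this topic all take a --rule_id argument, and the query result is where that ID can be found. As a minimal sketch, the hypothetical helper below picks rule IDs by destination bucket. It relies only on the id and destination.bucket attributes shown in the samples, and uses stand-in objects in place of a real response so that it runs without credentials.

```python
from types import SimpleNamespace


def rule_ids_for_destination(rules, target_bucket):
    """Return the IDs of rules whose destination bucket matches target_bucket."""
    matches = []
    for rule in rules or []:
        destination = getattr(rule, "destination", None)
        if destination is not None and getattr(destination, "bucket", None) == target_bucket:
            matches.append(rule.id)
    return matches


if __name__ == "__main__":
    # Stand-in objects that mimic the attributes printed by the sample above.
    rules = [
        SimpleNamespace(id="rule-a", destination=SimpleNamespace(bucket="dest-bucket-1")),
        SimpleNamespace(id="rule-b", destination=SimpleNamespace(bucket="dest-bucket-2")),
    ]
    print(rule_ids_for_destination(rules, "dest-bucket-2"))
```

With a real response, pass result.replication_configuration.rules as the first argument.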

Enable or disable the RTC feature

Replication time control (RTC) applies to CRR rules. The following code toggles RTC for a specific replication rule using oss.PutBucketRtcRequest.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="put bucket rtc sample")

# Add the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--status', help='Specifies whether to enable RTC. Valid values: disabled, enabled', default='disabled')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)

def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()
    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider
    # Specify the region.
    cfg.region = args.region
    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Create an RTC state configuration and apply it to the specified bucket.
    result = client.put_bucket_rtc(oss.PutBucketRtcRequest(
            bucket=args.bucket,
            rtc_configuration=oss.RtcConfiguration(
                rtc=oss.ReplicationTimeControl(
                    status=args.status,
                ),
                id=args.rule_id,
            ),
    ))

    # Display the basic response information.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')

if __name__ == "__main__":
    main()
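
The --status argument above accepts any string, so a typo would only surface as a failed request. One possible refinement, sketched here under the assumption that enabled and disabled are the only valid values (as the help text states), is to let argparse reject other values before any request is sent. The build_parser function is an illustrative name, not part of the SDK.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser that validates the RTC status on the client side."""
    parser = argparse.ArgumentParser(description="put bucket rtc sample")
    parser.add_argument('--rule_id', required=True,
                        help='The ID of the data replication rule for which you want to configure RTC.')
    parser.add_argument('--status', choices=['enabled', 'disabled'], default='disabled',
                        help='Specifies whether to enable RTC.')
    return parser


if __name__ == "__main__":
    # 'example-rule-id' is a placeholder value for illustration.
    args = build_parser().parse_args(['--rule_id', 'example-rule-id', '--status', 'enabled'])
    print(args.status)
```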

Query replication locations

The following code retrieves the regions to which data in a bucket can be replicated, using oss.GetBucketReplicationLocationRequest. The response includes any transfer type constraints for each destination region.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="get bucket replication location sample")
# Add the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')


def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()
    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider
    # Specify the region.
    cfg.region = args.region
    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Query the regions to which data in the specified bucket can be replicated.
    result = client.get_bucket_replication_location(oss.GetBucketReplicationLocationRequest(
            bucket=args.bucket,
    ))

    # Display the basic response information.
    location = result.replication_location
    constraint = location.location_transfer_type_constraint
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication location: {location},'
          f' location transfer type constraint: {constraint},'
          f' locationrtc constraint: {location.locationrtc_constraint}')

    # If transfer type constraints exist, display each region and its allowed transfer types.
    # The constraint may be absent, so check it before reading the nested list.
    if constraint and constraint.location_transfer_types:
        for r in constraint.location_transfer_types:
            print(f'result: location: {r.location}, transfer types: {r.transfer_types}')

if __name__ == "__main__":
    main()

Query replication progress

OSS tracks two types of replication progress:

  • Historical replication: expressed as a percentage. Only available when historical object replication is enabled for the bucket.

  • Incremental replication: expressed as a point in time. All objects uploaded to the source bucket before this point in time have been replicated.

The following code queries the replication progress for a specific rule ID using oss.GetBucketReplicationProgressRequest.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="get bucket replication progress sample")
# Add the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule to query.', required=True)

def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()
    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider
    # Specify the region.
    cfg.region = args.region
    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Query the data replication progress.
    result = client.get_bucket_replication_progress(oss.GetBucketReplicationProgressRequest(
            bucket=args.bucket,
            rule_id=args.rule_id,
    ))

    # Display the basic response information.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' replication progress: {result.replication_progress}')

    # If the data replication rule exists, display rule details.
    if result.replication_progress.rules:
        for r in result.replication_progress.rules:
            print(f'result: historical object replication: {r.historical_object_replication}, '
                  f'progress: {r.progress}, '
                  f'id: {r.id}, '
                  f'prefix set: {r.prefix_set}, '
                  f'action: {r.action}, '
                  f'destination: {r.destination}, '
                  f'status: {r.status}')

if __name__ == "__main__":
    main()
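
The two progress types described in this section can be turned into a short human-readable summary. The sketch below is illustrative only: describe_progress is a hypothetical helper, and it assumes that the historical value arrives as a decimal fraction string such as "0.85" and the incremental value as a UTC timestamp such as "2015-09-24T15:28:14.000Z". Verify both formats against a real response before relying on them.

```python
from datetime import datetime, timezone


def describe_progress(historical, incremental, now=None):
    """Summarize replication progress from raw string values (assumed formats)."""
    parts = []
    if historical is not None:
        # Assumed format: fraction between 0 and 1, for example "0.85" for 85%.
        parts.append(f"historical: {float(historical) * 100:.0f}% replicated")
    if incremental is not None:
        # Assumed format: UTC timestamp with milliseconds and a trailing "Z".
        point = datetime.strptime(incremental, "%Y-%m-%dT%H:%M:%S.%fZ")
        point = point.replace(tzinfo=timezone.utc)
        now = now or datetime.now(timezone.utc)
        lag_seconds = (now - point).total_seconds()
        parts.append(
            f"incremental: objects uploaded before {point.isoformat()} "
            f"are replicated (lag {lag_seconds:.0f}s)"
        )
    return "; ".join(parts) or "no progress reported"


if __name__ == "__main__":
    print(describe_progress("0.85", "2015-09-24T15:28:14.000Z"))
```

The lag figure is the gap between the current time and the incremental replication point, which gives a rough sense of how far the destination bucket trails the source.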

Disable data replication

To disable data replication, delete the replication rule configured on the source bucket. The following code deletes a replication rule by its rule ID using oss.DeleteBucketReplicationRequest.

import argparse
import alibabacloud_oss_v2 as oss

# Create a command-line argument parser.
parser = argparse.ArgumentParser(description="delete bucket replication sample")

# Add the command-line arguments.
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule to delete.', required=True)

def main():
    # Parse the command-line arguments.
    args = parser.parse_args()

    # Obtain access credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # Use the default configuration of the SDK.
    cfg = oss.config.load_default()

    # Specify the credential provider.
    cfg.credentials_provider = credentials_provider

    # Specify the region.
    cfg.region = args.region

    # If an endpoint is provided from the command line, update the endpoint in the configuration with the provided endpoint.
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # Create an OSS client.
    client = oss.Client(cfg)

    # Delete the replication rule.
    result = client.delete_bucket_replication(oss.DeleteBucketReplicationRequest(
            bucket=args.bucket,
            replication_rules=oss.ReplicationRules(
                ids=[args.rule_id],
            ),
    ))

    # Display the basic response information.
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')

if __name__ == "__main__":
    main()