資料複製是以非同步(近即時)方式將源Bucket中的檔案(Object)以及對Object的建立、更新和刪除等操作自動複製到目標Bucket。OSS支援跨地區複製(Cross-Region Replication)和同地區複製(Same-Region Replication)。
注意事項
本文範例程式碼以華東1(杭州)的地區ID
cn-hangzhou為例,預設使用外網Endpoint,如果您希望通過與OSS同地區的其他阿里雲產品訪問OSS,請使用內網Endpoint。關於OSS支援的Region與Endpoint的對應關係,請參見OSS地區和訪問網域名稱。阿里雲帳號預設擁有資料複製的相關許可權。如果您希望通過RAM使用者或者STS的方式執行資料複製相關操作,例如:
開啟資料複製,您必須擁有
oss:PutBucketReplication許可權。開啟或關閉資料複製時間控制(RTC)功能,您必須擁有
oss:PutBucketRtc許可權。查看資料複製規則,您必須擁有
oss:GetBucketReplication許可權。查看可複製的目標地區,您必須擁有
oss:GetBucketReplicationLocation許可權。查看資料複製進度,您必須擁有
oss:GetBucketReplicationProgress許可權。關閉資料複製,您必須擁有
oss:DeleteBucketReplication許可權
範例程式碼
開啟資料複製
開啟資料複製前,請確保源儲存空間與目標儲存空間同時處於非版本化或已啟用版本控制狀態。
以下代碼用於開啟資料複製,將源Bucket中的資料複製到相同或不同地區下的目標Bucket。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="put bucket replication sample")
# 添加需要的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--sync_role', help='The role that you want to authorize OSS to use to replicate data', required=True)
parser.add_argument('--target_bucket', help='The destination bucket to which data is replicated', required=True)
parser.add_argument('--target_location', help='The region in which the destination bucket is located', required=True)
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證資訊
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果指定了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 配置並執行PutBucketReplication請求
result = client.put_bucket_replication(oss.PutBucketReplicationRequest(
bucket=args.bucket, # 源儲存空間名稱
replication_configuration=oss.ReplicationConfiguration(
rules=[oss.ReplicationRule(
source_selection_criteria=oss.ReplicationSourceSelectionCriteria(
sse_kms_encrypted_objects=oss.SseKmsEncryptedObjects(
status=oss.StatusType.ENABLED,
),
),
rtc=oss.ReplicationTimeControl(
status='disabled', # 禁用複製時間控制
),
destination=oss.ReplicationDestination(
bucket=args.target_bucket, # 目標儲存空間名稱
location=args.target_location, # 目標儲存空間所在地區
transfer_type=oss.TransferType.INTERNAL, # 傳輸類型
),
historical_object_replication=oss.HistoricalObjectReplicationType.DISABLED, # 禁用歷史資料複製功能
sync_role=args.sync_role, # 同步角色
status='Disabled', # 禁用該規則
prefix_set=oss.ReplicationPrefixSet(
prefixs=['aaa/', 'bbb/'], # 首碼集,用於指定哪些首碼的對象需要複製
),
action='ALL', # 所有操作
)],
),
))
# 列印請求結果的狀態代碼和請求ID
print(f'status code: {result.status_code}, request id: {result.request_id}')
if __name__ == "__main__":
main()
查看資料複製規則
以下代碼用於查看Bucket的資料複製規則。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="get bucket replication sample")
# 添加必需的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果提供了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 擷取儲存空間的複製規則
result = client.get_bucket_replication(oss.GetBucketReplicationRequest(
bucket=args.bucket,
))
# 列印響應的基本資料
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
f' replication configuration: {result.replication_configuration}')
# 如果存在資料複製規則,則列印詳細的複製規則資訊
if result.replication_configuration.rules:
for r in result.replication_configuration.rules:
print(f'result: source selection criteria: {r.source_selection_criteria}, '
f'rtc: {r.rtc}, destination: {r.destination}, '
f'historical object replication: {r.historical_object_replication}, '
f'sync role: {r.sync_role}, status: {r.status}, '
f'encryption configuration: {r.encryption_configuration}, '
f'id: {r.id}, prefix set: {r.prefix_set}, action: {r.action}')
if __name__ == "__main__":
main()
設定資料複製時間控制(RTC)
以下代碼用於為已有的跨地區複製規則開啟或關閉資料複製時間控制(RTC)功能。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="put bucket rtc sample")
# 添加必需的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--status', help='Specifies whether to enable RTC. Valid values: disabled, enabled', default='disabled')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果提供了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 配置RTC(Replication Time Control)並將其應用於指定的儲存空間
result = client.put_bucket_rtc(oss.PutBucketRtcRequest(
bucket=args.bucket,
rtc_configuration=oss.RtcConfiguration(
rtc=oss.ReplicationTimeControl(
status=args.status,
),
id=args.rule_id,
),
))
# 列印響應的基本資料
print(f'status code: {result.status_code},'
f' request id: {result.request_id}')
if __name__ == "__main__":
main()
查看可複製的目標地區
以下代碼用於查看Bucket的資料可複製的目標地區列表。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="get bucket replication location sample")
# 添加必需的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果提供了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 擷取儲存空間的複製位置資訊
result = client.get_bucket_replication_location(oss.GetBucketReplicationLocationRequest(
bucket=args.bucket,
))
# 列印響應的基本資料
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
f' replication location: {result.replication_location},'
f' location transfer type constraint: {result.replication_location.location_transfer_type_constraint},'
# f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].location},'
# f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[0].transfer_types},'
# f' location: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].location},'
# f' transfer types: {result.replication_location.location_transfer_type_constraint.location_transfer_types[1].transfer_types},'
f' locationrtc constraint: {result.replication_location.locationrtc_constraint},'
)
# 如果存在位置傳輸類型約束資訊,則列印詳細的位置和傳輸類型資訊
if result.replication_location.location_transfer_type_constraint.location_transfer_types:
for r in result.replication_location.location_transfer_type_constraint.location_transfer_types:
print(f'result: location: {r.location}, transfer types: {r.transfer_types}')
if __name__ == "__main__":
main()
查看資料複製進度
資料複製進度分為歷史資料複製進度和新寫入資料複製進度。
歷史資料複製進度用百分比表示,僅對開啟了歷史資料複製的儲存空間有效。
新寫入資料複製進度用新寫入資料的時間點表示,代表這個時間點之前的資料已複製完成。
以下代碼用於查看Bucket中指定規則ID的資料複製進度。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="get bucket replication progress sample")
# 添加必需的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果提供了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 擷取儲存空間的複製進度
result = client.get_bucket_replication_progress(oss.GetBucketReplicationProgressRequest(
bucket=args.bucket,
rule_id=args.rule_id,
))
# 列印響應的基本資料
print(f'status code: {result.status_code},'
f' request id: {result.request_id},'
f' replication progress: {result.replication_progress}')
# 如果存在資料複製規則,則列印詳細的複製規則資訊
if result.replication_progress.rules:
for r in result.replication_progress.rules:
print(f'result: historical object replication: {r.historical_object_replication}, '
f'progress: {r.progress}, '
f'id: {r.id}, '
f'prefix set: {r.prefix_set}, '
f'action: {r.action}, '
f'destination: {r.destination}, '
f'status: {r.status}')
if __name__ == "__main__":
main()
關閉資料複製
通過刪除儲存空間的複製規則,您可以關閉源儲存空間到目標儲存空間的資料複製關係。
以下代碼用於刪除Bucket中指定規則ID的資料複製關係。
import argparse
import alibabacloud_oss_v2 as oss
# 建立一個命令列參數解析器
parser = argparse.ArgumentParser(description="delete bucket replication sample")
# 添加必需的命令列參數
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--rule_id', help='The ID of the data replication rule for which you want to configure RTC.', required=True)
def main():
# 解析命令列參數
args = parser.parse_args()
# 從環境變數中載入訪問憑證
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
# 使用SDK的預設配置
cfg = oss.config.load_default()
# 設定訪問憑證提供者
cfg.credentials_provider = credentials_provider
# 設定地區
cfg.region = args.region
# 如果提供了endpoint,則設定endpoint到配置中
if args.endpoint is not None:
cfg.endpoint = args.endpoint
# 建立OSS用戶端
client = oss.Client(cfg)
# 刪除儲存空間的複製規則
result = client.delete_bucket_replication(oss.DeleteBucketReplicationRequest(
bucket=args.bucket,
replication_rules=oss.ReplicationRules(
ids=[args.rule_id],
),
))
# 列印響應的基本資料
print(f'status code: {result.status_code},'
f' request id: {result.request_id},')
if __name__ == "__main__":
main()