資料災備(DBS)除支援阿里雲資料庫和阿里雲ECS自建資料庫的災備外,還支援對雲下及其他雲平台資料庫進行災備。
注意事項
若備份庫表存在表結構不合理、大表、大欄位等情況,備份執行個體的規格過小可能會導致後續備份執行個體資源不足,從而引發備份異常。因此,建議您在建立時選擇較高規格的備份執行個體,以免後續備份出現異常。
操作步驟
產品自動備份
購買備份執行個體(邏輯備份)
- 登入Data Management 5.0。
在頂部功能表列中,選擇安全與規範(DBS) > 資料災備(DBS) > 災備資料來源。
說明若您使用的是極簡模式的控制台,請單擊控制台左上方的
表徵圖,選擇全部功能 > 安全與規範(DBS) > 資料災備(DBS) > 災備資料來源。在上方選擇地區,在雲下及他雲資料庫 > 產品自動備份頁簽下,單擊目標資料來源ID進入資料來源詳情頁。
在備份策略頁面中,單擊配置備份策略。
在選擇備份計劃頁面單擊購買備份計劃,進入DBS售賣頁。
配置如下參數,單擊頁面右下角的立即購買。
配置項
說明
商品類型
請選擇備份執行個體(訂用帳戶)。
備份執行個體地區
選擇要存放備份資料的地區。
說明請確保備份執行個體所在地區與ECS執行個體所在地區相同。
資料來源類型
當前僅支援MySQL。
規格
規格越高,備份與恢複的效能越高,支援的規格為:micro(入門型)、small(低配型)、medium(中配型)、large(高配型)、xlarge(高配型-無資料傳輸量上限)。
說明如果資料庫執行個體(例如生產環境的資料庫)需要高效能的備份執行個體快速執行備份與恢複任務,建議選擇xlarge或large規格,擷取更高的備份恢複效能。
對備份恢複效能(速度)要求不高,您可以通過計算,選擇性價比最高的備份執行個體規格。更多資訊,請參見如何選擇備份執行個體規格。
若備份庫表存在表結構不合理、大表、大欄位等情況,備份執行個體的規格過小可能會導致後續備份執行個體資源不足,從而引發備份異常。因此,建議您在建立時選擇較高規格的備份執行個體,以免後續備份出現異常。
備份方式
請選擇邏輯備份。
儲存空間
您建立時無需選擇容量,後續根據實際存入DBS內建儲存中的資料量計費。計費詳情,請參見儲存費用。
資源群組
配置資源群組。選擇預設或自訂的資源群組,方便備份執行個體管理。
購買數量
按需選擇購買數量,多個資料庫執行個體需要建立多個備份執行個體,例如您希望備份資料庫執行個體A與資料庫執行個體B,需要購買2個備份執行個體。
購買時間長度
選擇該備份執行個體的購買時間長度。
在確認訂單頁面,確認訂單資訊,閱讀並選中服務合約,單擊去支付。
支付成功後,請返回步驟5的選擇備份計劃頁面,單擊已完成支付,即可查看到已建立的備份執行個體。

配置備份策略
在選擇備份計劃頁面選中待配置的備份執行個體,並單擊下一步。

在選擇庫表頁面,選中需要備份的庫或表,單擊
移動到已選擇對象框中,單擊提交。
提交成功後,您可以在備份策略頁面的邏輯備份頁簽下,單擊啟動按鈕啟動備份。
單擊啟動後系統會立即發起一次全量和增量備份任務。
說明如果您期望先完成一些其他動作(例如修改備份策略等),您也可以選擇當前暫不啟動備份,但後續系統會根據備份策略在備份時間自動啟動備份。
使用者自動備份
目前僅支援MySQL 5.5版本的資料來源。
目前僅支援華東1(杭州)地區。
配置備份源以及上傳備份檔案
- 登入Data Management 5.0。
在頂部功能表列中,選擇安全與規範(DBS) > 資料災備(DBS) > 災備資料來源。
說明若您使用的是極簡模式的控制台,請單擊控制台左上方的
表徵圖,選擇全部功能 > 安全與規範(DBS) > 資料災備(DBS) > 災備資料來源。在上方選擇地區,在雲下及他雲資料庫頁簽下,根據資料來源類型選擇新增入口。
單擊新增資料來源,在彈出的對話方塊中,配置如下資訊,並選中目標備份計劃,單擊頁面右下角下一步。

配置項
說明
資料來源名稱
建議配置具有業務意義的名稱,便於後續識別。
引擎類型
資料庫引擎類型,當前僅支援MySQL。
引擎版本
選擇資料庫引擎的版本。
引擎參數
{"lower_case_table_names":1}
若沒有可選備份計劃,單擊購買備份計劃,進入DBS售賣頁,購買備份計劃。
說明
商品類型
請選擇備份執行個體(訂用帳戶),不支援隨用隨付。
備份執行個體地區
選擇要存放備份資料的地區。
資料來源類型
選擇MySQL。
規格
選擇xmicro。該規格提供的免費資料量額度等,請參見備份計劃規格。
備份方式
選擇物理備份。
儲存空間
您建立時無需選擇容量,後續根據實際存入DBS內建儲存中的資料量計費。計費詳情,請參見儲存費用。
資源群組
配置資源群組。選擇預設或自訂的資源群組,方便備份執行個體管理。
購買數量
按需選擇購買數量,多個資料庫執行個體需要建立多個備份執行個體,例如您希望備份資料庫執行個體A與資料庫執行個體B,需要購買2個備份執行個體。
購買時間長度
選擇該備份執行個體的購買時間長度。
將備份組上傳到指定Bucket中。上傳方法,請參見使用者自動備份資料上傳指南。
上傳完成後,單擊確定。
查看備份資訊
在使用者自動備份頁簽,單擊目標資料來源ID。


配置備份策略
在使用者自動備份頁簽,單擊目標資料來源操作列的查看備份策略。


配置完成之後,單擊確定。
查看和下載備份資料
在使用者自動備份頁簽,單擊目標資料來源ID。
單擊左側導覽列中的備份資料。
說明使用者執行上傳資料指令碼並完成資料來源建立後,當使用者新產生一個備份組資料,資料備份管理系統會同步這個備份資料到備份資料頁面。
單擊目標備份組操作列的下載按鈕擷取下載連結,將備份組下載到本地。
建立恢複任務
恢複任務要求資料備份頁面上必須存在已產生的備份組,並且該備份組的狀態需為完成。
在使用者自動備份頁簽,單擊目標資料來源ID。
單擊左側導覽列中的備份資料,單擊建立恢複任務,手動設定恢複參數。
參數名稱
說明
恢複任務名稱
請輸入本次恢複任務的名稱。建議配置具有業務意義的名稱,便於後續識別。
恢複位置
選擇恢複執行個體位置,預設為恢複到RDS新執行個體。
資料庫所在位置
選擇資料庫所在位置,預設為RDS。
執行個體地區
選擇執行個體所在地區,目前僅支援華東1(杭州)地區。
VPC
選擇建立的專用網路。
VSwitch
選擇建立的交換器。
執行個體系列
選擇恢複的執行個體系列。
執行個體規格
選擇恢複的執行個體規格。
儲存空間
選擇所需儲存空間。
恢複方式
選擇恢複方式,目前僅支援按時間點恢複。
還原時間
選擇還原時間,可選擇恢復範圍請參考恢複方式參數後的資料。
配置完成之後單擊提交,會產生一個按所選還原時間點恢複的恢複任務,該任務資訊將以列表的形式展示在恢複任務頁面。
查看恢複任務
在使用者自動備份頁簽,單擊目標資料來源ID。
單擊左側導覽列中的任務管理 > 恢複任務。
點擊恢複結果列中的執行個體ID,可跳轉至恢複的RDS執行個體基本資料頁面。
查看恢複演練
在使用者自動備份頁簽,單擊目標資料來源ID。
單擊左側導覽列中的恢複演練。
恢複演練概覽指標
查看恢複演練概覽,包含恢複任務成功率、恢複時間長度均值、資料備份演練覆蓋率、記錄備份演練覆蓋率。
名稱
說明
恢複任務成功率
在恢復點在篩選時間範圍內的恢複任務成功率。
恢複時間長度均值
在篩選時間範圍內恢複成功任務的平均耗時。
資料備份演練覆蓋率
在篩選時間範圍內資料備份在開源MySQL或RDS MySQL上執行過恢複演練的覆蓋率。
記錄備份演練覆蓋率
在篩選時間範圍內的記錄備份在開源MySQL或RDS MySQL上執行過恢複演練的覆蓋率。
恢複演練時間軸
通過時間軸展示在篩選時間範圍內的各個時間點的恢複演練詳情,在時間軸上點擊滑鼠左鍵可以查看目前時間點的演練資訊。
備份資料恢複演練詳情
單擊資料備份,查看資料備份恢複演練詳情列表。單擊演練結果列執行個體ID,進入恢複的rds 執行個體的基本資料頁面。
單擊記錄備份,可以查看記錄備份恢複演練詳情列表。
使用者自動備份資料上傳指南
準備工作
環境依賴
命令工具:Bash、Python3。
Python相關:oss2、alibabacloud_openapi_util、alibabacloud_tea_openapi、alibabacloud_tea_util。
## 安裝阿里雲OpenAPI SDK
pip3 install --upgrade pip
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ oss2 alibabacloud_openapi_util alibabacloud_tea_openapi alibabacloud_tea_util完整上傳指令碼
備份資料上傳指令碼分為兩部分:
Bash指令碼
指令碼執行處理入口,按需替換實際執行的參數。
boot_backup.sh:負責類比本地xtrabackup全量備份的流程。
#!/bin/bash
## 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運
## 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險,您可以根據業務需要,儲存到設定檔裡
AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
DBS_API=dbs-api.<您的資料來源所在地區ID,例如cn-hangzhou>.aliyuncs.com
DATASOURCE_ID=<您的資料來源ID>
## 擷取指令碼當前路徑
BASE_PATH=$(cd `dirname $0`; pwd)
TS=`date +%Y%m%d_%H%M%S`
XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS
mkdir -p $XTRA_DATA_DIR
## 執行xtrabackup備份命令,將錯誤記錄檔輸出的到xtrabackup.log檔案
~/innobackupex --defaults-file=/etc/my.cnf --user='root' --password='root' --host='localhost' --port=3306 --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
## 擷取當前xtrabackup備份的目錄
backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir
## 打包成tar.gz檔案
cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
## 調用Python指令碼進行上傳備份資料,並註冊備份組中繼資料
python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.logPython指令碼
upload_and_register_backup_set.py:用於上傳全量和記錄備份資料,解析對應的中繼資料,並進行備份組中繼資料的註冊。
# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import os import argparse import re import time import json from datetime import datetime import oss2 from alibabacloud_openapi_util.client import Client as OpenApiUtilClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_tea_openapi.client import Client as OpenApiClient from alibabacloud_tea_util import models as util_models import xtrabackup_info_parser import xtrabackup_log_parser def init_command_args(): parser = argparse.ArgumentParser(description="A sample command-line parser.") parser.add_argument("--access_key_id", help="Aliyun AccessKeyId.") parser.add_argument("--access_key_secret", help="Aliyun AccessKeySecret.") parser.add_argument("--endpoint", help="Aliyun API Endpoint.") parser.add_argument("--region_code", help="Aliyun DataSource RegionCode.") parser.add_argument("--datasource_id", help="Aliyun DataSourceId.") parser.add_argument("--data_type", help="BackupSet DataType: FullBackup | LogBackup.") parser.add_argument("--file_path", help="BackupSet File Path.") parser.add_argument("--xtrabackup_info_path", help="Xtrabackup Info Path.") parser.add_argument("--xtrabackup_log_path", help="Xtrabackup Log Path.") parser.add_argument("--begin_time", help="Binlog Begin Time.") parser.add_argument("--end_time", help="Binlog End Time.") args = parser.parse_args() if args.access_key_id: print(f"access_key_id: ************") if args.access_key_secret: print(f"access_key_secret: ************") if args.endpoint: print(f"endpoint: {args.endpoint}") if args.region_code: print(f"region_code: {args.region_code}") if args.datasource_id: print(f"datasource_id: {args.datasource_id}") if args.data_type: print(f"data_type: {args.data_type}") if args.file_path: print(f"file_path: {args.file_path}") if args.xtrabackup_info_path: print(f"xtrabackup_info_path: {args.xtrabackup_info_path}") if args.xtrabackup_log_path: print(f"xtrabackup_log_path: {args.xtrabackup_log_path}") if args.begin_time: print(f"begin_time: {args.begin_time}") if args.end_time: print(f"end_time: {args.end_time}") print('\n') return args def date_to_unix_timestamp(date_str): dt_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") # 使用.time()方法得到時間元組,然後用time.mktime轉換為秒級時間戳記 timestamp_seconds = time.mktime(dt_obj.timetuple()) return int(timestamp_seconds) * 1000 def create_oss_client(params): # 阿里雲OSS認證資訊 access_key_id = params['AccessKeyId'] access_key_secret = params['AccessKeySecret'] security_token = params['SecurityToken'] bucket_name = params['BucketName'] endpoint = params['OssEndpoint'] # 初始化OSS用戶端 auth = oss2.StsAuth(access_key_id, access_key_secret, security_token) return oss2.Bucket(auth, endpoint, bucket_name) def upload_oss_file(oss_client, file_path, object_key): """ 分區上傳大檔案到OSS :param oss_client: :param file_path: 本地檔案路徑 :param object_key: OSS中的對象鍵,即檔案名稱 """ # 設定分區大小,單位為位元組,預設1MB part_size = 1024 * 1024 * 5 # 初始化分區上傳 upload_id = oss_client.init_multipart_upload(object_key).upload_id # 開啟檔案並讀取內容 with open(file_path, 'rb') as file_obj: parts = [] while True: data = file_obj.read(part_size) if not data: break # 上傳分區 result = oss_client.upload_part(object_key, upload_id, len(parts) + 1, data) parts.append(oss2.models.PartInfo(len(parts) + 1, result.etag)) # 完成分區上傳 oss_client.complete_multipart_upload(object_key, upload_id, parts) class OssUploader: def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id): self.access_key_id = access_key_id self.access_key_secret = access_key_secret self.endpoint = endpoint self.region_code = region_code self.datasource_id = datasource_id config = open_api_models.Config(access_key_id, access_key_secret) # Endpoint 請參考 https://api.aliyun.com/product/Rds config.endpoint = endpoint self.client = OpenApiClient(config) """ 註冊備份組中繼資料 """ def configure_backup_set_info(self, req_param): params = open_api_models.Params( # 介面名稱, action='ConfigureBackupSetInfo', # 介面版本, version='2021-01-01', # 介面協議, protocol='HTTPS', # 介面 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 介面 PATH, pathname='/', # 介面請求體內容格式, req_body_type='json', # 介面響應體內容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。 print(f"ConfigureBackupSetInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"ConfigureBackupSetInfo response: {data}") return data['body']['Data'] """ 擷取oss上傳資訊 """ def describe_bak_datasource_storage_access_info(self, req_param): params = open_api_models.Params( # 介面名稱, action='DescribeBakDataSourceStorageAccessInfo', # 介面版本, version='2021-01-01', # 介面協議, protocol='HTTPS', # 介面 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 介面 PATH, pathname='/', # 介面請求體內容格式, req_body_type='json', # 介面響應體內容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。 print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"DescribeBakDataSourceStorageAccessInfo response: {data}") return data['body']['Data'] def _fetch_oss_access_info(self, params): info = self.describe_bak_datasource_storage_access_info({ 'RegionId': params['RegionId'], 'DataSourceId': params['DataSourceId'], 'RegionCode': params['RegionCode'], 'BackupType': params['BackupType'], 'BackupSetId': params['BackupSetId'] }) return info['OssAccessInfo'] def upload_and_register_backup_set(self, file_path, data_type, extra_meta): filename = os.path.basename(file_path) params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code, 'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename, 'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'} # 首次註冊備份組,返回備份組ID data = self.configure_backup_set_info(params) params['BackupSetId'] = data['BackupSetId'] print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n") # 上傳資料到oss oss_info = self._fetch_oss_access_info(params) oss_client = create_oss_client(oss_info) upload_oss_file(oss_client, file_path, oss_info['ObjectKey']) print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n") # 標記備份組上傳完成 params['UploadStatus'] = 'UploadSuccess' self.configure_backup_set_info(params) print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n") if __name__ == '__main__': args = init_command_args() uploader = OssUploader(args.access_key_id, args.access_key_secret, args.endpoint, args.region_code, args.datasource_id) # 全量和記錄備份分別通過不同方式構造extraMeta extra_meta = '{}' if args.data_type == 'FullBackup': obj = {} if args.xtrabackup_log_path is not None: obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path) elif args.xtrabackup_info_path is not None: parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path) obj = parser.get_extra_meta() extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'), 'version': obj.get("SERVER_VERSION"), 'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")), 'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")), 'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)} extra_meta = json.dumps(extra_meta) elif args.data_type == 'LogBackup': obj = {'dataBegin': date_to_unix_timestamp(args.begin_time), 'dataEnd': date_to_unix_timestamp(args.end_time)} extra_meta = json.dumps(obj) print(f"get extra meta json string: {extra_meta}") # 上傳資料,並註冊備份組元資訊 uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)xtrabackup_info_parser.py:使用備份檔案xtrabackup_info進行中繼資料解析。
# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import re import json class ExtraMetaParser: def __init__(self, file_path): self.file_path = file_path pass def _parse_xtrabackup_info(self): config_data = {} with open(self.file_path, 'r') as file: for line in file: line = line.strip() if line and not line.startswith('#'): key, value = line.split('=', 1) config_data[key.strip()] = value.strip() return config_data def get_extra_meta(self): config_data = self._parse_xtrabackup_info() print(f"xtrabackup_info: {config_data}") binlog_pos = config_data.get("binlog_pos") pattern = f"filename '(.*)', position (.*)" match = re.search(pattern, binlog_pos) return {'BINLOG_FILE': match.group(1), 'SERVER_VERSION': config_data.get("server_version"), 'START_TIME': config_data.get("start_time"), 'END_TIME': config_data.get("end_time")}xtrabackup_log_parser.py:使用備份產生的記錄檔xtrabackup.log進行中繼資料解析。
#!/usr/bin/python # coding:utf8 import io import re import sys from datetime import datetime from six import binary_type, text_type def parse_date_part(date_part, time_part): """ 根據給定的日期部分和時間部分,解析並返回完整的日期時間字串。 """ # 擷取當前年份的前兩位 current_century = datetime.now().strftime("%Y")[:2] year_short = date_part[:2] # 組合完整年份 year_full = current_century + year_short date_full = year_full + date_part[2:] datetime_str = date_full + " " + time_part dt = datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S") formatted_datetime = dt.strftime("%Y-%m-%d %H:%M:%S") return text_type(formatted_datetime) def analyze_slave_status(logpath=None): slave_status = {} start_time_pattern = ( r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*Connecting to MySQL server host:" ) """ 240925 17:46:58 completed OK! 240925 02:22:58 innobackupex: completed OK! 240925 02:22:58 xtrabackup: completed OK! """ end_time_pattern = r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*completed OK!" with io.open(logpath, "rb") as fp: lines = fp.read().splitlines() for i in reversed(range(len(lines))): line = lines[i] if isinstance(line, binary_type): line = line.decode("utf-8") m = re.search(start_time_pattern, line) if m: # 提取日期和時間部分 date_part = m.group(1) time_part = m.group(2) slave_status["START_TIME"] = parse_date_part(date_part, time_part) continue m = re.search(r"Using server version (\S*)", line) if m: slave_status["SERVER_VERSION"] = text_type(m.group(1)) continue m = re.search("MySQL binlog position:", line) if m: binlog_line = line m = re.search(r"filename '(\S*)'", binlog_line) if m: slave_status["BINLOG_FILE"] = text_type(m.group(1)) m = re.search(r"position (\d+)", binlog_line) m2 = re.search(r"position '(\d+)'", binlog_line) if m: try: slave_status["BINLOG_POS"] = int(m.group(1)) except ValueError: pass elif m2: try: slave_status["BINLOG_POS"] = int(m2.group(1)) except ValueError: pass continue m = re.search("consistent_time (\d+)", line) if m: try: slave_status["CONSISTENT_TIME"] = int(m.group(1)) except ValueError: pass continue m = re.search(end_time_pattern, line) if m: date_part = m.group(1) time_part = m.group(2) slave_status["END_TIME"] = parse_date_part(date_part, time_part) continue return slave_status if __name__ == "__main__": logpath = sys.argv[1] slave_status = analyze_slave_status(logpath) print(slave_status)
Bash指令碼處理流程
配置阿里雲AccessKeyId和AccessKeySecret、DBS OpenAPI Endpoint,以及對應的資料來源ID。
#!/bin/bash ## 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運 ## 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險,您可以根據業務需要,儲存到設定檔裡 AK=<ALIBABA_CLOUD_ACCESS_KEY_ID> SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> DBS_API=dbs-api.<您的資料來源所在地區ID,例如cn-hangzhou>.aliyuncs.com DATASOURCE_ID=<您的資料來源ID>執行xtrabackup備份命令,將備份資料儲存到指定的目錄,同時將錯誤記錄檔輸出到xtrabackup.log檔案中。
## 擷取指令碼當前路徑 BASE_PATH=$(cd `dirname $0`; pwd) TS=`date +%Y%m%d_%H%M%S` XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS mkdir -p $XTRA_DATA_DIR ## 執行xtrabackup備份命令,將錯誤記錄檔輸出的到xtrabackup.log檔案 ~/innobackupex --defaults-file=/etc/my.cnf --user='<您的資料庫帳號,例如root>' --password='<您的資料庫密碼,例如root>' --host='<您的資料庫IP,例如localhost>' --port=<您的資料庫連接埠號碼,例如3306> --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log後續將解析xtrabackup.log檔案獲得全量備份組的中繼資料資訊:備份開始時間、備份結束時間、一致性時間點、對應Binlog名稱。
限制說明:
當前執行xtrabackup備份命令,暫不支援壓縮和加密參數(--compress --compress-threads和--encrypt --encrypt-key-file --encrypt-threads),使用xtrabackup備份命令時請先刪除對應的參數。
將檔案目錄形式的全量備份資料進行gzip壓縮和打包成tar.gz格式的檔案。
## 擷取當前xtrabackup備份的目錄 backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1` echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir ## 打包成tar.gz檔案 cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2限制說明:當前僅支援檔案格式的備份資料上傳,支援如下格式:
tar:目錄檔案進行tar打包。
tar.gz:目錄檔案進行tar打包,然後進行gzip壓縮。
注意事項:
使用tar命令打包前,需要chmod 755命令修改檔案目錄許可權。
需進入檔案夾根目錄,進行tar命令的打包和壓縮。
調用Python指令碼upload_and_register_backup_set.py進行備份資料的上傳,同時註冊備份組中繼資料。
## 調用Python指令碼進行上傳備份資料,並註冊備份組中繼資料 python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log參數
說明
--access_key_id
阿里雲AccessKeyId。
--access_key_secret
阿里雲AccessKeySecret。
--endpoint
DBS OpenAPI對應的Endpoint。請參見服務存取點。
--datasource_id
DBS對應的資料來源ID。
--region_code
對應地區資訊。
--data_type
備份資料類型,全量備份組:FullBackup,記錄備份:LogBackup。
--file_path
全量備份資料路徑。
--xtrabackup_log_path
全量備份xtrabackup命令產生的xtrabackup.log檔案路徑。
Python指令碼處理流程
main函數主入口:
根據data_type傳參為FullBackug或者LogBackup,分別構造中繼資料註冊依賴的extra_meta資訊:
說明BINLOG_FILE:對應Binlog名稱。
dataBegin:備份開始時間,毫秒層級時間戳記。
dataEnd:備份結束時間,毫秒層級時間戳記。
consistentTime:一致性時間點,秒級時間戳記。
全量備份extra_meta格式:
{ 'BINLOG_FILE':'mysql-bin.001', 'version':'5.5', 'dataBegin':17274********, 'dataEnd':17274********, 'consistentTime':17274******** }記錄備份extra_meta格式:
{ 'dataBegin':17274********, 'dataEnd':17274******** }
調用OssUploader.upload_and_register_backup_set方法進行備份資料上傳和中繼資料註冊。
if __name__ == '__main__': args = init_command_args() uploader = OssUploader(args.access_key_id, args.access_key_secret, args.endpoint, args.region_code, args.datasource_id) # 全量和記錄備份分別通過不同方式構造extraMeta extra_meta = '{}' if args.data_type == 'FullBackup': obj = {} if args.xtrabackup_log_path is not None: obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path) elif args.xtrabackup_info_path is not None: parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path) obj = parser.get_extra_meta() extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'), 'version': obj.get("SERVER_VERSION"), 'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")), 'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")), 'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)} extra_meta = json.dumps(extra_meta) elif args.data_type == 'LogBackup': obj = {'dataBegin': date_to_unix_timestamp(args.begin_time), 'dataEnd': date_to_unix_timestamp(args.end_time)} extra_meta = json.dumps(obj) print(f"get extra meta json string: {extra_meta}") # 上傳資料,並註冊備份組元資訊 uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)OssUploader.upload_and_register_backup_set方法備份資料上傳流程。
class OssUploader: def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id): self.access_key_id = access_key_id self.access_key_secret = access_key_secret self.endpoint = endpoint self.region_code = region_code self.datasource_id = datasource_id config = open_api_models.Config(access_key_id, access_key_secret) # Endpoint 請參考 https://api.aliyun.com/product/Rds config.endpoint = endpoint self.client = OpenApiClient(config) """ 註冊備份組中繼資料 """ def configure_backup_set_info(self, req_param): params = open_api_models.Params( # 介面名稱, action='ConfigureBackupSetInfo', # 介面版本, version='2021-01-01', # 介面協議, protocol='HTTPS', # 介面 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 介面 PATH, pathname='/', # 介面請求體內容格式, req_body_type='json', # 介面響應體內容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。 print(f"ConfigureBackupSetInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"ConfigureBackupSetInfo response: {data}") return data['body']['Data'] """ 擷取oss上傳資訊 """ def describe_bak_datasource_storage_access_info(self, req_param): params = open_api_models.Params( # 介面名稱, action='DescribeBakDataSourceStorageAccessInfo', # 介面版本, version='2021-01-01', # 介面協議, protocol='HTTPS', # 介面 HTTP 方法, method='POST', auth_type='AK', style='RPC', # 介面 PATH, pathname='/', # 介面請求體內容格式, req_body_type='json', # 介面響應體內容格式, body_type='json' ) # runtime options runtime = util_models.RuntimeOptions() request = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query(req_param) ) # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。 print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}") data = self.client.call_api(params, request, runtime) print(f"DescribeBakDataSourceStorageAccessInfo response: {data}") return data['body']['Data'] def _fetch_oss_access_info(self, params): info = self.describe_bak_datasource_storage_access_info({ 'RegionId': params['RegionId'], 'DataSourceId': params['DataSourceId'], 'RegionCode': params['RegionCode'], 'BackupType': params['BackupType'], 'BackupSetId': params['BackupSetId'] }) return info['OssAccessInfo'] def upload_and_register_backup_set(self, file_path, data_type, extra_meta): filename = os.path.basename(file_path) params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code, 'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename, 'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'} # 首次註冊備份組,返回備份組ID data = self.configure_backup_set_info(params) params['BackupSetId'] = data['BackupSetId'] print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n") # 上傳資料到oss oss_info = self._fetch_oss_access_info(params) oss_client = create_oss_client(oss_info) upload_oss_file(oss_client, file_path, oss_info['ObjectKey']) print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n") # 標記備份組上傳完成 params['UploadStatus'] = 'UploadSuccess' self.configure_backup_set_info(params) print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")