全部產品
Search
文件中心

Data Management:雲下及他雲Database Backup管理

更新時間:Feb 12, 2025

資料災備(DBS)除支援阿里雲資料庫和阿里雲ECS自建資料庫的災備外,還支援對雲下及其他雲平台資料庫進行災備。

注意事項

若備份庫表存在表結構不合理、大表、大欄位等情況,備份執行個體的規格過小可能會導致後續備份執行個體資源不足,從而引發備份異常。因此,建議您在建立時選擇較高規格的備份執行個體,以免後續備份出現異常。

操作步驟

產品自動備份

購買備份執行個體(邏輯備份)

  1. 登入Data Management 5.0
  2. 在頂部功能表列中,選擇安全與規範(DBS) > 資料災備(DBS) > 災備資料來源

    說明

    若您使用的是極簡模式的控制台,請單擊控制台左上方的2023-01-28_15-57-17.png表徵圖,選擇全部功能 > 安全與規範(DBS) > 資料災備(DBS) > 災備資料來源

  3. 在上方選擇地區,在雲下及他雲資料庫 > 產品自動備份頁簽下,單擊目標資料來源ID進入資料來源詳情頁。

  4. 備份策略頁面中,單擊配置備份策略

  5. 選擇備份計劃頁面單擊購買備份計劃,進入DBS售賣頁。

  6. 配置如下參數,單擊頁面右下角的立即購買

    配置項

    說明

    商品類型

    請選擇備份執行個體(訂用帳戶)

    備份執行個體地區

    選擇要存放備份資料的地區。

    說明

    請確保備份執行個體所在地區與ECS執行個體所在地區相同。

    資料來源類型

    當前僅支援MySQL

    規格

    規格越高,備份與恢複的效能越高,支援的規格為:micro(入門型)、small(低配型)、medium(中配型)、large(高配型)、xlarge(高配型-無資料傳輸量上限)。

    說明
    • 如果資料庫執行個體(例如生產環境的資料庫)需要高效能的備份執行個體快速執行備份與恢複任務,建議選擇xlarge或large規格,擷取更高的備份恢複效能。

    • 對備份恢複效能(速度)要求不高,您可以通過計算,選擇性價比最高的備份執行個體規格。更多資訊,請參見如何選擇備份執行個體規格

    • 若備份庫表存在表結構不合理、大表、大欄位等情況,備份執行個體的規格過小可能會導致後續備份執行個體資源不足,從而引發備份異常。因此,建議您在建立時選擇較高規格的備份執行個體,以免後續備份出現異常。

    備份方式

    請選擇邏輯備份

    儲存空間

    您建立時無需選擇容量,後續根據實際存入DBS內建儲存中的資料量計費。計費詳情,請參見儲存費用

    資源群組

    配置資源群組。選擇預設或自訂的資源群組,方便備份執行個體管理。

    購買數量

    按需選擇購買數量,多個資料庫執行個體需要建立多個備份執行個體,例如您希望備份資料庫執行個體A與資料庫執行個體B,需要購買2個備份執行個體。

    購買時間長度

    選擇該備份執行個體的購買時間長度。

  7. 確認訂單頁面,確認訂單資訊,閱讀並選中服務合約,單擊去支付

    支付成功後,請返回步驟5的選擇備份計劃頁面,單擊已完成支付,即可查看到已建立的備份執行個體。

    image

配置備份策略

  1. 選擇備份計劃頁面選中待配置的備份執行個體,並單擊下一步

    image

  2. 選擇庫表頁面,選中需要備份的庫或表,單擊image移動到已選擇對象框中,單擊提交

    image..png

  3. 提交成功後,您可以在備份策略頁面的邏輯備份頁簽下,單擊啟動按鈕啟動備份。

    單擊啟動後系統會立即發起一次全量和增量備份任務。

    image

    說明

    如果您期望先完成一些其他動作(例如修改備份策略等),您也可以選擇當前暫不啟動備份,但後續系統會根據備份策略在備份時間自動啟動備份。

使用者自動備份

重要
  • 目前僅支援MySQL 5.5版本的資料來源。

  • 目前僅支援華東1(杭州)地區。

配置備份源以及上傳備份檔案

  1. 登入Data Management 5.0
  2. 在頂部功能表列中,選擇安全與規範(DBS) > 資料災備(DBS) > 災備資料來源

    說明

    若您使用的是極簡模式的控制台,請單擊控制台左上方的2023-01-28_15-57-17.png表徵圖,選擇全部功能 > 安全與規範(DBS) > 資料災備(DBS) > 災備資料來源

  3. 在上方選擇地區,在雲下及他雲資料庫頁簽下,根據資料來源類型選擇新增入口。

  4. 單擊新增資料來源,在彈出的對話方塊中,配置如下資訊,並選中目標備份計劃,單擊頁面右下角下一步

    image

    配置項

    說明

    資料來源名稱

    建議配置具有業務意義的名稱,便於後續識別。

    引擎類型

    資料庫引擎類型,當前僅支援MySQL。

    引擎版本

    選擇資料庫引擎的版本。

    引擎參數

    {"lower_case_table_names":1}

    若沒有可選備份計劃,單擊購買備份計劃,進入DBS售賣頁,購買備份計劃。

    說明

    商品類型

    請選擇備份執行個體(訂用帳戶),不支援隨用隨付。

    備份執行個體地區

    選擇要存放備份資料的地區。

    資料來源類型

    選擇MySQL

    規格

    選擇xmicro。該規格提供的免費資料量額度等,請參見備份計劃規格

    備份方式

    選擇物理備份

    儲存空間

    您建立時無需選擇容量,後續根據實際存入DBS內建儲存中的資料量計費。計費詳情,請參見儲存費用

    資源群組

    配置資源群組。選擇預設或自訂的資源群組,方便備份執行個體管理。

    購買數量

    按需選擇購買數量,多個資料庫執行個體需要建立多個備份執行個體,例如您希望備份資料庫執行個體A與資料庫執行個體B,需要購買2個備份執行個體。

    購買時間長度

    選擇該備份執行個體的購買時間長度。

  5. 將備份組上傳到指定Bucket中。上傳方法,請參見使用者自動備份資料上傳指南

  6. 上傳完成後,單擊確定

查看備份資訊

使用者自動備份頁簽,單擊目標資料來源ID。

image

image

配置備份策略

  1. 使用者自動備份頁簽,單擊目標資料來源操作列的查看備份策略

    image

    image

  2. 配置完成之後,單擊確定

查看和下載備份資料

  1. 使用者自動備份頁簽,單擊目標資料來源ID。

  2. 單擊左側導覽列中的備份資料

    說明

    使用者執行上傳資料指令碼並完成資料來源建立後,當使用者新產生一個備份組資料,資料備份管理系統會同步這個備份資料到備份資料頁面。

  3. 單擊目標備份組操作列的下載按鈕擷取下載連結,將備份組下載到本地。

建立恢複任務

說明

恢複任務要求資料備份頁面上必須存在已產生的備份組,並且該備份組的狀態需為完成

  1. 使用者自動備份頁簽,單擊目標資料來源ID。

  2. 單擊左側導覽列中的備份資料,單擊建立恢複任務,手動設定恢複參數。

    參數名稱

    說明

    恢複任務名稱

    請輸入本次恢複任務的名稱。建議配置具有業務意義的名稱,便於後續識別。

    恢複位置

    選擇恢複執行個體位置,預設為恢複到RDS新執行個體。

    資料庫所在位置

    選擇資料庫所在位置,預設為RDS。

    執行個體地區

    選擇執行個體所在地區,目前僅支援華東1(杭州)地區。

    VPC

    選擇建立的專用網路。

    VSwitch

    選擇建立的交換器。

    執行個體系列

    選擇恢複的執行個體系列。

    執行個體規格

    選擇恢複的執行個體規格。

    儲存空間

    選擇所需儲存空間。

    恢複方式

    選擇恢複方式,目前僅支援按時間點恢複。

    還原時間

    選擇還原時間,可選擇恢復範圍請參考恢複方式參數後的資料。

  3. 配置完成之後單擊提交,會產生一個按所選還原時間點恢複的恢複任務,該任務資訊將以列表的形式展示在恢複任務頁面。

查看恢複任務

  1. 使用者自動備份頁簽,單擊目標資料來源ID。

  2. 單擊左側導覽列中的任務管理 > 恢複任務

  3. 點擊恢複結果列中的執行個體ID,可跳轉至恢複的RDS執行個體基本資料頁面。

查看恢複演練

  1. 使用者自動備份頁簽,單擊目標資料來源ID。

  2. 單擊左側導覽列中的恢複演練

    恢複演練概覽指標

    查看恢複演練概覽,包含恢複任務成功率恢複時間長度均值資料備份演練覆蓋率記錄備份演練覆蓋率

    名稱

    說明

    恢複任務成功率

    在恢復點在篩選時間範圍內的恢複任務成功率。

    恢複時間長度均值

    在篩選時間範圍內恢複成功任務的平均耗時。

    資料備份演練覆蓋率

    在篩選時間範圍內資料備份在開源MySQL或RDS MySQL上執行過恢複演練的覆蓋率。

    記錄備份演練覆蓋率

    在篩選時間範圍內的記錄備份在開源MySQL或RDS MySQL上執行過恢複演練的覆蓋率。

    恢複演練時間軸

    通過時間軸展示在篩選時間範圍內的各個時間點的恢複演練詳情,在時間軸上點擊滑鼠左鍵可以查看目前時間點的演練資訊。

    備份資料恢複演練詳情

    單擊資料備份,查看資料備份恢複演練詳情列表。單擊演練結果列執行個體ID,進入恢複的rds 執行個體的基本資料頁面。

    單擊記錄備份,可以查看記錄備份恢複演練詳情列表。

使用者自動備份資料上傳指南

準備工作

  • 提前配置資料來源,獲得資料來源ID,如何配置資料來源請參見添加資料來源

  • 建立一個RAM使用者並授予管理對應產品的許可權,準備好AccessKeyId和AccessKeySecret資訊。具體操作,請參見建立RAM使用者為RAM使用者授權

環境依賴

  • 命令工具:Bash、Python3。

  • Python相關:oss2、alibabacloud_openapi_util、alibabacloud_tea_openapi、alibabacloud_tea_util。

## 安裝阿里雲OpenAPI SDK
pip3 install --upgrade pip
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ oss2 alibabacloud_openapi_util alibabacloud_tea_openapi alibabacloud_tea_util

完整上傳指令碼

備份資料上傳指令碼分為兩部分:

Bash指令碼

指令碼執行處理入口,按需替換實際執行的參數。

boot_backup.sh:負責類比本地xtrabackup全量備份的流程。

#!/bin/bash
## 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運
## 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險,您可以根據業務需要,儲存到設定檔裡

AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
DBS_API=dbs-api.<您的資料來源所在地區ID,例如cn-hangzhou>.aliyuncs.com
DATASOURCE_ID=<您的資料來源ID>

## 擷取指令碼當前路徑
BASE_PATH=$(cd `dirname $0`; pwd)

TS=`date +%Y%m%d_%H%M%S`
XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS

mkdir -p $XTRA_DATA_DIR

## 執行xtrabackup備份命令,將錯誤記錄檔輸出的到xtrabackup.log檔案
~/innobackupex --defaults-file=/etc/my.cnf --user='root' --password='root' --host='localhost' --port=3306 --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log

## 擷取當前xtrabackup備份的目錄
backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir

## 打包成tar.gz檔案
cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2

## 調用Python指令碼進行上傳備份資料,並註冊備份組中繼資料
python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log

Python指令碼

  • upload_and_register_backup_set.py:用於上傳全量和記錄備份資料,解析對應的中繼資料,並進行備份組中繼資料的註冊。

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import os
    import argparse
    import re
    import time
    import json
    from datetime import datetime
    
    import oss2
    from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_tea_openapi.client import Client as OpenApiClient
    from alibabacloud_tea_util import models as util_models
    
    import xtrabackup_info_parser
    import xtrabackup_log_parser
    
    def init_command_args():
        parser = argparse.ArgumentParser(description="A sample command-line parser.")
        parser.add_argument("--access_key_id", help="Aliyun AccessKeyId.")
        parser.add_argument("--access_key_secret", help="Aliyun AccessKeySecret.")
        parser.add_argument("--endpoint", help="Aliyun API Endpoint.")
        parser.add_argument("--region_code", help="Aliyun DataSource RegionCode.")
        parser.add_argument("--datasource_id", help="Aliyun DataSourceId.")
        parser.add_argument("--data_type", help="BackupSet DataType: FullBackup | LogBackup.")
        parser.add_argument("--file_path", help="BackupSet File Path.")
        parser.add_argument("--xtrabackup_info_path", help="Xtrabackup Info Path.")
        parser.add_argument("--xtrabackup_log_path", help="Xtrabackup Log Path.")
        parser.add_argument("--begin_time", help="Binlog Begin Time.")
        parser.add_argument("--end_time", help="Binlog End Time.")
    
        args = parser.parse_args()
        if args.access_key_id:
            print(f"access_key_id: ************")
        if args.access_key_secret:
            print(f"access_key_secret: ************")
        if args.endpoint:
            print(f"endpoint: {args.endpoint}")
        if args.region_code:
            print(f"region_code: {args.region_code}")
        if args.datasource_id:
            print(f"datasource_id: {args.datasource_id}")
        if args.data_type:
            print(f"data_type: {args.data_type}")
        if args.file_path:
            print(f"file_path: {args.file_path}")
        if args.xtrabackup_info_path:
            print(f"xtrabackup_info_path: {args.xtrabackup_info_path}")
        if args.xtrabackup_log_path:
            print(f"xtrabackup_log_path: {args.xtrabackup_log_path}")
        if args.begin_time:
            print(f"begin_time: {args.begin_time}")
        if args.end_time:
            print(f"end_time: {args.end_time}")
    
        print('\n')
        return args
    
    
    def date_to_unix_timestamp(date_str):
        dt_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
        # 使用.time()方法得到時間元組,然後用time.mktime轉換為秒級時間戳記
        timestamp_seconds = time.mktime(dt_obj.timetuple())
        return int(timestamp_seconds) * 1000
    
    
    def create_oss_client(params):
        # 阿里雲OSS認證資訊
        access_key_id = params['AccessKeyId']
        access_key_secret = params['AccessKeySecret']
        security_token = params['SecurityToken']
        bucket_name = params['BucketName']
        endpoint = params['OssEndpoint']
    
        # 初始化OSS用戶端
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        return oss2.Bucket(auth, endpoint, bucket_name)
    
    
    def upload_oss_file(oss_client, file_path, object_key):
        """
        分區上傳大檔案到OSS
        :param oss_client:
        :param file_path: 本地檔案路徑
        :param object_key: OSS中的對象鍵,即檔案名稱
        """
        # 設定分區大小,單位為位元組,預設1MB
        part_size = 1024 * 1024 * 5
        # 初始化分區上傳
        upload_id = oss_client.init_multipart_upload(object_key).upload_id
    
        # 開啟檔案並讀取內容
        with open(file_path, 'rb') as file_obj:
            parts = []
            while True:
                data = file_obj.read(part_size)
                if not data:
                    break
                # 上傳分區
                result = oss_client.upload_part(object_key, upload_id, len(parts) + 1, data)
                parts.append(oss2.models.PartInfo(len(parts) + 1, result.etag))
    
            # 完成分區上傳
            oss_client.complete_multipart_upload(object_key, upload_id, parts)
    
    
    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # Endpoint 請參考 https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        註冊備份組中繼資料
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # 介面名稱,
                action='ConfigureBackupSetInfo',
                # 介面版本,
                version='2021-01-01',
                # 介面協議,
                protocol='HTTPS',
                # 介面 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 介面 PATH,
                pathname='/',
                # 介面請求體內容格式,
                req_body_type='json',
                # 介面響應體內容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        擷取oss上傳資訊
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # 介面名稱,
                action='DescribeBakDataSourceStorageAccessInfo',
                # 介面版本,
                version='2021-01-01',
                # 介面協議,
                protocol='HTTPS',
                # 介面 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 介面 PATH,
                pathname='/',
                # 介面請求體內容格式,
                req_body_type='json',
                # 介面響應體內容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # 首次註冊備份組,返回備份組ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # 上傳資料到oss
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # 標記備份組上傳完成
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")
    
    
    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # 全量和記錄備份分別通過不同方式構造extraMeta
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # 上傳資料,並註冊備份組元資訊
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  • xtrabackup_info_parser.py:使用備份檔案xtrabackup_info進行中繼資料解析。

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import re
    import json
    
    
    class ExtraMetaParser:
        def __init__(self, file_path):
            self.file_path = file_path
            pass
    
        def _parse_xtrabackup_info(self):
            config_data = {}
            with open(self.file_path, 'r') as file:
                for line in file:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        key, value = line.split('=', 1)
                        config_data[key.strip()] = value.strip()
            return config_data
    
        def get_extra_meta(self):
            config_data = self._parse_xtrabackup_info()
            print(f"xtrabackup_info: {config_data}")
    
            binlog_pos = config_data.get("binlog_pos")
            pattern = f"filename '(.*)', position (.*)"
            match = re.search(pattern, binlog_pos)
    
            return {'BINLOG_FILE': match.group(1),
                    'SERVER_VERSION': config_data.get("server_version"),
                    'START_TIME': config_data.get("start_time"),
                    'END_TIME': config_data.get("end_time")}
  • xtrabackup_log_parser.py:使用備份產生的記錄檔xtrabackup.log進行中繼資料解析。

    #!/usr/bin/python
    # coding:utf8
    
    import io
    import re
    import sys
    from datetime import datetime
    
    from six import binary_type, text_type
    
    
    def parse_date_part(date_part, time_part):
        """
        根據給定的日期部分和時間部分,解析並返回完整的日期時間字串。
        """
        # 擷取當前年份的前兩位
        current_century = datetime.now().strftime("%Y")[:2]
    
        year_short = date_part[:2]
        # 組合完整年份
        year_full = current_century + year_short
        date_full = year_full + date_part[2:]
        datetime_str = date_full + " " + time_part
    
        dt = datetime.strptime(datetime_str, "%Y%m%d %H:%M:%S")
        formatted_datetime = dt.strftime("%Y-%m-%d %H:%M:%S")
        return text_type(formatted_datetime)
    
    
    def analyze_slave_status(logpath=None):
        slave_status = {}
        start_time_pattern = (
            r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*Connecting to MySQL server host:"
        )
        """
            240925 17:46:58 completed OK!
            240925 02:22:58 innobackupex: completed OK!
            240925 02:22:58 xtrabackup: completed OK!
        """
        end_time_pattern = r"(\d{6}) (\d{2}:\d{2}:\d{2}) .*completed OK!"
        with io.open(logpath, "rb") as fp:
            lines = fp.read().splitlines()
            for i in reversed(range(len(lines))):
                line = lines[i]
                if isinstance(line, binary_type):
                    line = line.decode("utf-8")
    
                m = re.search(start_time_pattern, line)
                if m:
                    # 提取日期和時間部分
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["START_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
                m = re.search(r"Using server version (\S*)", line)
                if m:
                    slave_status["SERVER_VERSION"] = text_type(m.group(1))
                    continue
    
                m = re.search("MySQL binlog position:", line)
                if m:
                    binlog_line = line
                    m = re.search(r"filename '(\S*)'", binlog_line)
                    if m:
                        slave_status["BINLOG_FILE"] = text_type(m.group(1))
                    m = re.search(r"position (\d+)", binlog_line)
                    m2 = re.search(r"position '(\d+)'", binlog_line)
                    if m:
                        try:
                            slave_status["BINLOG_POS"] = int(m.group(1))
                        except ValueError:
                            pass
                    elif m2:
                        try:
                            slave_status["BINLOG_POS"] = int(m2.group(1))
                        except ValueError:
                            pass
                    continue
    
                m = re.search("consistent_time (\d+)", line)
                if m:
                    try:
                        slave_status["CONSISTENT_TIME"] = int(m.group(1))
                    except ValueError:
                        pass
                    continue
    
                m = re.search(end_time_pattern, line)
                if m:
                    date_part = m.group(1)
                    time_part = m.group(2)
                    slave_status["END_TIME"] = parse_date_part(date_part, time_part)
                    continue
    
        return slave_status
    
    if __name__ == "__main__":
        logpath = sys.argv[1]
        slave_status = analyze_slave_status(logpath)
        print(slave_status)
    

Bash指令碼處理流程

  1. 配置阿里雲AccessKeyId和AccessKeySecret、DBS OpenAPI Endpoint,以及對應的資料來源ID。

    #!/bin/bash
    ## 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運
    ## 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險,您可以根據業務需要,儲存到設定檔裡
    
    AK=<ALIBABA_CLOUD_ACCESS_KEY_ID>
    SK=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
    DBS_API=dbs-api.<您的資料來源所在地區ID,例如cn-hangzhou>.aliyuncs.com
    DATASOURCE_ID=<您的資料來源ID>
  2. 執行xtrabackup備份命令,將備份資料儲存到指定的目錄,同時將錯誤記錄檔輸出到xtrabackup.log檔案中。

    ## 擷取指令碼當前路徑
    BASE_PATH=$(cd `dirname $0`; pwd)
    
    TS=`date +%Y%m%d_%H%M%S`
    XTRA_DATA_DIR=~/tool/xtrabackup_data/$TS
    
    mkdir -p $XTRA_DATA_DIR
    
    ## 執行xtrabackup備份命令,將錯誤記錄檔輸出的到xtrabackup.log檔案
    ~/innobackupex --defaults-file=/etc/my.cnf --user='<您的資料庫帳號,例如root>' --password='<您的資料庫密碼,例如root>' --host='<您的資料庫IP,例如localhost>' --port=<您的資料庫連接埠號碼,例如3306> --socket='/var/lib/mysql/mysql.sock' --parallel=4 $XTRA_DATA_DIR/ 2>$XTRA_DATA_DIR/xtrabackup.log
    • 後續將解析xtrabackup.log檔案獲得全量備份組的中繼資料資訊:備份開始時間、備份結束時間、一致性時間點、對應Binlog名稱。

    • 限制說明:

      當前執行xtrabackup備份命令,暫不支援壓縮和加密參數(--compress --compress-threads和--encrypt --encrypt-key-file --encrypt-threads),使用xtrabackup備份命令時請先刪除對應的參數。

  3. 將檔案目錄形式的全量備份資料進行gzip壓縮和打包成tar.gz格式的檔案。

    ## 擷取當前xtrabackup備份的目錄
    backup_dir=`ls $XTRA_DATA_DIR | grep -v xtrabackup.log | head -n1`
    echo -e "\033[33mexecute innobackupex success, backup_dir: $backup_dir" && echo -n -e "\033[0m" && chmod 755 $XTRA_DATA_DIR/$backup_dir
    
    ## 打包成tar.gz檔案
    cd $XTRA_DATA_DIR/$backup_dir && tar -czvf ../$backup_dir.tar.gz . && file ../$backup_dir.tar.gz
    echo -e "\033[33mpackage to $backup_dir.tar.gz" && echo -n -e "\033[0m" && sleep 2
    • 限制說明:當前僅支援檔案格式的備份資料上傳,支援如下格式:

      • tar:目錄檔案進行tar打包。

      • tar.gz:目錄檔案進行tar打包,然後進行gzip壓縮。

    • 注意事項:

      • 使用tar命令打包前,需要chmod 755命令修改檔案目錄許可權。

      • 需進入檔案夾根目錄,進行tar命令的打包和壓縮。

  4. 調用Python指令碼upload_and_register_backup_set.py進行備份資料的上傳,同時註冊備份組中繼資料。

    ## 調用Python指令碼進行上傳備份資料,並註冊備份組中繼資料
    python3 $BASE_PATH/upload_and_register_backup_set.py --access_key_id $AK --access_key_secret $SK --endpoint $DBS_API --datasource_id $DATASOURCE_ID --region_code=cn-hangzhou --data_type=FullBackup --file_path=$XTRA_DATA_DIR/$backup_dir.tar.gz --xtrabackup_log_path=$XTRA_DATA_DIR/xtrabackup.log

    參數

    說明

    --access_key_id

    阿里雲AccessKeyId。

    --access_key_secret

    阿里雲AccessKeySecret。

    --endpoint

    DBS OpenAPI對應的Endpoint。請參見服務存取點

    --datasource_id

    DBS對應的資料來源ID。

    --region_code

    對應地區資訊。

    --data_type

    備份資料類型,全量備份組:FullBackup,記錄備份:LogBackup。

    --file_path

    全量備份資料路徑。

    --xtrabackup_log_path

    全量備份xtrabackup命令產生的xtrabackup.log檔案路徑。

Python指令碼處理流程

  1. main函數主入口:

    根據data_type傳參為FullBackug或者LogBackup,分別構造中繼資料註冊依賴的extra_meta資訊:

    說明
    • BINLOG_FILE:對應Binlog名稱。

    • dataBegin:備份開始時間,毫秒層級時間戳記。

    • dataEnd:備份結束時間,毫秒層級時間戳記。

    • consistentTime:一致性時間點,秒級時間戳記。

    • 全量備份extra_meta格式:

      {
        'BINLOG_FILE':'mysql-bin.001',
        'version':'5.5',
        'dataBegin':17274********,
        'dataEnd':17274********,
        'consistentTime':17274********
      }
    • 記錄備份extra_meta格式:

      {
        'dataBegin':17274********,
        'dataEnd':17274********
      }
  2. 調用OssUploader.upload_and_register_backup_set方法進行備份資料上傳和中繼資料註冊。

    if __name__ == '__main__':
        args = init_command_args()
        uploader = OssUploader(args.access_key_id, args.access_key_secret,
                               args.endpoint, args.region_code, args.datasource_id)
    
        # 全量和記錄備份分別通過不同方式構造extraMeta
        extra_meta = '{}'
        if args.data_type == 'FullBackup':
            obj = {}
            if args.xtrabackup_log_path is not None:
                obj = xtrabackup_log_parser.analyze_slave_status(logpath=args.xtrabackup_log_path)
            elif args.xtrabackup_info_path is not None:
                parser = xtrabackup_info_parser.ExtraMetaParser(file_path=args.xtrabackup_info_path)
                obj = parser.get_extra_meta()
            extra_meta = {'BINLOG_FILE': obj.get('BINLOG_FILE'),
                          'version': obj.get("SERVER_VERSION"),
                          'dataBegin': date_to_unix_timestamp(obj.get("START_TIME")),
                          'dataEnd': date_to_unix_timestamp(obj.get("END_TIME")),
                          'consistentTime': int(date_to_unix_timestamp(obj.get("END_TIME")) / 1000)}
            extra_meta = json.dumps(extra_meta)
    
        elif args.data_type == 'LogBackup':
            obj = {'dataBegin': date_to_unix_timestamp(args.begin_time),
                   'dataEnd': date_to_unix_timestamp(args.end_time)}
            extra_meta = json.dumps(obj)
        print(f"get extra meta json string: {extra_meta}")
    
        # 上傳資料,並註冊備份組元資訊
        uploader.upload_and_register_backup_set(file_path=args.file_path, data_type=args.data_type, extra_meta=extra_meta)
  3. OssUploader.upload_and_register_backup_set方法備份資料上傳流程。

    class OssUploader:
    
        def __init__(self, access_key_id, access_key_secret, endpoint, region_code, datasource_id):
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret
            self.endpoint = endpoint
            self.region_code = region_code
            self.datasource_id = datasource_id
    
            config = open_api_models.Config(access_key_id, access_key_secret)
            # Endpoint 請參考 https://api.aliyun.com/product/Rds
            config.endpoint = endpoint
            self.client = OpenApiClient(config)
    
        """
        註冊備份組中繼資料
        """
        def configure_backup_set_info(self, req_param):
            params = open_api_models.Params(
                # 介面名稱,
                action='ConfigureBackupSetInfo',
                # 介面版本,
                version='2021-01-01',
                # 介面協議,
                protocol='HTTPS',
                # 介面 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 介面 PATH,
                pathname='/',
                # 介面請求體內容格式,
                req_body_type='json',
                # 介面響應體內容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。
            print(f"ConfigureBackupSetInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"ConfigureBackupSetInfo response: {data}")
            return data['body']['Data']
    
        """
        擷取oss上傳資訊
        """
        def describe_bak_datasource_storage_access_info(self, req_param):
            params = open_api_models.Params(
                # 介面名稱,
                action='DescribeBakDataSourceStorageAccessInfo',
                # 介面版本,
                version='2021-01-01',
                # 介面協議,
                protocol='HTTPS',
                # 介面 HTTP 方法,
                method='POST',
                auth_type='AK',
                style='RPC',
                # 介面 PATH,
                pathname='/',
                # 介面請求體內容格式,
                req_body_type='json',
                # 介面響應體內容格式,
                body_type='json'
            )
            # runtime options
            runtime = util_models.RuntimeOptions()
            request = open_api_models.OpenApiRequest(
                query=OpenApiUtilClient.query(req_param)
            )
            # 傳回值為 Map 類型,可從 Map 中獲得三類資料:響應體 body、回應標頭 headers、HTTP 返回的狀態代碼 statusCode。
            print(f"DescribeBakDataSourceStorageAccessInfo request: {req_param}")
            data = self.client.call_api(params, request, runtime)
    
            print(f"DescribeBakDataSourceStorageAccessInfo response: {data}")
            return data['body']['Data']
    
        def _fetch_oss_access_info(self, params):
            info = self.describe_bak_datasource_storage_access_info({
                'RegionId': params['RegionId'],
                'DataSourceId': params['DataSourceId'],
                'RegionCode': params['RegionCode'],
                'BackupType': params['BackupType'],
                'BackupSetId': params['BackupSetId']
            })
            return info['OssAccessInfo']
    
        def upload_and_register_backup_set(self, file_path, data_type, extra_meta):
            filename = os.path.basename(file_path)
            params = {'BackupMode': 'Automated', 'BackupMethod': 'Physical', 'RegionId': self.region_code,
                      'RegionCode': self.region_code, 'DataSourceId': self.datasource_id, 'BackupSetName': filename,
                      'ExtraMeta': extra_meta, 'BackupType': data_type, 'UploadStatus': 'WaitingUpload'}
    
            # 首次註冊備份組,返回備份組ID
            data = self.configure_backup_set_info(params)
            params['BackupSetId'] = data['BackupSetId']
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, WaitingUpload\n")
    
            # 上傳資料到oss
            oss_info = self._fetch_oss_access_info(params)
            oss_client = create_oss_client(oss_info)
            upload_oss_file(oss_client, file_path, oss_info['ObjectKey'])
            print(f"------ upload_oss_file success: {file_path}, {data_type}, {params['BackupSetId']}\n")
    
            # 標記備份組上傳完成
            params['UploadStatus'] = 'UploadSuccess'
            self.configure_backup_set_info(params)
            print(f"------ configure_backup_set_info success: {file_path}, {data_type}, {params['BackupSetId']}, UploadSuccess\n")