All Products
Search
Document Center

Key Management Service:Rotasi rahasia generik menggunakan Function Compute

Last Updated:Apr 01, 2026

Otomatisasi rotasi rahasia generik yang disimpan di KMS Secrets Manager menggunakan Function Compute dan CloudFlow. Panduan ini mencakup dua skenario rotasi database: instans ApsaraDB RDS yang diakses melalui API RDS, dan database MySQL yang dikelola sendiri yang diakses melalui akun istimewa.

Penting

KMS Secrets Manager menyediakan rahasia dinamis dengan rotasi otomatis bawaan untuk ApsaraDB RDS, RAM, dan ECS. Jika rahasia dinamis mencakup kasus penggunaan Anda, gunakanlah—biayanya lebih rendah dan tidak memerlukan kode kustom. Lihat Ikhtisar rahasia ApsaraDB RDS dinamis, Ikhtisar rahasia RAM dinamis, atau Ikhtisar rahasia ECS dinamis.

Penagihan

Rotasi rahasia KMS gratis. Biaya berlaku untuk Function Compute dan CloudFlow. Lihat Penagihan Function Compute dan Penagihan CloudFlow.

Pilih strategi rotasi

Kedua skenario dalam panduan ini menggunakan rotasi dua akun (pengguna bergantian). Strategi ini mempertahankan dua akun database—hanya satu yang aktif pada satu waktu—sehingga aplikasi Anda selalu memiliki kredensial yang valid selama rotasi. Akun yang tidak aktif menerima kredensial baru, lalu menjadi akun aktif.

StrategiCara kerjaKetersediaan selama rotasiPaling cocok untuk
Single-accountMemperbarui kredensial untuk satu akun secara langsungJeda singkat saat kredensial diperbaruiPengaturan sederhana, pengguna ad hoc
Dual-accountBergantian antara dua akun; salah satu selalu aktifTanpa gangguanDatabase produksi, persyaratan ketersediaan tinggi

Akhiran rotasi _rt mengidentifikasi akun alternatif. Misalnya, jika akun aktif adalah appuser, maka akun alternatifnya adalah appuser_rt. Nama akun termasuk akhiran tersebut tidak boleh melebihi 16 karakter.

Cara kerja

Kedua skenario mengikuti proses yang sama yang diatur oleh CloudFlow. Jadwal berbasis waktu CloudFlow memicu alur, yang memanggil fungsi Function Compute yang sama beberapa kali dengan nilai Step berbeda:

StepApa yang dilakukan fungsiIdempotensi
newMenghasilkan password baru, menulisnya ke versi rahasia tertunda (ACSPending)Jika ACSPending sudah ada untuk ID versi saat ini, melewati pembuatan rahasia
setMembuat atau memperbarui akun database alternatif dengan kredensial tertundaAman untuk dicoba ulang — memverifikasi status akun sebelum melakukan perubahan
testMemverifikasi kredensial tertunda dapat terhubung ke database (hanya untuk skenario MySQL)Pemeriksaan konektivitas hanya-baca
endMeningkatkan ACSPending menjadi ACSCurrent, menyelesaikan rotasiPembaruan tahap bersifat atomik

Setiap langkah bersifat idempoten, sehingga alur aman untuk dicoba ulang setelah kegagalan sebagian.

Skenario 1: Rotasi rahasia generik ApsaraDB RDS

Gunakan pendekatan ini ketika database Anda berjalan di ApsaraDB RDS dan Anda ingin merotasi kredensial menggunakan API RDS.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Akun Function Compute aktif. Lihat Aktifkan Function Compute.

  • Akun database ApsaraDB RDS dan rahasia generik untuk akun tersebut di konsol KMS.

  • Peran layanan Function Compute dengan izin yang tercantum di Langkah 1.

Langkah 1: Siapkan izin

Buat peran layanan untuk Function Compute dan sambungkan kebijakan berikut:

  1. AliyunFCDefaultRolePolicy — kebijakan sistem

  2. AliyunSTSAssumeRoleAccess — kebijakan sistem

  3. Kebijakan kustom yang memberikan akses ke KMS dan RDS:

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:GetSecretValue",
                "kms:GetRandomPassword",
                "kms:PutSecretValue",
                "kms:UpdateSecretVersionStage"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "rds:GrantAccountPrivilege",
                "rds:DescribeAccounts",
                "rds:ResetAccountPassword",
                "rds:CreateAccount"
            ],
            "Resource": "*"
        }
    ]
}

Lihat Berikan izin Function Compute untuk mengakses layanan Alibaba Cloud lainnya untuk instruksi tentang menyambungkan kebijakan.

Langkah 2: Buat rahasia generik

Buat rahasia generik di konsol KMS dengan JSON berikut sebagai nilai rahasia. Nama rahasia nantinya diteruskan sebagai parameter penjadwalan ke CloudFlow.

{
    "AccountName": "<rds-account-name>",
    "AccountPassword": "<rds-account-password>"
}
FieldDescription
AccountNameUsername akun database RDS
AccountPasswordPassword akun database RDS

Langkah 3: Buat fungsi rotasi

  1. Di konsol Function Compute, buat layanan. Klik Show Advanced Options, atur Access to VPC ke Yes, dan konfigurasikan VPC, vSwitch, dan Security Group. Grup keamanan harus mengizinkan fungsi untuk mengakses API RDS dan titik akhir VPC KMS (kms-vpc.<region-id>.aliyuncs.com).

  2. Buat fungsi dengan Runtime Environments diatur ke Python 3.9. Gunakan kode berikut:

# -*- coding: utf-8 -*-
import json
import logging
import os

from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest

from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    instance_id = evt['InstanceId']
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)

    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)

    if step == "new":
        new_phase(client, secret_name, version_id)

    elif step == "set":
        set_phase(client, instance_id, secret_name, version_id)

    elif step == "end":
        end_phase(client, secret_name, version_id)

    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise ValueError("Can to find secret %s " % (secret_name))
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])

        exclude_characters = os.environ[
            'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, instance_id, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
    pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]

    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))

    current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
    current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
    if len(current_accounts) == 0:
        logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
    if len(pending_accounts) == 0:
        create_rds_account(client, instance_id, pending_dict["AccountName"],
                           pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
        pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
            "DBInstanceAccount"]
    else:
        # reset password
        reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])

    current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    if len(current_privileges) > 0:
        for current_privilege in current_privileges:
            is_contains = False
            for pending_privilege in pending_privileges:
                if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                    "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                    is_contains = True
                    continue
            if not is_contains:
                grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                        current_privilege["AccountPrivilege"])


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotation user, account_name length with _rotation appended would exceed 16 characters")
        return new_account_name


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
    request = CreateAccountRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    request.set_AccountType(account_type)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
    request = GrantAccountPrivilegeRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_DBName(db_name)
    request.set_AccountPrivilege(account_privilege)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_accounts(client, db_instance_id, account_name):
    request = DescribeAccountsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def reset_account_password(client, db_instance_id, account_name, account_password):
    request = ResetAccountPasswordRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    response = client.do_action_with_exception(request)
    return json.loads(response)

Langkah 4: Buat alur rotasi CloudFlow

  1. Masuk ke konsol CloudFlow dan pilih wilayah yang sama dengan fungsi Function Compute Anda.

  2. Di halaman Flows, klik Create flow.

  3. Di halaman Create Flow, klik Create Flow with Code, atur YAML Definition ke berikut, lalu klik Next Step:

version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId

Ganti <function-arn> dengan Nama Sumber Daya Alibaba Cloud (ARN) dari fungsi yang Anda buat.

  1. Konfigurasikan peran alur, lalu klik Create Flow.

  2. Untuk menjadwalkan rotasi otomatis, buat jadwal berbasis waktu. Lihat Buat jadwal berbasis waktu. Atur Payload ke:

{
    "SecretName": "<secret-name>",
    "RegionId": "<region-id>",
    "InstanceId": "<rds-instance-id>"
}
FieldDescription
SecretNameNama rahasia generik di KMS
RegionIdWilayah tempat rahasia dan fungsi dideploy
InstanceIdID instans ApsaraDB RDS

Langkah 5: Verifikasi rotasi

Setelah mengatur alur, picu eksekusi secara manual untuk memastikan semuanya berfungsi:

  1. Di konsol CloudFlow, buka alur Anda dan klik Start Execution dengan JSON payload yang sama seperti di Langkah 4.

  2. Tunggu hingga eksekusi selesai. Ketiga langkah (RotateSecretNew, RotateSecretSet, RotateSecretEnd) harus menunjukkan status Succeeded.

  3. Di konsol KMS, buka rahasia dan periksa riwayat versinya. Label ACSCurrent sekarang harus mengarah ke versi baru.

Langkah 6: (Opsional) Hubungkan aplikasi ke Secrets Manager

Gunakan Secrets Manager JDBC untuk mengambil kredensial yang telah dirotasi di aplikasi Anda tanpa perubahan kode. Lihat Secrets Manager JDBC.

Skenario 2: Rotasi rahasia generik MySQL yang dikelola sendiri

Gunakan pendekatan ini ketika database Anda adalah instans MySQL yang dikelola sendiri. Skenario ini menggunakan akun istimewa untuk mengelola rotasi kredensial dan menambahkan langkah test untuk memverifikasi konektivitas sebelum menyelesaikan rotasi.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Akun Function Compute aktif. Lihat Aktifkan Function Compute.

  • Akun MySQL istimewa dan rahasia generik untuk akun tersebut di konsol KMS.

  • Rahasia generik kedua untuk akun reguler yang akan dirotasi.

  • Peran layanan Function Compute dengan izin yang tercantum di Langkah 1.

Langkah 1: Siapkan izin

Buat peran layanan untuk Function Compute dan sambungkan kebijakan berikut:

  1. AliyunFCDefaultRolePolicy — kebijakan sistem

  2. AliyunSTSAssumeRoleAccess — kebijakan sistem

  3. Kebijakan kustom yang memberikan akses ke KMS:

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:GetSecretValue",
                "kms:GetRandomPassword",
                "kms:Decrypt",
                "kms:GenerateDataKey",
                "kms:PutSecretValue",
                "kms:UpdateSecretVersionStage"
            ],
            "Resource": "*"
        }
    ]
}

Langkah 2: Buat rahasia generik

Buat dua rahasia generik di konsol KMS.

Rahasia akun istimewa — digunakan untuk membuat dan mengelola akun reguler:

{
    "Endpoint": "<mysql-host>",
    "AccountName": "<privileged-account-name>",
    "AccountPassword": "<privileged-account-password>",
    "SSL": false
}
FieldDescriptionDefault
EndpointNama domain atau alamat IP database MySQL
AccountNameUsername akun istimewa
AccountPasswordPassword akun istimewa
SSLApakah menggunakan sertifikat SSL untuk koneksifalse

Rahasia akun reguler — rahasia yang akan dirotasi. Nama rahasia ini diteruskan sebagai parameter penjadwalan ke CloudFlow:

{
    "Endpoint": "<mysql-host>",
    "AccountName": "<account-name>",
    "AccountPassword": "<account-password>",
    "MasterSecret": "<privileged-secret-name>",
    "SSL": false
}
FieldDescriptionDefault
EndpointNama domain atau alamat IP database MySQL
AccountNameUsername akun yang akan dirotasi
AccountPasswordPassword akun yang akan dirotasi
MasterSecretNama rahasia akun istimewa di KMS
SSLApakah menggunakan sertifikat SSL untuk koneksifalse
Jika SSL bernilai true, fungsi mengharapkan sertifikat SSL berada di /opt/python/certs/cert.pem. Tambahkan sertifikat ke layer kustom (lihat Langkah 3).

Langkah 3: Buat fungsi rotasi

  1. Buat layanan Function Compute dengan akses VPC diaktifkan (konfigurasi sama seperti Skenario 1). Pastikan fungsi dapat mengakses instans MySQL dan titik akhir VPC KMS.

  2. Buat fungsi dengan Runtime Environments diatur ke Python 3.9. Gunakan kode berikut:

# -*- coding: utf-8 -*-
import json
import logging
import os

try:
    import pymysql
except:
    os.system('pip install pymysql -t ./')
    import pymysql
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)

    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)

    if step == "new":
        new_phase(client, secret_name, version_id)

    elif step == "set":
        set_phase(client, secret_name, version_id)

    elif step == "test":
        test_phase(client, secret_name, version_id)

    elif step == "end":
        end_phase(client, secret_name, version_id)

    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])

        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)

    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
        return

    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))

    if current_dict['Endpoint'] != pending_dict['Endpoint']:
        logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
        raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))

    conn = get_connection(current_dict)
    if not conn:
        logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
    conn.close()

    master_secret = current_dict['MasterSecret']
    master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
    if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(current_dict, master_dict):
        logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
        raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))

    conn = get_connection(master_dict)
    if not conn:
        logger.error(
            "set: Unable to access the given database using credentials in master secret secret %s" % master_secret)
        raise ValueError("Unable to access the given database using credentials in master secret secret %s" % master_secret)

    try:
        with conn.cursor() as cur:
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s",
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))

            cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
            for row in cur.fetchall():
                if 'XA_RECOVER_ADMIN' in row[0]:
                    continue
                grant = row[0].split(' TO ')
                new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]

            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                        current_dict['AccountName'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
            elif ssl_type == "ANY":
                cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
            elif ssl_type == "X509":
                cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
            else:
                cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                            (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))

            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option,
                        (pending_dict['AccountName'], pending_dict['AccountPassword']))
            conn.commit()
            logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                pending_dict['AccountName'], secret_name))
    finally:
        conn.close()


def test_phase(client, secret_name, version_id):
    conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
    if conn:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()

        logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name)
        return
    else:
        logger.error(
            "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
        raise ValueError("Unable to access the given database with pending secret of secret secret_name %s" % secret_name)


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_connection(secret_dict):
    port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
    dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None

    use_ssl, fall_back = get_ssl_config(secret_dict)

    conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
    if conn or not fall_back:
        return conn
    else:
        return connect_and_authenticate(secret_dict, port, dbname, False)


def get_ssl_config(secret_dict):
    if 'SSL' not in secret_dict:
        return True, True

    if isinstance(secret_dict['SSL'], bool):
        return secret_dict['SSL'], False

    if isinstance(secret_dict['SSL'], str):
        ssl = secret_dict['SSL'].lower()
        if ssl == "true":
            return True, False
        elif ssl == "false":
            return False, False
        else:
            return True, True

    return True, True


def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
    ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None

    try:
        conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                               password=secret_dict['AccountPassword'],
                               port=port, database=dbname, connect_timeout=5, ssl=ssl)
        logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
            "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
        return conn
    except pymysql.OperationalError as e:
        if 'certificate verify failed: IP address mismatch' in e.args[1]:
            logger.error(
                "Hostname verification failed when estlablishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                    'Endpoint'])
        return None


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user, account_name length with _rotation appended would exceed 16 characters")
        return new_account_name


def get_password_option(version):
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"


def get_escaped_encryption_statement(version):
    if version.startswith("5.6"):
        return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
    else:
        return "ALTER USER %s@'%%' REQUIRE"


def is_rds_replica_database(client, replica_dict, master_dict):
    replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
    master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
    try:
        describe_response = describe_db_instances(client, replica_instance_id)
    except Exception as err:
        logger.warning("Encountered error while verifying rds replica status: %s" % err)
        return False
    items = describe_response['Items']
    instances = items.get("DBInstance")
    if not instances:
        logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
        return False

    current_instance = instances[0]
    return master_instance_id == current_instance.get('DBInstanceId')


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_db_instances(client, db_instance_id):
    request = DescribeDBInstancesRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    response = client.do_action_with_exception(request)
    return json.loads(response)
  1. Buat layer kustom untuk mengemas dependensi PyMySQL (dan opsional sertifikat SSL):

    1. Jalankan perintah berikut untuk membuat paket layer:

      mkdir my-secret-rotate
      cd my-secret-rotate
      pip install --target ./python pymysql
      # If SSL is required, create the certs directory and add your certificate:
      # mkdir python/certs && cp cert.pem python/certs/
      zip -r my-secret-rotate.zip python
    2. Di konsol Function Compute, buat layer kustom menggunakan paket ZIP. Lihat Buat layer kustom.

  2. Sambungkan layer kustom ke fungsi Anda. Di halaman Services, klik Functions untuk layanan Anda, temukan fungsi, klik Configure, dan tambahkan layer di bagian Layers. Lihat Kelola layer.

Langkah 4: Buat alur rotasi CloudFlow

  1. Masuk ke konsol CloudFlow dan pilih wilayah yang sama dengan fungsi Anda.

  2. Di halaman Flows, klik Create flow.

  3. Di halaman Create Flow, klik Create Flow with Code, atur YAML Definition ke berikut, lalu klik Next Step:

version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretTest
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: test
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId

Ganti <function-arn> dengan ARN dari fungsi yang Anda buat.

  1. Konfigurasikan peran alur, lalu klik Create Flow.

  2. Untuk menjadwalkan rotasi otomatis, buat jadwal berbasis waktu dan atur Payload ke:

{
    "SecretName": "<secret-name>",
    "RegionId": "<region-id>"
}
FieldDescription
SecretNameNama rahasia generik akun reguler di KMS
RegionIdWilayah tempat rahasia dan fungsi dideploy

Langkah 5: Verifikasi rotasi

Picu eksekusi secara manual untuk memastikan pengaturan berfungsi:

  1. Di konsol CloudFlow, buka alur Anda dan klik Start Execution dengan JSON payload dari Langkah 4.

  2. Tunggu hingga eksekusi selesai. Keempat langkah (RotateSecretNew, RotateSecretSet, RotateSecretTest, RotateSecretEnd) harus menunjukkan status Succeeded.

  3. Di konsol KMS, buka rahasia dan periksa riwayat versinya. Label ACSCurrent harus mengarah ke versi baru.

Langkah 6: (Opsional) Hubungkan aplikasi ke Secrets Manager

Gunakan Secrets Manager JDBC untuk mengambil kredensial yang telah dirotasi secara otomatis. Lihat Secrets Manager JDBC.

Langkah selanjutnya