Key Management Service:Rotate generic secrets by using Function Compute

Last Updated:Mar 31, 2026

Automate the rotation of generic secrets stored in KMS Secrets Manager using Function Compute and CloudFlow. This guide covers two database rotation scenarios: ApsaraDB RDS instances accessed through the RDS API, and self-managed MySQL databases accessed through a privileged account.

Important

KMS Secrets Manager provides dynamic secrets with built-in automatic rotation for ApsaraDB RDS, RAM, and ECS. If dynamic secrets cover your use case, use them instead — they cost less and require no custom code. See Overview of dynamic ApsaraDB RDS secrets, Overview of dynamic RAM secrets, or Overview of dynamic ECS secrets.

Billing

KMS secret rotation is free. Charges apply for Function Compute and CloudFlow. See Billing of Function Compute and Billing of CloudFlow.

Choose a rotation strategy

Both scenarios in this guide use dual-account (alternating user) rotation. This strategy maintains two database accounts — only one is active at a time — so your application always has valid credentials during rotation. The inactive account gets the new credentials, then becomes the active one.

| Strategy | How it works | Availability during rotation | Best for |
| --- | --- | --- | --- |
| Single-account | Updates credentials for one account in place | Brief gap while credentials update | Simple setups, ad hoc users |
| Dual-account | Alternates between two accounts; one is always active | No interruption | Production databases, high-availability requirements |

The rotation suffix _rt identifies the alternate account. For example, if the active account is appuser, the alternate account is appuser_rt. Account names including the suffix cannot exceed 16 characters.
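
The naming rule can be sketched as a small pure function. This is a minimal illustration that mirrors the get_alt_account_name helper used by the rotation functions in this guide; the function name alt_account_name and the two constants are chosen here for illustration only.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # limit stated above, suffix included


def alt_account_name(current: str) -> str:
    """Return the name of the account that the next rotation switches to."""
    if current.endswith(ROTATION_SUFFIX):
        # The alternate account is active, so rotate back to the base name.
        return current[: -len(ROTATION_SUFFIX)]
    candidate = current + ROTATION_SUFFIX
    if len(candidate) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError(
            "account name with %s appended would exceed %d characters"
            % (ROTATION_SUFFIX, MAX_ACCOUNT_NAME_LENGTH)
        )
    return candidate


print(alt_account_name("appuser"))     # appuser_rt
print(alt_account_name("appuser_rt"))  # appuser
```

Because the suffix takes three characters, the base account name can be at most 13 characters long.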

How it works

Both scenarios follow the same CloudFlow-orchestrated process. A CloudFlow time-based schedule triggers the flow, which calls the same Function Compute function multiple times with different Step values:

| Step | What the function does | Idempotency |
| --- | --- | --- |
| new | Generates a new password, writes it to the pending secret version (ACSPending) | If ACSPending already exists for the current version ID, skips secret creation |
| set | Creates or updates the alternate database account with the pending credentials | Safe to retry: verifies account state before making changes |
| test | Verifies the pending credentials can connect to the database (MySQL scenario only) | Read-only connectivity check |
| end | Promotes ACSPending to ACSCurrent, completing the rotation | Stage update is atomic |

Each step is idempotent, so the flow is safe to retry after partial failures.
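
The following sketch models the new, set, and end steps against in-memory stand-ins to show why a retried flow ends in the same state. It is not the rotation function itself: FakeSecret, the database dictionary, and the step functions are hypothetical names, and no KMS or database calls are made.

```python
import uuid


class FakeSecret:
    """In-memory stand-in for a generic secret with staged versions."""

    def __init__(self, value):
        first = str(uuid.uuid4())
        self.versions = {first: value}       # version ID -> secret value
        self.stages = {"ACSCurrent": first}  # stage label -> version ID

    def current(self):
        return self.versions[self.stages["ACSCurrent"]]


def new_step(secret, version_id, new_password):
    # Idempotent: if this version already exists, do not create it again.
    if version_id in secret.versions:
        return
    name = secret.current()["AccountName"]
    alt = name[:-3] if name.endswith("_rt") else name + "_rt"
    secret.versions[version_id] = {"AccountName": alt, "AccountPassword": new_password}
    secret.stages["ACSPending"] = version_id


def set_step(secret, version_id, database):
    pending = secret.versions[version_id]
    # Create the alternate account or reset its password; a retry gives the same result.
    database[pending["AccountName"]] = pending["AccountPassword"]


def end_step(secret, version_id):
    # Promote the pending version, then drop the ACSPending label if it is still set.
    secret.stages["ACSCurrent"] = version_id
    if secret.stages.get("ACSPending") == version_id:
        del secret.stages["ACSPending"]


secret = FakeSecret({"AccountName": "appuser", "AccountPassword": "old"})
database = {"appuser": "old"}
version_id = str(uuid.uuid4())

for _ in range(2):  # run every step twice to imitate a retried flow
    new_step(secret, version_id, "new-password")
    set_step(secret, version_id, database)
    end_step(secret, version_id)

print(secret.current()["AccountName"])  # appuser_rt
```

After both passes the previous account (appuser) still exists in the stand-in database with its old password, which is what keeps applications that cached the earlier credentials working during the switch.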

Scenario 1: Rotate an ApsaraDB RDS generic secret

Use this approach when your database runs on ApsaraDB RDS and you want to rotate credentials using the RDS API.

Prerequisites

Before you begin, make sure you have:

  • An active Function Compute account. See Activate Function Compute.

  • An ApsaraDB RDS database account and a generic secret for it in the KMS console.

  • A Function Compute service role with the permissions listed in Step 1.

Step 1: Set up permissions

Create a service role for Function Compute and attach the following policies:

  1. AliyunFCDefaultRolePolicy — system policy

  2. AliyunSTSAssumeRoleAccess — system policy

  3. A custom policy granting KMS and RDS access:

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:GetSecretValue",
                "kms:GetRandomPassword",
                "kms:PutSecretValue",
                "kms:UpdateSecretVersionStage"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "rds:GrantAccountPrivilege",
                "rds:DescribeAccounts",
                "rds:ResetAccountPassword",
                "rds:CreateAccount"
            ],
            "Resource": "*"
        }
    ]
}

See Grant Function Compute permissions to access other Alibaba Cloud services for instructions on attaching policies.

Step 2: Create the generic secret

Create a generic secret in the KMS console with the following JSON as the secret value. The secret name is later passed as a scheduling parameter to CloudFlow.

{
    "AccountName": "<rds-account-name>",
    "AccountPassword": "<rds-account-password>"
}

| Field | Description |
| --- | --- |
| AccountName | Username of the RDS database account |
| AccountPassword | Password of the RDS database account |
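
Before storing the secret, it can help to check the value locally. The sketch below applies the same required-field check that the rotation function in Step 3 performs when it reads the secret; parse_secret_value and the sample value are illustrative names and data, not part of any SDK.

```python
import json

REQUIRED_FIELDS = ("AccountName", "AccountPassword")


def parse_secret_value(secret_data: str) -> dict:
    """Parse the secret value and fail early if a required field is missing."""
    secret = json.loads(secret_data)
    missing = [field for field in REQUIRED_FIELDS if field not in secret]
    if missing:
        raise KeyError("missing from secret JSON: " + ", ".join(missing))
    return secret


example = '{"AccountName": "appuser", "AccountPassword": "example-password"}'
print(parse_secret_value(example)["AccountName"])  # appuser
```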

Step 3: Create the rotation function

  1. In the Function Compute console, create a service. Click Show Advanced Options, set Access to VPC to Yes, and configure VPC, vSwitch, and Security Group. The security group must allow the function to reach both the RDS API and the KMS VPC endpoint (kms-vpc.<region-id>.aliyuncs.com).

  2. Create a function with Runtime Environments set to Python 3.9. Use the following code:

# -*- coding: utf-8 -*-
import json
import logging
import os

from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest

from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    instance_id = evt['InstanceId']
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)

    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)

    if step == "new":
        new_phase(client, secret_name, version_id)

    elif step == "set":
        set_phase(client, instance_id, secret_name, version_id)

    elif step == "end":
        end_phase(client, secret_name, version_id)

    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise  # unexpected error: surface the original exception
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])

        exclude_characters = os.environ[
            'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, instance_id, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
    pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]

    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))

    current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
    current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
    if len(current_accounts) == 0:
        logger.error("set: Current account for secret %s was not found on the RDS instance" % secret_name)
        raise ValueError("Current account for secret %s was not found on the RDS instance" % secret_name)
    if len(pending_accounts) == 0:
        create_rds_account(client, instance_id, pending_dict["AccountName"],
                           pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
        pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
            "DBInstanceAccount"]
    else:
        # reset password
        reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])

    current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    if len(current_privileges) > 0:
        for current_privilege in current_privileges:
            is_contains = False
            for pending_privilege in pending_privileges:
                if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                    "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                    is_contains = True
                    break  # match found; stop scanning the pending privileges
            if not is_contains:
                grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                        current_privilege["AccountPrivilege"])


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with _rt appended would exceed 16 characters")
        return new_account_name


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
    request = CreateAccountRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    request.set_AccountType(account_type)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
    request = GrantAccountPrivilegeRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_DBName(db_name)
    request.set_AccountPrivilege(account_privilege)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_accounts(client, db_instance_id, account_name):
    request = DescribeAccountsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def reset_account_password(client, db_instance_id, account_name, account_password):
    request = ResetAccountPasswordRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    response = client.do_action_with_exception(request)
    return json.loads(response)

Step 4: Create the CloudFlow rotation flow

  1. Log on to the CloudFlow console and select the same region as your Function Compute functions.

  2. On the Flows page, click Create flow.

  3. On the Create Flow page, click Create Flow with Code, set the Definition YAML to the following, and click Next Step:

version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId

Replace <function-arn> with the Alibaba Cloud Resource Name (ARN) of the function you created.

  4. Configure a flow role, then click Create Flow.

  5. To schedule automatic rotation, create a time-based schedule. See Create a time-based schedule. Set Payload to:

{
    "SecretName": "<secret-name>",
    "RegionId": "<region-id>",
    "InstanceId": "<rds-instance-id>"
}

| Field | Description |
| --- | --- |
| SecretName | Name of the generic secret in KMS |
| RegionId | Region where the secret and function are deployed |
| InstanceId | ID of the ApsaraDB RDS instance |
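
The flow definition maps $local.VersionId into the set and end tasks, so all three invocations operate on the same secret version. The sketch below imitates only that hand-off: fake_handler reproduces the VersionId logic of the rotation function, and the loop stands in for CloudFlow passing each task's output to the next one. The request IDs are made-up values, and the way CloudFlow populates $local is assumed from the flow definition rather than shown here.

```python
import json


def fake_handler(event: str, request_id: str) -> dict:
    """Mimic only the VersionId handling of the rotation function."""
    evt = json.loads(event)
    # The first task sends no VersionId, so the request ID becomes the version ID.
    version_id = evt.get("VersionId") or request_id
    return {"VersionId": version_id}


payload = {
    "SecretName": "<secret-name>",
    "RegionId": "<region-id>",
    "InstanceId": "<rds-instance-id>",
}

local = {}  # stands in for the output of the previous task
for n, step in enumerate(["new", "set", "end"]):
    event = dict(payload, Step=step, **local)
    local = fake_handler(json.dumps(event), request_id="request-%d" % n)
    print(step, local["VersionId"])
```

Each line printed by the loop shows request-0: the version ID chosen in the new step is reused by set and end, even though every invocation has its own request ID.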

Step 5: Verify the rotation

After setting up the flow, manually trigger an execution to confirm everything works:

  1. In the CloudFlow console, open your flow and click Start Execution with the same payload JSON from Step 4.

  2. Wait for the execution to complete. All three steps (RotateSecretNew, RotateSecretSet, RotateSecretEnd) should show a Succeeded status.

  3. In the KMS console, open the secret and check its version history. The ACSCurrent label should now point to a new version.

Step 6: (Optional) Connect applications to Secrets Manager

Use Secrets Manager JDBC to retrieve the rotated credentials in your application without code changes. See Secrets Manager JDBC.

Scenario 2: Rotate a self-managed MySQL generic secret

Use this approach when your database is a self-managed MySQL instance. This scenario uses a privileged account to manage credential rotation and adds a test step to verify connectivity before completing the rotation.

Prerequisites

Before you begin, make sure you have:

  • An active Function Compute account. See Activate Function Compute.

  • A privileged MySQL account and a generic secret for it in the KMS console.

  • A second generic secret for the regular account to be rotated.

  • A Function Compute service role with the permissions listed in Step 1.

Step 1: Set up permissions

Create a service role for Function Compute and attach the following policies:

  1. AliyunFCDefaultRolePolicy — system policy

  2. AliyunSTSAssumeRoleAccess — system policy

  3. A custom policy granting KMS access:

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kms:GetSecretValue",
                "kms:GetRandomPassword",
                "kms:Decrypt",
                "kms:GenerateDataKey",
                "kms:PutSecretValue",
                "kms:UpdateSecretVersionStage"
            ],
            "Resource": "*"
        }
    ]
}

Step 2: Create the generic secrets

Create two generic secrets in the KMS console.

Privileged account secret — used to create and manage regular accounts:

{
    "Endpoint": "<mysql-host>",
    "AccountName": "<privileged-account-name>",
    "AccountPassword": "<privileged-account-password>",
    "SSL": false
}

| Field | Description | Default |
| --- | --- | --- |
| Endpoint | Domain name or IP address of the MySQL database | |
| AccountName | Username of the privileged account | |
| AccountPassword | Password of the privileged account | |
| SSL | Whether to use an SSL certificate for the connection | false |

Regular account secret — the secret that will be rotated. Its name is passed as the scheduling parameter to CloudFlow:

{
    "Endpoint": "<mysql-host>",
    "AccountName": "<account-name>",
    "AccountPassword": "<account-password>",
    "MasterSecret": "<privileged-secret-name>",
    "SSL": false
}

| Field | Description | Default |
| --- | --- | --- |
| Endpoint | Domain name or IP address of the MySQL database | |
| AccountName | Username of the account to rotate | |
| AccountPassword | Password of the account to rotate | |
| MasterSecret | Name of the privileged account secret in KMS | |
| SSL | Whether to use an SSL certificate for the connection | false |

When SSL is true, the function expects the SSL certificate at /opt/python/certs/cert.pem. Add the certificate to a custom layer (see Step 3).
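
How the SSL field is interpreted can be sketched as follows. This condenses the get_ssl_config helper from the function code in Step 3 into one function named ssl_config (an illustrative name). Note that in that code a missing or unrecognized SSL value is not treated as false: the function tries an SSL connection first and falls back to a non-SSL connection, so set the field explicitly if you need a fixed behavior.

```python
def ssl_config(secret: dict) -> tuple:
    """Return (use_ssl, fall_back_to_plain) for a secret value."""
    value = secret.get("SSL")
    if isinstance(value, bool):
        return value, False
    if isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower() == "true", False
    # Field missing or unrecognized: try SSL first, then retry without it.
    return True, True


print(ssl_config({"SSL": False}))   # (False, False)
print(ssl_config({"SSL": "true"}))  # (True, False)
print(ssl_config({}))               # (True, True)
```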

Step 3: Create the rotation function

  1. Create a Function Compute service with VPC access enabled (same configuration as Scenario 1). Make sure the function can reach the MySQL instance and the KMS VPC endpoint.

  2. Create a function with Runtime Environments set to Python 3.9. Use the following code:

# -*- coding: utf-8 -*-
import json
import logging
import os

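try:
    import pymysql
except ImportError:
    # pymysql is not available yet: install it into the code directory, then retry the import.
    os.system('pip install pymysql -t ./')
    import pymysql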
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)

    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)

    if step == "new":
        new_phase(client, secret_name, version_id)

    elif step == "set":
        set_phase(client, secret_name, version_id)

    elif step == "test":
        test_phase(client, secret_name, version_id)

    elif step == "end":
        end_phase(client, secret_name, version_id)

    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])

        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))


def set_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)

    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "set: ACSPending secret is already set as password in MySQL DB for secret %s." % secret_name)
        return

    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))

    if current_dict['Endpoint'] != pending_dict['Endpoint']:
        logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
        raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))

    conn = get_connection(current_dict)
    if not conn:
        logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
    conn.close()

    master_secret = current_dict['MasterSecret']
    master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
    if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(client, current_dict, master_dict):
        logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
        raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))

    conn = get_connection(master_dict)
    if not conn:
        logger.error(
            "set: Unable to access the given database using credentials in master secret %s" % master_secret)
        raise ValueError("Unable to access the given database using credentials in master secret %s" % master_secret)

    try:
        with conn.cursor() as cur:
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s",
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))

            cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
            for row in cur.fetchall():
                if 'XA_RECOVER_ADMIN' in row[0]:
                    continue
                grant = row[0].split(' TO ')
                new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]

            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                        current_dict['AccountName'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
            elif ssl_type == "ANY":
                cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
            elif ssl_type == "X509":
                cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
            else:
                cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                            (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))

            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option,
                        (pending_dict['AccountName'], pending_dict['AccountPassword']))
            conn.commit()
            logger.info("set: Successfully changed password for %s in MySQL DB for secret %s." % (
                pending_dict['AccountName'], secret_name))
    finally:
        conn.close()


def test_phase(client, secret_name, version_id):
    conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
    if conn:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()

        logger.info("test: Successfully accessed MySQL DB with ACSPending secret in %s." % secret_name)
        return
    else:
        logger.error(
            "test: Unable to access the given database with the pending version of secret %s" % secret_name)
        raise ValueError("Unable to access the given database with the pending version of secret %s" % secret_name)


def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully updated ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_connection(secret_dict):
    port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
    dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None

    use_ssl, fall_back = get_ssl_config(secret_dict)

    conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
    if conn or not fall_back:
        return conn
    else:
        return connect_and_authenticate(secret_dict, port, dbname, False)


def get_ssl_config(secret_dict):
    if 'SSL' not in secret_dict:
        return True, True

    if isinstance(secret_dict['SSL'], bool):
        return secret_dict['SSL'], False

    if isinstance(secret_dict['SSL'], str):
        ssl = secret_dict['SSL'].lower()
        if ssl == "true":
            return True, False
        elif ssl == "false":
            return False, False
        else:
            return True, True

    return True, True


def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
    ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None

    try:
        conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                               password=secret_dict['AccountPassword'],
                               port=port, database=dbname, connect_timeout=5, ssl=ssl)
        logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
            "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
        return conn
    except pymysql.OperationalError as e:
        # e.args is usually (errno, message); guard against a missing message.
        if len(e.args) > 1 and 'certificate verify failed: IP address mismatch' in str(e.args[1]):
            logger.error(
                "Hostname verification failed when establishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                    'Endpoint'])
        return None


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user, account_name length with _rt appended would exceed 16 characters")
        return new_account_name


def get_password_option(version):
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"


def get_escaped_encryption_statement(version):
    if version.startswith("5.6"):
        return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
    else:
        return "ALTER USER %s@'%%' REQUIRE"


def is_rds_replica_database(client, replica_dict, master_dict):
    replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
    master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
    try:
        describe_response = describe_db_instances(client, replica_instance_id)
    except Exception as err:
        logger.warning("Encountered error while verifying rds replica status: %s" % err)
        return False
    items = describe_response['Items']
    instances = items.get("DBInstance")
    if not instances:
        logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
        return False

    current_instance = instances[0]
    return master_instance_id == current_instance.get('DBInstanceId')


def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_db_instances(client, db_instance_id):
    request = DescribeDBInstancesRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    response = client.do_action_with_exception(request)
    return json.loads(response)
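
The helpers in the listing that make no network calls can be exercised locally before deploying. The following minimal sketch copies `get_alt_account_name` and `get_ssl_config` from the function code (lightly condensed, intended to behave the same) and shows what they return for a few inputs. The account names are illustrative only.

```python
def get_alt_account_name(current_account_name):
    """Toggle the _rt suffix to find the other account of the pair."""
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:-len(rotation_suffix)]
    new_account_name = current_account_name + rotation_suffix
    if len(new_account_name) > 16:
        raise ValueError("account name with _rt appended would exceed 16 characters")
    return new_account_name


def get_ssl_config(secret_dict):
    """Return (use_ssl, fall_back) for the optional SSL field of the secret."""
    value = secret_dict.get('SSL')
    if isinstance(value, bool):
        return value, False
    if isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower() == "true", False
    # Missing or unrecognized value: try SSL first, allow a non-SSL retry.
    return True, True


if __name__ == '__main__':
    print(get_alt_account_name("appuser"))     # appuser_rt
    print(get_alt_account_name("appuser_rt"))  # appuser
    print(get_ssl_config({}))                  # (True, True)
    print(get_ssl_config({"SSL": "false"}))    # (False, False)
```

Only an explicit `SSL` value of true or false disables the non-SSL fallback. Any other value, or no value at all, makes `get_connection` try SSL first and retry without it.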
  1. Create a custom layer to package the PyMySQL dependency (and optionally the SSL certificate):

    1. Run the following commands to build the layer package:

      mkdir my-secret-rotate
      cd my-secret-rotate
      pip install --target ./python pymysql
      # If SSL is required, create the certs directory and add your certificate:
      # mkdir python/certs && cp cert.pem python/certs/
      zip -r my-secret-rotate.zip python
    2. In the Function Compute console, create a custom layer using the ZIP package. See Create a custom layer.

  2. Attach the custom layer to your function. On the Services page, click Functions for your service, find the function, click Configure, and add the layer in the Layers section. See Manage layers.
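
After attaching the layer, a quick check run inside the function can confirm that the dependency and the certificate are visible at runtime. This is a minimal sketch: `check_layer` is a hypothetical helper, not part of the rotation code, and the default certificate path is the one that `connect_and_authenticate` reads.

```python
import importlib.util
import os


def check_layer(cert_path='/opt/python/certs/cert.pem'):
    """Report whether PyMySQL is importable and the CA certificate file exists."""
    return {
        'pymysql_available': importlib.util.find_spec('pymysql') is not None,
        'cert_present': os.path.isfile(cert_path),
    }


if __name__ == '__main__':
    print(check_layer())
```

If `cert_present` is false and the secret does not set `SSL` explicitly, the SSL attempt is likely to fail and the function falls back to a non-SSL connection.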

Step 4: Create the CloudFlow rotation flow

  1. Log on to the CloudFlow console and select the same region as your functions.

  2. On the Flows page, click Create flow.

  3. On the Create Flow page, click Create Flow with Code, set the Definition YAML to the following, and click Next Step:

version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretTest
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: test
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId

Replace <function-arn> with the ARN of the function you created.

  4. Configure a flow role, then click Create Flow.

  5. To schedule automatic rotation, create a time-based schedule and set Payload to:

{
    "SecretName": "<secret-name>",
    "RegionId": "<region-id>"
}

| Field | Description |
| --- | --- |
| SecretName | Name of the regular account generic secret in KMS |
| RegionId | Region where the secret and function are deployed |
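
The flow definition calls the same function four times and threads the `VersionId` produced by the `new` step into the later steps. The following sketch mimics that sequencing locally. `fake_handler` is a hypothetical stand-in for the deployed function, and the assumption that the function returns an object containing `VersionId` is inferred from the `$local.VersionId` mappings rather than confirmed here.

```python
import uuid


def fake_handler(event):
    """Hypothetical stand-in for the rotation function; it only echoes the step."""
    version_id = event.get('VersionId') or uuid.uuid4().hex
    return {'Step': event['Step'], 'VersionId': version_id}


def run_rotation(handler, payload):
    """Invoke handler once per step, passing VersionId from 'new' to later steps."""
    results = []
    version_id = None
    for step in ('new', 'set', 'test', 'end'):
        event = {
            'SecretName': payload['SecretName'],
            'RegionId': payload['RegionId'],
            'Step': step,
        }
        if version_id is not None:
            event['VersionId'] = version_id
        output = handler(event)
        if step == 'new':
            version_id = output['VersionId']
        results.append(output)
    return results


if __name__ == '__main__':
    # Placeholder values; substitute your own secret name and region.
    payload = {'SecretName': 'my-secret', 'RegionId': 'cn-hangzhou'}
    for result in run_rotation(fake_handler, payload):
        print(result)
```

Replacing `fake_handler` with a wrapper around the real function entry point gives a way to step through a rotation outside CloudFlow, provided the same credentials and network access are available locally.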

Step 5: Verify the rotation

Manually trigger an execution to confirm the setup:

  1. In the CloudFlow console, open your flow and click Start Execution with the payload JSON from Step 4.

  2. Wait for the execution to complete. All four steps (RotateSecretNew, RotateSecretSet, RotateSecretTest, RotateSecretEnd) should show a Succeeded status.

  3. In the KMS console, open the secret and check its version history. The ACSCurrent label should point to a new version.
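
The console check can also be scripted. The sketch below compares two `GetSecretValue` responses, one captured before the execution and one after, for example by calling the `get_secret_value` helper from the function code with `stage='ACSCurrent'`. It assumes each response is a dict with a `VersionId` field; `rotation_happened` is a hypothetical helper name.

```python
def rotation_happened(before, after):
    """Return True if ACSCurrent moved to a different version between two reads."""
    old_id = before.get('VersionId')
    new_id = after.get('VersionId')
    if not old_id or not new_id:
        raise ValueError("both responses must contain a VersionId")
    return old_id != new_id
```

A result of `False` after a flow execution that reported success suggests the `end` step did not move the `ACSCurrent` label and is worth checking in the function logs.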

Step 6: (Optional) Connect applications to Secrets Manager

Use Secrets Manager JDBC to retrieve rotated credentials automatically. See Secrets Manager JDBC.

What's next