Automate the rotation of generic secrets stored in KMS Secrets Manager using Function Compute and CloudFlow. This guide covers two database rotation scenarios: ApsaraDB RDS instances accessed through the RDS API, and self-managed MySQL databases accessed through a privileged account.
KMS Secrets Manager provides dynamic secrets with built-in automatic rotation for ApsaraDB RDS, RAM, and ECS. If dynamic secrets cover your use case, use them instead — they cost less and require no custom code. See Overview of dynamic ApsaraDB RDS secrets, Overview of dynamic RAM secrets, or Overview of dynamic ECS secrets.
Billing
KMS secret rotation is free. Charges apply for Function Compute and CloudFlow. See Billing of Function Compute and Billing of CloudFlow.
Choose a rotation strategy
Both scenarios in this guide use dual-account (alternating user) rotation. This strategy maintains two database accounts — only one is active at a time — so your application always has valid credentials during rotation. The inactive account gets the new credentials, then becomes the active one.
| Strategy | How it works | Availability during rotation | Best for |
|---|---|---|---|
| Single-account | Updates credentials for one account in place | Brief gap while credentials update | Simple setups, ad hoc users |
| Dual-account | Alternates between two accounts; one is always active | No interruption | Production databases, high-availability requirements |
The rotation suffix _rt identifies the alternate account. For example, if the active account is appuser, the alternate account is appuser_rt. Account names including the suffix cannot exceed 16 characters.
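The naming rule can be sketched in a few lines of Python. This is an illustrative helper that mirrors the rule described above; the function and constant names are chosen for this example and are not part of any SDK.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # limit stated in this guide


def alternate_account_name(current_name: str) -> str:
    """Return the other account of the pair: strip the suffix if present, else append it."""
    if current_name.endswith(ROTATION_SUFFIX):
        return current_name[: -len(ROTATION_SUFFIX)]
    candidate = current_name + ROTATION_SUFFIX
    if len(candidate) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError(
            "%r is longer than %d characters" % (candidate, MAX_ACCOUNT_NAME_LENGTH)
        )
    return candidate


print(alternate_account_name("appuser"))     # appuser_rt
print(alternate_account_name("appuser_rt"))  # appuser
```

Because the suffix adds three characters, a base account name of up to 13 characters fits within the limit.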
How it works
Both scenarios follow the same CloudFlow-orchestrated process. A CloudFlow time-based schedule triggers the flow, which calls the same Function Compute function multiple times with different Step values:
| Step | What the function does | Idempotency |
|---|---|---|
| new | Generates a new password, writes it to the pending secret version (ACSPending) | If ACSPending already exists for the current version ID, skips secret creation |
| set | Creates or updates the alternate database account with the pending credentials | Safe to retry: verifies account state before making changes |
| test | Verifies the pending credentials can connect to the database (MySQL scenario only) | Read-only connectivity check |
| end | Promotes ACSPending to ACSCurrent, completing the rotation | Stage update is atomic |
Each step is idempotent, so the flow is safe to retry after partial failures.
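To see how the stage labels move and why a retried new step is harmless, the following sketch simulates the flow against an in-memory stand-in for a secret. FakeSecretStore and run_step are hypothetical names used only for illustration. No KMS or database calls are made, the set and test steps are no-ops, and the account-name switch is omitted.

```python
import json
import uuid


class FakeSecretStore:
    """In-memory stand-in for one secret: version ID -> value, stage label -> version ID."""

    def __init__(self, initial_value):
        self.versions = {"v0": json.dumps(initial_value)}
        self.stages = {"ACSCurrent": "v0"}


def run_step(store, step, version_id=None):
    """Mimic one function invocation; the first call mints the version ID."""
    version_id = version_id or str(uuid.uuid4())
    if step == "new":
        # Idempotent: a retry with the same version ID does not create a second version.
        if version_id not in store.versions:
            value = json.loads(store.versions[store.stages["ACSCurrent"]])
            value["AccountPassword"] = "generated-" + version_id[:8]
            store.versions[version_id] = json.dumps(value)
            store.stages["ACSPending"] = version_id
    elif step in ("set", "test"):
        pass  # database work is out of scope for this simulation
    elif step == "end":
        # Promote the pending version and drop the pending label.
        store.stages["ACSCurrent"] = version_id
        if store.stages.get("ACSPending") == version_id:
            del store.stages["ACSPending"]
    else:
        raise ValueError("Invalid step %r" % step)
    return {"VersionId": version_id}


store = FakeSecretStore({"AccountName": "appuser", "AccountPassword": "old"})
version_id = run_step(store, "new")["VersionId"]
run_step(store, "new", version_id)  # simulated retry after a partial failure
for step in ("set", "test", "end"):
    run_step(store, step, version_id)
print(store.stages)
```

After the run, the store holds exactly two versions and only the ACSCurrent label remains, pointing at the version created by the first step.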
Scenario 1: Rotate an ApsaraDB RDS generic secret
Use this approach when your database runs on ApsaraDB RDS and you want to rotate credentials using the RDS API.
Prerequisites
Before you begin, make sure you have:
An active Function Compute account. See Activate Function Compute.
An ApsaraDB RDS database account and a generic secret for it in the KMS console.
A Function Compute service role with the permissions listed in Step 1.
Step 1: Set up permissions
Create a service role for Function Compute and attach the following policies:
AliyunFCDefaultRolePolicy — system policy
AliyunSTSAssumeRoleAccess — system policy
A custom policy granting KMS and RDS access:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "rds:GrantAccountPrivilege",
        "rds:DescribeAccounts",
        "rds:ResetAccountPassword",
        "rds:CreateAccount"
      ],
      "Resource": "*"
    }
  ]
}
See Grant Function Compute permissions to access other Alibaba Cloud services for instructions on attaching policies.
Step 2: Create the generic secret
Create a generic secret in the KMS console with the following JSON as the secret value. The secret name is later passed as a scheduling parameter to CloudFlow.
{
  "AccountName": "<rds-account-name>",
  "AccountPassword": "<rds-account-password>"
}
| Field | Description |
|---|---|
| AccountName | Username of the RDS database account |
| AccountPassword | Password of the RDS database account |
Step 3: Create the rotation function
In the Function Compute console, create a service. Click Show Advanced Options, set Access to VPC to Yes, and configure VPC, vSwitch, and Security Group. The security group must allow the function to reach both the RDS API and the KMS VPC endpoint (kms-vpc.<region-id>.aliyuncs.com).
Create a function with Runtime Environments set to Python 3.9. Use the following code:
# -*- coding: utf-8 -*-
import json
import logging
import os
from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    instance_id = evt['InstanceId']
    # The first step has no VersionId; the request ID becomes the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, instance_id, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}

def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        # If the pending version already exists, this is a retry and nothing needs to be created.
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise ValueError("Cannot retrieve secret %s" % secret_name) from e
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ.get('EXCLUDE_CHARACTERS', "\\\"\',./:;<>?[]{|}~`")
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))

def set_phase(client, instance_id, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
    pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
    current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
    if len(current_accounts) == 0:
        logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
    if len(pending_accounts) == 0:
        # The alternate account does not exist yet: create it with the pending password.
        create_rds_account(client, instance_id, pending_dict["AccountName"],
                           pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
        pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
            "DBInstanceAccount"]
    else:
        # The alternate account exists: reset its password to the pending value.
        reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    # Copy any privileges the current account has that the alternate account lacks.
    current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    if len(current_privileges) > 0:
        for current_privilege in current_privileges:
            is_contains = False
            for pending_privilege in pending_privileges:
                if (current_privilege["DBName"] == pending_privilege["DBName"]
                        and current_privilege["AccountPrivilege"] == pending_privilege["AccountPrivilege"]):
                    is_contains = True
                    break
            if not is_contains:
                grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                        current_privilege["AccountPrivilege"])

def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))

def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict

def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with the _rt suffix appended would exceed 16 characters")
        return new_account_name

def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
    request = CreateAccountRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    request.set_AccountType(account_type)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
    request = GrantAccountPrivilegeRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_DBName(db_name)
    request.set_AccountPrivilege(account_privilege)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def describe_accounts(client, db_instance_id, account_name):
    request = DescribeAccountsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def reset_account_password(client, db_instance_id, account_name, account_password):
    request = ResetAccountPasswordRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    response = client.do_action_with_exception(request)
    return json.loads(response)
Step 4: Create the CloudFlow rotation flow
Log on to the CloudFlow console and select the same region as your Function Compute functions.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, set the Definition YAML to the following, and click Next Step:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Replace <function-arn> with the Alibaba Cloud Resource Name (ARN) of the function you created.
Configure a flow role, then click Create Flow.
To schedule automatic rotation, create a time-based schedule. See Create a time-based schedule. Set Payload to:
{
  "SecretName": "<secret-name>",
  "RegionId": "<region-id>",
  "InstanceId": "<rds-instance-id>"
}
| Field | Description |
|---|---|
| SecretName | Name of the generic secret in KMS |
| RegionId | Region where the secret and function are deployed |
| InstanceId | ID of the ApsaraDB RDS instance |
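The input mappings in the flow definition combine this payload with a fixed Step value and, from the second step onward, the VersionId returned by the first step. The sketch below illustrates the event each function invocation would receive under that reading. build_step_event and the sample values are hypothetical and exist only for this example.

```python
import json


def build_step_event(payload, step, version_id=None):
    """Assemble the JSON event for one step from the schedule payload."""
    event = dict(payload)
    event["Step"] = step
    # The first step omits VersionId; the function then derives one from the request ID.
    if version_id is not None:
        event["VersionId"] = version_id
    return json.dumps(event)


# Placeholder values for illustration only.
payload = {"SecretName": "example-secret", "RegionId": "cn-hangzhou", "InstanceId": "rm-example"}

first = json.loads(build_step_event(payload, "new"))
later = json.loads(build_step_event(payload, "set", version_id="example-version-id"))
print(sorted(first))  # ['InstanceId', 'RegionId', 'SecretName', 'Step']
print(sorted(later))  # ['InstanceId', 'RegionId', 'SecretName', 'Step', 'VersionId']
```

Passing the same VersionId to every later step is what lets the set and end steps act on the version that the new step created.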
Step 5: Verify the rotation
After setting up the flow, manually trigger an execution to confirm everything works:
In the CloudFlow console, open your flow and click Start Execution with the same payload JSON from Step 4.
Wait for the execution to complete. All three steps (RotateSecretNew, RotateSecretSet, RotateSecretEnd) should show a Succeeded status.
In the KMS console, open the secret and check its version history. The ACSCurrent label should now point to a new version.
Step 6: (Optional) Connect applications to Secrets Manager
Use Secrets Manager JDBC to retrieve the rotated credentials in your application without code changes. See Secrets Manager JDBC.
Scenario 2: Rotate a self-managed MySQL generic secret
Use this approach when your database is a self-managed MySQL instance. This scenario uses a privileged account to manage credential rotation and adds a test step to verify connectivity before completing the rotation.
Prerequisites
Before you begin, make sure you have:
An active Function Compute account. See Activate Function Compute.
A privileged MySQL account and a generic secret for it in the KMS console.
A second generic secret for the regular account to be rotated.
A Function Compute service role with the permissions listed in Step 1.
Step 1: Set up permissions
Create a service role for Function Compute and attach the following policies:
AliyunFCDefaultRolePolicy — system policy
AliyunSTSAssumeRoleAccess — system policy
A custom policy granting KMS access:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:Decrypt",
        "kms:GenerateDataKey",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    }
  ]
}
Step 2: Create the generic secrets
Create two generic secrets in the KMS console.
Privileged account secret — used to create and manage regular accounts:
{
  "Endpoint": "<mysql-host>",
  "AccountName": "<privileged-account-name>",
  "AccountPassword": "<privileged-account-password>",
  "SSL": false
}
| Field | Description | Default |
|---|---|---|
| Endpoint | Domain name or IP address of the MySQL database | (none) |
| AccountName | Username of the privileged account | (none) |
| AccountPassword | Password of the privileged account | (none) |
| SSL | Whether to use an SSL certificate for the connection | false |
Regular account secret — the secret that will be rotated. Its name is passed as the scheduling parameter to CloudFlow:
{
  "Endpoint": "<mysql-host>",
  "AccountName": "<account-name>",
  "AccountPassword": "<account-password>",
  "MasterSecret": "<privileged-secret-name>",
  "SSL": false
}
| Field | Description | Default |
|---|---|---|
| Endpoint | Domain name or IP address of the MySQL database | (none) |
| AccountName | Username of the account to rotate | (none) |
| AccountPassword | Password of the account to rotate | (none) |
| MasterSecret | Name of the privileged account secret in KMS | (none) |
| SSL | Whether to use an SSL certificate for the connection | false |
When SSL is true, the function expects the SSL certificate at /opt/python/certs/cert.pem. Add the certificate to a custom layer (see Step 3).
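The function code in Step 3 reads the SSL field leniently: a boolean or the strings "true" and "false" are taken literally, while a missing or unrecognized value makes the function try SSL first and fall back to an unencrypted connection. The sketch below restates that logic in compact form. interpret_ssl_field is a hypothetical name for this example, not a function in the rotation code.

```python
def interpret_ssl_field(secret):
    """Return (use_ssl, allow_plaintext_fallback) for a secret dictionary."""
    value = secret.get("SSL")
    if isinstance(value, bool):
        return value, False
    if isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower() == "true", False
    # Missing or unrecognized: attempt SSL, then retry without it if that fails.
    return True, True


print(interpret_ssl_field({"SSL": False}))   # (False, False)
print(interpret_ssl_field({"SSL": "TRUE"}))  # (True, False)
print(interpret_ssl_field({}))               # (True, True)
```

Setting SSL explicitly in both secrets avoids the fallback path and makes the connection mode predictable.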
Step 3: Create the rotation function
Create a Function Compute service with VPC access enabled (same configuration as Scenario 1). Make sure the function can reach the MySQL instance and the KMS VPC endpoint.
Create a function with Runtime Environments set to Python 3.9. Use the following code:
# -*- coding: utf-8 -*-
import json
import logging
import os
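try:
    import pymysql
except ImportError:
    # Fallback for when the PyMySQL layer is not attached: install into the working directory.
    os.system('pip install pymysql -t ./')
    import pymysql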
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    # The first step has no VersionId; the request ID becomes the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, secret_name, version_id)
    elif step == "test":
        test_phase(client, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}

def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        # If the pending version already exists, this is a retry and nothing needs to be created.
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))

def set_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    # If the pending credentials already work, a previous attempt completed this step.
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
        return
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    if current_dict['Endpoint'] != pending_dict['Endpoint']:
        logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
        raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
    conn = get_connection(current_dict)
    if not conn:
        logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
    conn.close()
    master_secret = current_dict['MasterSecret']
    master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
    if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(
            client, current_dict, master_dict):
        logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
        raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
    conn = get_connection(master_dict)
    if not conn:
        logger.error(
            "set: Unable to access the given database using credentials in master secret secret %s" % master_secret)
        raise ValueError("Unable to access the given database using credentials in master secret secret %s" % master_secret)
    try:
        with conn.cursor() as cur:
            # Create the alternate user if it does not exist yet.
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s",
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
            # Copy the current user's grants to the alternate user.
            cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
            for row in cur.fetchall():
                if 'XA_RECOVER_ADMIN' in row[0]:
                    continue
                grant = row[0].split(' TO ')
                new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]
            # Copy the current user's TLS requirements to the alternate user.
            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                        current_dict['AccountName'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
            elif ssl_type == "ANY":
                cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
            elif ssl_type == "X509":
                cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
            else:
                cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                            (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option,
                        (pending_dict['AccountName'], pending_dict['AccountPassword']))
            conn.commit()
            logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                pending_dict['AccountName'], secret_name))
    finally:
        conn.close()

def test_phase(client, secret_name, version_id):
    conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
    if conn:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()
        logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name)
        return
    else:
        logger.error(
            "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
        raise ValueError("Unable to access the given database with pending secret of secret secret_name %s" % secret_name)

def end_phase(client, secret_name, version_id):
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))

def get_connection(secret_dict):
    port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
    dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    use_ssl, fall_back = get_ssl_config(secret_dict)
    conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
    if conn or not fall_back:
        return conn
    else:
        return connect_and_authenticate(secret_dict, port, dbname, False)

def get_ssl_config(secret_dict):
    # Returns (use_ssl, fall_back): whether to try SSL and whether to retry without it.
    if 'SSL' not in secret_dict:
        return True, True
    if isinstance(secret_dict['SSL'], bool):
        return secret_dict['SSL'], False
    if isinstance(secret_dict['SSL'], str):
        ssl = secret_dict['SSL'].lower()
        if ssl == "true":
            return True, False
        elif ssl == "false":
            return False, False
        else:
            return True, True
    return True, True

def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
    ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    try:
        conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                               password=secret_dict['AccountPassword'],
                               port=port, database=dbname, connect_timeout=5, ssl=ssl)
        logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
            "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
        return conn
    except pymysql.OperationalError as e:
        if 'certificate verify failed: IP address mismatch' in e.args[1]:
            logger.error(
                "Hostname verification failed when establishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                    'Endpoint'])
        return None

def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict

def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with the _rt suffix appended would exceed 16 characters")
        return new_account_name

def get_password_option(version):
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"

def get_escaped_encryption_statement(version):
    if version.startswith("5.6"):
        return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
    else:
        return "ALTER USER %s@'%%' REQUIRE"

def is_rds_replica_database(client, replica_dict, master_dict):
    replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
    master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
    try:
        describe_response = describe_db_instances(client, replica_instance_id)
    except Exception as err:
        logger.warning("Encountered error while verifying rds replica status: %s" % err)
        return False
    items = describe_response['Items']
    instances = items.get("DBInstance")
    if not instances:
        logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
        return False
    current_instance = instances[0]
    return master_instance_id == current_instance.get('DBInstanceId')

def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)

def describe_db_instances(client, db_instance_id):
    request = DescribeDBInstancesRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    response = client.do_action_with_exception(request)
    return json.loads(response)
Create a custom layer to package the PyMySQL dependency (and optionally the SSL certificate):
Run the following commands to build the layer package:
mkdir my-secret-rotate
cd my-secret-rotate
pip install --target ./python pymysql
# If SSL is required, create the certs directory and add your certificate:
# mkdir python/certs && cp cert.pem python/certs/
zip -r my-secret-rotate.zip python
In the Function Compute console, create a custom layer using the ZIP package. See Create a custom layer.
Attach the custom layer to your function. On the Services page, click Functions for your service, find the function, click Configure, and add the layer in the Layers section. See Manage layers.
Step 4: Create the CloudFlow rotation flow
Log on to the CloudFlow console and select the same region as your functions.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, set the Definition YAML to the following, and click Next Step:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretTest
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: test
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Replace <function-arn> with the ARN of the function you created.
Configure a flow role, then click Create Flow.
To schedule automatic rotation, create a time-based schedule and set Payload to:
{
  "SecretName": "<secret-name>",
  "RegionId": "<region-id>"
}
| Field | Description |
|---|---|
| SecretName | Name of the regular account generic secret in KMS |
| RegionId | Region where the secret and function are deployed |
Step 5: Verify the rotation
Manually trigger an execution to confirm the setup:
In the CloudFlow console, open your flow and click Start Execution with the payload JSON from Step 4.
Wait for the execution to complete. All four steps (RotateSecretNew, RotateSecretSet, RotateSecretTest, RotateSecretEnd) should show a Succeeded status.
In the KMS console, open the secret and check its version history. The ACSCurrent label should point to a new version.
Step 6: (Optional) Connect applications to Secrets Manager
Use Secrets Manager JDBC to retrieve rotated credentials automatically. See Secrets Manager JDBC.
What's next
Secrets Manager JDBC — integrate rotated credentials with JDBC-based applications
Overview of dynamic ApsaraDB RDS secrets — consider dynamic secrets if your database is on ApsaraDB RDS
What is Function Compute? — learn more about the serverless compute service used in this guide