Key Management Service: Use Function Compute to rotate generic secrets

Last Updated: Feb 23, 2024

You can use Function Compute to rotate generic secrets to improve the security of sensitive information. This topic describes how to use Function Compute to rotate generic secrets.

Background information

Function Compute is a fully managed, event-driven computing service. You only need to write and upload your code; you do not need to obtain or manage infrastructure resources such as servers. Function Compute allocates computing resources, runs tasks in an elastic and reliable manner, and provides features such as log query, performance monitoring, and alerting. For more information, see What is Function Compute?.

Billing

If you use Function Compute to rotate generic secrets, Key Management Service (KMS) does not charge you for secret rotation. You are charged for Function Compute and for Serverless Workflow, which is integrated with Function Compute. For more information about billing, see Billing of Function Compute and Billing of Serverless Workflow.

Important

Secrets Manager provides dynamic secrets that are automatically rotated. If dynamic secrets can meet your requirements for security, we recommend that you use dynamic secrets to reduce costs. For more information, see Overview of Dynamic ApsaraDB RDS secrets, Overview of Dynamic RAM secrets, or Overview of Dynamic ECS secrets.

Secret rotation process

This topic describes how to rotate a generic secret in the following scenarios:

  • Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using ApsaraDB RDS API

  • Rotate generic secrets of a self-managed MySQL database in dual-account mode by using a privileged account

Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using ApsaraDB RDS API

You can use Serverless Workflow and Function Compute to rotate a generic secret.

1. Prepare for secret rotation

  • Activate Function Compute. For more information, see Activate Function Compute.

  • Create an account that is used to log on to your ApsaraDB RDS database. Then, create a generic secret for the account in the KMS console. The first time the generic secret is rotated, a new account is created.

    The secret value is in the JSON format. The secret name is used as the scheduling parameter that is passed to Serverless Workflow. Example:

    {
      "AccountName": "",
      "AccountPassword": ""
    }
    Note

    AccountName specifies the username and AccountPassword specifies the password of the account that is used to log on to your ApsaraDB RDS database.

  • Create a normal service role for Function Compute and grant permissions to allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.

    • Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.

    • Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.

    • Attach a custom policy that allows access to KMS and ApsaraDB RDS to the normal service role. The policy content is as follows:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "rds:GrantAccountPrivilege",
                      "rds:DescribeAccounts",
                      "rds:ResetAccountPassword",
                      "rds:CreateAccount"
                  ],
                  "Resource": "*"
              }
          ]
      }
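The secret value format described in this step can be checked before you store the value in KMS. The following is a minimal sketch that mirrors the required-field check in the sample rotation function. The helper name check_secret_value and the sample account values are hypothetical and are used only for illustration.

```python
import json

# Fields that the sample rotation function expects in the secret value.
REQUIRED_FIELDS = ("AccountName", "AccountPassword")


def check_secret_value(secret_data):
    """Parse a secret value and verify that the required fields exist.

    Hypothetical helper for illustration; it is not part of any SDK.
    """
    secret = json.loads(secret_data)
    missing = [field for field in REQUIRED_FIELDS if field not in secret]
    if missing:
        raise KeyError("%s key is missing from secret JSON" % ", ".join(missing))
    return secret


# Placeholder values, not real credentials.
secret = check_secret_value(
    json.dumps({"AccountName": "app_user", "AccountPassword": "example-password"}))
print(sorted(secret))  # ['AccountName', 'AccountPassword']
```

Running a check like this locally can catch a malformed secret value before the first rotation fails in Function Compute.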

2. Create secret rotation functions

You must create a Function Compute service and secret rotation functions in the Function Compute console. For more information, see Quickly create a function.

  1. Create a Function Compute service.

    When you create a Function Compute service, you must click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.

  2. Create functions.

    In this example, Python is used. When you create the functions, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
    from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
    from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
    from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
    
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        instance_id = evt['InstanceId']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, instance_id, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise ValueError("Cannot retrieve secret %s" % secret_name)
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ[
                'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, instance_id, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
        pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
        pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
        current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
        if len(current_accounts) == 0:
            logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
        if len(pending_accounts) == 0:
            create_rds_account(client, instance_id, pending_dict["AccountName"],
                               pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
            pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
                "DBInstanceAccount"]
        else:
            # reset password
            reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    
        current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        if len(current_privileges) > 0:
            for current_privilege in current_privileges:
                is_contains = False
                for pending_privilege in pending_privileges:
                    if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                        "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                        is_contains = True
                        break
                if not is_contains:
                    grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                            current_privilege["AccountPrivilege"])
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotate user: account name with _rt appended would exceed 16 characters")
            return new_account_name
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
        request = CreateAccountRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        request.set_AccountType(account_type)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
        request = GrantAccountPrivilegeRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_DBName(db_name)
        request.set_AccountPrivilege(account_privilege)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_accounts(client, db_instance_id, account_name):
        request = DescribeAccountsRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def reset_account_password(client, db_instance_id, account_name, account_password):
        request = ResetAccountPasswordRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        response = client.do_action_with_exception(request)
        return json.loads(response)
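
In dual-account mode, the sample function alternates between two account names on each rotation. The following minimal sketch isolates that naming logic from get_alt_account_name so that you can see the alternation without calling KMS or ApsaraDB RDS. The account name app_user is a hypothetical placeholder.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # limit enforced by the sample function


def alt_account_name(current_account_name):
    """Return the other account name of the dual-account pair."""
    if current_account_name.endswith(ROTATION_SUFFIX):
        # Strip the suffix to get back to the original account.
        return current_account_name[:-len(ROTATION_SUFFIX)]
    candidate = current_account_name + ROTATION_SUFFIX
    if len(candidate) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError("account name with _rt appended would exceed 16 characters")
    return candidate


# Simulate three consecutive rotations.
name = "app_user"
history = [name]
for _ in range(3):
    name = alt_account_name(name)
    history.append(name)
print(history)  # ['app_user', 'app_user_rt', 'app_user', 'app_user_rt']
```

Because the suffix adds three characters, the original account name must be 13 characters or shorter for the first rotation to succeed.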
                            

3. Create a Serverless Workflow flow for secret rotation

  1. Create a secret rotation flow. For more information, see Create a flow.

    1. Log on to the Serverless Workflow console.

    2. In the top navigation bar, select the region where you want to create a flow.

      Important

      You must select the region where the functions are created.

    3. On the Flows page, click Create flow.

    4. On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.

      You must change the content in the YAML file of Definition to the following code:

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: the Alibaba Cloud Resource Name (ARN) of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId
    5. Configure a flow role.

    6. Click Create Flow.

  2. Configure settings to rotate the generic secret at a scheduled time. For more information, see Create a time-based schedule.

    If you want to use Serverless Workflow to configure scheduled rotation of the generic secret, set Payload to the following content:

    {
      "SecretName": "",
      "RegionId": "",
      "InstanceId": ""
    }
    Note

    SecretName specifies the secret name, RegionId specifies the region, and InstanceId specifies the ID of your ApsaraDB RDS instance.
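
The following minimal sketch shows the events that the three flow steps would pass to the function for a given payload. It assumes, based on the flow definition above, that the first step omits VersionId and that later steps receive the VersionId returned by the first step. The helper build_step_events and all sample values are hypothetical.

```python
import json


def build_step_events(payload, version_id):
    """Build the JSON events for the new, set, and end steps.

    Hypothetical helper for local testing; the real events are
    produced by Serverless Workflow from the input mappings.
    """
    events = []
    for step in ("new", "set", "end"):
        event = dict(payload, Step=step)
        if step != "new":
            # Later steps reuse the version ID returned by the first step.
            event["VersionId"] = version_id
        events.append(json.dumps(event))
    return events


# Placeholder values for illustration only.
payload = {
    "SecretName": "rds-app-secret",
    "RegionId": "cn-hangzhou",
    "InstanceId": "rm-example",
}
for event in build_step_events(payload, "example-version-id"):
    print(event)
```

You can pass each event string to the handler in a test invocation to run one rotation step at a time.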

4. (Optional) Connect applications to Secrets Manager

You can use Secrets Manager JDBC to connect applications to Secrets Manager. For more information, see Secrets Manager JDBC.

Rotate generic secrets of a self-managed MySQL database in dual-account mode by using a privileged account

You can use Serverless Workflow and Function Compute to rotate generic secrets.

1. Prepare for secret rotation

  • Activate Function Compute. For more information, see Activate Function Compute.

  • Create a privileged account that is used to log on to your self-managed MySQL database and create a generic secret for the privileged account in the KMS console. The privileged account is also used to create an account or modify the password of an account that is used to log on to your self-managed MySQL database.

    The secret value is in the JSON format. Example:

    {
     "Endpoint": "",
     "AccountName": "",
     "AccountPassword": "",
     "SSL":false
    }
    Note

    Endpoint specifies the domain name or address of your self-managed MySQL database. AccountName specifies the username of the privileged account, and AccountPassword specifies its password. SSL specifies whether to use a certificate. Valid values for SSL are true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.

  • Create an account that is used to log on to your self-managed MySQL database and create a generic secret for the account in the KMS console. The first time the generic secret is rotated, a new account is created.

    The secret value is in the JSON format. The secret name is used as the scheduling parameter that is passed to Serverless Workflow. Example:

    {
     "Endpoint": "",
     "AccountName": "",
     "AccountPassword": "",
     "MasterSecret":"",
     "SSL":false
    }
    Note

    Endpoint specifies the domain name or address of your self-managed MySQL database. AccountName and AccountPassword specify the username and password of the account that is used to log on to the database. MasterSecret specifies the name of the generic secret that you created for the privileged account. SSL specifies whether to use a certificate. Valid values for SSL are true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.

  • Create a normal service role for Function Compute and grant permissions to allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.

    • Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.

    • Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.

    • Attach a custom policy that allows access to KMS to the normal service role. The policy content is as follows:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:Decrypt",
                      "kms:GenerateDataKey",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              }
          ]
      }
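
The two secret values described in this step must be consistent with each other. The following minimal sketch checks the required fields of both values and compares their endpoints. The helper find_problems and the sample values are hypothetical. The sample rotation function also accepts an ApsaraDB RDS replica endpoint, which this sketch does not cover.

```python
MASTER_FIELDS = ("Endpoint", "AccountName", "AccountPassword")
USER_FIELDS = MASTER_FIELDS + ("MasterSecret",)


def find_problems(user_secret, master_secret):
    """Return a list of problems found in the two secret values.

    Hypothetical helper for illustration; an empty list means that
    both values passed these basic checks.
    """
    problems = []
    for field in USER_FIELDS:
        if field not in user_secret:
            problems.append("account secret is missing %s" % field)
    for field in MASTER_FIELDS:
        if field not in master_secret:
            problems.append("privileged secret is missing %s" % field)
    # This sketch only handles the case where both endpoints are identical.
    if user_secret.get("Endpoint") != master_secret.get("Endpoint"):
        problems.append("Endpoint differs between the two secrets")
    return problems


# Placeholder values, not real credentials.
master = {"Endpoint": "mysql.example.internal", "AccountName": "admin",
          "AccountPassword": "example-admin-password", "SSL": False}
user = {"Endpoint": "mysql.example.internal", "AccountName": "app_user",
        "AccountPassword": "example-password",
        "MasterSecret": "mysql-admin-secret", "SSL": False}
print(find_problems(user, master))  # []
```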
                              

2. Create secret rotation functions

  1. Create a Function Compute service. For more information, see Quickly create a function.

    When you create a Function Compute service, you must click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.

  2. Create functions. For more information, see Quickly create a function.

    In this example, Python is used. When you create the functions, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    try:
        import pymysql
    except ImportError:
        os.system('pip install pymysql -t ./')
        import pymysql
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, secret_name, version_id)
    
        elif step == "test":
            test_phase(client, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    
        conn = get_connection(pending_dict)
        if conn:
            conn.close()
            logger.info(
                "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
            return
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        if current_dict['Endpoint'] != pending_dict['Endpoint']:
            logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
            raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
    
        conn = get_connection(current_dict)
        if not conn:
            logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
        conn.close()
    
        master_secret = current_dict['MasterSecret']
        master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
        if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(current_dict, master_dict):
            logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
            raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
    
        conn = get_connection(master_dict)
        if not conn:
            logger.error(
                "set: Unable to access the given database using credentials in master secret %s" % master_secret)
            raise ValueError("Unable to access the given database using credentials in master secret %s" % master_secret)
    
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
                if cur.rowcount == 0:
                    cur.execute("CREATE USER %s IDENTIFIED BY %s",
                                (pending_dict['AccountName'], pending_dict['AccountPassword']))
    
                cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
                for row in cur.fetchall():
                    if 'XA_RECOVER_ADMIN' in row[0]:
                        continue
                    grant = row[0].split(' TO ')
                    new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                    cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
                cur.execute("SELECT VERSION()")
                ver = cur.fetchone()[0]
    
                escaped_encryption_statement = get_escaped_encryption_statement(ver)
                cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                            current_dict['AccountName'])
                tls_options = cur.fetchone()
                ssl_type = tls_options[0]
                if not ssl_type:
                    cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
                elif ssl_type == "ANY":
                    cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
                elif ssl_type == "X509":
                    cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
                else:
                    cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                                (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
    
                password_option = get_password_option(ver)
                cur.execute("SET PASSWORD FOR %s = " + password_option,
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
                conn.commit()
                logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                    pending_dict['AccountName'], secret_name))
        finally:
            conn.close()
    
    
    def test_phase(client, secret_name, version_id):
        conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
        if conn:
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT NOW()")
                    conn.commit()
            finally:
                conn.close()
    
            logger.info("test: Successfully accessed MySQL DB with ACSPending secret in %s." % secret_name)
            return
        else:
            logger.error(
                "test: Unable to access the given database with pending secret of secret %s" % secret_name)
            raise ValueError("Unable to access the given database with pending secret of secret %s" % secret_name)
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully updated ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_connection(secret_dict):
        port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
        dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    
        use_ssl, fall_back = get_ssl_config(secret_dict)
    
        conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
        if conn or not fall_back:
            return conn
        else:
            return connect_and_authenticate(secret_dict, port, dbname, False)
    
    
    def get_ssl_config(secret_dict):
        if 'SSL' not in secret_dict:
            return True, True
    
        if isinstance(secret_dict['SSL'], bool):
            return secret_dict['SSL'], False
    
        if isinstance(secret_dict['SSL'], str):
            ssl = secret_dict['SSL'].lower()
            if ssl == "true":
                return True, False
            elif ssl == "false":
                return False, False
            else:
                return True, True
    
        return True, True
    
    
    def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
        ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    
        try:
            conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                                   password=secret_dict['AccountPassword'],
                                   port=port, database=dbname, connect_timeout=5, ssl=ssl)
            logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
                "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
            return conn
        except pymysql.OperationalError as e:
            if 'certificate verify failed: IP address mismatch' in e.args[1]:
                logger.error(
                    "Hostname verification failed when establishing SSL/TLS handshake with Endpoint: %s" % secret_dict[
                        'Endpoint'])
            return None
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotate user, account_name length with _rt appended would exceed 16 characters")
            return new_account_name
    
    
    def get_password_option(version):
        if version.startswith("8"):
            return "%s"
        else:
            return "PASSWORD(%s)"
    
    
    def get_escaped_encryption_statement(version):
        if version.startswith("5.6"):
            return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
        else:
            return "ALTER USER %s@'%%' REQUIRE"
    
    
    def is_rds_replica_database(client, replica_dict, master_dict):
        replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
        master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
        try:
            describe_response = describe_db_instances(client, replica_instance_id)
        except Exception as err:
            logger.warning("Encountered error while verifying rds replica status: %s" % err)
            return False
        items = describe_response['Items']
        instances = items.get("DBInstance")
        if not instances:
            logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
            return False
    
        current_instance = instances[0]
        return master_instance_id == current_instance.get('DBInstanceId')
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_db_instances(client, db_instance_id):
        request = DescribeDBInstancesRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        response = client.do_action_with_exception(request)
        return json.loads(response)
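
The get_alt_account_name helper in the sample code is what makes dual-account rotation alternate between two database users. The following standalone sketch illustrates that behavior. The function logic is copied from the sample (with the error text shortened), and the account names app_user and a_very_long_user are hypothetical.

```python
def get_alt_account_name(current_account_name):
    """Return the other account of the pair by adding or stripping the "_rt" suffix."""
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        # The current account is the "_rt" clone, so switch back to the base name.
        return current_account_name[:-len(rotation_suffix)]
    new_account_name = current_account_name + rotation_suffix
    if len(new_account_name) > 16:
        # The sample code enforces a 16-character limit on account names.
        raise ValueError("account name with _rt appended would exceed 16 characters")
    return new_account_name


if __name__ == "__main__":
    name = "app_user"  # hypothetical account name
    for rotation in range(1, 5):
        name = get_alt_account_name(name)
        print("rotation %d -> %s" % (rotation, name))
```

In this sketch, successive rotations switch between app_user_rt and app_user, so the account that is not being rewritten stays usable while a rotation is in progress. A base name longer than 13 characters cannot be rotated because the suffixed name would exceed the limit.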
    
    
                            
  3. Create a custom layer. For more information, see Create a custom layer.

    You can use custom layers so that dependencies do not need to be installed repeatedly. This example uses Python, and the sample code depends on PyMySQL, so you must install PyMySQL in the layer. If the MySQL database uses an SSL certificate, you can also add the certificate to the layer.

    1. Build the ZIP package of the layer.

      1. Run the mkdir my-secret-rotate command to create a working directory.

      2. Run the cd my-secret-rotate command to go to the working directory.

      3. Run the pip install --target ./python pymysql command to install the dependencies in the my-secret-rotate/python directory.

      4. If an SSL certificate is used for the MySQL database, create the certs directory in the python directory and save the cert.pem file to the certs directory.

      5. Go to the my-secret-rotate directory and run the zip -r my-secret-rotate.zip python command to package the dependencies.
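
Before you upload the package, it can help to confirm that the ZIP file has the layout that the sample code expects. The sample code reads the certificate from /opt/python/certs/cert.pem, which suggests that the layer contents are unpacked under /opt. The following is a minimal sketch of such a check. The helper name missing_layer_entries is hypothetical, and the demo builds a small in-memory archive instead of reading my-secret-rotate.zip.

```python
import io
import zipfile


def missing_layer_entries(zip_file, need_cert=False):
    """Return the expected archive members that are absent from the layer ZIP.

    zip_file may be a path or a file-like object.
    """
    expected = ["python/pymysql/__init__.py"]
    if need_cert:
        # Only required when the MySQL database uses an SSL certificate.
        expected.append("python/certs/cert.pem")
    with zipfile.ZipFile(zip_file) as archive:
        names = set(archive.namelist())
    return [entry for entry in expected if entry not in names]


if __name__ == "__main__":
    # Build a tiny archive in memory that contains PyMySQL but no certificate.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("python/pymysql/__init__.py", "")
    buffer.seek(0)
    print(missing_layer_entries(buffer, need_cert=True))  # prints ['python/certs/cert.pem']
```

To check a real package, pass the path of the ZIP file, for example missing_layer_entries("my-secret-rotate.zip", need_cert=True). An empty list means that both expected entries are present.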

    2. Create a custom layer in the Function Compute console.

  4. Configure the custom layer for the functions. For more information, see Manage layers.

    1. On the Services page, find the service whose function you want to manage and click Functions in the Actions column.

    2. Click Functions, find the function that you want to manage, and then click Configure in the Actions column. In the Layers section, add the custom layer for the function.

3. Create a Serverless Workflow flow for secret rotation

  1. Create a secret rotation flow. For more information, see Create a flow.

    1. Log on to the Serverless Workflow console.

    2. In the top navigation bar, select the region where you want to create a flow.

      Important

      You must select the region where the functions are created.

    3. On the Flows page, click Create flow.

    4. On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.

      You must change the content in the YAML file of Definition to the following code:

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretTest
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: test
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId
    5. Configure a flow role.

    6. Click Create Flow.
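
The four task steps in the flow definition differ only in their name, the Step value, and whether VersionId is mapped. The new step has no VersionId mapping, and the later steps read $local.VersionId, presumably from the output of the new step. As a sketch, the definition can be generated from a single function ARN instead of being edited in four places. The helper name build_flow_definition and the <function-arn> placeholder are hypothetical. Replace the placeholder with the ARN of the function that you created.

```python
# (step name, value passed to the function as Step), in execution order
STEPS = [
    ("RotateSecretNew", "new"),
    ("RotateSecretSet", "set"),
    ("RotateSecretTest", "test"),
    ("RotateSecretEnd", "end"),
]


def build_flow_definition(function_arn):
    """Return the flow definition text with function_arn used in every task step."""
    lines = ["version: v1", "type: flow", "steps:"]
    for name, step in STEPS:
        lines += [
            "  - type: task",
            "    name: %s" % name,
            "    resourceArn: %s" % function_arn,
            "    inputMappings:",
            "      - target: SecretName",
            "        source: $input.payload.SecretName",
            "      - target: RegionId",
            "        source: $input.payload.RegionId",
            "      - target: Step",
            "        source: %s" % step,
        ]
        if step != "new":
            # Steps after "new" receive the version ID from the flow's local data.
            lines += [
                "      - target: VersionId",
                "        source: $local.VersionId",
            ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(build_flow_definition("<function-arn>"))
```

The generated text can be pasted into Definition. Review it against the definition shown in this topic before you create the flow.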

  2. Configure settings to rotate the generic secrets at a scheduled time. For more information, see Create a time-based schedule.

    If you want to use Serverless Workflow to configure scheduled rotation of the generic secrets, set Payload to the following content:

    {
      "SecretName": "",
      "RegionId": ""
    }
    Note

    SecretName specifies the secret name and RegionId specifies the region.
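
An empty SecretName or RegionId in the payload would leave the function without a secret or region to work on. The following minimal sketch builds the payload and rejects empty values before you paste it into the schedule. The helper name build_rotation_payload and the values my-db-secret and cn-hangzhou are placeholders, not values from this topic.

```python
import json


def build_rotation_payload(secret_name, region_id):
    """Return the schedule payload as JSON text and reject empty values."""
    if not secret_name or not region_id:
        raise ValueError("SecretName and RegionId must both be non-empty")
    return json.dumps({"SecretName": secret_name, "RegionId": region_id}, indent=2)


if __name__ == "__main__":
    # Placeholder values: use your own secret name and region.
    print(build_rotation_payload("my-db-secret", "cn-hangzhou"))
```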

4. (Optional) Connect applications to Secrets Manager

You can use Secrets Manager JDBC to connect applications to Secrets Manager. For more information, see Secrets Manager JDBC.