すべてのプロダクト
Search
ドキュメントセンター

Key Management Service:Function Computeを使用したジェネリックシークレットのローテーション

最終更新日:Jan 20, 2025

Function Computeを使用して汎用シークレットをローテーションし、機密情報のセキュリティを向上させることができます。 このトピックでは、Function Computeを使用してジェネリックシークレットをローテーションする方法について説明します。

背景情報

Function Computeは、完全マネージド型のイベント駆動型コンピューティングサービスです。 Function Computeを使用すると、サーバーなどのインフラストラクチャリソースを取得または管理する必要なく、コーディングに集中できます。 この方法では、コードを記述してアップロードするだけです。 Function Computeは、コンピューティングリソースを割り当て、柔軟で信頼性の高い方法でタスクを実行し、ログクエリ、パフォーマンスモニタリング、アラートなどの機能を提供します。 詳細については、「Function Computeとは」をご参照ください。

課金

Function Computeを使用して一般的なシークレットをローテーションする場合、Key Management Service (KMS) によって提供されるシークレットローテーションに対しては課金されません。 Function Computeと、Function Computeに統合されたサーバーレスワークフローに対して課金されます。 課金の詳細については、「Function Computeの課金」および「Serverless Workflowの課金」をご参照ください。

重要

Secrets Managerは、自動的にローテーションされる動的シークレットを提供します。 動的シークレットがセキュリティ要件を満たすことができる場合は、動的シークレットを使用してコストを削減することを推奨します。 詳細については、「ダイナミックApsaraDB RDSシークレットの概要」「概要」、または「ダイナミックECSシークレットの概要」をご参照ください。

秘密の回転プロセス

このトピックでは、次のシナリオでジェネリックシークレットをローテーションする方法について説明します。

  • ApsaraDB RDS APIを使用して、デュアルアカウントモードでApsaraDB RDSデータベースのジェネリックシークレットを回転させる

  • 特権アカウントを使用して、デュアルアカウントモードで自己管理型MySQLデータベースの汎用シークレットを回転させる

ApsaraDB RDS APIを使用して、デュアルアカウントモードでApsaraDB RDSデータベースのジェネリックシークレットを回転させる

Serverless WorkflowとFunction Computeを使用して、ジェネリックシークレットをローテーションできます。

1. 秘密のローテーションの準備

  • Function Computeを有効にします。 詳細については、「Function Computeの有効化」をご参照ください。

  • ApsaraDB RDSデータベースへのログインに使用するアカウントを作成します。 次に、KMSコンソールでアカウントのジェネリックシークレットを作成します。 ジェネリックシークレットが初めてローテーションされた場合、新しいアカウントが生成されます。

    シークレット値はJSON形式です。 例: シークレット名は、Serverless Workflowに渡されるスケジューリングパラメーターとして使用されます。

    {
       "AccountName": "",
     "AccountPassword": ""
    }
    説明

    AccountNameは、ApsaraDB RDSデータベースへのログインに使用されるアカウントのユーザー名を指定し、AccountPasswordは、ApsaraDB RDSデータベースへのログインに使用されるアカウントのパスワードを指定します。

  • Function Computeの通常のサービスロールを作成し、Function ComputeがKMSにアクセスできるようにする権限を付与します。 詳細については、「他のAlibaba Cloudサービスへのアクセス権限付与」をご参照ください。

    • AliyunFCDefaultRolePolicyポリシーをFunction Computeの通常のサービスロールにアタッチします。

    • AliyunSTSAssumeRoleAccessシステムポリシーを通常のサービスロールにアタッチします。

    • KMSへのアクセスを許可するカスタムポリシーを通常のサービスロールにアタッチします。 次のスクリプトは、ポリシーの内容を示しています。

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "rds:GrantAccountPrivilege",
                      "rds:DescribeAccounts",
                      "rds:ResetAccountPassword",
                      "rds:CreateAccount"
                  ],
                  "Resource": "*"
              }
          ]
      }

2. 秘密回転関数の作成

Function Computeコンソールで、Function Computeサービスと秘密のローテーション関数を作成する必要があります。 詳細については、「関数の迅速な作成」をご参照ください。

  1. Function Computeサービスを作成します。

    Function Computeサービスを作成するときは、[詳細オプションの表示] をクリックし、[VPCへのアクセス][はい] に設定する必要があります。 次に、VPCvSwitchセキュリティグループを設定します。 Function Computeサービスが、指定されたセキュリティグループと指定されたVPCのvSwitchを使用してApsaraDB RDS APIとKMSにアクセスできることを確認します。

  2. 関数を作成します。

    この例では、Pythonが使用されます。 関数を作成するときは、ランタイム環境[Python 3.9] を選択します。 次のコードは例を提供します。 ビジネス要件に基づいてコードを変更できます。

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
    from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
    from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
    from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
    
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        instance_id = evt['InstanceId']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, instance_id, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise ValueError("Can to find secret %s " % (secret_name))
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ[
                'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, instance_id, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
        pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
        pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
        current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
        if len(current_accounts) == 0:
            logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
        if len(pending_accounts) == 0:
            create_rds_account(client, instance_id, pending_dict["AccountName"],
                               pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
            pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
                "DBInstanceAccount"]
        else:
            # reset password
            reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    
        current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        if len(current_privileges) > 0:
            for current_privilege in current_privileges:
                is_contains = False
                for pending_privilege in pending_privileges:
                    if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                        "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                        is_contains = True
                        continue
                if not is_contains:
                    grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                            current_privilege["AccountPrivilege"])
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotation user, account_name length with _rotation appended would exceed 16 characters")
            return new_account_name
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
        request = CreateAccountRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        request.set_AccountType(account_type)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
        request = GrantAccountPrivilegeRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_DBName(db_name)
        request.set_AccountPrivilege(account_privilege)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_accounts(client, db_instance_id, account_name):
        request = DescribeAccountsRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def reset_account_password(client, db_instance_id, account_name, account_password):
        request = ResetAccountPasswordRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        response = client.do_action_with_exception(request)
        return json.loads(response)
                            

3. シークレットローテーション用のサーバーレスワークフローフローの作成

  1. 秘密のローテーションフローを作成します。 詳細については、「ワークフローの作成」をご参照ください。

    1. Serverless Workflowコンソールにログインします。

    2. 上部のナビゲーションバーで、フローを作成するリージョンを選択します。

      重要

      関数が作成されるリージョンを選択する必要があります。

    3. [フロー] ページで、[フローの作成] をクリックします。

    4. [フローの作成] ページで、[コードによるフローの作成] をクリックし、パラメーターを設定し、[次のステップ] をクリックします。

      DefinitionのYAMLファイルの内容を次のコードに変更する必要があります。

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: the Alibaba Cloud Resource Name (ARN) of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId
    5. フローロールを設定します。

    6. [フローの作成] をクリックします。

  2. スケジュールされた時間にジェネリックシークレットをローテーションするように設定します。 詳細については、「時間ベースのスケジュールの作成」をご参照ください。

    Serverless Workflowを使用してジェネリックシークレットのスケジュールされたローテーションを設定する場合は、[ペイロード] を次の内容に設定します。

    {
      "SecretName": "",
      "RegionId": "",
      "InstanceId":""
    }
    説明

    SecretNameはシークレット名、RegionIdはリージョン、InstanceIdはApsaraDB RDSインスタンスのIDを指定します。

4. (オプション) アプリケーションをSecrets Managerに接続する

Secrets Manager JDBCを使用して、アプリケーションをSecrets Managerに接続できます。 詳細については、「Secrets Manager JDBC」をご参照ください。

特権アカウントを使用して、デュアルアカウントモードで自己管理型MySQLデータベースの汎用シークレットを回転させる

Serverless WorkflowとFunction Computeを使用して、ジェネリックシークレットをローテーションできます。

1. 秘密のローテーションの準備

  • Function Computeを有効にします。 詳細については、「Function Computeの有効化」をご参照ください。

  • 自己管理型MySQLデータベースへのログインに使用する特権アカウントを作成し、KMSコンソールで特権アカウントの一般的なシークレットを作成します。 特権アカウントは、自己管理MySQLデータベースへのログインに使用されるアカウントの作成またはパスワードの変更にも使用されます。

    シークレット値はJSON形式です。 例:

    {
     "Endpoint": "",
     "AccountName": "",
     "AccountPassword": "",
     "SSL":false
    }
    説明

    Endpointは自己管理型MySQLデータベースのドメイン名またはアドレスを指定し、AccountNameは特権アカウントのユーザー名を指定します。 AccountPasswordは特権アカウントのパスワードを指定し、SSLは証明書を使用するかどうかを指定します。 SSLの有効な値はtrueとfalseです。 デフォルト値:false デフォルトでは、証明書は /opt/python/certs/cert.pemディレクトリに保存されます。

  • 自己管理型MySQLデータベースへのログインに使用するアカウントを作成し、KMSコンソールでそのアカウントの一般的なシークレットを作成します。 ジェネリックシークレットが初めてローテーションされた場合、新しいアカウントが作成されます。

    シークレット値はJSON形式です。 例: シークレット名は、Serverless Workflowに渡されるスケジューリングパラメーターとして使用されます。

    {
     "Endpoint": "",
     "AccountName": "",
     "AccountPassword": "",
     "MasterSecret":"",
     "SSL":false
    }
    説明

    EndpointはセルフマネージドMySQLデータベースのドメイン名またはアドレスを指定します。AccountNameはセルフマネージドMySQLデータベースへのログインに使用されるアカウントのユーザー名を指定します。AccountPasswordはセルフマネージドMySQLデータベースへのログインに使用されるアカウントのパスワードを指定します。MasterSecretは特権アカウントを指定します。SSLは証明書を使用するかどうかを指定します。 SSLの有効な値はtrueとfalseです。 デフォルト値:false デフォルトでは、証明書は /opt/python/certs/cert.pemディレクトリに保存されます。

  • Function Computeの通常のサービスロールを作成し、Function ComputeがKMSにアクセスできるようにする権限を付与します。 詳細については、「他のAlibaba Cloudサービスへのアクセス権限付与」をご参照ください。

    • AliyunFCDefaultRolePolicyポリシーをFunction Computeの通常のサービスロールにアタッチします。

    • AliyunSTSAssumeRoleAccessシステムポリシーを通常のサービスロールにアタッチします。

    • KMSへのアクセスを許可するカスタムポリシーを通常のサービスロールにアタッチします。 次のスクリプトは、ポリシーの内容を示しています。

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:Decrypt",
                      "kms:GenerateDataKey",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              }
          ]
      }
                              

2. 秘密回転関数の作成

  1. Function Computeサービスを作成します。 詳細については、「関数の迅速な作成」をご参照ください。

    Function Computeサービスを作成するときは、[詳細オプションの表示] をクリックし、[VPCへのアクセス][はい] に設定する必要があります。 次に、VPCvSwitchセキュリティグループを設定します。 Function Computeサービスが、指定されたセキュリティグループと指定されたVPCのvSwitchを使用してApsaraDB RDS APIとKMSにアクセスできることを確認します。

  2. 関数を作成します。 詳細については、「関数の迅速な作成」をご参照ください。

    この例では、Pythonが使用されます。 関数を作成するときは、ランタイム環境[Python 3.9] を選択します。 次のコードは例を提供します。 ビジネス要件に基づいてコードを変更できます。

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    try:
        import pymysql
    except:
        os.system('pip install pymysql -t ./')
        import pymysql
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, secret_name, version_id)
    
        elif step == "test":
            test_phase(client, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    
        conn = get_connection(pending_dict)
        if conn:
            conn.close()
            logger.info(
                "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
            return
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        if current_dict['Endpoint'] != pending_dict['Endpoint']:
            logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
            raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
    
        conn = get_connection(current_dict)
        if not conn:
            logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
        conn.close()
    
        master_secret = current_dict['MasterSecret']
        master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
        if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(current_dict, master_dict):
            logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
            raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
    
        conn = get_connection(master_dict)
        if not conn:
            logger.error(
                "set: Unable to access the given database using credentials in master secret secret %s" % master_secret)
            raise ValueError("Unable to access the given database using credentials in master secret secret %s" % master_secret)
    
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
                if cur.rowcount == 0:
                    cur.execute("CREATE USER %s IDENTIFIED BY %s",
                                (pending_dict['AccountName'], pending_dict['AccountPassword']))
    
                cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
                for row in cur.fetchall():
                    if 'XA_RECOVER_ADMIN' in row[0]:
                        continue
                    grant = row[0].split(' TO ')
                    new_grant_escaped = grant[0].replace('%', '%%')  # % is a special cha30racter in Python format strings.
                    cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
                cur.execute("SELECT VERSION()")
                ver = cur.fetchone()[0]
    
                escaped_encryption_statement = get_escaped_encryption_statement(ver)
                cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                            current_dict['AccountName'])
                tls_options = cur.fetchone()
                ssl_type = tls_options[0]
                if not ssl_type:
                    cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
                elif ssl_type == "ANY":
                    cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
                elif ssl_type == "X509":
                    cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
                else:
                    cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                                (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
    
                password_option = get_password_option(ver)
                cur.execute("SET PASSWORD FOR %s = " + password_option,
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
                conn.commit()
                logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                    pending_dict['AccountName'], secret_name))
        finally:
            conn.close()
    
    
    def test_phase(client, secret_name, version_id):
        conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
        if conn:
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT NOW()")
                    conn.commit()
            finally:
                conn.close()
    
            logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name)
            return
        else:
            logger.error(
                "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
            raise ValueError("Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_connection(secret_dict):
        port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
        dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    
        use_ssl, fall_back = get_ssl_config(secret_dict)
    
        conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
        if conn or not fall_back:
            return conn
        else:
            return connect_and_authenticate(secret_dict, port, dbname, False)
    
    
    def get_ssl_config(secret_dict):
        if 'SSL' not in secret_dict:
            return True, True
    
        if isinstance(secret_dict['SSL'], bool):
            return secret_dict['SSL'], False
    
        if isinstance(secret_dict['SSL'], str):
            ssl = secret_dict['SSL'].lower()
            if ssl == "true":
                return True, False
            elif ssl == "false":
                return False, False
            else:
                return True, True
    
        return True, True
    
    
    def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
        ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    
        try:
            conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                                   password=secret_dict['AccountPassword'],
                                   port=port, database=dbname, connect_timeout=5, ssl=ssl)
            logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
                "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
            return conn
        except pymysql.OperationalError as e:
            if 'certificate verify failed: IP address mismatch' in e.args[1]:
                logger.error(
                    "Hostname verification failed when estlablishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                        'Endpoint'])
            return None
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotate user, account_name length with _rotation appended would exceed 16 characters")
            return new_account_name
    
    
    def get_password_option(version):
        if version.startswith("8"):
            return "%s"
        else:
            return "PASSWORD(%s)"
    
    
    def get_escaped_encryption_statement(version):
        if version.startswith("5.6"):
            return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
        else:
            return "ALTER USER %s@'%%' REQUIRE"
    
    
    def is_rds_replica_database(client, replica_dict, master_dict):
        replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
        master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
        try:
            describe_response = describe_db_instances(client, replica_instance_id)
        except Exception as err:
            logger.warning("Encountered error while verifying rds replica status: %s" % err)
            return False
        items = describe_response['Items']
        instances = items.get("DBInstance")
        if not instances:
            logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
            return False
    
        current_instance = instances[0]
        return master_instance_id == current_instance.get('DBInstanceId')
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_db_instances(client, db_instance_id):
        request = DescribeDBInstancesRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
                            
  3. カスタムレイヤーを作成します。 詳細については、「カスタムレイヤーの作成」をご参照ください。

    カスタムレイヤーを使用して、依存関係の頻繁なインストールを防ぐことができます。 この例では、Pythonが使用されます。 サンプルコードでは、PyMySQL依存関係をインストールする必要があります。 MySQLデータベースにSSL証明書が使用されている場合は、SSL証明書をレイヤーに追加することもできます。

    1. レイヤーのZIPパッケージを作成します。

      1. mkdir my-secret-rotateコマンドを実行して、作業ディレクトリを作成します。

      2. cd my-secret-rotateコマンドを実行して、作業ディレクトリに移動します。

      3. を実行します。Run thepip install -- target. /python pymysqlコマンドを実行して、my-secret-rotate/pythonディレクトリに依存関係をインストールします。

      4. MySQLデータベースにSSL証明書が使用されている場合は、pythonディレクトリにcertsディレクトリを作成し、cert.pemファイルをcertsディレクトリに保存します。

      5. my-secret-rotateディレクトリに移動し、zip -r my-secret-rotate.zip pythonコマンドを実行して依存関係をパッケージ化します。

    2. Function Computeコンソールでカスタムレイヤーを作成します。

  4. 関数のカスタムレイヤーを設定します。 詳細については、「レイヤーの管理」をご参照ください。

    1. [サービス] ページで、機能を管理するサービスを見つけ、[操作] 列の [機能] をクリックします。

    2. [関数] をクリックし、管理する関数を見つけて、[操作] 列の [設定] をクリックします。 [レイヤー] セクションで、関数のカスタムレイヤーを追加します。

3. シークレットローテーション用のサーバーレスワークフローフローの作成

  1. 秘密のローテーションフローを作成します。 詳細については、「ワークフローの作成」をご参照ください。

    1. Serverless Workflowコンソールにログインします。

    2. 上部のナビゲーションバーで、フローを作成するリージョンを選択します。

      重要

      関数が作成されるリージョンを選択する必要があります。

    3. [フロー] ページで、[フローの作成] をクリックします。

    4. [フローの作成] ページで、[コードによるフローの作成] をクリックし、パラメーターを設定し、[次のステップ] をクリックします。

      DefinitionのYAMLファイルの内容を次のコードに変更する必要があります。

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretTest
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: test
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId
    5. フローロールを設定します。

    6. [フローの作成] をクリックします。

  2. スケジュールされた時間にジェネリックシークレットをローテーションするように設定します。 詳細については、「時間ベースのスケジュールの作成」をご参照ください。

    Serverless Workflowを使用してジェネリックシークレットのスケジュールされたローテーションを設定する場合は、[ペイロード] を次の内容に設定します。

    {
      "SecretName": "",
      "RegionId": ""
    }
    説明

    SecretNameはシークレット名を指定し、RegionIdはリージョンを指定します。

4. (オプション) アプリケーションをSecrets Managerに接続する

Secrets Manager JDBCを使用して、アプリケーションをSecrets Managerに接続できます。 詳細については、「Secrets Manager JDBC」をご参照ください。