Key Management Service: Use Function Compute to rotate generic secrets

Last updated: Jul 02, 2025

You can use Function Compute to rotate generic secrets and improve the security of sensitive information. This topic describes how to do so.

Background information

Function Compute is a fully managed, event-driven computing service. With Function Compute, you can focus on writing code without procuring or managing infrastructure resources such as servers. You write and upload your code, and Function Compute allocates computing resources, runs tasks elastically and reliably, and provides features such as log query, performance monitoring, and alerting. For more information, see What is Function Compute?.

Billing

If you use Function Compute to rotate generic secrets, Key Management Service (KMS) does not charge you for the secret rotation. However, you are charged for Function Compute and for CloudFlow, which is integrated with Function Compute. For more information about billing, see Billing of Function Compute and Billing of CloudFlow.

Important

Secrets Manager provides dynamic secrets that are rotated automatically. If dynamic secrets meet your security requirements, we recommend that you use them to reduce costs. For more information, see Overview of dynamic ApsaraDB RDS secrets, Overview, or Overview of dynamic ECS secrets.

Secret rotation process

This topic describes how to rotate generic secrets in the following scenarios:

  • Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using the ApsaraDB RDS API.

  • Rotate a generic secret of a self-managed MySQL database in dual-account mode by using a privileged account.

Rotate a generic secret of an ApsaraDB RDS database in dual-account mode by using the ApsaraDB RDS API

You can use CloudFlow and Function Compute to rotate generic secrets.

1. Prepare for secret rotation

  • Activate Function Compute. For more information, see Activate Function Compute.

  • Create an account that is used to log on to your ApsaraDB RDS database. Then, create a generic secret for the account in the KMS console. When the generic secret is rotated for the first time, a new account is created.

    The secret name is used as a scheduling parameter that is passed to CloudFlow. The secret value must be in JSON format. Example:

    {
      "AccountName": "",
      "AccountPassword": ""
    }

    Note

    AccountName specifies the username of the account that is used to log on to your ApsaraDB RDS database, and AccountPassword specifies the password of that account.

  • Create a normal service role for Function Compute and grant the role the permissions that allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.

    • Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.

    • Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.

    • Attach a custom policy that allows access to KMS and the ApsaraDB RDS API to the normal service role. The following script shows the content of the policy:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "rds:GrantAccountPrivilege",
                      "rds:DescribeAccounts",
                      "rds:ResetAccountPassword",
                      "rds:CreateAccount"
                  ],
                  "Resource": "*"
              }
          ]
      }
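The preparation items above can be sanity-checked before anything is uploaded. The following is a minimal sketch, not part of the official sample: `build_secret_value`, `missing_actions`, and the account name `app_user` are hypothetical names introduced for illustration. It builds a secret value with the two required fields and reports which of the actions listed in the custom policy are not granted by a policy document. It compares action names literally and does not evaluate wildcards or `Resource` conditions.

```python
import json

# Actions listed in the custom policy above (the sample rotation function calls these APIs).
REQUIRED_ACTIONS = {
    "kms:GetSecretValue",
    "kms:GetRandomPassword",
    "kms:PutSecretValue",
    "kms:UpdateSecretVersionStage",
    "rds:GrantAccountPrivilege",
    "rds:DescribeAccounts",
    "rds:ResetAccountPassword",
    "rds:CreateAccount",
}


def build_secret_value(account_name, account_password):
    """Return the JSON secret value with the two required fields (hypothetical helper)."""
    if not account_name or not account_password:
        raise ValueError("AccountName and AccountPassword must be non-empty")
    return json.dumps({"AccountName": account_name, "AccountPassword": account_password})


def missing_actions(policy_text):
    """Return the required actions that no Allow statement names literally."""
    policy = json.loads(policy_text)
    granted = set()
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):  # a single action may be given as a string
            actions = [actions]
        granted.update(actions)
    return REQUIRED_ACTIONS - granted


# Example: a policy that only grants the KMS actions still lacks the four RDS actions.
kms_only_policy = json.dumps({
    "Version": "1",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "kms:GetSecretValue",
            "kms:GetRandomPassword",
            "kms:PutSecretValue",
            "kms:UpdateSecretVersionStage",
        ],
        "Resource": "*",
    }],
})
print(sorted(missing_actions(kms_only_policy)))
print(build_secret_value("app_user", "example-password"))
```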

2. Create a secret rotation function

You must create a Function Compute service and a secret rotation function in the Function Compute console. For more information, see Quickly create a function.

  1. Create a Function Compute service.

    When you create the Function Compute service, click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access the ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.

  2. Create a function.

    In this example, Python is used. When you create the function, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
    from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
    from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
    from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
    
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        instance_id = evt['InstanceId']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, instance_id, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise ValueError("Unable to retrieve secret %s (%s)" % (secret_name, e.error_code))
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ[
                'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`"
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, instance_id, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
        pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
        pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
        current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
        if len(current_accounts) == 0:
            logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name)
        if len(pending_accounts) == 0:
            create_rds_account(client, instance_id, pending_dict["AccountName"],
                               pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
            pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
                "DBInstanceAccount"]
        else:
            # reset password
            reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    
        current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
        if len(current_privileges) > 0:
            for current_privilege in current_privileges:
                is_contains = False
                for pending_privilege in pending_privileges:
                    if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                        "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                        is_contains = True
                        break
                if not is_contains:
                    grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                            current_privilege["AccountPrivilege"])
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotate user: account name with %s appended would exceed 16 characters" % rotation_suffix)
            return new_account_name
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
        request = CreateAccountRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        request.set_AccountType(account_type)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
        request = GrantAccountPrivilegeRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_DBName(db_name)
        request.set_AccountPrivilege(account_privilege)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_accounts(client, db_instance_id, account_name):
        request = DescribeAccountsRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def reset_account_password(client, db_instance_id, account_name, account_password):
        request = ResetAccountPasswordRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        request.set_AccountName(account_name)
        request.set_AccountPassword(account_password)
        response = client.do_action_with_exception(request)
        return json.loads(response)
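In dual-account mode, the sample function alternates between two account names on each rotation. The following standalone sketch reproduces the `get_alt_account_name` logic from the sample to show that alternation. The starting name `app_user` and the constant names are assumptions for illustration only.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # limit checked by the sample function


def get_alt_account_name(current_account_name):
    """Toggle between the base account name and the name with the rotation suffix."""
    if current_account_name.endswith(ROTATION_SUFFIX):
        # The suffixed account is current, so the next rotation targets the base account.
        return current_account_name[:-len(ROTATION_SUFFIX)]
    new_account_name = current_account_name + ROTATION_SUFFIX
    if len(new_account_name) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError("account name with %s appended would exceed %d characters"
                         % (ROTATION_SUFFIX, MAX_ACCOUNT_NAME_LENGTH))
    return new_account_name


name = "app_user"  # hypothetical initial account name
history = [name]
for _ in range(3):
    name = get_alt_account_name(name)
    history.append(name)
print(history)  # ['app_user', 'app_user_rt', 'app_user', 'app_user_rt']
```

Two consequences follow from this logic: the initial account name can be at most 13 characters long, and an initial name that already ends with `_rt` is treated as the rotated account rather than the base account.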
                            

3. Create a CloudFlow flow for secret rotation

  1. Create a secret rotation flow. For more information, see Create a workflow.

    1. Log on to the CloudFlow console.

    2. In the top navigation bar, select the region where you want to create the flow.

      Important

      You must select the region where the function was created.

    3. On the Flows page, click Create flow.

    4. On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.

      You must change the content of YAML Definition to the following code:

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: the Alibaba Cloud Resource Name (ARN) of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: the ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: InstanceId
              source: $input.payload.InstanceId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId

    5. Configure the flow role.

    6. Click Create Flow.

  2. Configure the settings to rotate the generic secret on a schedule. For more information, see Create a time-based schedule.

    To use CloudFlow to configure scheduled rotation of the generic secret, set Payload to the following content:

    {
      "SecretName": "",
      "RegionId": "",
      "InstanceId": ""
    }

    Note

    SecretName specifies the name of the secret, RegionId specifies the region, and InstanceId specifies the ID of your ApsaraDB RDS instance.
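The flow definition and payload above can be traced locally without calling any cloud service. The following is a rough sketch under stated assumptions: `build_event`, `fake_handler`, and `run_flow` are hypothetical helpers, the payload values are placeholders, and `fake_handler` only stands in for the rotation function by generating a UUID where the real function falls back to `context.requestId`. It shows how the three tasks receive the payload fields and a fixed `Step`, and how the `VersionId` returned by the first task is carried into the later tasks, as the `$local.VersionId` mappings suggest.

```python
import json
import uuid

STEPS = ["new", "set", "end"]  # task order in the flow definition


def build_event(payload, step, version_id=None):
    """Mirror the inputMappings: payload fields, the step, and VersionId once it is known."""
    event = {
        "SecretName": payload["SecretName"],
        "RegionId": payload["RegionId"],
        "InstanceId": payload["InstanceId"],
        "Step": step,
    }
    if version_id is not None:
        event["VersionId"] = version_id
    return event


def fake_handler(event_text):
    """Stand-in for the rotation function: reuse the VersionId or create one."""
    evt = json.loads(event_text)
    version_id = evt.get("VersionId") or str(uuid.uuid4())
    return {"VersionId": version_id}


def run_flow(payload, handler=fake_handler):
    """Call the handler once per step and feed the returned VersionId forward."""
    version_id = None
    events = []
    for step in STEPS:
        event = build_event(payload, step, version_id)
        events.append(event)
        version_id = handler(json.dumps(event))["VersionId"]
    return events, version_id


# Placeholder values; replace them with your own secret name, region, and instance ID.
payload = {"SecretName": "example-secret", "RegionId": "cn-hangzhou", "InstanceId": "rm-example"}
events, version_id = run_flow(payload)
for event in events:
    print(event["Step"], event.get("VersionId"))
```

Because every step of one run uses the same `VersionId`, rerunning a step with the same input targets the same pending secret version instead of creating another one.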

4. (Optional) Connect an application to Secrets Manager

You can use Secrets Manager JDBC to connect an application to Secrets Manager. For more information, see Secrets Manager JDBC.

Rotate a generic secret of a self-managed MySQL database in dual-account mode by using a privileged account

You can use CloudFlow and Function Compute to rotate generic secrets.

1. Prepare for secret rotation

  • Activate Function Compute. For more information, see Activate Function Compute.

  • Create a privileged account that is used to log on to the self-managed MySQL database, and create a generic secret for the privileged account in the KMS console. The privileged account is also used to create accounts or change the passwords of accounts that are used to log on to the self-managed MySQL database.

    The secret value must be in JSON format. Example:

    {
      "Endpoint": "",
      "AccountName": "",
      "AccountPassword": "",
      "SSL": false
    }

    Note

    Endpoint specifies the domain name or address of your self-managed MySQL database, and AccountName specifies the username of the privileged account. AccountPassword specifies the password of the privileged account, and SSL specifies whether to use a certificate. Valid values for SSL: true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.

  • Create an account that is used to log on to the self-managed MySQL database, and create a generic secret for the account in the KMS console. When the generic secret is rotated for the first time, a new account is created.

    The secret name is used as a scheduling parameter that is passed to CloudFlow. The secret value must be in JSON format. Example:

    {
      "Endpoint": "",
      "AccountName": "",
      "AccountPassword": "",
      "MasterSecret": "",
      "SSL": false
    }

    Note

    Endpoint specifies the domain name or address of your self-managed MySQL database. AccountName and AccountPassword specify the username and password of the account that is used to log on to the database. MasterSecret specifies the privileged account, as the name of the generic secret that you created for that account. SSL specifies whether to use a certificate. Valid values for SSL: true and false. Default value: false. By default, the certificate is stored at /opt/python/certs/cert.pem.

  • Create a normal service role for Function Compute and grant the role the permissions that allow Function Compute to access KMS. For more information, see Grant Function Compute permissions to access other Alibaba Cloud services.

    • Attach the AliyunFCDefaultRolePolicy policy to the normal service role of Function Compute.

    • Attach the AliyunSTSAssumeRoleAccess system policy to the normal service role.

    • Attach a custom policy that allows access to KMS to the normal service role. The following script shows the content of the policy:

      {
          "Version": "1",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kms:GetSecretValue",
                      "kms:GetRandomPassword",
                      "kms:Decrypt",
                      "kms:GenerateDataKey",
                      "kms:PutSecretValue",
                      "kms:UpdateSecretVersionStage"
                  ],
                  "Resource": "*"
              }
          ]
      }
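The two secret values described in the preparation items above are linked through `MasterSecret`. As a minimal sketch, assuming hypothetical names (`check_secret_value`, the secret name `mysql-admin-secret`, the endpoint, and the account names are all placeholders), the following code builds both values and checks the fields before you store them in KMS. It accepts only JSON booleans for `SSL`, which may be stricter than the sample rotation function.

```python
import json

PRIVILEGED_FIELDS = ("Endpoint", "AccountName", "AccountPassword")
ROTATED_FIELDS = PRIVILEGED_FIELDS + ("MasterSecret",)


def check_secret_value(secret_text, required_fields):
    """Parse a secret value and verify the required fields and the SSL type."""
    secret = json.loads(secret_text)
    missing = [field for field in required_fields if not secret.get(field)]
    if missing:
        raise KeyError("missing or empty fields: %s" % ", ".join(missing))
    if "SSL" in secret and not isinstance(secret["SSL"], bool):
        raise TypeError("SSL must be the JSON literal true or false")
    return secret


# Secret value for the privileged account (stored under the hypothetical name below).
privileged_secret_name = "mysql-admin-secret"
privileged_value = json.dumps({
    "Endpoint": "mysql.example.internal",
    "AccountName": "admin",
    "AccountPassword": "example-admin-password",
    "SSL": False,
})

# Secret value for the account to rotate; MasterSecret points at the secret above by name.
rotated_value = json.dumps({
    "Endpoint": "mysql.example.internal",
    "AccountName": "app_user",
    "AccountPassword": "example-app-password",
    "MasterSecret": privileged_secret_name,
    "SSL": False,
})

privileged = check_secret_value(privileged_value, PRIVILEGED_FIELDS)
rotated = check_secret_value(rotated_value, ROTATED_FIELDS)
print(rotated["MasterSecret"], rotated["Endpoint"] == privileged["Endpoint"])
```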
                              

2. Create a secret rotation function

  1. Create a Function Compute service. For more information, see Quickly create a function.

    When you create the Function Compute service, click Show Advanced Options and set Access to VPC to Yes. Then, configure VPC, vSwitch, and Security Group. Make sure that the Function Compute service can access the ApsaraDB RDS API and KMS by using the specified security group and vSwitch in the specified VPC.

  2. Create a function. For more information, see Quickly create a function.

    In this example, Python is used. When you create the function, select Python 3.9 for Runtime Environments. The following code provides an example. You can modify the code based on your business requirements.

    # -*- coding: utf-8 -*-
    import json
    import logging
    import os
    
    try:
        import pymysql
    except ImportError:
        os.system('pip install pymysql -t ./')
        import pymysql
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
    from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
    from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
    from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
    from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def handler(event, context):
        evt = json.loads(event)
        secret_name = evt['SecretName']
        region_id = evt['RegionId']
        step = evt['Step']
        version_id = evt.get('VersionId')
        if not version_id:
            version_id = context.requestId
        credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                         context.credentials.securityToken)
        client = AcsClient(region_id=region_id, credential=credentials)
    
        endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
        client.add_endpoint(region_id, 'kms', endpoint)
        resp = get_secret_value(client, secret_name)
        if "Generic" != resp['SecretType']:
            logger.error("Secret %s is not enabled for rotation" % secret_name)
            raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    
        if step == "new":
            new_phase(client, secret_name, version_id)
    
        elif step == "set":
            set_phase(client, secret_name, version_id)
    
        elif step == "test":
            test_phase(client, secret_name, version_id)
    
        elif step == "end":
            end_phase(client, secret_name, version_id)
    
        else:
            logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
            raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
        return {"VersionId": version_id}
    
    
    def new_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        try:
            get_secret_dict(client, secret_name, "ACSPending", version_id)
            logger.info("new: Successfully retrieved secret for %s." % secret_name)
        except ServerException as e:
            if e.error_code != 'Forbidden.ResourceNotFound':
                raise
            current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
    
            exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
            passwd = get_random_password(client, exclude_characters)
            current_dict['AccountPassword'] = passwd['RandomPassword']
            put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                             json.dumps(['ACSPending']))
            logger.info(
                "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
    
    
    def set_phase(client, secret_name, version_id):
        current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
        pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    
        conn = get_connection(pending_dict)
        if conn:
            conn.close()
            logger.info(
                "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
            return
    
        if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
            logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
            raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
                pending_dict['AccountName'], current_dict['AccountName']))
    
        if current_dict['Endpoint'] != pending_dict['Endpoint']:
            logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
            raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
                pending_dict['Endpoint'], current_dict['Endpoint']))
    
        conn = get_connection(current_dict)
        if not conn:
            logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
            raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
        conn.close()
    
        master_secret = current_dict['MasterSecret']
        master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
        if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(current_dict, master_dict):
            logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
            raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
                current_dict['Endpoint'], master_dict['Endpoint']))
    
        conn = get_connection(master_dict)
        if not conn:
            logger.error(
                "set: Unable to access the given database using credentials in master secret secret %s" % master_secret)
            raise ValueError("Unable to access the given database using credentials in master secret secret %s" % master_secret)
    
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
                if cur.rowcount == 0:
                    cur.execute("CREATE USER %s IDENTIFIED BY %s",
                                (pending_dict['AccountName'], pending_dict['AccountPassword']))
    
                cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
                for row in cur.fetchall():
                    if 'XA_RECOVER_ADMIN' in row[0]:
                        continue
                    grant = row[0].split(' TO ')
                    new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                    cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
                cur.execute("SELECT VERSION()")
                ver = cur.fetchone()[0]
    
                escaped_encryption_statement = get_escaped_encryption_statement(ver)
                cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                            current_dict['AccountName'])
                tls_options = cur.fetchone()
                ssl_type = tls_options[0]
                if not ssl_type:
                    cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
                elif ssl_type == "ANY":
                    cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
                elif ssl_type == "X509":
                    cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
                else:
                    cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                                (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
    
                password_option = get_password_option(ver)
                cur.execute("SET PASSWORD FOR %s = " + password_option,
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
                conn.commit()
                logger.info("set: Successfully changed password for %s in MySQL DB for secret %s." % (
                    pending_dict['AccountName'], secret_name))
        finally:
            conn.close()
    
    
    def test_phase(client, secret_name, version_id):
        conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
        if conn:
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT NOW()")
                    conn.commit()
            finally:
                conn.close()
    
            logger.info("test: Successfully accessed MySQL DB with ACSPending secret in %s." % secret_name)
            return
        else:
            logger.error(
                "test: Unable to access the given database with the pending version of secret %s" % secret_name)
            raise ValueError("Unable to access the given database with the pending version of secret %s" % secret_name)
    
    
    def end_phase(client, secret_name, version_id):
        update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
        update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
        logger.info(
            "end: Successfully updated ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))
    
    
    def get_connection(secret_dict):
        port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
        dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    
        use_ssl, fall_back = get_ssl_config(secret_dict)
    
        conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
        if conn or not fall_back:
            return conn
        else:
            return connect_and_authenticate(secret_dict, port, dbname, False)
    
    
    def get_ssl_config(secret_dict):
        if 'SSL' not in secret_dict:
            return True, True
    
        if isinstance(secret_dict['SSL'], bool):
            return secret_dict['SSL'], False
    
        if isinstance(secret_dict['SSL'], str):
            ssl = secret_dict['SSL'].lower()
            if ssl == "true":
                return True, False
            elif ssl == "false":
                return False, False
            else:
                return True, True
    
        return True, True
    
    
    def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
        ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    
        try:
            conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                                   password=secret_dict['AccountPassword'],
                                   port=port, database=dbname, connect_timeout=5, ssl=ssl)
            logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
                "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
            return conn
        except pymysql.OperationalError as e:
            if 'certificate verify failed: IP address mismatch' in e.args[1]:
                logger.error(
                    "Hostname verification failed when establishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                        'Endpoint'])
            return None
    
    
    def get_secret_dict(client, secret_name, stage, version_id=None):
        required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
        if version_id:
            secret = get_secret_value(client, secret_name, version_id, stage)
        else:
            secret = get_secret_value(client, secret_name, stage=stage)
        plaintext = secret['SecretData']
        secret_dict = json.loads(plaintext)
        for field in required_fields:
            if field not in secret_dict:
                raise KeyError("%s key is missing from secret JSON" % field)
        return secret_dict
    
    
    def get_alt_account_name(current_account_name):
        rotation_suffix = "_rt"
        if current_account_name.endswith(rotation_suffix):
            return current_account_name[:(len(rotation_suffix) * -1)]
        else:
            new_account_name = current_account_name + rotation_suffix
            if len(new_account_name) > 16:
                raise ValueError(
                    "Unable to rotate user, account_name length with _rt appended would exceed 16 characters")
            return new_account_name
    
    
    def get_password_option(version):
        if version.startswith("8"):
            return "%s"
        else:
            return "PASSWORD(%s)"
    
    
    def get_escaped_encryption_statement(version):
        if version.startswith("5.6"):
            return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
        else:
            return "ALTER USER %s@'%%' REQUIRE"
    
    
    def is_rds_replica_database(client, replica_dict, master_dict):
        replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
        master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
        try:
            describe_response = describe_db_instances(client, replica_instance_id)
        except Exception as err:
            logger.warning("Encountered error while verifying rds replica status: %s" % err)
            return False
        items = describe_response['Items']
        instances = items.get("DBInstance")
        if not instances:
            logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
            return False
    
        current_instance = instances[0]
        return master_instance_id == current_instance.get('DBInstanceId')
    
    
    def get_secret_value(client, secret_name, version_id=None, stage=None):
        request = GetSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        if version_id:
            request.set_VersionId(version_id)
        if stage:
            request.set_VersionStage(stage)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
        request = PutSecretValueRequest()
        request.set_accept_format('json')
        request.set_SecretName(secret_name)
        request.set_VersionId(version_id)
        if version_stages:
            request.set_VersionStages(version_stages)
        request.set_SecretData(secret_data)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def get_random_password(client, exclude_characters=None):
        request = GetRandomPasswordRequest()
        request.set_accept_format('json')
        if exclude_characters:
            request.set_ExcludeCharacters(exclude_characters)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
        request = UpdateSecretVersionStageRequest()
        request.set_accept_format('json')
        request.set_VersionStage(version_stage)
        request.set_SecretName(secret_name)
        if remove_from_version:
            request.set_RemoveFromVersion(remove_from_version)
        if move_to_version:
            request.set_MoveToVersion(move_to_version)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
    def describe_db_instances(client, db_instance_id):
        request = DescribeDBInstancesRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(db_instance_id)
        response = client.do_action_with_exception(request)
        return json.loads(response)
    
    
                            
  3. Create a custom layer. For more information, see Create a custom layer.

    You can use a custom layer so that dependencies do not have to be installed repeatedly. This example uses Python, and the sample code depends on PyMySQL. If the MySQL database uses an SSL certificate, you can also add the certificate to the layer.

    1. Build the ZIP package of the layer.

      1. Run the mkdir my-secret-rotate command to create a working directory.

      2. Run the cd my-secret-rotate command to go to the working directory.

      3. Run the pip install --target ./python pymysql command to install the dependency in the my-secret-rotate/python directory.

      4. If the MySQL database uses an SSL certificate, create a certs directory in the python directory and save the cert.pem file to the certs directory. The sample code reads the certificate from /opt/python/certs/cert.pem.

      5. In the my-secret-rotate directory, run the zip -r my-secret-rotate.zip python command to package the dependency.

    2. Create the custom layer in the Function Compute console.

  4. Configure the custom layer for the function. For more information, see Manage layers.

    1. On the Services page, find the service that contains the function that you want to manage and click Functions in the Actions column.

    2. Click Functions, find the function that you want to manage, and then click Configure in the Actions column. In the Layers section, add the custom layer to the function.
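The packaging command in the layer steps above can also be scripted. The following is a minimal sketch, assuming the python directory has already been populated with pip and, if needed, certs/cert.pem. The helper name build_layer_zip is hypothetical and is not part of any Function Compute SDK. It only uses the Python standard library to produce an archive whose entries start with python/, similar to what zip -r my-secret-rotate.zip python produces.

```python
import zipfile
from pathlib import Path


def build_layer_zip(work_dir, zip_name="my-secret-rotate.zip"):
    """Zip the python/ directory under work_dir into work_dir/zip_name.

    build_layer_zip is a hypothetical helper used for illustration only.
    """
    work_dir = Path(work_dir)
    source_dir = work_dir / "python"
    if not source_dir.is_dir():
        raise FileNotFoundError("Expected a python/ directory in %s" % work_dir)

    zip_path = work_dir / zip_name
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source_dir.rglob("*")):
            if path.is_file():
                # Store paths relative to work_dir so that every entry starts with "python/".
                archive.write(path, path.relative_to(work_dir).as_posix())
    return zip_path
```

For example, calling build_layer_zip("my-secret-rotate") after the pip install step writes my-secret-rotate/my-secret-rotate.zip, which you can then upload when you create the custom layer.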

3. Create a CloudFlow flow for secret rotation

  1. Create a secret rotation flow. For more information, see Create a workflow.

    1. Log on to the CloudFlow console.

    2. In the top navigation bar, select the region in which you want to create the flow.

      Important

      You must select the region in which the function was created.

    3. On the Flows page, click Create flow.

    4. On the Create Flow page, click Create Flow with Code, configure the parameters, and then click Next Step.

      You must change the content of the Definition YAML file to the following code:

      version: v1
      type: flow
      steps:
        - type: task
          name: RotateSecretNew
          resourceArn: ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: new
        - type: task
          name: RotateSecretSet
          resourceArn: ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: set
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretTest
          resourceArn: ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: test
            - target: VersionId
              source: $local.VersionId
        - type: task
          name: RotateSecretEnd
          resourceArn: ARN of the function that you created
          inputMappings:
            - target: SecretName
              source: $input.payload.SecretName
            - target: RegionId
              source: $input.payload.RegionId
            - target: Step
              source: end
            - target: VersionId
              source: $local.VersionId
    5. Configure the flow role.

    6. Click Create Flow.

  2. Configure the settings to rotate the generic secret on a schedule. For more information, see Create a time-based schedule.

    If you want to use CloudFlow to configure scheduled rotation of the generic secret, set Payload to the following content:

    {
      "SecretName": "",
      "RegionId": ""
    }
    Note

    SecretName specifies the name of the secret, and RegionId specifies the region.
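To see how the flow definition and the schedule payload fit together, the following sketch simulates the four tasks locally. It assumes that the rotation function dispatches on the Step field and that the new step returns a VersionId, which the later steps receive through the $local.VersionId mapping. The names validate_payload, run_rotation_flow, and fake_handler are hypothetical, and the sketch does not call CloudFlow, Function Compute, or KMS.

```python
import uuid

STEPS = ("new", "set", "test", "end")  # order of the tasks in the flow definition


def validate_payload(payload):
    """Check that the schedule payload has non-empty SecretName and RegionId values."""
    for key in ("SecretName", "RegionId"):
        if not payload.get(key):
            raise ValueError("Payload field %s must be a non-empty string" % key)
    return payload


def run_rotation_flow(handler, payload):
    """Call handler once per step and forward VersionId the way the input mappings do."""
    validate_payload(payload)
    local = {}  # stands in for the flow's $local variables (assumption)
    calls = []
    for step in STEPS:
        event = {"SecretName": payload["SecretName"],
                 "RegionId": payload["RegionId"],
                 "Step": step}
        if step != "new":
            # The set, test, and end tasks map VersionId from $local.VersionId.
            event["VersionId"] = local["VersionId"]
        result = handler(event) or {}
        local.update(result)
        calls.append(event)
    return calls


def fake_handler(event):
    """Stand-in for the rotation function: only the new step returns a VersionId."""
    if event["Step"] == "new":
        return {"VersionId": str(uuid.uuid4())}
    return {}
```

Running run_rotation_flow(fake_handler, {"SecretName": "my-secret", "RegionId": "cn-hangzhou"}) with placeholder values yields four events in the order new, set, test, end, and the last three carry the same VersionId. An empty SecretName or RegionId, as in the payload template, is rejected before any step runs.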

4. (Optional) Connect an application to Secrets Manager

You can use Secrets Manager JDBC to connect an application to Secrets Manager. For more information, see Secrets Manager JDBC.