Function Computeを使用して汎用シークレットをローテーションし、機密情報のセキュリティを向上させることができます。 このトピックでは、Function Computeを使用してジェネリックシークレットをローテーションする方法について説明します。
背景情報
Function Computeは、完全マネージド型のイベント駆動型コンピューティングサービスです。 Function Computeを使用すると、サーバーなどのインフラストラクチャリソースを取得または管理する必要なく、コーディングに集中できます。 この方法では、コードを記述してアップロードするだけです。 Function Computeは、コンピューティングリソースを割り当て、柔軟で信頼性の高い方法でタスクを実行し、ログクエリ、パフォーマンスモニタリング、アラートなどの機能を提供します。 詳細については、「Function Computeとは」をご参照ください。
課金
Function Computeを使用して一般的なシークレットをローテーションする場合、Key Management Service (KMS) によって提供されるシークレットローテーションに対しては課金されません。 Function Computeと、Function Computeに統合されたサーバーレスワークフローに対して課金されます。 課金の詳細については、「Function Computeの課金」および「Serverless Workflowの課金」をご参照ください。
Secrets Managerは、自動的にローテーションされる動的シークレットを提供します。 動的シークレットがセキュリティ要件を満たすことができる場合は、動的シークレットを使用してコストを削減することを推奨します。 詳細については、「ダイナミックApsaraDB RDSシークレットの概要」、「概要」、または「ダイナミックECSシークレットの概要」をご参照ください。
秘密の回転プロセス
このトピックでは、次のシナリオでジェネリックシークレットをローテーションする方法について説明します。
ApsaraDB RDS APIを使用して、デュアルアカウントモードでApsaraDB RDSデータベースのジェネリックシークレットを回転させる
特権アカウントを使用して、デュアルアカウントモードで自己管理型MySQLデータベースの汎用シークレットを回転させる
ApsaraDB RDS APIを使用して、デュアルアカウントモードでApsaraDB RDSデータベースのジェネリックシークレットを回転させる
Serverless WorkflowとFunction Computeを使用して、ジェネリックシークレットをローテーションできます。
1. 秘密のローテーションの準備
Function Computeを有効にします。 詳細については、「Function Computeの有効化」をご参照ください。
ApsaraDB RDSデータベースへのログインに使用するアカウントを作成します。 次に、KMSコンソールでアカウントのジェネリックシークレットを作成します。 ジェネリックシークレットが初めてローテーションされた場合、新しいアカウントが生成されます。
シークレット値はJSON形式です。 例: シークレット名は、Serverless Workflowに渡されるスケジューリングパラメーターとして使用されます。
{ "AccountName": "", "AccountPassword": "" }
説明AccountNameは、ApsaraDB RDSデータベースへのログインに使用されるアカウントのユーザー名を指定し、AccountPasswordは、ApsaraDB RDSデータベースへのログインに使用されるアカウントのパスワードを指定します。
Function Computeの通常のサービスロールを作成し、Function ComputeがKMSにアクセスできるようにする権限を付与します。 詳細については、「他のAlibaba Cloudサービスへのアクセス権限付与」をご参照ください。
AliyunFCDefaultRolePolicyポリシーをFunction Computeの通常のサービスロールにアタッチします。
AliyunSTSAssumeRoleAccessシステムポリシーを通常のサービスロールにアタッチします。
KMSへのアクセスを許可するカスタムポリシーを通常のサービスロールにアタッチします。 次のスクリプトは、ポリシーの内容を示しています。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "kms:GetSecretValue", "kms:GetRandomPassword", "kms:PutSecretValue", "kms:UpdateSecretVersionStage" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "rds:GrantAccountPrivilege", "rds:DescribeAccounts", "rds:ResetAccountPassword", "rds:CreateAccount" ], "Resource": "*" } ] }
2. 秘密回転関数の作成
Function Computeコンソールで、Function Computeサービスと秘密のローテーション関数を作成する必要があります。 詳細については、「関数の迅速な作成」をご参照ください。
Function Computeサービスを作成します。
Function Computeサービスを作成するときは、[詳細オプションの表示] をクリックし、[VPCへのアクセス] を [はい] に設定する必要があります。 次に、VPC、vSwitch、セキュリティグループを設定します。 Function Computeサービスが、指定されたセキュリティグループと指定されたVPCのvSwitchを使用してApsaraDB RDS APIとKMSにアクセスできることを確認します。
関数を作成します。
この例では、Pythonが使用されます。 関数を作成するときは、ランタイム環境で [Python 3.9] を選択します。 次のコードは例を提供します。 ビジネス要件に基づいてコードを変更できます。
# -*- coding: utf-8 -*- import json import logging import os from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.auth.credentials import StsTokenCredential from aliyunsdkcore.client import AcsClient from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest logger = logging.getLogger() logger.setLevel(logging.INFO) def handler(event, context): evt = json.loads(event) secret_name = evt['SecretName'] region_id = evt['RegionId'] step = evt['Step'] instance_id = evt['InstanceId'] version_id = evt.get('VersionId') if not version_id: version_id = context.requestId credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret, context.credentials.securityToken) client = AcsClient(region_id=region_id, credential=credentials) endpoint = "kms-vpc." + region_id + ".aliyuncs.com" client.add_endpoint(region_id, 'kms', endpoint) resp = get_secret_value(client, secret_name) if "Generic" != resp['SecretType']: logger.error("Secret %s is not enabled for rotation" % secret_name) raise ValueError("Secret %s is not enabled for rotation" % secret_name) if step == "new": new_phase(client, secret_name, version_id) elif step == "set": set_phase(client, instance_id, secret_name, version_id) elif step == "end": end_phase(client, secret_name, version_id) else: logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name)) raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name)) return {"VersionId": version_id} def new_phase(client, secret_name, version_id): current_dict = get_secret_dict(client, secret_name, "ACSCurrent") try: get_secret_dict(client, secret_name, "ACSPending", version_id) logger.info("new: Successfully retrieved secret for %s." % secret_name) except ServerException as e: if e.error_code != 'Forbidden.ResourceNotFound': raise ValueError("Can to find secret %s " % (secret_name)) current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName']) exclude_characters = os.environ[ 'EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else "\\\"\',./:;<>?[]{|}~`" passwd = get_random_password(client, exclude_characters) current_dict['AccountPassword'] = passwd['RandomPassword'] put_secret_value(client, secret_name, version_id, json.dumps(current_dict), json.dumps(['ACSPending'])) logger.info( "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id)) def set_phase(client, instance_id, secret_name, version_id): current_dict = get_secret_dict(client, secret_name, "ACSCurrent") pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id) pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"]) pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"] if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']: logger.error("set: Attempting to modify user %s other than current user or rotation %s" % ( pending_dict['AccountName'], current_dict['AccountName'])) raise ValueError("Attempting to modify user %s other than current user or rotation %s" % ( pending_dict['AccountName'], current_dict['AccountName'])) current_resp = describe_accounts(client, instance_id, current_dict["AccountName"]) current_accounts = current_resp["Accounts"]["DBInstanceAccount"] if len(current_accounts) == 0: logger.error("set: Unable to log into database using current credentials for secret %s" % secret_name) raise ValueError("Unable to log into database using current credentials for secret %s" % secret_name) if len(pending_accounts) == 0: create_rds_account(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"], current_accounts[0]["AccountType"]) pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][ "DBInstanceAccount"] else: # reset password reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"]) current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"] pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"] if len(current_privileges) > 0: for current_privilege in current_privileges: is_contains = False for pending_privilege in pending_privileges: if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[ "AccountPrivilege"] == pending_privilege["AccountPrivilege"]: is_contains = True continue if not is_contains: grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"], current_privilege["AccountPrivilege"]) def end_phase(client, secret_name, version_id): update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id) update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id) logger.info( "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name)) def get_secret_dict(client, secret_name, stage, version_id=None): required_fields = ['AccountName', 'AccountPassword'] if version_id: secret = get_secret_value(client, secret_name, version_id, stage) else: secret = get_secret_value(client, secret_name, stage=stage) plaintext = secret['SecretData'] secret_dict = json.loads(plaintext) for field in required_fields: if field not in secret_dict: raise KeyError("%s key is missing from secret JSON" % field) return secret_dict def get_alt_account_name(current_account_name): rotation_suffix = "_rt" if current_account_name.endswith(rotation_suffix): return current_account_name[:(len(rotation_suffix) * -1)] else: new_account_name = current_account_name + rotation_suffix if len(new_account_name) > 16: raise ValueError( "Unable to rotation user, account_name length with _rotation appended would exceed 16 characters") return new_account_name def get_secret_value(client, secret_name, version_id=None, stage=None): request = GetSecretValueRequest() request.set_accept_format('json') request.set_SecretName(secret_name) if version_id: request.set_VersionId(version_id) if stage: request.set_VersionStage(stage) response = client.do_action_with_exception(request) return json.loads(response) def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None): request = PutSecretValueRequest() request.set_accept_format('json') request.set_SecretName(secret_name) request.set_VersionId(version_id) if version_stages: request.set_VersionStages(version_stages) request.set_SecretData(secret_data) response = client.do_action_with_exception(request) return json.loads(response) def get_random_password(client, exclude_characters=None): request = GetRandomPasswordRequest() request.set_accept_format('json') if exclude_characters: request.set_ExcludeCharacters(exclude_characters) response = client.do_action_with_exception(request) return json.loads(response) def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None): request = UpdateSecretVersionStageRequest() request.set_accept_format('json') request.set_VersionStage(version_stage) request.set_SecretName(secret_name) if remove_from_version: request.set_RemoveFromVersion(remove_from_version) if move_to_version: request.set_MoveToVersion(move_to_version) response = client.do_action_with_exception(request) return json.loads(response) def create_rds_account(client, db_instance_id, account_name, account_password, account_type): request = CreateAccountRequest() request.set_accept_format('json') request.set_DBInstanceId(db_instance_id) request.set_AccountName(account_name) request.set_AccountPassword(account_password) request.set_AccountType(account_type) response = client.do_action_with_exception(request) return json.loads(response) def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege): request = GrantAccountPrivilegeRequest() request.set_accept_format('json') request.set_DBInstanceId(db_instance_id) request.set_AccountName(account_name) request.set_DBName(db_name) request.set_AccountPrivilege(account_privilege) response = client.do_action_with_exception(request) return json.loads(response) def describe_accounts(client, db_instance_id, account_name): request = DescribeAccountsRequest() request.set_accept_format('json') request.set_DBInstanceId(db_instance_id) request.set_AccountName(account_name) response = client.do_action_with_exception(request) return json.loads(response) def reset_account_password(client, db_instance_id, account_name, account_password): request = ResetAccountPasswordRequest() request.set_accept_format('json') request.set_DBInstanceId(db_instance_id) request.set_AccountName(account_name) request.set_AccountPassword(account_password) response = client.do_action_with_exception(request) return json.loads(response)
3. シークレットローテーション用のサーバーレスワークフローフローの作成
秘密のローテーションフローを作成します。 詳細については、「ワークフローの作成」をご参照ください。
Serverless Workflowコンソールにログインします。
上部のナビゲーションバーで、フローを作成するリージョンを選択します。
重要関数が作成されるリージョンを選択する必要があります。
[フロー] ページで、[フローの作成] をクリックします。
[フローの作成] ページで、[コードによるフローの作成] をクリックし、パラメーターを設定し、[次のステップ] をクリックします。
DefinitionのYAMLファイルの内容を次のコードに変更する必要があります。
version: v1 type: flow steps: - type: task name: RotateSecretNew resourceArn: the Alibaba Cloud Resource Name (ARN) of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: InstanceId source: $input.payload.InstanceId - target: Step source: new - type: task name: RotateSecretSet resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: InstanceId source: $input.payload.InstanceId - target: Step source: set - target: VersionId source: $local.VersionId - type: task name: RotateSecretEnd resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: InstanceId source: $input.payload.InstanceId - target: Step source: end - target: VersionId source: $local.VersionId
フローロールを設定します。
[フローの作成] をクリックします。
スケジュールされた時間にジェネリックシークレットをローテーションするように設定します。 詳細については、「時間ベースのスケジュールの作成」をご参照ください。
Serverless Workflowを使用してジェネリックシークレットのスケジュールされたローテーションを設定する場合は、[ペイロード] を次の内容に設定します。
{ "SecretName": "", "RegionId": "", "InstanceId":"" }
説明SecretNameはシークレット名、RegionIdはリージョン、InstanceIdはApsaraDB RDSインスタンスのIDを指定します。
4. (オプション) アプリケーションをSecrets Managerに接続する
Secrets Manager JDBCを使用して、アプリケーションをSecrets Managerに接続できます。 詳細については、「Secrets Manager JDBC」をご参照ください。
特権アカウントを使用して、デュアルアカウントモードで自己管理型MySQLデータベースの汎用シークレットを回転させる
Serverless WorkflowとFunction Computeを使用して、ジェネリックシークレットをローテーションできます。
1. 秘密のローテーションの準備
Function Computeを有効にします。 詳細については、「Function Computeの有効化」をご参照ください。
自己管理型MySQLデータベースへのログインに使用する特権アカウントを作成し、KMSコンソールで特権アカウントの一般的なシークレットを作成します。 特権アカウントは、自己管理MySQLデータベースへのログインに使用されるアカウントの作成またはパスワードの変更にも使用されます。
シークレット値はJSON形式です。 例:
{ "Endpoint": "", "AccountName": "", "AccountPassword": "", "SSL":false }
説明Endpointは自己管理型MySQLデータベースのドメイン名またはアドレスを指定し、AccountNameは特権アカウントのユーザー名を指定します。 AccountPasswordは特権アカウントのパスワードを指定し、SSLは証明書を使用するかどうかを指定します。 SSLの有効な値はtrueとfalseです。 デフォルト値:false デフォルトでは、証明書は /opt/python/certs/cert.pemディレクトリに保存されます。
自己管理型MySQLデータベースへのログインに使用するアカウントを作成し、KMSコンソールでそのアカウントの一般的なシークレットを作成します。 ジェネリックシークレットが初めてローテーションされた場合、新しいアカウントが作成されます。
シークレット値はJSON形式です。 例: シークレット名は、Serverless Workflowに渡されるスケジューリングパラメーターとして使用されます。
{ "Endpoint": "", "AccountName": "", "AccountPassword": "", "MasterSecret":"", "SSL":false }
説明EndpointはセルフマネージドMySQLデータベースのドメイン名またはアドレスを指定します。AccountNameはセルフマネージドMySQLデータベースへのログインに使用されるアカウントのユーザー名を指定します。AccountPasswordはセルフマネージドMySQLデータベースへのログインに使用されるアカウントのパスワードを指定します。MasterSecretは特権アカウントを指定します。SSLは証明書を使用するかどうかを指定します。 SSLの有効な値はtrueとfalseです。 デフォルト値:false デフォルトでは、証明書は /opt/python/certs/cert.pemディレクトリに保存されます。
Function Computeの通常のサービスロールを作成し、Function ComputeがKMSにアクセスできるようにする権限を付与します。 詳細については、「他のAlibaba Cloudサービスへのアクセス権限付与」をご参照ください。
AliyunFCDefaultRolePolicyポリシーをFunction Computeの通常のサービスロールにアタッチします。
AliyunSTSAssumeRoleAccessシステムポリシーを通常のサービスロールにアタッチします。
KMSへのアクセスを許可するカスタムポリシーを通常のサービスロールにアタッチします。 次のスクリプトは、ポリシーの内容を示しています。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": [ "kms:GetSecretValue", "kms:GetRandomPassword", "kms:Decrypt", "kms:GenerateDataKey", "kms:PutSecretValue", "kms:UpdateSecretVersionStage" ], "Resource": "*" } ] }
2. 秘密回転関数の作成
Function Computeサービスを作成します。 詳細については、「関数の迅速な作成」をご参照ください。
Function Computeサービスを作成するときは、[詳細オプションの表示] をクリックし、[VPCへのアクセス] を [はい] に設定する必要があります。 次に、VPC、vSwitch、セキュリティグループを設定します。 Function Computeサービスが、指定されたセキュリティグループと指定されたVPCのvSwitchを使用してApsaraDB RDS APIとKMSにアクセスできることを確認します。
関数を作成します。 詳細については、「関数の迅速な作成」をご参照ください。
この例では、Pythonが使用されます。 関数を作成するときは、ランタイム環境で [Python 3.9] を選択します。 次のコードは例を提供します。 ビジネス要件に基づいてコードを変更できます。
# -*- coding: utf-8 -*- import json import logging import os try: import pymysql except: os.system('pip install pymysql -t ./') import pymysql from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.auth.credentials import StsTokenCredential from aliyunsdkcore.client import AcsClient from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest logger = logging.getLogger() logger.setLevel(logging.INFO) def handler(event, context): evt = json.loads(event) secret_name = evt['SecretName'] region_id = evt['RegionId'] step = evt['Step'] version_id = evt.get('VersionId') if not version_id: version_id = context.requestId credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret, context.credentials.securityToken) client = AcsClient(region_id=region_id, credential=credentials) endpoint = "kms-vpc." + region_id + ".aliyuncs.com" client.add_endpoint(region_id, 'kms', endpoint) resp = get_secret_value(client, secret_name) if "Generic" != resp['SecretType']: logger.error("Secret %s is not enabled for rotation" % secret_name) raise ValueError("Secret %s is not enabled for rotation" % secret_name) if step == "new": new_phase(client, secret_name, version_id) elif step == "set": set_phase(client, secret_name, version_id) elif step == "test": test_phase(client, secret_name, version_id) elif step == "end": end_phase(client, secret_name, version_id) else: logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name)) raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name)) return {"VersionId": version_id} def new_phase(client, secret_name, version_id): current_dict = get_secret_dict(client, secret_name, "ACSCurrent") try: get_secret_dict(client, secret_name, "ACSPending", version_id) logger.info("new: Successfully retrieved secret for %s." % secret_name) except ServerException as e: if e.error_code != 'Forbidden.ResourceNotFound': raise current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName']) exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\' passwd = get_random_password(client, exclude_characters) current_dict['AccountPassword'] = passwd['RandomPassword'] put_secret_value(client, secret_name, version_id, json.dumps(current_dict), json.dumps(['ACSPending'])) logger.info( "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id)) def set_phase(client, secret_name, version_id): current_dict = get_secret_dict(client, secret_name, "ACSCurrent") pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id) conn = get_connection(pending_dict) if conn: conn.close() logger.info( "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name) return if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']: logger.error("set: Attempting to modify user %s other than current user or rotation %s" % ( pending_dict['AccountName'], current_dict['AccountName'])) raise ValueError("Attempting to modify user %s other than current user or rotation %s" % ( pending_dict['AccountName'], current_dict['AccountName'])) if current_dict['Endpoint'] != pending_dict['Endpoint']: logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % ( pending_dict['Endpoint'], current_dict['Endpoint'])) raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % ( pending_dict['Endpoint'], current_dict['Endpoint'])) conn = get_connection(current_dict) if not conn: logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name) raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name) conn.close() master_secret = current_dict['MasterSecret'] master_dict = get_secret_dict(client, master_secret, "ACSCurrent") if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(current_dict, master_dict): logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % ( current_dict['Endpoint'], master_dict['Endpoint'])) raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % ( current_dict['Endpoint'], master_dict['Endpoint'])) conn = get_connection(master_dict) if not conn: logger.error( "set: Unable to access the given database using credentials in master secret secret %s" % master_secret) raise ValueError("Unable to access the given database using credentials in master secret secret %s" % master_secret) try: with conn.cursor() as cur: cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName']) if cur.rowcount == 0: cur.execute("CREATE USER %s IDENTIFIED BY %s", (pending_dict['AccountName'], pending_dict['AccountPassword'])) cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName']) for row in cur.fetchall(): if 'XA_RECOVER_ADMIN' in row[0]: continue grant = row[0].split(' TO ') new_grant_escaped = grant[0].replace('%', '%%') # % is a special cha30racter in Python format strings. cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],)) cur.execute("SELECT VERSION()") ver = cur.fetchone()[0] escaped_encryption_statement = get_escaped_encryption_statement(ver) cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s", current_dict['AccountName']) tls_options = cur.fetchone() ssl_type = tls_options[0] if not ssl_type: cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName']) elif ssl_type == "ANY": cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName']) elif ssl_type == "X509": cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName']) else: cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s", (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3])) password_option = get_password_option(ver) cur.execute("SET PASSWORD FOR %s = " + password_option, (pending_dict['AccountName'], pending_dict['AccountPassword'])) conn.commit() logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % ( pending_dict['AccountName'], secret_name)) finally: conn.close() def test_phase(client, secret_name, version_id): conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id)) if conn: try: with conn.cursor() as cur: cur.execute("SELECT NOW()") conn.commit() finally: conn.close() logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name) return else: logger.error( "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name) raise ValueError("Unable to access the given database with pending secret of secret secret_name %s" % secret_name) def end_phase(client, secret_name, version_id): update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id) update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id) logger.info( "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name)) def get_connection(secret_dict): port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306 dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None use_ssl, fall_back = get_ssl_config(secret_dict) conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl) if conn or not fall_back: return conn else: return connect_and_authenticate(secret_dict, port, dbname, False) def get_ssl_config(secret_dict): if 'SSL' not in secret_dict: return True, True if isinstance(secret_dict['SSL'], bool): return secret_dict['SSL'], False if isinstance(secret_dict['SSL'], str): ssl = secret_dict['SSL'].lower() if ssl == "true": return True, False elif ssl == "false": return False, False else: return True, True return True, True def connect_and_authenticate(secret_dict, port, dbname, use_ssl): ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None try: conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'], password=secret_dict['AccountPassword'], port=port, database=dbname, connect_timeout=5, ssl=ssl) logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % ( "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint'])) return conn except pymysql.OperationalError as e: if 'certificate verify failed: IP address mismatch' in e.args[1]: logger.error( "Hostname verification failed when estlablishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[ 'Endpoint']) return None def get_secret_dict(client, secret_name, stage, version_id=None): required_fields = ['Endpoint', 'AccountName', 'AccountPassword'] if version_id: secret = get_secret_value(client, secret_name, version_id, stage) else: secret = get_secret_value(client, secret_name, stage=stage) plaintext = secret['SecretData'] secret_dict = json.loads(plaintext) for field in required_fields: if field not in secret_dict: raise KeyError("%s key is missing from secret JSON" % field) return secret_dict def get_alt_account_name(current_account_name): rotation_suffix = "_rt" if current_account_name.endswith(rotation_suffix): return current_account_name[:(len(rotation_suffix) * -1)] else: new_account_name = current_account_name + rotation_suffix if len(new_account_name) > 16: raise ValueError( "Unable to rotate user, account_name length with _rotation appended would exceed 16 characters") return new_account_name def get_password_option(version): if version.startswith("8"): return "%s" else: return "PASSWORD(%s)" def get_escaped_encryption_statement(version): if version.startswith("5.6"): return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE" else: return "ALTER USER %s@'%%' REQUIRE" def is_rds_replica_database(client, replica_dict, master_dict): replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '') master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '') try: describe_response = describe_db_instances(client, replica_instance_id) except Exception as err: logger.warning("Encountered error while verifying rds replica status: %s" % err) return False items = describe_response['Items'] instances = items.get("DBInstance") if not instances: logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id) return False current_instance = instances[0] return master_instance_id == current_instance.get('DBInstanceId') def get_secret_value(client, secret_name, version_id=None, stage=None): request = GetSecretValueRequest() request.set_accept_format('json') request.set_SecretName(secret_name) if version_id: request.set_VersionId(version_id) if stage: request.set_VersionStage(stage) response = client.do_action_with_exception(request) return json.loads(response) def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None): request = PutSecretValueRequest() request.set_accept_format('json') request.set_SecretName(secret_name) request.set_VersionId(version_id) if version_stages: request.set_VersionStages(version_stages) request.set_SecretData(secret_data) response = client.do_action_with_exception(request) return json.loads(response) def get_random_password(client, exclude_characters=None): request = GetRandomPasswordRequest() request.set_accept_format('json') if exclude_characters: request.set_ExcludeCharacters(exclude_characters) response = client.do_action_with_exception(request) return json.loads(response) def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None): request = UpdateSecretVersionStageRequest() request.set_accept_format('json') request.set_VersionStage(version_stage) request.set_SecretName(secret_name) if remove_from_version: request.set_RemoveFromVersion(remove_from_version) if move_to_version: request.set_MoveToVersion(move_to_version) response = client.do_action_with_exception(request) return json.loads(response) def describe_db_instances(client, db_instance_id): request = DescribeDBInstancesRequest() request.set_accept_format('json') request.set_DBInstanceId(db_instance_id) response = client.do_action_with_exception(request) return json.loads(response)
カスタムレイヤーを作成します。 詳細については、「カスタムレイヤーの作成」をご参照ください。
カスタムレイヤーを使用して、依存関係の頻繁なインストールを防ぐことができます。 この例では、Pythonが使用されます。 サンプルコードでは、PyMySQL依存関係をインストールする必要があります。 MySQLデータベースにSSL証明書が使用されている場合は、SSL証明書をレイヤーに追加することもできます。
レイヤーのZIPパッケージを作成します。
mkdir my-secret-rotate
コマンドを実行して、作業ディレクトリを作成します。cd my-secret-rotate
コマンドを実行して、作業ディレクトリに移動します。を実行します。Run the
pip install -- target. /python pymysql
コマンドを実行して、my-secret-rotate/pythonディレクトリに依存関係をインストールします。MySQLデータベースにSSL証明書が使用されている場合は、pythonディレクトリにcertsディレクトリを作成し、cert.pemファイルをcertsディレクトリに保存します。
my-secret-rotateディレクトリに移動し、
zip -r my-secret-rotate.zip python
コマンドを実行して依存関係をパッケージ化します。
Function Computeコンソールでカスタムレイヤーを作成します。
関数のカスタムレイヤーを設定します。 詳細については、「レイヤーの管理」をご参照ください。
[サービス] ページで、機能を管理するサービスを見つけ、[操作] 列の [機能] をクリックします。
[関数] をクリックし、管理する関数を見つけて、[操作] 列の [設定] をクリックします。 [レイヤー] セクションで、関数のカスタムレイヤーを追加します。
3. シークレットローテーション用のサーバーレスワークフローフローの作成
秘密のローテーションフローを作成します。 詳細については、「ワークフローの作成」をご参照ください。
Serverless Workflowコンソールにログインします。
上部のナビゲーションバーで、フローを作成するリージョンを選択します。
重要関数が作成されるリージョンを選択する必要があります。
[フロー] ページで、[フローの作成] をクリックします。
[フローの作成] ページで、[コードによるフローの作成] をクリックし、パラメーターを設定し、[次のステップ] をクリックします。
DefinitionのYAMLファイルの内容を次のコードに変更する必要があります。
version: v1 type: flow steps: - type: task name: RotateSecretNew resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: Step source: new - type: task name: RotateSecretSet resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: Step source: set - target: VersionId source: $local.VersionId - type: task name: RotateSecretTest resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: Step source: test - target: VersionId source: $local.VersionId - type: task name: RotateSecretEnd resourceArn: the ARN of the function that you created inputMappings: - target: SecretName source: $input.payload.SecretName - target: RegionId source: $input.payload.RegionId - target: Step source: end - target: VersionId source: $local.VersionId
フローロールを設定します。
[フローの作成] をクリックします。
スケジュールされた時間にジェネリックシークレットをローテーションするように設定します。 詳細については、「時間ベースのスケジュールの作成」をご参照ください。
Serverless Workflowを使用してジェネリックシークレットのスケジュールされたローテーションを設定する場合は、[ペイロード] を次の内容に設定します。
{ "SecretName": "", "RegionId": "" }
説明SecretNameはシークレット名を指定し、RegionIdはリージョンを指定します。
4. (オプション) アプリケーションをSecrets Managerに接続する
Secrets Manager JDBCを使用して、アプリケーションをSecrets Managerに接続できます。 詳細については、「Secrets Manager JDBC」をご参照ください。