Automate the rotation of generic secrets stored in KMS Secrets Manager by using Function Compute and CloudFlow. This guide covers two database rotation scenarios: an ApsaraDB RDS instance accessed through the RDS API, and a self-managed MySQL database accessed through a privileged account.
KMS Secrets Manager provides dynamic secrets with built-in automatic rotation for ApsaraDB RDS, RAM, and ECS. If dynamic secrets cover your use case, use them: they cost less and require no custom code. See Overview of dynamic ApsaraDB RDS secrets, Overview of dynamic RAM secrets, or Overview of dynamic ECS secrets.
Billing
Rotating KMS secrets is free of charge. Function Compute and CloudFlow are billed for the resources that the rotation uses. See Function Compute billing and CloudFlow billing.
Choose a rotation strategy
Both scenarios in this guide use dual-account (alternating-user) rotation. This strategy maintains two database accounts, only one of which is active at a time, so your application always has valid credentials during rotation. The inactive account receives the new credentials and then becomes the active account.
| Strategy | How it works | Availability during rotation | Best suited for |
|---|---|---|---|
| Single-account | Updates the credentials of one account in place | Brief gap while the credentials are updated | Simple setups, ad hoc users |
| Dual-account | Alternates between two accounts; one is always active | No interruption | Production databases, high-availability requirements |
The rotation suffix _rt identifies the alternate account. For example, if the active account is appuser, the alternate account is appuser_rt. The account name, including the suffix, must not exceed 16 characters.
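The naming rule can be sketched in a few lines of Python. This is a minimal illustration of the suffix toggle described above, not the rotation function itself; the name `alt_account_name` and the two constants are chosen for this example.

```python
ROTATION_SUFFIX = "_rt"
MAX_ACCOUNT_NAME_LENGTH = 16  # limit stated in this guide


def alt_account_name(current_name):
    """Return the account that the next rotation switches to."""
    if current_name.endswith(ROTATION_SUFFIX):
        # appuser_rt -> appuser
        return current_name[:-len(ROTATION_SUFFIX)]
    candidate = current_name + ROTATION_SUFFIX
    if len(candidate) > MAX_ACCOUNT_NAME_LENGTH:
        raise ValueError(
            "%s exceeds %d characters" % (candidate, MAX_ACCOUNT_NAME_LENGTH))
    return candidate


if __name__ == "__main__":
    name = "appuser"
    for cycle in range(1, 4):
        name = alt_account_name(name)
        print(cycle, name)
    # prints:
    # 1 appuser_rt
    # 2 appuser
    # 3 appuser_rt
```

Because the suffix adds three characters, the base account name can be at most 13 characters long.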
How it works
Both scenarios follow the same process, orchestrated by CloudFlow. A CloudFlow time-based schedule triggers the flow, which invokes the same Function Compute function several times, each time with a different Step value:
| Step | What the function does | Idempotency |
|---|---|---|
| new | Generates a new password and writes it to a pending secret version (ACSPending) | If ACSPending already exists for the current version ID, secret creation is skipped |
| set | Creates or updates the alternate database account with the pending credentials | Safe to retry: the account state is checked before any change is made |
| test | Verifies that the pending credentials can connect to the database (MySQL scenario only) | Read-only connectivity check |
| end | Promotes ACSPending to ACSCurrent, which completes the rotation | The stage update is atomic |
Each step is idempotent, so the flow is safe to retry after a partial failure.
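The step sequence can be tried out locally with an in-memory stand-in. The sketch below is an assumption-laden simulation, not the real function: `FakeSecretStore`, `run_step`, and `run_flow` are hypothetical names, and the "database" is a plain dictionary. It only shows how the VersionId returned by the first step is handed to the later steps, and why repeating a step with the same VersionId changes nothing.

```python
import uuid


class FakeSecretStore:
    """In-memory stand-in for one secret and its database accounts (illustration only)."""

    def __init__(self, account, password):
        self.versions = {"v0": {"AccountName": account, "AccountPassword": password}}
        self.stages = {"ACSCurrent": "v0"}
        self.db_accounts = {account: password}


def run_step(store, step, version_id=None):
    """Run one rotation step and return the VersionId for the following steps."""
    version_id = version_id or uuid.uuid4().hex
    if step == "new":
        # Idempotent: create the pending version only if it does not exist yet.
        if version_id not in store.versions:
            name = store.versions[store.stages["ACSCurrent"]]["AccountName"]
            alt = name[:-3] if name.endswith("_rt") else name + "_rt"
            store.versions[version_id] = {
                "AccountName": alt, "AccountPassword": uuid.uuid4().hex}
            store.stages["ACSPending"] = version_id
    elif step == "set":
        # Idempotent: writing the same pending credentials again has no further effect.
        pending = store.versions[version_id]
        store.db_accounts[pending["AccountName"]] = pending["AccountPassword"]
    elif step == "test":
        pending = store.versions[version_id]
        if store.db_accounts.get(pending["AccountName"]) != pending["AccountPassword"]:
            raise ValueError("pending credentials do not work")
    elif step == "end":
        store.stages["ACSCurrent"] = version_id
        store.stages.pop("ACSPending", None)
    else:
        raise ValueError("Invalid step: %s" % step)
    return {"VersionId": version_id}


def run_flow(store, steps=("new", "set", "test", "end")):
    """Call the steps in order, passing the VersionId along like the flow does."""
    version_id = None
    for step in steps:
        version_id = run_step(store, step, version_id)["VersionId"]
    return version_id
```

After `run_flow(FakeSecretStore("appuser", "old-password"))`, the store's ACSCurrent stage points at a version whose account is appuser_rt, while appuser keeps its old password until the next rotation.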
Scenario 1: Rotate a generic secret for ApsaraDB RDS
Use this approach when your database runs on ApsaraDB RDS and you want to rotate credentials through the RDS API.
Prerequisites
Before you begin, make sure that you have:
An active Function Compute account. See Activate Function Compute.
An ApsaraDB RDS database account, and a generic secret for that account in the KMS console.
A Function Compute service role with the permissions listed in Step 1.
Step 1: Set up permissions
Create a service role for Function Compute and attach the following policies:
AliyunFCDefaultRolePolicy (system policy)
AliyunSTSAssumeRoleAccess (system policy)
A custom policy that grants access to KMS and RDS:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "rds:GrantAccountPrivilege",
        "rds:DescribeAccounts",
        "rds:ResetAccountPassword",
        "rds:CreateAccount"
      ],
      "Resource": "*"
    }
  ]
}
For instructions on attaching policies, see Grant Function Compute permissions to access other Alibaba Cloud services.
Step 2: Create the generic secret
Create a generic secret in the KMS console with the following JSON as the secret value. The secret name is later passed to CloudFlow as a scheduling parameter.
{
  "AccountName": "<rds-account-name>",
  "AccountPassword": "<rds-account-password>"
}
| Field | Description |
|---|---|
| AccountName | Username of the RDS database account |
| AccountPassword | Password of the RDS database account |
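Before storing the value, it can help to check that it parses and contains both fields, since the rotation function fails on a missing key. The following is a small pre-flight sketch; `validate_secret_value` is a hypothetical helper for this guide, not part of any SDK.

```python
import json

REQUIRED_FIELDS = ("AccountName", "AccountPassword")


def validate_secret_value(secret_data, required_fields=REQUIRED_FIELDS):
    """Parse a secret value string and return it as a dict, or raise ValueError."""
    try:
        secret = json.loads(secret_data)
    except json.JSONDecodeError as err:
        raise ValueError("Secret value is not valid JSON: %s" % err)
    if not isinstance(secret, dict):
        raise ValueError("Secret value must be a JSON object")
    # Treat empty strings the same as missing fields.
    missing = [field for field in required_fields if not secret.get(field)]
    if missing:
        raise ValueError("Missing or empty fields: %s" % ", ".join(missing))
    return secret
```

For example, `validate_secret_value('{"AccountName": "appuser"}')` raises a ValueError that names AccountPassword as missing.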
Step 3: Create the rotation function
In the Function Compute console, create a service. Click Show Advanced Options, set Access to VPC to Yes, and configure the VPC, vSwitch, and Security Group. The security group must allow the function to reach the RDS API and the KMS VPC endpoint (kms-vpc.<region-id>.aliyuncs.com).
Create a function with Runtime Environments set to Python 3.9. Use the following code:
# -*- coding: utf-8 -*-
import json
import logging
import os

from aliyunsdkrds.request.v20140815.CreateAccountRequest import CreateAccountRequest
from aliyunsdkrds.request.v20140815.DescribeAccountsRequest import DescribeAccountsRequest
from aliyunsdkrds.request.v20140815.GrantAccountPrivilegeRequest import GrantAccountPrivilegeRequest
from aliyunsdkrds.request.v20140815.ResetAccountPasswordRequest import ResetAccountPasswordRequest
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    # Entry point: dispatches to one rotation step per invocation.
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    instance_id = evt['InstanceId']
    # The first step has no VersionId yet, so the request ID is used as the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, instance_id, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        # If the pending version already exists, this step has run before: nothing to do.
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise ValueError("Unable to find secret %s" % secret_name)
        # No pending version yet: switch to the alternate account and generate a password.
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ.get('EXCLUDE_CHARACTERS', "\\\"\',./:;<>?[]{|}~`")
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
def set_phase(client, instance_id, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    pending_resp = describe_accounts(client, instance_id, pending_dict["AccountName"])
    pending_accounts = pending_resp["Accounts"]["DBInstanceAccount"]
    # The pending account must be the alternate of the current account.
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    current_resp = describe_accounts(client, instance_id, current_dict["AccountName"])
    current_accounts = current_resp["Accounts"]["DBInstanceAccount"]
    if len(current_accounts) == 0:
        logger.error("set: Current account for secret %s was not found on the RDS instance" % secret_name)
        raise ValueError("Current account for secret %s was not found on the RDS instance" % secret_name)
    if len(pending_accounts) == 0:
        # First rotation: create the alternate account with the same account type.
        create_rds_account(client, instance_id, pending_dict["AccountName"],
                           pending_dict["AccountPassword"], current_accounts[0]["AccountType"])
        pending_accounts = describe_accounts(client, instance_id, pending_dict["AccountName"])["Accounts"][
            "DBInstanceAccount"]
    else:
        # The alternate account already exists: reset its password.
        reset_account_password(client, instance_id, pending_dict["AccountName"], pending_dict["AccountPassword"])
    # Copy any privilege that the current account has and the alternate account lacks.
    current_privileges = current_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    pending_privileges = pending_accounts[0]["DatabasePrivileges"]["DatabasePrivilege"]
    for current_privilege in current_privileges:
        is_contains = False
        for pending_privilege in pending_privileges:
            if current_privilege["DBName"] == pending_privilege["DBName"] and current_privilege[
                    "AccountPrivilege"] == pending_privilege["AccountPrivilege"]:
                is_contains = True
                break
        if not is_contains:
            grant_account_privilege(client, instance_id, pending_dict["AccountName"], current_privilege["DBName"],
                                    current_privilege["AccountPrivilege"])


def end_phase(client, secret_name, version_id):
    # Promote the pending version to ACSCurrent, then drop the ACSPending label.
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully set ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with the _rt suffix appended would exceed 16 characters")
        return new_account_name
def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def create_rds_account(client, db_instance_id, account_name, account_password, account_type):
    request = CreateAccountRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    request.set_AccountType(account_type)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def grant_account_privilege(client, db_instance_id, account_name, db_name, account_privilege):
    request = GrantAccountPrivilegeRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_DBName(db_name)
    request.set_AccountPrivilege(account_privilege)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_accounts(client, db_instance_id, account_name):
    request = DescribeAccountsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def reset_account_password(client, db_instance_id, account_name, account_password):
    request = ResetAccountPasswordRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    request.set_AccountName(account_name)
    request.set_AccountPassword(account_password)
    response = client.do_action_with_exception(request)
    return json.loads(response)
Step 4: Create the CloudFlow rotation flow
Log on to the CloudFlow console and select the same region as your Function Compute function.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, set YAML Definition to the following, and then click Next Step:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: InstanceId
        source: $input.payload.InstanceId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Replace <function-arn> with the Alibaba Cloud Resource Name (ARN) of the function that you created.
Configure the flow role, and then click Create Flow.
To schedule automatic rotation, create a time-based schedule. See Create a time-based schedule. Set Payload to:
{
  "SecretName": "<secret-name>",
  "RegionId": "<region-id>",
  "InstanceId": "<rds-instance-id>"
}
| Field | Description |
|---|---|
| SecretName | Name of the generic secret in KMS |
| RegionId | Region where the secret and the function are deployed |
| InstanceId | ID of the ApsaraDB RDS instance |
Step 5: Verify the rotation
After you set up the flow, trigger an execution manually to confirm that everything works:
In the CloudFlow console, open your flow and click Start Execution with the same JSON payload as in Step 4.
Wait for the execution to finish. All three steps (RotateSecretNew, RotateSecretSet, RotateSecretEnd) should show the Succeeded status.
In the KMS console, open the secret and check its version history. The ACSCurrent label should now point to the new version.
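In addition to the console check, the secret value itself can be compared before and after the run. The sketch below assumes you have captured the SecretData string of the ACSCurrent version once before and once after the execution (for example through GetSecretValue); `rotation_took_effect` is a hypothetical helper name used only here.

```python
import json


def rotation_took_effect(before_secret_data, after_secret_data, suffix="_rt"):
    """Return True if the account alternated and the password changed."""
    before = json.loads(before_secret_data)
    after = json.loads(after_secret_data)
    old_name, new_name = before["AccountName"], after["AccountName"]
    # Dual-account rotation toggles between <name> and <name>_rt.
    name_switched = new_name == old_name + suffix or old_name == new_name + suffix
    password_changed = before["AccountPassword"] != after["AccountPassword"]
    return name_switched and password_changed
```

A result of False after a run that reported success usually means the values were read from the same version, so it is worth confirming that the second read happened after the end step.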
Step 6: (Optional) Connect your application to Secrets Manager
Use Secrets Manager JDBC to retrieve the rotated credentials in your application without code changes. See Secrets Manager JDBC.
Scenario 2: Rotate a generic secret for self-managed MySQL
Use this approach when your database is a self-managed MySQL instance. This scenario uses a privileged account to manage credential rotation and adds a test step that verifies connectivity before the rotation is finalized.
Prerequisites
Before you begin, make sure that you have:
An active Function Compute account. See Activate Function Compute.
A privileged MySQL account, and a generic secret for that account in the KMS console.
A second generic secret for the regular account to be rotated.
A Function Compute service role with the permissions listed in Step 1.
Step 1: Set up permissions
Create a service role for Function Compute and attach the following policies:
AliyunFCDefaultRolePolicy (system policy)
AliyunSTSAssumeRoleAccess (system policy)
A custom policy that grants access to KMS:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:GetSecretValue",
        "kms:GetRandomPassword",
        "kms:Decrypt",
        "kms:GenerateDataKey",
        "kms:PutSecretValue",
        "kms:UpdateSecretVersionStage"
      ],
      "Resource": "*"
    }
  ]
}
Step 2: Create the generic secrets
Create two generic secrets in the KMS console.
Privileged account secret, used to create and manage the regular account:
{
  "Endpoint": "<mysql-host>",
  "AccountName": "<privileged-account-name>",
  "AccountPassword": "<privileged-account-password>",
  "SSL": false
}
| Field | Description | Default |
|---|---|---|
| Endpoint | Domain name or IP address of the MySQL database | (none) |
| AccountName | Username of the privileged account | (none) |
| AccountPassword | Password of the privileged account | (none) |
| SSL | Whether to use an SSL certificate for the connection | false |
Regular account secret, which is the secret to be rotated. The name of this secret is passed to CloudFlow as a scheduling parameter:
{
  "Endpoint": "<mysql-host>",
  "AccountName": "<account-name>",
  "AccountPassword": "<account-password>",
  "MasterSecret": "<privileged-secret-name>",
  "SSL": false
}
| Field | Description | Default |
|---|---|---|
| Endpoint | Domain name or IP address of the MySQL database | (none) |
| AccountName | Username of the account to be rotated | (none) |
| AccountPassword | Password of the account to be rotated | (none) |
| MasterSecret | Name of the privileged account secret in KMS | (none) |
| SSL | Whether to use an SSL certificate for the connection | false |
If SSL is true, the function expects the SSL certificate at /opt/python/certs/cert.pem. Add the certificate to the custom layer (see Step 3).
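How the SSL field is interpreted can be summarized in a short sketch. It mirrors the get_ssl_config logic of the function code in Step 3: an explicit boolean (or the strings "true" and "false") is honored strictly, while a missing or unrecognized value means "try SSL first, then fall back to a plain connection". The names `resolve_ssl` and `pymysql_ssl_argument` are chosen for this example.

```python
CERT_PATH = "/opt/python/certs/cert.pem"  # certificate location expected by the function


def resolve_ssl(secret):
    """Return (use_ssl, allow_fallback) for a secret dict."""
    value = secret.get("SSL")
    if isinstance(value, bool):
        return value, False
    if isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower() == "true", False
    # Missing or unrecognized: attempt SSL, but allow a non-SSL retry.
    return True, True


def pymysql_ssl_argument(use_ssl):
    """Value for the ssl parameter of pymysql.connect."""
    return {"ca": CERT_PATH} if use_ssl else None
```

Setting SSL explicitly to true or false in both secrets therefore avoids the extra connection attempt that the fallback path makes.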
Step 3: Create the rotation function
Create a Function Compute service with VPC access enabled (same configuration as in Scenario 1). Make sure that the function can reach the MySQL instance and the KMS VPC endpoint.
Create a function with Runtime Environments set to Python 3.9. Use the following code:
# -*- coding: utf-8 -*-
import json
import logging
import os

try:
    import pymysql
except ImportError:
    # Fallback for environments where the custom layer is not attached.
    os.system('pip install pymysql -t ./')
    import pymysql
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.client import AcsClient
from aliyunsdkkms.request.v20160120.GetRandomPasswordRequest import GetRandomPasswordRequest
from aliyunsdkkms.request.v20160120.GetSecretValueRequest import GetSecretValueRequest
from aliyunsdkkms.request.v20160120.PutSecretValueRequest import PutSecretValueRequest
from aliyunsdkkms.request.v20160120.UpdateSecretVersionStageRequest import UpdateSecretVersionStageRequest
from aliyunsdkrds.request.v20140815.DescribeDBInstancesRequest import DescribeDBInstancesRequest

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    # Entry point: dispatches to one rotation step per invocation.
    evt = json.loads(event)
    secret_name = evt['SecretName']
    region_id = evt['RegionId']
    step = evt['Step']
    # The first step has no VersionId yet, so the request ID is used as the new version ID.
    version_id = evt.get('VersionId')
    if not version_id:
        version_id = context.requestId
    credentials = StsTokenCredential(context.credentials.accessKeyId, context.credentials.accessKeySecret,
                                     context.credentials.securityToken)
    client = AcsClient(region_id=region_id, credential=credentials)
    endpoint = "kms-vpc." + region_id + ".aliyuncs.com"
    client.add_endpoint(region_id, 'kms', endpoint)
    resp = get_secret_value(client, secret_name)
    if "Generic" != resp['SecretType']:
        logger.error("Secret %s is not enabled for rotation" % secret_name)
        raise ValueError("Secret %s is not enabled for rotation" % secret_name)
    if step == "new":
        new_phase(client, secret_name, version_id)
    elif step == "set":
        set_phase(client, secret_name, version_id)
    elif step == "test":
        test_phase(client, secret_name, version_id)
    elif step == "end":
        end_phase(client, secret_name, version_id)
    else:
        logger.error("handler: Invalid step parameter %s for secret %s" % (step, secret_name))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, secret_name))
    return {"VersionId": version_id}


def new_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    try:
        # If the pending version already exists, this step has run before: nothing to do.
        get_secret_dict(client, secret_name, "ACSPending", version_id)
        logger.info("new: Successfully retrieved secret for %s." % secret_name)
    except ServerException as e:
        if e.error_code != 'Forbidden.ResourceNotFound':
            raise
        # No pending version yet: switch to the alternate account and generate a password.
        current_dict['AccountName'] = get_alt_account_name(current_dict['AccountName'])
        exclude_characters = os.environ.get('EXCLUDE_CHARACTERS', '/@"\'\\')
        passwd = get_random_password(client, exclude_characters)
        current_dict['AccountPassword'] = passwd['RandomPassword']
        put_secret_value(client, secret_name, version_id, json.dumps(current_dict),
                         json.dumps(['ACSPending']))
        logger.info(
            "new: Successfully put secret for secret_name %s and version %s." % (secret_name, version_id))
def set_phase(client, secret_name, version_id):
    current_dict = get_secret_dict(client, secret_name, "ACSCurrent")
    pending_dict = get_secret_dict(client, secret_name, "ACSPending", version_id)
    # If the pending credentials already work, a previous run completed this step.
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info(
            "set: ACSPending secret is already set as password in MySQL DB for secret secret_name %s." % secret_name)
        return
    if get_alt_account_name(current_dict['AccountName']) != pending_dict['AccountName']:
        logger.error("set: Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
        raise ValueError("Attempting to modify user %s other than current user or rotation %s" % (
            pending_dict['AccountName'], current_dict['AccountName']))
    if current_dict['Endpoint'] != pending_dict['Endpoint']:
        logger.error("set: Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
        raise ValueError("Attempting to modify user for Endpoint %s other than current Endpoint %s" % (
            pending_dict['Endpoint'], current_dict['Endpoint']))
    conn = get_connection(current_dict)
    if not conn:
        logger.error("set: Unable to access the given database using current credentials for secret %s" % secret_name)
        raise ValueError("Unable to access the given database using current credentials for secret %s" % secret_name)
    conn.close()
    master_secret = current_dict['MasterSecret']
    master_dict = get_secret_dict(client, master_secret, "ACSCurrent")
    if current_dict['Endpoint'] != master_dict['Endpoint'] and not is_rds_replica_database(
            client, current_dict, master_dict):
        logger.error("set: Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
        raise ValueError("Current database Endpoint %s is not the same Endpoint as/rds replica of master %s" % (
            current_dict['Endpoint'], master_dict['Endpoint']))
    conn = get_connection(master_dict)
    if not conn:
        logger.error(
            "set: Unable to access the given database using credentials in master secret %s" % master_secret)
        raise ValueError("Unable to access the given database using credentials in master secret %s" % master_secret)
    try:
        with conn.cursor() as cur:
            # Create the alternate user if it does not exist yet.
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_dict['AccountName'])
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s",
                            (pending_dict['AccountName'], pending_dict['AccountPassword']))
            # Copy the grants of the current user to the alternate user.
            cur.execute("SHOW GRANTS FOR %s", current_dict['AccountName'])
            for row in cur.fetchall():
                if 'XA_RECOVER_ADMIN' in row[0]:
                    continue
                grant = row[0].split(' TO ')
                new_grant_escaped = grant[0].replace('%', '%%')  # % is a special character in Python format strings.
                cur.execute(new_grant_escaped + " TO %s ", (pending_dict['AccountName'],))
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]
            # Copy the TLS requirements of the current user.
            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s",
                        current_dict['AccountName'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                cur.execute(escaped_encryption_statement + " NONE", pending_dict['AccountName'])
            elif ssl_type == "ANY":
                cur.execute(escaped_encryption_statement + " SSL", pending_dict['AccountName'])
            elif ssl_type == "X509":
                cur.execute(escaped_encryption_statement + " X509", pending_dict['AccountName'])
            else:
                cur.execute(escaped_encryption_statement + " CIPHER %s AND ISSUER %s AND SUBJECT %s",
                            (pending_dict['AccountName'], tls_options[1], tls_options[2], tls_options[3]))
            # Set the pending password on the alternate user.
            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option,
                        (pending_dict['AccountName'], pending_dict['AccountPassword']))
            conn.commit()
            logger.info("set: Successfully changed password for %s in MySQL DB for secret secret_name %s." % (
                pending_dict['AccountName'], secret_name))
    finally:
        conn.close()
def test_phase(client, secret_name, version_id):
    # Connect with the pending credentials and run a harmless query.
    conn = get_connection(get_secret_dict(client, secret_name, "ACSPending", version_id))
    if conn:
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()
        logger.info("test: Successfully accessed into MySQL DB with ACSPending secret in %s." % secret_name)
        return
    else:
        logger.error(
            "test: Unable to access the given database with pending secret of secret secret_name %s" % secret_name)
        raise ValueError("Unable to access the given database with pending secret of secret secret_name %s" % secret_name)


def end_phase(client, secret_name, version_id):
    # Promote the pending version to ACSCurrent, then drop the ACSPending label.
    update_secret_version_stage(client, secret_name, 'ACSCurrent', move_to_version=version_id)
    update_secret_version_stage(client, secret_name, 'ACSPending', remove_from_version=version_id)
    logger.info(
        "end: Successfully update ACSCurrent stage to version %s for secret %s." % (version_id, secret_name))


def get_connection(secret_dict):
    port = int(secret_dict['Port']) if 'Port' in secret_dict else 3306
    dbname = secret_dict['DBName'] if 'DBName' in secret_dict else None
    use_ssl, fall_back = get_ssl_config(secret_dict)
    conn = connect_and_authenticate(secret_dict, port, dbname, use_ssl)
    if conn or not fall_back:
        return conn
    else:
        # SSL was not explicitly required: retry without SSL.
        return connect_and_authenticate(secret_dict, port, dbname, False)


def get_ssl_config(secret_dict):
    # Returns (use_ssl, fall_back).
    if 'SSL' not in secret_dict:
        return True, True
    if isinstance(secret_dict['SSL'], bool):
        return secret_dict['SSL'], False
    if isinstance(secret_dict['SSL'], str):
        ssl = secret_dict['SSL'].lower()
        if ssl == "true":
            return True, False
        elif ssl == "false":
            return False, False
        else:
            return True, True
    return True, True


def connect_and_authenticate(secret_dict, port, dbname, use_ssl):
    ssl = {'ca': '/opt/python/certs/cert.pem'} if use_ssl else None
    try:
        conn = pymysql.connect(host=secret_dict['Endpoint'], user=secret_dict['AccountName'],
                               password=secret_dict['AccountPassword'],
                               port=port, database=dbname, connect_timeout=5, ssl=ssl)
        logger.info("Successfully established %s connection as user '%s' with Endpoint: '%s'" % (
            "SSL/TLS" if use_ssl else "non SSL/TLS", secret_dict['AccountName'], secret_dict['Endpoint']))
        return conn
    except pymysql.OperationalError as e:
        if 'certificate verify failed: IP address mismatch' in e.args[1]:
            logger.error(
                "Hostname verification failed when establishing SSL/TLS Handshake with Endpoint: %s" % secret_dict[
                    'Endpoint'])
        return None
def get_secret_dict(client, secret_name, stage, version_id=None):
    required_fields = ['Endpoint', 'AccountName', 'AccountPassword']
    if version_id:
        secret = get_secret_value(client, secret_name, version_id, stage)
    else:
        secret = get_secret_value(client, secret_name, stage=stage)
    plaintext = secret['SecretData']
    secret_dict = json.loads(plaintext)
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)
    return secret_dict


def get_alt_account_name(current_account_name):
    rotation_suffix = "_rt"
    if current_account_name.endswith(rotation_suffix):
        return current_account_name[:(len(rotation_suffix) * -1)]
    else:
        new_account_name = current_account_name + rotation_suffix
        if len(new_account_name) > 16:
            raise ValueError(
                "Unable to rotate user: account name with the _rt suffix appended would exceed 16 characters")
        return new_account_name


def get_password_option(version):
    # MySQL 8 no longer supports the PASSWORD() function.
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"


def get_escaped_encryption_statement(version):
    if version.startswith("5.6"):
        return "GRANT USAGE ON *.* TO %s@'%%' REQUIRE"
    else:
        return "ALTER USER %s@'%%' REQUIRE"


def is_rds_replica_database(client, replica_dict, master_dict):
    replica_instance_id = replica_dict['Endpoint'].split(".")[0].replace('io', '')
    master_instance_id = master_dict['Endpoint'].split(".")[0].replace('io', '')
    try:
        describe_response = describe_db_instances(client, replica_instance_id)
    except Exception as err:
        logger.warning("Encountered error while verifying rds replica status: %s" % err)
        return False
    items = describe_response['Items']
    instances = items.get("DBInstance")
    if not instances:
        logger.info("Cannot verify replica status - no RDS instance found with identifier: %s" % replica_instance_id)
        return False
    current_instance = instances[0]
    return master_instance_id == current_instance.get('DBInstanceId')
def get_secret_value(client, secret_name, version_id=None, stage=None):
    request = GetSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    if version_id:
        request.set_VersionId(version_id)
    if stage:
        request.set_VersionStage(stage)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def put_secret_value(client, secret_name, version_id, secret_data, version_stages=None):
    request = PutSecretValueRequest()
    request.set_accept_format('json')
    request.set_SecretName(secret_name)
    request.set_VersionId(version_id)
    if version_stages:
        request.set_VersionStages(version_stages)
    request.set_SecretData(secret_data)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def get_random_password(client, exclude_characters=None):
    request = GetRandomPasswordRequest()
    request.set_accept_format('json')
    if exclude_characters:
        request.set_ExcludeCharacters(exclude_characters)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def update_secret_version_stage(client, secret_name, version_stage, remove_from_version=None, move_to_version=None):
    request = UpdateSecretVersionStageRequest()
    request.set_accept_format('json')
    request.set_VersionStage(version_stage)
    request.set_SecretName(secret_name)
    if remove_from_version:
        request.set_RemoveFromVersion(remove_from_version)
    if move_to_version:
        request.set_MoveToVersion(move_to_version)
    response = client.do_action_with_exception(request)
    return json.loads(response)


def describe_db_instances(client, db_instance_id):
    request = DescribeDBInstancesRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(db_instance_id)
    response = client.do_action_with_exception(request)
    return json.loads(response)
Create a custom layer to package the PyMySQL dependency (and, optionally, the SSL certificate):
Run the following commands to build the layer package:
mkdir my-secret-rotate
cd my-secret-rotate
pip install --target ./python pymysql
# If SSL is required, create the certs directory and add your certificate:
# mkdir python/certs && cp cert.pem python/certs/
zip -r my-secret-rotate.zip python
In the Function Compute console, create a custom layer from the ZIP package. See Create a custom layer.
Attach the custom layer to your function. On the Services page, click Functions for your service, find the function, click Configure, and add the layer in the Layers section. See Manage layers.
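Before uploading, the archive layout can be checked locally. The sketch below assumes that the layer contents end up under /opt, so that the python/ directory in the ZIP becomes /opt/python, which is consistent with the certificate path /opt/python/certs/cert.pem used by the function. `check_layer_archive` is a hypothetical helper, not a Function Compute tool.

```python
import zipfile


def check_layer_archive(zip_path, need_cert=False):
    """Return a list of problems found in the layer ZIP (an empty list means OK)."""
    with zipfile.ZipFile(zip_path) as archive:
        names = set(archive.namelist())
    problems = []
    # pip install --target ./python pymysql should have produced this file.
    if "python/pymysql/__init__.py" not in names:
        problems.append("python/pymysql/__init__.py is missing")
    # Only required when the secrets set SSL to true.
    if need_cert and "python/certs/cert.pem" not in names:
        problems.append("python/certs/cert.pem is missing")
    return problems


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        for problem in check_layer_archive(sys.argv[1], need_cert="--ssl" in sys.argv):
            print(problem)
```

Running the script with the path to my-secret-rotate.zip prints nothing when the layout matches; the `--ssl` switch is part of this example only.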
Step 4: Create the CloudFlow rotation flow
Log on to the CloudFlow console and select the same region as your function.
On the Flows page, click Create flow.
On the Create Flow page, click Create Flow with Code, set YAML Definition to the following, and then click Next Step:
version: v1
type: flow
steps:
  - type: task
    name: RotateSecretNew
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: new
  - type: task
    name: RotateSecretSet
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: set
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretTest
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: test
      - target: VersionId
        source: $local.VersionId
  - type: task
    name: RotateSecretEnd
    resourceArn: <function-arn>
    inputMappings:
      - target: SecretName
        source: $input.payload.SecretName
      - target: RegionId
        source: $input.payload.RegionId
      - target: Step
        source: end
      - target: VersionId
        source: $local.VersionId
Replace <function-arn> with the ARN of the function that you created.
Configure the flow role, and then click Create Flow.
To schedule automatic rotation, create a time-based schedule and set Payload to:
{
  "SecretName": "<secret-name>",
  "RegionId": "<region-id>"
}
| Field | Description |
|---|---|
| SecretName | Name of the regular account's generic secret in KMS |
| RegionId | Region where the secret and the function are deployed |
Step 5: Verify the rotation
Trigger an execution manually to confirm that the setup works:
In the CloudFlow console, open your flow and click Start Execution with the JSON payload from Step 4.
Wait for the execution to finish. All four steps (RotateSecretNew, RotateSecretSet, RotateSecretTest, RotateSecretEnd) should show the Succeeded status.
In the KMS console, open the secret and check its version history. The ACSCurrent label should point to the new version.
Step 6: (Optional) Connect your application to Secrets Manager
Use Secrets Manager JDBC to retrieve the rotated credentials automatically. See Secrets Manager JDBC.
Next steps
Secrets Manager JDBC: integrate rotated credentials with JDBC-based applications
Overview of dynamic ApsaraDB RDS secrets: consider dynamic secrets if your database runs on ApsaraDB RDS
What is Function Compute?: learn more about the serverless compute service used in this guide