为RAM用户开启多因素认证(MFA),可以提高RAM用户的使用安全。通过配置审计的托管规则(高权限的RAM用户开启MFA),可以检测所有RAM用户是否开启MFA。如果您只想检测指定RAM用户(例如:高风险用户)是否开启MFA,请使用本文所述的自定义规则的方法。
前提条件
- 请确保您已开通消息服务MNS服务。具体操作,请参见开通消息服务MNS并授权。
- 请确保您已开通函数计算服务。具体操作,请参见使用控制台创建函数。
- 请确保您已创建RAM用户Alice,且已授予系统权限AliyunECSFullAccess(管理云服务器ECS的权限)。具体操作,请参见创建RAM用户和为RAM用户授权。
数据规划
数据规划如下表所示。
云服务 | 参数 | 示例 |
---|---|---|
配置审计 | 规则名称 | RAMUserMFA |
规则触发机制 | 配置变更,周期执行 | |
触发频率 | 1小时 | |
规则入参名称 | dangerousActions | |
期望值 | ecs:*,oss:*,log:* | |
消息服务 | 主题名称 | MNSTestConfig |
主题地域 | 新加坡 说明 由于配置审计部署在新加坡,为了减少网络损耗,建议消息服务MNS的主题地域选择新加坡。 | |
访问控制 | RAM用户名称 | Alice |
RAM用户ID | 25849250231246**** | |
默认权限策略 | AliyunECSFullAccess | |
函数计算 | 服务 | Ram_User |
函数 | RamDangerousPolicyUserBindMFA |
操作流程
操作流程如下图所示。

操作步骤
- 新建服务。
- 登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域,例如:新加坡。
- 在服务列表页面,单击创建服务。
- 在创建服务面板,名称输入Ram_User。
- 单击确定。
- 新建函数。
- 在服务Ram_User的函数管理页面,单击创建函数。
- 在创建函数页面,函数名称输入RamDangerousPolicyUserBindMFA,运行环境选择Python 3.6,其他参数保持默认值。
- 单击创建。
- 配置函数的环境变量。
- 在函数RamDangerousPolicyUserBindMFA的函数代码页签,单击函数配置页签。
- 在环境变量区域,单击编辑。
- 单击添加变量,输入该环境变量的变量名称和变量值。
变量 值 示例 AK 当前阿里云账号的AccessKey ID。关于如何获取AccessKey ID,请参见获取AccessKey。 LTAI4G6JZSANb8MZMkm1**** SK 当前阿里云账号的AccessKey Secret。关于如何获取AccessKey Secret,请参见获取AccessKey。 EMLHThhpD2UJqH1DXuAKii2sI**** ResourceTypes 资源类型。 ACS::RAM::User - 单击确定。
- 配置检测RAM用户是否开启MFA的代码。
- 在函数RamDangerousPolicyUserBindMFA的函数代码页签,单击文件index.py。
- 拷贝并粘贴如下代码至文件index.py。
#!/usr/bin/env python # -*- encoding: utf-8 -*- import logging import json import os from aliyunsdkcore.client import AcsClient from aliyunsdkram.request.v20150501 import ListPoliciesForUserRequest from aliyunsdkram.request.v20150501 import GetPolicyRequest from aliyunsdkram.request.v20150501 import GetUserMFAInfoRequest from aliyunsdkcore.auth.credentials import StsTokenCredential from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.request import CommonRequest logger = logging.getLogger() # AccessKey/SecretKey, need AliyunConfigFullAccess policy AK = 'LTAI4FgrMeKLB7NqDmPe****' SK = 'dylEiakiwLFB1CufDyxyCwlCxZ****' # Valid compliace type COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT' COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT' COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE' COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA' def handler(event, context): # Validate event evt = validate_event(event) if not evt: return None rule_parameters = evt.get('ruleParameters') result_token = evt.get('resultToken') ordering_timestamp = evt.get('orderingTimestamp') invoking_event = evt.get('invokingEvent') # Initialize compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE annotation = None configuration_item = invoking_event.get('configurationItem') if not configuration_item: logger.error('Configuration item is empty.') return None resource_id = configuration_item.get('resourceId') resource_type = configuration_item.get('resourceType') region_id = configuration_item.get('regionId') creds = context.credentials # Get compliace result compliance_type, annotation = evaluate_configuration_item(rule_parameters, configuration_item, creds) # Compliance result evaluations = [ { 'complianceResourceId': resource_id, 'complianceResourceType': resource_type, 'complianceRegionId': region_id, 'orderingTimestamp': ordering_timestamp, 'complianceType': compliance_type, 'annotation': annotation } ] put_evaluations(context, result_token, evaluations) return evaluations def evaluate_configuration_item(rule_parameters, configuration_item, creds): # Initialize compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE annotation = None # Get resource type and configuration resource_type = configuration_item['resourceType'] full_configuration = configuration_item['configuration'] # Check configuration if not full_configuration: annotation = 'Configuration is empty.' return compliance_type, annotation # Parse to json object configuration = parse_json(full_configuration) if not configuration: annotation = 'Configuration:{} in invald.'.format(full_configuration) return compliance_type, annotation # =========== Customer code start =========== # if 'UserPrincipalName' in configuration and configuration['UserPrincipalName']: user_name = configuration_item['resourceName'] user_principal_name = configuration['UserPrincipalName'] if rule_parameters and 'dangerousActions' in rule_parameters and rule_parameters['dangerousActions']: actions = rule_parameters['dangerousActions'].split(',') isvalid, reason = validate_user_bind_MFADevice(user_name, user_principal_name, actions) if isvalid: compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, None else: compliance_type, annotation = COMPLIACE_TYPE_NON_COMPLIANT, reason else: compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, 'rule parameter:{dangerousActions} not specified' else: compliance_type, annotation = COMPLIACE_TYPE_INSUFFICIENT_DATA, 'ram user not named' # =========== Customer code end =========== # return compliance_type, annotation def validate_user_bind_MFADevice(user_name, user_principal_name, input_actions): client = AcsClient(AK, SK) # Get user policy list list_user_policy_req = ListPoliciesForUserRequest.ListPoliciesForUserRequest() list_user_policy_req.set_UserName(user_name) list_user_policy_res = None try: list_user_policy_res = client.do_action_with_exception(list_user_policy_req) except ServerException as se: if se.get_http_status() == 404 and se.get_error_code() == 'EntityNotExist.User': return True, None else: return False, se.get_error_msg() if list_user_policy_res is None: return True, None list_user_policy_json_res = json.loads(list_user_policy_res) user_policies = list_user_policy_json_res['Policies']['Policy'] if user_policies is None or len(user_policies) == 0: return True, None # Get user policy filter by input_actions policy_action_list = list() for policy in user_policies: get_policy_req = GetPolicyRequest.GetPolicyRequest() get_policy_req.set_PolicyName(policy['PolicyName']) get_policy_req.set_PolicyType(policy['PolicyType']) get_policy_res = None try: get_policy_res = client.do_action_with_exception(get_policy_req) policy_document = json.loads(json.loads(get_policy_res)['DefaultPolicyVersion']['PolicyDocument']) for action in list(map(lambda x: x['Action'], list(filter(lambda x: x['Effect'] == 'Allow', policy_document['Statement'])))): if type(action) is list: policy_action_list.extend(action) else: policy_action_list.append(action) except ServerException as se: logger.error(se) except Exception as ex: logger.error(ex) policy_action_set = set(policy_action_list) logger.info('Policy actions: {}, input: {} .'.format(str(policy_action_set), str(input_actions))) # Verify policy actions contains input_actions if policy_action_set.intersection(set(input_actions)): get_user_mfa_req = GetUserMFAInfoRequest.GetUserMFAInfoRequest() get_user_mfa_req.set_UserName(user_principal_name) get_user_mfa_res = None try: get_user_mfa_res = client.do_action_with_exception(get_user_mfa_req) logger.info('GetUserMFAInfo user_name: {}, result: {} .'.format(user_name, str(get_user_mfa_res))) if ('SerialNumber' not in get_user_mfa_res) or ('MFADevice' not in get_user_mfa_res): return False, 'user MFADevice empty' else: return True, None except ServerException as se: return False, se.get_error_msg() else: return True, None def validate_event(event): if not event: logger.error('Event is empty.') evt = parse_json(event) logger.info('Loading event: %s .' % evt) if 'resultToken' not in evt: logger.error('ResultToken is empty.') return None if 'ruleParameters' not in evt: logger.error('RuleParameters is empty.') return None if 'invokingEvent' not in evt: logger.error('InvokingEvent is empty.') return None return evt def parse_json(content): try: return json.loads(content) except Exception as e: logger.error('Parse content:{} to json error:{}.'.format(content, e)) return None def put_evaluations(context, result_token, evaluations): # ak/sk, need AliyunConfigFullAccess policy client = AcsClient(AK, SK, 'ap-southeast-1') # Open api request request = CommonRequest() request.set_domain('config.ap-southeast-1.aliyuncs.com') request.set_version('2019-01-08') request.set_action_name('PutEvaluations') request.add_body_params('ResultToken', result_token) request.add_body_params('Evaluations', evaluations) request.set_method('POST') try: response = client.do_action_with_exception(request) logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response)) except Exception as e: logger.error('PutEvaluations error: %s' % e)
本段代码用于检测RAM用户是否开启MFA,代码中主要参数说明如下表所示。参数 说明 示例 AK 当前阿里云账号的AccessKey ID。该参数必须与步骤 3中的AK相同。 LTAI4FgrMeKLB7NqDmPe**** SK 当前阿里云账号的AccessKey Secret。该参数必须与步骤 3中的SK相同。 dylEiakiwLFB1CufDyxyCwlCxZ**** user_name RAM用户。 Alice rule_parameters 规则参数。 dangerousActions input_actions 指定的危险操作。 ecs:*,oss:*,log:* configuration_item 资源的配置详情。 请参见自定义规则的数据结构如何。 说明 本段代码以检测RAM用户是否开启MFA为例。如果您需要通过其他参数检测RAM用户,请参见自定义规则的数据结构是什么。 - 单击右上角的保存并部署。
- 新建自定义规则。
- 登录配置审计控制台。
- 在左侧导航栏,单击规则。
- 在规则页面,单击新建规则。
- 在新建规则页面,单击新建自定义规则。
- 在基本属性页面,函数Arn的所在地域选择新加坡、服务选择Ram_User、函数选择RamDangerousPolicyUserBindMFA,规则名称输入RamUserMFA,规则触发机制选择配置变更和周期执行,触发频率选择1小时,单击下一步。
- 在评估资源范围页面,先单击自定义资源类型,再选择规则关联的资源类型为Ram用户,单击下一步。
- 在参数设置页面,单击添加规则入参,规则入参名称输入dangerousActions,期望值输入ecs:*,oss:*,log:*,单击下一步。
- 在修正设置页面,单击下一步。
- 在预览并保存页面,单击提交。
- 查看RAM用户Alice的检测结果。
- 在新建规则成功页面,单击查看规则详情。
- 单击检测结果页签。
- 在关联资源的合规结果区域,单击目标资源ID链接,查看该资源的合规结果。
- 设置资源合规事件投递。设置资源合规事件MNSTestConfig投递到消息服务MNS的指定主题(Topic)后,您会收到来自消息服务MNS的不合规通知。具体操作,请参见设置投递数据到消息服务MNS。