You can enable multi-factor authentication (MFA) for a RAM user to improve the logon security of the RAM user. The ram-risky-policy-user-mfa-check managed rule provided by Cloud Config can be used to check whether MFA is enabled for all RAM users. To check whether MFA is enabled for a specified RAM user, such as a RAM user that is authorized to perform high-risk operations, you must use a custom rule.

Prerequisites

Items

To create a custom rule in the Cloud Config console, you must configure the required items. The following table describes the items.
Alibaba Cloud service Item Example
Cloud Config Rule name RAMUserMFA
Trigger type Configuration change and periodic execution
Evaluation frequency Interval of 1 hour
Name of the input parameter dangerousActions
Expected value of the input parameter ecs:*,oss:*,log:*
Message Service (MNS) Topic MNSTestConfig
Region Singapore (Singapore)
Note

Cloud Config is deployed in the Singapore (Singapore) region. To reduce packet loss, we recommend that you specify Singapore (Singapore) as the region for the MNS topic.

Access control RAM user name Alice
RAM user ID 25849250231246****
Policy AliyunECSFullAccess
Function Compute Service Ram_User
Function RamDangerousPolicyUserBindMFA

Process

The following figure shows the process of creating a custom rule. Process

Procedure

  1. Create a service.
    1. Log on to the Function Compute console.
    2. In the left-side navigation pane, click Services & Functions.
    3. In the top navigation bar, select a region,such as Singapore.
    4. On the Services page, click Create Service.
    5. In the Create Service panel, enter Ram_User in the Name field.
    6. Click OK.
  2. Create a function.
    1. On the details page of the Ram_User service, click Functions in the left-side navigation pane. Then, click Create Function.
    2. On the Create Function page, set the Function Name parameter to RamDangerousPolicyUserBindMFA and the Runtime Environments parameter to Python 3.6, and use the default values for other parameters.
    3. Click Create.
  3. Configure the environment variable of the function.
    1. On the details page of the RamDangerousPolicyUserBindMFA function, click the Configurations tab.
    2. In the Environment Variables section, click Modify.
    3. Click Add Variable, and specify a name and value for the environment variable.
      Key Value description Examples
      AK The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain an AccessKey ID, see Obtain an AccessKey pair. LTAI4G6JZSANb8MZMkm1****
      SK The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain an AccessKey secret, see Obtain an AccessKey pair. EMLHThhpD2UJqH1DXuAKii2sI****
      ResourceTypes The type of the resource. ACS::RAM::User
    4. Click OK.
  4. Write code for the function that checks whether MFA is enabled for a specified RAM user.
    1. On the details page of the RamDangerousPolicyUserBindMFA function, click the Code tab. In the code editor, select the index.py file.
    2. Copy and paste the following code in the index.py file:
      #!/usr/bin/env python
      # -*- encoding: utf-8 -*-
      
      import logging
      import json
      import os
      from aliyunsdkcore.client import AcsClient
      from aliyunsdkram.request.v20150501 import ListPoliciesForUserRequest
      from aliyunsdkram.request.v20150501 import GetPolicyRequest
      from aliyunsdkram.request.v20150501 import GetUserMFAInfoRequest
      from aliyunsdkcore.auth.credentials import StsTokenCredential
      from aliyunsdkcore.acs_exception.exceptions import ClientException
      from aliyunsdkcore.acs_exception.exceptions import ServerException
      from aliyunsdkcore.request import CommonRequest
      
      logger = logging.getLogger()
      
      
      # AccessKey/SecretKey, need AliyunConfigFullAccess policy
      AK = 'LTAI4FgrMeKLB7NqDmPe****'
      SK = 'dylEiakiwLFB1CufDyxyCwlCxZ****'
      
      # Valid compliace type
      COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
      COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
      COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
      COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
      
      
      def handler(event, context):
          # Validate event
          evt = validate_event(event)
          if not evt:
              return None
      
          rule_parameters = evt.get('ruleParameters')
          result_token = evt.get('resultToken')
          ordering_timestamp = evt.get('orderingTimestamp')
          invoking_event = evt.get('invokingEvent')
      
          # Initilize
          compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
          annotation = None
      
      
          configuration_item = invoking_event.get('configurationItem')
          if not configuration_item:
              logger.error('Configuration item is empty.')
              return None
      
          resource_id = configuration_item.get('resourceId')
          resource_type = configuration_item.get('resourceType')
          region_id = configuration_item.get('regionId')
      
          creds = context.credentials
          # Get compliace result
          compliance_type, annotation = evaluate_configuration_item(rule_parameters, configuration_item, creds)
      
          # Compliance result
          evaluations = [
              {
                  'complianceResourceId': resource_id,
                  'complianceResourceType': resource_type,
                  'complianceRegionId': region_id,
                  'orderingTimestamp': ordering_timestamp,
                  'complianceType': compliance_type,
                  'annotation': annotation
              }
          ]
      
          put_evaluations(context, result_token, evaluations)
          return evaluations
      
      
      def evaluate_configuration_item(rule_parameters, configuration_item, creds):
          # Initilize
          compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
          annotation = None
      
          # Get resource type and configuration
          resource_type = configuration_item['resourceType']
          full_configuration = configuration_item['configuration']
      
      
          # Check configuration
          if not full_configuration:
              annotation = 'Configuration is empty.'
              return compliance_type, annotation
      
          # Parse to json object
          configuration = parse_json(full_configuration)
          if not configuration:
              annotation = 'Configuration:{} in invald.'.format(full_configuration)
              return compliance_type, annotation
      
          # =========== Customer code start =========== #
          if 'UserPrincipalName' in configuration and configuration['UserPrincipalName']:
              user_name = configuration_item['resourceName']
              user_principal_name = configuration['UserPrincipalName']
              if rule_parameters and 'dangerousActions' in rule_parameters and rule_parameters['dangerousActions']:
                  actions = rule_parameters['dangerousActions'].split(',')
                  isvalid, reason = validate_user_bind_MFADevice(user_name, user_principal_name, actions)
                  if isvalid:
                      compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, None
                  else:
                      compliance_type, annotation = COMPLIACE_TYPE_NON_COMPLIANT, reason
              else:
                  compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, 'rule parameter:{dangerousActions} not specified'
          else:
              compliance_type, annotation = COMPLIACE_TYPE_INSUFFICIENT_DATA, 'ram user not named'
      
          # =========== Customer code end =========== #
          return compliance_type, annotation
      
      
      def validate_user_bind_MFADevice(user_name, user_principal_name, input_actions):
          client = AcsClient(AK, SK)
          # Get user policy list
          list_user_policy_req = ListPoliciesForUserRequest.ListPoliciesForUserRequest()
          list_user_policy_req.set_UserName(user_name)
          list_user_policy_res = None
          try:
              list_user_policy_res = client.do_action_with_exception(list_user_policy_req)
          except ServerException as se:
              if se.get_http_status() == 404 and se.get_error_code() == 'EntityNotExist.User':
                  return True, None
              else:
                  return False, se.get_error_msg()
          if list_user_policy_res is None:
              return True, None
      
          list_user_policy_json_res = json.loads(list_user_policy_res)
          user_policies = list_user_policy_json_res['Policies']['Policy']
          if user_policies is None or len(user_policies) == 0:
              return True, None
      
          # Get user policy filter by input_actions
          policy_action_list = list()
          for policy in user_policies:
              get_policy_req = GetPolicyRequest.GetPolicyRequest()
              get_policy_req.set_PolicyName(policy['PolicyName'])
              get_policy_req.set_PolicyType(policy['PolicyType'])
              get_policy_res = None
              try:
                  get_policy_res = client.do_action_with_exception(get_policy_req)
                  policy_document = json.loads(json.loads(get_policy_res)['DefaultPolicyVersion']['PolicyDocument'])
                  for action in list(map(lambda x: x['Action'],
                                         list(filter(lambda x: x['Effect'] == 'Allow', policy_document['Statement'])))):
                      if type(action) is list:
                          policy_action_list.extend(action)
                      else:
                          policy_action_list.append(action)
              except ServerException as se:
                  logger.error(se)
              except Exception as ex:
                  logger.error(ex)
          policy_action_set = set(policy_action_list)
          logger.info('Policy actions: {}, input: {} .'.format(str(policy_action_set), str(input_actions)))
      
          # Verify policy actions contains input_actions
          if policy_action_set.intersection(set(input_actions)):
              get_user_mfa_req = GetUserMFAInfoRequest.GetUserMFAInfoRequest()
              get_user_mfa_req.set_UserName(user_principal_name)
              get_user_mfa_res = None
              try:
                  get_user_mfa_res = client.do_action_with_exception(get_user_mfa_req)
                  logger.info('GetUserMFAInfo user_name: {}, result: {} .'.format(user_name, str(get_user_mfa_res)))
                  if ('SerialNumber' not in get_user_mfa_res) or ('MFADevice' not in get_user_mfa_res):
                      return False, 'user MFADevice empty'
                  else:
                      return True, None
              except ServerException as se:
                  return False, se.get_error_msg()
          else:
              return True, None
      
      
      def validate_event(event):
          if not event:
              logger.error('Event is empty.')
      
          evt = parse_json(event)
          logger.info('Loading event: %s .' % evt)
      
          if 'resultToken' not in evt:
              logger.error('ResultToken is empty.')
              return None
      
          if 'ruleParameters' not in evt:
              logger.error('RuleParameters is empty.')
              return None
      
          if 'invokingEvent' not in evt:
              logger.error('InvokingEvent is empty.')
              return None
          return evt
      
      
      def parse_json(content):
          try:
              return json.loads(content)
          except Exception as e:
              logger.error('Parse content:{} to json error:{}.'.format(content, e))
              return None
      
      
      def put_evaluations(context, result_token, evaluations):
          # ak/sk, need AliyunConfigFullAccess policy
          client = AcsClient(AK, SK, 'ap-southeast-1')
      
          # Open api request
          request = CommonRequest()
          request.set_domain('config.ap-southeast-1.aliyuncs.com')
          request.set_version('2019-01-08')
          request.set_action_name('PutEvaluations')
          request.add_body_params('ResultToken', result_token)
          request.add_body_params('Evaluations', evaluations)
          request.set_method('POST')
      
          try:
              response = client.do_action_with_exception(request)
              logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
          except Exception as e:
              logger.error('PutEvaluations error: %s' % e)
      The code is used to check whether MFA is enabled for a specified RAM user. The following table describes the parameters in the code.
      Item Description Examples
      AK The AccessKey ID of your Alibaba Cloud account. The value must be the same as the AccessKey ID specified in Step 3. LTAI4FgrMeKLB7NqDmPe****
      SK The AccessKey secret of your Alibaba Cloud account. The value must be the same as the AccessKey secret specified in Step 3. dylEiakiwLFB1CufDyxyCwlCxZ****
      user_name The name of the RAM user. Alice.
      rule_parameters The input parameter of the rule. dangerousActions
      input_actions The high-risk operations that you want to manage. ecs:*,oss:*,log:*
      configuration_item The configuration of the resource. For more information, see What is the data structure of functions that can be used to create custom rules?
      Note The sample code is used to check whether MFA is enabled for a specified RAM user. For more information about other parameters that can be used to check RAM users, see What is the data structure of functions that can be used to create custom rules?
    3. In the code editor, click Deploy in the upper-right corner.
  5. Create a custom rule.
    1. Log on to the Cloud Config console.
    2. In the left-side navigation pane, click Rules.
    3. On the Rules page, click Create Rule.
    4. On the Create Rule page, click Create Custom Rule.
    5. In the Function ARN section of the Properties step, set the Region parameter to Singapore, the Service parameter to Ram_User, and the Function parameter to RamDangerousPolicyUserBindMFA. Enter RamUserMFA in the Rule Name field, select Configuration Change and Periodical Execution for the Trigger Type parameter, set the Frequency parameter to 1 Hour, and then click Next.
      Create Rule wizard
    6. In the Assess Resource Scope step, click Custom Resource Types, select RAM User as the type of resources that you want to associate with the rule, and then click Next.
      Assess Resource Scope step
    7. In the Parameters step, click Add Rule Parameter. Set the Key parameter to dangerousActions and the Expected Value parameter to ecs:*,oss:*,log:*. Then, click Next.
      Parameters step
      Note The name and expected value of the input parameter must be the same as the values of the rule_parameters and input_actions parameters configured in Step 4.
    8. In the Modify step, click Next.
    9. In the Preview and Save step, check the configurations and click Submit.
  6. View the compliance evaluation result of RAM User Alice.
    1. Click View Details.
    2. Click the Result tab.
    3. In the Compliance Result of Related Resources section of the Result tab, click the ID of the RAM user to view the compliance evaluation result.
      View the compliance evaluation result
  7. Specify an MNS topic to which you want to deliver resource incompliance events.
    Specify an MNS topic, such as MNSTestConfig, to which you want to deliver resource incompliance events. After the configuration is complete, MNS sends notifications when resource incompliance events occur. For more information, see Deliver resource data to an MNS topic.