#! /usr/bin/env python
# -*- encoding: utf-8 -*-
import logging
import json
import os
from aliyunsdkcore.client import AcsClient
from aliyunsdkram.request.v20150501 import ListPoliciesForUserRequest
from aliyunsdkram.request.v20150501 import GetPolicyRequest
from aliyunsdkram.request.v20150501 import GetUserMFAInfoRequest
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.request import CommonRequest
logger = logging.getLogger()
# Matched resource types
MATCH_RESOURCE_TYPES = os.environ.get('resourceTypes')
# AccessKey/SecretKey, need AliyunConfigFullAccess policy
AK = os.environ.get('AK')
SK = os.environ.get('SK')
# Valid compliace type
COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'
def handler(event, context):
# Validate event
evt = validate_event(event)
if not evt:
return None
rule_parameters = evt.get('ruleParameters')
result_token = evt.get('resultToken')
invoking_event = evt.get('invokingEvent')
# Initilize
compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
annotation = None
# Get configuration item
configuration_item = invoking_event.get('configurationItem')
if not configuration_item:
logger.error('Configuration item is empty.')
return None
ordering_timestamp = configuration_item.get('captureTime')
resource_id = configuration_item.get('resourceId')
resource_type = configuration_item.get('resourceType')
creds = context.credentials
# Get compliace result
compliance_type, annotation = evaluate_configuration_item(rule_parameters, configuration_item, creds)
# Compliance result
evaluations = [
{
'complianceResourceId': resource_id,
'complianceResourceType': resource_type,
'orderingTimestamp': ordering_timestamp,
'complianceType': compliance_type,
'annotation': annotation
}
]
# Put evaluation result by invoking open api
put_evaluations(context, result_token, evaluations)
return evaluations
def evaluate_configuration_item(rule_parameters, configuration_item, creds):
# Initilize
compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
annotation = None
# Get resource type and configuration
resource_type = configuration_item['resourceType']
full_configuration = configuration_item['configuration']
# Check resource type
if MATCH_RESOURCE_TYPES and resource_type not in MATCH_RESOURCE_TYPES.split(','):
annotation = 'Resource type is {}, not in {}.'.format(
resource_type, MATCH_RESOURCE_TYPES)
return compliance_type, annotation
# Check configuration
if not full_configuration:
annotation = 'Configuration is empty.'
return compliance_type, annotation
# Parse to json object
configuration = parse_json(full_configuration)
if not configuration:
annotation = 'Configuration:{} in invald.'.format(full_configuration)
return compliance_type, annotation
# =========== Customer code start =========== #
if 'UserName' in configuration and configuration['UserName']:
user_name = configuration['UserName']
if rule_parameters and 'dangerousActions' in rule_parameters and rule_parameters['dangerousActions']:
actions = rule_parameters['dangerousActions'].split(',')
isvalid, reason = validate_user_bind_MFADevice(user_name, actions,
'only_custom' in rule_parameters and
rule_parameters['only_custom'] == 'true')
if isvalid:
compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, None
else:
compliance_type, annotation = COMPLIACE_TYPE_NON_COMPLIANT, reason
else:
compliance_type, annotation = COMPLIACE_TYPE_COMPLIANT, 'rule parameter:{dangerousActions} not specified'
else:
compliance_type, annotation = COMPLIACE_TYPE_INSUFFICIENT_DATA, 'ram user not named'
# =========== Customer code end =========== #
return compliance_type, annotation
def validate_user_bind_MFADevice(user_name, input_actions, only_custom):
client = AcsClient(AK, SK)
list_user_policy_req = ListPoliciesForUserRequest.ListPoliciesForUserRequest()
list_user_policy_req.set_UserName(user_name)
list_user_policy_res = None
try:
list_user_policy_res = client.do_action_with_exception(list_user_policy_req)
except ServerException as se:
if se.get_http_status() == 404 and se.get_error_code() == 'EntityNotExist.User':
return True, None
else:
return False, se.get_error_msg()
if list_user_policy_res is None:
return True, None
list_user_policy_json_res = json.loads(list_user_policy_res)
user_policies = None
try:
user_policies = list(filter(lambda x: x['PolicyType'] == 'Custom',
list_user_policy_json_res['Policies']['Policy'])) if only_custom else \
list_user_policy_json_res['Policies']['Policy']
except Exception as ex:
return True, None
if user_policies is None or len(user_policies) == 0:
return True, None
policy_action_list = list()
for policy in user_policies:
get_policy_req = GetPolicyRequest.GetPolicyRequest()
get_policy_req.set_PolicyName(policy['PolicyName'])
get_policy_req.set_PolicyType(policy['PolicyType'])
get_policy_res = None
try:
get_policy_res = client.do_action_with_exception(get_policy_req)
policy_document = json.loads(json.loads(get_policy_res)['DefaultPolicyVersion']['PolicyDocument'])
for action in list(map(lambda x: x['Action'],
list(filter(lambda x: x['Effect'] == 'Allow', policy_document['Statement'])))):
if type(action) is list:
policy_action_list.extend(action)
else:
policy_action_list.append(action)
except ServerException as se:
logger.error(se)
except Exception as ex:
logger.error(ex)
policy_action_set = set(policy_action_list)
if policy_action_set.intersection(set(input_actions)):
get_user_mfa_req = GetUserMFAInfoRequest.GetUserMFAInfoRequest()
get_user_mfa_req.set_UserName(user_name)
get_user_mfa_res = None
try:
get_user_mfa_res = client.do_action_with_exception(get_user_mfa_req)
if ('SerialNumber' not in get_user_mfa_res) or ('MFADevice' not in get_user_mfa_res):
return False, 'user MFADevice empty'
else:
return True, None
except ServerException as se:
return False, se.get_error_msg()
else:
return True, None
def validate_event(event):
if not event:
logger.error('Event is empty.')
evt = parse_json(event)
logger.info('Loading event: %s .' % evt)
if 'resultToken' not in evt:
logger.error('ResultToken is empty.')
return None
if 'ruleParameters' not in evt:
logger.error('RuleParameters is empty.')
return None
if 'invokingEvent' not in evt:
logger.error('InvokingEvent is empty.')
return None
return evt
def parse_json(content):
try:
return json.loads(content)
except Exception as e:
logger.error('Parse content:{} to json error:{}.'.format(content, e))
return None
def put_evaluations(context, result_token, evaluations):
# ak/sk, need AliyunConfigFullAccess policy
client = AcsClient(AK, SK, 'ap-southeast-1')
# Open api request
request = CommonRequest()
request.set_domain('config.ap-southeast-1.aliyuncs.com')
request.set_version('2019-01-08')
request.set_action_name('PutEvaluations')
request.add_body_params('ResultToken', result_token)
request.add_body_params('Evaluations', evaluations)
request.set_method('POST')
try:
response = client.do_action_with_exception(request)
logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
except Exception as e:
logger.error('PutEvaluations error: %s' % e)
The code checks whether MFA is enabled for a RAM user. The following table describes
the main parameters in the code.
Parameter |
Description |
Example |
user_name |
The name of the RAM user. |
test123@169827232854****.onaliyun.com |
rule_parameters |
The parameters of the rule. |
dangerousActions |
input_actions |
The high-risk operations that you want to manage. |
ecs:*,oss:*,log:* |
configuration_item |
The configuration items of the resource. |
For more information, see What is the data structure of functions that can be used to create custom rules? |
only_custom |
Specifies whether to check only custom policies. Valid values:
|
true |