Cloud Config allows you to create custom rules based on Function Compute to audit associated resources. If a rule is triggered, Cloud Config invokes the corresponding rule function to evaluate the associated resources and returns the compliance evaluation results of the resources. This topic describes the input parameters of a custom rule function and provides relevant sample Python code.
Function code
handler
The
handler
function is the entry function that is invoked when the rule is triggered. You must define the handler
function when you develop the code of the rule function.
put_evaluations
The
put_evaluations
function is invoked in the handler
function to return compliance evaluation results to Cloud Config in callbacks.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import json
import logging
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
# Module-level logger used by every function below. getLogger() without a
# name returns the root logger.
logger = logging.getLogger()
# The compliance evaluation results. One of these values is reported in the
# 'complianceType' field of each evaluation built by handler().
COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
# The types of pushed configurations, as carried in the 'configurationType'
# field of the invoking event. handler() only special-cases OVERSIZE, for
# which the complete configuration is fetched through an API call.
# NOTE(review): the exact meaning of COMMON and NONE is not shown in this
# file -- confirm against the Cloud Config documentation.
CONFIGURATION_TYPE_COMMON = 'COMMON'
CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
CONFIGURATION_TYPE_NONE = 'NONE'
# The entry function that orchestrates and processes business logic.
def handler(event, context):
    """
    Process a Cloud Config rule event and report the evaluation result.

    :param event: the event as JSON text, as passed in by the caller.
    :param context: the Function Compute context.
    :return: the list of evaluations written back to Cloud Config, or None
        when the event is invalid or the resource cannot be evaluated.
    """
    # Check whether the event is valid. You can copy the following sample code based on your business scenario.
    evt = validate_event(event)
    if not evt:
        return None
    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')
    ordering_timestamp = evt.get('orderingTimestamp')

    # Obtain the configurations of a resource. If a rule is triggered upon
    # configuration changes, the configuration item is present in the event.
    # If a rule is triggered periodically, the configuration item is empty;
    # in that case you must query the resources to evaluate by calling the
    # corresponding API operation. This sample does not implement that path,
    # so it logs the situation and stops instead of failing on None.
    configuration_item = invoking_event.get('configurationItem') if invoking_event else None
    if not configuration_item:
        logger.error('ConfigurationItem is empty.')
        return None
    account_id = configuration_item.get('accountId')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')
    region_id = configuration_item.get('regionId')

    # Check whether the size of the pushed resource configurations is greater
    # than 100 KB. If yes, call an API operation to query complete data.
    if invoking_event.get('configurationType') == CONFIGURATION_TYPE_OVERSIZE:
        resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
        if not resource_result:
            # The lookup already logged its own error. Without the complete
            # configuration the resource cannot be evaluated reliably.
            logger.error('Failed to obtain the complete configuration of resource: %s .', resource_id)
            return None
        try:
            resource_json = json.loads(resource_result)
            configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]
        except (TypeError, ValueError, KeyError) as e:
            logger.error('Unexpected GetDiscoveredResource response: %s', e)
            return None

    # Evaluate resources. You must implement the evaluation logic based on the
    # business scenario. The following code is for reference only:
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # Set the compliance evaluation result. The result must use the syntax
    # shown in the following sample:
    evaluations = [
        {
            'accountId': account_id,
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'complianceRegionId': region_id,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

    # Return the compliance evaluation result and write it to Cloud Config.
    # You can directly use the following sample code in your own function code.
    put_evaluations(context, result_token, evaluations)
    return evaluations
# Evaluate a resource based on the input parameters and resource configuration. You must implement the evaluation logic based on the business scenario. The following code is only for reference:
def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    Evaluate a resource.

    This sample only checks that a configuration is present and parseable;
    it always reports NON_COMPLIANT. Replace the final section with the
    evaluation logic of your rule.

    :param rule_parameters: the input parameters of the rule.
    :param configuration_item: the configurations of the resource.
    :return: a (compliance_type, annotation) tuple; annotation is None or a
        short message explaining the result.
    """
    # Initialize the return values.
    compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
    annotation = None

    # Obtain the complete resource configurations. A missing item or a
    # missing 'configuration' key is treated the same as an empty value.
    full_configuration = configuration_item.get('configuration') if configuration_item else None
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # Convert the configurations to a JSON object. A configuration that has
    # already been decoded into a dict is used as is.
    if isinstance(full_configuration, dict):
        configuration = full_configuration
    else:
        configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} is invalid.'.format(full_configuration)
        return compliance_type, annotation

    # Implement the rule-specific checks on `configuration` and
    # `rule_parameters` here, and set compliance_type / annotation accordingly.
    return compliance_type, annotation
def validate_event(event):
    """
    Check whether an event is valid.

    The event must be non-empty JSON text that decodes to an object
    containing the 'resultToken', 'ruleParameters' and 'invokingEvent' keys.

    :param event: the event.
    :return: the decoded event as a dict, or None if the event is invalid.
    """
    if not event:
        logger.error('Event is empty.')
        return None

    evt = parse_json(event)
    # parse_json returns None on failure, and valid JSON may still decode to
    # something other than an object; the key checks below need a dict.
    if not isinstance(evt, dict):
        logger.error('Event is invalid.')
        return None
    logger.info('Loading event: %s .', evt)

    required_keys = (
        ('resultToken', 'ResultToken is empty.'),
        ('ruleParameters', 'RuleParameters is empty.'),
        ('invokingEvent', 'InvokingEvent is empty.'),
    )
    for key, message in required_keys:
        if key not in evt:
            logger.error(message)
            return None
    return evt
def parse_json(content):
    """
    Convert a JSON string to a JSON object.

    :param content: the JSON text (str, bytes or bytearray).
    :return: the decoded object, or None if the content cannot be parsed.
    """
    try:
        return json.loads(content)
    except (TypeError, ValueError, RecursionError) as e:
        # TypeError: content is not str/bytes/bytearray (for example None).
        # ValueError: malformed JSON or undecodable bytes.
        # RecursionError: pathologically deep nesting.
        logger.error('Parse content:%s to json error:%s.', content, e)
        return None
# Return the compliance evaluation result and write it to Cloud Config. You can directly use the following sample code in your own function code.
def put_evaluations(context, result_token, evaluations):
    """
    Call Cloud Config API to return the compliance evaluation result and write it to Cloud Config.

    Failures are logged and not re-raised, so the caller is not interrupted.

    :param context: the Function Compute context.
    :param result_token: the result token.
    :param evaluations: the compliance evaluation result, either as a list of
        evaluation dicts or as an already serialized JSON string.
    :return: None
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must be granted the AliyunConfigFullAccess permission.
    # NOTE(review): the values below are documentation placeholders. Do not
    # commit real credentials to source code.
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'ap-southeast-1',
    )

    # Body parameters are sent form-encoded, so a Python list would be
    # transmitted as its repr (single quotes, None) instead of JSON.
    # Serialize it explicitly; strings are passed through unchanged.
    if isinstance(evaluations, str):
        evaluations_json = evaluations
    else:
        evaluations_json = json.dumps(evaluations)

    # Create a request and configure the parameters. In this example, the domain is set to config.ap-southeast-1.aliyuncs.com.
    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2019-01-08')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    request.add_body_params('Evaluations', evaluations_json)
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: %s, response: %s.', request, response)
    except Exception as e:
        # Best effort: report the failure in the log and return normally.
        logger.error('PutEvaluations error: %s', e)
# Obtain the details of a resource. You can copy the following sample code based on your business scenario.
def get_discovered_resource(context, resource_id, resource_type, region_id):
    """
    Call an API operation to obtain the details of a resource.

    :param context: the Function Compute context.
    :param resource_id: the ID of the resource.
    :param resource_type: the type of the resource.
    :param region_id: the ID of the region where the resource resides.
    :return: the details of the resource as JSON text, or None if the call
        fails (the error is logged).
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must be granted the AliyunConfigFullAccess or AliyunConfigReadOnlyAccess permission.
    # NOTE(review): the values below are documentation placeholders. Do not
    # commit real credentials to source code.
    # The client region matches the endpoint domain set below and the region
    # used by put_evaluations.
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'ap-southeast-1',
    )

    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2020-09-07')
    request.set_action_name('GetDiscoveredResource')
    request.add_query_param('ResourceId', resource_id)
    request.add_query_param('ResourceType', resource_type)
    request.add_query_param('Region', region_id)
    request.set_method('GET')

    try:
        response = client.do_action_with_exception(request)
        # The response is decoded as UTF-8 text before being returned.
        resource_result = str(response, encoding='utf-8')
        return resource_result
    except Exception as e:
        logger.error('GetDiscoveredResource error: %s', e)
        return None
Input parameters
{
"orderingTimestamp": "Timestamp when the evaluation starts",
"invokingEvent": {
"messageType": "Message type",
"configurationType": "Type of pushed configurations",
"configurationItem": {
"accountId": "The ID of the Alibaba Cloud account to which the resource belongs",
"availabilityZone": "The ID of the zone where the resource resides",
"regionId": "The ID of the region where the resource resides",
"configuration": "Resource configurations",
"captureTime": "Timestamp when Cloud Config detected a resource change event and generated a log",
"resourceCreationTime": "Timestamp when the resource was created",
"resourceStatus": "Resource status",
"resourceId": "Resource ID",
"resourceName": "Resource name",
"resourceType": "Resource type",
"tags": "Tags"
}
},
"ruleParameters": {
"key": "value"
},
"resultToken": "Result token"
}