Cloud Config allows you to create custom rules based on Function Compute to audit associated resources. If a rule is triggered, Cloud Config invokes the corresponding rule function to evaluate the associated resources and returns the compliance evaluation results of the resources. This topic describes the input parameters of a custom rule function and provides relevant sample Python code.

Function code

A rule is a piece of evaluation logic that is stored in a rule function. Cloud Config audits resources based on rules. When a rule is triggered, Cloud Config invokes the corresponding rule function to evaluate the associated resources. In this example, the code of the custom rule function contains the following key functions:
  • handler

    The handler function is the entry function that is invoked when the rule is triggered. You must define the handler function when you develop the code of the rule function.

  • put_evaluations

    The handler function calls the put_evaluations function to return the compliance evaluation results to Cloud Config through a callback.

You can use the following sample Python code to create a rule:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import json
import logging

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest

logger = logging.getLogger()
# The compliance evaluation results.
COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
# The types of pushed configurations.
CONFIGURATION_TYPE_COMMON = 'COMMON'
CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
CONFIGURATION_TYPE_NONE = 'NONE'


# The entry function that orchestrates and processes business logic. 
def handler(event, context):
    """
    Process an event.
    :param event: the event.
    :param context: the context.
    :return: the compliance evaluation result.
    """
    # Check whether the event is valid. You can adapt the following sample code to your business scenario.
    evt = validate_event(event)
    if not evt:
        return None

    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')
    ordering_timestamp = evt.get('orderingTimestamp')

    # Obtain the configurations of the resource.
    # If the rule is triggered by configuration changes, these input parameters have values. After a rule
    # that you created is triggered, or after you manually trigger a rule, Cloud Config invokes the rule
    # function to evaluate the associated resources one by one. When the configurations of a resource
    # change, Cloud Config automatically invokes the rule function to evaluate that resource.
    # If the rule is triggered periodically, these input parameters are empty. In this case, call the
    # corresponding API operation to query the configurations of the resources to be evaluated.
    configuration_item = invoking_event.get('configurationItem')
    account_id = configuration_item.get('accountId')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')
    region_id = configuration_item.get('regionId')

    # Check whether the size of the pushed resource configurations exceeds 100 KB. If it does, call an API operation to query the complete data.
    configuration_type = invoking_event.get('configurationType')
    if configuration_type == CONFIGURATION_TYPE_OVERSIZE:
        resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
        # get_discovered_resource returns None if the API call fails.
        if resource_result:
            resource_json = json.loads(resource_result)
            configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]

    # Evaluate resources. You must implement the evaluation logic based on the business scenario. The following code is for reference only: 
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # Set the compliance evaluation result. The result must use the syntax shown in the following sample: 
    evaluations = [
        {
            'accountId': account_id,
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'complianceRegionId': region_id,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

    # Return the compliance evaluation result and write it to Cloud Config. You can directly use the following sample code in your own function code. 
    put_evaluations(context, result_token, evaluations)
    return evaluations


# Evaluate a resource based on the input parameters and resource configuration. You must implement the evaluation logic based on the business scenario. The following code is only for reference: 
def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    Evaluate a resource.
    :param rule_parameters: the input parameters of the rule.
    :param configuration_item: the configurations of the resource.
    :return: the compliance evaluation result.
    """
    # Initialize the return values.
    compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
    annotation = None

    # Obtain the complete resource configurations.
    full_configuration = configuration_item.get('configuration')
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # Convert the configurations to a JSON object.
    configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} is invalid.'.format(full_configuration)
        return compliance_type, annotation
    # Add your evaluation logic here and set compliance_type and annotation accordingly.
    return compliance_type, annotation


def validate_event(event):
    """
    Check whether an event is valid.
    :param event: the event.
    :return: the JSON object.
    """
    if not event:
        logger.error('Event is empty.')
        return None
    evt = parse_json(event)
    if not evt:
        # parse_json has already logged the parsing error.
        return None
    logger.info('Loading event: %s .' % evt)

    if 'resultToken' not in evt:
        logger.error('ResultToken is empty.')
        return None
    if 'ruleParameters' not in evt:
        logger.error('RuleParameters is empty.')
        return None
    if 'invokingEvent' not in evt:
        logger.error('InvokingEvent is empty.')
        return None
    return evt


def parse_json(content):
    """
    Convert a JSON string to a JSON object.
    :param content: the JSON string.
    :return: the JSON object.
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


# Return the compliance evaluation result and write it to Cloud Config. You can directly use the following sample code in your own function code. 
def put_evaluations(context, result_token, evaluations):
    """
    Call Cloud Config API to return the compliance evaluation result and write it to Cloud Config.
    :param context: the Function Compute context.
    :param result_token: the result token.
    :param evaluations: the compliance evaluation result.
    :return: None
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must be granted the AliyunConfigFullAccess permission. 
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'ap-southeast-1',
    )

    # Create a request and configure the parameters. In this example, the domain is set to config.ap-southeast-1.aliyuncs.com. 
    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2019-01-08')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    # Serialize the evaluation results to a JSON string before sending them.
    request.add_body_params('Evaluations', json.dumps(evaluations))
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
    except Exception as e:
        logger.error('PutEvaluations error: %s' % e)


# Obtain the details of a resource. You can adapt the following sample code to your business scenario.
def get_discovered_resource(context, resource_id, resource_type, region_id):
    """
    Call an API operation to obtain the details of a resource.
    :param context: the Function Compute context.
    :param resource_id: the ID of the resource.
    :param resource_type: the type of the resource.
    :param region_id: the ID of the region where the resource resides.
    :return: the details of the resource.
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must be granted the AliyunConfigFullAccess or AliyunConfigReadOnlyAccess permission. 
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'ap-southeast-1',
    )

    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2020-09-07')
    request.set_action_name('GetDiscoveredResource')
    request.add_query_param('ResourceId', resource_id)
    request.add_query_param('ResourceType', resource_type)
    request.add_query_param('Region', region_id)
    request.set_method('GET')

    try:
        response = client.do_action_with_exception(request)
        resource_result = str(response, encoding='utf-8')
        return resource_result
    except Exception as e:
        logger.error('GetDiscoveredResource error: %s' % e)
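
The evaluate_configuration_item function in the preceding sample is a placeholder that always returns NON_COMPLIANT. The following code is a minimal sketch of what concrete evaluation logic might look like. It assumes a hypothetical rule parameter named expectedStatus and a hypothetical Status field in the resource configuration. Neither name comes from Cloud Config, and the actual field names depend on the resource type that you evaluate.

```python
import json

COMPLIANT = 'COMPLIANT'
NON_COMPLIANT = 'NON_COMPLIANT'
NOT_APPLICABLE = 'NOT_APPLICABLE'


def evaluate_status(rule_parameters, configuration_item):
    """Return (compliance_type, annotation) for a hypothetical status rule."""
    # Hypothetical rule parameter: the status value that counts as compliant.
    expected = (rule_parameters or {}).get('expectedStatus')
    if not expected:
        return NOT_APPLICABLE, 'Rule parameter expectedStatus is not set.'

    raw = configuration_item.get('configuration')
    if not raw:
        return NON_COMPLIANT, 'Configuration is empty.'

    # The configuration is expected to be a JSON string that holds an object.
    try:
        configuration = json.loads(raw)
    except (TypeError, ValueError) as e:
        return NON_COMPLIANT, 'Configuration is invalid: {}'.format(e)
    if not isinstance(configuration, dict):
        return NON_COMPLIANT, 'Configuration is not a JSON object.'

    # Hypothetical configuration field; replace it with a field of your resource type.
    actual = configuration.get('Status')
    if actual == expected:
        return COMPLIANT, None
    return NON_COMPLIANT, 'Status is {}, expected {}.'.format(actual, expected)
```

The function keeps the same return shape as evaluate_configuration_item, so the handler function could call it in the same place without other changes.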

                

Input parameters

In an event message, the ruleParameters field stores the input parameters of a rule function. Other fields provide the event information when the custom rule is triggered. The following code shows the fields in an event message in the JSON format:
{
    "orderingTimestamp": "Timestamp when the evaluation starts",
    "invokingEvent": {
        "messageType": "Message type",
        "configurationType": "Type of pushed configurations",
        "configurationItem": {
            "accountId": "The ID of the Alibaba Cloud account to which the resource belongs",
            "availabilityZone": "The ID of the zone where the resource resides",
            "regionId": "The ID of the region where the resource resides",
            "configuration": "Resource configurations",
            "captureTime": "Timestamp when Cloud Config detected a resource change event and generated a log",
            "resourceCreationTime": "Timestamp when the resource was created",
            "resourceStatus": "Resource status",
            "resourceId": "Resource ID",
            "resourceName": "Resource name",
            "resourceType": "Resource type",
            "tags": "Tags"
        }
    },
    "ruleParameters": {
        "key": "value"
    },
    "resultToken": "Result token"
}
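
To illustrate how these fields can be read in a rule function, the following code is a minimal sketch that parses an event message and extracts the values used for evaluation. The extract_resource_fields helper and all sample values are hypothetical and are intended only for local testing. The sketch also assumes that configurationItem may be missing or empty, as is the case for periodically triggered rules, and falls back to an empty dictionary.

```python
import json


def extract_resource_fields(event):
    """Parse an event message and return the fields used for evaluation."""
    evt = json.loads(event)
    invoking_event = evt.get('invokingEvent') or {}
    # Assumption of this sketch: treat a missing configurationItem as empty.
    item = invoking_event.get('configurationItem') or {}
    return {
        'result_token': evt.get('resultToken'),
        'rule_parameters': evt.get('ruleParameters') or {},
        'ordering_timestamp': evt.get('orderingTimestamp'),
        'configuration_type': invoking_event.get('configurationType'),
        'account_id': item.get('accountId'),
        'resource_id': item.get('resourceId'),
        'resource_type': item.get('resourceType'),
        'region_id': item.get('regionId'),
    }


# Hypothetical sample event for local testing. All values are placeholders.
sample_event = json.dumps({
    'orderingTimestamp': 1700000000000,
    'invokingEvent': {
        'configurationType': 'COMMON',
        'configurationItem': {
            'accountId': '1234567890',
            'regionId': 'ap-southeast-1',
            'resourceId': 'i-example',
            'resourceType': 'ACS::ECS::Instance',
            'configuration': '{}',
        },
    },
    'ruleParameters': {'key': 'value'},
    'resultToken': 'example-token',
}).encode('utf-8')

fields = extract_resource_fields(sample_event)
print(fields['resource_id'], fields['configuration_type'])
```

Returning a flat dictionary keeps the parsing step separate from the evaluation logic, which makes each part easier to test without invoking Cloud Config.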