Cloud Config allows you to create rules based on Function Compute to audit associated resources. When a rule is triggered, Cloud Config invokes the corresponding rule function to evaluate the associated resources and returns the compliance evaluation results of the resources. This topic provides Python examples of the code and input parameters of a custom rule function.

Function code

A rule is a piece of logical judgment code that is stored in a rule function. Cloud Config audits resources based on rules. When a rule is triggered, Cloud Config invokes the corresponding rule function to evaluate the associated resources. In this example, the code of the custom rule function contains the following key functions:
  • handler

    The handler function is the entry function that is invoked when the rule is triggered. You must define the handler function when you develop the code of the rule function.

  • put_evaluations

    The put_evaluations function is invoked in the handler function to return compliance evaluation results to Cloud Config in callbacks.

You can use the following sample Python code to create a rule:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import json
import logging

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest

logger = logging.getLogger()
# The compliance evaluation results.
COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
# The types of pushed configurations.
CONFIGURATION_TYPE_COMMON = 'COMMON'
CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
CONFIGURATION_TYPE_NONE = 'NONE'


# The entry function that orchestrates and processes business logic. 
def handler(event, context):
    """
    Process an event.
    :param event: the event.
    :param context: the context.
    :return: the compliance evaluation result.
    """
    # Check whether the event is valid. You can directly use the following sample code in your own function code. 
    evt = validate_event(event)
    if not evt:
        return None

    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')
    ordering_timestamp = evt.get('orderingTimestamp')

    # Obtain the configurations of a resource. If a rule is set to be triggered upon configuration changes, the input parameters have values. After a rule that you create is triggered or you manually trigger a rule, Cloud Config invokes the rule function to evaluate associated resources one by one. If the configurations of a resource change, Cloud Config automatically invokes the corresponding rule function to evaluate the resource whose configurations change. 
    # If a rule is set to be periodically triggered, the input parameters are empty. In this case, you can query the configurations of a resource to be evaluated by calling the corresponding API operation. 
    configuration_item = invoking_event.get('configurationItem')
    account_id = configuration_item.get('accountId')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')
    region_id = configuration_item.get('regionId')

    # Check whether the size of the pushed resource configurations is greater than 100 KB. If yes, call an API operation to query complete data. 
    configuration_type = invoking_event.get('configurationType')
    if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE:
        resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
        resource_json = json.loads(resource_result)
        configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]

    # Evaluate resources. You must implement the evaluation logic based on the business scenario. The following code is only for reference: 
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # Set the compliance evaluation result. The result must use the syntax shown in the following sample: 
    evaluations = [
        {
            'accountId': account_id,
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'complianceRegionId': region_id,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

    # Return the compliance evaluation result and write it to Cloud Config. You can directly use the following sample code in your own function code. 
    put_evaluations(context, result_token, evaluations)
    return evaluations


# Evaluate a resource based on the input parameters and resource configuration. You must implement the evaluation logic based on the business scenario. The following code is only for reference: 
def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    Evaluate a resource.
    :param rule_parameters: the input parameters of the rule.
    :param configuration_item: the configurations of the resource.
    :return: the compliance evaluation result.
    """
    # Initialize the return values.
    compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
    annotation = None

    # Obtain the complete resource configurations.
    full_configuration = configuration_item['configuration']
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # Convert the configurations to a JSON object.
    configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} is invalid.'.format(full_configuration)
        return compliance_type, annotation
    return compliance_type, annotation


def validate_event(event):
    """
    Check whether an event is valid.
    :param event: the event.
    :return: the JSON object.
    """
    if not event:
        logger.error('Event is empty.')
    evt = parse_json(event)
    logger.info('Loading event: %s .' % evt)

    if 'resultToken' not in evt:
        logger.error('ResultToken is empty.')
        return None
    if 'ruleParameters' not in evt:
        logger.error('RuleParameters is empty.')
        return None
    if 'invokingEvent' not in evt:
        logger.error('InvokingEvent is empty.')
        return None
    return evt


def parse_json(content):
    """
    Convert a JSON string to a JSON object.
    :param content: the JSON string.
    :return: the JSON object.
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


# Return the compliance evaluation result and write it to Cloud Config. You can directly use the following sample code in your own function code. 
def put_evaluations(context, result_token, evaluations):
    """
    Call Cloud Config API to return the compliance evaluation result and write it to Cloud Config.
    :param context: the Function Compute context.
    :param result_token: the result token.
    :param evaluations: the compliance evaluation result.
    :return: None
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must have the AliyunConfigFullAccess permission. 
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'ap-southeast-1',
    )

    # Create a request and set the parameters. In this example, the domain is set to config.ap-southeast-1.aliyuncs.com. 
    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2019-01-08')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    request.add_body_params('Evaluations', evaluations)
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
    except Exception as e:
        logger.error('PutEvaluations error: %s' % e)


# Obtain the details of a resource. You can directly use the following sample code in your own function code. 
def get_discovered_resource(context, resource_id, resource_type, region_id):
    """
    Call an API operation to obtain the details of a resource.
    :param context: the Function Compute context.
    :param resource_id: the ID of the resource.
    :param resource_type: the type of the resource.
    :param region_id: the ID of the region in which the resource resides.
    :return: the details of the resource.
    """
    # Enter the AccessKey ID and AccessKey secret. Your account must have the AliyunConfigFullAccess or AliyunConfigReadOnlyAccess permission. 
    client = AcsClient(
        'LTAI4FxbrTniVtqg1FZW****',
        'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
        'cn-shanghai',
    )

    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2020-09-07')
    request.set_action_name('GetDiscoveredResource')
    request.add_query_param('ResourceId', resource_id)
    request.add_query_param('ResourceType', resource_type)
    request.add_query_param('Region', region_id)
    request.set_method('GET')

    try:
        response = client.do_action_with_exception(request)
        resource_result = str(response, encoding='utf-8')
        return resource_result
    except Exception as e:
        logger.error('GetDiscoveredResource error: %s' % e)

                

Input parameters

In an event message, the ruleParameters field stores the input parameters of a rule function. Other fields provide the event information when the custom rule is triggered. The following code shows the fields in an event message in the JSON format:
{
    "orderingTimestamp": "Timestamp when the evaluation starts",
    "invokingEvent": {
        "messageType": "Message type",
        "configurationType": "Type of pushed configurations",
        "configurationItem": {
            "accountId": "ID of your Alibaba Cloud account",
            "arn": "Resource ARN",
            "availabilityZone": "Zone",
            "regionId": "Region ID",
            "configuration": "Detailed resource configurations",
            "captureTime": "Timestamp when Cloud Config detects a resource change event and generates a log",
            "resourceCreationTime": "Timestamp when the resource was created",
            "resourceStatus": "Resource status",
            "resourceId": "Resource ID",
            "resourceName": "Resource name",
            "resourceType": "Resource type",
            "tags": "Tags"
        }
    },
    "ruleParameters": {
        "key": "value"
    },
    resultToken: "Result token"
}