Cloud Config provides multiple managed rules based on Baseline for Classified Protection of Cybersecurity 2.0 and platform compliance experience. If these managed rules cannot meet your demands, you can submit a ticket to contact Alibaba Cloud after-sales engineers, or develop your own custom rules.

Background information

This topic describes how to create a custom rule through the following sample rule:
  • Rule: checks whether Elastic Compute Service (ECS) instances are started from a specific image.
  • Programming language: Python 3.

Differences between custom rules and managed rules

For more information about the managed rules supported by Alibaba Cloud, see Managed rules. By using custom rules, you can better support personalized compliance scenarios. Custom rules work in the same way as managed rules, with the following differences:
  • A managed rule is a rule function that has been created in Function Compute. To use a managed rule, you can directly select the rule in the Cloud Config console.
  • To use a custom rule, you must first create a rule function in Function Compute. Then, you can enter the Alibaba Cloud Resource Name (ARN) of the function in the Cloud Config console to create the custom rule.

Create a custom rule

  1. Create a function in the Function Compute console.

    For more information, see Create a function.

    After you create a function, a function ARN is generated. You can view the ARN on the Overview page of the function.Function properties
    Note Currently, Function Compute supports the following programming languages: Java 8, Node.js 6, Node.js 8, Python 2.7, Python 3, PHP 7.2, and .NET Core 2.1. To create a function in Java 8 or .NET Core, you can only upload a code package or use a code package stored in Object Storage Service (OSS). To create functions in other languages, you can upload code packages, use code packages stored in OSS, or edit function code online. For more information, see Function Compute.
  2. Create a rule in the Cloud Config console. When creating the rule, select the region, service, and function in the Function ARN section.
    For more information, see Create a rule.

Develop the code of a rule function

A rule is essentially a piece of logic judgment code that is stored in a rule function. When the rule is used to evaluate resources, the rule function is triggered to implement the evaluation.

  • The following is the code of the sample rule. The code contains two key functions. One of them is handler, which is the entry function that is called when the custom rule is triggered. You must define the handler function when you develop the code of the rule function.Create a function
  • The other function is put_evaluations, which is called in handler to return the evaluation result.
#! /usr/bin/env python
# -*- encoding: utf-8 -*-
import logging
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.request import CommonRequest

logger = logging.getLogger()

# The resource types to which the rule function applies. Separate resource types with commas (,).
MATCH_RESOURCE_TYPES = ''

# The compliance types.
COMPLIACE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIACE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIACE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
COMPLIACE_TYPE_INSUFFICIENT_DATA = 'INSUFFICIENT_DATA'


def handler(event, context):
    """
    Process an event.
    :param event: the event.
    :param context: the context.
    :return: the evaluation result.
    """
    # Check whether the event is valid.
    evt = validate_event(event)
    if not evt:
        return None

    rule_parameters = evt.get('ruleParameters')
    result_token = evt.get('resultToken')
    invoking_event = evt.get('invokingEvent')

    # Initialize the return values.
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
    annotation = None

    # Obtain the configuration item.
    configuration_item = invoking_event.get('configurationItem')
    if not configuration_item:
        logger.error('Configuration item is empty.')
        return None

    ordering_timestamp = configuration_item.get('captureTime')
    resource_id = configuration_item.get('resourceId')
    resource_type = configuration_item.get('resourceType')

    # Obtain the evaluation result.
    compliance_type, annotation = evaluate_configuration_item(
        rule_parameters, configuration_item)

    # The evaluation result.
    evaluations = [
        {
            'complianceResourceId': resource_id,
            'complianceResourceType': resource_type,
            'orderingTimestamp': ordering_timestamp,
            'complianceType': compliance_type,
            'annotation': annotation
        }
    ]

    # Return the evaluation result.
    put_evaluations(context, result_token, evaluations)

    return evaluations


def evaluate_configuration_item(rule_parameters, configuration_item):
    """
    Evaluate a configuration item.
    :param rule_parameters: the rule parameters.
    :param configuration_item: the configuration item.
    :return: the compliance type and annotation.
    """
    # Initialize the return values.
    compliance_type = COMPLIACE_TYPE_NOT_APPLICABLE
    annotation = None

    # Obtain the resource type and configuration.
    resource_type = configuration_item['resourceType']
    full_configuration = configuration_item['configuration']

    # Check whether the resource type is valid.
    if MATCH_RESOURCE_TYPES and resource_type not in MATCH_RESOURCE_TYPES.split(','):
        annotation = 'Resource type is {}, not in {}.'.format(
            resource_type, MATCH_RESOURCE_TYPES)
        return compliance_type, annotation

    # Check whether the configuration is empty.
    if not full_configuration:
        annotation = 'Configuration is empty.'
        return compliance_type, annotation

    # Convert the configuration to a JSON object.
    configuration = parse_json(full_configuration)
    if not configuration:
        annotation = 'Configuration:{} in invald.'.format(full_configuration)
        return compliance_type, annotation

    # =========== Your code starts here. =========== #


    # =========== Your code ends here. =========== #

    return compliance_type, annotation


def validate_event(event):
    """
    Check whether an event is valid.
    :param event: the event.
    :return: the JSON object.
    """
    if not event:
        logger.error('Event is empty.')

    evt = parse_json(event)
    logger.info('Loading event: %s .' % evt)

    if 'resultToken' not in evt:
        logger.error('ResultToken is empty.')
        return None

    if 'ruleParameters' not in evt:
        logger.error('RuleParameters is empty.')
        return None

    if 'invokingEvent' not in evt:
        logger.error('InvokingEvent is empty.')
        return None

    return evt


def parse_json(content):
    """
    Convert a JSON string to a JSON object.
    :param content: the JSON string.
    :return: the JSON object.
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


def put_evaluations(context, result_token, evaluations):
    """
    Call the Cloud Config API to return the evaluation result.
    :param context: the Function Compute context.
    :param result_token: the result token.
    :param evaluations: the evaluation result.
    :return: None.
    """
    # You must enter your own AccessKey ID and AccessKey secret and have the AliyunConfigFullAccess permission.
    client = AcsClient(
        'XXXX',
        'XXXXXXX',
        'cn-shanghai',
    )

    # Create a request and set required parameters. Domain is set to config.cn-shanghai.aliyuncs.com or config.ap-southeast-1.aliyuncs.com
    request = CommonRequest()
    request.set_domain('config.ap-southeast-1.aliyuncs.com')
    request.set_version('2019-01-08')
    request.set_action_name('PutEvaluations')
    request.add_body_params('ResultToken', result_token)
    request.add_body_params('Evaluations', evaluations)
    request.set_method('POST')

    try:
        response = client.do_action_with_exception(request)
        logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
    except Exception as e:
        logger.error('PutEvaluations error: %s' % e)

            

Input parameters of a rule function

event: the event information. The ruleParameters field stores the rule-specific parameters. Other fields provides the event information when the custom rule is triggered. JSON format:

{
    version:"version number",
    orderingTimestamp:"command execution start time",
    invokingEvent:{
      messageType:"message type",
        configurationItem:{
          "accountId":"user ID",
            "arn":"resource ARN",
            "availabilityZone":"zone",
            "regionId":"region ID",
            "configuration":"the configuration information of the resource in the form of a string, which varies for different resources",
            "configurationDiff":"configuration changes",
            "relationship":"relationship",
            "relationshipDiff":"relationship changes",
            "captureTime":"capture time",
            "resourceCreationTime":"resource creation time",
            "resourceStatus":"resource status",
            "resourceId":"resource ID",
      "resourceName":"resource name",
            "resourceType":"resource type",
            "supplementaryConfiguration":"supplementary configuration",
            "tags":"tags"
        },
        notificationCreationTimestamp:"event message generation time"
    },
    ruleParameters:{
      {"key":"value"}
    },
    resultToken:"the result token in Function Compute" 
}
            

context: the context information that is automatically set when the custom rule is triggered.

  • context.credentials.access_key_id:"AccessKey ID"
  • context.credentials.access_key_secret:"AccessKey secret"
  • context.region:"region information"

After you create the rule function in Function Compute, copy the ARN of the function and continue to create the custom rule in Cloud Config.