当您通过函数计算新建自定义规则时,如果规则被触发,配置审计会运行对应的规则函数对资源进行检测,并提供资源合规评估结果。本文通过Python示例为您介绍自定义规则函数的代码和入参。
函数代码
规则的本质是一段逻辑判断代码,这段代码存放在新建的函数中。在配置审计对资源的持续审计中,通过触发该函数的执行来实现对资源的评估。本函数代码主要有两个函数,具体如下:
handler
handler
为入口函数,即自定义规则触发时调用的函数。handler
在新建函数时进行设置。put_evaluations
put_evaluations
在handler
中调用,将评估结果回调至配置审计。
Python示例如下:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import json
import logging
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
logger = logging.getLogger()
# 合规类型
COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT'
COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT'
COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE'
# 资源配置推送类型
CONFIGURATION_TYPE_COMMON = 'COMMON'
CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE'
CONFIGURATION_TYPE_NONE = 'NONE'
# 入口函数,完成业务逻辑编排和处理。
def handler(event, context):
"""
处理函数
:param event:事件
:param context:上下文
:return:评估结果
"""
# 校验Event,代码可直接复制。
evt = validate_event(event)
if not evt:
return None
rule_parameters = evt.get('ruleParameters')
result_token = evt.get('resultToken')
invoking_event = evt.get('invokingEvent')
ordering_timestamp = evt.get('orderingTimestamp')
# 资源配置信息。当规则触发机制设置为配置变更时,该入参有值。当您新建规则或手动执行规则时,配置审计会逐个调用函数触发对所有资源的评估。如果资源配置变更,配置审计根据变更的资源信息自动调用函数触发一次资源评估。
# 当规则触发机制设置为周期执行时,该入参为空。请您根据API自行实现待评估资源的查询逻辑。
configuration_item = invoking_event.get('configurationItem')
account_id = configuration_item.get('accountId')
resource_id = configuration_item.get('resourceId')
resource_type = configuration_item.get('resourceType')
region_id = configuration_item.get('regionId')
# 判断当前推送的资源配置信息是否大于配置(100 KB),如果是,则需要调用资源详情API进行完整数据查询。
configuration_type = invoking_event.get('configurationType')
if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE:
resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
resource_json = json.loads(resource_result)
configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"]
# 对资源进行评估,需要根据实际业务自行实现评估逻辑,以下代码仅供参考。
compliance_type, annotation = evaluate_configuration_item(
rule_parameters, configuration_item)
# 设置评估结果,格式需符合以下示例要求。
evaluations = [
{
'accountId': account_id,
'complianceResourceId': resource_id,
'complianceResourceType': resource_type,
'complianceRegionId': region_id,
'orderingTimestamp': ordering_timestamp,
'complianceType': compliance_type,
'annotation': annotation
}
]
# 将评估结果返回并写入配置审计,代码可直接复制。
put_evaluations(context, result_token, evaluations)
return evaluations
# 根据入参和资源进行评估。需要根据实际业务自行实现评估逻辑,以下代码仅供参考。
def evaluate_configuration_item(rule_parameters, configuration_item):
"""
评估逻辑
:param rule_parameters:规则参数
:param configuration_item:配置项
:return:评估类型
"""
# 初始化返回值
compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT
annotation = None
# 获取资源配置完整信息
full_configuration = configuration_item['configuration']
if not full_configuration:
annotation = 'Configuration is empty.'
return compliance_type, annotation
# 转换为JSON
configuration = parse_json(full_configuration)
if not configuration:
annotation = 'Configuration:{} is invalid.'.format(full_configuration)
return compliance_type, annotation
return compliance_type, annotation
def validate_event(event):
"""
校验Event
:param event:Event
:return:JSON对象
"""
if not event:
logger.error('Event is empty.')
evt = parse_json(event)
logger.info('Loading event: %s .' % evt)
if 'resultToken' not in evt:
logger.error('ResultToken is empty.')
return None
if 'ruleParameters' not in evt:
logger.error('RuleParameters is empty.')
return None
if 'invokingEvent' not in evt:
logger.error('InvokingEvent is empty.')
return None
return evt
def parse_json(content):
"""
JSON类型转换
:param content:JSON字符串
:return:JSON对象
"""
try:
return json.loads(content)
except Exception as e:
logger.error('Parse content:{} to json error:{}.'.format(content, e))
return None
# 评估结果返回,并写入配置审计,代码可直接复制。
def put_evaluations(context, result_token, evaluations):
"""
调用API返回并写入评估结果
:param context:函数计算上下文
:param result_token:回调令牌
:param evaluations:评估结果
:return: None
"""
# 输入AccessKey ID和AccessKey Secret,需具备权限AliyunConfigFullAccess。
client = AcsClient(
'LTAI4FxbrTniVtqg1FZW****',
'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
'ap-southeast-1',
)
# 新建Request,并设置参数,config.ap-southeast-1.aliyuncs.com。
request = CommonRequest()
request.set_domain('config.ap-southeast-1.aliyuncs.com')
request.set_version('2019-01-08')
request.set_action_name('PutEvaluations')
request.add_body_params('ResultToken', result_token)
request.add_body_params('Evaluations', evaluations)
request.set_method('POST')
try:
response = client.do_action_with_exception(request)
logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response))
except Exception as e:
logger.error('PutEvaluations error: %s' % e)
# 获取资源详情,代码可直接复制。
def get_discovered_resource(context, resource_id, resource_type, region_id):
"""
调用API获取资源配置详情
:param context:函数计算上下文
:param resource_id:资源ID
:param resource_type:资源类型
:param region_id:资源所属地域ID
:return: 资源详情
"""
# 输入AccessKey ID和AccessKey Secret,需具备权限AliyunConfigFullAccess或者AliyunConfigReadOnlyAccess。
client = AcsClient(
'LTAI4FxbrTniVtqg1FZW****',
'C0GXsd6UU6ECUmrsCgPXA6OEvy****',
'cn-shanghai',
)
request = CommonRequest()
request.set_domain('config.ap-southeast-1.aliyuncs.com')
request.set_version('2020-09-07')
request.set_action_name('GetDiscoveredResource')
request.add_query_param('ResourceId', resource_id)
request.add_query_param('ResourceType', resource_type)
request.add_query_param('Region', region_id)
request.set_method('GET')
try:
response = client.do_action_with_exception(request)
resource_result = str(response, encoding='utf-8')
return resource_result
except Exception as e:
logger.error('GetDiscoveredResource error: %s' % e)
函数入参
在函数中设置的入参信息保存在ruleParameters中,其他内容在规则触发时自动生成事件信息。JSON示例如下:
{
"orderingTimestamp": "评估执行开始时间戳",
"invokingEvent": {
"messageType": "消息类型",
"configurationType": "资源配置推送类型",
"configurationItem": {
"accountId": "资源所属阿里云账号ID",
"availabilityZone": "资源所属可用区",
"regionId": "资源所属地域",
"configuration": "资源的详细配置",
"captureTime": "配置审计发现资源变更事件并生成日志的时间戳",
"resourceCreationTime": "新建资源的时间戳",
"resourceStatus": "资源状态",
"resourceId": "资源ID",
"resourceName": "资源名称",
"resourceType": "资源类型",
"tags": "标签"
}
},
"ruleParameters": {
"key": "value"
},
"resultToken": "函数计算中的回调令牌"
}