All Products
Search
Document Center

Elastic Compute Service:Automatically audit and remediate non-compliant security group rules

Last Updated:Nov 24, 2025

If a security group rule opens vulnerable ports, such as port 22 for the Secure Shell Protocol (SSH) and port 3389 for the Remote Desktop Protocol (RDP), to all IP addresses (0.0.0.0/0), your system is exposed to critical security threats. You can use Cloud Config to continuously monitor security group configurations and automatically remediate non-compliant configuration items to ensure system security.

Background

In an enterprise cloud environment, security groups are a core method for controlling network traffic and defining access rules for server instances. In complex, multi-instance scenarios, security group rules may contain the following risky configurations due to operations and maintenance (O&M) oversights or flawed policy designs:

  • Vulnerable ports exposed to all IP ranges: For example, opening SSH port 22, RDP port 3389, or database service ports such as 3306 and 6379 to the Internet (0.0.0.0/0). This directly exposes instances to the Internet, making them primary targets for brute-force attacks and data breaches.

  • Mixing internal and public services: Failing to distinguish between instance roles, such as public web services and internal databases. This can lead to incorrectly granting full IP access to non-public instances, creating a risk of lateral movement within the internal network.

Solution

You can create rules in Cloud Config to continuously monitor changes to security group rules. These rules are set to detect if vulnerable ports, such as 22, 3389, and 3306, are open to the public. If a new or modified security group rule allows these ports to be accessed from the Internet, a compliance audit is triggered. Cloud Config then automatically invokes Function Compute to run custom remediation logic. This logic uses an Alibaba Cloud software development kit (SDK) to adjust security group settings, for example, by deleting the risky rules. After remediation, the system re-evaluates the relevant rules to confirm the fix. You can also view the remediation details for non-compliant resources in the Cloud Config console. The entire process is transparent and traceable, which effectively prevents unauthorized public access. This solution improves O&M efficiency, reduces manual intervention, and ensures that resource configurations always meet security and compliance requirements. This enhances the security and stability of your environment.

image

Create a Cloud Config rule and a Function Compute remediation function

This solution uses Terraform to create a Cloud Config rule and Function Compute to automatically remediate non-compliant resources. This automates the detection and management of cloud resource compliance.

Note

If you are a Resource Access Management (RAM) user, grant the following permissions to the RAM user. For more information, see Grant permissions to a RAM user.

RAM access policy

This custom policy allows users to manage Elastic Compute Service (ECS) security group rules, and Function Compute services and functions.

{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateExplorerModuleVersion",
        "iacservice:GetExplorerModule",
        "iacservice:CreateExplorerModule",
        "iacservice:ListExplorerModules",
        "iacservice:UpdateExplorerModuleAttribute",
        "iacservice:DeleteExplorerModule"
      ],
      "Resource": "acs:iacservice:*:*:explorermodule/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateExplorerTask",
        "iacservice:UpdateExplorerTaskAttribute",
        "iacservice:GetExplorerTask",
        "iacservice:DeleteExplorerTask"
      ],
      "Resource": "acs:iacservice:*:*:explorertask/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:CreateJob",
        "iacservice:GetJob",
        "iacservice:listJobs",
        "iacservice:OperateJob"
      ],
      "Resource": "acs:iacservice:*:*:explorertask/*/job/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iacservice:ListResources",
        "iacservice:ListExplorerHistories",
        "iacservice:CreateExplorerHistory",
        "iacservice:ExportTerraformCode"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecs:RevokeSecurityGroup",
        "ecs:DescribeSecurityGroups",
        "ecs:DescribeSecurityGroupAttributes"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "fc:CreateService",
        "fc:DeleteService",
        "fc:UpdateService",
        "fc:CreateFunction",
        "fc:DeleteFunction",
        "fc:UpdateFunction",
        "fc:InvokeFunction",
        "fc:ListServices",
        "fc:ListFunctions",
        "fc:GetService",
        "fc:GetFunction"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "config:*",
      "Resource": "*"
    }
  ]
}
Note

The sample code in this tutorial can be run with one click. Run now

Important

This solution implements automatic remediation by directly deleting non-compliant security group rules. This may affect business continuity. You can modify the remediation code in Function Compute as needed.

Terraform code

variable "region_id" {
  type    = string
  default = "cn-shenzhen"
}
# main.tf


provider "alicloud" {
  region     = var.region_id
}

resource "local_file" "python_script" {
  content  = <<EOF
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import sys

sys.path.append('/opt/python')
import json
import logging
import jmespath  # Use jmespath instead of jsonpath.
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
from alibabacloud_tea_util import models as util_models

logger = logging.getLogger()


def handler(event, context):
    logger.info(f"This is event: {str(event, encoding='utf-8')}")
    get_resources_non_compliant(event, context)


def get_resources_non_compliant(event, context):
    # Get information about non-compliant resources.
    resources = parse_json(event)
    # Traverse the non-compliant resources and perform remediation.
    for resource in resources:
        remediation(resource, context)


def parse_json(content):
    """
    Parse string to json object
    :param content: json string content
    :return: Json object
    """
    try:
        return json.loads(content)
    except Exception as e:
        logger.error('Parse content:{} to json error:{}.'.format(content, e))
        return None


def remediation(resource, context):
    logger.info(f"Information about the resource to be remediated: {resource}")
    region_id = resource['regionId']
    account_id = resource['accountId']
    resource_id = resource['resourceId']
    resource_type = resource['resourceType']
    if resource_type == 'ACS::ECS::SecurityGroup':
        # Get the configuration of the non-compliant security group and re-verify it to ensure the accuracy of the assessment.
        resource_result = get_discovered_resource(context, resource_id, resource_type, region_id)
        configuration = json.loads(resource_result["body"]["DiscoveredResourceDetail"]["Configuration"])
        # Check whether the security group is a managed security group.
        is_managed_security_group = configuration.get('ServiceManaged')
        # Use jmespath to get the IDs of security group rules that have an inbound direction and grant access to 0.0.0.0/0.
        delete_security_group_rule_ids = jmespath.search(
            "Permissions.Permission[?SourceCidrIp=='0.0.0.0/0'].SecurityGroupRuleId",
            configuration
        )
        # If the security group is not a managed security group and has an inbound rule that grants access to 0.0.0.0/0, delete the rule.
        if is_managed_security_group is False and delete_security_group_rule_ids:
            logger.info(f"Note: Deleting security group rule {region_id}:{resource_id}:{delete_security_group_rule_ids}")
            revoke_security_group(context, region_id, resource_id, delete_security_group_rule_ids)


def revoke_security_group(context, region_id, resource_id, security_group_rule_ids):
    creds = context.credentials
    config = open_api_models.Config(
        access_key_id=creds.access_key_id,
        access_key_secret=creds.access_key_secret,
        security_token=creds.security_token,
        endpoint=f'ecs.{region_id}.aliyuncs.com'
    )
    client = OpenApiClient(config)
    params = open_api_models.Params(
        style='RPC',  # API style
        version='2014-05-26',  # API version number
        action='RevokeSecurityGroup',  # API name
        method='POST',  # Request method
        pathname='/',  # The API path. The default path for an RPC API is "/".
        protocol='HTTPS',  # The API protocol.
        auth_type='AK',
        req_body_type='json',  # The format of the request body.
        body_type='json'  # The format of the response body.
    )
    query = {'RegionId': region_id, 'SecurityGroupId': resource_id, 'SecurityGroupRuleId': security_group_rule_ids}
    # Create an API request object.
    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(query),
    )
    runtime = util_models.RuntimeOptions()
    response = client.call_api(params, request, runtime)
    logger.info(f"Deletion result: {response}")


# Get resource details.
def get_discovered_resource(context, resource_id, resource_type, region_id):
    """
    Call an API operation to get the configuration details of a resource.
    :param context: The Function Compute context.
    :param resource_id: The resource ID.
    :param resource_type: The resource type.
    :param region_id: The ID of the region where the resource resides.
    :return: The resource details.
    """
    # The service role for Function Compute (FC) must have the AliyunConfigFullAccess permission.
    creds = context.credentials
    config = open_api_models.Config(
        access_key_id=creds.access_key_id,
        access_key_secret=creds.access_key_secret,
        security_token=creds.security_token,
        endpoint='config.cn-shanghai.aliyuncs.com'
    )
    client = OpenApiClient(config)
    params = open_api_models.Params(
        style='RPC',  # API style
        version='2020-09-07',  # API version number
        action='GetDiscoveredResource',  # API name
        method='POST',  # Request method
        pathname='/',  # The API path. The default path for an RPC API is "/".
        protocol='HTTPS',  # The API protocol.
        auth_type='AK',
        req_body_type='json',  # The format of the request body.
        body_type='json'  # The format of the response body.
    )
    query = {'ResourceId': resource_id, 'ResourceType': resource_type, 'Region': region_id}
    # Create an API request object.
    request = open_api_models.OpenApiRequest(
        query=OpenApiUtilClient.query(query),
    )
    runtime = util_models.RuntimeOptions()
    try:
        response = client.call_api(params, request, runtime)
        return response
    except Exception as e:
        logger.error('GetDiscoveredResource error: %s' % e)

EOF
  filename = "${path.module}/python/index.py"
}

resource "local_file" "requirements_txt" {
  content  = <<EOF
  alibabacloud-tea-openapi
  jmespath>= 0.10.0
  EOF
  filename = "${path.module}/python/requests/requirements.txt"
}
locals {
  code_dir       = "${path.module}/python/"
  archive_output = "${path.module}/code.zip"
  base64_output  = "${path.module}/code_base64.txt"
}

data "archive_file" "code_package" {
  type        = "zip"
  source_dir  = local.code_dir
  output_path = local.archive_output

  depends_on = [
    local_file.python_script,
    local_file.requirements_txt,
  ]
}

resource "null_resource" "upload_code" {
  provisioner "local-exec" {
    command = <<EOT
    base64 -w 0 ${local.archive_output} > ${local.base64_output}
    EOT

    interpreter = ["sh", "-c"]
  }

  depends_on = [data.archive_file.code_package]
}

data "local_file" "base64_encoded_code" {
  filename   = local.base64_output
  depends_on = [null_resource.upload_code]
}
resource "alicloud_fcv3_function" "fc_function" {
  runtime       = "python3.10"
  handler       = "index.handler"
  function_name = "HHM-FC-TEST"
  role          = alicloud_ram_role.role.arn

  code {
    zip_file = data.local_file.base64_encoded_code.content
  }
  lifecycle {
    ignore_changes = [
      code
    ]
  }

  # Explicitly set log_config to empty.
  log_config {}

  depends_on = [data.local_file.base64_encoded_code]
}

resource "alicloud_config_rule" "default" {
  rule_name    = "SPM0014-sg-disallow-risky-ports-for-all-ips"
  description  = "Prohibits security groups from opening vulnerable ports 22 and 3389 to all IP ranges."
  source_owner = "ALIYUN"
  # (Required, ForceNew) Specifies whether you or Alibaba Cloud owns and manages the rule. Valid values: CUSTOM_FC: The rule is a custom rule that you own. ● ALIYUN: The rule is a managed rule that Alibaba Cloud owns.
  source_identifier = "sg-risky-ports-check"
  # The identifier of the rule. For a managed rule, the value is the name of the managed rule. For a custom rule, the value is the Alibaba Cloud Resource Name (ARN) of the custom rule. (Required, ForceNew)
  resource_types_scope = ["ACS::ECS::SecurityGroup"]
  # The IDs of resources that are excluded from the monitoring scope. Separate multiple IDs with commas (,). This parameter applies only to rules created based on managed rules. For custom rules, this parameter is empty.
  config_rule_trigger_types = "ConfigurationItemChangeNotification" # The rule is triggered when a configuration changes.
  # Valid values: One_Hour, Three_Hours, Six_Hours, Twelve_Hours, and TwentyFour_Hours.
  risk_level = 1 #    ● 1: Critical ● 2: Warning ● 3: Informational

  input_parameters = {
    "ports" : "22,3389"
  }
}

resource "alicloud_config_remediation" "default" {
  config_rule_id          = alicloud_config_rule.default.id
  remediation_template_id = alicloud_fcv3_function.fc_function.function_arn
  remediation_source_type = "CUSTOM"
  invoke_type             = "AUTO_EXECUTION"
  params                  = "{}"
  remediation_type        = "FC"
}

resource "random_integer" "default" {
  min = 10000
  max = 99999
}

resource "alicloud_ram_role" "role" {
  name        = "tf-example-role-${random_integer.default.result}"
  document    = <<EOF
   {
    "Statement": [
       {
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "fc.aliyuncs.com"
          ]
        }
      }
    ],
    "Version": "1"
  }
  EOF
  description = "Ecs ram role."
  force       = true
}
resource "alicloud_ram_policy" "policy" {
  policy_name     = "tf-example-ram-policy-${random_integer.default.result}"
  policy_document = <<EOF
   {
    "Statement": [
       {
        "Action": [
          "config:GetDiscoveredResource",
          "ecs:RevokeSecurityGroup"
        ],
        "Effect":  "Allow",
        "Resource": ["*"]
      }
    ],
    "Version": "1"
  }
  EOF
  description     = "This is a policy test."
  force           = true
}

resource "alicloud_ram_role_policy_attachment" "attach" {
  policy_name = alicloud_ram_policy.policy.policy_name
  policy_type = "Custom"
  role_name   = alicloud_ram_role.role.name
}

View the results

  1. Log on to the Cloud Config console to view the created rule.

    image

  1. Log on to the Function Compute console to view the created function.

image

View the remediation results

Before remediation

  1. Non-compliant resources are displayed in Cloud Config.

    image

  2. Log on to the ECS console to view the security group.

    image

After remediation

  1. The automatic remediation details are displayed in Cloud Config.

    image

  1. View the security group in the ECS console after remediation.

    image

References