
Security Center: Automate cross-account emergency vulnerability scans

Last Updated: Jun 29, 2026

Use a Python script to batch-trigger Cloud Security Center emergency vulnerability scans across all Resource Directory (RD) member accounts. The script calls the ModifyEmgVulSubmit API and auto-discovers accounts through RD, eliminating manual account lists.

Background

Your organization manages 150 member accounts under RD. A local privilege escalation vulnerability (CVE-2021-4034) is found in polkit pkexec, and your security team must assess the blast radius across every account immediately.

Manual approaches have several problems:

  • Slow manual response: Logging in to 150 accounts one by one takes hours — the containment window may close before you finish.

  • Gaps in coverage: Hand-maintained account lists easily miss newly added RD members.

  • Automation pitfalls: A concurrent script without API throttling hits rate limits and causes batch failures.

  • Fragmented visibility: Scan results scatter across individual accounts with no single-pane view of organization-wide vulnerability status.

Solution overview

A Python script on your local machine pulls all member accounts from RD and submits emergency vulnerability scan tasks through the Cloud Security Center API. After scans complete, view results in the Agentic SOC dashboard.

The script authenticates with a RAM user's AccessKey. It uses the RD ListAccounts API to discover all member accounts (unless member_uids is set in config.ini), calls DescribeEmgVulItem to resolve each vulnerability's console display name into the internal name that the scan API expects, and then calls ModifyEmgVulSubmit once per vulnerability and account to submit a scan task. Results flow into SLS through Agentic SOC for centralized monitoring.
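
The fan-out described above amounts to one scan task per (vulnerability, account) pair. The following is a minimal sketch of that planning step, not code taken from scan_manager.py; the function names plan_scan_tasks and estimate_seconds are hypothetical, and the estimate is only a lower bound that ignores network latency.

```python
from itertools import product
from typing import Iterator, List, Tuple


def plan_scan_tasks(vul_names: List[str], account_ids: List[str]) -> Iterator[Tuple[str, str]]:
    """Yield one (vulnerability, account) pair per scan task, vulnerability first."""
    return product(vul_names, account_ids)


def estimate_seconds(task_count: int, max_qps: int) -> float:
    """Lower bound on run time when requests are spaced 1/max_qps seconds apart."""
    if max_qps <= 0:
        raise ValueError("max_qps must be positive")
    return task_count / max_qps


if __name__ == "__main__":
    vuls = ["emg_cve-2021-4034:EMG:AVD-2021-4034"]
    accounts = [f"placeholder-account-{i}" for i in range(150)]  # not real account IDs
    tasks = list(plan_scan_tasks(vuls, accounts))
    print(len(tasks), estimate_seconds(len(tasks), 6))
```

With one vulnerability, 150 accounts, and a limit of 6 QPS, the estimate is 150 / 6 = 25 seconds, which matches the figure the script prints at startup.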

Procedure

Step 1: Set up the environment and permissions

  1. Verify the following prerequisites:

    • An enterprise-verified Alibaba Cloud account with administrator privileges.

    • Resource Directory enabled.

    • Cloud Security Center (Advanced edition or higher) enabled on all RD member accounts, with Agentic SOC log ingestion configured (Access configuration).

    • Python 3.6 or later installed locally.

  2. Create a RAM user (for example, VulnScanner) and grant it the following permissions:

    • AliyunYundunSASFullAccess (Required): Grants access to Cloud Security Center scanning APIs. This topic uses a system policy for quick validation. In production, create a custom policy that grants only ModifyEmgVulSubmit and DescribeEmgVulItem permissions.

    • AliyunResourceDirectoryReadOnlyAccess (Recommended): Enables auto-discovery of all RD member accounts. Without it, you must list account IDs manually in the configuration file.

  3. Enable OpenAPI access for the RAM user and save its AccessKey ID and AccessKey secret. You need them in Step 3: Run the scan.

    Important

    Never use the AccessKey of your Alibaba Cloud account (root account) to run the script. Always use the dedicated RAM user's AccessKey created above, and rotate it regularly to minimize the risk of credential leakage.
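
For the custom policy recommended in the permissions step, the following sketch builds a policy document limited to the two Security Center operations the script calls. It assumes that Security Center actions use the yundun-sas prefix in RAM; confirm the exact action names in the RAM console before you attach the policy. The function name build_scan_policy is hypothetical.

```python
import json


def build_scan_policy() -> dict:
    """Return a RAM policy document limited to the two actions the script calls.

    Assumption: Security Center actions use the "yundun-sas" RAM prefix.
    Verify the action names in the RAM console before attaching the policy.
    """
    return {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "yundun-sas:DescribeEmgVulItem",
                    "yundun-sas:ModifyEmgVulSubmit",
                ],
                "Resource": "*",
            }
        ],
    }


if __name__ == "__main__":
    # Print the JSON so it can be pasted into the policy editor.
    print(json.dumps(build_scan_policy(), indent=2))
```

Auto-discovery still needs the separate Resource Directory read permission; this sketch covers only the scanning APIs.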

Step 2: Download the script and configure scan parameters

  1. Download the following three files to your working directory:

    • scan_manager.py: Main script. Iterates through accounts and calls the scan API for each.

    • config.ini: Configuration file. Specifies the service endpoint, target vulnerabilities, and scan parameters.

    • requirements.txt: Required Python libraries.

    scan_manager.py

    # -*- coding: utf-8 -*-
    # Cross-account batch scan script for emergency vulnerabilities
    import os
    import sys
    import time
    import configparser
    from typing import List, Dict
    
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_sas20181203 import models as sas_20181203_models
    from alibabacloud_sas20181203.client import Client as Sas20181203Client
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_tea_util import models as util_models
    from alibabacloud_tea_util.client import Client as UtilClient
    
    # --- Import the Resource Directory client ---
    from alibabacloud_resourcemanager20200331 import models as resourcemanager_20200331_models
    from alibabacloud_resourcemanager20200331.client import Client as ResourceManager20200331Client
    
    
    class Sample:
        def __init__(self):
            pass
    
        @staticmethod
        def create_client(endpoint: str) -> Sas20181203Client:
            credential = CredentialClient()
            config = open_api_models.Config(
                credential=credential
            )
            config.endpoint = endpoint
            return Sas20181203Client(config)
    
        @staticmethod
        def resolve_vul_names(client: Sas20181203Client, target_names: List[str]) -> Dict[str, str]:
            """Uses the DescribeEmgVulItem API to resolve console display names of vulnerabilities into the internal names required by the API.
    
            It passes the vulnerability name from the console's Urgent Vulnerability page as the VulName parameter to DescribeEmgVulItem
            and extracts the Name field from the response for subsequent calls to ModifyEmgVulSubmit.
    
            Args:
                client: The Cloud Security Center client.
                target_names: A list of vulnerability names from the configuration file (console display names).
    
            Returns:
                A dictionary where the key is the API vulnerability name (for ModifyEmgVulSubmit) and the value is the display name (for log output).
            """
            runtime = util_models.RuntimeOptions()
    
            resolved: Dict[str, str] = {}
            for target in target_names:
                try:
                    request = sas_20181203_models.DescribeEmgVulItemRequest(
                        vul_name=target
                    )
                    response = client.describe_emg_vul_item_with_options(request, runtime)
                    vul_items = response.body.grouped_vul_items or []
    
                    if vul_items:
                        vul_name = vul_items[0].name
                        resolved[vul_name] = target
                        print(f"  ✅ '{target}' → {vul_name}")
                    else:
                        print(f"  ⚠️ No matching emergency vulnerability found: '{target}'")
                except Exception as e:
                    print(f"  ❌ Failed to query vulnerability '{target}': {e}")
    
            return resolved
    
        @staticmethod
        def main(args: List[str]) -> None:
            # --- Read configurations from the config.ini file ---
            config = configparser.ConfigParser(delimiters=('=',))
            # Read the file using UTF-8 encoding to prevent garbled characters.
            config.read('config.ini', encoding='utf-8')
    
            try:
                # Read scan configurations
                endpoint = config.get('scan_config', 'endpoint')
                scan_targets_str = config.get('scan_config', 'scan_targets')
                target_names = [name.strip() for name in scan_targets_str.split(',') if name.strip()]
                account_ids_str = config.get('scan_config', 'member_uids')
                account_ids = [uid.strip() for uid in account_ids_str.split(',') if uid.strip()]
                max_qps = config.getint('scan_config', 'max_qps')
    
            except (configparser.NoSectionError, configparser.NoOptionError) as e:
                print(f"❌ Error reading configuration file 'config.ini': {e}")
                print("Please make sure the config.ini file exists and contains the [scan_config] section with all required keys.")
                sys.exit(1)
    
            # --- Asset auto-discovery logic ---
            # If member_uids is empty in the configuration file, attempt to automatically retrieve the list from Resource Directory.
            if not account_ids:
                print("ℹ️ 'member_uids' is empty, attempting to auto-discover member accounts from Resource Directory...")
                try:
                    # The general endpoint for Resource Manager.
                    rm_endpoint = 'resourcemanager.aliyuncs.com'
    
                    # Create a Resource Directory client.
                    credential = CredentialClient()
                    rm_config = open_api_models.Config(credential=credential, endpoint=rm_endpoint)
                    rm_client = ResourceManager20200331Client(rm_config)
    
                    all_accounts = []
                    page_number = 1
                    while True:
                        list_accounts_request = resourcemanager_20200331_models.ListAccountsRequest(
                            page_number=page_number,
                            page_size=100  # Use the maximum PageSize to reduce the number of requests.
                        )
                        # Call the ListAccounts API and handle pagination in a loop.
                        response = rm_client.list_accounts(list_accounts_request)
                        if response.body.accounts and response.body.accounts.account:
                            current_page_accounts = response.body.accounts.account
                            all_accounts.extend(current_page_accounts)
                            # If the number of returned records is less than the PageSize, it is the last page.
                            if len(current_page_accounts) < 100:
                                break
                            page_number += 1
                        else:
                            break  # No more accounts
    
                    # Extract account IDs from the response.
                    account_ids = [acc.account_id for acc in all_accounts]
    
                    if account_ids:
                        print(f"✅ Successfully discovered {len(account_ids)} member accounts from Resource Directory.")
                    else:
                        print("⚠️ Failed to discover any member accounts from Resource Directory. Check your permissions or the status of Resource Directory.")
                except Exception as e:
                    print(f"❌ Failed to auto-discover accounts from Resource Directory: {e}")
                    print("   Make sure the RAM user has the 'AliyunResourceDirectoryReadOnlyAccess' permission, or manually configure 'member_uids' in config.ini.")
                    # If auto-discovery fails and no manual configuration is provided, exit.
                    sys.exit(1)
            # --- End of configuration reading ---
    
            client = Sample.create_client(endpoint)
            runtime = util_models.RuntimeOptions()
    
            # --- Resolve vulnerability names by using the API ---
            print("Querying the list of emergency vulnerabilities...")
            vul_map = Sample.resolve_vul_names(client, target_names)
    
            if not vul_map:
                print("⚠️ No matching vulnerabilities found. Exiting program.")
                return
    
            vul_names = list(vul_map.keys())
            print(f"✅ Successfully matched {len(vul_names)} vulnerabilities")
    
            total_tasks = len(vul_names) * len(account_ids)
            if total_tasks == 0:
                print("⚠️  No vulnerabilities or accounts found in config.ini to process. Exiting program.")
                return
    
            print(f"Starting to process {len(vul_names)} vulnerabilities × {len(account_ids)} accounts = {total_tasks} scan tasks")
            print(f"QPS is throttled at {max_qps}. Estimated time to completion: {total_tasks / max_qps:.1f} seconds")
    
            success_count = 0
            failure_count = 0
            start_time = time.time()
            last_request_time = 0
            MIN_INTERVAL = 1.0 / max_qps
            request_count = 0
    
            for name in vul_names:
                for account_id in account_ids:
                    request_count += 1
    
                    # QPS throttling
                    current_time = time.time()
                    elapsed_since_last = current_time - last_request_time
                    if elapsed_since_last < MIN_INTERVAL:
                        time.sleep(MIN_INTERVAL - elapsed_since_last)
                    last_request_time = time.time()
    
                    display_name = vul_map.get(name, name)
    
                    try:
                        modify_emg_vul_submit_request = sas_20181203_models.ModifyEmgVulSubmitRequest(
                            name=name,
                            user_agreement='yes',
                            resource_directory_account_id=account_id
                        )
    
                        client.modify_emg_vul_submit_with_options(modify_emg_vul_submit_request, runtime)
                        success_count += 1
                        elapsed_time = time.time() - start_time
                        avg_qps = request_count / elapsed_time if elapsed_time > 0 else 0
    
                        print(f"✅ [{request_count}/{total_tasks}] Success: Vulnerability='{display_name}', Account ID={account_id} "
                              f"(Current avg QPS: {avg_qps:.1f})")
    
                    except Exception as error:
                        failure_count += 1
                        print(f"❌ [{request_count}/{total_tasks}] Failure: Vulnerability='{display_name}', Account ID={account_id}")
    
                        error_message = getattr(error, 'message', str(error))
                        print(f"   Error message: {error_message}")
                        # The SDK may set error.data to None, so fall back to an empty dict.
                        recommend_message = (getattr(error, 'data', None) or {}).get('Recommend')
                        if recommend_message:
                            print(f"   Diagnostic suggestion: {recommend_message}")
    
                        error_str = str(error).lower()
                        if 'throttling' in error_str or 'qps' in error_str:
                            print("   ⚠️  API throttling detected. Automatically increasing wait time...")
                            time.sleep(2)
    
            total_time = time.time() - start_time
            final_qps = request_count / total_time if total_time > 0 else 0
    
            print(f"\n{'=' * 50}")
            print(f"Processing complete! Total time: {total_time:.1f} seconds")
            print(f"Success: {success_count}, Failures: {failure_count}, Total: {total_tasks}")
            print(f"Actual avg QPS: {final_qps:.1f} (Target QPS: {max_qps})")
            print(f"{'=' * 50}")
    
    
    if __name__ == '__main__':
        Sample.main(sys.argv[1:])
        

    config.ini

    [scan_config]
    # The service endpoint for Cloud Security Center. You must change this based on the region where your assets are located.
    # For example, the endpoint for the China (Shanghai) region is tds.cn-shanghai.aliyuncs.com.
    endpoint = tds.cn-shanghai.aliyuncs.com
    
    # The API request rate limit, in queries per second (QPS). The default value is 6.
    max_qps = 6
    
    # A comma-separated list of member account IDs.
    # If the RAM user has read permissions for Resource Directory, leave this empty. The script automatically retrieves the list.
    member_uids =
    
    # The names of the emergency vulnerabilities to scan for, separated by commas.
    # You can copy the names directly from the Urgent Vulnerability page in the Cloud Security Center console.
    # At startup, the script uses the DescribeEmgVulItem API to resolve these names into the format required by the scan API.
    scan_targets = polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)

    requirements.txt

    alibabacloud_sas20181203
    alibabacloud_resourcemanager20200331
    alibabacloud_credentials

  2. Open config.ini and set the following parameters:

    • endpoint: The Cloud Security Center service endpoint for the region where your assets are located. The script targets one region per run — run it separately for each region if assets span multiple regions.

    • scan_targets: Emergency vulnerability names to scan, comma-separated. Copy names directly from the Urgent Vulnerability page in the console. At startup, the script calls DescribeEmgVulItem to resolve display names into internal API names.

    • max_qps: Maximum API request rate in QPS. Default: 6, sufficient for most scenarios. For higher rates with large account counts, request a quota increase in Quota Center.

    • member_uids: Comma-separated member account IDs to scan. Leave empty if the RAM user has RD read permissions — the script discovers all accounts automatically.
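
Before a run, it can help to check config.ini for the four parameters above. The following is a minimal sketch that uses only the standard library; validate_config is a hypothetical helper and is not part of scan_manager.py. It mirrors the parser settings the script uses (only = as the delimiter) and treats an empty member_uids as valid, because that value triggers auto-discovery.

```python
import configparser
from typing import List

REQUIRED_KEYS = ("endpoint", "scan_targets", "member_uids", "max_qps")


def validate_config(text: str) -> List[str]:
    """Return a list of problems found in config.ini content (empty if none)."""
    parser = configparser.ConfigParser(delimiters=("=",))
    parser.read_string(text)
    if not parser.has_section("scan_config"):
        return ["missing [scan_config] section"]

    section = parser["scan_config"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in section]

    if "endpoint" in section and not section["endpoint"].strip():
        problems.append("endpoint is empty")

    if "scan_targets" in section:
        targets = [t.strip() for t in section["scan_targets"].split(",") if t.strip()]
        if not targets:
            problems.append("scan_targets is empty")

    if "max_qps" in section:
        try:
            valid_qps = int(section["max_qps"]) > 0
        except ValueError:
            valid_qps = False
        if not valid_qps:
            problems.append("max_qps must be a positive integer")

    return problems


if __name__ == "__main__":
    with open("config.ini", encoding="utf-8") as handle:
        for problem in validate_config(handle.read()):
            print(problem)
```

A max_qps of 0 is worth catching early, because the script divides by this value when it computes the interval between requests.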

Step 3: Run the scan

  1. Export the RAM user's AccessKey (created in Step 1) as environment variables.

    export ALIBABA_CLOUD_ACCESS_KEY_ID="YOUR_ACCESSKEY_ID"
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET="YOUR_ACCESSKEY_SECRET"

  2. Install dependencies.

    pip3 install -r requirements.txt

  3. Run the scan script.

    python3 scan_manager.py

    The script prints progress in real time. Sample output:

    ℹ️ 'member_uids' is empty, attempting to auto-discover member accounts from Resource Directory...
    ✅ Successfully discovered 150 member accounts from Resource Directory.
    
    Querying the list of emergency vulnerabilities...
    ✅ 'polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)' → emg_cve-2021-4034:EMG:AVD-2021-4034
    ✅ Successfully matched 1 vulnerability
    
    Starting to process 1 vulnerability × 150 accounts = 150 scan tasks
    QPS is throttled at 6. Estimated time to completion: 25.0 seconds
    
    ✅ [1/150] Success: Vulnerability='polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)', Account ID=198XXXXXXXXXXX13 (Current avg QPS: 5.8)
    ✅ [2/150] Success: Vulnerability='polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)', Account ID=142XXXXXXXXXXX84 (Current avg QPS: 5.9)
    ✅ [3/150] Success: Vulnerability='polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)', Account ID=188XXXXXXXXXXX97 (Current avg QPS: 5.7)
    ...
    ✅ [150/150] Success: Vulnerability='polkit pkexec Local Privilege Escalation Vulnerability (CVE-2021-4034)', Account ID=156XXXXXXXXXXX62 (Current avg QPS: 5.9)
    
    ==================================================
    Processing complete! Total time: 25.8 seconds
    Success: 150, Failures: 0, Total: 150
    Actual avg QPS: 5.8 (Target QPS: 6)
    ==================================================
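
When a request is throttled, scan_manager.py pauses for 2 seconds and counts the task as a failure; it does not resubmit it. If you want throttled tasks to be retried, one option is a small wrapper with exponential backoff. The following is a sketch under stated assumptions: call_with_backoff and is_throttling_error are hypothetical helpers, and the throttling check reuses the same string heuristic as the script rather than a documented error code.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_throttling_error(error: Exception) -> bool:
    """Same heuristic as scan_manager.py: look for 'throttling' or 'qps' in the error text."""
    text = str(error).lower()
    return "throttling" in text or "qps" in text


def call_with_backoff(
    submit: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call submit(); after a throttling error, wait base_delay * 2**attempt and retry.

    Any other error, or a throttling error on the last attempt, is re-raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return submit()
        except Exception as error:
            is_last_attempt = attempt == max_attempts - 1
            if is_last_attempt or not is_throttling_error(error):
                raise
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable: the loop always returns or raises")
```

In scan_manager.py, the call to client.modify_emg_vul_submit_with_options(...) could be passed in as a lambda. The sleep parameter exists only so that the wait can be replaced in tests.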

Step 4: View scan results

Log on to the Cloud Security Center console as the RAM user. In the left-side navigation pane, choose Agentic SOC > Log. From the Logstore list, select Standardized Log > Vulnerability Activity. Set a time range covering your scan window and click Search & Analyze.

Important

Emergency vulnerabilities support scanning only — not automated remediation. These vulnerabilities are detected in software installed on your servers. Follow the remediation advice in the vulnerability details to manually upgrade or reconfigure the affected software.

Best practices for production environments

Set up routine scanning

Rather than running the script only after a disclosure, schedule it to run regularly so new vulnerabilities are caught automatically:

  • Use a crontab job or Windows Task Scheduler to run the script daily.

  • Deploy the script as a Function Compute (FC) function with a time-based trigger to avoid maintaining a dedicated server.
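
For the crontab or Task Scheduler option, a thin wrapper that keeps a log file per run makes unattended failures easier to trace. The following is a minimal sketch; run_and_log and scan_command are hypothetical names, and the wrapper assumes that scan_manager.py and config.ini sit in the working directory the scheduler uses. It sets PYTHONIOENCODING so that the emoji status markers the script prints do not fail under a minimal scheduler locale.

```python
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import List


def scan_command() -> List[str]:
    """Command line for the scan, using the same interpreter as this wrapper."""
    # Assumes scan_manager.py and config.ini are in the current working directory.
    return [sys.executable, "scan_manager.py"]


def run_and_log(command: List[str], log_dir: Path) -> int:
    """Run command, save combined stdout and stderr to a timestamped log, return the exit code."""
    log_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_path = log_dir / f"scan-{stamp}.log"
    # Force UTF-8 output in the child process regardless of the scheduler's locale.
    env = dict(os.environ, PYTHONIOENCODING="utf-8")
    with log_path.open("wb") as log_file:
        result = subprocess.run(command, stdout=log_file, stderr=subprocess.STDOUT, env=env)
    return result.returncode
```

A scheduled entry point would call run_and_log(scan_command(), Path("logs")) and pass the return value to sys.exit so that the scheduler can see a failed run.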

Harden AccessKey management

  • Rotate regularly: Rotate the RAM user's AccessKey every 90 days to limit exposure if a key is compromised.

  • Use instance RAM roles: If the script runs on an ECS instance, assign an instance RAM role instead of exporting an AccessKey into environment variables.

  • Restrict source IPs: Add an IP condition to the RAM policy so that API calls are accepted only from your operations network.
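
The source IP restriction can be expressed as a Condition block on the policy's Allow statements, using the acs:SourceIp condition key. The following sketch adds that block to an existing policy document; restrict_to_source_ips is a hypothetical helper, and the CIDR range in the usage note is a documentation placeholder, not a recommended value.

```python
import copy
import ipaddress
from typing import List


def restrict_to_source_ips(policy: dict, cidrs: List[str]) -> dict:
    """Return a copy of policy whose Allow statements apply only to the given source CIDRs."""
    for cidr in cidrs:
        ipaddress.ip_network(cidr)  # raises ValueError on a malformed CIDR
    restricted = copy.deepcopy(policy)
    for statement in restricted["Statement"]:
        if statement.get("Effect") == "Allow":
            condition = statement.setdefault("Condition", {})
            condition["IpAddress"] = {"acs:SourceIp": list(cidrs)}
    return restricted
```

For example, restrict_to_source_ips(policy, ["203.0.113.0/24"]) leaves the input policy untouched and returns a copy that is limited to that range.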

FAQ

Scans were submitted successfully, but no records appear in the Agentic SOC vulnerability log

  • Log collection not enabled: In Agentic SOC, verify that the Vulnerability Activity collection toggle is enabled on the Log page.

  • Scan still in progress: ModifyEmgVulSubmit only submits the scan task — the actual scan runs asynchronously and may take several minutes to tens of minutes depending on asset count.

  • No vulnerabilities detected: The Vulnerability Log only records detected vulnerabilities. If a member account's assets are not affected, no entries are generated. To verify, log on to the member account and check Risk Governance > Vulnerabilities > Urgent Vulnerability.

  • Multi-account logs not aggregated: Member account logs must be aggregated into the management account through Agentic SOC. Verify that Cloud Security Center is enabled on all member accounts and centralized multi-account onboarding is complete.

Vulnerability name resolution fails with "No matching emergency vulnerability found"

Verify that the names in scan_targets of config.ini exactly match the Urgent Vulnerability page in the Cloud Security Center console, including parentheses and spaces. Copy names directly from the console to avoid errors.

If the name is correct but resolution still fails, the vulnerability may have passed its emergency response period and is no longer available through DescribeEmgVulItem.
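
To diagnose a near-miss name, it can help to compare the configured value with a normalized form that removes common copy-and-paste differences, such as full-width parentheses or doubled spaces. The following is a sketch only: normalize_vul_name and split_scan_targets are hypothetical helpers, and the sketch assumes that the console name uses ASCII parentheses and single spaces. If the normalized form differs from what you configured, compare both against the console before changing config.ini.

```python
import re
import unicodedata
from typing import List


def normalize_vul_name(name: str) -> str:
    """Fold full-width characters to their ASCII forms and collapse whitespace runs."""
    folded = unicodedata.normalize("NFKC", name)
    return re.sub(r"\s+", " ", folded).strip()


def split_scan_targets(raw: str) -> List[str]:
    """Split a scan_targets value on commas and normalize each non-empty entry."""
    return [normalize_vul_name(part) for part in raw.split(",") if part.strip()]


if __name__ == "__main__":
    configured = "polkit  pkexec Local Privilege Escalation Vulnerability （CVE-2021-4034）"
    cleaned = normalize_vul_name(configured)
    if cleaned != configured:
        print(f"Configured name differs from its normalized form: {cleaned!r}")
```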