全部产品
Search
文档中心

云安全中心:跨账号批量扫描应急漏洞

更新时间:Apr 03, 2026

当高危漏洞被公开披露后,管理多个云账号的企业需要快速评估所有账号的受影响情况。本文介绍如何使用 Python 脚本调用云安全中心的 ModifyEmgVulSubmit API,结合资源目录(RD)自动获取成员账号列表,实现跨账号批量扫描应急漏洞。

挑战:大规模环境下的应急漏洞响应瓶颈

假设某互联网企业在资源目录(RD)中管理着 150 个成员账号。Linux 系统组件 polkit pkexec 被曝出本地提权漏洞(CVE-2021-4034),安全团队需立即评估全集团的受影响状况。

在这种场景下,传统的运维模式面临以下难题:

  • 手动响应滞后:逐一登录 150 个账号发起扫描,耗时数小时甚至数天,极易错失漏洞收敛的最佳窗口期。

  • 扫描覆盖死角:依靠人工登记账号列表,难以确保新加入 RD 的成员账号被纳入扫描范围。

  • 自动化设计门槛:简单的并发脚本若缺乏 API 节流机制,会因触发云端流控导致任务批量失败。

  • 风险视角孤岛:扫描结果分散,安全主管无法在一个界面查看全集团的漏洞概览及处置进度。

方案概述

本方案在本地运行 Python 脚本,通过资源目录自动获取所有成员账号,然后调用云安全中心 API 批量下发应急漏洞扫描任务。扫描完成后,通过 Agentic SOC 的日志服务集中查看全部账号的漏洞扫描结果。整体流程如下:

image

运维人员在本地通过专用 RAM 用户的 AccessKey 运行 Python 脚本。脚本首先调用 DescribeEmgVulItem API 验证配置中的漏洞名称是否有效,然后调用资源目录 ListAccounts API 自动获取所有成员账号,最后逐一调用云安全中心 ModifyEmgVulSubmit API 下发扫描任务。扫描结果通过 Agentic SOC 汇聚到日志服务(SLS),运维人员在一个界面即可查看全部账号的漏洞状态。

实施步骤

步骤一:配置环境与权限

  1. 准备环境:

    • 一个具备管理员权限的阿里云企业认证账号。

    • 开通资源目录

    • 资源目录成员均已开通云安全中心(高级版及以上),并在Agentic SOC中完成日志接入。详情请参见接入配置

    • Python 3.6及以上的本地环境。

  2. 使用阿里云账号创建RAM用户(例如VulnScanner),并为其授予以下权限:

    • AliyunYundunSASFullAccess(必需):用于调用云安全中心 API 下发扫描任务。本文使用系统策略以快速验证,生产环境建议创建自定义策略仅授予ModifyEmgVulSubmit 和 DescribeEmgVulItem 的调用权限。

    • AliyunResourceDirectoryReadOnlyAccess(推荐):用于自动获取资源目录下的所有成员账号。若不授予此权限,需要在配置文件中手动维护账号列表。

  3. 启用OpenAPI调用并妥善保存AccessKey(在步骤三:执行扫描时使用)。

    重要

    严禁使用阿里云主账号的 AccessKey 执行脚本。请务必使用上述专用 RAM 用户的 AccessKey,并定期轮转以降低泄露风险。

步骤二:下载脚本并配置扫描参数

  1. 下载以下三个文件至本地工作目录:

    • scan_manager.py:主程序,负责遍历账号并调用扫描 API。

    • config.ini:配置文件,定义服务地址、扫描目标漏洞等参数。

    • requirements.txt:Python 依赖库列表。

    scan_manager.py

    # -*- coding: utf-8 -*-
    # This file is auto-generated, don't edit it. Thanks.
    import os
    import sys
    import time
    import configparser
    from typing import List, Dict
    
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_sas20181203 import models as sas_20181203_models
    from alibabacloud_sas20181203.client import Client as Sas20181203Client
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_tea_util import models as util_models
    from alibabacloud_tea_util.client import Client as UtilClient
    
    # --- 资源目录客户端的导入 ---
    from alibabacloud_resourcemanager20200331 import models as resourcemanager_20200331_models
    from alibabacloud_resourcemanager20200331.client import Client as ResourceManager20200331Client
    
    
    class Sample:
        def __init__(self):
            pass
    
        @staticmethod
        def create_client(endpoint: str) -> Sas20181203Client:
            credential = CredentialClient()
            config = open_api_models.Config(
                credential=credential
            )
            config.endpoint = endpoint
            return Sas20181203Client(config)
    
        @staticmethod
        def resolve_vul_names(client: Sas20181203Client, target_names: List[str]) -> Dict[str, str]:
            """通过 DescribeEmgVulItem API 将控制台漏洞名称解析为 API 所需的漏洞名称。
    
            将控制台应急漏洞页面的漏洞名称作为 VulName 参数传给 DescribeEmgVulItem,
            从返回结果中提取 Name 字段,用于后续调用 ModifyEmgVulSubmit。
    
            Args:
                client: 云安全中心客户端。
                target_names: 配置文件中的漏洞名称列表(控制台显示名称)。
    
            Returns:
                Dict,key 为 API 漏洞名称(用于 ModifyEmgVulSubmit),value 为显示名称(用于日志输出)。
            """
            runtime = util_models.RuntimeOptions()
    
            resolved: Dict[str, str] = {}
            for target in target_names:
                try:
                    request = sas_20181203_models.DescribeEmgVulItemRequest(
                        vul_name=target
                    )
                    response = client.describe_emg_vul_item_with_options(request, runtime)
                    vul_items = response.body.grouped_vul_items or []
    
                    if vul_items:
                        vul_name = vul_items[0].name
                        resolved[vul_name] = target
                        print(f"  ✅ '{target}' → {vul_name}")
                    else:
                        print(f"  ⚠️ 未找到匹配的应急漏洞: '{target}'")
                except Exception as e:
                    print(f"  ❌ 查询漏洞 '{target}' 失败: {e}")
    
            return resolved
    
        @staticmethod
        def main(args: List[str]) -> None:
            # --- 从 config.ini 文件读取配置 ---
            config = configparser.ConfigParser(delimiters=('=',))
            # 指定以 utf-8 编码读取,避免中文乱码
            config.read('config.ini', encoding='utf-8')
    
            try:
                # 读取扫描配置
                endpoint = config.get('scan_config', 'endpoint')
                scan_targets_str = config.get('scan_config', 'scan_targets')
                target_names = [name.strip() for name in scan_targets_str.split(',') if name.strip()]
                account_ids_str = config.get('scan_config', 'member_uids')
                account_ids = [uid.strip() for uid in account_ids_str.split(',') if uid.strip()]
                max_qps = config.getint('scan_config', 'max_qps')
    
            except (configparser.NoSectionError, configparser.NoOptionError) as e:
                print(f"❌ 配置文件 'config.ini' 读取错误: {e}")
                print("请确保 config.ini 文件存在且包含 [scan_config] 部分和所有必需的键。")
                sys.exit(1)
    
            # --- 资产自动发现逻辑 ---
            # 如果配置文件中的 member_uids 为空,则尝试从资源目录自动获取
            if not account_ids:
                print("ℹ️ 'member_uids' 为空,尝试从资源目录自动发现成员账号...")
                try:
                    # 资源管理服务的通用 Endpoint
                    rm_endpoint = 'resourcemanager.aliyuncs.com'
    
                    # 创建资源目录客户端
                    credential = CredentialClient()
                    rm_config = open_api_models.Config(credential=credential, endpoint=rm_endpoint)
                    rm_client = ResourceManager20200331Client(rm_config)
    
                    all_accounts = []
                    page_number = 1
                    while True:
                        list_accounts_request = resourcemanager_20200331_models.ListAccountsRequest(
                            page_number=page_number,
                            page_size=100  # 使用最大 PageSize 减少请求次数
                        )
                        # 调用 ListAccounts API,循环处理分页
                        response = rm_client.list_accounts(list_accounts_request)
                        if response.body.accounts and response.body.accounts.account:
                            current_page_accounts = response.body.accounts.account
                            all_accounts.extend(current_page_accounts)
                            # 当返回的记录数小于 PageSize 时,表示已是最后一页
                            if len(current_page_accounts) < 100:
                                break
                            page_number += 1
                        else:
                            break  # 没有更多账号
    
                    # 从返回结果中提取账号ID
                    account_ids = [acc.account_id for acc in all_accounts]
    
                    if account_ids:
                        print(f"✅ 成功从资源目录发现 {len(account_ids)} 个成员账号。")
                    else:
                        print("⚠️ 未能从资源目录发现任何成员账号。请检查权限或资源目录状态。")
                except Exception as e:
                    print(f"❌ 从资源目录自动发现账号失败: {e}")
                    print("   请确保 RAM 用户拥有 'AliyunResourceDirectoryReadOnlyAccess' 权限,或在 config.ini 中手动配置 'member_uids'。")
                    # 如果自动发现失败且手动也未配置,则退出
                    if not any(account_ids):
                        sys.exit(1)
            # --- 配置读取结束 ---
    
            client = Sample.create_client(endpoint)
            runtime = util_models.RuntimeOptions()
    
            # --- 通过 API 解析漏洞名称 ---
            print("正在查询应急漏洞列表...")
            vul_map = Sample.resolve_vul_names(client, target_names)
    
            if not vul_map:
                print("⚠️ 没有匹配到任何漏洞,程序退出。")
                return
    
            vul_names = list(vul_map.keys())
            print(f"✅ 成功匹配 {len(vul_names)} 个漏洞")
    
            total_tasks = len(vul_names) * len(account_ids)
            if total_tasks == 0:
                print("⚠️  没有在 config.ini 中找到要执行的漏洞或账号,程序退出。")
                return
    
            print(f"开始处理 {len(vul_names)} 个漏洞 × {len(account_ids)} 个账号 = {total_tasks} 个扫描任务")
            print(f"QPS控制为 {max_qps},预计完成时间: {total_tasks / max_qps:.1f} 秒")
    
            success_count = 0
            failure_count = 0
            start_time = time.time()
            last_request_time = 0
            MIN_INTERVAL = 1.0 / max_qps
            request_count = 0
    
            for name in vul_names:
                for account_id in account_ids:
                    request_count += 1
    
                    # QPS控制
                    current_time = time.time()
                    elapsed_since_last = current_time - last_request_time
                    if elapsed_since_last < MIN_INTERVAL:
                        time.sleep(MIN_INTERVAL - elapsed_since_last)
                    last_request_time = time.time()
    
                    display_name = vul_map.get(name, name)
    
                    try:
                        modify_emg_vul_submit_request = sas_20181203_models.ModifyEmgVulSubmitRequest(
                            name=name,
                            user_agreement='yes',
                            resource_directory_account_id=account_id
                        )
    
                        client.modify_emg_vul_submit_with_options(modify_emg_vul_submit_request, runtime)
                        success_count += 1
                        elapsed_time = time.time() - start_time
                        avg_qps = request_count / elapsed_time if elapsed_time > 0 else 0
    
                        print(f"✅ [{request_count}/{total_tasks}] 成功: 漏洞='{display_name}', 账号ID={account_id} "
                              f"(当前平均QPS: {avg_qps:.1f})")
    
                    except Exception as error:
                        failure_count += 1
                        print(f"❌ [{request_count}/{total_tasks}] 失败: 漏洞='{display_name}', 账号ID={account_id}")
    
                        error_message = getattr(error, 'message', str(error))
                        print(f"   错误信息: {error_message}")
                        recommend_message = getattr(error, 'data', {}).get('Recommend')
                        if recommend_message:
                            print(f"   诊断建议: {recommend_message}")
    
                        error_str = str(error).lower()
                        if 'throttling' in error_str or 'qps' in error_str:
                            print("   ⚠️  检测到API流控,自动增加等待时间...")
                            time.sleep(2)
    
            total_time = time.time() - start_time
            final_qps = request_count / total_time if total_time > 0 else 0
    
            print(f"\n{'=' * 50}")
            print(f"处理完成! 总耗时: {total_time:.1f} 秒")
            print(f"成功: {success_count}, 失败: {failure_count}, 总计: {total_tasks}")
            print(f"实际平均QPS: {final_qps:.1f} (目标QPS: {max_qps})")
            print(f"{'=' * 50}")
    
    
    if __name__ == '__main__':
        Sample.main(sys.argv[1:])
        

    config.ini

    [scan_config]
    # 云安全中心的服务地址(Endpoint),必须根据资产所在地域手动修改。
    # 例如,上海地域为 tds.cn-shanghai.aliyuncs.com
    endpoint = tds.cn-shanghai.aliyuncs.com
    
    # API请求速率(QPS)限制,默认为6次/秒。
    max_qps = 6
    
    # 成员账号阿里云账号列表,用英文逗号分隔。
    # 如果RAM用户已授予资源目录读取权限,请将此项留空,脚本会自动获取。
    member_uids =
    
    # 要扫描的应急漏洞名称,多个漏洞用英文逗号分隔。
    # 名称可从云安全中心控制台的应急漏洞页面直接复制。
    # 脚本启动时会通过 DescribeEmgVulItem API 自动解析为 API 所需的漏洞名称。
    scan_targets = polkit pkexec 本地提权漏洞(CVE-2021-4034)

    requirements.txt

    alibabacloud_sas20181203
    alibabacloud_resourcemanager20200331
    alibabacloud_credentials
  2. 打开config.ini,根据实际需求配置以下参数:

    • endpoint:根据资产所在地域,修改为对应的云安全中心服务接入点。脚本每次运行针对单个地域,若资产分布在多个地域,需为每个地域分别配置并执行。

    • scan_targets:填入需要扫描的应急漏洞名称,多个漏洞用英文逗号分隔。名称可从云安全中心控制台的应急漏洞页面直接复制。脚本启动时会通过 DescribeEmgVulItem API 自动解析为 API 所需的漏洞名称。

    • max_qps:API 请求速率上限,默认为 6 次/秒,适用于大多数场景。若账号规模较大且需要更高速率,可在配额中心申请提升云安全中心相关 API 的频率配额。

    • member_uids:待扫描的成员账号 ID 列表。若 RAM 用户已有资源目录读取权限,留空即可,脚本会自动获取所有成员账号。

步骤三:执行扫描

  1. 配置身份凭证。将步骤一创建的 RAM 用户 AccessKey 配置为环境变量。

    export ALIBABA_CLOUD_ACCESS_KEY_ID="YOUR_ACCESSKEY_ID"
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET="YOUR_ACCESSKEY_SECRET"
  2. 安装依赖。

    pip3 install -r requirements.txt
  3. 执行扫描脚本。

    python3 scan_manager.py

    脚本实时打印进度。输出示例:

    正在查询应急漏洞列表...
      ✅ 'polkit pkexec 本地提权漏洞(CVE-2021-4034)' → emg_cve-2021-4034:EMG:AVD-2021-4034
    ✅ 成功匹配 1 个漏洞
    
    ℹ️ 'member_uids' 为空,尝试从资源目录自动发现成员账号...
    ✅ 成功从资源目录发现 150 个成员账号。
    
    开始处理 1 个漏洞 × 150 个账号 = 150 个扫描任务
    QPS控制为 6,预计完成时间: 25.0 秒
    
    ✅ [1/150] 成功: 漏洞='polkit pkexec 本地提权漏洞(CVE-2021-4034)', 账号ID=198XXXXXXXXXXX13 (当前平均QPS: 5.8)
    ✅ [2/150] 成功: 漏洞='polkit pkexec 本地提权漏洞(CVE-2021-4034)', 账号ID=142XXXXXXXXXXX84 (当前平均QPS: 5.9)
    ✅ [3/150] 成功: 漏洞='polkit pkexec 本地提权漏洞(CVE-2021-4034)', 账号ID=188XXXXXXXXXXX97 (当前平均QPS: 5.7)
    ...
    ✅ [150/150] 成功: 漏洞='polkit pkexec 本地提权漏洞(CVE-2021-4034)', 账号ID=156XXXXXXXXXXX62 (当前平均QPS: 5.9)
    
    ==================================================
    处理完成! 总耗时: 25.8 秒
    成功: 150, 失败: 0, 总计: 150
    实际平均QPS: 5.8 (目标QPS: 6)
    ==================================================

步骤四:查看扫描结果

登录RAM账号的登录云安全中心控制台,在左侧导航栏选择 Agentic SOC > 日志。在日志库列表中,选择标准化日志 > 漏洞日志,选择扫描时间范围后,单击查询/分析,查看漏洞扫描结果。

image

重要

应急漏洞仅支持扫描,不支持修复。应急漏洞是针对您服务器上安装的软件进行扫描后发现的风险,需要您根据漏洞详情中的修复建议,手动对软件应用升级或进行配置修改,排除安全隐患。

生产环境实践建议

建立常态化扫描机制

应急漏洞的披露不可预测,建议将脚本纳入定期执行计划,而非仅在漏洞爆发时临时运行。可通过以下方式实现自动化:

  • 使用 Linux crontab 或 Windows 任务计划程序定时执行脚本(例如每天一次),持续监控新披露的应急漏洞。

  • 结合阿里云函数计算(FC)部署为 Serverless 函数,通过定时触发器调度,无需维护常驻服务器。

强化 AccessKey 安全管理

  • 定期轮转:每 90 天轮换一次 RAM 用户的 AccessKey,降低密钥泄露风险。

  • 使用实例角色:若脚本运行在 ECS 实例上,推荐通过实例RAM角色授权,避免在环境变量中暴露 AccessKey。

  • 限制来源 IP:在 RAM 策略中添加 IP 条件,仅允许从运维专用网段调用 API。

常见问题

扫描提交成功,但在 Agentic SOC 的漏洞日志中看不到记录

  • 日志采集未开启:在 Agentic SOC 的日志中,确认漏洞日志的采集开关已开启。

  • 扫描尚未完成:ModifyEmgVulSubmit 仅提交扫描任务,实际扫描是异步执行的。根据资产规模,扫描可能需要数分钟到数十分钟才能完成。

  • 未检测到漏洞:漏洞日志仅记录实际检测到的漏洞。如果成员账号的资产上不存在该应急漏洞,扫描完成后不会产生漏洞日志记录。可先登录成员账号,在漏洞管理 > 应急漏洞页面确认单个账号是否有扫描结果。

  • 多账号日志未汇聚:成员账号的日志需通过 Agentic SOC 汇聚到管理账号。确认成员账号已开通云安全中心并完成多账号统一接入配置。

漏洞名称解析失败,提示"未找到匹配的应急漏洞"

请确认 config.iniscan_targets 填写的名称与云安全中心控制台应急漏洞页面显示的名称完全一致(包括括号和空格)。建议直接从控制台复制漏洞名称,避免手动输入导致的字符差异。

如果确认名称无误仍然解析失败,可能该漏洞已过应急响应周期,不再支持通过 DescribeEmgVulItem API 查询和主动扫描。