業務中使用者上傳檔案、代碼發布、資料共用等情境,可能引入勒索病毒、挖礦程式、WebShell等惡意檔案,對業務系統和資料安全構成威脅。惡意檔案檢測功能依託Security Center的多引擎檢測平台,提供多種方式識別和處理這些風險。本文檔旨在協助不同角色的使用者根據業務情境,快速選擇最適合的檢測方式。
功能概述
常見應用情境
應用情境 | 說明 |
伺服器攻擊防範 | 防範蠕蟲、挖礦木馬等,避免伺服器資源被消耗或被用於DDoS攻擊。 |
定向攻擊防禦 | 檢測後門、駭客工具等,防止資料被竊取或系統被遠端控制。 |
辦公網路與檔案儲存體 | 識別含有惡意宏或指令碼的文檔/壓縮包,防範釣魚和憑據失竊。 |
全系統內容檢測 | 查殺勒索病毒和感染型病毒,避免資料被加密勒索或系統大範圍癱瘓。 |
檢測方式
Security Center提供整合SDK和控制台操作兩種方式,執行檢測任務。
特性 | 整合SDK | 控制台操作 |
使用方式 | 在業務代碼中整合SDK進行調用,支援通過Java或Python方式接入。 | 登入Security Center控制台,通過圖形化介面操作。 |
支援檢測的檔案 |
| 僅支援Object Storage Service檔案。 |
適用範圍
支援的檔案類型
壓縮檔檢測: 支援解壓並掃描未加密的壓縮包,解壓層級與檔案數可配置。
加密檔案檢測: 支援自動解密並掃描經OSS服務端加密(SSE-KMS、SSE-OSS)的資料。
支援的檔案格式:
壓縮包:
.7z、.zip、.rar、.tar、.gz、.bz2、.xz、.lzma、.ar、.tar.gz。指令碼/WebShell:
.php、.jsp、.jspx、.asp、.aspx、.sh、.py、.ps1、.pl、.bat。文件類型:
.doc、.pdf、.ppt、.xsl、.rtf、.hta、.chm。可執行/二進位檔案:
.apk、.exe、.dll、.ocx、.com、.so、.sys、.ko、.obj。
支援的病毒檔案:支援檢測反彈Shell後門、DDoS木馬、挖礦程式等病毒檔案,更多內容參見支援檢測的病毒類型。
儲存類型限制:僅支援檢測儲存類型為“標準儲存”和“低頻訪問”的OSS檔案,不支援“Archive Storage”類型。
支援檢測的OSS Bucket的地區:
華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華北6(烏蘭察布)
華東1(杭州)、華東2(上海)
華南1(深圳)、華南2(河源)、華南3(廣州)
西南1(成都)、中國香港
新加坡、馬來西亞(吉隆坡)、印尼(雅加達)、菲律賓(馬尼拉)、泰國(曼穀)、日本(東京)、韓國(首爾)、美國(矽谷)、美國(維吉尼亞)、德國(法蘭克福)、英國(倫敦)
開通服務與配置通知
購買並開通服務
訪問Security Center控制台-風險治理-惡意檔案SDK,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。
根據頁面引導,選擇立即試用、立即購買(訂用帳戶)或 開通隨用隨付模式 來啟用功能。
立即試用:如果阿里雲帳號已通過企業認證,可以通過免費試用開通惡意檔案檢測SDK功能。
重要每個阿里雲帳號僅能申請一次免費試用。
試用版提供10,000次惡意檔案檢測次數。
訂用帳戶:引導跳轉至購買頁面,請參照如下說明完成購買配置並支付。
在惡意檔案檢測地區,將是否選購置為是。
按需輸入所需的檢測檔案次數即購買數量(10萬次起售)。
隨用隨付:
在隨用隨付開通彈窗中,單擊開通隨用隨付。
開啟策略(可選):勾選後將自動開啟對所選Bucket中新上傳檔案的周期性檢測策略。該策略立即生效。如需調整檢測範圍,可在策略配置中修改。
也可前往Security Center購買頁進行操作,更多內容請參見購買Security Center。
配置通知(可選)
Security Center提供DingTalk機器人通知功能,可將檢測到的惡意檔案警示資訊自動推送至DingTalk群組,以便及時響應檢測風險。操作步驟如下:
訪問Security Center控制台-系統設定-通知設定,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。
在DingTalk機器人頁簽,單擊添加聊天機器人。
通知打開:建議勾選惡意檔案檢測的所有等級。
其他資訊:請參見配置DingTalk機器人通知。
配置並執行檢測任務
控制台操作
如果待檢測的檔案儲存體在阿里雲Object Storage Service Bucket中,可以直接在Security Center控制台執行對目標OSS Bucket內檔案進行批量或周期性掃描。
手動檢測:此方式適用於對存量檔案進行一次性的掃描,支援全量檢測和增量檢測。
自動檢測:此方式適用於對新增檔案進行持久性的自動掃描。
手動檢測(存量檔案)
進入OSS檔案檢測頁面
訪問Security Center控制台-風險治理-惡意檔案SDK,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。並進入OSS檔案檢測頁簽。
發起檢測任務
重要如果Bucket沒有在OSS檔案檢測的列表中,請單擊同步Bucket,同步最新的Bucket列表。
全量檢測:
功能說明:檢測單個或多個 Bucket 內的所有檔案。
操作入口:
單個檢測:在目標 Bucket 操作列單擊檢測。
批量檢測:勾選多個 Bucket 後,單擊列表下方的批量檢測。
增量檢測
功能說明:針對已檢測且上次檢測後有檔案更新的Bucket,僅檢測自上次任務以來新增或變更的檔案。
操作入口:在目標 Bucket 操作列,單擊增量檢測。
配置檢測參數
在彈出的對話方塊中,根據需求進行配置。
檔案檢測類型:選擇要掃描的副檔名,預設檢測全部類型的檔案。
解壓層級和單個壓縮包解壓檔案數量限制:如果需要掃描壓縮包內部,請設定解壓層數(最多5層)和單個壓縮包檔案數量(最多1000個),預設為不解壓。
檔案解密類型:如果OSS檔案使用了服務端加密(SSE-OSS或SSE-KMS),請選擇對應的解密方式,以便Security Center解密後進行檢測。
掃描路徑:
按首碼匹配:輸入檔案名稱首碼來匹配掃描指定檔案。
配置到整個Bucket:掃描整個Bucket檔案。
等待檢測任務完成
說明檢測任務完成後,後台需要進行資料統計和同步,結果通常會有 1-2小時 的延遲,請您耐心等待。
可在Bucket列表頁,查看目標檔案檢測狀態,當狀態由未檢測變成已檢測時,即表示檢測已完成。
自動檢測(增量檔案)
進入策略管理頁面
訪問Security Center控制台-風險治理-惡意檔案SDK,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。
單擊右上方策略管理。
建立或修改檢測策略
建立新策略:在策略管理面板,單擊新增策略。
修改現有策略:在已有策略的操作列的編輯,將目標Bucket添加到該策略的生效範圍內。
參照如下說明完成配置後,單擊確定。
即時增量檢測:
說明僅中國內地地區支援配置即時增量檢測,非中國內地地區不支援。
開啟:當有新檔案上傳到指定的Bucket時,會立即觸發檢測。
關閉:按照設定的檢測周期(例如每天、每3天)和檔案檢測時間(例如淩晨02:00-04:00)對周期內新增的檔案進行掃描。
生效Bucket:選擇此策略要應用的一個或多個Bucket。若此Bucket已被其他策略佔用,則不可重複選擇。
警告策略生效後,不會自動檢測新增Bucket中的檔案,若有檢測需求請手動編輯策略,將其添加到“生效 Bucket”列表中。
附表:選擇檢測任務執行循環。
檔案檢測時間:選擇檢測任務執行時間,間隔不能低於1小時。
警告若檢測任務運行超過設定檢測時間,將自動暫停,並在下一個檢測周期內自動重啟任務。
其他參數(如解壓、解密、掃描路徑):請參見手動檢測的配置檢測參數。
整合SDK
在業務代碼中整合惡意檔案檢測SDK,在返回結果中擷取惡意檔案資訊,實現對檔案的檢測。
步驟一:準備訪問憑證與許可權
建立存取金鑰
為保障賬戶安全,請勿使用主帳號AccessKey。建議建立專用於API訪問的RAM使用者,並為其授予最小必要許可權具體操作,請參見建立RAM使用者、建立AccessKey。
建立帳號
已有RAM帳號
使用主帳號或Resource Access Management員帳號登入RAM控制台,並進入目標使用者詳情頁。
在認證管理頁簽下的AccessKey地區,單擊建立AccessKey。
根據實際業務選擇使用情境,並勾選我確認必須建立 AccessKey。單擊繼續建立。
完成安全驗證,,系統會自動為RAM使用者產生一個AccessKey ID和AccessKey Secret,請妥善保管密鑰資訊。
說明可下載密鑰CSV檔案或複製內容儲存到本地。更多內容,請參見建立AccessKey。
配置權限原則
在使用者頁面,單擊目標RAM使用者,單擊操作列的添加許可權。
在權限原則地區,選擇AliyunYundunSASFullAccess後,單擊確認新增授權。
說明更多內容請參見為RAM使用者授權。
配置環境變數
阿里雲SDK支援通過定義
ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET環境變數來建立預設的訪問憑證。調用介面時,程式直接存取憑證,讀取存取密鑰(即AccessKey)並自動完成鑒權。更多資訊,請參見配置環境變數。Linux/macOS
通過export命令配置環境變數
重要使用export命令配置的臨時環境變數僅當前會話有效,當會話退出之後所設定的環境變數將會丟失。若需長期保留環境變數,可將export命令配置到對應作業系統的啟動設定檔中。
配置AccessKey ID並按斷行符號。
# 將<ACCESS_KEY_ID>替換為您自己的AccessKey ID。 export ALIBABA_CLOUD_ACCESS_KEY_ID=yourAccessKeyID配置AccessKey Secret並斷行符號。
# 將<ACCESS_KEY_SECRET>替換為您自己的AccessKey Secret。 export ALIBABA_CLOUD_ACCESS_KEY_SECRET=yourAccessKeySecret驗證是否配置成功。
執行
echo $ALIBABA_CLOUD_ACCESS_KEY_ID命令,如果返回正確的AccessKey ID,則說明配置成功。
Windows
通過圖形化使用者介面GUI
操作步驟
以下為Windows 10中通過圖形化使用者介面設定環境變數的步驟。
在案頭按右鍵此電腦,選擇屬性>進階系統設定>環境變數>系統變數/使用者變數>建立,完成以下配置:
變數
樣本值
AccessKey ID
變數名:ALIBABA_CLOUD_ACCESS_KEY_ID
變數值:LTAI****************
AccessKey Secret
變數名:ALIBABA_CLOUD_ACCESS_KEY_SECRET
變數值:yourAccessKeySecret
測試設定是否成功
單擊開始(或快速鍵:Win+R)> 運行(輸入 cmd)> 確定(或按 Enter 鍵),開啟命令提示字元,執行
echo %ALIBABA_CLOUD_ACCESS_KEY_ID%、echo %ALIBABA_CLOUD_ACCESS_KEY_SECRET%命令。若返回正確的AccessKey,則說明配置成功。
通過命令列提示符CMD
操作步驟
以管理員身份開啟命令提示字元,並使用以下命令在系統中新增環境變數。
setx ALIBABA_CLOUD_ACCESS_KEY_ID yourAccessKeyID /M setx ALIBABA_CLOUD_ACCESS_KEY_SECRET yourAccessKeySecret /M其中
/M表示系統級環境變數,設定使用者級環境變數時可以不攜帶該參數。測試設定是否成功
單擊開始(或快速鍵:Win+R)> 運行(輸入 cmd)> 確定(或按 Enter 鍵),開啟命令提示字元,執行
echo %ALIBABA_CLOUD_ACCESS_KEY_ID%、echo %ALIBABA_CLOUD_ACCESS_KEY_SECRET%命令。若返回正確的AccessKey,則說明配置成功。
通過Windows PowerShell
在PowerShell中,設定新的環境變數(對所有新會話都有效):
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::User) [System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::User)為所有使用者佈建環境變數(需要管理員權限):
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::Machine) [System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::Machine)設定臨時的環境變數(僅當前會話有效):
$env:ALIBABA_CLOUD_ACCESS_KEY_ID = "yourAccessKeyID" $env:ALIBABA_CLOUD_ACCESS_KEY_SECRET = "yourAccessKeySecret"在PowerShell中,執行
Get-ChildItem env:ALIBABA_CLOUD_ACCESS_KEY_ID、Get-ChildItem env:ALIBABA_CLOUD_ACCESS_KEY_SECRET命令。若返回正確的AccessKey,則說明配置成功。
步驟二:安裝SDK
根據開發語言選擇相應的安裝方式,目前僅支援Java接入和Python接入。
Java 接入
環境要求:JDK 1.8 或更高版本。
擷取方式:
訪問Java SDK程式碼程式庫下載最新版本的 Java SDK 檔案。
將下載的SDK檔案引用至專案中。
Python 接入
環境要求:Python 3.6 或更高版本。
擷取方式:
線上安裝:使用pip命令快速安裝。
<BASH> pip install -U alibabacloud_filedetect離線安裝:
在有網路的環境中,訪問 Python程式碼程式庫並下載最新版本的Python SDK。
將下載的源碼包上傳至專案環境並解壓。
進入解壓後的SDK根目錄,執行安裝命令:
<BASH> # 切換至python SDK根目錄 # 目錄名可能因版本而異,如alibabacloud_filedetect-1.0.0 cd alibabacloud_filedetect-x.x.x/ # 使用您的python版本進行安裝 python setup.py install
步驟三:編寫代碼並執行檢測
環境配置完成後,可參考如下Python和Java程式碼範例,編寫代碼。
請根據實際情況修改範例程式碼中的待檢測檔案/目錄路徑path、URL和MD5等參數。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步檢測檔案介面
* @param detector 檢測器對象
* @param path 待檢測的檔案路徑
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 非同步檢測檔案介面
* @param detector 檢測器對象
* @param path 待檢測的檔案路徑
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回呼函數
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步檢測URL檔案介面
* @param detector 檢測器對象
* @param url 待檢測的檔案URL
* @param md5 待檢測的檔案md5
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 非同步檢測URL檔案介面
* @param detector 檢測器對象
* @param url 待檢測的檔案URL
* @param md5 待檢測的檔案md5
* @param timeout_ms 設定逾時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回呼函數
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化檢測結果
* @param result 檢測結果對象
* @return 格式化後的字串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步檢測目錄或檔案
* @param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 非同步檢測目錄或檔案
* @param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 開始對檔案或目錄進行檢測
* @param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
* @param is_sync 是否使用同步介面,推薦使用非同步。 true是同步, false是非同步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任務執行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 擷取檢測器執行個體
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 設定解壓縮參數(可選,預設不解壓壓縮包)
boolean decompress = true; // 是否識別壓縮檔並解壓,預設為false
int decompressMaxLayer = 5; // 最大解壓層數,decompress參數為true時生效
int decompressMaxFileCount = 1000; // 最大解壓檔案數,decompress參數為true時生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 樣本用法1:掃描本地目錄或檔案
boolean is_sync_scan = false; // 是非同步檢測還是同步檢測。非同步檢測效能更好。false表示非同步檢測
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String path = "test2.php"; // 待掃描的檔案或目錄
// 啟動掃描,直到掃描結束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 樣本用法2:掃描URL檔案
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待掃描的URL檔案
String md5 = "a767f*************6e21d000000"; // 待掃描的檔案MD5
// 同步掃描。如果需要非同步掃描,調用detectUrl介面
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步檢測檔案介面
@param detector 檢測器對象
@param path 待檢測的檔案路徑
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
非同步檢測檔案介面
@param detector 檢測器對象
@param path 待檢測的檔案路徑
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
@param callback 結果回呼函數
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步檢測URL檔案介面
@param detector 檢測器對象
@param url 待檢測的檔案URL
@param md5 待檢測的檔案md5
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
非同步檢測URL檔案介面
@param detector 檢測器對象
@param url 待檢測的檔案URL
@param md5 待檢測的檔案md5
@param timeout_ms 設定逾時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
@param callback 結果回呼函數
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化檢測結果
@param result 檢測結果對象
@return 格式化後的字串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步檢測目錄或檔案
@param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
@param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
非同步檢測目錄或檔案
@param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
@param is_sync 是否使用同步介面,推薦使用非同步。True是同步, False是非同步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
開始對檔案或目錄進行檢測
@param path 指定路徑,可以是檔案或者目錄。如果路徑為目錄,則遞迴遍曆其內容
@param is_sync 是否使用同步介面,推薦使用非同步。 True是同步,False是非同步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任務執行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 擷取檢測器執行個體
detector = OpenAPIDetector.get_instance()
# 讀取環境變數中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 設定解壓縮參數(可選,預設不解壓壓縮包)
decompress = True # 是否識別壓縮檔並解壓,預設為false
decompressMaxLayer = 5 # 最大解壓層數,decompress參數為true時生效
decompressMaxFileCount = 1000 # 最大解壓檔案數,decompress參數為true時生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 樣本用法1:掃描本地目錄或檔案
is_sync_scan = False # 是非同步檢測還是同步檢測。非同步檢測效能更好。False表示非同步檢測
timeout_ms = 500000 # 單個樣本檢測時間,單位為毫秒
path = "test.bin" # 待掃描的檔案或目錄
# 啟動掃描,直到掃描結束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 樣本用法2:掃描URL檔案
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待掃描的URL檔案
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步掃描。如果需要非同步掃描,調用detectUrl介面
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()步驟四:解析返回結果
調用SDK進行惡意檔案檢測後,可在程式的運行結果中查看檢測結果,同時結果將會同步到Security Center控制台中。
結果資料結構:每次檢測都會返回一個
DetectResult對象,其核心欄位如下:欄位名
類型
描述
md5String
檔案的MD5雜湊值。
timelong
本次檢測所用時間(毫秒)。
error_codeERR_CODE
錯誤碼,
ERR_SUCC表示成功。error_stringString
詳細的錯誤資訊描述。
resultRESULT (enum)
檢測結果枚舉:
RES_WHITE:安全RES_BLACK:可疑/惡意RES_PENDING: 檢測中。
scoreint
檢測分值,範圍為 0-100,分數越高,風險越高。
0 ~ 60:安全,檔案可信通常無需處理。
61 ~ 70:風險 (低危),檔案存在輕微風險,建議人工複核。
71 ~ 80:可疑 (中危),檔案有較大機率是惡意的,建議隔離或刪除。
81 ~ 100:惡意 (高危),檔案基本可確定為惡意檔案,應立即隔離或刪除。
virus_typeString
病毒類型,例如WebShell、MalScript、Hacktool" 等。
ext_infoString
擴充資訊,通常為JSON格式字串,包含更詳細的檢測上下文。
compresslistList
若檢測的是壓縮包且開啟瞭解壓,此列表包含壓縮包內每個檔案的檢測結果。
錯誤碼與故障排查
錯誤碼
說明
錯誤原因
建議處理方式
ERR_SUCC成功
檢測成功完成。
無
ERR_INIT初始化錯誤
SDK需要初始化,或重複進行了初始化。
確保在調用檢測介面前已成功調用
init()。ERR_FILE_NOT_FOUND檔案未找到
指定的本地檔案路徑不存在。
檢查檔案路徑是否正確,以及程式是否有檔案讀取許可權。
ERR_CALL_APIAPI調用錯誤
調用雲端API時發生未知錯誤。
檢測代碼及參數值。
ERR_TIMEOUT逾時
檢測時間超過了設定的
timeout_ms。增加
timeout_ms參數的值,或檢查網路連接。ERR_UPLOAD上傳失敗
檔案上傳至雲端進行分析時失敗,可重試。
通常為網路問題,建議增加重試邏輯。
ERR_ABORT任務終止
程式在檢測完成前退出,可能當前檢測檔案數量已超過預付費購買數量或賬戶餘額不足,無法支援後付費。
請追加購買檢測檔案數量或完成賬戶儲值。
ERR_DETECT_QUEUE_FULL檢測隊列滿
提交檢測任務過於頻繁,服務端隊列已滿。
業務高峰期可能觸發。可調用
waitQueueAvailable()等待隊列可用,或稍後重試。ERR_TIMEOUT_QUEUE隊列逾時
在隊列中等待的時間過長。
ERR_MD5MD5格式錯誤
提供的MD5字串格式不正確。
修改MD5格式。
ERR_URLURL格式錯誤
提供的URL字串格式不正確。
修改URL格式。
查看與處理檢測結果
查看結果
風險檔案總覽頁簽:
集中展示所有檢測方式(OSS檢測和API調用)發現的風險檔案。可在此搜尋、篩選風險檔案,並查看其風險等級、威脅標籤等資訊。
如果是壓縮包檔案,可單擊檔案前的展開
表徵圖,查看該壓縮包內的風險檔案清單。
OSS檔案檢測頁簽:
以Bucket維度展示風險統計資訊,如每個Bucket的風險檔案數、檢測進度等。
可單擊操作列的詳情,查看關聯的惡意檔案清單。
結果投遞:如果已開通Security Center日誌分析或日誌管理功能,惡意檔案檢測日誌將自動存入Security Center的專屬Logstore,方便您進行深度查詢與溯源分析。詳情請參見惡意檔案檢測日誌和日誌管理。
結果處理:支援對檢測結果進行加白名单、忽略、禁止訪問等操作,具體操作請參見處理檢測結果。
退訂服務
訂用帳戶
在Security Center控制台-總覽頁面的訂用帳戶服務地區,單擊。
在訂單降配頁簽的惡意檔案檢測地區,將是否選購置為否後,單擊立即下單。
隨用隨付
方式一:在Security Center控制台-總覽頁面隨用隨付服務地區,關閉惡意檔案檢測SDK開關。
方式二:
訪問Security Center控制台-風險治理-惡意檔案SDK,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。
在風險檔案總覽頁簽,單擊的已使用檢測次數地區的停止使用。
計費說明
使用惡意檔案檢測SDK功能按照檢測的檔案個數計算檔案檢測次數進行收費。
計費方式:支援免費試用、訂用帳戶(預付費)和隨用隨付(後付費)。
免費試用:完成企業實名認證的阿里雲帳號可免費試用,提供10,000次檢測次數。
重要若免費試用次數未使用完,開通付費版功能後,剩餘的免費試用次數可繼續且優先抵扣使用。
訂用帳戶:資源套件單價為1.5美元/萬次/月,10萬次起售,購買後在所選有效期間內可用,到期後清零。
隨用隨付:
檔案檢測次數以自然日為計費周期,單價為0.0002美元/次。
開通隨用隨付服務後,系統將收取Security Center的基礎服務費(0.0072美元/小時)。
壓縮包計費:如果開啟了壓縮包檢測,將按照解壓後的檔案個數計算消耗的檢測次數。
樣本:檢測一個包含1000個小檔案的壓縮包,將消耗1000次檢測次數。
處理檢測結果:包含在功能服務費中,不額外收費。
更多付費模式和計費邏輯說明,請參見計費概述。
配額與限制
購買限制:在同一時間內,同一阿里雲帳號只能選擇一種計費方式。
說明切換計費方式後,不影響已檢測結果的展示和周期性策略的執行。
檔案大小限制:
任務總檔案大小: 無限制。
單個檔案檢測大小限制:
通過 SDK 檢測離線檔案:單個檔案不超過 100 MB。
通過控制台檢測 OSS 檔案:單個檔案不超過 1 GB。
超限說明:
當檔案大小超過限制或解壓後的檔案超出數量/大小限制時,相關檔案將不被檢測,未檢測檔案不會統計計費次數。
惡意檔案檢測請求若超出隊列容量時需等待,或最佳化並發處理能力。
壓縮包檔案檢測限制:最多解壓5層、1000個檔案、總大小最多1GB。
環境限制:不支援金融雲和政務雲環境。
預設介面請求頻率(QPS):試用版為10 次/秒,付費版為20 次/秒。
說明檢測速度受網路狀況、伺服器效能、雲產品規格等多種因素影響。
SDK 內部採用非同步隊列機制來應對請求峰值,提高並發處理能力。當內部隊列滿時,會暫停處理新請求,直至隊列有可用空間。
附錄
API參考文檔
可以通過調用API介面方式使用惡意檔案檢測功能,部分參考文檔如下,更多內容請參見惡意檔案檢測SDK、惡意檔案檢測OSS。
支援檢測的病毒類型
病毒類型(virus_type) | 病毒名稱 |
Backdoor | 反彈Shell後門 |
DDoS | DDoS木馬 |
Downloader | 下載器木馬 |
Engtest | 引擎測試程式 |
Hacktool | 駭客工具 |
Trojan | 高危程式 |
Malbaseware | 被汙染的基礎軟體 |
MalScript | 惡意指令碼 |
Malware | 惡意程式 |
Miner | 挖礦程式 |
Proxytool | 代理工具 |
RansomWare | 勒索病毒 |
RiskWare | 風險軟體 |
Rootkit | Rootkit |
Stealer | 竊密工具 |
Scanner | 掃描器 |
Suspicious | 可疑程式 |
Virus | 感染型病毒 |
WebShell | 網站後門 |
Worm | 蠕蟲 |
AdWare | 廣告軟體 |
Patcher | 破解程式 |
Gametool | 私服工具 |
常見問題
計費與額度問題
對於已經使用的檔案檢測次數,支援返還嗎?
不支援。檢測前請確認檢測的檔案範圍和檢測策略,以免因誤操作,浪費檔案檢測次數。
為什麼OSS檔案檢測頁簽的“檔案總數”和“檢測檔案總數”不一致?
檔案總數:指Bucket中檔案的自然個數(一個壓縮包計為 1 個)。
檢測檔案總數:實際掃描的檔案總數量。若開啟壓縮包掃描,其內部檔案也會計入,總數取決於解壓層級和最大解壓檔案數的設定。
當掃描了包含大量檔案的壓縮包時,“檢測檔案總數”會遠大於“檔案總數”,按實際檢測的檔案數收費。
因額度不足導致檢測任務中斷了怎麼辦?
如果使用訂用帳戶模式,當剩餘檢測次數不足時,掃描任務會中斷。可參考如下解決方案:
升級配置:在Security Center控制台-總覽頁面的訂用帳戶服務地區,單擊變更配置>立即升級,買足夠的惡意檔案檢測次數。具體說明,請參見升級與降配。
修改檢測策略:在檢測設定中關閉壓縮包解壓,以減少單次任務消耗的次數,然後重新執行掃描。
授權管理問題
如何查看惡意檔案SDK的授權詳情(如剩餘次數、到期時間)?
可在控制台查看當前帳號的個人SDK授權情況,包含版本、開通狀態、授許可權制,剩餘次數、頻率限制、到期時間等資訊。
訪問Security Center控制台-風險治理-惡意檔案SDK,在頁面左側頂部,選擇需防護資產所在的地區:中國內地或非中國內地。
單擊右上方的介面及許可權詳情。