File uploads, code releases, and data sharing pipelines can introduce malicious files—ransomware, mining programs, web shells, and more—that threaten your systems and data. Security Center's malicious file detection uses a multi-engine platform to scan files from your servers or Object Storage Service (OSS) buckets and flag threats before they cause damage.
Use cases
| Scenario | Description |
|---|---|
| Server attack prevention | Prevent worms and mining Trojans to avoid server resource consumption or use in DDoS attacks. |
| Targeted attack defense | Detect back doors and hacking tools to prevent data theft or remote system control. |
| Office network and file storage | Identify documents or compressed files with malicious macros or scripts to prevent phishing and credential theft. |
| Full system environment detection | Scan for and remove ransomware and infectious viruses to prevent data from being held for ransom or widespread system paralysis. |
Choose a detection method
Security Center supports two detection methods: SDK integration and console operation.
| | SDK integration | Console operation |
|---|---|---|
| How to use | Integrate the SDK into your business code (Java or Python) and call it programmatically | Use the Security Center console to scan buckets through a graphical interface |
| Supported file sources | Server files (local paths) and OSS files (via a valid download URL) | OSS files only |
Use SDK integration if your application handles file uploads in real time—for example, user-generated content, code releases, or partner integrations—and you need to scan files immediately as they arrive, before they enter your storage or processing pipeline.
Use console operation if you need to scan existing files in OSS buckets on a one-time or scheduled basis—for security baselines, audits, or periodic hygiene checks.
Scope
Supported file formats
| Category | Extensions |
|---|---|
| Compressed packages | .7z, .zip, .rar, .tar, .gz, .bz2, .xz, .lzma, .ar, .tar.gz |
| Scripts and web shells | .php, .jsp, .jspx, .asp, .aspx, .sh, .py, .ps1, .pl, .bat, .html |
| Documents | .doc, .pdf, .ppt, .xsl, .rtf, .hta, .chm |
| Executables and binaries | .apk, .exe, .dll, .ocx, .com, .so, .sys, .ko, .obj |
| Images | .jpg |
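As a rough illustration, an application could pre-filter uploads by extension before submitting them for detection. The sketch below copies the extensions from the table above. The helper name `is_supported_extension` is hypothetical and not part of the SDK, and this document does not say whether the service itself identifies files by extension or by content.

```python
from pathlib import PurePath

# Extensions copied from the table above; ".tar.gz" is covered by ".gz".
SUPPORTED_EXTENSIONS = {
    ".7z", ".zip", ".rar", ".tar", ".gz", ".bz2", ".xz", ".lzma", ".ar",
    ".php", ".jsp", ".jspx", ".asp", ".aspx", ".sh", ".py", ".ps1", ".pl", ".bat", ".html",
    ".doc", ".pdf", ".ppt", ".xsl", ".rtf", ".hta", ".chm",
    ".apk", ".exe", ".dll", ".ocx", ".com", ".so", ".sys", ".ko", ".obj",
    ".jpg",
}


def is_supported_extension(filename: str) -> bool:
    """Return True if the final extension of filename appears in the table (case-insensitive)."""
    return PurePath(filename).suffix.lower() in SUPPORTED_EXTENSIONS
```

Only the last suffix is compared, so `backup.tar.gz` matches through `.gz`, and a file without an extension is reported as unsupported.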
Compressed and encrypted file detection
Compressed packages: Security Center can decompress and scan unencrypted compressed packages up to 5 layers deep, with up to 1,000 files per package, and a total decompressed size of up to 1 GB. Decompression is disabled by default.
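The limits above can be checked locally before an archive is submitted. The following is a minimal sketch for ZIP files only, using Python's standard `zipfile` module. The function names are hypothetical, and several details are assumptions: 1 GB is read as 2**30 bytes, nested archives are counted as files, and counts are summed across layers. The service may count differently.

```python
import io
import zipfile

MAX_LAYERS = 5                 # maximum decompression depth
MAX_FILES = 1000               # maximum files per package
MAX_TOTAL_BYTES = 1024 ** 3    # 1 GB, interpreted as 2**30 bytes (assumption)


def archive_stats(data: bytes, layer: int = 1):
    """Return (deepest_layer, file_count, total_uncompressed_bytes) for a ZIP archive."""
    deepest, files, total = layer, 0, 0
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            files += 1
            total += info.file_size
            # Recurse into nested ZIP archives (identified by name only).
            if info.filename.lower().endswith(".zip"):
                d, f, t = archive_stats(zf.read(info), layer + 1)
                deepest = max(deepest, d)
                files += f
                total += t
    return deepest, files, total


def within_limits(data: bytes) -> bool:
    """Return True if the archive stays inside all three documented limits."""
    deepest, files, total = archive_stats(data)
    return deepest <= MAX_LAYERS and files <= MAX_FILES and total <= MAX_TOTAL_BYTES
```

This sketch reads nested archives into memory, so it is only suitable for trusted or size-capped input.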
Encrypted files: Security Center automatically decrypts and scans files encrypted with OSS server-side encryption (SSE-KMS or SSE-OSS).
Storage class restrictions
Only OSS files in the Standard and Infrequent Access storage classes are supported. Files in the Archive storage class are not supported.
Supported regions for OSS bucket detection
China (Qingdao), China (Beijing), China (Zhangjiakou), China (Hohhot), China (Ulanqab)
China (Hangzhou), China (Shanghai)
China (Shenzhen), China (Heyuan), China (Guangzhou)
China (Chengdu), China (Hong Kong)
Singapore, Malaysia (Kuala Lumpur), Indonesia (Jakarta), Philippines (Manila), Thailand (Bangkok), Japan (Tokyo), South Korea (Seoul), US (Silicon Valley), US (Virginia), Germany (Frankfurt), UK (London)
Supported virus types
For the full list of detectable virus types, see Supported virus types in the appendix.
Prerequisites
Before you begin, ensure that you have:
An Alibaba Cloud account with enterprise identity verification completed (required for the free trial)
An OSS bucket in a supported region (for console or SDK URL-based detection)
Activate the service
Go to Security Center console - Risk Governance - Malicious File Detection. In the upper-left corner, select the region where your assets are located: Chinese Mainland or Outside Chinese Mainland.
Choose an activation method:
Try Now (free trial): Accounts with completed enterprise identity verification can activate a free trial with a quota of 10,000 file detections.
Important: Each account is eligible for one free trial. If you activate a paid plan before your free trial quota is depleted, the remaining free detections are carried over and consumed first.
Buy Now (subscription): On the purchase page, in the Malicious File Detection area, set Purchase or Not to Yes and enter the quantity. The minimum purchase is 100,000 detections. Unused resources are forfeited upon expiration.
Activate Pay-as-you-go: In the Activate Pay-as-you-go dialog box, click Activate Pay-as-you-go. Optionally, enable Enable Policy to automatically scan newly uploaded files in selected buckets.
Configure notifications (optional)
Security Center can push malicious file alerts to a DingTalk group automatically.
Go to Security Center console - System Settings - Notification Settings. In the upper-left corner, select the region where your assets are located.
On the DingTalk Chatbot tab, click Add Chatbot.
For Notify On, select all levels so that every detected malicious file triggers a notification.
For more configuration options, see Configure DingTalk robot notifications.
Run detection: console operation
Use the Security Center console to scan files stored in OSS buckets. Two modes are available:
Manual detection: One-time scans of existing files, supporting both full and incremental detection.
Automatic detection: Continuous scanning of newly uploaded files based on a policy.
Manual detection (existing files)
Go to Security Center console - Risk Governance - Malicious File Detection. In the upper-left corner, select the region where your assets are located. Then go to the OSS File Check tab.
Important: If a bucket does not appear in the list, click Synchronize Buckets to refresh.
Start a detection task:
Full detection scans all files in one or more buckets:
To scan a single bucket, click Check in the Actions column.
To scan multiple buckets, select them and click Batch Detection.
Incremental detection scans only files that are new or have been modified since the last scan. In the Actions column for the target bucket, click Incremental Detection.
Configure detection parameters in the dialog box:
| Parameter | Description |
|---|---|
| File Check Type | File extensions to scan. All types are scanned by default. |
| Decompression Level | Maximum decompression depth, up to 5 layers. Disabled by default. |
| Extraction Limit | Maximum number of files to extract per package, up to 1,000. |
| File Decryption Type | Decryption method for SSE-OSS or SSE-KMS encrypted files. |
| Scan Path | Scan a specific prefix (Match by Prefix) or all files in the bucket (Configure for Entire Bucket). |
Wait for the scan to complete. After a detection task finishes, results are typically available after a 1–2 hour delay. Monitor progress on the Bucket List page under File Check Status. The status changes from Not Checked to Checked when the scan is complete.
Automatic detection (incremental files)
Set up a detection policy to scan new files as they are uploaded.
Go to Security Center console - Risk Governance - Malicious File Detection. In the upper-left corner, select the region where your assets are located.
In the upper-right corner, click Policy Management.
Create or modify a policy:
To create a new policy, click Create Policy in the Policy Management panel.
To modify an existing policy, click Edit in the Actions column and add the target bucket to the policy scope.
Configure the policy parameters:
| Parameter | Description |
|---|---|
| Increment Real-time Detection | Enable: Triggers a scan immediately when a new file is uploaded to the bucket. Supported only in Chinese Mainland regions. Disable: Scans new files on a schedule based on the Detection Cycle and File Check Time settings. |
| Effective Bucket | The buckets this policy applies to. Each bucket can be associated with only one policy. |
| Detection Cycle | Frequency of scheduled detection tasks. |
| File Check Time | Time window for scheduled scans. The interval must be at least 1 hour. |
For other parameters (decompression, decryption, scan path), see step 3 of manual detection.
Warning: After a policy takes effect, it does not automatically apply to newly added buckets. To scan new buckets, edit the policy and add them to the Effective Buckets list.
Warning: If a detection task runs beyond the configured time window, it pauses automatically and resumes in the next detection cycle.
Click OK.
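Two of the policy rules above (one policy per bucket, and a scheduled time window of at least 1 hour) lend themselves to a quick sanity check before you enter values in the console. The sketch below is illustrative only: the dictionary layout, field names, and function names are hypothetical and do not correspond to any Security Center API, and it assumes a window may wrap past midnight.

```python
from datetime import time


def window_minutes(start: time, end: time) -> int:
    """Length of the scan window in minutes; the window may wrap past midnight (assumption)."""
    s = start.hour * 60 + start.minute
    e = end.hour * 60 + end.minute
    return (e - s) % (24 * 60)


def validate_policies(policies):
    """Return a list of human-readable problems found in the planned policies."""
    problems = []
    owner = {}  # bucket name -> name of the first policy that claims it
    for p in policies:
        for bucket in p["buckets"]:
            if bucket in owner:
                problems.append(
                    f"bucket {bucket!r} is in both {owner[bucket]!r} and {p['name']!r}"
                )
            else:
                owner[bucket] = p["name"]
        # The time window only matters for scheduled (non-real-time) policies.
        if not p["realtime"] and window_minutes(p["start"], p["end"]) < 60:
            problems.append(f"policy {p['name']!r}: time window shorter than 1 hour")
    return problems
```

An empty list means the planned policies satisfy both rules. Identical start and end times are treated as a zero-length window here.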
Run detection: SDK integration
Integrate the malicious file detection SDK into your business code to scan server files or OSS files via URL.
Step 1: Set up access credentials
Use a Resource Access Management (RAM) user instead of your Alibaba Cloud account to follow the principle of least privilege.
Create a RAM user
Log on to the RAM console as an Alibaba Cloud account or RAM administrator. On the Identities > Users page, click Create User.
Enter the logon name and other user details. For Access Mode, select Using permanent AccessKey to access.
Complete security authentication. The system generates an AccessKey ID and AccessKey Secret. Save these credentials securely—you can download them as a CSV file.
For more information, see Create a RAM user and Create an AccessKey pair.
Grant permissions
On the Users page, find the RAM user and click Add Permissions in the Actions column.
In the Policy section, select AliyunYundunSASFullAccess and click Grant permissions.
For more information, see Grant permissions to a RAM user.
Configure environment variables
Store the AccessKey pair as environment variables so the SDK reads them automatically at runtime.
Linux/macOS:
# Replace yourAccessKeyID and yourAccessKeySecret with your actual values.
export ALIBABA_CLOUD_ACCESS_KEY_ID=yourAccessKeyID
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=yourAccessKeySecret
Environment variables set with export are valid for the current session only. To persist them across sessions, add the export commands to your shell's startup file (for example, ~/.bashrc or ~/.zshrc).
Windows (GUI):
On your desktop, right-click This PC and choose Properties > Advanced system settings > Environment Variables > New under System variables or User variables. Then, configure the following:
| Variable | Value |
|---|---|
| ALIBABA_CLOUD_ACCESS_KEY_ID | yourAccessKeyID |
| ALIBABA_CLOUD_ACCESS_KEY_SECRET | yourAccessKeySecret |
To verify, open the command prompt and run echo %ALIBABA_CLOUD_ACCESS_KEY_ID%. If the command returns your AccessKey ID, the configuration is successful.
Windows (Command Prompt):
setx ALIBABA_CLOUD_ACCESS_KEY_ID yourAccessKeyID /M
setx ALIBABA_CLOUD_ACCESS_KEY_SECRET yourAccessKeySecret /M
The /M parameter sets a system environment variable. Omit it to set a user environment variable.
Windows (PowerShell):
# System-scope (requires administrator permissions):
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::Machine)
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::Machine)
# User-scope:
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_ID', 'yourAccessKeyID', [System.EnvironmentVariableTarget]::User)
[System.Environment]::SetEnvironmentVariable('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'yourAccessKeySecret', [System.EnvironmentVariableTarget]::User)
# Session-scope (current session only):
$env:ALIBABA_CLOUD_ACCESS_KEY_ID = "yourAccessKeyID"
$env:ALIBABA_CLOUD_ACCESS_KEY_SECRET = "yourAccessKeySecret"
Verify the configuration by running Get-ChildItem env:ALIBABA_CLOUD_ACCESS_KEY_ID in PowerShell. If the command returns your AccessKey ID, the configuration is successful.
For more information, see Configure environment variables.
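Whichever platform you use, it can help to confirm that both variables are visible to your program before initializing the detector. The following is a minimal sketch using only the Python standard library. The helper name `missing_credentials` is hypothetical and not part of the SDK.

```python
import os

REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_credentials()` with no arguments checks the current process environment. An empty list means both values are present, although it does not prove that the AccessKey pair itself is valid.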
Step 2: Install the SDK
Java
Prerequisite: JDK 1.8 or later.
Go to the Java SDK repository and download the latest release. Add the downloaded SDK file as a dependency in your project.
Python
Prerequisite: Python 3.6 or later.
Online installation:
pip install -U alibabacloud_filedetect
Offline installation: Download the latest release from the Python SDK repository, upload it to your environment, decompress it, and run:
# The directory name varies by version, for example, alibabacloud_filedetect-1.0.0.
cd alibabacloud_filedetect-x.x.x/
python setup.py install
Step 3: Write detection code
The SDK supports four detection modes: synchronous file detection, asynchronous file detection, synchronous URL detection, and asynchronous URL detection. All examples use ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables for authentication.
Replace the path, url, and md5 placeholders in the sample code with your actual values before running.
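The URL detection modes take the MD5 hash of the file in addition to its URL. If you still have a local copy of the file, one way to obtain that value is sketched below with Python's standard `hashlib` module. The helper name `file_md5` is hypothetical, and the assumption that the SDK expects a lowercase hexadecimal digest is based only on the shape of the placeholder in the sample code.

```python
import hashlib


def file_md5(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Return the lowercase hex MD5 digest of the file at path, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        # Read fixed-size chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The returned string can then be used in place of the md5 placeholder.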
Java
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* Synchronous file detection interface
* @param detector Detector object
* @param path Path of the file to be detected
* @param timeout_ms Set the timeout period in milliseconds
* @param wait_if_queuefull If the detection queue is full, false indicates returning an error immediately without waiting, while true indicates waiting until the queue has space.
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* Asynchronous file detection interface
* @param detector Detector object
* @param path Path of the file to be detected
* @param timeout_ms Set the timeout period in milliseconds
* @param wait_if_queuefull If the detection queue is full, false indicates returning an error immediately without waiting, while true indicates waiting until the queue has space.
* @param callback Callback function for the result
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* Synchronous URL file detection interface
* @param detector Detector object
* @param url URL of the file to be detected
* @param md5 MD5 hash of the file to be detected
* @param timeout_ms Set the timeout period in milliseconds
* @param wait_if_queuefull If the detection queue is full, false indicates returning an error immediately without waiting, while true indicates waiting until the queue has space.
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* Asynchronous URL file detection interface
* @param detector Detector object
* @param url URL of the file to be detected
* @param md5 MD5 hash of the file to be detected
* @param timeout_ms Set the timeout period in milliseconds
* @param wait_if_queuefull If the detection queue is full, false indicates returning an error immediately without waiting, while true indicates waiting until the queue has space.
* @param callback Callback function for the result
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* Format the detection result
* @param result Detection result object
* @return Formatted string
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* Synchronously detect a directory or file
* @param detector Detector object
* @param path The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
* @param timeout_ms Set the timeout period in milliseconds
* @param result_map Map that receives the detection result for each file path
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* Asynchronously detect a directory or file
* @param detector Detector object
* @param path The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
* @param timeout_ms Set the timeout period in milliseconds
* @param callback Callback function for the result
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* Start detecting a file or directory
* @param path The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
* @param is_sync Specifies whether to use the synchronous interface. We recommend using the asynchronous interface. true: synchronous, false: asynchronous
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// Wait for the task to complete.
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// Get the detector instance.
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// Initialize.
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// Set decompression parameters (optional, compressed packages are not decompressed by default).
boolean decompress = true; // Specifies whether to identify and decompress compressed files. Default is false.
int decompressMaxLayer = 5; // Maximum decompression level. Effective when decompress is true.
int decompressMaxFileCount = 1000; // Maximum number of decompressed files. Effective when decompress is true.
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// Example 1: Scan a local directory or file.
boolean is_sync_scan = false; // Specifies whether to use asynchronous or synchronous detection. Asynchronous detection offers better performance. false indicates asynchronous detection.
int timeout_ms = 500000; // Detection time for a single sample, in milliseconds.
String path = "test2.php"; // The file or directory to be scanned.
// Start the scan and wait for it to finish.
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// Example 2: Scan a URL file.
int timeout_ms = 500000; // Detection time for a single sample, in milliseconds.
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // The URL file to be scanned.
String md5 = "a767f*************6e21d000000"; // The MD5 hash of the file to be scanned.
// Synchronous scan. To perform an asynchronous scan, call the detectUrl interface.
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// Deinitialize.
System.out.println("Over.");
detector.uninit();
}
}
Python
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult


class Sample(object):

    def __init__(self):
        pass

    """
    Synchronous file detection interface
    @param detector: Detector object
    @param path: Path of the file to be detected
    @param timeout_ms: Set the timeout period in milliseconds
    @param wait_if_queuefull: If the detection queue is full, False indicates returning an error immediately without waiting, while True indicates waiting until the queue has space.
    """
    def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
        if detector is None or path is None:
            return None
        result = None
        while True:
            result = detector.detectSync(path, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result

    """
    Asynchronous file detection interface
    @param detector: Detector object
    @param path: Path of the file to be detected
    @param timeout_ms: Set the timeout period in milliseconds
    @param wait_if_queuefull: If the detection queue is full, False indicates returning an error immediately without waiting, while True indicates waiting until the queue has space.
    @param callback: Callback function for the result
    """
    def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
        if detector is None or path is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            # Wrap the callback so that queue-full results are swallowed and retried.
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detect(path, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result

    """
    Synchronous URL file detection interface
    @param detector: Detector object
    @param url: URL of the file to be detected
    @param md5: MD5 hash of the file to be detected
    @param timeout_ms: Set the timeout period in milliseconds
    @param wait_if_queuefull: If the detection queue is full, False indicates returning an error immediately without waiting, while True indicates waiting until the queue has space.
    """
    def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
        if detector is None or url is None or md5 is None:
            return None
        result = None
        while True:
            result = detector.detectUrlSync(url, md5, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result

    """
    Asynchronous URL file detection interface
    @param detector: Detector object
    @param url: URL of the file to be detected
    @param md5: MD5 hash of the file to be detected
    @param timeout_ms: Set the timeout period in milliseconds
    @param wait_if_queuefull: If the detection queue is full, False indicates returning an error immediately without waiting, while True indicates waiting until the queue has space.
    @param callback: Callback function for the result
    """
    def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
        if detector is None or url is None or md5 is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            # Wrap the callback so that queue-full results are swallowed and retried.
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detectUrl(url, md5, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result

    """
    Format the detection result
    @param result: Detection result object
    @return: Formatted string
    """
    @staticmethod
    def formatDetectResult(result):
        msg = ""
        if result.isSucc():
            info = result.getDetectResultInfo()
            msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
            if info.compresslist is not None:
                idx = 1
                for comp_res in info.compresslist:
                    msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
                    idx += 1
        else:
            info = result.getErrorInfo()
            msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
                info.time, info.error_code.name, info.error_string)
        return msg

    @staticmethod
    def formatDetectResultInfo(info):
        msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
        if info.compresslist is not None:
            msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg

    @staticmethod
    def formatCompressFileDetectResultInfo(info):
        msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg

    """
    Synchronously detect a directory or file
    @param detector: Detector object
    @param path: The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
    @param timeout_ms: Set the timeout period in milliseconds
    @param result_map: Dictionary that receives the detection result for each file path
    """
    def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
            return
        print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
            detector.getQueueSize(), abs_path, timeout_ms))
        res = self.detectFileSync(detector, abs_path, timeout_ms, True)
        print(" [ END ] {}".format(Sample.formatDetectResult(res)))
        result_map[abs_path] = res
        return

    """
    Asynchronously detect a directory or file
    @param detector: Detector object
    @param path: The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
    @param timeout_ms: Set the timeout period in milliseconds
    @param callback: Callback function for the result
    """
    def detectDirOrFile(self, detector, path, timeout_ms, callback):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
            return
        seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
        print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
            seq, detector.getQueueSize(), abs_path, timeout_ms))
        return

    """
    Start detecting a file or directory
    @param path: The specified path can be a file or a directory. If it is a directory, its content is recursively traversed.
    @param is_sync: Specifies whether to use the synchronous interface. We recommend using the asynchronous interface. True: synchronous, False: asynchronous
    """
    def scan(self, detector, path, detect_timeout_ms, is_sync):
        try:
            print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
            start_time = time.time()
            result_map = {}
            if is_sync:
                self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
            else:
                class AsyncTaskCallback(IDetectResultCallback):
                    def onScanResult(self, seq, file_path, callback_res):
                        print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
                            detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
                        result_map[file_path] = callback_res
                self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
                # Wait for the task to complete.
                detector.waitQueueEmpty(-1)
            used_time_ms = (time.time() - start_time) * 1000
            print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
    def main(self):
        # Get the detector instance.
        detector = OpenAPIDetector.get_instance()
        # Read the AccessKey ID and AccessKey Secret from environment variables.
        access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
        access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        # Initialize.
        init_ret = detector.init(access_key_id, access_key_secret)
        print("INIT RET: {}".format(init_ret.name))
        # Set decompression parameters (optional, compressed packages are not decompressed by default).
        decompress = True  # Specifies whether to identify and decompress compressed files. Default is false.
        decompressMaxLayer = 5  # Maximum decompression level. Effective when decompress is true.
        decompressMaxFileCount = 1000  # Maximum number of decompressed files. Effective when decompress is true.
        initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
        print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
        if True:
            # Example 1: Scan a local directory or file.
            is_sync_scan = False  # False: asynchronous detection (better performance). True: synchronous detection.
            timeout_ms = 500000  # Detection timeout for a single sample, in milliseconds.
            path = "test.bin"  # The file or directory to be scanned.
            # Start the scan and wait for it to finish.
            self.scan(detector, path, timeout_ms, is_sync_scan)
        if True:
            # Example 2: Scan a URL file.
            timeout_ms = 500000
            url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx"  # The URL file to be scanned.
            md5 = "a767ffc59d93125c7505b6e21d000000"
            # Synchronous scan. To perform an asynchronous scan, call the detectUrl interface.
            print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
            result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
            print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
        # Deinitialize.
        print("Over.")
        detector.uninit()

if __name__ == "__main__":
    sample = Sample()
    sample.main()

Step 4: Parse the detection result
Each detection returns a DetectResult object. Use the following fields to interpret the result.
Result fields
| Field | Type | Description |
|---|---|---|
| md5 | String | MD5 hash of the file |
| time | long | Detection duration, in milliseconds |
| error_code | ERR_CODE | Error code. ERR_SUCC indicates success. |
| error_string | String | Detailed error description |
| result | RESULT (enum) | RES_WHITE: Safe; RES_BLACK: Suspicious or malicious; RES_PENDING: Detection in progress |
| score | int | Risk score from 0 to 100. A higher score means higher risk. |
| virus_type | String | Virus type, such as WebShell, MalScript, or Hacktool |
| ext_info | String | Extended information in JSON format with detailed detection context |
| compresslist | List | Detection results for individual files within a compressed package (when decompression is enabled) |
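As a rough illustration of how the compresslist field might be consumed, the sketch below collects the archive members reported as RES_BLACK. The helper name flagged_archive_members and the SimpleNamespace stand-ins are hypothetical; they only mimic the attribute names used in the sample code (path, score, result.name, compresslist) and are not SDK classes.

```python
from types import SimpleNamespace


def flagged_archive_members(info):
    """Return (path, score) pairs for archive members reported as RES_BLACK.

    `info` is assumed to expose `compresslist`, which is None when the file is
    not an archive or decompression is disabled.
    """
    members = info.compresslist or []
    return [(m.path, m.score) for m in members if m.result.name == "RES_BLACK"]


def _member(path, result_name, score):
    # Stand-in for one compresslist entry; real entries come from the SDK.
    return SimpleNamespace(path=path, result=SimpleNamespace(name=result_name), score=score)


# Illustrative data only, not real detection output.
demo_info = SimpleNamespace(
    md5="a767ffc59d93125c7505b6e21d000000",
    compresslist=[
        _member("docs/readme.txt", "RES_WHITE", 0),
        _member("bin/tool.sh", "RES_BLACK", 95),
    ],
)
print(flagged_archive_members(demo_info))
```

In real code, pass the object returned by getDetectResultInfo() instead of the stand-in.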
Score interpretation
| Score | Risk level | Recommended action |
|---|---|---|
| 0–60 | Safe | No action required |
| 61–70 | Low risk | Manual review recommended |
| 71–80 | Medium (suspicious) | Isolate or delete the file |
| 81–100 | High (malicious) | Isolate or delete immediately |
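The score bands above can be applied in code with a small lookup. The following is a minimal sketch; classify_score is a hypothetical helper name, not part of the SDK, and the returned strings are copied from the table.

```python
def classify_score(score):
    """Map a 0-100 risk score to (risk level, recommended action)."""
    if not isinstance(score, int) or not 0 <= score <= 100:
        raise ValueError("score must be an integer from 0 to 100")
    if score <= 60:
        return ("Safe", "No action required")
    if score <= 70:
        return ("Low risk", "Manual review recommended")
    if score <= 80:
        return ("Medium (suspicious)", "Isolate or delete the file")
    return ("High (malicious)", "Isolate or delete immediately")


level, action = classify_score(75)
print("{}: {}".format(level, action))
```

Keeping the thresholds in one function makes it easier to adjust them if your own review policy is stricter than the table.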
Error codes
| Error code | Description | Cause | Recommended action |
|---|---|---|---|
| ERR_SUCC | Success | Detection completed successfully | None |
| ERR_INIT | Initialization error | SDK not initialized, or initialized more than once | Call init() successfully before calling the detection interface |
| ERR_FILE_NOT_FOUND | File not found | The specified local file path does not exist | Check the file path and read permissions |
| ERR_CALL_API | API call error | An unknown error occurred when calling the cloud API | Check code and parameter values |
| ERR_TIMEOUT | Timeout | Detection time exceeded timeout_ms | Increase timeout_ms or check your network connection |
| ERR_UPLOAD | Upload failed | Failed to upload the file to the cloud for analysis | Add retry logic; this is usually a transient network issue |
| ERR_ABORT | Task aborted | The program exited before detection completed, the subscription quota is exhausted, or the account balance is insufficient | Purchase more file detections or top up your account |
| ERR_DETECT_QUEUE_FULL | Queue full | Detection tasks were submitted too fast and the server queue is full | Call waitQueueAvailable() to wait for the queue, or retry later |
| ERR_TIMEOUT_QUEUE | Queue timeout | Waiting time in the queue was too long | - |
| ERR_MD5 | MD5 format error | The MD5 string format is incorrect | Correct the MD5 format |
| ERR_URL | URL format error | The URL string format is incorrect | Correct the URL format |
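The table recommends retrying for ERR_UPLOAD and ERR_DETECT_QUEUE_FULL. One way to sketch that is a wrapper with exponential backoff. Everything here is an assumption for illustration: run_with_retry and its parameters are hypothetical, the backoff schedule is a common convention rather than an SDK recommendation, and task stands for any zero-argument callable that returns the error code name as a string (for example info.error_code.name).

```python
import time

# Codes the table marks as worth retrying; other codes need a fix first.
RETRYABLE = {"ERR_UPLOAD", "ERR_DETECT_QUEUE_FULL"}


def run_with_retry(task, max_attempts=3, base_delay_s=1.0, sleep=time.sleep):
    """Call task() until it returns a non-retryable code or attempts run out.

    Returns the last error code name. The delay doubles after each retryable
    failure: base_delay_s, then 2 * base_delay_s, and so on.
    """
    code = None
    for attempt in range(max_attempts):
        code = task()
        if code not in RETRYABLE:
            return code
        if attempt < max_attempts - 1:
            sleep(base_delay_s * (2 ** attempt))
    return code
```

A caller would wrap a single detection call in task and treat any returned code other than ERR_SUCC as a final failure to log or escalate.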
View and handle detection results
View results
Console (At-risk File Overview tab)
This tab shows all risk files found across all detection methods—console scans and SDK calls. Filter and search by risk level, threat tag, or other attributes.
For compressed packages, click the expand icon next to the file name to see individual risk files within the package.
Console (OSS File Check tab)
This tab shows risk statistics per bucket: number of risk files and detection progress. Click Details in the Actions column to see the list of associated malicious files.
Logs
If you enable Log Analysis or Log Management in Security Center, malicious file detection logs are automatically stored in a dedicated Logstore for in-depth queries and trace analysis. See Malicious file detection logs and Log Management.
Handle results
Take action on detected files by adding them to the whitelist, ignoring them, or denying access. For details, see Handle detection results.
Billing
Billing is based on the number of files scanned, not total file size.
| Billing method | Details |
|---|---|
| Free trial | 10,000 file detections. Available to accounts with completed enterprise identity verification. Each account is eligible for one free trial. |
| Subscription | USD 1.5 per 10,000 detections per month. Minimum purchase: 100,000 detections. The plan is active for the selected validity period. Unused resources are forfeited upon expiration. |
| Pay-as-you-go | USD 0.0002 per detection, billed per calendar day. After you enable the pay-as-you-go service, a basic service fee of USD 0.0072/hour also applies. |
Billing for compressed files: When compressed package scanning is enabled, each file inside the package counts as one detection. Scanning a compressed package that contains 1,000 files consumes 1,000 detections.
Handling results: Processing detection results (adding to whitelist, ignoring, denying access) is included in the service fee at no additional charge.
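To make the compressed-file billing rule concrete, the sketch below estimates how many detections a batch consumes and what that would cost on pay-as-you-go, using the prices in the table above. The function names are hypothetical, and the branch for decompression being off assumes an archive then counts as a single detection, which the billing rule does not state explicitly.

```python
from decimal import Decimal

PAYG_PRICE_PER_DETECTION = Decimal("0.0002")  # USD, from the billing table
PAYG_BASIC_FEE_PER_HOUR = Decimal("0.0072")   # USD, from the billing table


def count_detections(plain_files, archive_member_counts, decompress=True):
    """Estimate detections for plain files plus a list of archives.

    archive_member_counts holds the number of files inside each archive.
    """
    if plain_files < 0 or any(n < 0 for n in archive_member_counts):
        raise ValueError("counts must be non-negative")
    if decompress:
        # Each file inside a package counts as one detection.
        return plain_files + sum(archive_member_counts)
    # Assumption: an archive that is not decompressed counts as one detection.
    return plain_files + len(archive_member_counts)


def payg_cost_usd(detections, hours_enabled=0):
    """Pay-as-you-go estimate: per-detection fee plus the hourly basic fee."""
    return (PAYG_PRICE_PER_DETECTION * detections
            + PAYG_BASIC_FEE_PER_HOUR * hours_enabled)


detections = count_detections(plain_files=5, archive_member_counts=[1000, 20])
print(detections, payg_cost_usd(detections, hours_enabled=24))
```

Decimal is used instead of float so that small per-detection prices add up without rounding drift.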
Limitations
| Item | Limit |
|---|---|
| Billing methods per account | One at a time. Switching billing methods does not affect existing scan results or periodic policy execution. |
| Single file size (SDK) | 100 MB |
| Single file size (console) | 1 GB |
| Total file size per task | Unlimited |
| Compressed file scan depth | 5 layers |
| Compressed file count per package | 1,000 files, 1 GB total |
| QPS—trial | 10 |
| QPS—paid | 20 |
Files that exceed the size limit, or decompressed files that exceed the quantity or size limit, are not scanned. Unscanned files are not billed.
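Because oversized files are not scanned, it can help to filter them out before submitting a batch through the SDK. The following is a minimal sketch under two assumptions: split_by_size_limit is a hypothetical helper, and 100 MB is interpreted as 100 * 1024 * 1024 bytes, which this page does not specify.

```python
import os

# Assumption: 1 MB is treated as 1024 * 1024 bytes.
SDK_MAX_FILE_BYTES = 100 * 1024 * 1024


def split_by_size_limit(paths, max_bytes=SDK_MAX_FILE_BYTES):
    """Split paths into (scannable, skipped) by file size on disk."""
    scannable, skipped = [], []
    for path in paths:
        if os.path.getsize(path) <= max_bytes:
            scannable.append(path)
        else:
            skipped.append(path)
    return scannable, skipped
```

Logging the skipped list gives you a record of files that were never checked, so they can be reviewed another way.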
Scan requests that are submitted faster than they can be processed wait in the detection queue. To reduce wait times, tune your concurrency or call waitQueueAvailable() in the SDK before submitting more files.
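As a client-side complement to waitQueueAvailable(), submissions can also be paced so they stay under the QPS limit for your plan. The Pacer class below is a hypothetical sketch, not part of the SDK; it spaces calls at least 1/qps seconds apart.

```python
import time


class Pacer:
    """Space out calls so that at most `qps` calls start per second."""

    def __init__(self, qps, clock=time.monotonic, sleep=time.sleep):
        if qps <= 0:
            raise ValueError("qps must be positive")
        self.interval = 1.0 / qps
        self.clock = clock
        self.sleep = sleep
        self.next_time = None  # earliest time the next call may start

    def wait(self):
        """Block until the next call is allowed, then reserve the next slot."""
        now = self.clock()
        if self.next_time is not None and now < self.next_time:
            self.sleep(self.next_time - now)
            now = self.next_time
        self.next_time = now + self.interval
```

Call wait() immediately before each detection request, for example with Pacer(10) on a trial account or Pacer(20) on a paid plan.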
Unsubscribe from the service
Subscription
On the Security Center console - Overview page, in the Subscription section, click Change > Downgrade.
On the Order Downgrade tab, in the Malicious File Detection section, set Purchase or Not to No and click Order Now.
Pay-as-you-go
On the Security Center console - Overview page, in the Pay-as-you-go section, turn off the Malicious File Detection switch.
Alternatively, go to Security Center console - Risk Governance - Malicious File Detection. On the At-risk File Overview tab, in the Used Quotas section, click Suspended.
FAQ
Are used file scan quotas refundable?
No. Confirm the file scope and scan policy before starting a scan to avoid wasting your quota.
Why do "Total Files" and "Total Scanned Files" show different numbers on the OSS File Scan tab?
Total Files counts each compressed package as one file. Total Scanned Files counts each file inside a decompressed package separately. If you scan a compressed package with many files, Total Scanned Files will be much higher than Total Files. Billing is based on Total Scanned Files.
What should I do if a scan is interrupted because of an insufficient quota?
For subscription users: purchase additional quota or disable Compressed Package Decompression in the scan settings to reduce consumption per job, then re-run the scan. See Upgrades and downgrades.
How can I check the remaining quota and authorization details for the malicious file SDK?
Go to Security Center console - Risk Governance - SDK for Malicious File Detection. In the upper-left corner, select the region where your assets are located.
In the upper-right corner, click SDK and Permissions.
The SDK Authorization for User panel shows the version, activation status, authorization limits, remaining quotas, frequency limits, and expiration time.
Appendix
API reference
Use the following API operations for programmatic integration. For more information, see File detect and Malicious File Detection OSS.
GetFileDetectApiInvokeInfo – Retrieves usage information about the malicious file detection SDK
CreateFileDetectUploadUrl – Gets file upload parameters
CreateFileDetect – Submits a file for detection
GetFileDetectResult – Retrieves the file detection result
ListCompressFileDetectResult – Retrieves detection results for files in an archive
Supported virus types
| Virus type (virus_type) | Description |
|---|---|
| Backdoor | Reverse shell back door |
| DDoS | DDoS Trojan |
| Downloader | Downloader Trojan |
| Engtest | DPI engine test program |
| Hacktool | Hacking tool |
| Trojan | Critical program |
| Malbaseware | Contaminated basic software |
| MalScript | Malicious script |
| Malware | Malicious program |
| Miner | Mining program |
| Proxytool | Proxy tool |
| RansomWare | Ransomware |
| RiskWare | Riskware |
| Rootkit | Rootkit |
| Stealer | Information-stealing tool |
| Scanner | Scanner |
| Suspicious | Suspicious program |
| Virus | Infectious virus |
| WebShell | Web shell |
| Worm | Worm |
| AdWare | Adware |
| Patcher | Cracking program |
| Gametool | Private server tool |