Paraformer 即時語音辨識服務基於 WebSocket 通訊協定,以支援流式即時通訊。然而,在高並發情境下,為每個請求獨立建立和銷毀 WebSocket 串連會產生巨大的網路與系統資源開銷,並引入顯著的串連延遲。為最佳化效能並確保穩定性,DashScope SDK 內建了高效的資源複用機制(如串連池與對象池)。本文檔將詳細介紹如何利用 DashScope Java SDK 中的這些特性,在高並發情境下高效調用 Paraformer 即時語音辨識服務。
如需使用“中國內地(北京)”地區的模型,請前往“中國內地(北京)”地區的API-KEY頁面
使用者指南:關於模型介紹和選型建議請參見即時語音辨識-Fun-ASR/Gummy/Paraformer。
前提條件
Java SDK通過內建的串連池和自訂的對象池協同工作,實現最佳效能。
串連池:SDK 內部整合的 OkHttp3 串連池,負責管理和複用底層的 WebSocket 串連,減少網路握手開銷。此功能預設開啟。
對象池:基於
commons-pool2實現,用於維護一組已預先建立好串連的Recognition對象。從池中擷取對象可消除串連建立的延遲,顯著降低首包延遲。
實現步驟
添加依賴
根據專案構建工具,在依賴設定檔中添加 dashscope-sdk-java 和 commons-pool2。
以Maven和Gradle為例,配置如下:
Maven
開啟您的Maven專案的
pom.xml檔案。在
<dependencies>標籤內添加以下依賴資訊。
<dependency> <groupId>com.alibaba</groupId> <artifactId>dashscope-sdk-java</artifactId> <!-- 請將 'the-latest-version' 替換為2.16.9及以上版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java --> <version>the-latest-version</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <!-- 請將 'the-latest-version' 替換為最新版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --> <version>the-latest-version</version> </dependency>儲存
pom.xml檔案。使用Maven命令(如
mvn clean install或mvn compile)來更新專案依賴
Gradle
開啟您的Gradle專案的
build.gradle檔案。在
dependencies塊內添加以下依賴資訊。dependencies { // 請將 'the-latest-version' 替換為2.16.9及以上版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version' // 請將 'the-latest-version' 替換為最新版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version' }儲存
build.gradle檔案。在命令列中,切換到您的專案根目錄,執行以下Gradle命令來更新專案依賴。
./gradlew build --refresh-dependencies或者,如果您使用的是Windows系統,命令應為:
gradlew build --refresh-dependencies
配置串連池
通過環境變數配置串連池關鍵參數:
環境變數
描述
DASHSCOPE_CONNECTION_POOL_SIZE
串連池大小。
推薦值:峰值並發數的 2 倍以上。
預設值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
最大非同步請求數。
推薦值:與
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。預設值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
單主機最大非同步請求數。
推薦值:與
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。預設值:32。
設定物件池
通過環境變數設定物件池大小:
環境變數
描述
RECOGNITION_OBJECTPOOL_SIZE
對象池大小。
推薦值:峰值並發數的 1.5 至 2 倍。
預設值:500。
重要對象池的大小(
RECOGNITION_OBJECTPOOL_SIZE)必須小於或等於串連池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否則,當對象池請求對象時,若串連池已滿,會導致調用線程阻塞,等待可用串連。對象池大小不應超過您賬戶的 QPS(每秒查詢率)限制。
通過如下代碼建立對象池:
class RecognitionObjectPool { // 。。。這裡省略其它代碼,完整樣本請參見完整代碼 public static GenericObjectPool<Recognition> getInstance() { lock.lock(); if (recognitionGenericObjectPool == null) { // 您可以在這裡設定對象池的大小。或在環境變數RECOGNITION_OBJECTPOOL_SIZE中設定。 // 建議設定為伺服器最大並發串連數的1.5到2倍。 int objectPoolSize = getObjectivePoolSize(); System.out.println("RECOGNITION_OBJECTPOOL_SIZE: " + objectPoolSize); RecognitionObjectFactory recognitionObjectFactory = new RecognitionObjectFactory(); GenericObjectPoolConfig<Recognition> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(objectPoolSize); config.setMaxIdle(objectPoolSize); config.setMinIdle(objectPoolSize); recognitionGenericObjectPool = new GenericObjectPool<>(recognitionObjectFactory, config); } lock.unlock(); return recognitionGenericObjectPool; } }從對象池中擷取Recognition對象
如果當前未歸還的對象數量已超過對象池的最大容量,系統會額外建立一個新的
Recognition對象。此類新建立的對象需要重新進行初始化並建立 WebSocket 串連,無法利用對象池的既有串連資源,因此不具備複用效果。
recognizer = RecognitionObjectPool.getInstance().borrowObject();進行語音辨識
調用
Recognition對象的call或streamCall方法進行語音辨識。歸還
Recognition對象語音辨識任務結束後,歸還Recognition對象,以便後續任務可以複用該對象。
不要歸還未完成任務或任務失敗的對象。
RecognitionObjectPool.getInstance().returnObject(recognizer);
完整代碼
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* Before making high-concurrency calls to the ASR service,
* please configure the connection pool size through following environment
* variables.
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
* DASHSCOPE_CONNECTION_POOL_SIZE=2000
*
* The default is 32, and it is recommended to set it to 2 times the maximum
* concurrent connections of a single server.
*/
public class Main {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws NoApiKeyException, InterruptedException {
// Check for connection pool env
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV, RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);
int threadNums = 3;
String currentDir = System.getProperty("user.dir");
// Please replace the path with your audio source
Path[] filePaths = {
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
};
// Use ThreadPool to run recognition tasks
ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
for (int i = 0; i < threadNums; i++) {
executorService.submit(new RealtimeRecognizeTask(filePaths));
}
executorService.shutdown();
// wait for all tasks to complete
executorService.awaitTermination(10, TimeUnit.MINUTES);
System.exit(0);
}
}
class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
public RecognitionObjectFactory() {
super();
}
@Override
public Recognition create() throws Exception {
return new Recognition();
}
@Override
public PooledObject<Recognition> wrap(Recognition obj) {
return new DefaultPooledObject<>(obj);
}
}
class RecognitionObjectPool {
public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
"RECOGNITION_OBJECTPOOL_SIZE";
public static int DEFAULT_OBJECT_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
try {
Integer n = Integer.parseInt(System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV));
return n;
} catch (NumberFormatException e) {
return DEFAULT_OBJECT_POOL_SIZE;
}
}
public static GenericObjectPool<Recognition> getInstance() {
lock.lock();
if (recognitionGenericObjectPool == null) {
// You can set the object pool size here. or in environment variable
// RECOGNITION_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2
// times your server's maximum concurrent connections.
int objectPoolSize = getObjectivePoolSize();
System.out.println("RECOGNITION_OBJECTPOOL_SIZE: "
+ objectPoolSize);
RecognitionObjectFactory recognitionObjectFactory =
new RecognitionObjectFactory();
GenericObjectPoolConfig<Recognition> config =
new GenericObjectPoolConfig<>();
config.setMaxTotal(objectPoolSize);
config.setMaxIdle(objectPoolSize);
config.setMinIdle(objectPoolSize);
recognitionGenericObjectPool =
new GenericObjectPool<>(recognitionObjectFactory, config);
}
lock.unlock();
return recognitionGenericObjectPool;
}
}
class RealtimeRecognizeTask implements Runnable {
private static final Object lock = new Object();
private Path[] filePaths;
public RealtimeRecognizeTask(Path[] filePaths) {
this.filePaths = filePaths;
}
/**
* Set your DashScope API key. In
* fact, if you have set DASHSCOPE_API_KEY in your environment variable, you
* can ignore this, and the SDK will automatically get the api_key from the
* environment variable
* */
private static String getDashScopeApiKey() throws NoApiKeyException {
String dashScopeApiKey = null;
try {
ApiKey apiKey = new ApiKey();
dashScopeApiKey =
ApiKey.getApiKey(null); // Retrieve from environment variable.
} catch (NoApiKeyException e) {
System.out.println("No API key found in environment.");
}
if (dashScopeApiKey == null) {
// If you cannot set api_key in your environment variable,
// you can set it here by code
dashScopeApiKey = "your-dashscope-apikey";
}
return dashScopeApiKey;
}
public void runCallback() {
for (Path filePath : filePaths) {
// Create recognition params
// you can customize the recognition parameters, like model, format,
// sample_rate
RecognitionParam param = null;
try {
param =
RecognitionParam.builder()
.model("paraformer-realtime-v2")
.format(
"pcm") // 'pcm'、'wav'、'opus'、'speex'、'aac'、'amr', you
// can check the supported formats in the document
.sampleRate(16000) // supported 8000、16000
.apiKey(getDashScopeApiKey()) // use getDashScopeApiKey to get
// api key.
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
Recognition recognizer = null;
// if recv onError
final boolean[] hasError = {false};
try {
recognizer = RecognitionObjectPool.getInstance().borrowObject();
String threadName = Thread.currentThread().getName();
ResultCallback<RecognitionResult> callback =
new ResultCallback<RecognitionResult>() {
@Override
public void onEvent(RecognitionResult message) {
synchronized (lock) {
if (message.isSentenceEnd()) {
System.out.println("[process " + threadName
+ "] Fix:" + message.getSentence().getText());
} else {
System.out.println("[process " + threadName
+ "] Result: " + message.getSentence().getText());
}
}
}
@Override
public void onComplete() {
System.out.println("[" + threadName + "] Recognition complete");
}
@Override
public void onError(Exception e) {
System.out.println("[" + threadName
+ "] RecognitionCallback error: " + e.getMessage());
hasError[0] = true;
}
};
// Please replace the path with your audio file path
System.out.println(
"[" + threadName + "] Input file_path is: " + filePath);
FileInputStream fis = null;
// Read file and send audio by chunks
try {
fis = new FileInputStream(filePath.toFile());
} catch (Exception e) {
System.out.println("Error when loading file: " + filePath);
e.printStackTrace();
}
// set param & callback
recognizer.call(param, callback);
// chunk size set to 100 ms for 16KHz sample rate
byte[] buffer = new byte[3200];
int bytesRead;
// Loop to read chunks of the file
while ((bytesRead = fis.read(buffer)) != -1) {
ByteBuffer byteBuffer;
if (bytesRead < buffer.length) {
byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
} else {
byteBuffer = ByteBuffer.wrap(buffer);
}
// Send the ByteBuffer to the recognition instance
recognizer.sendAudioFrame(byteBuffer);
Thread.sleep(100);
buffer = new byte[3200];
}
System.out.println(
"[" + threadName + "] send audio done");
recognizer.stop();
System.out.println(
"[" + threadName + "] asr task finished");
} catch (Exception e) {
e.printStackTrace();
hasError[0] = true;
}
if (recognizer != null) {
try {
if (hasError[0] == true) {
// Invalid the recognition object error.
recognizer.getDuplexApi().close(1000, "bye");
RecognitionObjectPool.getInstance().invalidateObject(recognizer);
} else {
// Return the recognition object to the pool if no error or exception.
RecognitionObjectPool.getInstance().returnObject(recognizer);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
runCallback();
}
}推薦配置
以下配置基於在指定規格的阿里雲伺服器上僅運行 Paraformer 即時語音辨識服務的測試結果。過高的並發數可能導致任務處理延遲。
其中單機並發數指的是同一時刻正在啟動並執行Paraformer即時語音辨識任務數,也可以理解為背景工作執行緒數。
機器配置(阿里雲) | 單機最大並發數 | 對象池大小 | 串連池大小 |
4核8GiB | 100 | 500 | 2000 |
8核16GiB | 200 | 500 | 2000 |
16核32GiB | 400 | 500 | 2000 |
資源管理與異常處理
任務成功:當語音辨識任務正常完成時,必須調用GenericObjectPool的returnObject方法將Recognition對象歸還到池中,以便複用。
在當前代碼中,對應
RecognitionObjectPool.getInstance().returnObject(recognizer)重要不要歸還未完成任務或任務失敗的Recognition對象。
任務失敗:當 SDK 內部或商務邏輯拋出異常導致任務中斷時,必須執行以下兩個操作:
主動關閉底層的 WebSocket 串連
從對象池中廢棄該對象,防止被再次使用
// 在當前代碼中對應如下內容 // 關閉串連 recognizer.getDuplexApi().close(1000, "bye"); // 在對象池中廢棄出現異常的recognizer RecognitionObjectPool.getInstance().invalidateObject(recognizer);在服務出現TaskFailed報錯時,不需要額外處理。
調用預熱與耗時統計說明
在對 DashScope Java SDK 進行並發調用延遲等效能評估時,建議在正式測試前執行充分的預熱操作。預熱能夠確保測量結果準確反映服務在穩定點下的真實效能,避免因初始串連耗時導致的資料偏差。
串連複用機制
DashScope Java SDK 通過全域單例的串連池高效管理和複用 WebSocket 串連,旨在減少頻繁建連和斷連的開銷,提升高並發情境下的處理能力。
該機制的工作特點如下:
按需建立:SDK 不會在服務啟動時預建立 WebSocket 串連,而是在首次調用時按需建立。
限時複用:請求完成後,串連將在池中保留最多 60 秒以備複用。
若 60 秒內有新請求,將複用現有串連,避免重複握手開銷。
若串連空閑超過 60 秒,將被自動關閉以釋放資源。
預熱的重要性
在以下情境中,串連池中可能沒有可複用的活躍串連,導致請求需要建立串連:
應用剛啟動,尚未發起任何調用。
服務空閑時間超過 60 秒,池中串連已因逾時而關閉。
在這些情境下,首次或初期請求會觸發完整的 WebSocket 建連過程(包括 TCP 握手、TLS 加密協商和協議升級),其端到端延遲會顯著高於後續複用串連的請求。這部分額外耗時源於網路連接初始化,並非服務本身的處理延遲。因此,若未進行預熱,效能測試結果會因包含初始建連時間而產生偏差。
推薦做法
為擷取可靠的效能資料,在正式進行效能壓測或延遲統計前,請遵循以下預熱步驟:
類比正式測試的並發層級,提前發起一定數量的調用(例如,持續 1-2 分鐘),以充分填充串連池。
確認串連池已建立並維持足夠的活躍串連後,再開始正式的效能資料採集。
通過合理的預熱,可使 SDK 串連池進入穩定複用狀態,從而測量出更具代表性的延遲指標,真實反映服務線上上平穩運行時的效能。