This topic describes how to efficiently call the Paraformer real-time speech recognition service using the DashScope Java SDK in high-concurrency scenarios.
This document applies only to the China (Beijing) region. To use the model, you must use an API key from the China (Beijing) region.
User Guide: For model descriptions and selection recommendations, see Real-time Speech Recognition.
The Paraformer real-time speech recognition service is implemented using the WebSocket protocol. In high-concurrency scenarios, repeatedly creating WebSocket connections can consume significant resources.
When you use the DashScope Java SDK, you can set appropriate sizes for the connection pool and object pool based on your server's specifications to reduce this overhead.
Prerequisites
You have activated Alibaba Cloud Model Studio and obtained an API key. Configure the API key as an environment variable instead of hard coding it in your code to prevent security risks from code leaks.
Recommended configurations
The sizes of the connection and object pools affect performance. If these pools are too small or too large, performance can be degraded. We recommend that you configure the pool sizes based on your server's specifications.
The following table lists the recommended configurations based on tests that were conducted on a server running only the Paraformer real-time speech recognition service.
| Common server configurations (Alibaba Cloud) | Maximum concurrency of a single server | Object pool size | Connection pool size |
| --- | --- | --- | --- |
| 4-core 8 GiB | 100 | 500 | 2000 |
| 8-core 16 GiB | 200 | 500 | 2000 |
| 16-core 32 GiB | 400 | 500 | 2000 |
The maximum concurrency of a single server is the number of Paraformer real-time speech recognition tasks that are running simultaneously. This is equivalent to the number of worker threads.
Configurable parameters
Connection pool
The DashScope Java SDK uses the connection pool provided by OkHttp3 to reuse WebSocket connections. This reduces the time and resource overhead from repeatedly creating WebSocket connections.
The connection pool is an optimization feature that is enabled by default in the SDK. You can configure the size of the connection pool based on your scenario.
Before you run the Java service, you can configure the connection pool parameters as environment variables on the server. The following table describes these parameters.
| Environment variable | Description |
| --- | --- |
| DASHSCOPE_CONNECTION_POOL_SIZE | Configures the connection pool size. We recommend that you set this parameter to a value that is at least twice the peak concurrency. The default value is 32. |
| DASHSCOPE_MAXIMUM_ASYNC_REQUESTS | Configures the maximum number of asynchronous requests. We recommend that you set this parameter to the same value as the connection pool size. The default value is 32. For more information, see the OkHttp documentation. |
| DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST | Configures the maximum number of asynchronous requests for a single host. We recommend that you set this parameter to the same value as the connection pool size. The default value is 32. For more information, see the OkHttp documentation. |
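The relationship between these variables and your planned concurrency can be verified before the service starts. The following is a minimal sketch, not part of the DashScope SDK: `PoolConfigCheck`, `parseEnv`, and `peakConcurrency` are illustrative names, and the 2x rule comes from the recommendation above.

```java
// Startup sanity check (sketch): warns when a configured pool-related
// environment variable is smaller than twice the planned peak concurrency.
// PoolConfigCheck and peakConcurrency are illustrative, not SDK names.
public class PoolConfigCheck {
    public static int parseEnv(String name, int defaultValue) {
        String value = System.getenv(name);
        try {
            return Integer.parseInt(value); // parseInt(null) also falls through
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        int peakConcurrency = 100; // planned peak for this server; adjust as needed
        String[] vars = {
            "DASHSCOPE_CONNECTION_POOL_SIZE",
            "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS",
            "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST"
        };
        for (String var : vars) {
            int size = parseEnv(var, 32); // the SDK default is 32
            if (size < 2 * peakConcurrency) {
                System.out.println("Warning: " + var + "=" + size
                    + " is below 2x peak concurrency (" + peakConcurrency + ")");
            }
        }
    }
}
```

Running a check such as this at startup makes misconfiguration visible immediately instead of surfacing later as blocked calls under load.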
Object pool
We recommend that you use an object pool to reuse Recognition objects. This further reduces the memory and time overhead from repeatedly creating and destroying objects.
Before you run the Java service, you can configure the object pool size as an environment variable or in your code. The following table describes the object pool configuration parameter.
| Parameter | Description |
| --- | --- |
| RECOGNITION_OBJECTPOOL_SIZE | The size of the object pool. The default value is 500. We recommend that you set this parameter to a value that is 1.5 to 2 times the peak concurrency. The object pool size must be less than or equal to the connection pool size. Otherwise, objects may wait for connections, which can cause calls to block. |
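The reuse pattern that the object pool provides can be illustrated with a JDK-only sketch. `SimplePoolDemo` and the nested `Recognizer` class here are stand-ins, not DashScope SDK classes; the sample code in this topic uses Apache Commons Pool 2 instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal illustration of object reuse: create expensive objects once,
// borrow one per task, and return it instead of recreating it each time.
public class SimplePoolDemo {
    static final class Recognizer {} // stands in for a pooled Recognition object

    // Borrow one object, "use" it, return it; reports how many remain available.
    public static int borrowUseReturn(int poolSize) {
        BlockingQueue<Recognizer> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.offer(new Recognizer()); // pre-fill the pool
        }
        Recognizer r = pool.poll(); // borrow; returns null when exhausted
        // ... run a recognition task with r ...
        pool.offer(r); // return for reuse; on error, drop the object instead
        return pool.size();
    }

    public static void main(String[] args) {
        System.out.println("available after return: " + borrowUseReturn(4)); // prints 4
    }
}
```

Because the pool size is fixed, a task that cannot borrow an object waits instead of creating a new one, which is why the object pool should not exceed the connection pool size.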
For more information, see Set API key as environment variable.
Sample code
The following code shows how to use the connection pool and object pool together. The object pool is implemented as a global singleton.
By default, each Alibaba Cloud account can submit 20 Paraformer real-time speech recognition tasks per second.
To request a higher queries per second (QPS) limit, contact us.
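When many worker threads submit tasks, it can help to pace task starts so the account-level quota is not exceeded. The following is a JDK-only sketch; `StartPacer` is not part of the DashScope SDK, and the value 20 is the default quota mentioned above.

```java
import java.util.concurrent.TimeUnit;

// Simple pacer (sketch): spaces out task submissions so that no more than
// maxQps recognition tasks start per second across all worker threads.
public class StartPacer {
    private final long minGapNanos;
    private long nextAllowedNanos = System.nanoTime();

    public StartPacer(int maxQps) {
        this.minGapNanos = TimeUnit.SECONDS.toNanos(1) / maxQps;
    }

    public long minGapNanos() { return minGapNanos; }

    // Blocks just long enough that successive calls are >= 1/maxQps apart.
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (now < nextAllowedNanos) {
            TimeUnit.NANOSECONDS.sleep(nextAllowedNanos - now);
        }
        nextAllowedNanos = Math.max(now, nextAllowedNanos) + minGapNanos;
    }

    public static void main(String[] args) throws InterruptedException {
        StartPacer pacer = new StartPacer(20); // default quota: 20 tasks per second
        long start = System.nanoTime();
        for (int i = 0; i < 3; i++) {
            pacer.acquire(); // call this before submitting each recognition task
        }
        System.out.println("3 submissions took "
            + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " ms");
    }
}
```

Calling `acquire()` in each worker thread before starting a recognition task keeps submissions under the quota; if you need a sustained rate above the default, request a higher limit instead of removing the pacing.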
Before you run the sample code, download the asr_example.wav sample audio file, or replace it with a local audio file.
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* Before making high-concurrency calls to the ASR service,
* please configure the connection pool size through the following environment
* variables.
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
* DASHSCOPE_CONNECTION_POOL_SIZE=2000
*
* The default is 32, and it is recommended to set it to 2 times the maximum
* concurrent connections on a single server.
*/
public class Main {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws NoApiKeyException, InterruptedException {
// Check for connection pool env
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV, RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);
int threadNums = 3;
String currentDir = System.getProperty("user.dir");
// Please replace the path with your audio source
Path[] filePaths = {
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
};
// Use ThreadPool to run recognition tasks
ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
for (int i = 0; i < threadNums; i++) {
executorService.submit(new RealtimeRecognizeTask(filePaths));
}
executorService.shutdown();
// wait for all tasks to complete
executorService.awaitTermination(10, TimeUnit.MINUTES);
System.exit(0);
}
}
class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
public RecognitionObjectFactory() {
super();
}
@Override
public Recognition create() throws Exception {
return new Recognition();
}
@Override
public PooledObject<Recognition> wrap(Recognition obj) {
return new DefaultPooledObject<>(obj);
}
}
class RecognitionObjectPool {
public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
"RECOGNITION_OBJECTPOOL_SIZE";
public static int DEFAULT_OBJECT_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
    String envValue = System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV);
    try {
        return Integer.parseInt(envValue);
    } catch (NumberFormatException e) {
        // The environment variable is unset or not a number; use the default.
        return DEFAULT_OBJECT_POOL_SIZE;
    }
}
public static GenericObjectPool<Recognition> getInstance() {
    lock.lock();
    try {
        if (recognitionGenericObjectPool == null) {
            // You can set the object pool size here or in the
            // RECOGNITION_OBJECTPOOL_SIZE environment variable. It is
            // recommended to set it to 1.5 to 2 times your server's
            // maximum concurrent connections.
            int objectPoolSize = getObjectivePoolSize();
            System.out.println("RECOGNITION_OBJECTPOOL_SIZE: " + objectPoolSize);
            RecognitionObjectFactory recognitionObjectFactory =
                new RecognitionObjectFactory();
            GenericObjectPoolConfig<Recognition> config =
                new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            recognitionGenericObjectPool =
                new GenericObjectPool<>(recognitionObjectFactory, config);
        }
    } finally {
        // Release the lock even if pool creation throws.
        lock.unlock();
    }
    return recognitionGenericObjectPool;
}
}
class RealtimeRecognizeTask implements Runnable {
private static final Object lock = new Object();
private Path[] filePaths;
public RealtimeRecognizeTask(Path[] filePaths) {
this.filePaths = filePaths;
}
/**
* Set your DashScope API key. If you have set the DASHSCOPE_API_KEY
* environment variable, you can ignore this step. The SDK automatically gets the
* API key from the environment variable.
* */
private static String getDashScopeApiKey() throws NoApiKeyException {
    String dashScopeApiKey = null;
    try {
        dashScopeApiKey =
            ApiKey.getApiKey(null); // Retrieve from environment variable.
    } catch (NoApiKeyException e) {
        System.out.println("No API key found in environment.");
    }
if (dashScopeApiKey == null) {
// If you cannot set api_key in your environment variable,
// you can set it here by code.
dashScopeApiKey = "your-dashscope-apikey";
}
return dashScopeApiKey;
}
public void runCallback() {
for (Path filePath : filePaths) {
// Create recognition params.
// You can customize recognition parameters, such as model, format,
// and sample_rate.
RecognitionParam param = null;
try {
param =
RecognitionParam.builder()
.model("paraformer-realtime-v2")
.format(
"pcm") // 'pcm', 'wav', 'opus', 'speex', 'aac', or 'amr'.
// You can check the documentation for supported formats.
.sampleRate(16000) // Supported sample rates: 8000 and 16000.
.apiKey(getDashScopeApiKey()) // Use getDashScopeApiKey to get the
// API key.
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
Recognition recognizer = null;
// if recv onError
final boolean[] hasError = {false};
try {
recognizer = RecognitionObjectPool.getInstance().borrowObject();
String threadName = Thread.currentThread().getName();
ResultCallback<RecognitionResult> callback =
new ResultCallback<RecognitionResult>() {
@Override
public void onEvent(RecognitionResult message) {
synchronized (lock) {
if (message.isSentenceEnd()) {
System.out.println("[process " + threadName
+ "] Fix:" + message.getSentence().getText());
} else {
System.out.println("[process " + threadName
+ "] Result: " + message.getSentence().getText());
}
}
}
@Override
public void onComplete() {
System.out.println("[" + threadName + "] Recognition complete");
}
@Override
public void onError(Exception e) {
System.out.println("[" + threadName
+ "] RecognitionCallback error: " + e.getMessage());
hasError[0] = true;
}
};
// Please replace the path with your audio file path.
System.out.println(
"[" + threadName + "] Input file_path is: " + filePath);
FileInputStream fis = null;
// Read file and send audio by chunks.
try {
    fis = new FileInputStream(filePath.toFile());
} catch (Exception e) {
    System.out.println("Error when loading file: " + filePath);
    // Rethrow so the outer catch marks the task as failed and the
    // recognizer is invalidated instead of being used without audio input.
    throw new RuntimeException(e);
}
// set param and callback
recognizer.call(param, callback);
// Chunk size of 3200 bytes = 100 ms of 16 kHz, 16-bit mono PCM audio.
byte[] buffer = new byte[3200];
int bytesRead;
// Loop to read chunks of the file.
while ((bytesRead = fis.read(buffer)) != -1) {
ByteBuffer byteBuffer;
if (bytesRead < buffer.length) {
byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
} else {
byteBuffer = ByteBuffer.wrap(buffer);
}
// Send the ByteBuffer to the recognition instance.
recognizer.sendAudioFrame(byteBuffer);
Thread.sleep(100);
buffer = new byte[3200];
}
fis.close();
System.out.println(
    "[" + threadName + "] send audio done");
recognizer.stop();
System.out.println(
"[" + threadName + "] asr task finished");
} catch (Exception e) {
e.printStackTrace();
hasError[0] = true;
}
if (recognizer != null) {
try {
if (hasError[0]) {
// Invalidate the recognition object if an error occurs.
recognizer.getDuplexApi().close(1000, "bye");
RecognitionObjectPool.getInstance().invalidateObject(recognizer);
} else {
// Return the recognition object to the pool if no error or exception occurs.
RecognitionObjectPool.getInstance().returnObject(recognizer);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
runCallback();
}
}
Exception handling
If a TaskFailed error occurs or the SDK throws an exception, the recognizer object for the current task cannot be reused. In this case, you must close the WebSocket connection and invalidate the object in the object pool.
// Close the connection
recognizer.getDuplexApi().close(1000, "bye");
// Invalidate the object
RecognitionObjectPool.getInstance().invalidateObject(recognizer);
Note that the invalidateObject and returnObject methods are mutually exclusive:
The invalidateObject method: Use this method to invalidate an object when an error occurs.
The returnObject method: Use this method to return a valid object to the pool when no error occurs.