CosyVoice 語音合成服務基於 WebSocket 通訊協定,以支援流式即時通訊。然而,在高並發情境下,為每個請求獨立建立和銷毀 WebSocket 串連會產生巨大的網路與系統資源開銷,並引入顯著的串連延遲。為最佳化效能並確保穩定性,DashScope SDK 內建了高效的資源複用機制(如串連池與對象池)。本文檔將詳細介紹如何利用 DashScope Python 和 Java SDK 中的這些特性,在高並發情境下高效調用 CosyVoice 服務。
如需使用“中國內地(北京)”地區的模型,請前往“中國內地(北京)”地區的API-KEY頁面
前提條件
Python SDK:對象池最佳化
Python SDK 通過 SpeechSynthesizerObjectPool 實現對象池最佳化,用於管理和複用 SpeechSynthesizer 對象。
對象池在初始化時會立即建立指定數量的 SpeechSynthesizer 執行個體並預先建立 WebSocket 串連。從池中擷取對象時無需等待串連建立,可直接發起請求,有效降低首包延遲。當任務完成並將對象歸還到對象池後,其 WebSocket 串連不會關閉,而是保持活躍狀態等待下次任務複用。
實現步驟
安裝依賴:安裝DashScope依賴(
pip install -U dashscope)建立並設定物件池
對象池大小需要通過
SpeechSynthesizerObjectPool進行設定。推薦值:峰值並發數的 1.5 至 2 倍。對象池大小不應超過您賬戶的 QPS(每秒查詢率)限制。通過以下代碼建立全域單例固定大小對象池。對象池在初始化時會立即建立指定數量的
SpeechSynthesizer對象並建立 WebSocket 串連,因此會有一定耗時。from dashscope.audio.tts_v2 import SpeechSynthesizerObjectPool synthesizer_object_pool = SpeechSynthesizerObjectPool(max_size=20)從對象池中擷取
SpeechSynthesizer對象如果當前未歸還的對象數量已超過對象池的最大容量,系統會額外建立一個新的
SpeechSynthesizer對象。此類新建立的對象需要重新進行初始化並建立 WebSocket 串連,無法利用對象池的既有串連資源,因此不具備複用效果。
speech_synthesizer = connectionPool.borrow_synthesizer( model='cosyvoice-v3-flash', voice='longanyang', seed=12382, callback=synthesizer_callback )進行語音合成
調用
SpeechSynthesizer對象的call或streaming_call方法進行語音合成。歸還
SpeechSynthesizer對象語音合成任務結束後,歸還
SpeechSynthesizer對象,以便後續任務可以複用該對象。不要歸還未完成任務或任務失敗的對象。
connectionPool.return_synthesizer(speech_synthesizer)
完整代碼
# !/usr/bin/env python3
# Copyright (C) Alibaba Group. All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)
import os
import time
import threading
import dashscope
from dashscope.audio.tts_v2 import *
USE_CONNECTION_POOL = True
text_to_synthesize = [
'第一句、歡迎使用阿里巴巴語音合成服務。',
'第二句、歡迎使用阿里巴巴語音合成服務。',
'第三句、歡迎使用阿里巴巴語音合成服務。',
]
connectionPool = None
if USE_CONNECTION_POOL:
print('creating connection pool')
start_time = time.time() * 1000
connectionPool = SpeechSynthesizerObjectPool(max_size=3)
end_time = time.time() * 1000
print('connection pool created, cost: {} ms'.format(end_time - start_time))
def init_dashscope_api_key():
'''
Set your DashScope API-key. More information:
https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
'''
if 'DASHSCOPE_API_KEY' in os.environ:
dashscope.api_key = os.environ[
'DASHSCOPE_API_KEY'] # load API-key from environment variable DASHSCOPE_API_KEY
else:
dashscope.api_key = '<your-dashscope-api-key>' # set API-key manually
def synthesis_text_to_speech_and_play_by_streaming_mode(text, task_id):
global USE_CONNECTION_POOL, connectionPool
'''
Synthesize speech with given text by streaming mode, async call and play the synthesized audio in real-time.
for more information, please refer to https://www.alibabacloud.com/help/document_detail/2712523.html
'''
complete_event = threading.Event()
# Define a callback to handle the result
class Callback(ResultCallback):
def on_open(self):
# when using object pool, on_open will be called after task start
self.file = open(f'result_{task_id}.mp3', 'wb')
print(f'[task_{task_id}] start')
def on_complete(self):
print(f'[task_{task_id}] speech synthesis task complete successfully.')
complete_event.set()
def on_error(self, message: str):
print(f'[task_{task_id}] speech synthesis task failed, {message}')
def on_close(self):
# when using object pool, on_open will be called after task finished
print(f'[task_{task_id}] finished')
def on_event(self, message):
# print(f'recv speech synthsis message {message}')
pass
def on_data(self, data: bytes) -> None:
# send to player
# save audio to file
self.file.write(data)
# Call the speech synthesizer callback
synthesizer_callback = Callback()
# Initialize the speech synthesizer
# you can customize the synthesis parameters, like voice, format, sample_rate or other parameters
if USE_CONNECTION_POOL:
speech_synthesizer = connectionPool.borrow_synthesizer(
model='cosyvoice-v3-flash',
voice='longanyang',
seed=12382,
callback=synthesizer_callback
)
else:
speech_synthesizer = SpeechSynthesizer(model='cosyvoice-v3-flash',
voice='longanyang',
seed=12382,
callback=synthesizer_callback)
try:
speech_synthesizer.call(text)
except Exception as e:
print(f'[task_{task_id}] speech synthesis task failed, {e}')
if USE_CONNECTION_POOL:
# close the synthesizer connection manually if task failed when using connection pool.
speech_synthesizer.close()
return
print('[task_{}] Synthesized text: {}'.format(task_id, text))
complete_event.wait()
print('[task_{}][Metric] requestId: {}, first package delay ms: {}'.format(
task_id,
speech_synthesizer.get_last_request_id(),
speech_synthesizer.get_first_package_delay()))
if USE_CONNECTION_POOL:
connectionPool.return_synthesizer(speech_synthesizer)
# main function
if __name__ == '__main__':
init_dashscope_api_key()
task_thread_list = []
for task_id in range(3):
thread = threading.Thread(
target=synthesis_text_to_speech_and_play_by_streaming_mode,
args=(text_to_synthesize[task_id], task_id))
task_thread_list.append(thread)
for task_thread in task_thread_list:
task_thread.start()
for task_thread in task_thread_list:
task_thread.join()
if USE_CONNECTION_POOL:
connectionPool.shutdown()資源管理與異常處理
任務成功:當語音合成任務正常完成時,必須調用
connectionPool.return_synthesizer(speech_synthesizer)將SpeechSynthesizer對象歸還到池中,以便複用。重要不要歸還未完成任務或任務失敗的
SpeechSynthesizer對象。任務失敗:當 SDK 內部或商務邏輯拋出異常導致任務中斷時,主動關閉底層的 WebSocket 串連:
speech_synthesizer.close()在所有語音合成任務完成後,要通過如下方式關閉對象池:
connectionPool.shutdown()。在服務出現TaskFailed報錯時,不需要額外處理。
Java SDK:串連池與對象池最佳化
Java SDK通過內建的串連池和自訂的對象池協同工作,實現最佳效能。
串連池:SDK 內部整合的 OkHttp3 串連池,負責管理和複用底層的 WebSocket 串連,減少網路握手開銷。此功能預設開啟。
對象池:基於
commons-pool2實現,用於維護一組已預先建立好串連的SpeechSynthesizer對象。從池中擷取對象可消除串連建立的延遲,顯著降低首包延遲。
實現步驟
添加依賴
根據專案構建工具,在依賴設定檔中添加 dashscope-sdk-java 和 commons-pool2。
以Maven和Gradle為例,配置如下:
Maven
開啟您的Maven專案的
pom.xml檔案。在
<dependencies>標籤內添加以下依賴資訊。
<dependency> <groupId>com.alibaba</groupId> <artifactId>dashscope-sdk-java</artifactId> <!-- 請將 'the-latest-version' 替換為2.16.9及以上版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java --> <version>the-latest-version</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <!-- 請將 'the-latest-version' 替換為最新版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --> <version>the-latest-version</version> </dependency>儲存
pom.xml檔案。使用Maven命令(如
mvn clean install或mvn compile)來更新專案依賴
Gradle
開啟您的Gradle專案的
build.gradle檔案。在
dependencies塊內添加以下依賴資訊。dependencies { // 請將 'the-latest-version' 替換為2.16.6及以上版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version' // 請將 'the-latest-version' 替換為最新版本,可在如下連結查詢相關版本號碼:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version' }儲存
build.gradle檔案。在命令列中,切換到您的專案根目錄,執行以下Gradle命令來更新專案依賴。
./gradlew build --refresh-dependencies或者,如果您使用的是Windows系統,命令應為:
gradlew build --refresh-dependencies
配置串連池
通過環境變數配置串連池關鍵參數:
環境變數
描述
DASHSCOPE_CONNECTION_POOL_SIZE
串連池大小。
推薦值:峰值並發數的 2 倍以上。
預設值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
最大非同步請求數。
推薦值:與
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。預設值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
單主機最大非同步請求數。
推薦值:與
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。預設值:32。
設定物件池
通過環境變數設定物件池大小:
環境變數
描述
COSYVOICE_OBJECTPOOL_SIZE
對象池大小。
推薦值:峰值並發數的 1.5 至 2 倍。
預設值:500。
重要對象池的大小(
COSYVOICE_OBJECTPOOL_SIZE)必須小於或等於串連池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否則,當對象池請求對象時,若串連池已滿,會導致調用線程阻塞,等待可用串連。對象池大小不應超過您賬戶的 QPS(每秒查詢率)限制。
通過如下代碼建立對象池:
class CosyvoiceObjectPool { // 。。。這裡省略其它代碼,完整樣本請參見完整代碼 public static GenericObjectPool<SpeechSynthesizer> getInstance() { lock.lock(); if (synthesizerPool == null) { // 您可以在這裡設定對象池的大小。或在環境變數COSYVOICE_OBJECTPOOL_SIZE中設定。 // 建議設定為伺服器最大並發串連數的1.5到2倍。 int objectPoolSize = getObjectivePoolSize(); SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory = new SpeechSynthesizerObjectFactory(); GenericObjectPoolConfig<SpeechSynthesizer> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(objectPoolSize); config.setMaxIdle(objectPoolSize); config.setMinIdle(objectPoolSize); synthesizerPool = new GenericObjectPool<>(speechSynthesizerObjectFactory, config); } lock.unlock(); return synthesizerPool; } }從對象池中擷取
SpeechSynthesizer對象如果當前未歸還的對象數量已超過對象池的最大容量,系統會額外建立一個新的
SpeechSynthesizer對象。此類新建立的對象需要重新進行初始化並建立 WebSocket 串連,無法利用對象池的既有串連資源,因此不具備複用效果。
synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();進行語音合成
調用
SpeechSynthesizer對象的call或streamingCall方法進行語音合成。歸還
SpeechSynthesizer對象語音合成任務結束後,歸還
SpeechSynthesizer對象,以便後續任務可以複用該對象。不要歸還未完成任務或任務失敗的對象。
CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
完整代碼
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* 您需要在專案中引入org.apache.commons.pool2和DashScope相關的包。
*
* DashScope SDK 2.16.6及後續版本針對高並發情境進行了最佳化,
* DashScope SDK 2.16.6之前的版本不推薦在高並發情境下使用。
*
*
* 在對TTS服務進行高並發調用之前,
* 請通過以下環境變數配置串連池的相關參數。
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
* DASHSCOPE_CONNECTION_POOL_SIZE
*
*/
class SpeechSynthesizerObjectFactory
extends BasePooledObjectFactory<SpeechSynthesizer> {
public SpeechSynthesizerObjectFactory() {
super();
}
@Override
public SpeechSynthesizer create() throws Exception {
return new SpeechSynthesizer();
}
@Override
public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
return new DefaultPooledObject<>(obj);
}
}
class CosyvoiceObjectPool {
public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
public static int DEFAULT_OBJECT_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
try {
Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
System.out.println("Using Object Pool Size In Env: "+ n);
return n;
} catch (NumberFormatException e) {
System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
return DEFAULT_OBJECT_POOL_SIZE;
}
}
public static GenericObjectPool<SpeechSynthesizer> getInstance() {
lock.lock();
if (synthesizerPool == null) {
// 您可以在這裡設定對象池的大小。或在環境變數COSYVOICE_OBJECTPOOL_SIZE中設定。
// 建議設定為伺服器最大並發串連數的1.5到2倍。
int objectPoolSize = getObjectivePoolSize();
SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
new SpeechSynthesizerObjectFactory();
GenericObjectPoolConfig<SpeechSynthesizer> config =
new GenericObjectPoolConfig<>();
config.setMaxTotal(objectPoolSize);
config.setMaxIdle(objectPoolSize);
config.setMinIdle(objectPoolSize);
synthesizerPool =
new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
}
lock.unlock();
return synthesizerPool;
}
}
class SynthesizeTaskWithCallback implements Runnable {
String[] textArray;
String requestId;
long timeCost;
public SynthesizeTaskWithCallback(String[] textArray) {
this.textArray = textArray;
}
@Override
public void run() {
SpeechSynthesizer synthesizer = null;
long startTime = System.currentTimeMillis();
// if recv onError
final boolean[] hasError = {false};
try {
class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
ReactCallback() {}
@Override
public void onEvent(SpeechSynthesisResult message) {
if (message.getAudioFrame() != null) {
try {
byte[] bytesArray = message.getAudioFrame().array();
System.out.println("收到音頻,音頻檔案流length為:" + bytesArray.length);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public void onComplete() {}
@Override
public void onError(Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
hasError[0] = true;
}
}
// 將your-dashscope-api-key替換成您自己的API-KEY
String dashScopeApiKey = "your-dashscope-api-key";
SpeechSynthesisParam param =
SpeechSynthesisParam.builder()
.model("cosyvoice-v3-flash")
.voice("longanyang")
.format(SpeechSynthesisAudioFormat
.MP3_22050HZ_MONO_256KBPS) // 流式合成使用PCM或者MP3
.apiKey(dashScopeApiKey)
.build();
try {
synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
synthesizer.updateParamAndCallback(param, new ReactCallback());
for (String text : textArray) {
synthesizer.streamingCall(text);
}
Thread.sleep(20);
synthesizer.streamingComplete(60000);
requestId = synthesizer.getLastRequestId();
} catch (Exception e) {
System.out.println("Exception e: " + e.toString());
hasError[0] = true;
}
} catch (Exception e) {
hasError[0] = true;
throw new RuntimeException(e);
}
if (synthesizer != null) {
try {
if (hasError[0] == true) {
// 如果出現異常,則關閉串連並在對象池中禁用該對象。
synthesizer.getDuplexApi().close(1000, "bye");
CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
} else {
// 如果任務正常結束,則歸還對象。
CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
timeCost = endTime - startTime;
System.out.println("[線程 " + Thread.currentThread() + "] 語音合成任務結束。耗時 " + timeCost + " ms, RequestId " + requestId);
}
}
}
@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws InterruptedException, NoApiKeyException {
// Check for connection pool env
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);
int runTimes = 3;
// Create the pool of SpeechSynthesis objects
ExecutorService executorService = Executors.newFixedThreadPool(runTimes);
for (int i = 0; i < runTimes; i++) {
// Record the task submission time
LocalDateTime submissionTime = LocalDateTime.now();
executorService.submit(new SynthesizeTaskWithCallback(new String[] {
"床前明月光,", "疑似地上霜。", "舉頭望明月,", "低頭思故鄉。"}));
}
// Shut down the ExecutorService and wait for all tasks to complete
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
System.exit(0);
}
}推薦配置
以下配置基於在指定規格的阿里雲伺服器上僅運行 CosyVoice 語音合成服務的測試結果。過高的並發數可能導致任務處理延遲。
其中單機並發數指的是同一時刻正在啟動並執行CosyVoice語音合成任務數,也可以理解為背景工作執行緒數。
機器配置(阿里雲) | 單機最大並發數 | 對象池大小 | 串連池大小 |
4核8GiB | 100 | 500 | 2000 |
8核16GiB | 150 | 500 | 2000 |
16核32GiB | 200 | 500 | 2000 |
資源管理與異常處理
任務成功:當語音合成任務正常完成時,必須調用GenericObjectPool的returnObject方法將
SpeechSynthesizer對象歸還到池中,以便複用。在當前代碼中,對應
CosyvoiceObjectPool.getInstance().returnObject(synthesizer)重要不要歸還未完成任務或任務失敗的
SpeechSynthesizer對象。任務失敗:當 SDK 內部或商務邏輯拋出異常導致任務中斷時,必須執行以下兩個操作:
主動關閉底層的 WebSocket 串連
從對象池中廢棄該對象,防止被再次使用
// 在當前代碼中對應如下內容 // 關閉串連 synthesizer.getDuplexApi().close(1000, "bye"); // 在對象池中廢棄出現異常的synthesizer CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);在服務出現TaskFailed報錯時,不需要額外處理。
調用預熱與耗時統計說明
在對 DashScope Java SDK 進行並發調用延遲等效能評估時,建議在正式測試前執行充分的預熱操作。預熱能夠確保測量結果準確反映服務在穩定點下的真實效能,避免因初始串連耗時導致的資料偏差。
串連複用機制
DashScope Java SDK 通過全域單例的串連池高效管理和複用 WebSocket 串連,旨在減少頻繁建連和斷連的開銷,提升高並發情境下的處理能力。
該機制的工作特點如下:
按需建立:SDK 不會在服務啟動時預建立 WebSocket 串連,而是在首次調用時按需建立。
限時複用:請求完成後,串連將在池中保留最多 60 秒以備複用。
若 60 秒內有新請求,將複用現有串連,避免重複握手開銷。
若串連空閑超過 60 秒,將被自動關閉以釋放資源。
預熱的重要性
在以下情境中,串連池中可能沒有可複用的活躍串連,導致請求需要建立串連:
應用剛啟動,尚未發起任何調用。
服務空閑時間超過 60 秒,池中串連已因逾時而關閉。
在這些情境下,首次或初期請求會觸發完整的 WebSocket 建連過程(包括 TCP 握手、TLS 加密協商和協議升級),其端到端延遲會顯著高於後續複用串連的請求。這部分額外耗時源於網路連接初始化,並非服務本身的處理延遲。因此,若未進行預熱,效能測試結果會因包含初始建連時間而產生偏差。
推薦做法
為擷取可靠的效能資料,在正式進行效能壓測或延遲統計前,請遵循以下預熱步驟:
類比正式測試的並發層級,提前發起一定數量的調用(例如,持續 1-2 分鐘),以充分填充串連池。
確認串連池已建立並維持足夠的活躍串連後,再開始正式的效能資料採集。
通過合理的預熱,可使 SDK 串連池進入穩定複用狀態,從而測量出更具代表性的延遲指標,真實反映服務線上上平穩運行時的效能。