全部产品
Search
文档中心

大模型服务平台百炼:实时语音合成

更新时间:May 12, 2026

实时语音合成通过 WebSocket 协议将文本实时转换为自然语音。百炼提供 CosyVoice、Qwen-TTS 系列模型,支持流式输入与输出,具备声音复刻、声音设计及精细化音频控制能力,适用于语音助手、有声读物、智能客服等场景。

概述

通过 WebSocket 协议实现低延迟实时语音合成,适用于语音助手、智能客服、直播字幕等需要即时响应的场景。

  • 支持流式输入与输出(双向 WebSocket),首包延迟低,适合语音助手、智能客服等实时对话场景

  • 可调节语速、语调、音量与码率,实现精细的语音效果控制

  • 兼容主流音频格式(PCM、WAV、MP3、Opus),最高支持 48kHz 采样率输出

  • 支持指令控制,可通过自然语言指令控制语音表现力

  • 支持声音复刻声音设计音色定制

如果您不需要实时输出,可以使用语音合成-千问(HTTP API),适合有声读物、课件配音等批量场景。如需了解各模型的选型建议,请参见语音合成

前提条件

快速开始

以下是各模型系列的语音合成示例代码。更多语言的示例代码和详细参数说明,请参见各模型的API参考

CosyVoice

重要

cosyvoice-v3.5-plus 和 cosyvoice-v3.5-flash 模型目前仅在北京地域可用,且仅支持声音设计和声音复刻场景(无系统音色)。使用前,请先参见声音复刻声音设计创建目标音色。创建完成后,将代码中的 voice 字段设置为您的音色 ID,并将 model 字段指定为对应模型即可。

以下示例演示如何使用系统音色(参见音色列表)进行语音合成。

Python

# coding=utf-8

    import os
    import dashscope
    from dashscope.audio.tts_v2 import *

    # 新加坡地域和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # 若没有配置环境变量,请用百炼API Key将下行替换为:dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')

    # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference
    dashscope.base_websocket_api_url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference'

    # 模型
    # 不同模型版本需要使用对应版本的音色:
    # cosyvoice-v3-flash/cosyvoice-v3-plus:使用longanyang等音色。
    # cosyvoice-v2:使用longxiaochun_v2等音色。
    # 每个音色支持的语言不同,合成日语、韩语等非中文语言时,需选择支持对应语言的音色。详见CosyVoice音色列表。
    model = "cosyvoice-v3-flash"
    # 音色
    voice = "longanyang"

    # 实例化SpeechSynthesizer,并在构造方法中传入模型(model)、音色(voice)等请求参数
    synthesizer = SpeechSynthesizer(model=model, voice=voice)
    # 发送待合成文本,获取二进制音频
    audio = synthesizer.call("今天天气怎么样?")
    # 首次发送文本时需建立 WebSocket 连接,因此首包延迟会包含连接建立的耗时
    print('[Metric] requestId为:{},首包延迟为:{}毫秒'.format(
        synthesizer.get_last_request_id(),
        synthesizer.get_first_package_delay()))

    # 将音频保存至本地
    with open('output.mp3', 'wb') as f:
        f.write(audio)

Java

import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
    import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
    import com.alibaba.dashscope.utils.Constants;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class Main {
        // 模型
        // 不同模型版本需要使用对应版本的音色:
        // cosyvoice-v3-flash/cosyvoice-v3-plus:使用longanyang等音色。
        // cosyvoice-v2:使用longxiaochun_v2等音色。
        // 每个音色支持的语言不同,合成日语、韩语等非中文语言时,需选择支持对应语言的音色。详见CosyVoice音色列表。
        private static String model = "cosyvoice-v3-flash";
        // 音色
        private static String voice = "longanyang";

        public static void streamAudioDataToSpeaker() {
            // 请求参数
            SpeechSynthesisParam param =
                    SpeechSynthesisParam.builder()
                            // 新加坡地域和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                            // 若没有配置环境变量,请用百炼API Key将下行替换为:.apiKey("sk-xxx")
                            .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                            .model(model) // 模型
                            .voice(voice) // 音色
                            .build();

            // 同步模式:禁用回调(第二个参数为null)
            SpeechSynthesizer synthesizer = new SpeechSynthesizer(param, null);
            ByteBuffer audio = null;
            try {
                // 阻塞直至音频返回
                audio = synthesizer.call("今天天气怎么样?");
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // 任务结束关闭websocket连接
                synthesizer.getDuplexApi().close(1000, "bye");
            }
            if (audio != null) {
                // 将音频数据保存到本地文件"output.mp3"中
                File file = new File("output.mp3");
                // 首次发送文本时需建立 WebSocket 连接,因此首包延迟会包含连接建立的耗时
                // 注意:getFirstPackageDelay() 需要 dashscope-sdk-java 2.18.0 及以上版本
                System.out.println(
                        "[Metric] requestId为:"
                                + synthesizer.getLastRequestId()
                                + "首包延迟(毫秒)为:"
                                + synthesizer.getFirstPackageDelay());
                try (FileOutputStream fos = new FileOutputStream(file)) {
                    fos.write(audio.array());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        public static void main(String[] args) {
            // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference
            Constants.baseWebsocketApiUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference";
            streamAudioDataToSpeaker();
            System.exit(0);
        }
    }

Qwen-TTS

使用系统音色进行语音合成

以下示例演示如何使用系统音色(参见支持的音色)进行语音合成。

如需使用指令控制功能,请将 model 替换为 qwen3-tts-instruct-flash-realtime,并通过 instructions 参数设置指令。

Python

server commit模式

import os
import base64
import threading
import time
import dashscope
from dashscope.audio.qwen_tts_realtime import *


qwen_tts_realtime: QwenTtsRealtime = None
text_to_synthesize = [
    '对吧~我就特别喜欢这种超市,',
    '尤其是过年的时候',
    '去逛超市',
    '就会觉得',
    '超级超级开心!',
    '想买好多好多的东西呢!'
]

DO_VIDEO_TEST = False

def init_dashscope_api_key():
    """
        Set your DashScope API-key. More information:
        https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    """

    # 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # load API-key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = 'your-dashscope-api-key'  # set API-key manually



class MyCallback(QwenTtsRealtimeCallback):
    def __init__(self):
        self.complete_event = threading.Event()
        self.file = open('result_24k.pcm', 'wb')

    def on_open(self) -> None:
        print('connection opened, init player')

    def on_close(self, close_status_code, close_msg) -> None:
        self.file.close()
        print('connection closed with code: {}, msg: {}, destroy player'.format(close_status_code, close_msg))

    def on_event(self, response: str) -> None:
        try:
            global qwen_tts_realtime
            type = response['type']
            if 'session.created' == type:
                print('start session: {}'.format(response['session']['id']))
            if 'response.audio.delta' == type:
                recv_audio_b64 = response['delta']
                self.file.write(base64.b64decode(recv_audio_b64))
            if 'response.done' == type:
                print(f'response {qwen_tts_realtime.get_last_response_id()} done')
            if 'session.finished' == type:
                print('session finished')
                self.complete_event.set()
        except Exception as e:
            print('[Error] {}'.format(e))
            return

    def wait_for_finished(self):
        self.complete_event.wait()


if __name__  == '__main__':
    init_dashscope_api_key()

    print('Initializing ...')

    callback = MyCallback()

    qwen_tts_realtime = QwenTtsRealtime(
        # 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime
        model='qwen3-tts-flash-realtime',
        callback=callback,
        # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime
        url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime'
        )

    qwen_tts_realtime.connect()
    qwen_tts_realtime.update_session(
        voice = 'Cherry',
        response_format = AudioFormat.PCM_24000HZ_MONO_16BIT,
        # 如需使用指令控制功能,请取消下方注释,并将model替换为qwen3-tts-instruct-flash-realtime
        # instructions='语速较快,带有明显的上扬语调,适合介绍时尚产品。',
        # optimize_instructions=True,
        mode = 'server_commit'        
    )
    for text_chunk in text_to_synthesize:
        print(f'send text: {text_chunk}')
        qwen_tts_realtime.append_text(text_chunk)
        time.sleep(0.1)
    qwen_tts_realtime.finish()
    callback.wait_for_finished()
    print('[Metric] session: {}, first audio delay: {}'.format(
                    qwen_tts_realtime.get_session_id(), 
                    qwen_tts_realtime.get_first_audio_delay(),
                    ))

commit模式

import base64
import os
import threading
import dashscope
from dashscope.audio.qwen_tts_realtime import *


qwen_tts_realtime: QwenTtsRealtime = None
text_to_synthesize = [
    '这是第一句话。',
    '这是第二句话。',
    '这是第三句话。',
]

DO_VIDEO_TEST = False

def init_dashscope_api_key():
    """
        Set your DashScope API-key. More information:
        https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    """

    # 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # load API-key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = 'your-dashscope-api-key'  # set API-key manually



class MyCallback(QwenTtsRealtimeCallback):
    def __init__(self):
        super().__init__()
        self.response_counter = 0
        self.complete_event = threading.Event()
        self.file = open(f'result_{self.response_counter}_24k.pcm', 'wb')

    def reset_event(self):
        self.response_counter += 1
        self.file = open(f'result_{self.response_counter}_24k.pcm', 'wb')
        self.complete_event = threading.Event()

    def on_open(self) -> None:
        print('connection opened, init player')

    def on_close(self, close_status_code, close_msg) -> None:
        print('connection closed with code: {}, msg: {}, destroy player'.format(close_status_code, close_msg))

    def on_event(self, response: str) -> None:
        try:
            global qwen_tts_realtime
            type = response['type']
            if 'session.created' == type:
                print('start session: {}'.format(response['session']['id']))
            if 'response.audio.delta' == type:
                recv_audio_b64 = response['delta']
                self.file.write(base64.b64decode(recv_audio_b64))
            if 'response.done' == type:
                print(f'response {qwen_tts_realtime.get_last_response_id()} done')
                self.complete_event.set()
                self.file.close()
            if 'session.finished' == type:
                print('session finished')
                self.complete_event.set()
        except Exception as e:
            print('[Error] {}'.format(e))
            return

    def wait_for_response_done(self):
        self.complete_event.wait()


if __name__  == '__main__':
    init_dashscope_api_key()

    print('Initializing ...')

    callback = MyCallback()

    qwen_tts_realtime = QwenTtsRealtime(
        # 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime
        model='qwen3-tts-flash-realtime',
        callback=callback, 
        # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime
        url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime'
        )

    qwen_tts_realtime.connect()
    qwen_tts_realtime.update_session(
        voice = 'Cherry',
        response_format = AudioFormat.PCM_24000HZ_MONO_16BIT,
        # 如需使用指令控制功能,请取消下方注释,并将model替换为qwen3-tts-instruct-flash-realtime
        # instructions='语速较快,带有明显的上扬语调,适合介绍时尚产品。',
        # optimize_instructions=True,
        mode = 'commit'        
    )
    print(f'send text: {text_to_synthesize[0]}')
    qwen_tts_realtime.append_text(text_to_synthesize[0])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    callback.reset_event()
    
    print(f'send text: {text_to_synthesize[1]}')
    qwen_tts_realtime.append_text(text_to_synthesize[1])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    callback.reset_event()

    print(f'send text: {text_to_synthesize[2]}')
    qwen_tts_realtime.append_text(text_to_synthesize[2])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    
    qwen_tts_realtime.finish()
    print('[Metric] session: {}, first audio delay: {}'.format(
                    qwen_tts_realtime.get_session_id(), 
                    qwen_tts_realtime.get_first_audio_delay(),
                    ))

Java

server commit模式

import com.alibaba.dashscope.audio.qwen_tts_realtime.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.AudioSystem;
import java.io.*;
import java.util.Base64;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    static String[] textToSynthesize = {
            "对吧~我就特别喜欢这种超市",
            "尤其是过年的时候",
            "去逛超市",
            "就会觉得",
            "超级超级开心!",
            "想买好多好多的东西呢!"
    };
    public static QwenTtsRealtimeAudioFormat ttsFormat = QwenTtsRealtimeAudioFormat.PCM_24000HZ_MONO_16BIT;

    // 实时PCM音频播放器类
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
        private ByteArrayOutputStream totalAudioStream = new ByteArrayOutputStream();

        // 构造函数初始化音频格式和音频线路
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                            // 将音频数据写入 totalAudioStream
                            try {
                                totalAudioStream.write(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // 播放一个音频块并阻塞直到播放完成
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // 等待缓冲区中的音频播放完成
            Thread.sleep(audioLength - 10);
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException, IOException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();

            // 保存完整音频文件
            File file = new File("TotalAudio_"+ttsFormat.getSampleRate()+"."+ttsFormat.getFormat());
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(totalAudioStream.toByteArray());
            }

            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException, IOException {
        QwenTtsRealtimeParam param = QwenTtsRealtimeParam.builder()
                // 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime
                .model("qwen3-tts-flash-realtime")
                // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime
                .url("wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime")
                // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();
        AtomicReference<CountDownLatch> completeLatch = new AtomicReference<>(new CountDownLatch(1));
        final AtomicReference<QwenTtsRealtime> qwenTtsRef = new AtomicReference<>(null);

        // 创建实时音频播放器实例
        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);

        QwenTtsRealtime qwenTtsRealtime = new QwenTtsRealtime(param, new QwenTtsRealtimeCallback() {
            @Override
            public void onOpen() {
                // 连接建立时的处理
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        // 会话创建时的处理
                        if (message.has("session")) {
                            String eventId = message.get("event_id").getAsString();
                            String sessionId = message.get("session").getAsJsonObject().get("id").getAsString();
                            System.out.println("[onEvent] session.created, session_id: "
                                    + sessionId + ", event_id: " + eventId);
                        }
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        // 实时播放音频
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        // 响应完成时的处理
                        break;
                    case "session.finished":
                        // 会话结束时的处理
                        completeLatch.get().countDown();
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                // 连接关闭时的处理
            }
        });
        qwenTtsRef.set(qwenTtsRealtime);
        try {
            qwenTtsRealtime.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
                .voice("Cherry")
                .responseFormat(ttsFormat)
                .mode("server_commit")
                // 如需使用指令控制功能,请取消下方注释,并将model替换为qwen3-tts-instruct-flash-realtime
                // .instructions("")
                // .optimizeInstructions(true)
                .build();
        qwenTtsRealtime.updateSession(config);
        for (String text:textToSynthesize) {
            qwenTtsRealtime.appendText(text);
            Thread.sleep(100);
        }
        qwenTtsRealtime.finish();
        completeLatch.get().await();
        qwenTtsRealtime.close();

        // 等待音频播放完成并关闭播放器
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        System.exit(0);
    }
}

commit模式

import com.alibaba.dashscope.audio.qwen_tts_realtime.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.AudioSystem;
import java.io.*;
import java.util.Base64;
import java.util.Queue;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    public static QwenTtsRealtimeAudioFormat ttsFormat = QwenTtsRealtimeAudioFormat.PCM_24000HZ_MONO_16BIT;
    // 实时PCM音频播放器类
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
        private ByteArrayOutputStream totalAudioStream = new ByteArrayOutputStream();


        // 构造函数初始化音频格式和音频线路
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                            // 将音频数据写入 totalAudioStream
                            try {
                                totalAudioStream.write(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // 播放一个音频块并阻塞直到播放完成
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // 等待缓冲区中的音频播放完成
            Thread.sleep(audioLength - 10);
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            // 等待所有缓冲区中的音频数据播放完成
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            // 等待音频线路播放完成
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            // 保存完整音频文件
            File file = new File("TotalAudio_"+ttsFormat.getSampleRate()+"."+ttsFormat.getFormat());
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(totalAudioStream.toByteArray());
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException, FileNotFoundException {
        Scanner scanner = new Scanner(System.in);

        QwenTtsRealtimeParam param = QwenTtsRealtimeParam.builder()
                // 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime
                .model("qwen3-tts-flash-realtime")
                // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime
                .url("wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime")
                // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        AtomicReference<CountDownLatch> completeLatch = new AtomicReference<>(new CountDownLatch(1));

        // 创建实时播放器实例
        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);

        final AtomicReference<QwenTtsRealtime> qwenTtsRef = new AtomicReference<>(null);
        QwenTtsRealtime qwenTtsRealtime = new QwenTtsRealtime(param, new QwenTtsRealtimeCallback() {
            //            File file = new File("result_24k.pcm");
//            FileOutputStream fos = new FileOutputStream(file);
            @Override
            public void onOpen() {
                System.out.println("connection opened");
                System.out.println("输入文本并按Enter发送,输入'quit'退出程序");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        byte[] rawAudio = Base64.getDecoder().decode(recvAudioB64);
                        //                            fos.write(rawAudio);
                        // 实时播放音频
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("response done");
                        // 等待音频播放完成
                        try {
                            audioPlayer.waitForComplete();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        // 为下一次输入做准备
                        completeLatch.get().countDown();
                        break;
                    case "session.finished":
                        System.out.println("session finished");
                        if (qwenTtsRef.get() != null) {
                            System.out.println("[Metric] response: " + qwenTtsRef.get().getResponseId() +
                                    ", first audio delay: " + qwenTtsRef.get().getFirstAudioDelay() + " ms");
                        }
                        completeLatch.get().countDown();
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
                try {
//                    fos.close();
                    // 等待播放完成并关闭播放器
                    audioPlayer.waitForComplete();
                    audioPlayer.shutdown();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        qwenTtsRef.set(qwenTtsRealtime);
        try {
            qwenTtsRealtime.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
                .voice("Cherry")
                .responseFormat(ttsFormat)
                .mode("commit")
                // 如需使用指令控制功能,请取消下方注释,并将model替换为qwen3-tts-instruct-flash-realtime
                // .instructions("")
                // .optimizeInstructions(true)
                .build();
        qwenTtsRealtime.updateSession(config);

        // 循环读取用户输入
        while (true) {
            System.out.print("请输入要合成的文本: ");
            String text = scanner.nextLine();

            // 如果用户输入quit,则退出程序
            if ("quit".equalsIgnoreCase(text.trim())) {
                System.out.println("正在关闭连接...");
                qwenTtsRealtime.finish();
                completeLatch.get().await();
                break;
            }

            // 如果用户输入为空,跳过
            if (text.trim().isEmpty()) {
                continue;
            }

            // 重新初始化倒计时锁存器
            completeLatch.set(new CountDownLatch(1));

            // 发送文本
            qwenTtsRealtime.appendText(text);
            qwenTtsRealtime.commit();

            // 等待本次合成完成
            completeLatch.get().await();
        }

        // 清理资源
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        scanner.close();
        System.exit(0);
    }
}

进阶功能

以下功能用于精细控制语音合成效果。

Qwen-TTS 交互模式

Qwen-TTS Realtime API 提供两种 WebSocket 交互模式,通过 session.mode 参数切换:

  • server_commit 模式:由服务端智能处理文本分段与合成时机,适合大段文本的连续合成场景。客户端只需持续追加文本,无需关注分段和提交。

  • commit 模式:由客户端主动提交文本缓冲区以触发合成,适合需要精确控制合成时机的场景(如对话式 AI 逐轮合成)。

SDK 中设置交互模式

  • Python SDK:在 update_session 方法中通过 mode 参数设置。

    qwen_tts_realtime.update_session(
        voice='Cherry',
        response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
        mode='server_commit'
    )
  • Java SDK:通过 QwenTtsRealtimeConfig.builder() 设置 mode 参数。

    QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
            .voice("Cherry")
            .responseFormat(ttsFormat)
            .mode("server_commit")
            .build();
    qwenTtsRealtime.updateSession(config);

完整的 SDK 代码示例请参见Python SDKJava SDK。WebSocket 事件生命周期和连接复用方式的详细说明,请参见实时语音合成-千问API参考

指令控制

指令控制允许您通过自然语言描述精确控制语音的表达效果,无需调整复杂的音频参数。只需用简单的文字描述,即可让合成语音呈现特定的音调、语速、情感或音色特点,无需调整复杂的音频参数。

支持的模型

  • CosyVoice:cosyvoice-v3.5-pluscosyvoice-v3.5-flashcosyvoice-v3-flash

    不同模型对指令的格式要求不同:

    • cosyvoice-v3.5-pluscosyvoice-v3.5-flash:可输入任意指令控制合成效果(如情感、语速等)。

    • cosyvoice-v3-flash 的声音设计或声音复刻音色:可输入任意指令控制合成效果。

    • cosyvoice-v3-flash 的系统音色:指令必须使用固定格式和内容,详情请参见CosyVoice音色列表

  • Qwen-TTS:仅支持千问3-TTS-Instruct-Flash-Realtime系列模型。

使用方式

  • CosyVoice:通过 instructions 参数指定指令内容,例如“语速较快,带有明显的上扬语调,适合介绍时尚产品”。

  • Qwen-TTS:通过 instruction 参数指定指令内容,例如“语速较快,带有明显的上扬语调,适合介绍时尚产品”。

指令文本支持的语言

  • CosyVoice:

    • cosyvoice-v3.5-pluscosyvoice-v3.5-flash:中文、英文、法语、德语、日语、韩语、俄语、葡萄牙语、泰语、印尼语、越南语。

    • cosyvoice-v3-flash:中文、英文、法语、德语、日语、韩语、俄语。

  • Qwen-TTS:仅支持中文和英文。

指令文本长度限制

  • CosyVoice:不超过 100 字符。汉字(包括简体/繁体汉字、日文汉字和韩文汉字)按 2 个字符计算,其他字符(如标点符号、字母、数字、日韩文假名/谚文等)按 1 个字符计算。

  • Qwen-TTS:不超过 1600 Token。

适用场景

  • 有声书和广播剧配音

  • 广告和宣传片配音

  • 游戏角色和动画配音

  • 情感化的智能语音助手

  • 纪录片和新闻播报

如何编写高质量的声音描述:

  • 核心原则:

    1. 具体而非模糊:使用能描绘具体声音特质的词语,如“低沉”、“清脆”、“语速偏快”。避免使用“好听”、“普通”等主观且缺乏信息量的词汇。

    2. 多维而非单一:好的描述通常结合多个维度(如音调、语速、情感等)。仅描述单一维度(如“高音”)过于宽泛,难以生成特色鲜明的效果。

    3. 客观而非主观:聚焦声音本身的物理和感知特征,而非个人喜好。例如,用“音调偏高,带有活力”代替“我最喜欢的声音”。

    4. 原创而非模仿:请描述声音的特质,而非要求模仿特定人物(如名人、演员)。模仿请求涉及版权风险,且模型不支持直接模仿。

    5. 简洁而非冗余:确保每个词都有意义。避免重复同义词或堆砌无意义的强调词(如”非常非常棒的声音”)。

  • 描述维度参考:组合多个维度可以创造更丰富的表达效果。

    维度

    描述示例

    音调

    高音、中音、低音、偏高、偏低

    语速

    快速、中速、缓慢、偏快、偏慢

    情感

    开朗、沉稳、温柔、严肃、活泼、冷静、治愈

    特点

    有磁性、清脆、沙哑、圆润、甜美、浑厚、有力

    用途

    新闻播报、广告配音、有声书、动画角色、语音助手、纪录片解说

  • 示例:

    • 标准播音风格:吐字清晰精准,字正腔圆

    • 情绪递进效果:音量由正常对话迅速增强至高喊,性格直率,情绪易激动且外露

    • 特殊情感状态:哭腔导致发音略微含糊,略显沙哑,带有明显哭腔的紧张感

    • 广告配音风格:音调偏高,语速中等,充满活力和感染力,适合广告配音

    • 温柔治愈风格:语速偏慢,音调温柔甜美,语气治愈温暖,像贴心朋友般关怀

WebSocket 直连示例

如果您不使用 DashScope SDK,可以通过 WebSocket 原始协议直接连接服务端进行语音合成。以下示例仅提供最基础的调通实现,实际业务代码需您自行开发。各模型的 WebSocket 协议说明(服务端点、请求头、交互流程),请参见对应的 API 参考文档。

点击查看 WebSocket 直连示例

CosyVoice

Go

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	// 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
	wsURL      = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"
	outputFile = "output.mp3"
)

func main() {
	// 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
	// 若没有配置环境变量,请用百炼API Key将下行替换为:apiKey := "sk-xxx"
	apiKey := os.Getenv("DASHSCOPE_API_KEY")

	// 清空输出文件
	os.Remove(outputFile)
	os.Create(outputFile)

	// 连接WebSocket
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))

	conn, resp, err := websocket.DefaultDialer.Dial(wsURL, header)
	if err != nil {
		if resp != nil {
			fmt.Printf("连接失败 HTTP状态码: %d\n", resp.StatusCode)
		}
		fmt.Println("连接失败:", err)
		return
	}
	defer conn.Close()

	// 生成任务ID
	taskID := uuid.New().String()
	fmt.Printf("生成任务ID: %s\n", taskID)

	// 发送run-task事件
	runTaskCmd := map[string]interface{}{
		"header": map[string]interface{}{
			"action":    "run-task",
			"task_id":   taskID,
			"streaming": "duplex",
		},
		"payload": map[string]interface{}{
			"task_group": "audio",
			"task":       "tts",
			"function":   "SpeechSynthesizer",
			"model":      "cosyvoice-v3-flash",
			"parameters": map[string]interface{}{
				"text_type":   "PlainText",
				"voice":       "longanyang",
				"format":      "mp3",
				"sample_rate": 22050,
				"volume":      50,
				"rate":        1,
				"pitch":       1,
				// 如果enable_ssml设为true,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
				"enable_ssml": false,
			},
			"input": map[string]interface{}{},
		},
	}

	runTaskJSON, _ := json.Marshal(runTaskCmd)
	fmt.Printf("发送run-task事件: %s\n", string(runTaskJSON))

	err = conn.WriteMessage(websocket.TextMessage, runTaskJSON)
	if err != nil {
		fmt.Println("发送run-task失败:", err)
		return
	}

	textSent := false

	// 处理消息
	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("读取消息失败:", err)
			break
		}

		// 处理二进制消息
		if messageType == websocket.BinaryMessage {
			fmt.Printf("收到二进制消息,长度: %d\n", len(message))
			file, _ := os.OpenFile(outputFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
			file.Write(message)
			file.Close()
			continue
		}

		// 处理文本消息
		messageStr := string(message)
		fmt.Printf("收到文本消息: %s\n", strings.ReplaceAll(messageStr, "\n", ""))

		// 简单解析JSON获取event类型
		var msgMap map[string]interface{}
		if json.Unmarshal(message, &msgMap) == nil {
			if header, ok := msgMap["header"].(map[string]interface{}); ok {
				if event, ok := header["event"].(string); ok {
					fmt.Printf("事件类型: %s\n", event)

					switch event {
					case "task-started":
						fmt.Println("=== 收到task-started事件 ===")

						if !textSent {
							// 发送continue-task事件

							texts := []string{"床前明月光,疑是地上霜。", "举头望明月,低头思故乡。"}

							for _, text := range texts {
								continueTaskCmd := map[string]interface{}{
									"header": map[string]interface{}{
										"action":    "continue-task",
										"task_id":   taskID,
										"streaming": "duplex",
									},
									"payload": map[string]interface{}{
										"input": map[string]interface{}{
											"text": text,
										},
									},
								}

								continueTaskJSON, _ := json.Marshal(continueTaskCmd)
								fmt.Printf("发送continue-task事件: %s\n", string(continueTaskJSON))

								err = conn.WriteMessage(websocket.TextMessage, continueTaskJSON)
								if err != nil {
									fmt.Println("发送continue-task失败:", err)
									return
								}
							}

							textSent = true

							// 延迟发送finish-task
							time.Sleep(500 * time.Millisecond)

							// 发送finish-task事件
							finishTaskCmd := map[string]interface{}{
								"header": map[string]interface{}{
									"action":    "finish-task",
									"task_id":   taskID,
									"streaming": "duplex",
								},
								"payload": map[string]interface{}{
									"input": map[string]interface{}{},
								},
							}

							finishTaskJSON, _ := json.Marshal(finishTaskCmd)
							fmt.Printf("发送finish-task事件: %s\n", string(finishTaskJSON))

							err = conn.WriteMessage(websocket.TextMessage, finishTaskJSON)
							if err != nil {
								fmt.Println("发送finish-task失败:", err)
								return
							}
						}

					case "task-finished":
						fmt.Println("=== 任务完成 ===")
						return

					case "task-failed":
						fmt.Println("=== 任务失败 ===")
						if header["error_message"] != nil {
							fmt.Printf("错误信息: %s\n", header["error_message"])
						}
						return

					case "result-generated":
						fmt.Println("收到result-generated事件")
					}
				}
			}
		}
	}
}

C#

using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

class Program {
    // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    // 若没有配置环境变量,请用百炼API Key将下行替换为:private static readonly string ApiKey = "sk-xxx"
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
    private const string WebSocketUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/";
    // 输出文件路径
    private const string OutputFilePath = "output.mp3";

    // WebSocket客户端
    private static ClientWebSocket _webSocket = new ClientWebSocket();
    // 取消令牌源
    private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    // 任务ID
    private static string? _taskId;
    // 任务是否已启动
    private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>();

    static async Task Main(string[] args) {
        try {
            // 清空输出文件
            ClearOutputFile(OutputFilePath);

            // 连接WebSocket服务
            await ConnectToWebSocketAsync(WebSocketUrl);

            // 启动接收消息的任务
            Task receiveTask = ReceiveMessagesAsync();

            // 发送run-task事件
            _taskId = GenerateTaskId();
            await SendRunTaskCommandAsync(_taskId);

            // 等待task-started事件
            await _taskStartedTcs.Task;

            // 持续发送continue-task事件
            string[] texts = {
                "床前明月光",
                "疑是地上霜",
                "举头望明月",
                "低头思故乡"
            };
            foreach (string text in texts) {
                await SendContinueTaskCommandAsync(text);
            }

            // 发送finish-task事件
            await SendFinishTaskCommandAsync(_taskId);

            // 等待接收任务完成
            await receiveTask;

            Console.WriteLine("任务完成,连接已关闭。");
        } catch (OperationCanceledException) {
            Console.WriteLine("任务被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"发生错误:{ex.Message}");
        } finally {
            _cancellationTokenSource.Cancel();
            _webSocket.Dispose();
        }
    }

    private static void ClearOutputFile(string filePath) {
        if (File.Exists(filePath)) {
            File.WriteAllText(filePath, string.Empty);
            Console.WriteLine("输出文件已清空。");
        } else {
            Console.WriteLine("输出文件不存在,无需清空。");
        }
    }

    private static async Task ConnectToWebSocketAsync(string url) {
        var uri = new Uri(url);
        if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
            return;
        }

        // 设置WebSocket连接的头部信息
        _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");

        try {
            await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
            Console.WriteLine("已成功连接到WebSocket服务。");
        } catch (OperationCanceledException) {
            Console.WriteLine("WebSocket连接被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"WebSocket连接失败: {ex.Message}");
            throw;
        }
    }

    private static async Task SendRunTaskCommandAsync(string taskId) {
        var command = CreateCommand("run-task", taskId, "duplex", new {
            task_group = "audio",
            task = "tts",
            function = "SpeechSynthesizer",
            model = "cosyvoice-v3-flash",
            parameters = new
            {
                text_type = "PlainText",
                voice = "longanyang",
                format = "mp3",
                sample_rate = 22050,
                volume = 50,
                rate = 1,
                pitch = 1,
                // 如果enable_ssml设为true,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
                enable_ssml = false
            },
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送run-task事件。");
    }

    private static async Task SendContinueTaskCommandAsync(string text) {
        if (_taskId == null) {
            throw new InvalidOperationException("任务ID未初始化。");
        }

        var command = CreateCommand("continue-task", _taskId, "duplex", new {
            input = new {
                text
            }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送continue-task事件。");
    }

    private static async Task SendFinishTaskCommandAsync(string taskId) {
        var command = CreateCommand("finish-task", taskId, "duplex", new {
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送finish-task事件。");
    }

    private static async Task SendJsonMessageAsync(string message) {
        var buffer = Encoding.UTF8.GetBytes(message);
        try {
            await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
        } catch (OperationCanceledException) {
            Console.WriteLine("消息发送被取消。");
        }
    }

    private static async Task ReceiveMessagesAsync() {
        while (_webSocket.State == WebSocketState.Open) {
            var response = await ReceiveMessageAsync();
            if (response != null) {
                var eventStr = response.RootElement.GetProperty("header").GetProperty("event").GetString();
                switch (eventStr) {
                    case "task-started":
                        Console.WriteLine("任务已启动。");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("任务已完成。");
                        _cancellationTokenSource.Cancel();
                        break;
                    case "task-failed":
                        Console.WriteLine("任务失败:" + response.RootElement.GetProperty("header").GetProperty("error_message").GetString());
                        _cancellationTokenSource.Cancel();
                        break;
                    default:
                        // result-generated可在此处理
                        break;
                }
            }
        }
    }

    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);

        try {
            WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);

            if (result.MessageType == WebSocketMessageType.Close) {
                await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                return null;
            }

            if (result.MessageType == WebSocketMessageType.Binary) {
                // 处理二进制数据
                Console.WriteLine("接收到二进制数据...");

                // 将二进制数据保存到文件
                using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                    fileStream.Write(buffer, 0, result.Count);
                }

                return null;
            }

            string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
            return JsonDocument.Parse(message);
        } catch (OperationCanceledException) {
            Console.WriteLine("消息接收被取消。");
            return null;
        }
    }

    private static string GenerateTaskId() {
        return Guid.NewGuid().ToString("N").Substring(0, 32);
    }

    private static string CreateCommand(string action, string taskId, string streaming, object payload) {
        var command = new {
            header = new {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}

PHP

示例代码目录结构为:

my-php-project/

├── composer.json

├── vendor/

└── index.php

composer.json内容如下,相关依赖的版本号请根据实际情况自行决定:

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

index.php内容如下:

<?php

require __DIR__ . '/vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;

// 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
// 若没有配置环境变量,请用百炼API Key将下行替换为:$api_key = "sk-xxx"
$api_key = getenv("DASHSCOPE_API_KEY");
// 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
$websocket_url = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/'; // WebSocket服务器地址
$output_file = 'output.mp3'; // 输出文件路径

$loop = Loop::get();

if (file_exists($output_file)) {
    // 清空文件内容
    file_put_contents($output_file, '');
}

// 创建自定义的连接器
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        'verify_peer' => false,
        'verify_peer_name' => false,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
    echo "连接到WebSocket服务器\n";

    // 生成任务ID
    $taskId = generateTaskId();

    // 发送 run-task 事件
    sendRunTaskMessage($conn, $taskId);

    // 定义发送 continue-task 事件的函数
    $sendContinueTask = function() use ($conn, $loop, $taskId) {
        // 待发送的文本
        $texts = ["床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"];
        $continueTaskCount = 0;
        foreach ($texts as $text) {
            $continueTaskMessage = json_encode([
                "header" => [
                    "action" => "continue-task",
                    "task_id" => $taskId,
                    "streaming" => "duplex"
                ],
                "payload" => [
                    "input" => [
                        "text" => $text
                    ]
                ]
            ]);
            echo "准备发送continue-task事件: " . $continueTaskMessage . "\n";
            $conn->send($continueTaskMessage);
            $continueTaskCount++;
        }
        echo "发送的continue-task事件个数为:" . $continueTaskCount . "\n";

        // 发送 finish-task 事件
        sendFinishTaskMessage($conn, $taskId);
    };

    // 标记是否收到 task-started 事件
    $taskStarted = false;

    // 监听消息
    $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
        if ($msg->isBinary()) {
            // 写入二进制数据到本地文件
            file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
        } else {
            // 处理非二进制消息
            $response = json_decode($msg, true);

            if (isset($response['header']['event'])) {
                handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
            } else {
                echo "未知的消息格式\n";
            }
        }
    });

    // 监听连接关闭
    $conn->on('close', function($code = null, $reason = null) {
        echo "连接已关闭\n";
        if ($code !== null) {
            echo "关闭代码: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "关闭原因:" . $reason . "\n";
        }
    });
}, function ($e) {
    echo "无法连接:{$e->getMessage()}\n";
});

$loop->run();

/**
 * 生成任务ID
 * @return string
 */
function generateTaskId(): string {
    return bin2hex(random_bytes(16));
}

/**
 * 发送 run-task 事件
 * @param $conn
 * @param $taskId
 */
function sendRunTaskMessage($conn, $taskId) {
    $runTaskMessage = json_encode([
        "header" => [
            "action" => "run-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "task_group" => "audio",
            "task" => "tts",
            "function" => "SpeechSynthesizer",
            "model" => "cosyvoice-v3-flash",
            "parameters" => [
                "text_type" => "PlainText",
                "voice" => "longanyang",
                "format" => "mp3",
                "sample_rate" => 22050,
                "volume" => 50,
                "rate" => 1,
                "pitch" => 1,
                // 如果enable_ssml设为true,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
                "enable_ssml" => false
            ],
            "input" => (object) []
        ]
    ]);
    echo "准备发送run-task事件: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task事件已发送\n";
}

/**
 * 读取音频文件
 * @param string $filePath
 * @return bool|string
 */
function readAudioFile(string $filePath) {
    $voiceData = file_get_contents($filePath);
    if ($voiceData === false) {
        echo "无法读取音频文件\n";
    }
    return $voiceData;
}

/**
 * 分割音频数据
 * @param string $data
 * @param int $chunkSize
 * @return array
 */
function splitAudioData(string $data, int $chunkSize): array {
    return str_split($data, $chunkSize);
}

/**
 * 发送 finish-task 事件
 * @param $conn
 * @param $taskId
 */
function sendFinishTaskMessage($conn, $taskId) {
    $finishTaskMessage = json_encode([
        "header" => [
            "action" => "finish-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "input" => (object) []
        ]
    ]);
    echo "准备发送finish-task事件: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task事件已发送\n";
}

/**
 * 处理事件
 * @param $conn
 * @param $response
 * @param $sendContinueTask
 * @param $loop
 * @param $taskId
 * @param $taskStarted
 */
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
    switch ($response['header']['event']) {
        case 'task-started':
            echo "任务开始,发送continue-task事件...\n";
            $taskStarted = true;
            // 发送 continue-task 事件
            $sendContinueTask();
            break;
        case 'result-generated':
            // 收到result-generated事件
            break;
        case 'task-finished':
            echo "任务完成\n";
            $conn->close();
            break;
        case 'task-failed':
            echo "任务失败\n";
            echo "错误代码:" . $response['header']['error_code'] . "\n";
            echo "错误信息:" . $response['header']['error_message'] . "\n";
            $conn->close();
            break;
        case 'error':
            echo "错误:" . $response['payload']['message'] . "\n";
            break;
        default:
            echo "未知事件:" . $response['header']['event'] . "\n";
            break;
    }

    // 如果任务已完成,关闭连接
    if ($response['header']['event'] == 'task-finished') {
        // 等待1秒以确保所有数据都已传输完毕
        $loop->addTimer(1, function() use ($conn) {
            $conn->close();
            echo "客户端关闭连接\n";
        });
    }

    // 如果没有收到 task-started 事件,关闭连接
    if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) {
        $conn->close();
    }
}

Node.js

需安装相关依赖:

npm install ws
npm install uuid

示例代码如下:

const WebSocket = require('ws');
const fs = require('fs');
const uuid = require('uuid').v4;

// 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
// 若没有配置环境变量,请用百炼API Key将下行替换为:const apiKey = "sk-xxx"
const apiKey = process.env.DASHSCOPE_API_KEY;
// 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
const url = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/';
// 输出文件路径
const outputFilePath = 'output.mp3';

// 清空输出文件
fs.writeFileSync(outputFilePath, '');

// 创建WebSocket客户端
const ws = new WebSocket(url, {
  headers: {
    Authorization: `bearer ${apiKey}`,
    'X-DashScope-DataInspection': 'enable'
  }
});

let taskStarted = false;
let taskId = uuid();

ws.on('open', () => {
  console.log('已连接到WebSocket服务器');

  // 发送run-task事件
  const runTaskMessage = JSON.stringify({
    header: {
      action: 'run-task',
      task_id: taskId,
      streaming: 'duplex'
    },
    payload: {
      task_group: 'audio',
      task: 'tts',
      function: 'SpeechSynthesizer',
      model: 'cosyvoice-v3-flash',
      parameters: {
        text_type: 'PlainText',
        voice: 'longanyang', // 音色
        format: 'mp3', // 音频格式
        sample_rate: 22050, // 采样率
        volume: 50, // 音量
        rate: 1, // 语速
        pitch: 1, // 音调
        enable_ssml: false // 是否开启SSML功能。如果enable_ssml设为true,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
      },
      input: {}
    }
  });
  ws.send(runTaskMessage);
  console.log('已发送run-task消息');
});

const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    // 写入二进制数据到文件
    fileStream.write(data);
  } else {
    const message = JSON.parse(data);

    switch (message.header.event) {
      case 'task-started':
        taskStarted = true;
        console.log('任务已开始');
        // 发送continue-task事件
        sendContinueTasks(ws);
        break;
      case 'task-finished':
        console.log('任务已完成');
        ws.close();
        fileStream.end(() => {
          console.log('文件流已关闭');
        });
        break;
      case 'task-failed':
        console.error('任务失败:', message.header.error_message);
        ws.close();
        fileStream.end(() => {
          console.log('文件流已关闭');
        });
        break;
      default:
        // 可以在这里处理result-generated
        break;
    }
  }
});

function sendContinueTasks(ws) {
  const texts = [
    '床前明月光,',
    '疑是地上霜。',
    '举头望明月,',
    '低头思故乡。'
  ];

  texts.forEach((text, index) => {
    setTimeout(() => {
      if (taskStarted) {
        const continueTaskMessage = JSON.stringify({
          header: {
            action: 'continue-task',
            task_id: taskId,
            streaming: 'duplex'
          },
          payload: {
            input: {
              text: text
            }
          }
        });
        ws.send(continueTaskMessage);
        console.log(`已发送continue-task,文本:${text}`);
      }
    }, index * 1000); // 每隔1秒发送一次
  });

  // 发送finish-task事件
  setTimeout(() => {
    if (taskStarted) {
      const finishTaskMessage = JSON.stringify({
        header: {
          action: 'finish-task',
          task_id: taskId,
          streaming: 'duplex'
        },
        payload: {
          input: {}
        }
      });
      ws.send(finishTaskMessage);
      console.log('已发送finish-task');
    }
  }, texts.length * 1000 + 1000); // 在所有continue-task事件发送完毕后1秒发送
}

ws.on('close', () => {
  console.log('已断开与WebSocket服务器的连接');
});

Java

如您使用Java编程语言,建议采用Java DashScope SDK进行开发,详情请参见Java SDK

以下是Java WebSocket的调用示例。在运行示例前,请确保已导入以下依赖:

  • Java-WebSocket

  • jackson-databind

推荐您使用Maven或Gradle管理依赖包,其配置如下:

pom.xml

<dependencies>
    <!-- WebSocket Client -->
    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-WebSocket</artifactId>
        <version>1.5.3</version>
    </dependency>

    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.0</version>
    </dependency>
</dependencies>

build.gradle

// 省略其它代码
dependencies {
  // WebSocket Client
  implementation 'org.java-websocket:Java-WebSocket:1.5.3'
  // JSON Processing
  implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
}
// 省略其它代码

Java代码如下:

import com.fasterxml.jackson.databind.ObjectMapper;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;

public class TTSWebSocketClient extends WebSocketClient {
    private final String taskId = UUID.randomUUID().toString();
    private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3";
    private boolean taskFinished = false;

    public TTSWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("连接成功");

        // 发送run-task事件
        // 如果enable_ssml设为true,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
        String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v3-flash\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longanyang\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1, \"enable_ssml\": false }, \"input\": {} }}";
        send(runTaskCommand);
    }

    @Override
    public void onMessage(String message) {
        System.out.println("收到服务端返回的消息:" + message);
        try {
            // Parse JSON message
            Map<String, Object> messageMap = new ObjectMapper().readValue(message, Map.class);

            if (messageMap.containsKey("header")) {
                Map<String, Object> header = (Map<String, Object>) messageMap.get("header");

                if (header.containsKey("event")) {
                    String event = (String) header.get("event");

                    if ("task-started".equals(event)) {
                        System.out.println("收到服务端返回的task-started事件");

                        List<String> texts = Arrays.asList(
                                "床前明月光,疑是地上霜",
                                "举头望明月,低头思故乡"
                        );

                        for (String text : texts) {
                            // 发送continue-task事件
                            sendContinueTask(text);
                        }

                        // 发送finish-task事件
                        sendFinishTask();
                    } else if ("task-finished".equals(event)) {
                        System.out.println("收到服务端返回的task-finished事件");
                        taskFinished = true;
                        closeConnection();
                    } else if ("task-failed".equals(event)) {
                        System.out.println("任务失败:" + message);
                        closeConnection();
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("出现异常:" + e.getMessage());
        }
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("收到的二进制音频数据大小为:" + message.remaining());

        try (FileOutputStream fos = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[message.remaining()];
            message.get(buffer);
            fos.write(buffer);
            System.out.println("音频数据已写入本地文件" + outputFile + "中");
        } catch (IOException e) {
            System.err.println("音频数据写入本地文件失败:" + e.getMessage());
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("连接关闭:" + reason + " (" + code + ")");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("报错:" + ex.getMessage());
        ex.printStackTrace();
    }

    private void sendContinueTask(String text) {
        String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": \"" + text + "\" } }}";
        send(command);
    }

    private void sendFinishTask() {
        String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}";
        send(command);
    }

    private void closeConnection() {
        if (!isClosed()) {
            close();
        }
    }

    public static void main(String[] args) {
        try {
            // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
            // 若没有配置环境变量,请用百炼API Key将下行替换为:String apiKey = "sk-xxx"
            String apiKey = System.getenv("DASHSCOPE_API_KEY");
            if (apiKey == null || apiKey.isEmpty()) {
                System.err.println("请设置 DASHSCOPE_API_KEY 环境变量");
                return;
            }

            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "bearer " + apiKey);
            // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
            TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"), headers);

            client.connect();

            while (!client.isClosed() && !client.taskFinished) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.err.println("连接WebSocket服务失败:" + e.getMessage());
            e.printStackTrace();
        }
    }
}

Python

如您使用Python编程语言,建议采用Python DashScope SDK进行开发,详情请参见Python SDK

以下是Python WebSocket的调用示例。在运行示例前,请确保通过如下方式导入依赖:

pip uninstall websocket-client
pip uninstall websocket
pip install websocket-client
重要

请不要将运行示例代码的Python文件命名为“websocket.py”,否则会报错(AttributeError: module 'websocket' has no attribute 'WebSocketApp'. Did you mean: 'WebSocket'?)。

import websocket
import json
import uuid
import os
import time


class TTSClient:
    def __init__(self, api_key, uri):
        """
    初始化 TTSClient 实例

    参数:
        api_key (str): 鉴权用的 API Key
        uri (str): WebSocket 服务地址
    """
        self.api_key = api_key  # 替换为你的 API Key
        self.uri = uri  # 替换为你的 WebSocket 地址
        self.task_id = str(uuid.uuid4())  # 生成唯一任务 ID
        self.output_file = f"output_{int(time.time())}.mp3"  # 输出音频文件路径
        self.ws = None  # WebSocketApp 实例
        self.task_started = False  # 是否收到 task-started
        self.task_finished = False  # 是否收到 task-finished / task-failed

    def on_open(self, ws):
        """
    WebSocket 连接建立时回调函数
    发送 run-task 事件开启语音合成任务
    """
        print("WebSocket 已连接")

        # 构造 run-task 事件
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v3-flash",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longanyang",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1,
                    # 如果enable_ssml设为True,只允许发送一次continue-task事件,否则会报错“Text request limit violated, expected 1.”
                    "enable_ssml": False
                },
                "input": {}
            }
        }

        # 发送 run-task 事件
        ws.send(json.dumps(run_task_cmd))
        print("已发送 run-task 事件")

    def on_message(self, ws, message):
        """
    接收到消息时的回调函数
    区分文本和二进制消息处理
    """
        if isinstance(message, str):
            # 处理 JSON 文本消息
            try:
                msg_json = json.loads(message)
                print(f"收到 JSON 消息: {msg_json}")

                if "header" in msg_json:
                    header = msg_json["header"]

                    if "event" in header:
                        event = header["event"]

                        if event == "task-started":
                            print("任务已启动")
                            self.task_started = True

                            # 发送 continue-task 事件
                            texts = [
                                "床前明月光,疑是地上霜",
                                "举头望明月,低头思故乡"
                            ]

                            for text in texts:
                                self.send_continue_task(text)

                            # 所有 continue-task 发送完成后发送 finish-task
                            self.send_finish_task()

                        elif event == "task-finished":
                            print("任务已完成")
                            self.task_finished = True
                            self.close(ws)

                        elif event == "task-failed":
                            error_msg = msg_json.get("error_message", "未知错误")
                            print(f"任务失败: {error_msg}")
                            self.task_finished = True
                            self.close(ws)

            except json.JSONDecodeError as e:
                print(f"JSON 解析失败: {e}")
        else:
            # 处理二进制消息(音频数据)
            print(f"收到二进制消息,大小: {len(message)} 字节")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"已将音频数据写入本地文件{self.output_file}中")

    def on_error(self, ws, error):
        """发生错误时的回调"""
        print(f"WebSocket 出错: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭时的回调"""
        print(f"WebSocket 已关闭: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """发送 continue-task 事件,附带要合成的文本内容"""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }

        self.ws.send(json.dumps(cmd))
        print(f"已发送 continue-task 事件,文本内容: {text}")

    def send_finish_task(self):
        """发送 finish-task 事件,结束语音合成任务"""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }

        self.ws.send(json.dumps(cmd))
        print("已发送 finish-task 事件")

    def close(self, ws):
        """主动关闭连接"""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("已主动关闭连接")

    def run(self):
        """启动 WebSocket 客户端"""
        # 设置请求头部(鉴权)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }

        # 创建 WebSocketApp 实例
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        print("正在监听 WebSocket 消息...")
        self.ws.run_forever()  # 启动长连接监听


# 示例使用方式
if __name__ == "__main__":
    # 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY = "sk-xxx"
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference/
    SERVER_URI = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"  # 替换为你的 WebSocket 地址

    client = TTSClient(API_KEY, SERVER_URI)
    client.run()

Qwen-TTS

  1. 创建客户端

    Python

    在本地新建 Python 文件,命名为tts_realtime_client.py并复制以下代码到文件中:

    # -- coding: utf-8 --
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, Dict, Any
    from enum import Enum
    
    class SessionMode(Enum):
        SERVER_COMMIT = "server_commit"
        COMMIT = "commit"
    
    class TTSRealtimeClient:
        """
        与 TTS Realtime API 交互的客户端。
    
        该类提供了连接 TTS Realtime API、发送文本数据、获取音频输出以及管理 WebSocket 连接的相关方法。
    
        属性说明:
            base_url (str):
                Realtime API 的基础地址。
            api_key (str):
                用于身份验证的 API Key。
            voice (str):
                服务端合成语音所使用的声音。
            mode (SessionMode):
                会话模式,可选 server_commit 或 commit。
            audio_callback (Callable[[bytes], None]):
                接收音频数据的回调函数。
            language_type(str)
                合成的语音的语种,可选值Chinese、English、German、Italian、Portuguese、Spanish、Japanese、Korean、French、Russian、Auto
        """
    
        def __init__(
                self,
                base_url: str,
                api_key: str,
                voice: str = "Cherry",
                mode: SessionMode = SessionMode.SERVER_COMMIT,
                audio_callback: Optional[Callable[[bytes], None]] = None,
            language_type: str = "Auto"):
            self.base_url = base_url
            self.api_key = api_key
            self.voice = voice
            self.mode = mode
            self.ws = None
            self.audio_callback = audio_callback
            self.language_type = language_type
    
            # 当前回复状态
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            self._response_done_future = None
    
        async def connect(self) -> None:
            """与 TTS Realtime API 建立 WebSocket 连接。"""
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(self.base_url, additional_headers=headers)
    
            # 设置默认会话配置
            await self.update_session({
                "mode": self.mode.value,
                "voice": self.voice,
                # 如需使用指令控制功能,请取消下方注释,并在server_commit.py或commit.py中将model替换为qwen3-tts-instruct-flash-realtime
                # "instructions": "语速较快,带有明显的上扬语调,适合介绍时尚产品。",
                # "optimize_instructions": true
                "language_type": self.language_type,
                "response_format": "pcm",
                "sample_rate": 24000
            })
    
        async def send_event(self, event) -> None:
            """发送事件到服务器。"""
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f"发送事件: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """更新会话配置。"""
            event = {
                "type": "session.update",
                "session": config
            }
            print("更新会话配置: ", event)
            await self.send_event(event)
    
        async def append_text(self, text: str) -> None:
            """向 API 发送文本数据。"""
            event = {
                "type": "input_text_buffer.append",
                "text": text
            }
            await self.send_event(event)
    
        async def commit_text_buffer(self) -> None:
            """提交文本缓冲区以触发处理。"""
            event = {
                "type": "input_text_buffer.commit"
            }
            await self.send_event(event)
    
        async def clear_text_buffer(self) -> None:
            """清除文本缓冲区。"""
            event = {
                "type": "input_text_buffer.clear"
            }
            await self.send_event(event)
    
        async def finish_session(self) -> None:
            """结束会话。"""
            event = {
                "type": "session.finish"
            }
            await self.send_event(event)
    
        async def wait_for_response_done(self):
            """等待 response.done 事件"""
            if self._response_done_future:
                await self._response_done_future
    
        async def handle_messages(self) -> None:
            """处理来自服务器的消息。"""
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
    
                    if event_type != "response.audio.delta":
                        print(f"收到事件: {event_type}")
    
                    if event_type == "error":
                        print("错误: ", event.get('error', {}))
                        continue
                    elif event_type == "session.created":
                        print("会话创建,ID: ", event.get('session', {}).get('id'))
                    elif event_type == "session.updated":
                        print("会话更新,ID: ", event.get('session', {}).get('id'))
                    elif event_type == "input_text_buffer.committed":
                        print("文本缓冲区已提交,项目ID: ", event.get('item_id'))
                    elif event_type == "input_text_buffer.cleared":
                        print("文本缓冲区已清除")
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                        # 创建新的 future 来等待 response.done
                        self._response_done_future = asyncio.Future()
                        print("响应已创建,ID: ", self._current_response_id)
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                        print("输出项已添加,ID: ", self._current_item_id)
                    # 处理音频增量
                    elif event_type == "response.audio.delta" and self.audio_callback:
                        audio_bytes = base64.b64decode(event.get("delta", ""))
                        self.audio_callback(audio_bytes)
                    elif event_type == "response.audio.done":
                        print("音频生成完成")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                        # 标记 future 完成
                        if self._response_done_future and not self._response_done_future.done():
                            self._response_done_future.set_result(True)
                        print("响应完成")
                    elif event_type == "session.finished":
                        print("会话已结束")
    
            except websockets.exceptions.ConnectionClosed:
                print("连接已关闭")
            except Exception as e:
                print("消息处理出错: ", str(e))
    
        async def close(self) -> None:
            """关闭 WebSocket 连接。"""
            if self.ws:
                await self.ws.close()

    Java

    在本地新建 Java 文件,命名为TTSRealtimeClient.java并复制以下代码到文件中:

    import com.google.gson.Gson;
    import com.google.gson.JsonObject;
    import org.java_websocket.client.WebSocketClient;
    import org.java_websocket.handshake.ServerHandshake;
    
    import java.net.URI;
    import java.util.Base64;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.function.Consumer;
    
    /**
     * 与 TTS Realtime API 交互的客户端。
     *
     * 该类提供了连接 TTS Realtime API、发送文本数据、获取音频输出以及管理 WebSocket 连接的相关方法。
     */
    public class TTSRealtimeClient {
    
        public enum SessionMode {
            SERVER_COMMIT("server_commit"),
            COMMIT("commit");
            private final String value;
            SessionMode(String value) { this.value = value; }
            public String getValue() { return value; }
        }
    
        /**
         * 音频回调接口
         */
        public interface AudioCallback {
            void onAudio(byte[] audioData);
        }
    
        private final String baseUrl;
        private final String apiKey;
        private final String voice;
        private final SessionMode mode;
        private final String languageType;
        private final AudioCallback audioCallback;
        private final Gson gson = new Gson();
    
        private WebSocketClient ws;
        private CountDownLatch responseDoneLatch;
        private CountDownLatch sessionFinishedLatch;
    
        public TTSRealtimeClient(String baseUrl, String apiKey, String voice,
                                 SessionMode mode, AudioCallback audioCallback,
                                 String languageType) {
            this.baseUrl = baseUrl;
            this.apiKey = apiKey;
            this.voice = voice;
            this.mode = mode;
            this.audioCallback = audioCallback;
            this.languageType = languageType;
        }
    
        public TTSRealtimeClient(String baseUrl, String apiKey, String voice,
                                 SessionMode mode, AudioCallback audioCallback) {
            this(baseUrl, apiKey, voice, mode, audioCallback, "Auto");
        }
    
        /**
         * 与 TTS Realtime API 建立 WebSocket 连接。
         */
        public void connect() throws Exception {
            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "Bearer " + apiKey);
    
            responseDoneLatch = new CountDownLatch(0);
            sessionFinishedLatch = new CountDownLatch(1);
    
            ws = new WebSocketClient(new URI(baseUrl), headers) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    System.out.println("WebSocket 连接已建立");
                    // 发送默认会话配置
                    JsonObject session = new JsonObject();
                    session.addProperty("mode", mode.getValue());
                    session.addProperty("voice", TTSRealtimeClient.this.voice);
                    // 如需使用指令控制功能,请取消下方注释,并将model替换为qwen3-tts-instruct-flash-realtime
                    // session.addProperty("instructions", "语速较快,带有明显的上扬语调,适合介绍时尚产品。");
                    // session.addProperty("optimize_instructions", true);
                    session.addProperty("language_type", languageType);
                    session.addProperty("response_format", "pcm");
                    session.addProperty("sample_rate", 24000);
                    updateSession(session);
                }
    
                @Override
                public void onMessage(String message) {
                    JsonObject event = gson.fromJson(message, JsonObject.class);
                    String eventType = event.has("type") ? event.get("type").getAsString() : "";
    
                    if (!"response.audio.delta".equals(eventType)) {
                        System.out.println("收到事件: " + eventType);
                    }
    
                    switch (eventType) {
                        case "error":
                            System.err.println("错误: " + event.get("error"));
                            break;
                        case "session.created":
                            System.out.println("会话创建,ID: " +
                                event.getAsJsonObject("session").get("id").getAsString());
                            break;
                        case "session.updated":
                            System.out.println("会话更新,ID: " +
                                event.getAsJsonObject("session").get("id").getAsString());
                            break;
                        case "input_text_buffer.committed":
                            System.out.println("文本缓冲区已提交,项目ID: " + event.get("item_id"));
                            break;
                        case "input_text_buffer.cleared":
                            System.out.println("文本缓冲区已清除");
                            break;
                        case "response.created":
                            System.out.println("响应已创建,ID: " +
                                event.getAsJsonObject("response").get("id").getAsString());
                            responseDoneLatch = new CountDownLatch(1);
                            break;
                        case "response.output_item.added":
                            System.out.println("输出项已添加,ID: " +
                                event.getAsJsonObject("item").get("id").getAsString());
                            break;
                        case "response.audio.delta":
                            if (audioCallback != null) {
                                byte[] audioBytes = Base64.getDecoder().decode(
                                    event.get("delta").getAsString());
                                audioCallback.onAudio(audioBytes);
                            }
                            break;
                        case "response.audio.done":
                            System.out.println("音频生成完成");
                            break;
                        case "response.done":
                            System.out.println("响应完成");
                            responseDoneLatch.countDown();
                            break;
                        case "session.finished":
                            System.out.println("会话已结束");
                            sessionFinishedLatch.countDown();
                            break;
                    }
                }
    
                @Override
                public void onClose(int code, String reason, boolean remote) {
                    System.out.println("连接已关闭: " + reason);
                }
    
                @Override
                public void onError(Exception ex) {
                    System.err.println("WebSocket 错误: " + ex.getMessage());
                }
            };
            ws.connectBlocking();
        }
    
        /**
         * 发送事件到服务器。
         */
        public void sendEvent(JsonObject event) {
            String eventId = "event_" + System.currentTimeMillis();
            event.addProperty("event_id", eventId);
            System.out.println("发送事件: type=" + event.get("type").getAsString()
                + ", event_id=" + eventId);
            ws.send(gson.toJson(event));
        }
    
        /**
         * 更新会话配置。
         */
        public void updateSession(JsonObject config) {
            JsonObject event = new JsonObject();
            event.addProperty("type", "session.update");
            event.add("session", config);
            System.out.println("更新会话配置: " + event);
            sendEvent(event);
        }
    
        /**
         * 向 API 发送文本数据。
         */
        public void appendText(String text) {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.append");
            event.addProperty("text", text);
            sendEvent(event);
        }
    
        /**
         * 提交文本缓冲区以触发处理。
         */
        public void commitTextBuffer() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.commit");
            sendEvent(event);
        }
    
        /**
         * 清除文本缓冲区。
         */
        public void clearTextBuffer() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.clear");
            sendEvent(event);
        }
    
        /**
         * 结束会话。
         */
        public void finishSession() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "session.finish");
            sendEvent(event);
        }
    
        /**
         * 等待 response.done 事件。
         */
        public void waitForResponseDone() throws InterruptedException {
            responseDoneLatch.await();
        }
    
        /**
         * 等待 session.finished 事件。
         */
        public void waitForSessionFinished() throws InterruptedException {
            sessionFinishedLatch.await();
        }
    
        /**
         * 关闭 WebSocket 连接。
         */
        public void close() {
            if (ws != null) {
                ws.close();
            }
        }
    }
  2. 选择语音合成模式

    Realtime API支持以下两种模式:

    • server_commit 模式

      客户端仅发送文本。服务端会智能判断文本分段方式与合成时机。适合低延迟且无需手动控制合成节奏的场景,例如 GPS 导航。

    • commit 模式

      客户端先将文本添加至缓冲区,再主动触发服务端合成指定文本。适合需精细控制断句和停顿的场景,例如新闻播报。

    server_commit 模式

    Python

    tts_realtime_client.py的同级目录下新建另一个 Python 文件,命名为server_commit.py,并将以下代码复制进文件中:

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS 服务配置
    # 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime,并在tts_realtime_client.py中取消instructions的注释
    # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
    URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime"
    # 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY="sk-xxx"
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # 收集音频数据
    _audio_chunks = []
    # 实时播放相关
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None  # 将在运行时打开
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient 音频回调: 实时播放并缓存"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """将收集到的音频数据保存为 WAV 文件"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # 单声道
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _produce_text(client: TTSRealtimeClient):
        """向服务器发送文本片段"""
        text_fragments = [
            "阿里云的大模型服务平台百炼是一站式的大模型开发及应用构建平台。",
            "不论是开发者还是业务人员,都能深入参与大模型应用的设计和构建。", 
            "您可以通过简单的界面操作,在5分钟内开发出一款大模型应用,",
            "或在几小时内训练出一个专属模型,从而将更多精力专注于应用创新。",
        ]
    
        logging.info("Sending text fragments…")
        for text in text_fragments:
            logging.info(f"Sending fragment: {text}")
            await client.append_text(text)
            await asyncio.sleep(0.1)  # 片段间稍作延时
    
        # 等待服务器完成内部处理后结束会话
        await asyncio.sleep(1.0)
        await client.finish_session()
    
    async def _run_demo():
        """运行完整 Demo"""
        global _audio_stream
        # 打开 PyAudio 输出流
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.SERVER_COMMIT,
            audio_callback=_audio_callback
        )
    
        # 建立连接
        await client.connect()
    
        # 并行执行消息处理与文本发送
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_produce_text(client))
    
        await producer_task  # 等待文本发送完成
    
        # 等待 response.done
        await client.wait_for_response_done()
    
        # 关闭连接并取消消费者任务
        await client.close()
        consumer_task.cancel()
    
        # 关闭音频流
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # 保存音频数据
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        """同步入口"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main() 

    运行server_commit.py,即可听到 Realtime API实时生成的音频。

    Java

    TTSRealtimeClient.java的同级目录下新建另一个 Java 文件,命名为ServerCommit.java,并将以下代码复制进文件中:

    import javax.sound.sampled.*;
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class ServerCommit {
        // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
        private static final String URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime";
        // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
        // 若没有配置环境变量,请用百炼API Key将下行替换为:private static final String API_KEY = "sk-xxx";
        private static final String API_KEY = System.getenv("DASHSCOPE_API_KEY");
        private static final int SAMPLE_RATE = 24000;
    
        // 音频数据缓存
        private static final List<byte[]> audioChunks = new ArrayList<>();
        // 实时播放队列
        private static final ConcurrentLinkedQueue<byte[]> playbackQueue = new ConcurrentLinkedQueue<>();
        private static final AtomicBoolean playing = new AtomicBoolean(true);
    
        public static void main(String[] args) throws Exception {
            if (API_KEY == null || API_KEY.isEmpty()) {
                throw new IllegalStateException("请设置 DASHSCOPE_API_KEY 环境变量");
            }
    
            // 初始化音频播放
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, format);
            SourceDataLine audioLine = (SourceDataLine) AudioSystem.getLine(info);
            audioLine.open(format);
            audioLine.start();
    
            // 启动播放线程
            Thread playerThread = new Thread(() -> {
                while (playing.get() || !playbackQueue.isEmpty()) {
                    byte[] chunk = playbackQueue.poll();
                    if (chunk != null) {
                        audioLine.write(chunk, 0, chunk.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            });
            playerThread.start();
    
            // 创建 TTS 客户端
            // 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime,并在TTSRealtimeClient.java中取消instructions的注释
            TTSRealtimeClient client = new TTSRealtimeClient(
                URL, API_KEY, "Cherry",
                TTSRealtimeClient.SessionMode.SERVER_COMMIT,
                audioData -> {
                    playbackQueue.add(audioData);
                    audioChunks.add(audioData);
                    System.out.println("收到音频数据: " + audioData.length + " bytes");
                }
            );
    
            client.connect();
    
            // 发送文本片段
            String[] textFragments = {
                "阿里云的大模型服务平台百炼是一站式的大模型开发及应用构建平台。",
                "不论是开发者还是业务人员,都能深入参与大模型应用的设计和构建。",
                "您可以通过简单的界面操作,在5分钟内开发出一款大模型应用,",
                "或在几小时内训练出一个专属模型,从而将更多精力专注于应用创新。"
            };
    
            System.out.println("开始发送文本...");
            for (String text : textFragments) {
                System.out.println("发送片段: " + text);
                client.appendText(text);
                Thread.sleep(100);
            }
    
            Thread.sleep(1000);
            client.finishSession();
    
            // 等待响应完成
            client.waitForResponseDone();
            client.waitForSessionFinished();
            client.close();
    
            // 等待播放完成
            playing.set(false);
            playerThread.join();
            audioLine.drain();
            audioLine.close();
    
            // 保存音频文件
            saveWav("output.wav");
            System.out.println("完成");
        }
    
        private static void saveWav(String filename) throws IOException {
            if (audioChunks.isEmpty()) {
                System.out.println("没有音频数据可保存");
                return;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            for (byte[] chunk : audioChunks) {
                bos.write(chunk);
            }
            byte[] allAudio = bos.toByteArray();
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            AudioInputStream ais = new AudioInputStream(
                new ByteArrayInputStream(allAudio), format, allAudio.length / 2);
            new File("outputs").mkdirs();
            AudioSystem.write(ais, AudioFileFormat.Type.WAVE,
                new File("outputs/" + filename));
            System.out.println("音频已保存到: outputs/" + filename);
        }
    }

    编译并运行ServerCommit.java,即可听到 Realtime API实时生成的音频。

    commit 模式

    Python

    tts_realtime_client.py的同级目录下新建另一个 Python 文件,命名为commit.py,并将以下代码复制进文件中:

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS 服务配置
    # 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime,并在tts_realtime_client.py中取消instructions的注释
    # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
    URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime"
    # 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY="sk-xxx"
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # 收集音频数据
    _audio_chunks = []
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient 音频回调: 实时播放并缓存"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """将收集到的音频数据保存为 WAV 文件"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # 单声道
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _user_input_loop(client: TTSRealtimeClient):
        """持续获取用户输入并发送文本,当用户输入空文本时发送commit事件并结束本次会话"""
        print("请输入文本(直接按Enter发送commit事件并结束本次会话,按Ctrl+C或Ctrl+D结束整个程序):")
        
        while True:
            try:
                user_text = input("> ")
                if not user_text:  # 用户输入为空
                    # 空输入视为一次对话的结束: 提交缓冲区 -> 结束会话 -> 跳出循环
                    logging.info("空输入,发送 commit 事件并结束本次会话")
                    await client.commit_text_buffer()
                    # 适当等待服务器处理 commit,防止过早结束会话导致丢失音频
                    await asyncio.sleep(0.3)
                    await client.finish_session()
                    break  # 直接退出用户输入循环,无需再次回车
                else:
                    logging.info(f"发送文本: {user_text}")
                    await client.append_text(user_text)
                    
            except EOFError:  # 用户按下Ctrl+D
                break
            except KeyboardInterrupt:  # 用户按下Ctrl+C
                break
        
        # 结束会话
        logging.info("结束会话...")
    async def _run_demo():
        """运行完整 Demo"""
        global _audio_stream
        # 打开 PyAudio 输出流
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.COMMIT,  # 修改为COMMIT模式
            audio_callback=_audio_callback
        )
    
        # 建立连接
        await client.connect()
    
        # 并行执行消息处理与用户输入
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_user_input_loop(client))
    
        await producer_task  # 等待用户输入完成
    
        # 等待 response.done
        await client.wait_for_response_done()
    
        # 关闭连接并取消消费者任务
        await client.close()
        consumer_task.cancel()
    
        # 关闭音频流
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # 保存音频数据
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main() 

    运行commit.py,可多次输入要合成的文本。在未输入文本的情况下单击 Enter 键,您将从扬声器听到 Realtime API返回的音频。

    Java

    TTSRealtimeClient.java的同级目录下新建另一个 Java 文件,命名为Commit.java,并将以下代码复制进文件中:

    import javax.sound.sampled.*;
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class Commit {
        // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
        private static final String URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime";
        // 新加坡和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
        // 若没有配置环境变量,请用百炼API Key将下行替换为:private static final String API_KEY = "sk-xxx";
        private static final String API_KEY = System.getenv("DASHSCOPE_API_KEY");
        private static final int SAMPLE_RATE = 24000;
    
        private static final List<byte[]> audioChunks = new ArrayList<>();
        private static final ConcurrentLinkedQueue<byte[]> playbackQueue = new ConcurrentLinkedQueue<>();
        private static final AtomicBoolean playing = new AtomicBoolean(true);
    
        public static void main(String[] args) throws Exception {
            if (API_KEY == null || API_KEY.isEmpty()) {
                throw new IllegalStateException("请设置 DASHSCOPE_API_KEY 环境变量");
            }
    
            // 初始化音频播放
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, format);
            SourceDataLine audioLine = (SourceDataLine) AudioSystem.getLine(info);
            audioLine.open(format);
            audioLine.start();
    
            // 启动播放线程
            Thread playerThread = new Thread(() -> {
                while (playing.get() || !playbackQueue.isEmpty()) {
                    byte[] chunk = playbackQueue.poll();
                    if (chunk != null) {
                        audioLine.write(chunk, 0, chunk.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            });
            playerThread.start();
    
            // 创建 TTS 客户端(commit 模式)
            // 如需使用指令控制功能,请将model替换为qwen3-tts-instruct-flash-realtime,并在TTSRealtimeClient.java中取消instructions的注释
            TTSRealtimeClient client = new TTSRealtimeClient(
                URL, API_KEY, "Cherry",
                TTSRealtimeClient.SessionMode.COMMIT,
                audioData -> {
                    playbackQueue.add(audioData);
                    audioChunks.add(audioData);
                    System.out.println("收到音频数据: " + audioData.length + " bytes");
                }
            );
    
            client.connect();
    
            // 交互式输入
            System.out.println("请输入文本(直接按Enter发送commit事件并结束本次会话,按Ctrl+D结束程序):");
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print("> ");
                if (!scanner.hasNextLine()) {
                    client.finishSession();
                    break;
                }
                String userText = scanner.nextLine();
                if (userText.isEmpty()) {
                    // 空输入:提交缓冲区并结束会话
                    System.out.println("空输入,发送 commit 事件并结束本次会话");
                    client.commitTextBuffer();
                    Thread.sleep(300);
                    client.finishSession();
                    break;
                } else {
                    System.out.println("发送文本: " + userText);
                    client.appendText(userText);
                }
            }
            scanner.close();
    
            // 等待响应完成
            client.waitForResponseDone();
            client.waitForSessionFinished();
            client.close();
    
            // 等待播放完成
            playing.set(false);
            playerThread.join();
            audioLine.drain();
            audioLine.close();
    
            // 保存音频文件
            saveWav("output.wav");
            System.out.println("完成");
        }
    
        private static void saveWav(String filename) throws IOException {
            if (audioChunks.isEmpty()) {
                System.out.println("没有音频数据可保存");
                return;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            for (byte[] chunk : audioChunks) {
                bos.write(chunk);
            }
            byte[] allAudio = bos.toByteArray();
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            AudioInputStream ais = new AudioInputStream(
                new ByteArrayInputStream(allAudio), format, allAudio.length / 2);
            new File("outputs").mkdirs();
            AudioSystem.write(ais, AudioFileFormat.Type.WAVE,
                new File("outputs/" + filename));
            System.out.println("音频已保存到: outputs/" + filename);
        }
    }

    编译并运行Commit.java,可多次输入要合成的文本。在未输入文本的情况下单击 Enter 键,您将从扬声器听到 Realtime API返回的音频。

连接复用

WebSocket 服务支持连接复用以提升资源利用效率,避免重复建立连接的开销。一个合成任务结束后,WebSocket 连接可以被复用以开启下一个任务,无需重新建立连接。

复用流程

  • CosyVoice / Sambert:客户端发送 finish-task(CosyVoice)或在任务完成后,服务端返回 task-finished 事件以结束任务。之后客户端可重新发送 run-task 事件开启新任务。

  • Qwen-TTS:客户端发送 session.finish 后,服务端返回 session.finished 事件以结束会话。之后客户端可重新建立新会话开启下一个合成任务。

重要
  1. 必须等服务端返回结束事件(task-finishedsession.finished)后才可发起新任务。

  2. CosyVoice 和 Sambert 在复用连接中的不同任务需要使用不同的 task_id

  3. 如果在任务执行过程中发生失败,服务端将返回错误事件并关闭连接,此时该连接无法继续复用。

  4. 如果在任务结束后 60 秒没有新的任务,连接会超时自动断开。

各模型的事件详细说明,请参见对应的CosyVoice API参考Qwen-TTS API参考

高并发最佳实践

在高并发场景下,为每个请求独立创建和销毁 WebSocket 连接会产生巨大的开销。DashScope SDK 内置了连接池与对象池机制,用于复用连接和对象,显著降低延迟和资源消耗。

点击查看高并发最佳实践

CosyVoice

前提条件

Python SDK:对象池优化

Python SDK 通过 SpeechSynthesizerObjectPool 实现对象池优化,用于管理和复用 SpeechSynthesizer 对象。

对象池在初始化时会立即创建指定数量的 SpeechSynthesizer 实例并预先建立 WebSocket 连接。从池中获取对象时无需等待连接建立,可直接发起请求,有效降低首包延迟。当任务完成并将对象归还到对象池后,其 WebSocket 连接不会关闭,而是保持活跃状态等待下次任务复用。

实现步骤

  1. 安装依赖:安装DashScope依赖(pip install -U dashscope

  2. 创建并配置对象池

    对象池大小需要通过SpeechSynthesizerObjectPool进行设置。推荐值:峰值并发数的 1.5 至 2 倍。对象池大小不应超过您账户的 QPS(每秒查询率)限制。

    通过以下代码创建全局单例固定大小对象池。对象池在初始化时会立即创建指定数量的 SpeechSynthesizer 对象并建立 WebSocket 连接,因此会有一定耗时。

    from dashscope.audio.tts_v2 import SpeechSynthesizerObjectPool
    
    synthesizer_object_pool = SpeechSynthesizerObjectPool(max_size=20)
  3. 从对象池中获取SpeechSynthesizer对象

    如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的SpeechSynthesizer对象。

    此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。

    speech_synthesizer = connectionPool.borrow_synthesizer(
        model='cosyvoice-v3-flash',
        voice='longanyang',
        seed=12382,
        callback=synthesizer_callback
    )
  4. 进行语音合成

    调用SpeechSynthesizer对象的call或streaming_call方法进行语音合成。

  5. 归还SpeechSynthesizer对象

    语音合成任务结束后,归还SpeechSynthesizer对象,以便后续任务可以复用该对象。

    不要归还未完成任务或任务失败的对象。

    connectionPool.return_synthesizer(speech_synthesizer)

完整代码

# !/usr/bin/env python3
# Copyright (C) Alibaba Group. All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)

import os
import time
import threading

import dashscope
from dashscope.audio.tts_v2 import *


USE_CONNECTION_POOL = True
text_to_synthesize = [
    '第一句、欢迎使用阿里巴巴语音合成服务。',
    '第二句、欢迎使用阿里巴巴语音合成服务。',
    '第三句、欢迎使用阿里巴巴语音合成服务。',
]
connectionPool = None
if USE_CONNECTION_POOL:
    print('creating connection pool')
    start_time = time.time() * 1000
    connectionPool = SpeechSynthesizerObjectPool(max_size=3)
    end_time = time.time() * 1000
    print('connection pool created, cost: {} ms'.format(end_time - start_time))

def init_dashscope_api_key():
    '''
    Set your DashScope API-key. More information:
    https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    '''
    # 新加坡地域和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # load API-key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = '<your-dashscope-api-key>'  # set API-key manually


def synthesis_text_to_speech_and_play_by_streaming_mode(text, task_id):
    global USE_CONNECTION_POOL, connectionPool
    '''
    Synthesize speech with given text by streaming mode, async call and play the synthesized audio in real-time.
    for more information, please refer to https://www.alibabacloud.com/help/document_detail/2712523.html
    '''

    complete_event = threading.Event()

    # Define a callback to handle the result

    class Callback(ResultCallback):
        def on_open(self):
            # when using object pool, on_open will be called after task start
            self.file = open(f'result_{task_id}.mp3', 'wb')
            print(f'[task_{task_id}] start')

        def on_complete(self):
            print(f'[task_{task_id}] speech synthesis task complete successfully.')
            complete_event.set()

        def on_error(self, message: str):
            print(f'[task_{task_id}] speech synthesis task failed, {message}')

        def on_close(self):
            # when using object pool, on_open will be called after task finished
            print(f'[task_{task_id}] finished')

        def on_event(self, message):
            # print(f'recv speech synthsis message {message}')
            pass

        def on_data(self, data: bytes) -> None:
            # send to player
            # save audio to file
            self.file.write(data)

    # Call the speech synthesizer callback
    synthesizer_callback = Callback()

    # Initialize the speech synthesizer
    # you can customize the synthesis parameters, like voice, format, sample_rate or other parameters
    if USE_CONNECTION_POOL:
        speech_synthesizer = connectionPool.borrow_synthesizer(
            model='cosyvoice-v3-flash',
            voice='longanyang',
            seed=12382,
            callback=synthesizer_callback
        )
    else:
        speech_synthesizer = SpeechSynthesizer(model='cosyvoice-v3-flash',
                                               voice='longanyang',
                                               seed=12382,
                                               callback=synthesizer_callback)
    try:
        speech_synthesizer.call(text)
    except Exception as e:
        print(f'[task_{task_id}] speech synthesis task failed, {e}')
        if USE_CONNECTION_POOL:
            # close the synthesizer connection manually if task failed when using connection pool.
            speech_synthesizer.close()
        return

    print('[task_{}] Synthesized text: {}'.format(task_id, text))
    complete_event.wait()
    print('[task_{}][Metric] requestId: {}, first package delay ms: {}'.format(
        task_id,
        speech_synthesizer.get_last_request_id(),
        speech_synthesizer.get_first_package_delay()))
    if USE_CONNECTION_POOL:
        connectionPool.return_synthesizer(speech_synthesizer)


# main function
if __name__ == '__main__':
    # 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference
    dashscope.base_websocket_api_url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference'
    init_dashscope_api_key()
    task_thread_list = []
    for task_id in range(3):
        thread = threading.Thread(
            target=synthesis_text_to_speech_and_play_by_streaming_mode,
            args=(text_to_synthesize[task_id], task_id))
        task_thread_list.append(thread)

    for task_thread in task_thread_list:
        task_thread.start()

    for task_thread in task_thread_list:
        task_thread.join()

    if USE_CONNECTION_POOL:
        connectionPool.shutdown()

资源管理与异常处理

  • 任务成功:当语音合成任务正常完成时,必须调用 connectionPool.return_synthesizer(speech_synthesizer) 将 SpeechSynthesizer 对象归还到池中,以便复用。

    重要

    不要归还未完成任务或任务失败的SpeechSynthesizer对象。

  • 任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,主动关闭底层的 WebSocket 连接:speech_synthesizer.close()

  • 在所有语音合成任务完成后,要通过如下方式关闭对象池:connectionPool.shutdown()

  • 在服务出现TaskFailed报错时,不需要额外处理。

Java SDK:连接池与对象池优化

Java SDK通过内置的连接池和自定义的对象池协同工作,实现最佳性能。

  • 连接池:SDK 内部集成的 OkHttp3 连接池,负责管理和复用底层的 WebSocket 连接,减少网络握手开销。此功能默认开启。

  • 对象池:基于 commons-pool2 实现,用于维护一组已预先建立好连接的 SpeechSynthesizer 对象。从池中获取对象可消除连接建立的延迟,显著降低首包延迟。

实现步骤

  1. 添加依赖

    根据项目构建工具,在依赖配置文件中添加 dashscope-sdk-java 和 commons-pool2。

    以Maven和Gradle为例,配置如下:

    Maven

    1. 打开您的Maven项目的pom.xml文件。

    2. <dependencies>标签内添加以下依赖信息。

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>the-latest-version</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <version>the-latest-version</version>
    </dependency>
    1. 保存pom.xml文件。

    2. 使用Maven命令(如mvn clean installmvn compile)来更新项目依赖

    Gradle

    1. 打开您的Gradle项目的build.gradle文件。

    2. dependencies块内添加以下依赖信息。

      dependencies {
          // 请将 'the-latest-version' 替换为2.16.6及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
          implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'
          
          // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
          implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
      }
    3. 保存build.gradle文件。

    4. 在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。

      ./gradlew build --refresh-dependencies

      或者,如果您使用的是Windows系统,命令应为:

      gradlew build --refresh-dependencies
  2. 配置连接池

    通过环境变量配置连接池关键参数:

    环境变量

    描述

    DASHSCOPE_CONNECTION_POOL_SIZE

    连接池大小。

    推荐值:峰值并发数的 2 倍以上。

    默认值:32。

    DASHSCOPE_MAXIMUM_ASYNC_REQUESTS

    最大异步请求数。

    推荐值:与 DASHSCOPE_CONNECTION_POOL_SIZE 保持一致。

    默认值:32。

    DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST

    单主机最大异步请求数。

    推荐值:与 DASHSCOPE_CONNECTION_POOL_SIZE 保持一致。

    默认值:32。

  3. 配置对象池

    通过环境变量配置对象池大小:

    环境变量

    描述

    COSYVOICE_OBJECTPOOL_SIZE

    对象池大小。

    推荐值:峰值并发数的 1.5 至 2 倍。

    默认值:500。

    重要
    • 对象池的大小(COSYVOICE_OBJECTPOOL_SIZE)必须小于或等于连接池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否则,当对象池请求对象时,若连接池已满,会导致调用线程阻塞,等待可用连接。

    • 对象池大小不应超过您账户的 QPS(每秒查询率)限制。

    通过如下代码创建对象池:

    class CosyvoiceObjectPool {
        // 。。。这里省略其它代码,完整示例请参见完整代码
        public static GenericObjectPool<SpeechSynthesizer> getInstance() {
            lock.lock();
            if (synthesizerPool == null) {
                // 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。
                // 建议设置为服务器最大并发连接数的1.5到2倍。
                int objectPoolSize = getObjectivePoolSize();
                SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                        new SpeechSynthesizerObjectFactory();
                GenericObjectPoolConfig<SpeechSynthesizer> config =
                        new GenericObjectPoolConfig<>();
                config.setMaxTotal(objectPoolSize);
                config.setMaxIdle(objectPoolSize);
                config.setMinIdle(objectPoolSize);
                synthesizerPool =
                        new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
            }
            lock.unlock();
            return synthesizerPool;
        }
    }
  4. 从对象池中获取SpeechSynthesizer对象

    如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的SpeechSynthesizer对象。

    此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。

    synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
  5. 进行语音合成

    调用SpeechSynthesizer对象的call或streamingCall方法进行语音合成。

  6. 归还SpeechSynthesizer对象

    语音合成任务结束后,归还SpeechSynthesizer对象,以便后续任务可以复用该对象。

    不要归还未完成任务或任务失败的对象。

    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);

完整代码

import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * 您需要在项目中引入org.apache.commons.pool2和DashScope相关的包。
 *
 * DashScope SDK 2.16.6及后续版本针对高并发场景进行了优化,
 * DashScope SDK 2.16.6之前的版本不推荐在高并发场景下使用。
 *
 *
 * 在对TTS服务进行高并发调用之前,
 * 请通过以下环境变量配置连接池的相关参数。
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
 * DASHSCOPE_CONNECTION_POOL_SIZE
 *
 */

class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }

    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class CosyvoiceObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
            System.out.println("Using Object Pool Size In Env: "+ n);
            return n;
        } catch (NumberFormatException e) {
            System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        if (synthesizerPool == null) {
            // 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。
            // 建议设置为服务器最大并发连接数的1.5到2倍。
            int objectPoolSize = getObjectivePoolSize();
            SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                    new SpeechSynthesizerObjectFactory();
            GenericObjectPoolConfig<SpeechSynthesizer> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            synthesizerPool =
                    new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
        }
        lock.unlock();
        return synthesizerPool;
    }
}

class SynthesizeTaskWithCallback implements Runnable {
    String[] textArray;
    String requestId;
    long timeCost;
    public SynthesizeTaskWithCallback(String[] textArray) {
        this.textArray = textArray;
    }
    @Override
    public void run() {
        SpeechSynthesizer synthesizer = null;
        long startTime = System.currentTimeMillis();
        // if recv onError
        final boolean[] hasError = {false};
        try {
            class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                ReactCallback() {}

                @Override
                public void onEvent(SpeechSynthesisResult message) {
                    if (message.getAudioFrame() != null) {
                        try {
                            byte[] bytesArray = message.getAudioFrame().array();
                            System.out.println("收到音频,音频文件流length为:" + bytesArray.length);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override
                public void onComplete() {}

                @Override
                public void onError(Exception e) {
                    System.out.println(e.getMessage());
                    e.printStackTrace();
                    hasError[0] = true;
                }
            }

            // 将your-dashscope-api-key替换成您自己的API-KEY
            String dashScopeApiKey = "your-dashscope-api-key";

            SpeechSynthesisParam param =
                    SpeechSynthesisParam.builder()
                            .model("cosyvoice-v3-flash")
                            .voice("longanyang")
                            // 新加坡地域和北京地域的API Key不同。获取API Key:https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                            // 若没有配置环境变量,请用百炼API Key将下行替换为:.apiKey("sk-xxx")
                            .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                            .format(SpeechSynthesisAudioFormat
                                    .MP3_22050HZ_MONO_256KBPS) // 流式合成使用PCM或者MP3
                            .apiKey(dashScopeApiKey)
                            .build();

            try {
                synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
                synthesizer.updateParamAndCallback(param, new ReactCallback());
                for (String text : textArray) {
                    synthesizer.streamingCall(text);
                }
                Thread.sleep(20);
                synthesizer.streamingComplete(60000);
                requestId = synthesizer.getLastRequestId();
            } catch (Exception e) {
                System.out.println("Exception e: " + e.toString());
                hasError[0] = true;
            }
        } catch (Exception e) {
            hasError[0] = true;
            throw new RuntimeException(e);
        }
        if (synthesizer != null) {
            try {
                if (hasError[0] == true) {
                    // 如果出现异常,则关闭连接并在对象池中禁用该对象。
                    synthesizer.getDuplexApi().close(1000, "bye");
                    CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
                } else {
                    // 如果任务正常结束,则归还对象。
                    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            long endTime = System.currentTimeMillis();
            timeCost = endTime - startTime;
            System.out.println("[线程 " + Thread.currentThread() + "] 语音合成任务结束。耗时 " + timeCost + " ms, RequestId " + requestId);
        }
    }
}

@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {
        // 以下为新加坡地域url,若使用北京地域的模型,需将url替换为:wss://dashscope.aliyuncs.com/api-ws/v1/inference
        Constants.baseWebsocketApiUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference";
        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);

        int runTimes = 3;
        // Create the pool of SpeechSynthesis objects
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);

        for (int i = 0; i < runTimes; i++) {
            // Record the task submission time
            LocalDateTime submissionTime = LocalDateTime.now();
            executorService.submit(new SynthesizeTaskWithCallback(new String[] {
                    "床前明月光,", "疑似地上霜。", "举头望明月,", "低头思故乡。"}));
        }

        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

推荐配置

以下配置基于在指定规格的阿里云服务器上仅运行 CosyVoice 语音合成服务的测试结果。过高的并发数可能导致任务处理延迟。

其中单机并发数指的是同一时刻正在运行的CosyVoice语音合成任务数,也可以理解为工作线程数。

机器配置(阿里云)

单机最大并发数

对象池大小

连接池大小

4核8GiB

100

500

2000

8核16GiB

150

500

2000

16核32GiB

200

500

2000

资源管理与异常处理

  • 任务成功:当语音合成任务正常完成时,必须调用GenericObjectPool的returnObject方法将SpeechSynthesizer对象归还到池中,以便复用。

    在当前代码中,对应CosyvoiceObjectPool.getInstance().returnObject(synthesizer)

    重要

    不要归还未完成任务或任务失败的SpeechSynthesizer对象。

  • 任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,必须执行以下两个操作:

    1. 主动关闭底层的 WebSocket 连接

    2. 从对象池中废弃该对象,防止被再次使用

    // 在当前代码中对应如下内容
    // 关闭连接
    synthesizer.getDuplexApi().close(1000, "bye");
    // 在对象池中废弃出现异常的synthesizer
    CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
  • 在服务出现TaskFailed报错时,不需要额外处理。

调用预热与耗时统计说明

在对 DashScope Java SDK 进行并发调用延迟等性能评估时,建议在正式测试前执行充分的预热操作。预热能够确保测量结果准确反映服务在稳定状态下的真实性能,避免因初始连接耗时导致的数据偏差。

连接复用机制

DashScope Java SDK 通过全局单例的连接池高效管理和复用 WebSocket 连接,旨在减少频繁建连和断连的开销,提升高并发场景下的处理能力。

该机制的工作特点如下:

  • 按需创建:SDK 不会在服务启动时预创建 WebSocket 连接,而是在首次调用时按需建立。

  • 限时复用:请求完成后,连接将在池中保留最多 60 秒以备复用。

    • 若 60 秒内有新请求,将复用现有连接,避免重复握手开销。

    • 若连接空闲超过 60 秒,将被自动关闭以释放资源。

预热的重要性

在以下场景中,连接池中可能没有可复用的活跃连接,导致请求需要新建连接:

  • 应用刚启动,尚未发起任何调用。

  • 服务空闲时间超过 60 秒,池中连接已因超时而关闭。

在这些场景下,首次或初期请求会触发完整的 WebSocket 建连过程(包括 TCP 握手、TLS 加密协商和协议升级),其端到端延迟会显著高于后续复用连接的请求。这部分额外耗时源于网络连接初始化,并非服务本身的处理延迟。因此,若未进行预热,性能测试结果会因包含初始建连时间而产生偏差。

SDK侧延迟与实际首包延迟的区别

SDK侧打印的首包延迟(如通过 get_first_package_delay() 获取的值)包含了 WebSocket 建联和网络传输等耗时,并不等同于模型服务的实际首包延迟。

实际首包延迟是指从服务端收到 run-task 指令到返回第一个 result-generated 事件的时间间隔,该值可通过服务端日志查看。

在高并发场景下,由于大量连接的建立和资源调度,SDK侧打印的延迟数值可能显著高于服务端的实际首包延迟。如果您观察到 SDK 报告的首包延迟较高,建议您:

  • 对比服务端日志中的首包延迟(从 run-task 到首个 result-generated),确认模型推理性能是否正常。

  • 使用上述对象池或连接池机制进行预热,消除 WebSocket 建连开销,使 SDK 侧打印的延迟更接近实际首包延迟。

推荐做法

为获取可靠的性能数据,在正式进行性能压测或延迟统计前,请遵循以下预热步骤:

  1. 模拟正式测试的并发级别,提前发起一定数量的调用(例如,持续 1-2 分钟),以充分填充连接池。

  2. 确认连接池已建立并维持足够的活跃连接后,再开始正式的性能数据采集。

通过合理的预热,可使 SDK 连接池进入稳定复用状态,从而测量出更具代表性的延迟指标,真实反映服务在线上平稳运行时的性能。

Java SDK常见异常

异常 1、 业务流量平稳,但是服务器 TCP 连接数持续上升

出错原因:

类型一:

每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61s 秒后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。

在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:

  1. 连接数持续上升。

  2. 由于连接数过多,服务器资源不足,服务器卡顿。

  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。

类型二:

对象池配置的MaxIdle小于MaxTotal,导致在对象闲置时,超过MaxIdle的对象被销毁,从而造成连接泄漏。泄漏的连接需要等待61秒超时后断连,同类型一造成连接数持续上升。

解决方法

对于类型一,使用对象池解决。

对于类型二,检查对象池配置参数,设置MaxIdle和MaxTotal相等,关闭对象池自动销毁策略解决。

异常 2、任务耗时比正常调用多 60 秒

同“异常 1”,连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。

异常 3、服务启动时任务慢,之后慢慢恢复正常

出错原因

在高并发调用时,同一个对象会复用同一个WebSocket连接,因此WebSocket连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的WebSocket连接会导致阻塞。

解决方法

启动服务后逐步提升并发量,或增加预热任务。

异常 4、服务端报错 Invalid action('run-task')! Please follow the protocol!

出错原因

这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。

解决方法

在抛出异常后主动关闭 WebSocket 连接后归还对象池。

异常 5、业务流量平稳,调用量出现异常尖刺

出错原因

同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。

这种瞬间创建过多 WebSocket 的情况多发生于:

  • 服务启动阶段

  • 网络出现异常,大量 WebSocket 连接同时中断重连

  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制(“Requests rate limit exceeded, please try again later.”)。

解决方法

  1. 检查网络情况。

  2. 排查尖刺前是否出现大量其他服务端报错。

  3. 提高账号并发限制。

  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。

  5. 提升服务器配置或扩充机器数。

异常 6、随着并发数提升,所有任务都变慢

解决方法

  1. 检查是否已经达到网络带宽上限。

  2. 检查实际并发数是否已经过高。

适用范围

不同服务部署范围支持的模型不同:

国际

服务部署范围为国际时,模型推理计算资源在全球范围内动态调度(不含中国内地);静态数据存储于您所选的地域。该部署范围支持的地域:新加坡。

调用以下模型时,请选择新加坡地域的API Key

  • CosyVoice:cosyvoice-v3-plus、cosyvoice-v3-flash

  • Qwen-TTS

    • 千问3-TTS-Instruct-Flash-Realtime:qwen3-tts-instruct-flash-realtime(稳定版,当前等同qwen3-tts-instruct-flash-realtime-2026-01-22)、qwen3-tts-instruct-flash-realtime-2026-01-22(最新快照版)

    • 千问3-TTS-VD-Realtimeqwen3-tts-vd-realtime-2026-01-15(最新快照版)、qwen3-tts-vd-realtime-2025-12-16(快照版)

    • 千问3-TTS-VC-Realtimeqwen3-tts-vc-realtime-2026-01-15(最新快照版)、qwen3-tts-vc-realtime-2025-11-27(快照版)

    • 千问3-TTS-Flash-Realtimeqwen3-tts-flash-realtime(稳定版,当前等同qwen3-tts-flash-realtime-2025-11-27)、qwen3-tts-flash-realtime-2025-11-27(最新快照版)、qwen3-tts-flash-realtime-2025-09-18(快照版)

中国内地

服务部署范围为中国内地时,模型推理计算资源仅限于中国内地;静态数据存储于您所选的地域。该部署范围支持的地域:华北2(北京)。

调用以下模型时,请选择北京地域的API Key

  • CosyVoice:cosyvoice-v3.5-plus、cosyvoice-v3.5-flash、cosyvoice-v3-plus、cosyvoice-v3-flash、cosyvoice-v2

  • Qwen-TTS

    • 千问3-TTS-Instruct-Flash-Realtime:qwen3-tts-instruct-flash-realtime(稳定版,当前等同qwen3-tts-instruct-flash-realtime-2026-01-22)、qwen3-tts-instruct-flash-realtime-2026-01-22(最新快照版)

    • 千问3-TTS-VD-Realtimeqwen3-tts-vd-realtime-2026-01-15(最新快照版)、qwen3-tts-vd-realtime-2025-12-16(快照版)

    • 千问3-TTS-VC-Realtimeqwen3-tts-vc-realtime-2026-01-15(最新快照版)、qwen3-tts-vc-realtime-2025-11-27(快照版)

    • 千问3-TTS-Flash-Realtimeqwen3-tts-flash-realtime(稳定版,当前等同qwen3-tts-flash-realtime-2025-11-27)、qwen3-tts-flash-realtime-2025-11-27(最新快照版)、qwen3-tts-flash-realtime-2025-09-18(快照版)

    • 千问-TTS-Realtimeqwen-tts-realtime(稳定版,当前等同qwen-tts-realtime-2025-07-15)、qwen-tts-realtime-latest(最新版,当前等同qwen-tts-realtime-2025-07-15)、qwen-tts-realtime-2025-07-15(快照版)

支持的音色

不同模型支持的音色有所差异。使用时,将请求参数 voice 设置为音色列表中 voice参数 列对应的值即可。

API参考

常见问题

Q:语音合成发音错误怎么办?多音字如何控制发音?

  • 将多音字替换为同音的其他汉字,快速解决发音问题。

  • 使用 SSML 标记语言控制发音。

Q:使用复刻音色生成的音频无声音如何排查?

  1. 确认音色状态

    调用CosyVoice声音复刻/设计API接口,确认音色的 status 是否为 OK

  2. 检查模型版本一致性

    确保复刻音色时使用的 target_model 参数与语音合成时的 model 参数完全一致。例如:

    • 复刻时使用 cosyvoice-v3-plus

    • 合成时也必须使用 cosyvoice-v3-plus

  3. 验证源音频质量

    检查复刻音色时使用的源音频是否符合CosyVoice声音复刻/设计API

    • 音频时长:10-20秒

    • 音质清晰

    • 无背景噪音

  4. 检查请求参数

    确认语音合成请求中的 voice 参数已设置为复刻音色的 ID。

Q:声音复刻后合成效果不稳定或语音不完整怎么办?

如果复刻音色后合成的语音出现以下问题:

  • 语音播放不完整,只读出部分文字

  • 合成效果不稳定,时好时坏

  • 语音中包含异常停顿或静音段

可能原因:源音频质量不符合要求。

解决方案:检查源音频是否符合以下要求。建议按照录音操作指南重新录制。

  • 检查音频连续性:确保源音频中语音内容连续,避免出现超过 2 秒的停顿或静音段。音频中的明显空白段会导致模型将静音或噪声误识别为音色特征,从而影响生成效果。

  • 检查语音活动比例:确保有效语音占音频总时长的 60% 以上。背景噪声或非语音段过多会干扰音色特征提取。

  • 验证音频质量细节:

    • 音频时长:10~20 秒(推荐 15 秒左右)

    • 发音清晰,语速平稳

    • 无背景噪音、回音、杂音

    • 语音能量集中,无长时间静音段

Q:为什么语音合成的实际时长与 WAV 文件显示的时长不一致?

语音合成采用流式机制,边合成边返回数据,因此保存的 WAV 文件头中的时长是预估值,存在一定误差。如需精确时长,可将 format 设置为 pcm,待获取完整合成结果后自行添加 WAV 文件头信息。

Q:为什么音频无法播放?

请按以下场景逐一排查:

  1. 音频保存为完整文件(如 xx.mp3)的情况

    1. 音频格式一致性:确保请求参数中设置的音频格式与文件后缀一致。例如,请求参数设置为 wav,但文件保存为 .mp3,可能导致播放失败。

    2. 播放器兼容性:确认播放器是否支持该音频文件的格式和采样率。部分播放器可能不支持高采样率或特定编码的音频文件。

  2. 流式播放音频的情况

    1. 将音频流保存为完整文件,尝试用播放器播放。如果文件无法播放,请参考场景 1 的排查方法。

    2. 如果文件可以正常播放,则问题可能出在流式播放的实现上。请确认播放器是否支持流式播放。常见的支持流式播放的工具和库包括:ffmpeg、pyaudio(Python)、AudioFormat(Java)、MediaSource(JavaScript)等。

Q:为什么音频播放卡顿?

请按以下步骤逐一排查:

  1. 检查文本发送速度:确保文本发送间隔合理,避免前一段音频播放完毕后下一段文本尚未发送。

  2. 检查回调函数性能:

    • 确认回调函数中是否存在过多业务逻辑导致阻塞。

    • 回调函数运行在 WebSocket 线程中,若被阻塞会影响网络数据包的接收,进而导致音频接收卡顿。

    • 建议将音频数据写入独立的音频缓冲区(audio buffer),在其他线程中读取并处理,避免阻塞 WebSocket 线程。

  3. 检查网络稳定性:确保网络连接稳定,避免因网络波动导致音频传输中断或延迟。

Q:语音合成耗时较长是什么原因?

请按以下步骤排查:

  1. 检查输入间隔

    如果是流式语音合成,请确认文本发送间隔是否过长(如上一段发出后延迟数秒才发送下一段),过长的间隔会导致合成总时长增加。

  2. 分析性能指标

    • 首包延迟:正常约 500ms。

    • RTF(实时率 = 合成总耗时 / 音频时长):正常应小于 1.0。

Q:如何限制 API Key 仅用于语音合成服务(权限隔离)?

您可以通过新建业务空间并仅授权特定模型来限制 API Key 的使用范围。详情请参见业务空间管理