
Alibaba Cloud Model Studio: Real-time speech synthesis

Last Updated: May 12, 2026

Real-time speech synthesis converts text into natural speech over a WebSocket connection. Model Studio offers the CosyVoice and Qwen-TTS model families, with streaming input and output, voice cloning, voice design, and fine-grained audio control for use cases such as voice assistants, audiobooks, and intelligent customer service.

Overview

Low-latency real-time speech synthesis over WebSocket, built for voice assistants, intelligent customer service, live captioning, and other scenarios that require instant responses.

  • Streaming input and output (full-duplex WebSocket) with low time to first audio, ideal for real-time conversations such as voice assistants and intelligent customer service

  • Adjustable speech rate, pitch, volume, and bitrate for fine-grained voice control

  • Support for mainstream audio formats (PCM, WAV, MP3, Opus) and output sample rates up to 48 kHz

  • Supports Instruction-based control, allowing natural-language instructions to control voice expressiveness

  • Supports voice customization through Voice cloning and Voice Design

If you don't need real-time output, use Speech synthesis - Qwen (HTTP API), which is suited for batch scenarios such as audiobooks and courseware dubbing. For model selection guidance, see Speech synthesis.

Prerequisites

Quick start

The following examples demonstrate speech synthesis for each model family. For more language examples and detailed parameter descriptions, see the API reference of each model.

CosyVoice

Important

cosyvoice-v3.5-plus and cosyvoice-v3.5-flash are currently available only in the Beijing region and support only voice design and voice cloning scenarios (no system voices). Before using these models, create a target voice by following Voice cloning or Voice Design. After you create the voice, set the voice field in your code to your voice ID and set the model field to the corresponding model.
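
For these models, the call is identical to the quick start below except for the model and voice values. A minimal sketch, where "yourVoiceIdHere" is a hypothetical placeholder for the voice ID you created:

    import os
    import dashscope
    from dashscope.audio.tts_v2 import SpeechSynthesizer

    dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')
    # Replace "yourVoiceIdHere" with the voice ID returned by Voice cloning or Voice Design.
    synthesizer = SpeechSynthesizer(model="cosyvoice-v3.5-flash", voice="yourVoiceIdHere")
    audio = synthesizer.call("How is the weather today?")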

The following example shows how to synthesize speech with a system voice (see Voice list).

Python

# coding=utf-8

import os
import dashscope
from dashscope.audio.tts_v2 import *

# The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
# If the environment variable is not configured, replace the following line with your Model Studio API Key: dashscope.api_key = "sk-xxx"
dashscope.api_key = os.environ.get('DASHSCOPE_API_KEY')

# The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference
dashscope.base_websocket_api_url = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference'

# Model
# Different model versions require corresponding voice types:
# cosyvoice-v3-flash/cosyvoice-v3-plus: Use voices such as longanyang.
# cosyvoice-v2: Use voices such as longxiaochun_v2.
# Each voice supports different languages. When synthesizing non-Chinese languages such as Japanese or Korean, select a voice that supports the target language. For details, see the CosyVoice voice list.
model = "cosyvoice-v3-flash"
# Voice
voice = "longanyang"

# Instantiate SpeechSynthesizer and pass request parameters such as model and voice in the constructor
synthesizer = SpeechSynthesizer(model=model, voice=voice)
# Send text for synthesis and obtain binary audio
audio = synthesizer.call("How is the weather today?")
# The first text submission requires establishing a WebSocket connection, so the first-packet latency includes connection setup time
print('[Metric] requestId: {}, first package delay: {} ms'.format(
    synthesizer.get_last_request_id(),
    synthesizer.get_first_package_delay()))

# Save audio to a local file
with open('output.mp3', 'wb') as f:
    f.write(audio)
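
The synchronous call above returns the complete audio in a single buffer. The Python SDK also supports callback-based streaming output; the following sketch shows that interface. The method and callback names (streaming_call, streaming_complete, ResultCallback) reflect recent DashScope SDK versions, so verify them against the CosyVoice API reference:

    # Sketch: streaming synthesis with a callback; verify method names against the API reference.
    from dashscope.audio.tts_v2 import ResultCallback

    class Callback(ResultCallback):
        def on_open(self) -> None:
            self.file = open('output_stream.mp3', 'wb')
        def on_data(self, data: bytes) -> None:
            self.file.write(data)  # each audio chunk arrives as soon as it is synthesized
        def on_complete(self) -> None:
            print('synthesis complete')
        def on_error(self, message: str) -> None:
            print('error:', message)
        def on_close(self) -> None:
            self.file.close()

    streaming_synthesizer = SpeechSynthesizer(model=model, voice=voice, callback=Callback())
    streaming_synthesizer.streaming_call('How is')        # send text incrementally
    streaming_synthesizer.streaming_call(' the weather today?')
    streaming_synthesizer.streaming_complete()            # flush and wait for remaining audio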

Java

import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.utils.Constants;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class Main {
    // Model
    // Different model versions require corresponding voice types:
    // cosyvoice-v3-flash/cosyvoice-v3-plus: Use voices such as longanyang.
    // cosyvoice-v2: Use voices such as longxiaochun_v2.
    // Each voice supports different languages. When synthesizing non-Chinese languages such as Japanese or Korean, select a voice that supports the target language. For details, see the CosyVoice voice list.
    private static String model = "cosyvoice-v3-flash";
    // Voice
    private static String voice = "longanyang";

    public static void streamAudioDataToSpeaker() {
        // Request parameters
        SpeechSynthesisParam param =
                SpeechSynthesisParam.builder()
                        // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                        // If the environment variable is not configured, replace the following line with your Model Studio API Key: .apiKey("sk-xxx")
                        .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                        .model(model) // Model
                        .voice(voice) // Voice
                        .build();

        // Synchronous mode: disable callback (second parameter is null)
        SpeechSynthesizer synthesizer = new SpeechSynthesizer(param, null);
        ByteBuffer audio = null;
        try {
            // Block until audio is returned
            audio = synthesizer.call("How is the weather today?");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Close the WebSocket connection after the task completes
            synthesizer.getDuplexApi().close(1000, "bye");
        }
        if (audio != null) {
            // Save the audio data to a local file "output.mp3"
            File file = new File("output.mp3");
            // The first text submission requires establishing a WebSocket connection, so the first-packet latency includes connection setup time
            // Note: getFirstPackageDelay() requires dashscope-sdk-java 2.18.0 or later
            System.out.println(
                    "[Metric] requestId: "
                            + synthesizer.getLastRequestId()
                            + ", first package delay (ms): "
                            + synthesizer.getFirstPackageDelay());
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(audio.array());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference
        Constants.baseWebsocketApiUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference";
        streamAudioDataToSpeaker();
        System.exit(0);
    }
}

Qwen-TTS

Synthesize speech with a system voice

The following example shows how to synthesize speech with a system voice (see Supported voices).

To use Instruction-based control, set model to qwen3-tts-instruct-flash-realtime and configure the instruction through the instructions parameter.

Python

Server commit mode

import os
import base64
import threading
import time
import dashscope
from dashscope.audio.qwen_tts_realtime import *


qwen_tts_realtime: QwenTtsRealtime = None
text_to_synthesize = [
    'Right? I love supermarkets like this.',
    'Especially during Chinese New Year,',
    'I go shopping at supermarkets.',
    'And I feel',
    'absolutely thrilled!',
    'I want to buy so many things!'
]


def init_dashscope_api_key():
    """
        Set your DashScope API key. More information:
        https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    """

    # API keys differ between the Singapore and Beijing regions. Get an API key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # Load API key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = 'your-dashscope-api-key'  # Set API key manually



class MyCallback(QwenTtsRealtimeCallback):
    def __init__(self):
        self.complete_event = threading.Event()
        self.file = open('result_24k.pcm', 'wb')

    def on_open(self) -> None:
        print('connection opened, init player')

    def on_close(self, close_status_code, close_msg) -> None:
        self.file.close()
        print('connection closed with code: {}, msg: {}, destroy player'.format(close_status_code, close_msg))

    def on_event(self, response: dict) -> None:
        try:
            global qwen_tts_realtime
            type = response['type']
            if 'session.created' == type:
                print('start session: {}'.format(response['session']['id']))
            if 'response.audio.delta' == type:
                recv_audio_b64 = response['delta']
                self.file.write(base64.b64decode(recv_audio_b64))
            if 'response.done' == type:
                print(f'response {qwen_tts_realtime.get_last_response_id()} done')
            if 'session.finished' == type:
                print('session finished')
                self.complete_event.set()
        except Exception as e:
            print('[Error] {}'.format(e))
            return

    def wait_for_finished(self):
        self.complete_event.wait()


if __name__ == '__main__':
    init_dashscope_api_key()

    print('Initializing ...')

    callback = MyCallback()

    qwen_tts_realtime = QwenTtsRealtime(
        # To use instruction control, replace the model with qwen3-tts-instruct-flash-realtime
        model='qwen3-tts-flash-realtime',
        callback=callback,
        # This URL is for the Singapore region. If you use the Beijing region, replace it with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime
        url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime'
        )

    qwen_tts_realtime.connect()
    qwen_tts_realtime.update_session(
        voice = 'Cherry',
        response_format = AudioFormat.PCM_24000HZ_MONO_16BIT,
        # To use instruction control, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime
        # instructions='Speak quickly with a rising intonation, suitable for introducing fashion products.',
        # optimize_instructions=True,
        mode = 'server_commit'        
    )
    for text_chunk in text_to_synthesize:
        print(f'send text: {text_chunk}')
        qwen_tts_realtime.append_text(text_chunk)
        time.sleep(0.1)
    qwen_tts_realtime.finish()
    callback.wait_for_finished()
    print('[Metric] session: {}, first audio delay: {}'.format(
                    qwen_tts_realtime.get_session_id(), 
                    qwen_tts_realtime.get_first_audio_delay(),
                    ))

Commit mode

import base64
import os
import threading
import dashscope
from dashscope.audio.qwen_tts_realtime import *


qwen_tts_realtime: QwenTtsRealtime = None
text_to_synthesize = [
    'This is the first sentence.',
    'This is the second sentence.',
    'This is the third sentence.',
]


def init_dashscope_api_key():
    """
        Set your DashScope API key. More information:
        https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    """

    # API keys differ between the Singapore and Beijing regions. Get an API key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # Load API key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = 'your-dashscope-api-key'  # Set API key manually



class MyCallback(QwenTtsRealtimeCallback):
    def __init__(self):
        super().__init__()
        self.response_counter = 0
        self.complete_event = threading.Event()
        self.file = open(f'result_{self.response_counter}_24k.pcm', 'wb')

    def reset_event(self):
        self.response_counter += 1
        self.file = open(f'result_{self.response_counter}_24k.pcm', 'wb')
        self.complete_event = threading.Event()

    def on_open(self) -> None:
        print('connection opened, init player')

    def on_close(self, close_status_code, close_msg) -> None:
        print('connection closed with code: {}, msg: {}, destroy player'.format(close_status_code, close_msg))

    def on_event(self, response: dict) -> None:
        try:
            global qwen_tts_realtime
            type = response['type']
            if 'session.created' == type:
                print('start session: {}'.format(response['session']['id']))
            if 'response.audio.delta' == type:
                recv_audio_b64 = response['delta']
                self.file.write(base64.b64decode(recv_audio_b64))
            if 'response.done' == type:
                print(f'response {qwen_tts_realtime.get_last_response_id()} done')
                self.complete_event.set()
                self.file.close()
            if 'session.finished' == type:
                print('session finished')
                self.complete_event.set()
        except Exception as e:
            print('[Error] {}'.format(e))
            return

    def wait_for_response_done(self):
        self.complete_event.wait()


if __name__ == '__main__':
    init_dashscope_api_key()

    print('Initializing ...')

    callback = MyCallback()

    qwen_tts_realtime = QwenTtsRealtime(
        # To use instruction control, replace the model with qwen3-tts-instruct-flash-realtime
        model='qwen3-tts-flash-realtime',
        callback=callback, 
        # This URL is for the Singapore region. If you use the Beijing region, replace it with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime
        url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime'
        )

    qwen_tts_realtime.connect()
    qwen_tts_realtime.update_session(
        voice = 'Cherry',
        response_format = AudioFormat.PCM_24000HZ_MONO_16BIT,
        # To use instruction control, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime
        # instructions='Speak quickly with a rising intonation, suitable for introducing fashion products.',
        # optimize_instructions=True,
        mode = 'commit'        
    )
    print(f'send text: {text_to_synthesize[0]}')
    qwen_tts_realtime.append_text(text_to_synthesize[0])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    callback.reset_event()
    
    print(f'send text: {text_to_synthesize[1]}')
    qwen_tts_realtime.append_text(text_to_synthesize[1])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    callback.reset_event()

    print(f'send text: {text_to_synthesize[2]}')
    qwen_tts_realtime.append_text(text_to_synthesize[2])
    qwen_tts_realtime.commit()
    callback.wait_for_response_done()
    
    qwen_tts_realtime.finish()
    print('[Metric] session: {}, first audio delay: {}'.format(
                    qwen_tts_realtime.get_session_id(), 
                    qwen_tts_realtime.get_first_audio_delay(),
                    ))

Java

Server commit mode

appendText()

import com.alibaba.dashscope.audio.qwen_tts_realtime.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.AudioSystem;
import java.io.*;
import java.util.Base64;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    static String[] textToSynthesize = {
            "Right? I really love this kind of supermarket.",
            "Especially during the Chinese New Year.",
            "Going to the supermarket.",
            "It just makes me feel.",
            "Super, super happy!",
            "I want to buy so many things!"
    };
    public static QwenTtsRealtimeAudioFormat ttsFormat = QwenTtsRealtimeAudioFormat.PCM_24000HZ_MONO_16BIT;

    // Real-time PCM audio player
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
        private ByteArrayOutputStream totalAudioStream = new ByteArrayOutputStream();

        // Initialize the audio format and audio line.
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                            // Write audio data to totalAudioStream.
                            try {
                                totalAudioStream.write(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Play an audio chunk and block until playback completes.
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate * 2 / 1000);
            // Wait for the buffered audio to finish playing; clamp to avoid a negative sleep on tiny chunks.
            Thread.sleep(Math.max(0, audioLength - 10));
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException, IOException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();

            // Save the complete audio file.
            File file = new File("TotalAudio_"+ttsFormat.getSampleRate()+"."+ttsFormat.getFormat());
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(totalAudioStream.toByteArray());
            }

            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException, IOException {
        QwenTtsRealtimeParam param = QwenTtsRealtimeParam.builder()
                // To use instruction control, replace the model with qwen3-tts-instruct-flash-realtime.
                .model("qwen3-tts-flash-realtime")
                // Singapore endpoint. For China (Beijing), use wss://dashscope.aliyuncs.com/api-ws/v1/realtime.
                .url("wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime")
                // API keys differ between Singapore and China (Beijing). See https://www.alibabacloud.com/help/zh/model-studio/get-api-key.
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();
        AtomicReference<CountDownLatch> completeLatch = new AtomicReference<>(new CountDownLatch(1));
        final AtomicReference<QwenTtsRealtime> qwenTtsRef = new AtomicReference<>(null);

        // Create a real-time audio player instance.
        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);

        QwenTtsRealtime qwenTtsRealtime = new QwenTtsRealtime(param, new QwenTtsRealtimeCallback() {
            @Override
            public void onOpen() {
                // Handle connection establishment.
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        // Handle session creation.
                        if (message.has("session")) {
                            String eventId = message.get("event_id").getAsString();
                            String sessionId = message.get("session").getAsJsonObject().get("id").getAsString();
                            System.out.println("[onEvent] session.created, session_id: "
                                    + sessionId + ", event_id: " + eventId);
                        }
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        // Play audio in real time.
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        // Handle response completion.
                        break;
                    case "session.finished":
                        // Handle session termination.
                        completeLatch.get().countDown();
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                // Handle connection closure.
            }
        });
        qwenTtsRef.set(qwenTtsRealtime);
        try {
            qwenTtsRealtime.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
                .voice("Cherry")
                .responseFormat(ttsFormat)
                .mode("server_commit")
                // To use instruction control, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime.
                // .instructions("")
                // .optimizeInstructions(true)
                .build();
        qwenTtsRealtime.updateSession(config);
        for (String text:textToSynthesize) {
            qwenTtsRealtime.appendText(text);
            Thread.sleep(100);
        }
        qwenTtsRealtime.finish();
        completeLatch.get().await();
        qwenTtsRealtime.close();

        // Wait for audio playback to complete, then shut down the player.
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        System.exit(0);
    }
}

Commit mode

commit()

import com.alibaba.dashscope.audio.qwen_tts_realtime.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.AudioSystem;
import java.io.*;
import java.util.Base64;
import java.util.Queue;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    public static QwenTtsRealtimeAudioFormat ttsFormat = QwenTtsRealtimeAudioFormat.PCM_24000HZ_MONO_16BIT;
    // Real-time PCM audio player
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
        private ByteArrayOutputStream totalAudioStream = new ByteArrayOutputStream();


        // Initialize the audio format and audio line.
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                            // Write audio data to totalAudioStream.
                            try {
                                totalAudioStream.write(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Play an audio chunk and block until playback completes.
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate * 2 / 1000);
            // Wait for the buffered audio to finish playing; clamp to avoid a negative sleep on tiny chunks.
            Thread.sleep(Math.max(0, audioLength - 10));
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            // Wait for all buffered audio data to finish playing.
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            // Wait for the audio line to drain.
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            // Save the complete audio file.
            File file = new File("TotalAudio_"+ttsFormat.getSampleRate()+"."+ttsFormat.getFormat());
            try (FileOutputStream fos = new FileOutputStream(file)) {
                fos.write(totalAudioStream.toByteArray());
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException, FileNotFoundException {
        Scanner scanner = new Scanner(System.in);

        QwenTtsRealtimeParam param = QwenTtsRealtimeParam.builder()
                // To use instruction control, replace the model with qwen3-tts-instruct-flash-realtime.
                .model("qwen3-tts-flash-realtime")
                // Singapore endpoint. For China (Beijing), use wss://dashscope.aliyuncs.com/api-ws/v1/realtime.
                .url("wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime")
                // API keys differ between Singapore and China (Beijing). See https://www.alibabacloud.com/help/zh/model-studio/get-api-key.
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        AtomicReference<CountDownLatch> completeLatch = new AtomicReference<>(new CountDownLatch(1));

        // Create a real-time player instance.
        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);

        final AtomicReference<QwenTtsRealtime> qwenTtsRef = new AtomicReference<>(null);
        QwenTtsRealtime qwenTtsRealtime = new QwenTtsRealtime(param, new QwenTtsRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
                System.out.println("Enter text and press Enter to send. Enter 'quit' to exit the program.");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        // Play audio in real time.
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("response done");
                        // Wait for audio playback to complete.
                        try {
                            audioPlayer.waitForComplete();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        // Prepare for the next input.
                        completeLatch.get().countDown();
                        break;
                    case "session.finished":
                        System.out.println("session finished");
                        if (qwenTtsRef.get() != null) {
                            System.out.println("[Metric] response: " + qwenTtsRef.get().getResponseId() +
                                    ", first audio delay: " + qwenTtsRef.get().getFirstAudioDelay() + " ms");
                        }
                        completeLatch.get().countDown();
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
                try {
                    // Wait for playback to complete, then shut down the player.
                    audioPlayer.waitForComplete();
                    audioPlayer.shutdown();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        qwenTtsRef.set(qwenTtsRealtime);
        try {
            qwenTtsRealtime.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
                .voice("Cherry")
                .responseFormat(ttsFormat)
                .mode("commit")
                // To use instruction control, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime.
                // .instructions("")
                // .optimizeInstructions(true)
                .build();
        qwenTtsRealtime.updateSession(config);

        // Read user input in a loop.
        while (true) {
            System.out.print("Enter the text to synthesize: ");
            String text = scanner.nextLine();

            // Exit when the user enters 'quit'.
            if ("quit".equalsIgnoreCase(text.trim())) {
                System.out.println("Closing the connection...");
                qwenTtsRealtime.finish();
                completeLatch.get().await();
                break;
            }

            // Skip empty input.
            if (text.trim().isEmpty()) {
                continue;
            }

            // Re-initialize the countdown latch.
            completeLatch.set(new CountDownLatch(1));

            // Send the text.
            qwenTtsRealtime.appendText(text);
            qwenTtsRealtime.commit();

            // Wait for the current synthesis to complete.
            completeLatch.get().await();
        }

        // Clean up resources.
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        scanner.close();
        System.exit(0);
    }
}

Advanced features

The following features provide fine-grained control over speech synthesis.

Qwen-TTS interaction modes

The Qwen-TTS Realtime API provides two WebSocket interaction modes, switched through the session.mode parameter:

  • server_commit mode: The server intelligently handles text segmentation and synthesis timing. This mode suits continuous synthesis of large text blocks. The client only needs to append text without managing segmentation or submission.

  • commit mode: The client manually submits the text buffer to trigger synthesis. This mode suits scenarios that require precise control over synthesis timing, such as turn-by-turn synthesis in conversational AI.

Set the interaction mode in the SDK:

  • Python SDK: Set the mode parameter in the update_session method.

    qwen_tts_realtime.update_session(
        voice='Cherry',
        response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
        mode='server_commit'
    )
  • Java SDK: Set the mode parameter through QwenTtsRealtimeConfig.builder().

    QwenTtsRealtimeConfig config = QwenTtsRealtimeConfig.builder()
            .voice("Cherry")
            .responseFormat(ttsFormat)
            .mode("server_commit")
            .build();
    qwenTtsRealtime.updateSession(config);

For complete SDK code examples, see Python SDK and Java SDK. For details on the WebSocket event lifecycle and connection reuse, see Realtime speech synthesis - Qwen API reference.

Instruction-based control

Instruction-based control lets you precisely shape the vocal expression through natural language descriptions, without adjusting complex audio parameters. Describe the desired tone, speed, emotion, or timbre in plain text to produce the corresponding speech effect.

Supported models:

  • CosyVoice: cosyvoice-v3.5-plus, cosyvoice-v3.5-flash, cosyvoice-v3-flash

    Different models have different instruction format requirements:

    • cosyvoice-v3.5-plus, cosyvoice-v3.5-flash: Accept arbitrary instructions to control synthesis effects such as emotion and speed.

    • cosyvoice-v3-flash with Voice Design or Voice Clone timbres: Accept arbitrary instructions to control synthesis effects.

    • cosyvoice-v3-flash with system voices: Instructions must follow a fixed format. For details, see CosyVoice voice list.

  • Qwen-TTS: Only the Qwen3-TTS-Instruct-Flash-Realtime series models are supported.

How to use:

  • CosyVoice: Specify instructions through the instructions parameter, for example, "Fast pace with a rising intonation, suitable for introducing fashion products."

  • Qwen-TTS: Specify instructions through the instructions parameter (as shown in the sketch below), for example, "Fast pace with a rising intonation, suitable for introducing fashion products."
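
The following sketch shows instruction-based control with the Qwen-TTS Realtime Python SDK, reusing the imports and MyCallback class from the quick start above; the instruction text is only an example:

    # Sketch: instruction-based control, reusing the quick start's imports and MyCallback
    qwen_tts_realtime = QwenTtsRealtime(
        model='qwen3-tts-instruct-flash-realtime',  # instruct-capable model required
        callback=MyCallback(),
        # Singapore endpoint; for the Beijing region use wss://dashscope.aliyuncs.com/api-ws/v1/realtime
        url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime'
    )
    qwen_tts_realtime.connect()
    qwen_tts_realtime.update_session(
        voice='Cherry',
        response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
        instructions='Fast pace with a rising intonation, suitable for introducing fashion products.',
        mode='server_commit'
    )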

Supported languages for instruction text:

  • CosyVoice:

    • cosyvoice-v3.5-plus, cosyvoice-v3.5-flash: Chinese, English, French, German, Japanese, Korean, Russian, Portuguese, Thai, Indonesian, and Vietnamese.

    • cosyvoice-v3-flash: Chinese, English, French, German, Japanese, Korean, and Russian.

  • Qwen-TTS: Chinese and English only.

Instruction text length limits:

  • CosyVoice: Up to 100 characters. Chinese characters (including Simplified/Traditional Chinese, Japanese Kanji, and Korean Hanja) count as 2 characters each. Other characters (punctuation, letters, numbers, Japanese Kana, Korean Hangul, and so on) count as 1 character each. A helper for this counting rule is sketched after this list.

  • Qwen-TTS: Up to 1,600 tokens.
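
To check an instruction against the CosyVoice limit programmatically, a small helper can apply the counting rule above. This is a sketch: the Unicode ranges used to detect ideographs are an approximation, not an official definition.

    # Sketch: estimate instruction length under the CosyVoice counting rule.
    # CJK ideographs (including Japanese Kanji and Korean Hanja) count as 2;
    # all other characters (letters, digits, punctuation, Kana, Hangul) count as 1.
    def instruction_length(text: str) -> int:
        def weight(ch: str) -> int:
            cp = ord(ch)
            ideograph = (0x4E00 <= cp <= 0x9FFF) or (0x3400 <= cp <= 0x4DBF) or (0xF900 <= cp <= 0xFAFF)
            return 2 if ideograph else 1
        return sum(weight(ch) for ch in text)

    assert instruction_length('Fast pace!') == 10  # all single-weight characters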

Use cases:

  • Audiobook and radio drama voiceover

  • Advertising and promotional voiceover

  • Game character and animation voiceover

  • Emotionally expressive voice assistants

  • Documentary narration and news broadcasting

Tips for writing high-quality voice descriptions:

  • Core principles:

    1. Be specific, not vague: Use words that describe concrete vocal qualities, such as "deep," "crisp," or "slightly fast." Avoid subjective, low-information terms like "nice" or "normal."

    2. Be multidimensional, not single-faceted: A good description combines multiple dimensions (pitch, speed, emotion, etc.). Describing only one dimension (e.g., "high pitch") is too broad to produce a distinctive effect.

    3. Be objective, not subjective: Focus on the physical and perceptual qualities of the voice, not personal preferences. For example, use "slightly high pitch with energy" rather than "my favorite voice."

    4. Be original, not imitative: Describe the vocal qualities you want, rather than requesting imitation of specific public figures (such as celebrities or actors). Imitation requests involve copyright risks and are not supported.

    5. Be concise, not redundant: Make every word count. Avoid repeating synonyms or stacking meaningless intensifiers (e.g., "a very very great voice").

  • Description dimensions: Combining multiple dimensions creates richer expression effects.

    Dimension    Example descriptions
    ---------    --------------------
    Pitch        High, mid, low, slightly high, slightly low
    Speed        Fast, moderate, slow, slightly fast, slightly slow
    Emotion      Cheerful, calm, gentle, serious, lively, composed, soothing
    Timbre       Magnetic, crisp, husky, mellow, sweet, rich, powerful
    Use case     News broadcasting, advertising, audiobook, animation character, voice assistant, documentary narration

  • Examples:

    • Standard broadcasting style: Clear and precise articulation with standard pronunciation

    • Emotional escalation: Volume rising rapidly from normal conversation to a shout; straightforward personality with externalized, easily agitated emotions

    • Special emotional state: Slightly slurred pronunciation from a teary voice, slightly husky, with noticeable tension from a sobbing tone

    • Advertising voiceover style: Slightly high pitch, moderate speed, energetic and engaging, suitable for advertising

    • Gentle soothing style: Slightly slow speed, soft and sweet tone, warm and comforting like a caring friend

WebSocket direct connection examples

If you don't use the DashScope SDK, you can connect directly to the server over the raw WebSocket protocol for speech synthesis. The following examples provide minimal working implementations; build your own business logic on top of them. For each model's WebSocket protocol specification (server endpoint, request headers, interaction flow), see the corresponding API reference.


CosyVoice

Go

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	// The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
	wsURL      = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"
	outputFile = "output.mp3"
)

func main() {
	// The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
	// If the environment variable is not configured, replace the following line with your Model Studio API Key: apiKey := "sk-xxx"
	apiKey := os.Getenv("DASHSCOPE_API_KEY")

	// Clear the output file and release the handle immediately
	os.Remove(outputFile)
	if f, err := os.Create(outputFile); err == nil {
		f.Close()
	}

	// Connect to WebSocket
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))

	conn, resp, err := websocket.DefaultDialer.Dial(wsURL, header)
	if err != nil {
		if resp != nil {
			fmt.Printf("Connection failed, HTTP status code: %d\n", resp.StatusCode)
		}
		fmt.Println("Connection failed:", err)
		return
	}
	defer conn.Close()

	// Generate task ID
	taskID := uuid.New().String()
	fmt.Printf("Generated task ID: %s\n", taskID)

	// Send run-task event
	runTaskCmd := map[string]interface{}{
		"header": map[string]interface{}{
			"action":    "run-task",
			"task_id":   taskID,
			"streaming": "duplex",
		},
		"payload": map[string]interface{}{
			"task_group": "audio",
			"task":       "tts",
			"function":   "SpeechSynthesizer",
			"model":      "cosyvoice-v3-flash",
			"parameters": map[string]interface{}{
				"text_type":   "PlainText",
				"voice":       "longanyang",
				"format":      "mp3",
				"sample_rate": 22050,
				"volume":      50,
				"rate":        1,
				"pitch":       1,
				// If enable_ssml is set to true, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
				"enable_ssml": false,
			},
			"input": map[string]interface{}{},
		},
	}

	runTaskJSON, _ := json.Marshal(runTaskCmd)
	fmt.Printf("Sending run-task event: %s\n", string(runTaskJSON))

	err = conn.WriteMessage(websocket.TextMessage, runTaskJSON)
	if err != nil {
		fmt.Println("Failed to send run-task:", err)
		return
	}

	textSent := false

	// Process messages
	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("Failed to read message:", err)
			break
		}

		// Process binary messages
		if messageType == websocket.BinaryMessage {
			fmt.Printf("Received binary message, length: %d\n", len(message))
			file, _ := os.OpenFile(outputFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
			file.Write(message)
			file.Close()
			continue
		}

		// Process text messages
		messageStr := string(message)
		fmt.Printf("Received text message: %s\n", strings.ReplaceAll(messageStr, "\n", ""))

		// Simple JSON parsing to get event type
		var msgMap map[string]interface{}
		if json.Unmarshal(message, &msgMap) == nil {
			if header, ok := msgMap["header"].(map[string]interface{}); ok {
				if event, ok := header["event"].(string); ok {
					fmt.Printf("Event type: %s\n", event)

					switch event {
					case "task-started":
						fmt.Println("=== Received task-started event ===")

						if !textSent {
							// Send continue-task events

							texts := []string{"Before my bed, moonlight shines bright, I suspect it's frost upon the ground.", "I raise my eyes to gaze at the bright moon, then bow my head, thinking of home."}

							for _, text := range texts {
								continueTaskCmd := map[string]interface{}{
									"header": map[string]interface{}{
										"action":    "continue-task",
										"task_id":   taskID,
										"streaming": "duplex",
									},
									"payload": map[string]interface{}{
										"input": map[string]interface{}{
											"text": text,
										},
									},
								}

								continueTaskJSON, _ := json.Marshal(continueTaskCmd)
								fmt.Printf("Sending continue-task event: %s\n", string(continueTaskJSON))

								err = conn.WriteMessage(websocket.TextMessage, continueTaskJSON)
								if err != nil {
									fmt.Println("Failed to send continue-task:", err)
									return
								}
							}

							textSent = true

							// Delay before sending finish-task
							time.Sleep(500 * time.Millisecond)

							// Send finish-task event
							finishTaskCmd := map[string]interface{}{
								"header": map[string]interface{}{
									"action":    "finish-task",
									"task_id":   taskID,
									"streaming": "duplex",
								},
								"payload": map[string]interface{}{
									"input": map[string]interface{}{},
								},
							}

							finishTaskJSON, _ := json.Marshal(finishTaskCmd)
							fmt.Printf("Sending finish-task event: %s\n", string(finishTaskJSON))

							err = conn.WriteMessage(websocket.TextMessage, finishTaskJSON)
							if err != nil {
								fmt.Println("Failed to send finish-task:", err)
								return
							}
						}

					case "task-finished":
						fmt.Println("=== Task completed ===")
						return

					case "task-failed":
						fmt.Println("=== Task failed ===")
						if header["error_message"] != nil {
							fmt.Printf("Error message: %s\n", header["error_message"])
						}
						return

					case "result-generated":
						fmt.Println("Received result-generated event")
					}
				}
			}
		}
	}
}

C#

using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

class Program {
    // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    // If the environment variable is not configured, replace the following line with your Model Studio API Key: private static readonly string ApiKey = "sk-xxx"
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
    private const string WebSocketUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/";
    // Output file path
    private const string OutputFilePath = "output.mp3";

    // WebSocket client
    private static ClientWebSocket _webSocket = new ClientWebSocket();
    // Cancellation token source
    private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    // Task ID
    private static string? _taskId;
    // Whether the task has started
    private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>();

    static async Task Main(string[] args) {
        try {
            // Clear the output file
            ClearOutputFile(OutputFilePath);

            // Connect to the WebSocket service
            await ConnectToWebSocketAsync(WebSocketUrl);

            // Start the message receiving task
            Task receiveTask = ReceiveMessagesAsync();

            // Send run-task event
            _taskId = GenerateTaskId();
            await SendRunTaskCommandAsync(_taskId);

            // Wait for the task-started event
            await _taskStartedTcs.Task;

            // Continuously send continue-task events
            string[] texts = {
                "Before my bed, moonlight shines bright",
                "I suspect it's frost upon the ground",
                "I raise my eyes to gaze at the bright moon",
                "then bow my head, thinking of home"
            };
            foreach (string text in texts) {
                await SendContinueTaskCommandAsync(text);
            }

            // Send finish-task event
            await SendFinishTaskCommandAsync(_taskId);

            // Wait for the receiving task to complete
            await receiveTask;

            Console.WriteLine("Task completed, connection closed.");
        } catch (OperationCanceledException) {
            Console.WriteLine("Task cancelled.");
        } catch (Exception ex) {
            Console.WriteLine($"Error occurred: {ex.Message}");
        } finally {
            _cancellationTokenSource.Cancel();
            _webSocket.Dispose();
        }
    }

    private static void ClearOutputFile(string filePath) {
        if (File.Exists(filePath)) {
            File.WriteAllText(filePath, string.Empty);
            Console.WriteLine("Output file cleared.");
        } else {
            Console.WriteLine("Output file does not exist, no need to clear.");
        }
    }

    private static async Task ConnectToWebSocketAsync(string url) {
        var uri = new Uri(url);
        if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
            return;
        }

        // Set WebSocket connection headers
        _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");

        try {
            await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
            Console.WriteLine("Successfully connected to the WebSocket service.");
        } catch (OperationCanceledException) {
            Console.WriteLine("WebSocket connection cancelled.");
        } catch (Exception ex) {
            Console.WriteLine($"WebSocket connection failed: {ex.Message}");
            throw;
        }
    }

    private static async Task SendRunTaskCommandAsync(string taskId) {
        var command = CreateCommand("run-task", taskId, "duplex", new {
            task_group = "audio",
            task = "tts",
            function = "SpeechSynthesizer",
            model = "cosyvoice-v3-flash",
            parameters = new
            {
                text_type = "PlainText",
                voice = "longanyang",
                format = "mp3",
                sample_rate = 22050,
                volume = 50,
                rate = 1,
                pitch = 1,
                // If enable_ssml is set to true, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
                enable_ssml = false
            },
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("run-task event sent.");
    }

    private static async Task SendContinueTaskCommandAsync(string text) {
        if (_taskId == null) {
            throw new InvalidOperationException("Task ID not initialized.");
        }

        var command = CreateCommand("continue-task", _taskId, "duplex", new {
            input = new {
                text
            }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("continue-task event sent.");
    }

    private static async Task SendFinishTaskCommandAsync(string taskId) {
        var command = CreateCommand("finish-task", taskId, "duplex", new {
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("finish-task event sent.");
    }

    private static async Task SendJsonMessageAsync(string message) {
        var buffer = Encoding.UTF8.GetBytes(message);
        try {
            await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
        } catch (OperationCanceledException) {
            Console.WriteLine("Message sending cancelled.");
        }
    }

    private static async Task ReceiveMessagesAsync() {
        while (_webSocket.State == WebSocketState.Open) {
            var response = await ReceiveMessageAsync();
            if (response != null) {
                var eventStr = response.RootElement.GetProperty("header").GetProperty("event").GetString();
                switch (eventStr) {
                    case "task-started":
                        Console.WriteLine("Task started.");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("Task completed.");
                        _cancellationTokenSource.Cancel();
                        break;
                    case "task-failed":
                        Console.WriteLine("Task failed: " + response.RootElement.GetProperty("header").GetProperty("error_message").GetString());
                        _cancellationTokenSource.Cancel();
                        break;
                    default:
                        // result-generated can be handled here
                        break;
                }
            }
        }
    }

    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);
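        // Note: this simple reader assumes each message fits in one 4 KB read
        // (result.EndOfMessage is not checked); loop on ReceiveAsync to assemble larger messages.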

        try {
            WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);

            if (result.MessageType == WebSocketMessageType.Close) {
                await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                return null;
            }

            if (result.MessageType == WebSocketMessageType.Binary) {
                // Process binary data
                Console.WriteLine("Received binary data...");

                // Save binary data to file
                using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                    fileStream.Write(buffer, 0, result.Count);
                }

                return null;
            }

            string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
            return JsonDocument.Parse(message);
        } catch (OperationCanceledException) {
            Console.WriteLine("Message receiving cancelled.");
            return null;
        }
    }

    private static string GenerateTaskId() {
        return Guid.NewGuid().ToString("N").Substring(0, 32);
    }

    private static string CreateCommand(string action, string taskId, string streaming, object payload) {
        var command = new {
            header = new {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}

PHP

The example code directory structure:

my-php-project/
├── composer.json
├── vendor/
└── index.php

The composer.json contents are as follows; adjust the dependency versions to suit your project:

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

The index.php contents:

<?php

require __DIR__ . '/vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;

// The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
// If the environment variable is not configured, replace the following line with your Model Studio API Key: $api_key = "sk-xxx"
$api_key = getenv("DASHSCOPE_API_KEY");
// The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
$websocket_url = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/'; // WebSocket server address
$output_file = 'output.mp3'; // Output file path

$loop = Loop::get();

if (file_exists($output_file)) {
    // Clear file content
    file_put_contents($output_file, '');
}

// Create a custom connector
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        // Disabling certificate verification is insecure; keep verification enabled in production.
        'verify_peer' => false,
        'verify_peer_name' => false,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
    echo "Connected to WebSocket server\n";

    // Generate task ID
    $taskId = generateTaskId();

    // Send run-task event
    sendRunTaskMessage($conn, $taskId);

    // Define the function for sending continue-task events
    $sendContinueTask = function() use ($conn, $loop, $taskId) {
        // Text to send
        $texts = ["Before my bed, moonlight shines bright", "I suspect it's frost upon the ground", "I raise my eyes to gaze at the bright moon", "then bow my head, thinking of home"];
        $continueTaskCount = 0;
        foreach ($texts as $text) {
            $continueTaskMessage = json_encode([
                "header" => [
                    "action" => "continue-task",
                    "task_id" => $taskId,
                    "streaming" => "duplex"
                ],
                "payload" => [
                    "input" => [
                        "text" => $text
                    ]
                ]
            ]);
            echo "Sending continue-task event: " . $continueTaskMessage . "\n";
            $conn->send($continueTaskMessage);
            $continueTaskCount++;
        }
        echo "Number of continue-task events sent: " . $continueTaskCount . "\n";

        // Send finish-task event
        sendFinishTaskMessage($conn, $taskId);
    };

    // Flag for whether the task-started event has been received
    $taskStarted = false;

    // Listen for messages
    $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
        if ($msg->isBinary()) {
            // Write binary data to local file
            file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
        } else {
            // Process non-binary messages
            $response = json_decode($msg->getPayload(), true);

            if (isset($response['header']['event'])) {
                handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
            } else {
                echo "Unknown message format\n";
            }
        }
    });

    // Listen for connection close
    $conn->on('close', function($code = null, $reason = null) {
        echo "Connection closed\n";
        if ($code !== null) {
            echo "Close code: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "Close reason: " . $reason . "\n";
        }
    });
}, function ($e) {
    echo "Unable to connect: {$e->getMessage()}\n";
});

$loop->run();

/**
 * Generate task ID
 * @return string
 */
function generateTaskId(): string {
    return bin2hex(random_bytes(16));
}

/**
 * Send run-task event
 * @param $conn
 * @param $taskId
 */
function sendRunTaskMessage($conn, $taskId) {
    $runTaskMessage = json_encode([
        "header" => [
            "action" => "run-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "task_group" => "audio",
            "task" => "tts",
            "function" => "SpeechSynthesizer",
            "model" => "cosyvoice-v3-flash",
            "parameters" => [
                "text_type" => "PlainText",
                "voice" => "longanyang",
                "format" => "mp3",
                "sample_rate" => 22050,
                "volume" => 50,
                "rate" => 1,
                "pitch" => 1,
                // If enable_ssml is set to true, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
                "enable_ssml" => false
            ],
            "input" => (object) []
        ]
    ]);
    echo "Sending run-task event: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task event sent\n";
}

/**
 * Send finish-task event
 * @param $conn
 * @param $taskId
 */
function sendFinishTaskMessage($conn, $taskId) {
    $finishTaskMessage = json_encode([
        "header" => [
            "action" => "finish-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "input" => (object) []
        ]
    ]);
    echo "Sending finish-task event: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task event sent\n";
}

/**
 * Handle events
 * @param $conn
 * @param $response
 * @param $sendContinueTask
 * @param $loop
 * @param $taskId
 * @param $taskStarted
 */
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
    switch ($response['header']['event']) {
        case 'task-started':
            echo "Task started, sending continue-task events...\n";
            $taskStarted = true;
            // Send continue-task events
            $sendContinueTask();
            break;
        case 'result-generated':
            // Received result-generated event
            break;
        case 'task-finished':
            echo "Task completed\n";
            // Don't close here; the timer below closes the connection after
            // any trailing audio data has been received.
            break;
        case 'task-failed':
            echo "Task failed\n";
            echo "Error code: " . $response['header']['error_code'] . "\n";
            echo "Error message: " . $response['header']['error_message'] . "\n";
            $conn->close();
            break;
        case 'error':
            echo "Error: " . $response['payload']['message'] . "\n";
            break;
        default:
            echo "Unknown event: " . $response['header']['event'] . "\n";
            break;
    }

    // If the task is completed, close the connection
    if ($response['header']['event'] == 'task-finished') {
        // Wait 1 second to ensure all data has been transmitted
        $loop->addTimer(1, function() use ($conn) {
            $conn->close();
            echo "Client closed connection\n";
        });
    }

    // If the task-started event was not received, close the connection
    if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) {
        $conn->close();
    }
}

Node.js

Install the required dependencies:

npm install ws
npm install uuid

Example code:

const WebSocket = require('ws');
const fs = require('fs');
const uuid = require('uuid').v4;

// The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
// If the environment variable is not configured, replace the following line with your Model Studio API Key: const apiKey = "sk-xxx"
const apiKey = process.env.DASHSCOPE_API_KEY;
// The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
const url = 'wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/';
// Output file path
const outputFilePath = 'output.mp3';

// Clear the output file
fs.writeFileSync(outputFilePath, '');

// Create WebSocket client
const ws = new WebSocket(url, {
  headers: {
    Authorization: `bearer ${apiKey}`,
    'X-DashScope-DataInspection': 'enable'
  }
});

let taskStarted = false;
let taskId = uuid();

ws.on('open', () => {
  console.log('Connected to WebSocket server');

  // Send run-task event
  const runTaskMessage = JSON.stringify({
    header: {
      action: 'run-task',
      task_id: taskId,
      streaming: 'duplex'
    },
    payload: {
      task_group: 'audio',
      task: 'tts',
      function: 'SpeechSynthesizer',
      model: 'cosyvoice-v3-flash',
      parameters: {
        text_type: 'PlainText',
        voice: 'longanyang', // Voice
        format: 'mp3', // Audio format
        sample_rate: 22050, // Sample rate
        volume: 50, // Volume
        rate: 1, // Speech rate
        pitch: 1, // Pitch
        enable_ssml: false // Whether to enable SSML. If enable_ssml is set to true, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
      },
      input: {}
    }
  });
  ws.send(runTaskMessage);
  console.log('run-task message sent');
});

const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    // Write binary data to file
    fileStream.write(data);
  } else {
    const message = JSON.parse(data);

    switch (message.header.event) {
      case 'task-started':
        taskStarted = true;
        console.log('Task started');
        // Send continue-task events
        sendContinueTasks(ws);
        break;
      case 'task-finished':
        console.log('Task completed');
        ws.close();
        fileStream.end(() => {
          console.log('File stream closed');
        });
        break;
      case 'task-failed':
        console.error('Task failed:', message.header.error_message);
        ws.close();
        fileStream.end(() => {
          console.log('File stream closed');
        });
        break;
      default:
        // result-generated can be handled here
        break;
    }
  }
});

function sendContinueTasks(ws) {
  const texts = [
    'Before my bed, moonlight shines bright,',
    'I suspect it\'s frost upon the ground.',
    'I raise my eyes to gaze at the bright moon,',
    'then bow my head, thinking of home.'
  ];

  texts.forEach((text, index) => {
    setTimeout(() => {
      if (taskStarted) {
        const continueTaskMessage = JSON.stringify({
          header: {
            action: 'continue-task',
            task_id: taskId,
            streaming: 'duplex'
          },
          payload: {
            input: {
              text: text
            }
          }
        });
        ws.send(continueTaskMessage);
        console.log(`Sent continue-task, text: ${text}`);
      }
    }, index * 1000); // Send one every second
  });

  // Send finish-task event
  setTimeout(() => {
    if (taskStarted) {
      const finishTaskMessage = JSON.stringify({
        header: {
          action: 'finish-task',
          task_id: taskId,
          streaming: 'duplex'
        },
        payload: {
          input: {}
        }
      });
      ws.send(finishTaskMessage);
      console.log('finish-task sent');
    }
  }, texts.length * 1000 + 1000); // Send 1 second after all continue-task events are sent
}

ws.on('close', () => {
  console.log('Disconnected from WebSocket server');
});

Java

For Java, we recommend using the Java DashScope SDK. For details, see Java SDK.

The following is a Java WebSocket example. Before running it, make sure you've imported these dependencies:

  • Java-WebSocket

  • jackson-databind

Use Maven or Gradle to manage dependencies. The configuration is as follows:

pom.xml

<dependencies>
    <!-- WebSocket Client -->
    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-WebSocket</artifactId>
        <version>1.5.3</version>
    </dependency>

    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.0</version>
    </dependency>
</dependencies>

build.gradle

// Other code omitted
dependencies {
  // WebSocket Client
  implementation 'org.java-websocket:Java-WebSocket:1.5.3'
  // JSON Processing
  implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
}
// Other code omitted

Java code:

import com.fasterxml.jackson.databind.ObjectMapper;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;

public class TTSWebSocketClient extends WebSocketClient {
    private final String taskId = UUID.randomUUID().toString();
    private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3";
    private boolean taskFinished = false;

    public TTSWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("Connection established");

        // Send run-task event
        // If enable_ssml is set to true, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
        String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v3-flash\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longanyang\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1, \"enable_ssml\": false }, \"input\": {} }}";
        send(runTaskCommand);
    }

    @Override
    public void onMessage(String message) {
        System.out.println("Received message from server: " + message);
        try {
            // Parse JSON message
            Map<String, Object> messageMap = new ObjectMapper().readValue(message, Map.class);

            if (messageMap.containsKey("header")) {
                Map<String, Object> header = (Map<String, Object>) messageMap.get("header");

                if (header.containsKey("event")) {
                    String event = (String) header.get("event");

                    if ("task-started".equals(event)) {
                        System.out.println("Received task-started event from server");

                        List<String> texts = Arrays.asList(
                                "Before my bed, moonlight shines bright, I suspect it's frost upon the ground",
                                "I raise my eyes to gaze at the bright moon, then bow my head, thinking of home"
                        );

                        for (String text : texts) {
                            // Send continue-task event
                            sendContinueTask(text);
                        }

                        // Send finish-task event
                        sendFinishTask();
                    } else if ("task-finished".equals(event)) {
                        System.out.println("Received task-finished event from server");
                        taskFinished = true;
                        closeConnection();
                    } else if ("task-failed".equals(event)) {
                        System.out.println("Task failed: " + message);
                        closeConnection();
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Exception occurred: " + e.getMessage());
        }
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("Received binary audio data, size: " + message.remaining());

        try (FileOutputStream fos = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[message.remaining()];
            message.get(buffer);
            fos.write(buffer);
            System.out.println("Audio data written to local file " + outputFile);
        } catch (IOException e) {
            System.err.println("Failed to write audio data to local file: " + e.getMessage());
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("Connection closed: " + reason + " (" + code + ")");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("Error: " + ex.getMessage());
        ex.printStackTrace();
    }

    private void sendContinueTask(String text) {
        // For arbitrary text, build this JSON with ObjectMapper instead of string
        // concatenation so quotes and control characters are escaped correctly.
        String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": \"" + text + "\" } }}";
        send(command);
    }

    private void sendFinishTask() {
        String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}";
        send(command);
    }

    private void closeConnection() {
        if (!isClosed()) {
            close();
        }
    }

    public static void main(String[] args) {
        try {
            // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
            // If the environment variable is not configured, replace the following line with your Model Studio API Key: String apiKey = "sk-xxx"
            String apiKey = System.getenv("DASHSCOPE_API_KEY");
            if (apiKey == null || apiKey.isEmpty()) {
                System.err.println("Please set the DASHSCOPE_API_KEY environment variable");
                return;
            }

            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "bearer " + apiKey);
            // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
            TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"), headers);

            client.connect();

            while (!client.isClosed() && !client.taskFinished) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.err.println("Failed to connect to WebSocket service: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Python

For Python, we recommend using the Python DashScope SDK. For details, see Python SDK.
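
If you only need a quick streaming setup, the following is a minimal sketch using the SDK's streaming interfaces (a ResultCallback subclass plus streaming_call and streaming_complete); these names are taken from the tts_v2 module, but check the Python SDK reference for the exact signatures:

# Minimal streaming sketch (assumed dashscope.audio.tts_v2 streaming interfaces).
import os
import dashscope
from dashscope.audio.tts_v2 import SpeechSynthesizer, ResultCallback

dashscope.api_key = os.environ.get("DASHSCOPE_API_KEY")
# Singapore endpoint; for the Beijing region use wss://dashscope.aliyuncs.com/api-ws/v1/inference
dashscope.base_websocket_api_url = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference"

class FileCallback(ResultCallback):
    """Appends each synthesized audio chunk to a local file as it arrives."""
    def on_open(self):
        self._file = open("output.mp3", "wb")
    def on_data(self, data: bytes) -> None:
        self._file.write(data)
    def on_complete(self):
        print("Synthesis finished")
    def on_error(self, message: str):
        print(f"Synthesis failed: {message}")
    def on_close(self):
        self._file.close()

synthesizer = SpeechSynthesizer(model="cosyvoice-v3-flash", voice="longanyang",
                                callback=FileCallback())
for sentence in ["Streaming text in,", "streaming audio out."]:
    synthesizer.streaming_call(sentence)  # send text incrementally
synthesizer.streaming_complete()  # end input and wait for the remaining audio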

The following is a Python WebSocket example. Before running it, install the dependency as follows (the uninstall commands first remove any conflicting websocket packages):

pip uninstall websocket-client
pip uninstall websocket
pip install websocket-client
Important

Don't name your Python file "websocket.py". Otherwise, an error occurs (AttributeError: module 'websocket' has no attribute 'WebSocketApp'. Did you mean: 'WebSocket'?).

import websocket
import json
import uuid
import os
import time

class TTSClient:
    def __init__(self, api_key, uri):
        """
    Initialize a TTSClient instance.

    Parameters:
        api_key (str): API Key for authentication
        uri (str): WebSocket service address
    """
        self.api_key = api_key  # API Key for authentication
        self.uri = uri  # WebSocket service address
        self.task_id = str(uuid.uuid4())  # Generate a unique task ID
        self.output_file = f"output_{int(time.time())}.mp3"  # Output audio file path
        self.ws = None  # WebSocketApp instance
        self.task_started = False  # Whether task-started has been received
        self.task_finished = False  # Whether task-finished / task-failed has been received

    def on_open(self, ws):
        """
    Callback when WebSocket connection is established.
    Sends the run-task event to start the speech synthesis task.
    """
        print("WebSocket connected")

        # Construct the run-task event
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v3-flash",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longanyang",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1,
                    # If enable_ssml is set to True, only one continue-task event is allowed; otherwise the error "Text request limit violated, expected 1." will occur.
                    "enable_ssml": False
                },
                "input": {}
            }
        }

        # Send the run-task event
        ws.send(json.dumps(run_task_cmd))
        print("run-task event sent")

    def on_message(self, ws, message):
        """
    Callback when a message is received.
    Handles text and binary messages separately.
    """
        if isinstance(message, str):
            # Process JSON text messages
            try:
                msg_json = json.loads(message)
                print(f"Received JSON message: {msg_json}")

                if "header" in msg_json:
                    header = msg_json["header"]

                    if "event" in header:
                        event = header["event"]

                        if event == "task-started":
                            print("Task started")
                            self.task_started = True

                            # Send continue-task events
                            texts = [
                                "Before my bed, moonlight shines bright, I suspect it's frost upon the ground",
                                "I raise my eyes to gaze at the bright moon, then bow my head, thinking of home"
                            ]

                            for text in texts:
                                self.send_continue_task(text)

                            # Send finish-task after all continue-task events are sent
                            self.send_finish_task()

                        elif event == "task-finished":
                            print("Task completed")
                            self.task_finished = True
                            self.close(ws)

                        elif event == "task-failed":
                            error_msg = header.get("error_message", "Unknown error")
                            print(f"Task failed: {error_msg}")
                            self.task_finished = True
                            self.close(ws)

            except json.JSONDecodeError as e:
                print(f"JSON parsing failed: {e}")
        else:
            # Process binary messages (audio data)
            print(f"Received binary message, size: {len(message)} bytes")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"Audio data written to local file {self.output_file}")

    def on_error(self, ws, error):
        """Callback when an error occurs"""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Callback when connection is closed"""
        print(f"WebSocket closed: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """Send a continue-task event with the text content to synthesize"""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }

        self.ws.send(json.dumps(cmd))
        print(f"Sent continue-task event, text content: {text}")

    def send_finish_task(self):
        """Send a finish-task event to end the speech synthesis task"""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }

        self.ws.send(json.dumps(cmd))
        print("finish-task event sent")

    def close(self, ws):
        """Actively close the connection"""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("Connection closed actively")

    def run(self):
        """Start the WebSocket client"""
        # Set request headers (authentication)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }

        # Create a WebSocketApp instance
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        print("Listening for WebSocket messages...")
        self.ws.run_forever()  # Start long-lived connection listener

# Example usage
if __name__ == "__main__":
    # The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # If the environment variable is not configured, replace the following line with your Model Studio API Key: API_KEY = "sk-xxx"
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    # The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference/
    SERVER_URI = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference/"  # Replace with your WebSocket address

    client = TTSClient(API_KEY, SERVER_URI)
    client.run()

Qwen-TTS

  1. Create the client

    Python

    Create a Python file named tts_realtime_client.py and copy the following code into it:

    # -*- coding: utf-8 -*-
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, Dict, Any
    from enum import Enum
    
    class SessionMode(Enum):
        SERVER_COMMIT = "server_commit"
        COMMIT = "commit"
    
    class TTSRealtimeClient:
        """
        Client for interacting with the TTS Realtime API.
    
        This class provides methods for connecting to the TTS Realtime API, sending text data,
        receiving audio output, and managing WebSocket connections.
    
        Attributes:
            base_url (str):
                Base URL of the Realtime API.
            api_key (str):
                API Key for authentication.
            voice (str):
                Voice used by the server for speech synthesis.
            mode (SessionMode):
                Session mode, either server_commit or commit.
            audio_callback (Callable[[bytes], None]):
                Callback function for receiving audio data.
            language_type (str):
                Language of the synthesized speech. Options: Chinese, English, German, Italian, Portuguese, Spanish, Japanese, Korean, French, Russian, Auto
        """
    
        def __init__(
                self,
                base_url: str,
                api_key: str,
                voice: str = "Cherry",
                mode: SessionMode = SessionMode.SERVER_COMMIT,
                audio_callback: Optional[Callable[[bytes], None]] = None,
                language_type: str = "Auto"):
            self.base_url = base_url
            self.api_key = api_key
            self.voice = voice
            self.mode = mode
            self.ws = None
            self.audio_callback = audio_callback
            self.language_type = language_type
    
            # Current response state
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            self._response_done_future = None
    
        async def connect(self) -> None:
            """Establish a WebSocket connection with the TTS Realtime API."""
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(self.base_url, additional_headers=headers)
    
            # Set default session configuration
            await self.update_session({
                "mode": self.mode.value,
                "voice": self.voice,
                # To use the instruction control feature, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime in server_commit.py or commit.py
                # "instructions": "Speak quickly with a noticeable rising intonation, suitable for introducing fashion products.",
                # "optimize_instructions": true
                "language_type": self.language_type,
                "response_format": "pcm",
                "sample_rate": 24000
            })
    
        async def send_event(self, event) -> None:
            """Send an event to the server."""
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f"Sending event: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """Update session configuration."""
            event = {
                "type": "session.update",
                "session": config
            }
            print("Updating session configuration: ", event)
            await self.send_event(event)
    
        async def append_text(self, text: str) -> None:
            """Send text data to the API."""
            event = {
                "type": "input_text_buffer.append",
                "text": text
            }
            await self.send_event(event)
    
        async def commit_text_buffer(self) -> None:
            """Commit the text buffer to trigger processing."""
            event = {
                "type": "input_text_buffer.commit"
            }
            await self.send_event(event)
    
        async def clear_text_buffer(self) -> None:
            """Clear the text buffer."""
            event = {
                "type": "input_text_buffer.clear"
            }
            await self.send_event(event)
    
        async def finish_session(self) -> None:
            """End the session."""
            event = {
                "type": "session.finish"
            }
            await self.send_event(event)
    
        async def wait_for_response_done(self):
            """Wait for the response.done event"""
            if self._response_done_future:
                await self._response_done_future
    
        async def handle_messages(self) -> None:
            """Process messages from the server."""
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
    
                    if event_type != "response.audio.delta":
                        print(f"Received event: {event_type}")
    
                    if event_type == "error":
                        print("Error: ", event.get('error', {}))
                        continue
                    elif event_type == "session.created":
                        print("Session created, ID: ", event.get('session', {}).get('id'))
                    elif event_type == "session.updated":
                        print("Session updated, ID: ", event.get('session', {}).get('id'))
                    elif event_type == "input_text_buffer.committed":
                        print("Text buffer committed, item ID: ", event.get('item_id'))
                    elif event_type == "input_text_buffer.cleared":
                        print("Text buffer cleared")
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                        # Create a new future to wait for response.done
                        self._response_done_future = asyncio.Future()
                        print("Response created, ID: ", self._current_response_id)
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                        print("Output item added, ID: ", self._current_item_id)
                    # Process audio delta
                    elif event_type == "response.audio.delta" and self.audio_callback:
                        audio_bytes = base64.b64decode(event.get("delta", ""))
                        self.audio_callback(audio_bytes)
                    elif event_type == "response.audio.done":
                        print("Audio generation completed")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                        # Mark the future as done
                        if self._response_done_future and not self._response_done_future.done():
                            self._response_done_future.set_result(True)
                        print("Response completed")
                    elif event_type == "session.finished":
                        print("Session ended")
    
            except websockets.exceptions.ConnectionClosed:
                print("Connection closed")
            except Exception as e:
                print("Error processing messages: ", str(e))
    
        async def close(self) -> None:
            """Close the WebSocket connection."""
            if self.ws:
                await self.ws.close()

    Java

    Create a Java file named TTSRealtimeClient.java and copy the following code into it:

    import com.google.gson.Gson;
    import com.google.gson.JsonObject;
    import org.java_websocket.client.WebSocketClient;
    import org.java_websocket.handshake.ServerHandshake;
    
    import java.net.URI;
    import java.util.Base64;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.function.Consumer;
    
    /**
     * Client for interacting with the TTS Realtime API.
     *
     * This class provides methods for connecting to the TTS Realtime API, sending text data,
     * receiving audio output, and managing WebSocket connections.
     */
    public class TTSRealtimeClient {
    
        public enum SessionMode {
            SERVER_COMMIT("server_commit"),
            COMMIT("commit");
            private final String value;
            SessionMode(String value) { this.value = value; }
            public String getValue() { return value; }
        }
    
        /**
         * Audio callback interface
         */
        public interface AudioCallback {
            void onAudio(byte[] audioData);
        }
    
        private final String baseUrl;
        private final String apiKey;
        private final String voice;
        private final SessionMode mode;
        private final String languageType;
        private final AudioCallback audioCallback;
        private final Gson gson = new Gson();
    
        private WebSocketClient ws;
        private CountDownLatch responseDoneLatch;
        private CountDownLatch sessionFinishedLatch;
    
        public TTSRealtimeClient(String baseUrl, String apiKey, String voice,
                                 SessionMode mode, AudioCallback audioCallback,
                                 String languageType) {
            this.baseUrl = baseUrl;
            this.apiKey = apiKey;
            this.voice = voice;
            this.mode = mode;
            this.audioCallback = audioCallback;
            this.languageType = languageType;
        }
    
        public TTSRealtimeClient(String baseUrl, String apiKey, String voice,
                                 SessionMode mode, AudioCallback audioCallback) {
            this(baseUrl, apiKey, voice, mode, audioCallback, "Auto");
        }
    
        /**
         * Establish a WebSocket connection with the TTS Realtime API.
         */
        public void connect() throws Exception {
            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "Bearer " + apiKey);
    
            // Start at 0 so waitForResponseDone() doesn't block before any response
            // exists; response.created re-arms the latch with a count of 1.
            responseDoneLatch = new CountDownLatch(0);
            sessionFinishedLatch = new CountDownLatch(1);
    
            ws = new WebSocketClient(new URI(baseUrl), headers) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    System.out.println("WebSocket connection established");
                    // Send default session configuration
                    JsonObject session = new JsonObject();
                    session.addProperty("mode", mode.getValue());
                    session.addProperty("voice", TTSRealtimeClient.this.voice);
                    // To use the instruction control feature, uncomment the following lines and replace the model with qwen3-tts-instruct-flash-realtime
                    // session.addProperty("instructions", "Speak quickly with a noticeable rising intonation, suitable for introducing fashion products.");
                    // session.addProperty("optimize_instructions", true);
                    session.addProperty("language_type", languageType);
                    session.addProperty("response_format", "pcm");
                    session.addProperty("sample_rate", 24000);
                    updateSession(session);
                }
    
                @Override
                public void onMessage(String message) {
                    JsonObject event = gson.fromJson(message, JsonObject.class);
                    String eventType = event.has("type") ? event.get("type").getAsString() : "";
    
                    if (!"response.audio.delta".equals(eventType)) {
                        System.out.println("Received event: " + eventType);
                    }
    
                    switch (eventType) {
                        case "error":
                            System.err.println("Error: " + event.get("error"));
                            break;
                        case "session.created":
                            System.out.println("Session created, ID: " +
                                event.getAsJsonObject("session").get("id").getAsString());
                            break;
                        case "session.updated":
                            System.out.println("Session updated, ID: " +
                                event.getAsJsonObject("session").get("id").getAsString());
                            break;
                        case "input_text_buffer.committed":
                            System.out.println("Text buffer committed, item ID: " + event.get("item_id"));
                            break;
                        case "input_text_buffer.cleared":
                            System.out.println("Text buffer cleared");
                            break;
                        case "response.created":
                            System.out.println("Response created, ID: " +
                                event.getAsJsonObject("response").get("id").getAsString());
                            responseDoneLatch = new CountDownLatch(1);
                            break;
                        case "response.output_item.added":
                            System.out.println("Output item added, ID: " +
                                event.getAsJsonObject("item").get("id").getAsString());
                            break;
                        case "response.audio.delta":
                            if (audioCallback != null) {
                                byte[] audioBytes = Base64.getDecoder().decode(
                                    event.get("delta").getAsString());
                                audioCallback.onAudio(audioBytes);
                            }
                            break;
                        case "response.audio.done":
                            System.out.println("Audio generation completed");
                            break;
                        case "response.done":
                            System.out.println("Response completed");
                            responseDoneLatch.countDown();
                            break;
                        case "session.finished":
                            System.out.println("Session ended");
                            sessionFinishedLatch.countDown();
                            break;
                    }
                }
    
                @Override
                public void onClose(int code, String reason, boolean remote) {
                    System.out.println("Connection closed: " + reason);
                }
    
                @Override
                public void onError(Exception ex) {
                    System.err.println("WebSocket error: " + ex.getMessage());
                }
            };
            ws.connectBlocking();
        }
    
        /**
         * Send an event to the server.
         */
        public void sendEvent(JsonObject event) {
            String eventId = "event_" + System.currentTimeMillis();
            event.addProperty("event_id", eventId);
            System.out.println("Sending event: type=" + event.get("type").getAsString()
                + ", event_id=" + eventId);
            ws.send(gson.toJson(event));
        }
    
        /**
         * Update session configuration.
         */
        public void updateSession(JsonObject config) {
            JsonObject event = new JsonObject();
            event.addProperty("type", "session.update");
            event.add("session", config);
            System.out.println("Updating session configuration: " + event);
            sendEvent(event);
        }
    
        /**
         * Send text data to the API.
         */
        public void appendText(String text) {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.append");
            event.addProperty("text", text);
            sendEvent(event);
        }
    
        /**
         * Commit the text buffer to trigger processing.
         */
        public void commitTextBuffer() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.commit");
            sendEvent(event);
        }
    
        /**
         * Clear the text buffer.
         */
        public void clearTextBuffer() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "input_text_buffer.clear");
            sendEvent(event);
        }
    
        /**
         * End the session.
         */
        public void finishSession() {
            JsonObject event = new JsonObject();
            event.addProperty("type", "session.finish");
            sendEvent(event);
        }
    
        /**
         * Wait for the response.done event.
         */
        public void waitForResponseDone() throws InterruptedException {
            responseDoneLatch.await();
        }
    
        /**
         * Wait for the session.finished event.
         */
        public void waitForSessionFinished() throws InterruptedException {
            sessionFinishedLatch.await();
        }
    
        /**
         * Close the WebSocket connection.
         */
        public void close() {
            if (ws != null) {
                ws.close();
            }
        }
    }
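
    The Java client above depends on Gson and Java-WebSocket. If your build doesn't already include them, add Gson alongside the Java-WebSocket dependency shown earlier, for example (the version number is illustrative):

    <dependency>
        <groupId>com.google.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>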
  2. Choose a synthesis mode

    The Realtime API supports the following two modes, contrasted in the code sketch after this list:

    • server_commit mode

      The client sends only text. The server intelligently determines segmentation and synthesis timing. This mode suits low-latency scenarios that don't require manual control over synthesis rhythm, such as GPS navigation.

    • commit mode

      The client adds text to a buffer and then triggers the server to synthesize the specified text. This mode suits scenarios that require fine-grained control over sentence breaks and pauses, such as news broadcasting.
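
    A minimal illustrative sketch (the helper names are hypothetical; each assumes a TTSRealtimeClient from step 1 that was created with the matching SessionMode and is already connected):

    # server_commit mode: append text; the server decides when to synthesize.
    async def speak_navigation(client: TTSRealtimeClient) -> None:
        await client.append_text("Turn left in 200 meters.")
        await client.finish_session()

    # commit mode: buffer text, then explicitly trigger synthesis of the buffer.
    async def speak_headline(client: TTSRealtimeClient) -> None:
        await client.append_text("Here is tonight's top story.")
        await client.commit_text_buffer()  # synthesize exactly the buffered text
        await client.finish_session()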

    server_commit mode

    Python

    In the same directory as tts_realtime_client.py, create another Python file named server_commit.py and copy the following code into it:

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS service configuration
    # To use the instruction control feature, replace the model with qwen3-tts-instruct-flash-realtime and uncomment the instructions in tts_realtime_client.py
    # The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
    URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime"
    # The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # If the environment variable is not configured, replace the following line with your Model Studio API Key: API_KEY="sk-xxx"
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # Collect audio data
    _audio_chunks = []
    # Real-time playback related
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None  # Will be opened at runtime
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient audio callback: real-time playback and caching"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """Save collected audio data as a WAV file"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _produce_text(client: TTSRealtimeClient):
        """Send text fragments to the server"""
        text_fragments = [
            "Alibaba Cloud's large language model platform, Model Studio, is an all-in-one platform for developing and building large language model applications.",
            "Both developers and business users can deeply participate in the design and development of large language model applications.",
            "You can develop a large language model application in five minutes using a simple interface,",
            "or train a dedicated model in a few hours, allowing you to focus more energy on application innovation."
        ]
    
        logging.info("Sending text fragments…")
        for text in text_fragments:
            logging.info(f"Sending fragment: {text}")
            await client.append_text(text)
            await asyncio.sleep(0.1)  # Brief delay between fragments
    
        # Wait for server to complete internal processing before ending the session
        await asyncio.sleep(1.0)
        await client.finish_session()
    
    async def _run_demo():
        """Run the complete demo"""
        global _audio_stream
        # Open PyAudio output stream
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.SERVER_COMMIT,
            audio_callback=_audio_callback
        )
    
        # Establish connection
        await client.connect()
    
        # Run message handling and text sending in parallel
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_produce_text(client))
    
        await producer_task  # Wait for text sending to complete
    
        # Wait for response.done
        await client.wait_for_response_done()
    
        # Close connection and cancel consumer task
        await client.close()
        consumer_task.cancel()
    
        # Close audio stream
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # Save audio data
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        """Synchronous entry point"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main()
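
    The client and demo import the third-party websockets and pyaudio packages; install them first if needed:

    pip install websockets pyaudio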

    Run server_commit.py to hear the audio generated by the Realtime API in real time.

    Java

    In the same directory as TTSRealtimeClient.java, create another Java file named ServerCommit.java and copy the following code into it:

    import javax.sound.sampled.*;
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class ServerCommit {
        // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
        private static final String URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime";
        // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
        // If the environment variable is not configured, replace the following line with your Model Studio API Key: private static final String API_KEY = "sk-xxx";
        private static final String API_KEY = System.getenv("DASHSCOPE_API_KEY");
        private static final int SAMPLE_RATE = 24000;
    
        // Audio data cache
        private static final List<byte[]> audioChunks = new ArrayList<>();
        // Real-time playback queue
        private static final ConcurrentLinkedQueue<byte[]> playbackQueue = new ConcurrentLinkedQueue<>();
        private static final AtomicBoolean playing = new AtomicBoolean(true);
    
        public static void main(String[] args) throws Exception {
            if (API_KEY == null || API_KEY.isEmpty()) {
                throw new IllegalStateException("Please set the DASHSCOPE_API_KEY environment variable");
            }
    
            // Initialize audio playback
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, format);
            SourceDataLine audioLine = (SourceDataLine) AudioSystem.getLine(info);
            audioLine.open(format);
            audioLine.start();
    
            // Start playback thread
            Thread playerThread = new Thread(() -> {
                while (playing.get() || !playbackQueue.isEmpty()) {
                    byte[] chunk = playbackQueue.poll();
                    if (chunk != null) {
                        audioLine.write(chunk, 0, chunk.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            });
            playerThread.start();
    
            // Create TTS client
            // To use the instruction control feature, replace the model with qwen3-tts-instruct-flash-realtime and uncomment the instructions in TTSRealtimeClient.java
            TTSRealtimeClient client = new TTSRealtimeClient(
                URL, API_KEY, "Cherry",
                TTSRealtimeClient.SessionMode.SERVER_COMMIT,
                audioData -> {
                    playbackQueue.add(audioData);
                    audioChunks.add(audioData);
                    System.out.println("Received audio data: " + audioData.length + " bytes");
                }
            );
    
            client.connect();
    
            // Send text fragments
            String[] textFragments = {
                "Alibaba Cloud's large language model platform, Model Studio, is an all-in-one platform for developing and building large language model applications.",
                "Both developers and business users can deeply participate in the design and development of large language model applications.",
                "You can develop a large language model application in five minutes using a simple interface,",
                "or train a dedicated model in a few hours, allowing you to focus more energy on application innovation."
            };
    
            System.out.println("Sending text...");
            for (String text : textFragments) {
                System.out.println("Sending fragment: " + text);
                client.appendText(text);
                Thread.sleep(100);
            }
    
            Thread.sleep(1000);
            client.finishSession();
    
            // Wait for response to complete
            client.waitForResponseDone();
            client.waitForSessionFinished();
            client.close();
    
            // Wait for playback to complete
            playing.set(false);
            playerThread.join();
            audioLine.drain();
            audioLine.close();
    
            // Save audio file
            saveWav("output.wav");
            System.out.println("Done");
        }
    
        private static void saveWav(String filename) throws IOException {
            if (audioChunks.isEmpty()) {
                System.out.println("No audio data to save");
                return;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            for (byte[] chunk : audioChunks) {
                bos.write(chunk);
            }
            byte[] allAudio = bos.toByteArray();
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            AudioInputStream ais = new AudioInputStream(
                new ByteArrayInputStream(allAudio), format, allAudio.length / 2);
            new File("outputs").mkdirs();
            AudioSystem.write(ais, AudioFileFormat.Type.WAVE,
                new File("outputs/" + filename));
            System.out.println("Audio saved to: outputs/" + filename);
        }
    }

    Compile and run ServerCommit.java to hear the audio generated by the Realtime API in real time.

    commit mode

    Python

    In the same directory as tts_realtime_client.py, create another Python file named commit.py and copy the following code into it:

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS service configuration
    # To use the instruction control feature, replace the model with qwen3-tts-instruct-flash-realtime and uncomment the instructions in tts_realtime_client.py
    # The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
    URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime"
    # The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    # If the environment variable is not configured, replace the following line with your Model Studio API Key: API_KEY="sk-xxx"
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # Collect audio data
    _audio_chunks = []
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient audio callback: real-time playback and caching"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """Save collected audio data as a WAV file"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _user_input_loop(client: TTSRealtimeClient):
        """Continuously get user input and send text. When user enters empty text, send a commit event and end the session."""
        print("Enter text (press Enter directly to send a commit event and end the session, press Ctrl+C or Ctrl+D to exit the program):")
    
        while True:
            try:
                user_text = input("> ")
                if not user_text:  # User input is empty
                    # Empty input marks the end of a conversation: commit buffer -> end session -> exit loop
                    logging.info("Empty input, sending commit event and ending session")
                    await client.commit_text_buffer()
                    # Wait briefly for server to process commit, preventing premature session end that could lose audio
                    await asyncio.sleep(0.3)
                    await client.finish_session()
                    break  # Exit user input loop directly, no need to press Enter again
                else:
                    logging.info(f"Sending text: {user_text}")
                    await client.append_text(user_text)
    
            except EOFError:  # User pressed Ctrl+D
                break
            except KeyboardInterrupt:  # User pressed Ctrl+C
                break
    
        # End session
        logging.info("Ending session...")

    async def _run_demo():
        """Run the complete demo"""
        global _audio_stream
        # Open PyAudio output stream
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.COMMIT,  # Changed to COMMIT mode
            audio_callback=_audio_callback
        )
    
        # Establish connection
        await client.connect()
    
        # Run message handling and user input in parallel
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_user_input_loop(client))
    
        await producer_task  # Wait for user input to complete
    
        # Wait for response.done
        await client.wait_for_response_done()
    
        # Close connection and cancel consumer task
        await client.close()
        consumer_task.cancel()
    
        # Close audio stream
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # Save audio data
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main()

    Run commit.py. You can enter text multiple times. Press Enter without entering text to hear the audio returned by the Realtime API through your speakers.

    Java

    In the same directory as TTSRealtimeClient.java, create another Java file named Commit.java and copy the following code into it:

    import javax.sound.sampled.*;
    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class Commit {
        // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime
        private static final String URL = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime?model=qwen3-tts-flash-realtime";
        // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
        // If the environment variable is not configured, replace the following line with your Model Studio API Key: private static final String API_KEY = "sk-xxx";
        private static final String API_KEY = System.getenv("DASHSCOPE_API_KEY");
        private static final int SAMPLE_RATE = 24000;
    
        private static final List<byte[]> audioChunks = new ArrayList<>();
        private static final ConcurrentLinkedQueue<byte[]> playbackQueue = new ConcurrentLinkedQueue<>();
        private static final AtomicBoolean playing = new AtomicBoolean(true);
    
        public static void main(String[] args) throws Exception {
            if (API_KEY == null || API_KEY.isEmpty()) {
                throw new IllegalStateException("Please set the DASHSCOPE_API_KEY environment variable");
            }
    
            // Initialize audio playback
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, format);
            SourceDataLine audioLine = (SourceDataLine) AudioSystem.getLine(info);
            audioLine.open(format);
            audioLine.start();
    
            // Start playback thread
            Thread playerThread = new Thread(() -> {
                while (playing.get() || !playbackQueue.isEmpty()) {
                    byte[] chunk = playbackQueue.poll();
                    if (chunk != null) {
                        audioLine.write(chunk, 0, chunk.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            });
            playerThread.start();
    
            // Create TTS client (commit mode)
            // To use the instruction control feature, replace the model with qwen3-tts-instruct-flash-realtime and uncomment the instructions in TTSRealtimeClient.java
            TTSRealtimeClient client = new TTSRealtimeClient(
                URL, API_KEY, "Cherry",
                TTSRealtimeClient.SessionMode.COMMIT,
                audioData -> {
                    playbackQueue.add(audioData);
                    audioChunks.add(audioData);
                    System.out.println("Received audio data: " + audioData.length + " bytes");
                }
            );
    
            client.connect();
    
            // Interactive input
            System.out.println("Enter text (press Enter directly to send a commit event and end the session, press Ctrl+D to exit the program):");
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print("> ");
                if (!scanner.hasNextLine()) {
                    client.finishSession();
                    break;
                }
                String userText = scanner.nextLine();
                if (userText.isEmpty()) {
                    // Empty input: commit buffer and end session
                    System.out.println("Empty input, sending commit event and ending session");
                    client.commitTextBuffer();
                    Thread.sleep(300);
                    client.finishSession();
                    break;
                } else {
                    System.out.println("Sending text: " + userText);
                    client.appendText(userText);
                }
            }
            scanner.close();
    
            // Wait for response to complete
            client.waitForResponseDone();
            client.waitForSessionFinished();
            client.close();
    
            // Wait for playback to complete
            playing.set(false);
            playerThread.join();
            audioLine.drain();
            audioLine.close();
    
            // Save audio file
            saveWav("output.wav");
            System.out.println("Done");
        }
    
        private static void saveWav(String filename) throws IOException {
            if (audioChunks.isEmpty()) {
                System.out.println("No audio data to save");
                return;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            for (byte[] chunk : audioChunks) {
                bos.write(chunk);
            }
            byte[] allAudio = bos.toByteArray();
            AudioFormat format = new AudioFormat(SAMPLE_RATE, 16, 1, true, false);
            AudioInputStream ais = new AudioInputStream(
                new ByteArrayInputStream(allAudio), format, allAudio.length / 2);
            new File("outputs").mkdirs();
            AudioSystem.write(ais, AudioFileFormat.Type.WAVE,
                new File("outputs/" + filename));
            System.out.println("Audio saved to: outputs/" + filename);
        }
    }

    Compile and run Commit.java. You can enter text multiple times. Press Enter without entering text to hear the audio returned by the Realtime API through your speakers.

Sambert

Go

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	wsURL      = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket server address
	outputFile = "output.mp3"                                        // Output file path
)

func main() {
	// If the API Key is not configured in environment variables, replace the following line with: apiKey := "your_api_key". It is not recommended to hard-code the API Key directly in production code to reduce the risk of API Key leakage.
	apiKey := os.Getenv("DASHSCOPE_API_KEY")

	// Check and clear the output file
	if err := clearOutputFile(outputFile); err != nil {
		fmt.Println("Failed to clear output file:", err)
		return
	}

	// Connect to the WebSocket service
	conn, err := connectWebSocket(apiKey)
	if err != nil {
		fmt.Println("Failed to connect to WebSocket:", err)
		return
	}
	defer closeConnection(conn)

	// Create a channel for receiving task completion notification
	done := make(chan struct{})

	// Start an async goroutine for receiving messages
	go receiveMessage(conn, done)

	// Send run-task command
	if err := sendRunTaskMsg(conn); err != nil {
		fmt.Println("Failed to send run-task command:", err)
		return
	}

	// Wait for task completion or timeout
	select {
	case <-done:
		fmt.Println("Task ended")
	case <-time.After(5 * time.Minute):
		fmt.Println("Task timed out")
	}
}

// Message structure definition
type Message struct {
	Header  Header  `json:"header"`
	Payload Payload `json:"payload"`
}

// Header definition
type Header struct {
	Action       string                 `json:"action,omitempty"`
	TaskID       string                 `json:"task_id"`
	Streaming    string                 `json:"streaming,omitempty"`
	Event        string                 `json:"event,omitempty"`
	ErrorCode    string                 `json:"error_code,omitempty"`
	ErrorMessage string                 `json:"error_message,omitempty"`
	Attributes   map[string]interface{} `json:"attributes"`
}

// Payload definition
type Payload struct {
	Model      string     `json:"model,omitempty"`
	TaskGroup  string     `json:"task_group,omitempty"`
	Task       string     `json:"task,omitempty"`
	Function   string     `json:"function,omitempty"`
	Input      Input      `json:"input,omitempty"`
	Parameters Parameters `json:"parameters,omitempty"`
	Output     Output     `json:"output,omitempty"`
	Usage      Usage      `json:"usage,omitempty"`
}

// Input definition
type Input struct {
	Text string `json:"text"`
}

// Parameters definition
type Parameters struct {
	TextType                string  `json:"text_type"`
	Format                  string  `json:"format"`
	SampleRate              int     `json:"sample_rate"`
	Volume                  int     `json:"volume"`
	Rate                    float64 `json:"rate"`
	Pitch                   float64 `json:"pitch"`
	WordTimestampEnabled    bool    `json:"word_timestamp_enabled"`
	PhonemeTimestampEnabled bool    `json:"phoneme_timestamp_enabled"`
}

// Output definition
type Output struct {
	Sentence Sentence `json:"sentence"`
}

// Sentence definition
type Sentence struct {
	BeginTime int    `json:"begin_time"`
	EndTime   int    `json:"end_time"`
	Words     []Word `json:"words"`
}

// Word definition
type Word struct {
	Text      string    `json:"text"`
	BeginTime int       `json:"begin_time"`
	EndTime   int       `json:"end_time"`
	Phonemes  []Phoneme `json:"phonemes"`
}

// Phoneme definition
type Phoneme struct {
	BeginTime int    `json:"begin_time"`
	EndTime   int    `json:"end_time"`
	Text      string `json:"text"`
	Tone      int    `json:"tone"`
}

// Usage definition
type Usage struct {
	Characters int `json:"characters"`
}

func receiveMessage(conn *websocket.Conn, done chan struct{}) {
	for {
		msgType, message, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("Failed to parse server message:", err)
			close(done)
			break
		}

		if msgType == websocket.BinaryMessage {
			// Process binary audio stream
			if err := writeBinaryDataToFile(message, outputFile); err != nil {
				fmt.Println("Failed to write binary data:", err)
				close(done)
				break
			}
			fmt.Println("Audio chunk written to local file")
		} else {
			// Process text messages
			var msg Message
			if err := json.Unmarshal(message, &msg); err != nil {
				fmt.Println("Failed to parse event:", err)
				continue
			}
			if handleMessage(conn, msg, done) {
				break
			}
		}
	}
}

func handleMessage(conn *websocket.Conn, msg Message, done chan struct{}) bool {
	switch msg.Header.Event {
	case "task-started":
		fmt.Println("Task started")

	case "result-generated":
		// Add code here if you need to retrieve additional messages

	case "task-finished":
		fmt.Println("Task completed")
		close(done)
		return true

	case "task-failed":
		if msg.Header.ErrorMessage != "" {
			fmt.Printf("Task failed: %s\n", msg.Header.ErrorMessage)
		} else {
			fmt.Println("Task failed due to unknown reason")
		}
		close(done)
		return true

	default:
		// Treat unexpected events as fatal and stop receiving, so that done is not closed twice
		fmt.Printf("Unexpected event: %v\n", msg)
		close(done)
		return true
	}

	return false
}

func sendRunTaskMsg(conn *websocket.Conn) error {
	runTaskMsg, err := generateRunTaskMsg()
	if err != nil {
		return err
	}
	if err := conn.WriteMessage(websocket.TextMessage, []byte(runTaskMsg)); err != nil {
		return err
	}
	return nil
}

func generateRunTaskMsg() (string, error) {
	runTaskMessage := Message{
		Header: Header{
			Action:    "run-task",
			TaskID:    uuid.New().String(),
			Streaming: "out",
		},
		Payload: Payload{
			Model:     "sambert-zhichu-v1",
			TaskGroup: "audio",
			Task:      "tts",
			Function:  "SpeechSynthesizer",
			Input: Input{
				Text: "The white sun sets behind the mountains, and the Yellow River flows into the sea. To see a thousand miles further, you must climb one more story.",
			},
			Parameters: Parameters{
				TextType:                "PlainText",
				Format:                  "mp3",
				SampleRate:              16000,
				Volume:                  50,
				Rate:                    1.0,
				Pitch:                   1.0,
				WordTimestampEnabled:    true,
				PhonemeTimestampEnabled: true,
			},
		},
	}

	runTaskMsgJSON, err := json.Marshal(runTaskMessage)
	return string(runTaskMsgJSON), err
}

func connectWebSocket(apiKey string) (*websocket.Conn, error) {
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))
	conn, _, err := websocket.DefaultDialer.Dial(wsURL, header)
	if err != nil {
		fmt.Println("Failed to connect to WebSocket:", err)
		return nil, err
	}
	return conn, nil
}

func writeBinaryDataToFile(data []byte, filePath string) error {
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer file.Close()
	_, err = file.Write(data)
	return err
}

func closeConnection(conn *websocket.Conn) {
	if conn != nil {
		conn.Close()
	}
}

func clearOutputFile(filePath string) error {
	file, err := os.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	file.Close()
	return nil
}

C#

Example code:

using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

class Program {
    // If the API Key is not configured in environment variables, replace the following line with: private const string ApiKey="your_api_key". It is not recommended to hard-code the API Key directly in production code to reduce the risk of API Key leakage.
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"; // WebSocket server address
    private const string OutputFilePath = "output.mp3"; // Output file path

    static async Task Main(string[] args) {
        var ws = new ClientWebSocket();
        try {
            // 1. Connect to the WebSocket service and authenticate
            await ConnectWithAuth(ws, WebSocketUrl);

            // 2. Start the message receiving thread
            var receiveTask = ReceiveMessages(ws);

            // 3. Send run-task command
            string textToSynthesize = "The white sun sets behind the mountains, and the Yellow River flows into the sea. To see a thousand miles further, you must climb one more story.";
            string taskId = GenerateTaskId();
            await SendRunTaskCommand(ws, textToSynthesize, taskId);

            // 4. Wait for the receiving task to complete
            await receiveTask;
        } catch (Exception ex) {
            Console.WriteLine($"Error: {ex.Message}");
        } finally {
            if (ws.State == WebSocketState.Open) {
                await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing connection", CancellationToken.None);
            }
        }
    }

    private static async Task ConnectWithAuth(ClientWebSocket ws, string url) {
        var uri = new Uri(url);
        ws.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        ws.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");
        await ws.ConnectAsync(uri, CancellationToken.None);
        Console.WriteLine("Connected to WebSocket server.");
    }

    private static string GenerateTaskId() {
        return Guid.NewGuid().ToString("N");
    }

    private static async Task SendRunTaskCommand(ClientWebSocket ws, string text, string taskId) {
        var command = CreateRunTaskCommand(text, taskId);
        var buffer = Encoding.UTF8.GetBytes(command);
        await ws.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
        Console.WriteLine("run-task command sent.");
    }

    private static string CreateRunTaskCommand(string text, string taskId) {
        var command = new {
            header = new {
                action = "run-task",
                task_id = taskId,
                streaming = "out"
            },
            payload = new {
                model = "sambert-zhichu-v1",
                task_group = "audio",
                task = "tts",
                function = "SpeechSynthesizer",
                input = new {
                    text = text
                },
                parameters = new {
                    text_type = "PlainText",
                    format = "mp3",
                    sample_rate = 16000,
                    volume = 50,
                    rate = 1,
                    pitch = 1,
                    word_timestamp_enabled = true,
                    phoneme_timestamp_enabled = true
                }
            }
        };
        return JsonSerializer.Serialize(command);
    }

    private static async Task ReceiveMessages(ClientWebSocket ws) {
        var buffer = new byte[1024 * 4];
        var fs = new FileStream(OutputFilePath, FileMode.Create, FileAccess.Write);
        bool taskStarted = false;
        bool taskFinished = false;

        while (ws.State == WebSocketState.Open && !taskFinished) {
            var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

            switch (result.MessageType) {
                case WebSocketMessageType.Text:
                    var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
                    var jsonMessage = JsonSerializer.Deserialize<JsonElement>(message);

                    ProcessTextMessage(jsonMessage, ref taskStarted, ref taskFinished);
                    break;
                case WebSocketMessageType.Binary:
                    if (taskStarted) {
                        await fs.WriteAsync(buffer, 0, result.Count);
                        Console.WriteLine("Received audio data.");
                    }
                    break;
                case WebSocketMessageType.Close:
                    Console.WriteLine("Server closed the connection.");
                    taskFinished = true;
                    break;
            }
        }
        fs.Close();
    }

    private static void ProcessTextMessage(JsonElement jsonMessage, ref bool taskStarted, ref bool taskFinished) {
        if (jsonMessage.TryGetProperty("header", out JsonElement header) && header.TryGetProperty("event", out JsonElement eventToken)) {
            var eventType = eventToken.GetString();
            switch (eventType) {
                case "task-started":
                    taskStarted = true;
                    Console.WriteLine("Task started.");
                    break;
                case "result-generated":
                    // Add code here if you need to retrieve additional messages
                    break;
                case "task-finished":
                    taskFinished = true;
                    Console.WriteLine("Task completed.");
                    break;
                case "task-failed":
                    taskFinished = true;
                    Console.WriteLine("Task failed.");
                    break;
            }
        }
    }
}

PHP

The example code directory structure:

my-php-project/
├── composer.json
├── vendor/
└── index.php

The composer.json contents are as follows. Determine the appropriate dependency versions based on your requirements:

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

The index.php contents:

<?php

require 'vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;

# If the API Key is not configured in environment variables, replace the following line with: $api_key="your_api_key". It is not recommended to hard-code the API Key directly in production code to reduce the risk of API Key leakage.
$api_key = getenv("DASHSCOPE_API_KEY");
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket server address
$output_file = 'output.mp3'; // Output file path

$loop = Loop::get();

if (file_exists($output_file)) {
    // Clear file content
    file_put_contents($output_file, '');
    echo "File cleared\n";
}

// Create a custom connector
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        // Disabling certificate verification is for local testing only; keep verification enabled in production
        'verify_peer' => false,
        'verify_peer_name' => false,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

// Connect to the WebSocket service
$connector($websocket_url, [], $headers)
    ->then(function ($conn) use ($output_file) {
        echo "Connection established\n";

        // Receive WebSocket messages asynchronously
        $conn->on('message', function ($msg) use ($conn, $output_file) {
            if ($msg->isBinary()) {
                // Write binary data to local file
                file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
                echo "Binary data written to file\n";
            } else {
                $data = json_decode($msg, true);
                switch ($data['header']['event']) {
                    case 'task-started':
                        echo "Task started\n";
                        break;
                    case 'result-generated':
                        // Add code here if you need to retrieve additional messages
                        break;
                    case 'task-finished':
                        echo "Task completed\n";
                        $conn->close();
                        break;
                    case 'task-failed':
                        echo "Task failed: " . $data['header']['error_message'] . "\n";
                        $conn->close();
                        break;
                    default:
                        echo "Unknown event: " . $msg . "\n";
                }
            }
        });

        // Listen for connection close
        $conn->on('close', function($code = null, $reason = null) {
            echo "Connection closed\n";
            if ($code !== null) {
                echo "Close code: " . $code . "\n";
            }
            if ($reason !== null) {
                echo "Close reason: " . $reason . "\n";
            }
        });

        // Send run-task command
        $conn->send(json_encode([
            'header' => [
                'action' => 'run-task',
                'task_id' => bin2hex(random_bytes(16)),
                'streaming' => 'out'
            ],
            'payload' => [
                'model' => 'sambert-zhichu-v1',
                'task_group' => 'audio',
                'task' => 'tts',
                'function' => 'SpeechSynthesizer',
                'input' => [
                    'text' => 'Bright moonlight before my bed, I wonder if it is frost on the ground. I raise my head to watch the bright moon, and lower my head to think of my hometown.'
                ],
                'parameters' => [
                    'text_type' => 'PlainText',
                    'format' => 'mp3',
                    'sample_rate' => 16000,
                    'volume' => 50,
                    'rate' => 1,
                    'pitch' => 1,
                    'word_timestamp_enabled' => true,
                    'phoneme_timestamp_enabled' => true
                ]
            ]
        ]));
        echo "run-task command sent\n";
    }, function (Exception $e) {
        echo "Connection failed: {$e->getMessage()}\n";
        file_put_contents('error.log', $e->getMessage() . "\n", FILE_APPEND);
    });

$loop->run();

Node.js

Install the required dependencies:

npm install ws
npm install uuid

Example code:

const WebSocket = require('ws');
const fs = require('fs');
const { v4: uuidv4 } = require('uuid');

// If the API Key is not configured in environment variables, replace the following line with: apiKey = 'your_api_key'. It is not recommended to hard-code the API Key directly in production code to reduce the risk of API Key leakage.
const apiKey = process.env.DASHSCOPE_API_KEY;
const wsUrl = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket server address
const outputFilePath = 'output.mp3'; // Replace with your audio file path

async function main() {
  await checkAndClearOutputFile(outputFilePath);
  createWebSocketConnection();
}

const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });
function createWebSocketConnection() {
  const ws = new WebSocket(wsUrl, {
    headers: {
      Authorization: `bearer ${apiKey}`,
      'X-DashScope-DataInspection': 'enable'
    }
  });

  ws.on('open', () => {
    console.log('Connected to WebSocket server');
    sendRunTaskMessage(ws);
  });

  ws.on('message', (data, isBinary) => handleWebSocketMessage(data, isBinary, ws));
  ws.on('error', (error) => console.error('WebSocket error:', error));
  ws.on('close', () => console.log('WebSocket connection closed'));

  return ws;
}

function sendRunTaskMessage(ws) {
  const taskId = uuidv4();
  const runTaskMessage = {
    header: {
      action: 'run-task',
      task_id: taskId,
      streaming: 'out'
    },
    payload: {
      model: 'sambert-zhichu-v1',
      task_group: 'audio',
      task: 'tts',
      function: 'SpeechSynthesizer',
      input: {
        text: 'The white sun sets behind the mountains, and the Yellow River flows into the sea. To see a thousand miles further, you must climb one more story.'
      },
      parameters: {
        text_type: 'PlainText',
        format: 'mp3',
        sample_rate: 16000,
        volume: 50,
        rate: 1,
        pitch: 1,
        word_timestamp_enabled: true,
        phoneme_timestamp_enabled: true
      }
    }
  };
  ws.send(JSON.stringify(runTaskMessage));
  console.log('run-task command sent');
}

function handleWebSocketMessage(data, isBinary, ws) {
  if (isBinary) {
    fileStream.write(data);
  } else {
    const message = JSON.parse(data);
    handleWebSocketEvent(message, ws);
  }
}

function handleWebSocketEvent(message, ws) {
  switch (message.header.event) {
    case 'task-started':
      console.log('Task started');
      break;
    case 'result-generated':
      console.log('Result generated');
      break;
    case 'task-finished':
      console.log('Task completed');
      ws.close();
      fileStream.end(() => {
        console.log('File stream closed');
      });
      break;
    case 'task-failed':
      console.error('Task failed:', message.header.error_message);
      ws.close();
      fileStream.end(() => {
        console.log('File stream closed');
      });
      break;
    default:
      console.log('Unknown event:', message.header.event);
  }
}

function checkAndClearOutputFile(filePath) {
  return new Promise((resolve, reject) => {
    fs.access(filePath, fs.constants.F_OK, (err) => {
      if (!err) {
        fs.truncate(filePath, 0, (truncateErr) => {
          if (truncateErr) return reject(truncateErr);
          console.log('File cleared');
          resolve();
        });
      } else {
        fs.open(filePath, 'w', (openErr) => {
          if (openErr) return reject(openErr);
          console.log('File created');
          resolve();
        });
      }
    });
  });
}

main().catch(console.error);

Connection reuse

WebSocket services support connection reuse to improve resource efficiency and avoid the overhead of repeatedly establishing connections. After a synthesis task completes, the WebSocket connection can be reused for the next task without reconnecting.

Reuse flow:

  • CosyVoice / Sambert: After the client sends finish-task (CosyVoice) or the task completes (Sambert), the server returns a task-finished event to end the task. The client can then send a new run-task event to start the next task.

  • Qwen-TTS: After the client sends session.finish, the server returns a session.finished event to end the session. The client can then establish a new session for the next synthesis task.

Important
  1. Wait for the server to return the completion event (task-finished or session.finished) before starting a new task.

  2. CosyVoice and Sambert require a different task_id for each task on a reused connection.

  3. If a failure occurs during task execution, the server returns an error event and closes the connection. The connection can't be reused after this.

  4. If no new task starts within 60 seconds after a task ends, the connection times out and disconnects automatically.

For details on the events for each model, see the CosyVoice API reference and the Qwen-TTS API reference. The sketch below illustrates the reuse flow.
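
The following minimal sketch illustrates this flow for the Sambert task protocol shown in the Go example above: two tasks run back to back over a single WebSocket connection, each with its own task_id, and the second run-task is sent only after the first task-finished event arrives. It assumes the third-party websocket-client package (pip install websocket-client); synthesize_once is an illustrative helper, not a DashScope SDK API.

import json
import os
import uuid

import websocket  # third-party package: websocket-client

def synthesize_once(ws, text, out_file):
    """Run one run-task/task-finished cycle on an already-open connection."""
    task_id = uuid.uuid4().hex  # a reused connection needs a new task_id per task
    ws.send(json.dumps({
        "header": {"action": "run-task", "task_id": task_id, "streaming": "out"},
        "payload": {
            "model": "sambert-zhichu-v1", "task_group": "audio",
            "task": "tts", "function": "SpeechSynthesizer",
            "input": {"text": text},
            "parameters": {"text_type": "PlainText", "format": "mp3",
                           "sample_rate": 16000},
        },
    }))
    with open(out_file, "wb") as f:
        while True:
            frame = ws.recv()
            if isinstance(frame, bytes):  # binary frame: audio chunk
                f.write(frame)
                continue
            event = json.loads(frame)["header"].get("event")
            if event == "task-finished":  # wait for this before starting the next task
                return
            if event == "task-failed":  # the connection can't be reused after a failure
                raise RuntimeError(json.loads(frame)["header"].get("error_message"))

ws = websocket.create_connection(
    "wss://dashscope.aliyuncs.com/api-ws/v1/inference/",
    header={"Authorization": "bearer " + os.environ["DASHSCOPE_API_KEY"]},
)
try:
    synthesize_once(ws, "First sentence.", "first.mp3")
    synthesize_once(ws, "Second sentence.", "second.mp3")  # same connection, new task_id
finally:
    ws.close()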

High-concurrency best practices

In high-concurrency scenarios, creating and destroying a WebSocket connection for each request produces significant overhead. The DashScope SDK includes built-in connection pooling and object pooling to reuse connections and objects, significantly reducing latency and resource consumption.


CosyVoice


Python SDK: object pool optimization

The Python SDK uses SpeechSynthesizerObjectPool to implement object pool optimization, managing and reusing SpeechSynthesizer objects.

The object pool creates the specified number of SpeechSynthesizer instances at initialization and establishes WebSocket connections in advance. Objects obtained from the pool can start requests immediately without waiting for connection setup, reducing time to first audio. When a task completes and the object is returned to the pool, its WebSocket connection remains active for the next task.

Implementation steps

  1. Install the dependency: Install the DashScope package (pip install -U dashscope).

  2. Create and configure the object pool

    Set the object pool size through SpeechSynthesizerObjectPool. Recommended value: 1.5 to 2 times your peak concurrency. The object pool size must not exceed your account's QPS (queries per second) limit.

    Create a global singleton fixed-size object pool with the following code. The pool creates the specified number of SpeechSynthesizer objects and establishes WebSocket connections at initialization, so this takes some time.

    from dashscope.audio.tts_v2 import SpeechSynthesizerObjectPool
    
    connectionPool = SpeechSynthesizerObjectPool(max_size=20)
  3. Borrow a SpeechSynthesizer object from the pool

    If the number of unreturned objects exceeds the pool's maximum capacity, the system creates an additional SpeechSynthesizer object.

    Such newly created objects require re-initialization and a new WebSocket connection, and can't leverage existing pool connections. They don't provide the reuse benefit.

    speech_synthesizer = connectionPool.borrow_synthesizer(
        model='cosyvoice-v3-flash',
        voice='longanyang',
        seed=12382,
        callback=synthesizer_callback
    )
  4. Perform speech synthesis

    Call the SpeechSynthesizer object's call or streaming_call method to perform speech synthesis.
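
    For example, one-shot synthesis with the borrowed object (the text is illustrative; use streaming_call for streaming input):

    speech_synthesizer.call('Text to synthesize')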

  5. Return the SpeechSynthesizer object

    After the synthesis task completes, return the SpeechSynthesizer object so subsequent tasks can reuse it.

    Don't return objects with incomplete or failed tasks.

    connectionPool.return_synthesizer(speech_synthesizer)

Complete code

#!/usr/bin/env python3
# Copyright (C) Alibaba Group. All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)

import os
import time
import threading

import dashscope
from dashscope.audio.tts_v2 import *

USE_CONNECTION_POOL = True
text_to_synthesize = [
    'First sentence: Welcome to Alibaba Cloud speech synthesis.',
    'Second sentence: Welcome to Alibaba Cloud speech synthesis.',
    'Third sentence: Welcome to Alibaba Cloud speech synthesis.',
]
connectionPool = None
if USE_CONNECTION_POOL:
    print('creating connection pool')
    start_time = time.time() * 1000
    connectionPool = SpeechSynthesizerObjectPool(max_size=3)
    end_time = time.time() * 1000
    print('connection pool created, cost: {} ms'.format(end_time - start_time))

def init_dashscope_api_key():
    '''
    Set your DashScope API-key. More information:
    https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
    '''
    # The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
    if 'DASHSCOPE_API_KEY' in os.environ:
        dashscope.api_key = os.environ[
            'DASHSCOPE_API_KEY']  # load API-key from environment variable DASHSCOPE_API_KEY
    else:
        dashscope.api_key = '<your-dashscope-api-key>'  # set API-key manually

def synthesis_text_to_speech_and_play_by_streaming_mode(text, task_id):
    '''
    Synthesize speech with the given text in streaming mode (async call) and play the synthesized audio in real time.
    For more information, see https://www.alibabacloud.com/help/document_detail/2712523.html
    '''
    global USE_CONNECTION_POOL, connectionPool

    complete_event = threading.Event()

    # Define a callback to handle the result

    class Callback(ResultCallback):
        def on_open(self):
            # when using the object pool, on_open is called after the task starts
            self.file = open(f'result_{task_id}.mp3', 'wb')
            print(f'[task_{task_id}] start')

        def on_complete(self):
            print(f'[task_{task_id}] speech synthesis task complete successfully.')
            complete_event.set()

        def on_error(self, message: str):
            print(f'[task_{task_id}] speech synthesis task failed, {message}')

        def on_close(self):
            # when using the object pool, on_close is called after the task finishes
            print(f'[task_{task_id}] finished')

        def on_event(self, message):
            # print(f'recv speech synthesis message {message}')
            pass

        def on_data(self, data: bytes) -> None:
            # send to player
            # save audio to file
            self.file.write(data)

    # Call the speech synthesizer callback
    synthesizer_callback = Callback()

    # Initialize the speech synthesizer
    # you can customize the synthesis parameters, like voice, format, sample_rate or other parameters
    if USE_CONNECTION_POOL:
        speech_synthesizer = connectionPool.borrow_synthesizer(
            model='cosyvoice-v3-flash',
            voice='longanyang',
            seed=12382,
            callback=synthesizer_callback
        )
    else:
        speech_synthesizer = SpeechSynthesizer(model='cosyvoice-v3-flash',
                                               voice='longanyang',
                                               seed=12382,
                                               callback=synthesizer_callback)
    try:
        speech_synthesizer.call(text)
    except Exception as e:
        print(f'[task_{task_id}] speech synthesis task failed, {e}')
        if USE_CONNECTION_POOL:
            # close the synthesizer connection manually if task failed when using connection pool.
            speech_synthesizer.close()
        return

    print('[task_{}] Synthesized text: {}'.format(task_id, text))
    complete_event.wait()
    print('[task_{}][Metric] requestId: {}, first package delay ms: {}'.format(
        task_id,
        speech_synthesizer.get_last_request_id(),
        speech_synthesizer.get_first_package_delay()))
    if USE_CONNECTION_POOL:
        connectionPool.return_synthesizer(speech_synthesizer)

# main function
if __name__ == '__main__':
    # The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference
    dashscope.base_websocket_api_url='wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference'
    init_dashscope_api_key()
    task_thread_list = []
    for task_id in range(3):
        thread = threading.Thread(
            target=synthesis_text_to_speech_and_play_by_streaming_mode,
            args=(text_to_synthesize[task_id], task_id))
        task_thread_list.append(thread)

    for task_thread in task_thread_list:
        task_thread.start()

    for task_thread in task_thread_list:
        task_thread.join()

    if USE_CONNECTION_POOL:
        connectionPool.shutdown()

Resource management and error handling

  • Task success: When a synthesis task completes normally, call connectionPool.return_synthesizer(speech_synthesizer) to return the SpeechSynthesizer object to the pool for reuse.

    Important

    Don't return SpeechSynthesizer objects with incomplete or failed tasks.

  • Task failure: When an SDK internal error or business logic exception interrupts the task, close the underlying WebSocket connection by calling speech_synthesizer.close().

  • After all synthesis tasks complete, shut down the object pool: connectionPool.shutdown().

  • When the service returns a TaskFailed error, no additional handling is required.
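
Taken together, these rules reduce to a single borrow/return-or-close pattern. The following minimal sketch consolidates it; it assumes connectionPool, synthesizer_callback, and complete_event are set up as in the complete example above:

speech_synthesizer = connectionPool.borrow_synthesizer(
    model='cosyvoice-v3-flash',
    voice='longanyang',
    callback=synthesizer_callback
)
try:
    speech_synthesizer.call('Text to synthesize')
    complete_event.wait()  # block until the callback signals completion
    # Task succeeded: return the object so its live connection can be reused
    connectionPool.return_synthesizer(speech_synthesizer)
except Exception:
    # Task failed: close the underlying WebSocket instead of returning the object
    speech_synthesizer.close()
# After all tasks complete: connectionPool.shutdown()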

Java SDK: connection pool and object pool optimization

The Java SDK uses a built-in connection pool and a custom object pool working together for optimal performance.

  • Connection pool: The OkHttp3 connection pool integrated within the SDK manages and reuses underlying WebSocket connections, reducing network handshake overhead. This feature is enabled by default.

  • Object pool: Built on commons-pool2, the object pool maintains a set of pre-connected SpeechSynthesizer objects. Borrowing objects from the pool eliminates connection setup latency, significantly reducing time to first audio.

Implementation steps

  1. Add dependencies

    Add dashscope-sdk-java and commons-pool2 to your dependency configuration based on your build tool.

    Maven and Gradle examples:

    Maven

    1. Open your Maven project's pom.xml file.

    2. Add the following dependencies inside the <dependencies> tag.

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- Replace 'the-latest-version' with version 2.16.9 or later. Check available versions at: https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>the-latest-version</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <!-- Replace 'the-latest-version' with the latest version. Check available versions at: https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <version>the-latest-version</version>
    </dependency>
    3. Save the pom.xml file.

    4. Run a Maven command (such as mvn clean install or mvn compile) to update the project dependencies.

    Gradle

    1. Open your Gradle project's build.gradle file.

    2. Add the following dependencies inside the dependencies block.

      dependencies {
          // Replace 'the-latest-version' with version 2.16.6 or later. Check available versions at: https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
          implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'
          
          // Replace 'the-latest-version' with the latest version. Check available versions at: https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
          implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
      }
    3. Save the build.gradle file.

    4. In the terminal, navigate to your project root directory and run the following Gradle command to update the project dependencies.

      ./gradlew build --refresh-dependencies

      On Windows, run:

      gradlew build --refresh-dependencies
  2. Configure the connection pool

    Configure the connection pool parameters through environment variables:

    • DASHSCOPE_CONNECTION_POOL_SIZE: connection pool size. Recommended value: at least 2 times your peak concurrency. Default: 32.

    • DASHSCOPE_MAXIMUM_ASYNC_REQUESTS: maximum number of async requests. Recommended value: same as DASHSCOPE_CONNECTION_POOL_SIZE. Default: 32.

    • DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST: maximum number of async requests per host. Recommended value: same as DASHSCOPE_CONNECTION_POOL_SIZE. Default: 32.

  3. Configure the object pool

    Configure the object pool size through an environment variable:

    • COSYVOICE_OBJECTPOOL_SIZE: object pool size. Recommended value: 1.5 to 2 times your peak concurrency. Default: 500.

    Important
    • The object pool size (COSYVOICE_OBJECTPOOL_SIZE) must be less than or equal to the connection pool size (DASHSCOPE_CONNECTION_POOL_SIZE). Otherwise, when the object pool requests objects and the connection pool is full, calling threads block while waiting for available connections.

    • The object pool size must not exceed your account's QPS (queries per second) limit.

    Create the object pool with the following code:

    class CosyvoiceObjectPool {
        // ...Other code omitted here. See the complete example for the full code.
        public static GenericObjectPool<SpeechSynthesizer> getInstance() {
            lock.lock();
            try {
                if (synthesizerPool == null) {
                    // You can set the object pool size here, or in the COSYVOICE_OBJECTPOOL_SIZE environment variable.
                    // It is recommended to set it to 1.5 to 2 times your server's maximum concurrent connections.
                    int objectPoolSize = getObjectivePoolSize();
                    SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                            new SpeechSynthesizerObjectFactory();
                    GenericObjectPoolConfig<SpeechSynthesizer> config =
                            new GenericObjectPoolConfig<>();
                    config.setMaxTotal(objectPoolSize);
                    config.setMaxIdle(objectPoolSize);
                    config.setMinIdle(objectPoolSize);
                    synthesizerPool =
                            new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
                }
            } finally {
                // Release the lock even if pool creation throws
                lock.unlock();
            }
            return synthesizerPool;
        }
    }
  4. Borrow a SpeechSynthesizer object from the pool

    If the number of unreturned objects exceeds the pool's maximum capacity, the system creates an additional SpeechSynthesizer object.

    Such newly created objects require re-initialization and a new WebSocket connection, and can't leverage existing pool connections. They don't provide the reuse benefit.

    synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
  5. Perform speech synthesis

    Call the SpeechSynthesizer object's call or streamingCall method to perform speech synthesis.

  6. Return the SpeechSynthesizer object

    After the synthesis task completes, return the SpeechSynthesizer object so subsequent tasks can reuse it.

    Don't return objects with incomplete or failed tasks.

    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);

Complete code

import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.Constants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * You need to include org.apache.commons.pool2 and DashScope-related packages in your project.
 *
 * DashScope SDK 2.16.6 and later versions are optimized for high-concurrency scenarios.
 * Versions prior to DashScope SDK 2.16.6 are not recommended for high-concurrency use.
 *
 *
 * Before making high-concurrency calls to the TTS service,
 * please configure the connection pool parameters through the following environment variables.
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
 * DASHSCOPE_CONNECTION_POOL_SIZE
 *
 */

class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }

    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class CosyvoiceObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
            System.out.println("Using Object Pool Size In Env: "+ n);
            return n;
        } catch (NumberFormatException e) {
            System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        try {
            if (synthesizerPool == null) {
                // You can set the object pool size here, or in the COSYVOICE_OBJECTPOOL_SIZE environment variable.
                // It is recommended to set it to 1.5 to 2 times your server's maximum concurrent connections.
                int objectPoolSize = getObjectivePoolSize();
                SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                        new SpeechSynthesizerObjectFactory();
                GenericObjectPoolConfig<SpeechSynthesizer> config =
                        new GenericObjectPoolConfig<>();
                config.setMaxTotal(objectPoolSize);
                config.setMaxIdle(objectPoolSize);
                config.setMinIdle(objectPoolSize);
                synthesizerPool =
                        new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
            }
        } finally {
            // Release the lock even if pool creation throws
            lock.unlock();
        }
        return synthesizerPool;
    }
}

class SynthesizeTaskWithCallback implements Runnable {
    String[] textArray;
    String requestId;
    long timeCost;
    public SynthesizeTaskWithCallback(String[] textArray) {
        this.textArray = textArray;
    }
    @Override
    public void run() {
        SpeechSynthesizer synthesizer = null;
        long startTime = System.currentTimeMillis();
        // if recv onError
        final boolean[] hasError = {false};
        try {
            class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                ReactCallback() {}

                @Override
                public void onEvent(SpeechSynthesisResult message) {
                    if (message.getAudioFrame() != null) {
                        try {
                            byte[] bytesArray = message.getAudioFrame().array();
                            System.out.println("Received audio, audio stream length: " + bytesArray.length);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override
                public void onComplete() {}

                @Override
                public void onError(Exception e) {
                    System.out.println(e.getMessage());
                    e.printStackTrace();
                    hasError[0] = true;
                }
            }

            SpeechSynthesisParam param =
                    SpeechSynthesisParam.builder()
                            .model("cosyvoice-v3-flash")
                            .voice("longanyang")
                            // The API Key differs between the Singapore and Beijing regions. Obtain an API Key: https://www.alibabacloud.com/help/zh/model-studio/get-api-key
                            // If the environment variable is not configured, replace the following line with your Model Studio API Key: .apiKey("sk-xxx")
                            .apiKey(System.getenv("DASHSCOPE_API_KEY"))
                            .format(SpeechSynthesisAudioFormat
                                    .MP3_22050HZ_MONO_256KBPS) // Use PCM or MP3 for streaming synthesis
                            .build();

            try {
                synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
                synthesizer.updateParamAndCallback(param, new ReactCallback());
                for (String text : textArray) {
                    synthesizer.streamingCall(text);
                }
                Thread.sleep(20); // Short pause before ending the streaming session
                synthesizer.streamingComplete(60000);
                requestId = synthesizer.getLastRequestId();
            } catch (Exception e) {
                System.out.println("Exception e: " + e.toString());
                hasError[0] = true;
            }
        } catch (Exception e) {
            hasError[0] = true;
            throw new RuntimeException(e);
        }
        if (synthesizer != null) {
            try {
                if (hasError[0]) {
                    // If an error occurs, close the connection and invalidate the object in the pool.
                    synthesizer.getDuplexApi().close(1000, "bye");
                    CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
                } else {
                    // If the task completes normally, return the object to the pool.
                    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            long endTime = System.currentTimeMillis();
            timeCost = endTime - startTime;
            System.out.println("[Thread " + Thread.currentThread() + "] Speech synthesis task completed. Time cost: " + timeCost + " ms, RequestId " + requestId);
        }
    }
}

@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {
        // The following is the Singapore region URL. If you use a model in the Beijing region, replace the URL with: wss://dashscope.aliyuncs.com/api-ws/v1/inference
        Constants.baseWebsocketApiUrl = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/inference";
        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);

        int runTimes = 3;
        // Create a fixed thread pool to run the synthesis tasks concurrently
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);

        for (int i = 0; i < runTimes; i++) {
            executorService.submit(new SynthesizeTaskWithCallback(new String[] {
                    "Before my bed, moonlight gleams,", "It seems like frost upon the ground.", "I lift my gaze to watch the bright moon,", "Then bow my head, thinking of home."}));
        }

        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

Recommended configuration

The following configurations are based on test results from running only the CosyVoice speech synthesis service on Alibaba Cloud servers with the specified specifications. Excessively high concurrency may increase task processing latency.

The per-machine concurrency refers to the number of CosyVoice speech synthesis tasks running simultaneously, equivalent to the number of worker threads.

| Machine configuration (Alibaba Cloud) | Maximum per-machine concurrency | Object pool size | Connection pool size |
| --- | --- | --- | --- |
| 4 vCPUs, 8 GiB | 100 | 500 | 2000 |
| 8 vCPUs, 16 GiB | 150 | 500 | 2000 |
| 16 vCPUs, 32 GiB | 200 | 500 | 2000 |

Resource management and error handling

  • Task success: When a synthesis task completes normally, call the GenericObjectPool's returnObject method to return the SpeechSynthesizer object to the pool for reuse.

    In the current code, this corresponds to CosyvoiceObjectPool.getInstance().returnObject(synthesizer).

    Important

    Don't return SpeechSynthesizer objects with incomplete or failed tasks.

  • Task failure: When an SDK internal error or business logic exception interrupts the task, perform the following two operations:

    1. Close the underlying WebSocket connection.

    2. Invalidate the object in the pool to prevent it from being reused.

    // In the current code, this corresponds to:
    // Close the connection
    synthesizer.getDuplexApi().close(1000, "bye");
    // Invalidate the failed synthesizer in the object pool
    CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
  • When the service returns a TaskFailed error, no additional handling is required.

Warm-up and latency measurement

When evaluating the performance of the DashScope Java SDK under concurrent calls (such as latency), run sufficient warm-up requests before the actual test. Warm-up ensures that measurement results accurately reflect steady-state performance, avoiding data skew caused by initial connection overhead.

Connection reuse mechanism

The DashScope Java SDK manages and reuses WebSocket connections through a global singleton connection pool, reducing the overhead of frequent connects and disconnects while improving throughput in high-concurrency scenarios.

How it works:

  • On-demand creation: The SDK doesn't pre-create WebSocket connections at startup. Connections are established on the first call.

  • Time-limited reuse: After a request completes, the connection remains in the pool for up to 60 seconds for reuse.

    • If a new request arrives within 60 seconds, it reuses the existing connection, avoiding repeated handshake overhead.

    • If a connection stays idle for more than 60 seconds, it's automatically closed to release resources.

Why warm-up matters

In the following scenarios, the pool may have no reusable active connections, causing requests to create new connections:

  • The application just started and hasn't made any calls yet.

  • The service has been idle for more than 60 seconds, and pool connections have timed out and closed.

In these scenarios, the first or initial requests trigger a full WebSocket connection process (including TCP handshake, TLS negotiation, and protocol upgrade). The end-to-end latency is significantly higher than subsequent requests that reuse connections. This additional overhead comes from network connection initialization, not from the service itself. Without warm-up, performance test results are skewed by the initial connection time.

SDK-side latency vs. actual first-packet latency

The first-packet latency reported by the SDK (such as the value from get_first_package_delay()) includes WebSocket connection setup and network transmission time. It doesn't equal the model service's actual first-packet latency.

The actual first-packet latency is the interval from when the server receives the run-task command to when it returns the first result-generated event. This value can be found in server-side logs.

In high-concurrency scenarios, the SDK-reported latency may be significantly higher than the server-side first-packet latency due to the overhead of establishing many connections and scheduling resources. If you observe high first-packet latency from the SDK:

  • Compare with the server-side first-packet latency in logs (from run-task to the first result-generated) to confirm whether model inference performance is normal.

  • Use the object pool or connection pool warm-up mechanism described above to eliminate WebSocket connection overhead, bringing the SDK-reported latency closer to the actual first-packet latency.

Recommended approach

To obtain reliable performance data, follow these warm-up steps before running performance benchmarks or latency measurements:

  1. Simulate the target concurrency level and send requests in advance (for example, for 1-2 minutes) to fully populate the connection pool.

  2. After confirming the connection pool has established and maintains enough active connections, start the actual performance data collection.

Proper warm-up brings the SDK connection pool into a stable reuse state, yielding more representative latency metrics that accurately reflect steady-state production performance.
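
For illustration, the following is a minimal warm-up sketch that reuses the SynthesizeTaskWithCallback class from the concurrent example above. The class name WarmupBeforeBenchmark, the concurrency level, the warm-up text, and the 90-second window are illustrative assumptions, not requirements:

    import java.util.concurrent.*;

    // Warm-up driver: a minimal sketch, assuming the SynthesizeTaskWithCallback
    // class from the concurrent example above. The concurrency level, warm-up
    // text, and 90-second window are illustrative values.
    public class WarmupBeforeBenchmark {
        public static void main(String[] args) throws InterruptedException {
            int targetConcurrency = 100; // match your planned test concurrency
            ExecutorService warmupPool = Executors.newFixedThreadPool(targetConcurrency);
            long deadline = System.currentTimeMillis() + 90_000; // warm up for ~1.5 minutes
            while (System.currentTimeMillis() < deadline) {
                for (int i = 0; i < targetConcurrency; i++) {
                    warmupPool.submit(new SynthesizeTaskWithCallback(
                            new String[] {"Warm-up request."}));
                }
                Thread.sleep(2_000); // pace the warm-up batches
            }
            warmupPool.shutdown();
            warmupPool.awaitTermination(1, TimeUnit.MINUTES);
            // The connection pool now holds active, reusable connections;
            // start the actual latency measurement here.
        }
    }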

Common exceptions in the Java SDK

Exception 1: The service traffic is stable, but the number of TCP connections on the server continues to increase

Cause:

Type 1:

Each SDK object requests a connection upon creation. If you do not use an object pool, the object is destroyed after each task completes. This action leaves the connection in an unreferenced state, and it is disconnected only after the server-side connection timeout of 61 seconds. Consequently, the connection cannot be reused during this 61-second period.

In high-concurrency scenarios, a new task creates a new connection if no reusable connections are available. This leads to the following issues:

  1. The number of connections continues to increase.

  2. Server performance degrades because an excessive number of connections consumes available server resources.

  3. The connection pool becomes full, and new tasks are blocked while they wait for available connections.

Type 2:

The `MaxIdle` parameter of the object pool is set to a value that is smaller than the `MaxTotal` parameter. As a result, when the pool has idle objects, any objects that exceed the `MaxIdle` limit are destroyed. This process can cause connection leaks. These leaked connections are disconnected only after a 61-second timeout. Similar to the Type 1 cause, this leads to a continuous increase in the number of connections.

Solution:

For the Type 1 cause, use an object pool.

For the Type 2 cause, check the object pool configuration parameters. Set `MaxIdle` and `MaxTotal` to the same value, and disable the automatic object pool destruction policy.
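
As an illustration, the following is a minimal sketch of a pool configuration that avoids the Type 2 leak, assuming the SpeechSynthesizer class from the DashScope SDK used in the examples above. The class name LeakFreePoolConfig and the pool size are illustrative:

    import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

    // MaxIdle equals MaxTotal, and the idle-object evictor is disabled, so
    // pooled synthesizers (and their WebSocket connections) are never
    // destroyed automatically.
    public class LeakFreePoolConfig {
        public static GenericObjectPoolConfig<SpeechSynthesizer> build(int poolSize) {
            GenericObjectPoolConfig<SpeechSynthesizer> config = new GenericObjectPoolConfig<>();
            config.setMaxTotal(poolSize);
            config.setMaxIdle(poolSize); // same as MaxTotal: idle objects are never trimmed
            config.setMinIdle(poolSize);
            config.setTimeBetweenEvictionRunsMillis(-1); // a negative value disables the evictor thread
            return config;
        }
    }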

Exception 2: The task takes 60 seconds longer than a normal call

The cause is the same as for Exception 1. The connection pool has reached its maximum number of connections. A new task must wait 61 seconds for an unreferenced connection to time out before the task can obtain a new connection.

Exception 3: Tasks are slow when the service starts and then gradually return to normal

Cause:

During high-concurrency calls, a single object reuses its WebSocket connection for multiple tasks. Therefore, a WebSocket connection is typically created only when the service starts. Note that if high-concurrency calls are initiated immediately during the task startup stage, creating too many WebSocket connections at the same time may cause blocking.

Solution:

Gradually increase the concurrency, or add prefetch tasks after the service starts.

Exception 4: The server reports the "Invalid action('run-task')! Please follow the protocol!" error

Cause:

When a client-side error occurs, the server is not notified, and the connection remains in a task-in-progress state. If this connection and its associated object are then reused for a new task, a protocol error occurs, which causes the new task to fail.

Solution:

After a client-side exception is thrown, you must explicitly close the WebSocket connection and then invalidate the object in the pool so that it cannot be reused, as shown in the Resource management and error handling section above. Don't return the failed object to the pool.

Exception 5: The service traffic is stable, but the call volume has abnormal spikes

Cause:

Creating too many WebSocket connections at the same time causes blocking. Because incoming service traffic continues, a short-term backlog of tasks is created. After the blocking is resolved, all backlogged tasks are called at once. This causes a spike in call volume that can momentarily exceed the concurrency limit for your Alibaba Cloud account, which can result in task failures, server performance degradation, and other issues.

Creating too many WebSocket connections at once typically occurs in the following scenarios:

  • During the service startup stage

  • A network exception occurs that causes many WebSocket connections to be interrupted and reconnected at the same time.

  • Many server-side errors occur at the same time, which leads to many WebSocket reconnections. A common error occurs when the concurrency exceeds the account limit ("Requests rate limit exceeded, please try again later.").

Solution:

  1. Check your network conditions.

  2. Check whether many other server-side errors occurred before the spike.

  3. Increase the concurrency limit for your Alibaba Cloud account.

  4. Reduce the sizes of the object pool and connection pool. You can also limit the maximum concurrency using the upper limit of the object pool.

  5. Upgrade your server configuration or increase the number of servers.

Exception 6: All tasks slow down as the concurrency increases

Solution:

  1. Check whether you have reached the network bandwidth limit.

  2. Check whether the actual concurrency is too high for your server's specifications.

Supported regions

Available models vary by deployment region:

International

If you select the International deployment scope, model inference compute resources are dynamically scheduled worldwide, excluding the Chinese mainland. Static data is stored in your selected region. Supported region: Singapore.

To call the following models, select an API Key from the Singapore region:

  • CosyVoice: cosyvoice-v3-plus, cosyvoice-v3-flash

  • Qwen-TTS:

    • Qwen3-TTS-Instruct-Flash-Realtime: qwen3-tts-instruct-flash-realtime (stable, currently equivalent to qwen3-tts-instruct-flash-realtime-2026-01-22), qwen3-tts-instruct-flash-realtime-2026-01-22 (latest snapshot)

    • Qwen3-TTS-VD-Realtime: qwen3-tts-vd-realtime-2026-01-15 (latest snapshot), qwen3-tts-vd-realtime-2025-12-16 (snapshot)

    • Qwen3-TTS-VC-Realtime: qwen3-tts-vc-realtime-2026-01-15 (latest snapshot), qwen3-tts-vc-realtime-2025-11-27 (snapshot)

    • Qwen3-TTS-Flash-Realtime: qwen3-tts-flash-realtime (stable, currently equivalent to qwen3-tts-flash-realtime-2025-11-27), qwen3-tts-flash-realtime-2025-11-27 (latest snapshot), qwen3-tts-flash-realtime-2025-09-18 (snapshot)

Chinese mainland

If you select the Chinese mainland deployment scope, model inference compute resources are restricted to the Chinese mainland. Static data is stored in your selected region. Supported region: China (Beijing).

To call the following models, select an API Key from the Beijing region:

  • CosyVoice: cosyvoice-v3.5-plus, cosyvoice-v3.5-flash, cosyvoice-v3-plus, cosyvoice-v3-flash, cosyvoice-v2

  • Qwen-TTS:

    • Qwen3-TTS-Instruct-Flash-Realtime: qwen3-tts-instruct-flash-realtime (stable, currently equivalent to qwen3-tts-instruct-flash-realtime-2026-01-22), qwen3-tts-instruct-flash-realtime-2026-01-22 (latest snapshot)

    • Qwen3-TTS-VD-Realtime: qwen3-tts-vd-realtime-2026-01-15 (latest snapshot), qwen3-tts-vd-realtime-2025-12-16 (snapshot)

    • Qwen3-TTS-VC-Realtime: qwen3-tts-vc-realtime-2026-01-15 (latest snapshot), qwen3-tts-vc-realtime-2025-11-27 (snapshot)

    • Qwen3-TTS-Flash-Realtime: qwen3-tts-flash-realtime (stable, currently equivalent to qwen3-tts-flash-realtime-2025-11-27), qwen3-tts-flash-realtime-2025-11-27 (latest snapshot), qwen3-tts-flash-realtime-2025-09-18 (snapshot)

    • Qwen-TTS-Realtime: qwen-tts-realtime (stable, currently equivalent to qwen-tts-realtime-2025-07-15), qwen-tts-realtime-latest (latest, currently equivalent to qwen-tts-realtime-2025-07-15), qwen-tts-realtime-2025-07-15 (snapshot)

Supported voices

Different models support different voices. Set the voice request parameter to the value in the voice parameter column of the voice list.

API reference

FAQ

Q: How do I fix incorrect pronunciation in speech synthesis? How do I control the pronunciation of polyphonic characters?

  • Replace the polyphonic character with a homophone to quickly fix the pronunciation issue.

  • Use SSML to control pronunciation.
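
For example, a hypothetical sketch of correcting the polyphonic character 重 (chóng/zhòng) with an SSML phoneme tag. The tag name and pinyin notation below are illustrative; confirm the exact tag set, and which models accept SSML, in the SSML documentation:

    // Hypothetical SSML usage: the <phoneme> tag and "py" pinyin notation
    // are illustrative, not a confirmed part of every model's SSML support.
    String ssml = "<speak>"
            + "这个<phoneme alphabet=\"py\" ph=\"chong2\">重</phoneme>庆火锅很好吃。"
            + "</speak>";
    // Pass the SSML string wherever the synthesizer accepts plain text.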

Q: How do I troubleshoot silent audio when using a cloned voice?

  1. Verify the voice status

    Call the Voice cloning/design API and confirm that the voice status is OK.

  2. Check model version consistency

    Make sure the target_model parameter used during voice cloning matches the model parameter used during speech synthesis. For example:

    • If you used cosyvoice-v3-plus for cloning

    • You must also use cosyvoice-v3-plus for synthesis

  3. Verify source audio quality

    Check whether the source audio used for voice cloning meets the requirements in the Voice cloning/design API documentation:

    • Audio duration: 10-20 seconds

    • Clear audio quality

    • No background noise

  4. Check request parameters

    Confirm that the voice parameter in your speech synthesis request is set to the cloned voice ID.

Q: What should I do if the cloned voice produces unstable or incomplete speech?

If the synthesized speech from a cloned voice exhibits any of the following issues:

  • Incomplete playback that only reads part of the text

  • Inconsistent synthesis quality

  • Abnormal pauses or silent segments in the speech

Possible cause: The source audio quality doesn't meet the requirements.

Solution: Check whether the source audio meets the following requirements. We recommend re-recording based on the Recording guide for voice cloning.

  • Check audio continuity: Make sure the speech content in the source audio is continuous, with no pauses or silent segments exceeding 2 seconds. Noticeable gaps in the audio can cause the model to misidentify silence or noise as voice characteristics, which degrades synthesis quality.

  • Check speech activity ratio: Make sure active speech accounts for more than 60% of the total audio duration. Excessive background noise or non-speech segments interfere with voice feature extraction.

  • Verify audio quality details:

    • Audio duration: 10-20 seconds (approximately 15 seconds recommended)

    • Clear pronunciation with steady speaking pace

    • No background noise, echo, or distortion

    • Concentrated voice energy with no long silent segments

Q: Why does the actual duration differ from the duration displayed in the WAV file header?

Speech synthesis uses a streaming mechanism that returns data progressively as it's generated. The duration in the saved WAV file header is an estimate and may contain inaccuracies. For precise duration, set format to pcm, wait for the complete synthesis result, and then add the WAV file header yourself.
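
For illustration, the following is a minimal sketch (the PcmToWav class name is hypothetical) that wraps raw 16-bit mono PCM in a standard 44-byte WAV header, so the header reflects the exact data length and therefore the exact duration. Adjust the sample rate and channel count to the PCM format you requested:

    import java.io.*;

    public class PcmToWav {
        // Wraps raw 16-bit mono PCM bytes in a 44-byte WAV header.
        // WAV fields are little-endian, so values are byte-reversed before
        // writing with the big-endian DataOutputStream.
        public static void writeWav(byte[] pcm, int sampleRate, File out) throws IOException {
            int channels = 1, bitsPerSample = 16;
            int byteRate = sampleRate * channels * bitsPerSample / 8;
            try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(out))) {
                dos.writeBytes("RIFF");
                dos.writeInt(Integer.reverseBytes(36 + pcm.length)); // RIFF chunk size
                dos.writeBytes("WAVE");
                dos.writeBytes("fmt ");
                dos.writeInt(Integer.reverseBytes(16));              // fmt chunk size
                dos.writeShort(Short.reverseBytes((short) 1));       // audio format: PCM
                dos.writeShort(Short.reverseBytes((short) channels));
                dos.writeInt(Integer.reverseBytes(sampleRate));
                dos.writeInt(Integer.reverseBytes(byteRate));
                dos.writeShort(Short.reverseBytes((short) (channels * bitsPerSample / 8))); // block align
                dos.writeShort(Short.reverseBytes((short) bitsPerSample));
                dos.writeBytes("data");
                dos.writeInt(Integer.reverseBytes(pcm.length));      // exact data size -> exact duration
                dos.write(pcm);
            }
        }
    }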

Q: Why won't the audio play?

Troubleshoot based on the following scenarios:

  1. Audio saved as a complete file (such as xx.mp3)

    1. Audio format consistency: Make sure the audio format specified in the request parameters matches the file extension. For example, if the request format is set to wav but the file is saved as .mp3, playback may fail.

    2. Player compatibility: Confirm that your player supports the audio file's format and sample rate. Some players don't support high sample rates or specific audio encodings.

  2. Streaming audio playback

    1. Save the audio stream as a complete file and try playing it with a media player. If the file won't play, refer to the troubleshooting steps in Scenario 1.

    2. If the file plays correctly, the issue is likely in your streaming playback implementation. Confirm that your player supports streaming playback. Common tools and libraries that support streaming playback include ffmpeg, pyaudio (Python), AudioFormat (Java), and MediaSource (JavaScript).
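
As an illustration of the Java option, the following is a minimal streaming-playback sketch using javax.sound.sampled. The class name and the 22050 Hz, 16-bit, mono, little-endian format are assumptions; match them to the PCM format you requested:

    import javax.sound.sampled.*;

    public class StreamingPcmPlayer {
        private final SourceDataLine line;

        public StreamingPcmPlayer() throws LineUnavailableException {
            // 22050 Hz, 16-bit, mono, signed, little-endian PCM (adjust as needed)
            AudioFormat format = new AudioFormat(22050f, 16, 1, true, false);
            line = AudioSystem.getSourceDataLine(format);
            line.open(format);
            line.start();
        }

        // Feed each received audio frame to the audio device.
        public void play(byte[] audioFrame) {
            line.write(audioFrame, 0, audioFrame.length);
        }

        public void close() {
            line.drain(); // block until all buffered audio has played
            line.close();
        }
    }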

Q: Why is audio playback stuttering?

Troubleshoot with the following steps:

  1. Check text send rate: Make sure the interval between text segments is appropriate, so that the next segment is sent before the previous audio finishes playing.

  2. Check callback function performance:

    • Confirm that the callback function doesn't contain excessive business logic that causes blocking.

    • The callback function runs on the WebSocket thread. Blocking it delays network packet reception and causes audio stutter.

    • Write audio data to a separate audio buffer and process it in a different thread to avoid blocking the WebSocket thread, as shown in the sketch after this list.

  3. Check network stability: Make sure your network connection is stable. Network fluctuations can cause audio transmission interruptions or delays.
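
The following minimal sketch decouples the WebSocket callback from playback: onEvent only enqueues each frame, and a dedicated thread drains the queue and feeds the player, so slow playback never blocks packet reception. BufferedPlayback is a hypothetical name, and StreamingPcmPlayer refers to the sketch in the previous answer:

    import java.util.concurrent.*;

    public class BufferedPlayback {
        private final BlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

        public BufferedPlayback(StreamingPcmPlayer player) {
            Thread playbackThread = new Thread(() -> {
                try {
                    while (true) {
                        byte[] frame = buffer.take(); // wait for the next frame
                        if (frame.length == 0) break; // empty frame marks end of stream
                        player.play(frame);
                    }
                    player.close();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            playbackThread.start();
        }

        // Call from onEvent: returns immediately, never blocks the WebSocket thread.
        public void offer(byte[] frame) { buffer.offer(frame); }

        // Call from onComplete to signal end of stream.
        public void finish() { buffer.offer(new byte[0]); }
    }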

Q: Why is speech synthesis taking a long time?

Troubleshoot with the following steps:

  1. Check input interval

    For streaming speech synthesis, confirm that the interval between text segments isn't too long (for example, waiting several seconds after sending one segment before sending the next). Long intervals increase the total synthesis time.

  2. Analyze performance metrics

    • First-packet latency: typically around 500 ms.

    • RTF (Real-Time Factor = total synthesis time / audio duration): should be less than 1.0 under normal conditions. For example, synthesizing 10 seconds of audio in 5 seconds gives an RTF of 0.5; an RTF above 1.0 means synthesis is slower than real time.

Q: How do I restrict an API key to speech synthesis only (permission isolation)?

Create a new workspace and authorize only specific models to limit the scope of your API key. For details, see Manage workspaces.