
Alibaba Cloud Model Studio: Qwen-Omni-Realtime

Last Updated: Jun 12, 2026

Qwen-Omni-Realtime processes streaming audio and image inputs (including video frames) and generates text and audio responses in real time.

Supported regions: Singapore, China (Beijing). Each region requires its own API key.

How to use

1. Establish connection

Qwen-Omni-Realtime supports two transport protocols: WebSocket and WebRTC. WebSocket is suited to server-side integration and is quick to set up. WebRTC targets low-latency voice scenarios in the browser. It transmits audio over UDP and provides built-in echo cancellation and noise reduction.

WebSocket

Native WebSocket

Connection parameters:

  • Endpoint

    • China (Beijing) region: wss://dashscope.aliyuncs.com/api-ws/v1/realtime

    • Singapore region: wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime. Replace {WorkspaceId} with your actual workspace ID.

  • Query parameter: use the model query parameter to specify the model, for example ?model=qwen3.5-omni-plus-realtime.

  • Request header: authenticate with a Bearer token, Authorization: Bearer DASHSCOPE_API_KEY, where DASHSCOPE_API_KEY is your API key from Model Studio.

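# pip install websocket-client
import json
import os

import websocket

# API keys differ between regions; use the key for the region you connect to.
API_KEY = os.getenv("DASHSCOPE_API_KEY")
if not API_KEY:
    raise SystemExit("Set the DASHSCOPE_API_KEY environment variable first.")

# Singapore region. Replace {WorkspaceId} with your actual workspace ID; URLs vary by region.
API_URL = "wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime?model=qwen3.5-omni-plus-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")

def on_message(ws, message):
    # Each server event is a JSON object with a "type" field.
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))

def on_error(ws, error):
    print("Error:", error)

def on_close(ws, close_status_code, close_msg):
    print(f"Connection closed (code={close_status_code}, msg={close_msg})")

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)

# Blocks until the connection is closed.
ws.run_forever()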
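The endpoint host differs by region, and the model is always passed as the model query parameter. The following minimal sketch assembles the full URL from the values listed above. The helper name build_realtime_url and its region keys are illustrative assumptions, not part of any SDK. The keys are "intl" for Singapore and "cn" for China (Beijing), borrowed from the Getting started example.

```python
from typing import Optional
from urllib.parse import urlencode


def build_realtime_url(region: str, model: str, workspace_id: Optional[str] = None) -> str:
    """Return the realtime WebSocket URL for a region and model (hypothetical helper)."""
    if region == "cn":
        # China (Beijing): fixed host
        host = "dashscope.aliyuncs.com"
    elif region == "intl":
        # Singapore: the host includes your workspace ID
        if not workspace_id:
            raise ValueError("The Singapore endpoint requires a workspace ID")
        host = f"{workspace_id}.ap-southeast-1.maas.aliyuncs.com"
    else:
        raise ValueError(f"Unknown region: {region!r}")
    # The model is selected through the model query parameter.
    return f"wss://{host}/api-ws/v1/realtime?" + urlencode({"model": model})
```

For example, build_realtime_url("cn", "qwen3.5-omni-plus-realtime") returns wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3.5-omni-plus-realtime, which can be used as API_URL. The API key must belong to the same region as the endpoint.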

DashScope Python SDK

# SDK version 1.23.9 or later is required.
import os
import json
from dashscope.audio.qwen_omni import OmniRealtimeConversation, OmniRealtimeCallback
import dashscope
# API keys for the Singapore and China (Beijing) regions are different. To get an API key, see https://www.alibabacloud.com/help/en/model-studio/get-api-key.
# If you have not configured an API key, change the following line to dashscope.api_key = "sk-xxx".
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

class PrintCallback(OmniRealtimeCallback):
    def on_open(self) -> None:
        print("Connected Successfully")
    def on_event(self, response: dict) -> None:
        print("Received event:")
        print(json.dumps(response, indent=2, ensure_ascii=False))
    def on_close(self, close_status_code: int, close_msg: str) -> None:
        print(f"Connection closed (code={close_status_code}, msg={close_msg}).")

callback = PrintCallback()
conversation = OmniRealtimeConversation(
    model="qwen3.5-omni-plus-realtime",
    callback=callback,
    # The following URL is for the Singapore region. When calling, replace WorkspaceId with your actual workspace ID. URLs vary by region.
    url="wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime"
)
try:
    conversation.connect()
    print("Conversation started. Press Ctrl+C to exit.")
    conversation.thread.join()
except KeyboardInterrupt:
    conversation.close()

DashScope Java SDK

// SDK version 2.20.9 or later is required.
import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException, NoApiKeyException {
        CountDownLatch latch = new CountDownLatch(1);
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3.5-omni-plus-realtime")
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                // The following URL is for the Singapore region. When calling, replace WorkspaceId with your actual workspace ID. URLs vary by region.
                .url("wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime")
                .build();

        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("Connected Successfully");
            }
            @Override
            public void onEvent(JsonObject message) {
                System.out.println(message);
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
                latch.countDown();
            }
        });
        conversation.connect();
        latch.await();
        conversation.close(1000, "bye");
        System.exit(0);
    }
}

WebRTC

Establishing a WebRTC connection involves two stages:

  1. SDP exchange (HTTP): The client sends its media capabilities and network addresses (Offer SDP) to the server via HTTP POST. The server returns its information (Answer SDP) to complete the capability negotiation.

  2. Connection (automatic): After the negotiation, the WebRTC layer automatically establishes the audio transport channel.

SDP exchange configuration:

  • Request URL: POST https://{endpoint}/api/v1/webrtc/realtime. The WebRTC feature is currently available by allowlist only. Contact your sales manager to get the endpoint.

  • Query parameter: use the model query parameter to specify the model, for example ?model=qwen3.5-omni-plus-realtime.

  • Content-Type: application/sdp

  • Request header: Authorization: Bearer DASHSCOPE_API_KEY

  • Request body: the client-generated Offer SDP string

  • Response: on success, HTTP 200 with the server's Answer SDP string. On failure, HTTP 4xx with a JSON error message.

Connection code examples:

Python

# pip install aiortc aiohttp certifi
import asyncio
import os
import ssl

import aiohttp
import certifi
from aiortc import RTCPeerConnection, RTCConfiguration, RTCSessionDescription
from aiortc.mediastreams import AudioStreamTrack

API_KEY = os.getenv("DASHSCOPE_API_KEY", "your-api-key")
MODEL = "qwen3.5-omni-plus-realtime"
# Replace {endpoint} with the endpoint provided by your sales manager.
SIGNALING_URL = f"https://{{endpoint}}/api/v1/webrtc/realtime?model={MODEL}"

async def connect():
    pc = RTCPeerConnection(RTCConfiguration(iceServers=[]))

    # Add an audio track to ensure the Offer SDP contains m=audio (required by the server).
    # AudioStreamTrack sends silence; use a real microphone track in practice.
    pc.addTrack(AudioStreamTrack())

    # Create a DataChannel to trigger SDP negotiation (name is customizable; the server pushes events through a channel named "txt")
    pc.createDataChannel("oai-events")

    # SDP exchange: create an Offer. In aiortc, setLocalDescription() completes ICE gathering,
    # so pc.localDescription.sdp (not offer.sdp) is the SDP that contains the candidates.
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)
    offer_sdp = pc.localDescription.sdp

    async with aiohttp.ClientSession() as session:
        async with session.post(
            SIGNALING_URL,
            ssl=ssl.create_default_context(cafile=certifi.where()),
            data=offer_sdp.encode("utf-8"),
            headers={
                "Content-Type": "application/sdp",
                "Authorization": f"Bearer {API_KEY}",
            },
        ) as resp:
            if not resp.ok:
                raise RuntimeError(f"SDP exchange failed: {resp.status} {await resp.text()}")
            answer_sdp = await resp.text()

    print("=== Offer SDP ===")
    print(offer_sdp)
    print("=== Answer SDP ===")
    print(answer_sdp)

    # ICE connection is established automatically after the Answer is applied
    await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type="answer"))
    print("WebRTC connection established")
    return pc

async def main():
    pc = await connect()
    try:
        await asyncio.sleep(30)  # keep the connection open briefly for this demo
    finally:
        await pc.close()

if __name__ == "__main__":
    asyncio.run(main())

JavaScript

const API_KEY = 'your-api-key';
const API_URL = 'https://{endpoint}/api/v1/webrtc/realtime?model=qwen3.5-omni-plus-realtime';

// Resolve once ICE gathering is complete, so the local SDP contains all candidates
function waitForIceGathering(pc) {
  return new Promise(resolve => {
    if (pc.iceGatheringState === 'complete') return resolve();
    pc.addEventListener('icegatheringstatechange', () => {
      if (pc.iceGatheringState === 'complete') resolve();
    });
  });
}

async function connect() {
  const pc = new RTCPeerConnection({ iceServers: [] });

  // Add an audio track to ensure the Offer SDP contains m=audio (required by the server)
  const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
  stream.getAudioTracks().forEach(t => pc.addTrack(t, stream));

  // Create a DataChannel to trigger SDP negotiation (name is customizable; the server pushes events through a channel named "txt")
  pc.createDataChannel('oai-events');

  // Create the Offer and wait for ICE gathering to complete before sending it
  const offer = await pc.createOffer();
  await pc.setLocalDescription(offer);
  await waitForIceGathering(pc);

  // SDP exchange: send the Offer and receive the Answer (errors propagate to the caller)
  const resp = await fetch(API_URL, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/sdp',
      'Authorization': `Bearer ${API_KEY}`,
    },
    body: pc.localDescription.sdp,
  });
  if (!resp.ok) throw new Error('SDP exchange failed: ' + resp.status);
  const answerSdp = await resp.text();

  // ICE connection is established automatically after the Answer is applied
  await pc.setRemoteDescription({ type: 'answer', sdp: answerSdp });
  console.log('WebRTC connection established');
  return pc;
}
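Both examples add an audio track and a DataChannel before creating the Offer, so the Offer SDP should contain the matching media sections. A small pre-flight check can catch a malformed Offer before the HTTP POST. This is a sketch with two assumptions. The m=audio requirement comes from this page. The m=application check reflects the DataChannel that both examples create, which is standard WebRTC behavior, but this page does not state whether the server rejects an Offer without it. media_kinds and check_offer_sdp are hypothetical helper names.

```python
from typing import List


def media_kinds(sdp: str) -> List[str]:
    """Return the media type of every m= line in an SDP string, in order."""
    kinds = []
    for line in sdp.splitlines():
        if line.startswith("m="):
            # An m= line looks like "m=audio 9 UDP/TLS/RTP/SAVPF 111".
            kinds.append(line[2:].split(" ", 1)[0])
    return kinds


def check_offer_sdp(sdp: str) -> None:
    """Raise ValueError if the Offer lacks the audio or DataChannel section."""
    kinds = media_kinds(sdp)
    if "audio" not in kinds:
        raise ValueError("Offer SDP has no m=audio section; add an audio track before createOffer()")
    if "application" not in kinds:
        raise ValueError("Offer SDP has no m=application section; create a DataChannel before createOffer()")
```

Call check_offer_sdp on the exact SDP string you are about to POST to the signaling URL.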

2. Configure session

Send the session.update client event:

{
    // A client-generated event ID.
    "event_id": "event_ToPZqeobitzUJnt3QqtWg",
    // The event type. Must be "session.update".
    "type": "session.update",
    // The session configuration.
    "session": {
        // The output modality. Set this to ["text"] for text-only output, or ["text", "audio"] for both text and audio output.
        "modalities": [
            "text",
            "audio"
        ],
        // The voice for the audio output.
        "voice": "Ethan",
        // The input audio format. Only "pcm" is supported. The input audio must be a PCM audio stream at a 16 kHz sample rate.
        "input_audio_format": "pcm",
        // The output audio format. Only "pcm" is supported. The output audio is a PCM audio stream at a 24 kHz sample rate.
        "output_audio_format": "pcm",
        // A system instruction to define the model's goal or role.
        "instructions": "You are an AI customer service agent for a five-star hotel. Answer customer inquiries about room types, facilities, prices, and booking policies accurately and in a friendly manner. Always respond with a professional and helpful attitude. Do not provide unconfirmed information or information beyond the scope of the hotel's services.",
        // Enables server-side voice activity detection (VAD). If enabled, the server automatically detects the start and end of speech.
        // If null, the client controls when to trigger model responses.
        "turn_detection": {
            // The VAD type. Valid values: "server_vad" and "semantic_vad". We recommend "semantic_vad" for Qwen3.5-Omni-Realtime models.
            "type": "semantic_vad",
            // The VAD detection threshold. We recommend increasing this value in noisy environments and decreasing it in quiet environments.
            "threshold": 0.5,
            // The silence duration in milliseconds (ms) that signals the end of an utterance. The model triggers a response if this duration is exceeded.
            "silence_duration_ms": 800
        }
    }
}
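The JSON above contains explanatory comments, so it cannot be sent verbatim. The following minimal sketch builds the same event as a Python dict and serializes it with json.dumps. The field names and default values come from the example. The helper name make_session_update is hypothetical. So is the choice of "event_" plus a uuid4 hex string for event_id: the page only says the ID is client-generated and does not say whether the server enforces a format.

```python
import json
import uuid
from typing import Optional


def make_session_update(instructions: str,
                        voice: str = "Ethan",
                        text_only: bool = False,
                        vad_type: Optional[str] = "semantic_vad",
                        threshold: float = 0.5,
                        silence_duration_ms: int = 800) -> dict:
    """Build a session.update event; vad_type=None means the client controls turns."""
    session = {
        "modalities": ["text"] if text_only else ["text", "audio"],
        "voice": voice,
        "input_audio_format": "pcm",   # 16 kHz PCM input
        "output_audio_format": "pcm",  # 24 kHz PCM output
        "instructions": instructions,
        # None serializes to JSON null, which disables server-side VAD.
        "turn_detection": None if vad_type is None else {
            "type": vad_type,
            "threshold": threshold,
            "silence_duration_ms": silence_duration_ms,
        },
    }
    return {
        "event_id": "event_" + uuid.uuid4().hex,  # any client-generated ID (assumed format)
        "type": "session.update",
        "session": session,
    }

# With the native WebSocket client from step 1, send it in on_open, for example:
#   ws.send(json.dumps(make_session_update("You are a helpful assistant.")))
```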

3. Input audio and images

Audio input is required; image input is optional. The input method depends on the protocol.

WebSocket

Send Base64-encoded audio and image data to the server buffer using the input_audio_buffer.append and input_image_buffer.append events.

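Images can come from local files or from frames captured from a real-time video stream.

With server-side VAD enabled, the server automatically submits the buffered data and triggers a response when it detects the end of an utterance. With VAD disabled (manual mode), send the input_audio_buffer.commit event after the data to submit it, and then request a response yourself. In Getting started, manual_dash.py calls commit() followed by create_response() for this.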
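Over a native WebSocket connection, each append event carries a Base64 string. The sketch below splits raw 16 kHz PCM into fixed-size chunks, wraps each chunk in an input_audio_buffer.append event, and also builds the input_audio_buffer.commit event used in manual mode. It relies on three assumptions. The payload field name "audio" is not shown on this page. The samples are assumed to be 16-bit mono, as in the Getting started examples. The 100 ms chunk size (3,200 bytes) matches the Java example's buffer. input_image_buffer.append is omitted because its payload fields are not shown here.

```python
import base64
import json
from typing import Iterator

SAMPLE_RATE = 16000   # input audio must be 16 kHz PCM
BYTES_PER_SAMPLE = 2  # assumes 16-bit mono samples
CHUNK_MS = 100        # assumed chunk length: 100 ms = 3200 bytes


def audio_append_events(pcm: bytes, chunk_ms: int = CHUNK_MS) -> Iterator[str]:
    """Yield JSON-encoded input_audio_buffer.append events for a PCM byte string."""
    chunk_bytes = SAMPLE_RATE * BYTES_PER_SAMPLE * chunk_ms // 1000
    for start in range(0, len(pcm), chunk_bytes):
        chunk = pcm[start:start + chunk_bytes]
        yield json.dumps({
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(chunk).decode("ascii"),  # field name is an assumption
        })


def commit_event() -> str:
    """Manual mode only: submit the buffered audio after the user stops speaking."""
    return json.dumps({"type": "input_audio_buffer.commit"})
```

With the native client from step 1, send each event in order with ws.send(event). In manual mode, follow the last one with ws.send(commit_event()).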

WebRTC

Audio and video tracks (RTP media channels) added during connection establishment transmit data to the server automatically.

  • Audio: Transmitted directly through the audio track (RTP). No input_audio_buffer.append events needed.

  • Images: Sent as video frames through the video track (RTP). input_image_buffer.append is not supported.

WebRTC only supports server-side VAD mode (server_vad or semantic_vad). Manual mode is not supported.
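WebRTC accepts only server-side VAD, so a client that supports both transports can check its session configuration before connecting. The following hypothetical guard (validate_turn_detection is not an SDK function) encodes the rules stated in this section and in step 2. A null turn_detection means manual mode, and the valid VAD types are server_vad and semantic_vad.

```python
from typing import Optional

VALID_VAD_TYPES = {"server_vad", "semantic_vad"}


def validate_turn_detection(transport: str, turn_detection: Optional[dict]) -> None:
    """Raise ValueError if the turn_detection setting is not allowed for the transport."""
    if transport not in ("websocket", "webrtc"):
        raise ValueError(f"Unknown transport: {transport!r}")
    if turn_detection is None:
        # null turn_detection means manual mode, which WebRTC does not support
        if transport == "webrtc":
            raise ValueError("WebRTC requires server_vad or semantic_vad; manual mode is not supported")
        return
    vad_type = turn_detection.get("type")
    if vad_type not in VALID_VAD_TYPES:
        raise ValueError(f"Unsupported VAD type: {vad_type!r}")
```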

4. Receive model responses

The response format depends on the configured output modality.

WebSocket

WebRTC

  • Text only

    Same as WebSocket. Receive streaming text events through the DataChannel.

  • Text and audio

    • Text: Received through the DataChannel as streaming text events, same as WebSocket.

    • Audio: Received and played in real time through RTP tracks. No response.audio.delta events needed.
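For the WebSocket transport, the Getting started examples handle three server events:

  • response.audio.delta: Base64-encoded audio.

  • response.audio_transcript.delta: incremental text.

  • response.audio_transcript.done: the full transcript.

The following sketch routes those events into a transcript string and a PCM byte buffer without touching an audio device, which makes the handling easy to test. The class name ResponseCollector is hypothetical. The sketch assumes 16-bit mono output, which is how the examples play it back, and ignores all other event types.

```python
import base64

OUTPUT_SAMPLE_RATE = 24000  # output audio is 24 kHz PCM
BYTES_PER_SAMPLE = 2        # assumes 16-bit mono, as played back in the examples


class ResponseCollector:
    """Accumulate one response's transcript and audio from decoded server events."""

    def __init__(self) -> None:
        self.text_parts = []
        self.final_transcript = None
        self.audio = bytearray()

    def handle(self, event: dict) -> None:
        etype = event.get("type")
        if etype == "response.audio.delta":
            # Each delta is a Base64-encoded chunk of PCM audio.
            self.audio.extend(base64.b64decode(event["delta"]))
        elif etype == "response.audio_transcript.delta":
            self.text_parts.append(event["delta"])
        elif etype == "response.audio_transcript.done":
            self.final_transcript = event["transcript"]
        # Other event types are ignored in this sketch.

    @property
    def text(self) -> str:
        # Prefer the final transcript; fall back to the concatenated deltas.
        if self.final_transcript is not None:
            return self.final_transcript
        return "".join(self.text_parts)

    def audio_seconds(self) -> float:
        return len(self.audio) / (OUTPUT_SAMPLE_RATE * BYTES_PER_SAMPLE)
```

In the native WebSocket client from step 1, on_message could call collector.handle(json.loads(message)).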

Model selection

Qwen3.5-Omni-Realtime improves over Qwen3-Omni-Flash-Realtime in the following areas:

  • Intelligence level

    On par with Qwen3.5-Plus.

  • Web search

    Built-in web search — the model autonomously searches to answer real-time questions. For details, see Web search.

  • Tool calling

    Function calling — the model autonomously invokes external tools. For details, see Qwen-Omni-Realtime series.

  • Semantic interruption

    Identifies conversational intent so that backchannel responses (such as "uh-huh") and background noise do not interrupt the model.

  • Voice control

    Control volume, speaking rate, and emotion via voice commands (e.g., "speak faster", "louder", "in a happy tone").

  • Supported languages

    Supports speech recognition for 113 languages and dialects and speech generation for 36 languages and dialects.

  • Supported voices

    Supports 55 voices, including 47 multilingual voices and 8 dialectal voices. For a complete list, see Voice list.

  • Voice cloning

    Use a custom cloned voice in real-time conversations (supported by qwen3.5-omni-plus-realtime and qwen3.5-omni-flash-realtime). For details, see Voice cloning.

Check the Model Studio console for model names, context length, pricing, and snapshot versions. For concurrency rate limits, see Rate limits.

Limitations

  • Web search and tool calling are mutually exclusive.

  • A single WebSocket session can last up to 120 minutes. The connection closes automatically at this limit.

  • The model retains conversation history up to the following turn and duration limits. When exceeded, the oldest history is discarded. Max duration is the cumulative audio or video (image frame) duration retained in context.

    Video is input as extracted frames (recommended: 1 fps). The video max duration is the cumulative duration of the retained frames. For example, 240 seconds means that only frames from the last 240 seconds are kept.

    The qwen3-omni-flash-realtime model is limited to 8 dialog turns, and this limit is typically reached first. Its duration limit depends on the model's context length and is not listed separately.

    Model                         Audio max turns   Video max turns   Audio max duration   Video max duration
    qwen3.5-omni-plus-realtime    100               50                600 s                240 s
    qwen3.5-omni-flash-realtime   80                50                480 s                120 s
    qwen3-omni-flash-realtime     8                 8                 Context-dependent    Context-dependent
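The retention rule can be simulated on the client to estimate how much earlier conversation the model still sees. This is an illustration only. It assumes the server drops whole turns, oldest first, until both the turn limit and the cumulative duration limit from the table are satisfied. The service's exact accounting is not documented here, and retained_turns is a hypothetical helper.

```python
from typing import List


def retained_turns(durations_s: List[float], max_turns: int, max_duration_s: float) -> List[int]:
    """Return the indices of turns still in context, oldest first.

    durations_s lists the audio (or video frame) duration of each turn, oldest first.
    """
    kept: List[int] = []
    total = 0.0
    # Walk back from the newest turn, keeping turns while both limits still hold.
    for i in range(len(durations_s) - 1, -1, -1):
        if len(kept) >= max_turns or total + durations_s[i] > max_duration_s:
            break
        kept.append(i)
        total += durations_s[i]
    return kept[::-1]
```

Take the audio limits of qwen3.5-omni-plus-realtime (100 turns, 600 seconds) and ten 100-second turns. Under this assumption, retained_turns([100] * 10, 100, 600) returns [4, 5, 6, 7, 8, 9], so the duration limit, not the turn limit, decides what is kept.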

Getting started

Get an API key and set it as an environment variable.

Select a programming language and follow the steps to start a real-time chat.

WebSocket

DashScope Python SDK

  • Runtime environment

Ensure Python 3.10 or later is installed.

Install PyAudio for your operating system.

macOS

brew install portaudio && pip install pyaudio

Debian/Ubuntu

  • If you are not using a virtual environment, you can install it directly using the system package manager:

    sudo apt-get install python3-pyaudio
  • If you are using a virtual environment, first install the build dependencies:

    sudo apt update
    sudo apt install -y python3-dev portaudio19-dev

    Then, install it with pip in the activated virtual environment:

    pip install pyaudio

CentOS

sudo yum install -y portaudio portaudio-devel && pip install pyaudio

Windows

pip install pyaudio

Install the other dependencies:

pip install websocket-client dashscope
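Before running the examples, you can confirm that the interpreter version is recent enough and that PyAudio can find a microphone. This is an optional sketch. check_environment is a hypothetical helper that uses only the standard library and PyAudio's PyAudio, get_default_input_device_info, and terminate calls. It reports problems instead of raising.

```python
import sys
from typing import Any, Dict


def check_environment() -> Dict[str, Any]:
    """Report whether Python >= 3.10, PyAudio, and a default microphone are available."""
    report: Dict[str, Any] = {
        "python_ok": sys.version_info >= (3, 10),
        "pyaudio_ok": False,
        "input_device": None,
    }
    try:
        import pyaudio  # installed in the steps above
    except ImportError:
        return report
    report["pyaudio_ok"] = True
    try:
        pa = pyaudio.PyAudio()
    except Exception:
        return report  # PortAudio could not be initialized
    try:
        # Raises OSError (IOError) when there is no default input device.
        report["input_device"] = pa.get_default_input_device_info()["name"]
    except OSError:
        pass
    finally:
        pa.terminate()
    return report


if __name__ == "__main__":
    print(check_environment())
```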
  • Interaction mode

    • VAD mode (Voice Activity Detection, automatically detects the start and end of speech)

      The server responds after detecting the end of the user's speech.

    • Manual mode (press to speak, release to send)

      The client controls the start and end of speech. After speaking, your application must notify the server.

    VAD mode

    Create a Python file named vad_dash.py and copy the following code into the file:

    vad_dash.py

    # Dependencies: dashscope >= 1.23.9, pyaudio
    import os
    import base64
    import time
    import pyaudio
    from dashscope.audio.qwen_omni import MultiModality, AudioFormat, OmniRealtimeCallback, OmniRealtimeConversation
    import dashscope
    
    # Configuration: URL, API key, voice, model, model role
    # Specify the region. 'intl' for Singapore region, 'cn' for China (Beijing) region.
    region = 'intl'
    base_domain = '{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com' if region == 'intl' else 'dashscope.aliyuncs.com'
    url = f'wss://{base_domain}/api-ws/v1/realtime'
    # Configure the API key. If the environment variable is not set, replace the following line with your API key: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    # Specify the voice.
    voice = 'Ethan'
    # Specify the model.
    model = 'qwen3.5-omni-plus-realtime'
    # Specify the model role.
    instructions = "You are Xiaoyun, a personal assistant. Answer the user's questions in a humorous and witty way."
    class SimpleCallback(OmniRealtimeCallback):
        def __init__(self, pya):
            self.pya = pya
            self.out = None
        def on_open(self):
            # Initialize the audio output stream.
            self.out = self.pya.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=24000,
                output=True
            )
        def on_event(self, response):
            if response['type'] == 'response.audio.delta':
                # Play the audio.
                self.out.write(base64.b64decode(response['delta']))
            elif response['type'] == 'conversation.item.input_audio_transcription.delta':
                # Streaming preview: text is the confirmed prefix, stash is the unconfirmed suffix.
                preview = response.get('text', '') + response.get('stash', '')
                print(f"\r[User] {preview}", end='', flush=True)
            elif response['type'] == 'conversation.item.input_audio_transcription.completed':
                # Transcription completed. Print the final text and move to a new line.
                print(f"\r[User] {response['transcript']}")
            elif response['type'] == 'response.audio_transcript.done':
                # Print the assistant's response text.
                print(f"[LLM] {response['transcript']}")
    
    # 1. Initialize the audio device.
    pya = pyaudio.PyAudio()
    # 2. Create the callback function and conversation.
    callback = SimpleCallback(pya)
    conv = OmniRealtimeConversation(model=model, callback=callback, url=url)
    # 3. Connect and configure the session.
    conv.connect()
    conv.update_session(output_modalities=[MultiModality.AUDIO, MultiModality.TEXT], voice=voice, instructions=instructions)
    # 4. Initialize the audio input stream.
    mic = pya.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True)
    # 5. Main loop to process audio input.
    print("Conversation started. Speak into the microphone (Ctrl+C to exit)...")
    try:
        while True:
            audio_data = mic.read(3200, exception_on_overflow=False)
            conv.append_audio(base64.b64encode(audio_data).decode())
            time.sleep(0.01)
    except KeyboardInterrupt:
        # Clean up resources.
        conv.close()
        mic.close()
        callback.out.close()
        pya.terminate()
        print("\nConversation ended")

    Run vad_dash.py to start a real-time conversation through your microphone. The script streams microphone audio to the server continuously, and the server detects when you start and stop speaking.

    Manual mode

    Create a Python file named manual_dash.py and copy the following code into the file:

    manual_dash.py

    # Dependencies: dashscope >= 1.23.9, pyaudio
    import os
    import base64
    import sys
    import threading
    import pyaudio
    from dashscope.audio.qwen_omni import *
    import dashscope
    
    # If the environment variable is not set, replace the next line with your API key: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    voice = 'Ethan'
    
    class MyCallback(OmniRealtimeCallback):
        """A minimal callback: initializes the speaker on connection and plays the returned audio directly in the event handler."""
        def __init__(self, ctx):
            super().__init__()
            self.ctx = ctx
    
        def on_open(self) -> None:
            # Initialize PyAudio and the speaker (24 kHz, mono, 16-bit) after the connection is established.
            print('connection opened')
            try:
                self.ctx['pya'] = pyaudio.PyAudio()
                self.ctx['out'] = self.ctx['pya'].open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=24000,
                    output=True
                )
                print('audio output initialized')
            except Exception as e:
                print('[Error] audio init failed: {}'.format(e))
    
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            try:
                t = response['type']
                handlers = {
                    'session.created': lambda r: print('start session: {}'.format(r['session']['id'])),
                    'conversation.item.input_audio_transcription.delta': lambda r: print('\rquestion: {}'.format(r.get('text', '') + r.get('stash', '')), end='', flush=True),
                    'conversation.item.input_audio_transcription.completed': self._transcription_completed,
                    'response.audio_transcript.delta': lambda r: print('llm text: {}'.format(r['delta'])),
                    'response.audio.delta': self._play_audio,
                    'response.done': self._response_done,
                }
                h = handlers.get(t)
                if h:
                    h(response)
            except Exception as e:
                print('[Error] {}'.format(e))
    
        def _transcription_completed(self, response):
            print()
            self.ctx['transcription_done'].set()
    
        def _play_audio(self, response):
            # Decode the base64 data and write it directly to the output stream for playback.
            if self.ctx['out'] is None:
                return
            try:
                data = base64.b64decode(response['delta'])
                self.ctx['out'].write(data)
            except Exception as e:
                print('[Error] audio playback failed: {}'.format(e))
    
        def _response_done(self, response):
            # Mark the current turn as complete, allowing the main loop to proceed.
            if self.ctx['conv'] is not None:
                print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                    self.ctx['conv'].get_last_response_id(),
                    self.ctx['conv'].get_last_first_text_delay(),
                    self.ctx['conv'].get_last_first_audio_delay(),
                ))
            if self.ctx['resp_done'] is not None:
                self.ctx['resp_done'].set()
    
    def shutdown_ctx(ctx):
        """Safely release audio and PyAudio resources."""
        try:
            if ctx['out'] is not None:
                ctx['out'].close()
                ctx['out'] = None
        except Exception:
            pass
        try:
            if ctx['pya'] is not None:
                ctx['pya'].terminate()
                ctx['pya'] = None
        except Exception:
            pass
    
    
    def stream_record_and_send(pya_inst, conversation, sample_rate=16000, chunk_size=3200):
        stop_evt = threading.Event()
        stream = pya_inst.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=chunk_size
        )
    
        def _reader():
            while not stop_evt.is_set():
                try:
                    data = stream.read(chunk_size, exception_on_overflow=False)
                    conversation.append_audio(base64.b64encode(data).decode())
                except Exception:
                    break
    
        t = threading.Thread(target=_reader, daemon=True)
        t.start()
        input()
        stop_evt.set()
        t.join(timeout=1.0)
        stream.close()
    
    
    if __name__ == '__main__':
        print('Initializing ...')
        # Runtime context: stores audio and conversation handles.
        ctx = {'pya': None, 'out': None, 'conv': None, 'resp_done': threading.Event(), 'transcription_done': threading.Event()}
        callback = MyCallback(ctx)
        conversation = OmniRealtimeConversation(
            model='qwen3.5-omni-plus-realtime',
            callback=callback,
            # The following URL is for the Singapore region. When calling, replace WorkspaceId with your actual workspace ID. URLs vary by region.
            url="wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime",
        )
        try:
            conversation.connect()
        except Exception as e:
            print('[Error] connect failed: {}'.format(e))
            sys.exit(1)
    
        ctx['conv'] = conversation
        # Session configuration: enable text and audio output, and disable server-side VAD for manual recording.
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            enable_input_audio_transcription=True,
            input_audio_transcription_model='qwen3-asr-flash-realtime',
            enable_turn_detection=False,
            instructions="You are Xiaoyun, a personal assistant. Please answer the user's questions accurately and in a friendly manner, always responding with a helpful attitude."
        )
    
        try:
            turn = 1
            while True:
                print(f"\n--- turn {turn} ---")
                print("Press Enter to start recording (enter q and press Enter to exit)...")
                user_input = input()
                if user_input.strip().lower() in ['q', 'quit']:
                    print("User requested to exit...")
                    break
                print("Recording... Press Enter again to stop recording.")
                if ctx['pya'] is None:
                    ctx['pya'] = pyaudio.PyAudio()
                stream_record_and_send(ctx['pya'], conversation)
    
                ctx['transcription_done'].clear()
                ctx['resp_done'].clear()
                conversation.commit()
                ctx['transcription_done'].wait(timeout=10)
                print("Waiting for model response...")
                conversation.create_response()
                ctx['resp_done'].wait()
                turn += 1
        except KeyboardInterrupt:
            print("\nProgram interrupted by user.")
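        finally:
            # Close the WebSocket connection first, then release the audio resources.
            try:
                conversation.close()
            except Exception:
                pass
            shutdown_ctx(ctx)
            print("Program exited.")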

    Run manual_dash.py. Press Enter to start recording, and press Enter again to stop and send. The model's audio response plays automatically.

DashScope Java SDK

Select an interaction mode

  • VAD mode (Voice Activity Detection, automatically detects the start and end of speech)

    The Realtime API detects when you start and stop speaking and responds.

  • Manual mode (press to talk, release to send)

    The client controls the start and end of speech. After speaking, the client must send a message to the server.

VAD mode

OmniServerVad.java

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class OmniServerVad {
    static class SequentialAudioPlayer {
        private final SourceDataLine line;
        private final Queue<byte[]> audioQueue = new ConcurrentLinkedQueue<>();
        private final Thread playerThread;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);

        public SequentialAudioPlayer() throws LineUnavailableException {
            AudioFormat format = new AudioFormat(24000, 16, 1, true, false);
            line = AudioSystem.getSourceDataLine(format);
            line.open(format);
            line.start();

            playerThread = new Thread(() -> {
                while (!shouldStop.get()) {
                    byte[] audio = audioQueue.poll();
                    if (audio != null) {
                        line.write(audio, 0, audio.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            }, "AudioPlayer");
            playerThread.start();
        }

        public void play(String base64Audio) {
            try {
                byte[] audio = Base64.getDecoder().decode(base64Audio);
                audioQueue.add(audio);
            } catch (Exception e) {
                System.err.println("Failed to decode audio: " + e.getMessage());
            }
        }

        public void cancel() {
            audioQueue.clear();
            line.flush();
        }

        public void close() {
            shouldStop.set(true);
            try { playerThread.join(1000); } catch (InterruptedException ignored) {}
            line.drain();
            line.close();
        }
    }

    public static void main(String[] args) {
        try {
            SequentialAudioPlayer player = new SequentialAudioPlayer();
            AtomicBoolean userIsSpeaking = new AtomicBoolean(false);
            AtomicBoolean shouldStop = new AtomicBoolean(false);

            OmniRealtimeParam param = OmniRealtimeParam.builder()
                    .model("qwen3.5-omni-plus-realtime")
                    .apikey(System.getenv("DASHSCOPE_API_KEY"))
                    // The following URL is for the Singapore region. When calling, replace WorkspaceId with your actual workspace ID. URLs vary by region.
                    .url("wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime")
                    .build();

            OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
                @Override public void onOpen() {
                    System.out.println("Connection established.");
                }
                @Override public void onClose(int code, String reason) {
                    System.out.println("Connection closed (" + code + "): " + reason);
                    shouldStop.set(true);
                }
                @Override public void onEvent(JsonObject event) {
                    handleEvent(event, player, userIsSpeaking);
                }
            });

            conversation.connect();
            conversation.updateSession(OmniRealtimeConfig.builder()
                    .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                    .voice("Ethan")
                    .enableTurnDetection(true)
                    .enableInputAudioTranscription(true)
                    .parameters(Map.of("instructions",
                            "You are an AI customer service agent for a five-star hotel. Answer customer inquiries about room types, facilities, prices, and booking policies accurately and in a friendly manner. Always respond with a professional and helpful attitude. Do not provide unconfirmed information or information beyond the scope of the hotel's services."))
                    .build()
            );

            System.out.println("Start speaking (speech start/end is automatically detected, press Ctrl+C to exit)...");
            AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
            TargetDataLine mic = AudioSystem.getTargetDataLine(format);
            mic.open(format);
            mic.start();

            ByteBuffer buffer = ByteBuffer.allocate(3200);
            while (!shouldStop.get()) {
                int bytesRead = mic.read(buffer.array(), 0, buffer.capacity());
                if (bytesRead > 0) {
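                    try {
                        // Send only the bytes actually read; the rest of the buffer may hold stale data.
                        conversation.appendAudio(Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.array(), bytesRead)));
                    } catch (Exception e) {
                        if (e.getMessage() != null && e.getMessage().contains("closed")) {
                            System.out.println("Conversation closed. Stopping recording.");
                            break;
                        }
                        System.err.println("Failed to send audio: " + e.getMessage());
                    }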
                }
                Thread.sleep(20);
            }

            conversation.close(1000, "Normal termination");
            player.close();
            mic.close();
            System.out.println("\nProgram exited.");

        } catch (NoApiKeyException e) {
            System.err.println("API key not found: Set the DASHSCOPE_API_KEY environment variable.");
            System.exit(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void handleEvent(JsonObject event, SequentialAudioPlayer player, AtomicBoolean userIsSpeaking) {
        String type = event.get("type").getAsString();
        switch (type) {
            case "input_audio_buffer.speech_started":
                System.out.println("\n[User started speaking]");
                player.cancel();
                userIsSpeaking.set(true);
                break;
            case "input_audio_buffer.speech_stopped":
                System.out.println("[User stopped speaking]");
                userIsSpeaking.set(false);
                break;
            case "response.audio.delta":
                if (!userIsSpeaking.get()) {
                    player.play(event.get("delta").getAsString());
                }
                break;
            case "conversation.item.input_audio_transcription.delta":
                // Streaming preview: text is confirmed prefix, stash is unconfirmed suffix
                String preview = event.get("text").getAsString() + event.get("stash").getAsString();
                System.out.print("\rUser: " + preview);
                break;
            case "conversation.item.input_audio_transcription.completed":
                System.out.println();
                break;
            case "response.audio_transcript.done":
                System.out.println("Assistant: " + event.get("transcript").getAsString());
                break;
            case "response.done":
                System.out.println("Response completed.");
                break;
        }
    }
}

Run OmniServerVad.main() to start a real-time conversation through your microphone. The client streams microphone audio continuously, and the server detects when you start and stop speaking.
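Both Java examples read microphone audio in 3200-byte chunks at 16 kHz, 16-bit, mono, and the manual-mode player estimates how long to sleep from the chunk length (`chunk.length / (sampleRate * 2 / 1000)`). The arithmetic is bytes * 1000 / (sample rate * bytes per sample * channels). The following Python sketch makes it concrete; the helper names `pcm_duration_ms` and `pcm_bytes_for_ms` are hypothetical and not part of any SDK.

```python
def pcm_duration_ms(num_bytes: int, sample_rate: int, sample_width: int = 2, channels: int = 1) -> float:
    """Return how many milliseconds of audio a raw PCM buffer holds (hypothetical helper)."""
    bytes_per_second = sample_rate * sample_width * channels
    return num_bytes * 1000 / bytes_per_second


def pcm_bytes_for_ms(duration_ms: int, sample_rate: int, sample_width: int = 2, channels: int = 1) -> int:
    """Return the buffer size in bytes for a given duration of raw PCM audio (hypothetical helper)."""
    return sample_rate * sample_width * channels * duration_ms // 1000


if __name__ == "__main__":
    # The microphone chunk used by both Java examples: 16 kHz, 16-bit, mono.
    print(pcm_duration_ms(3200, 16000))  # 100.0
    # The same byte count of 24 kHz model output audio.
    print(pcm_duration_ms(3200, 24000))
    # Bytes needed for 20 ms of 16 kHz input audio.
    print(pcm_bytes_for_ms(20, 16000))  # 640
```

A 3200-byte chunk is therefore 100 ms of input audio, while the same number of bytes of 24 kHz output audio plays for about 66.7 ms.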

Manual mode

OmniWithoutServerVad.java

// DashScope Java SDK 2.20.9 or later is required.

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniWithoutServerVad {
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();

        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Plays an audio chunk and blocks until playback is complete.
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
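            // Duration of this chunk in milliseconds (16-bit mono: 2 bytes per sample).
            int audioLength = chunk.length / (this.sampleRate * 2 / 1000);
            // Waits for most of the chunk to finish playing. Clamped so very short chunks never cause a negative sleep.
            Thread.sleep(Math.max(0, audioLength - 10));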
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    }

    // Records audio and streams it to the conversation in real time.
    private static void recordAndSend(TargetDataLine line, OmniRealtimeConversation conversation) {
        byte[] buffer = new byte[3200];
        AtomicBoolean stopRecording = new AtomicBoolean(false);

        // Starts a thread to listen for the Enter key.
        Thread enterKeyListener = new Thread(() -> {
            try {
                System.in.read();
                stopRecording.set(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        enterKeyListener.start();

        // Records and sends audio in real time.
        while (!stopRecording.get()) {
            int count = line.read(buffer, 0, buffer.length);
            if (count > 0) {
                byte[] chunk = new byte[count];
                System.arraycopy(buffer, 0, chunk, 0, count);
                conversation.appendAudio(Base64.getEncoder().encodeToString(chunk));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3.5-omni-plus-realtime")
                // API keys for Singapore and China (Beijing) are different. Get an API key from: https://www.alibabacloud.com/help/en/model-studio/get-api-key
                // If the DASHSCOPE_API_KEY environment variable is not set, replace the apikey call below with your Model Studio API key, for example: .apikey("sk-xxx")
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                // The following URL is for the Singapore region. When calling, replace WorkspaceId with your actual workspace ID. URLs vary by region.
                .url("wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime")
                .build();
        AtomicReference<CountDownLatch> responseDoneLatch = new AtomicReference<>(new CountDownLatch(1));
        AtomicReference<CountDownLatch> transcriptionDoneLatch = new AtomicReference<>(new CountDownLatch(1));

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.delta":
                        // Streaming preview: text is confirmed prefix, stash is unconfirmed suffix
                        String transcriptPreview = message.get("text").getAsString() + message.get("stash").getAsString();
                        System.out.print("\rquestion: " + transcriptPreview);
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println();
                        transcriptionDoneLatch.get().countDown();
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        responseDoneLatch.get().countDown();
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
                .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                .voice("Ethan")
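                .enableTurnDetection(false)
                // Enables input transcription. The main loop waits (up to 10 seconds) for the transcription.completed event before requesting a response.
                .enableInputAudioTranscription(true)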
                // Set the model's role.
                .parameters(new HashMap<String, Object>() {{
                    put("instructions","You are Xiaoyun, a personal assistant. Please answer the user's questions accurately and in a friendly manner, always responding with a helpful attitude.");
                }})
                .build();
        conversation.updateSession(config);

        // Set up microphone for recording.
        AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
        DataLine.Info info = new DataLine.Info(TargetDataLine.class, format);

        if (!AudioSystem.isLineSupported(info)) {
            System.out.println("Line not supported");
            return;
        }

        TargetDataLine line = null;
        try {
            line = (TargetDataLine) AudioSystem.getLine(info);
            line.open(format);
            line.start();

            while (true) {
                System.out.println("Press Enter to start recording...");
                try {
                    System.in.read();
                } catch (IOException e) {
                    System.err.println("Error reading input: " + e.getMessage());
                    break; // Exit the loop if an error occurs.
                }
                System.out.println("Recording started. Speak now... Press Enter again to stop recording and send.");
                recordAndSend(line, conversation);
                conversation.commit();
                // Wait for transcription to complete before triggering model response to avoid interleaved output
                transcriptionDoneLatch.get().await(10, TimeUnit.SECONDS);
                System.out.println("Waiting for model response...");
                conversation.createResponse(null, null);
                responseDoneLatch.get().await();
                // Resets the latches for the next turn.
                responseDoneLatch.set(new CountDownLatch(1));
                transcriptionDoneLatch.set(new CountDownLatch(1));
            }
        } catch (LineUnavailableException e) {
            e.printStackTrace();
        } finally {
            if (line != null) {
                line.stop();
                line.close();
            }
        }
    }
}

Run OmniWithoutServerVad.main(). Press Enter to start recording, and press it again to stop and send. The model's response plays automatically.
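In manual mode the client decides when the user's turn ends. Judging from the event types used by the Python client later on this page, one turn consists of any number of `input_audio_buffer.append` events, one `input_audio_buffer.commit`, and one `response.create`. The Python sketch below only serializes those events and opens no connection; `build_manual_turn` is a hypothetical helper, and generating `event_id` with `uuid4` is an assumption (the page's own client uses a millisecond timestamp).

```python
import base64
import json
import uuid
from typing import Iterable, List


def _event(event_type: str, **fields) -> dict:
    # Assumption: event_id is a free-form unique string.
    return {"event_id": f"event_{uuid.uuid4().hex}", "type": event_type, **fields}


def build_manual_turn(pcm_chunks: Iterable[bytes]) -> List[str]:
    """Serialize the client events for one manual-mode turn (hypothetical helper)."""
    events = [
        # Each chunk must be 16-bit, 16 kHz, mono PCM, Base64-encoded.
        _event("input_audio_buffer.append", audio=base64.b64encode(chunk).decode())
        for chunk in pcm_chunks
    ]
    events.append(_event("input_audio_buffer.commit"))  # end of the user's turn
    events.append(_event("response.create"))            # ask the model to respond
    return [json.dumps(e) for e in events]


if __name__ == "__main__":
    silence = bytes(3200)  # 100 ms of 16 kHz, 16-bit mono silence
    for message in build_manual_turn([silence, silence]):
        print(json.loads(message)["type"])
```

Sending these strings over an open WebSocket, in order, corresponds to what `conversation.commit()` followed by `conversation.createResponse(null, null)` does in the Java example above, assuming the SDK maps those calls to the same events.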

WebSocket (Python)

  • Prepare the runtime environment

    Ensure Python 3.10 or later is installed.

    Install pyaudio for your operating system.

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio

    or

    pip install pyaudio

    We recommend using pip install pyaudio. If the installation fails, first install the PortAudio dependency for your operating system.

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    Install the WebSocket dependency:

    pip install websockets==15.0.1

  • Create the client

    Create a file named omni_realtime_client.py and copy the following code into it:

    omni_realtime_client.py

    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, List, Dict, Any
    from enum import Enum
    
    class TurnDetectionMode(Enum):
        SERVER_VAD = "server_vad"
        SEMANTIC_VAD = "semantic_vad"  # Recommended for the qwen3.5-omni-realtime model
        MANUAL = "manual"
    
    class OmniRealtimeClient:
    
        def __init__(
                self,
                base_url: str,
                api_key: str,
                model: str = "",
                voice: str = "Ethan",
                instructions: str = "You are a helpful assistant.",
                turn_detection_mode: TurnDetectionMode = TurnDetectionMode.SERVER_VAD,
                on_text_delta: Optional[Callable[[str], None]] = None,
                on_audio_delta: Optional[Callable[[bytes], None]] = None,
                on_input_transcript: Optional[Callable[[str], None]] = None,
                on_output_transcript: Optional[Callable[[str], None]] = None,
                extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.model = model
            self.voice = voice
            self.instructions = instructions
            self.ws = None
            self.on_text_delta = on_text_delta
            self.on_audio_delta = on_audio_delta
            self.on_input_transcript = on_input_transcript
            self.on_output_transcript = on_output_transcript
            self.turn_detection_mode = turn_detection_mode
            self.extra_event_handlers = extra_event_handlers or {}
    
            # Current response status
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            # Input/output transcript printing status
            self._print_input_transcript = True
            self._output_transcript_buffer = ""
    
        async def connect(self) -> None:
            """Establish a WebSocket connection with the Realtime API."""
            url = f"{self.base_url}?model={self.model}"
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
            self.ws = await websockets.connect(url, additional_headers=headers)
    
            # session configuration
            session_config = {
                "modalities": ["text", "audio"],
                "voice": self.voice,
                "instructions": self.instructions,
                "input_audio_format": "pcm",
                "output_audio_format": "pcm",
                "input_audio_transcription": {
                    "model": "qwen3-asr-flash-realtime"
                }
            }
    
            if self.turn_detection_mode == TurnDetectionMode.MANUAL:
                session_config['turn_detection'] = None
                await self.update_session(session_config)
            elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
                session_config['turn_detection'] = {
                    "type": "server_vad",
                    "threshold": 0.1,
                    "prefix_padding_ms": 500,
                    "silence_duration_ms": 900
                }
                await self.update_session(session_config)
            elif self.turn_detection_mode == TurnDetectionMode.SEMANTIC_VAD:
                session_config['turn_detection'] = {
                    "type": "semantic_vad",
                    "threshold": 0.1,
                    "prefix_padding_ms": 500,
                    "silence_duration_ms": 900
                }
                await self.update_session(session_config)
            else:
                raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
    
        async def send_event(self, event) -> None:
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """Update the session configuration."""
            event = {
                "type": "session.update",
                "session": config
            }
            await self.send_event(event)
    
        async def stream_audio(self, audio_chunk: bytes) -> None:
            """Stream raw audio data to the API."""
            # Only 16-bit, 16 kHz, mono PCM is supported.
            audio_b64 = base64.b64encode(audio_chunk).decode()
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": audio_b64
            }
            await self.send_event(append_event)
    
        async def commit_audio_buffer(self) -> None:
            """Commit the audio buffer to trigger processing."""
            event = {
                "type": "input_audio_buffer.commit"
            }
            await self.send_event(event)
    
        async def append_image(self, image_chunk: bytes) -> None:
            """Append image data to the image buffer.
            Image data can come from local files or a real-time video stream.
            Note:
                - The image format must be JPG or JPEG. We recommend a resolution of 480p or 720p. The maximum supported resolution is 1080p.
                - A single image after Base64 encoding must not exceed 256 KB. We recommend keeping the raw image size below 190 KB before encoding.
                - Encode the image data into Base64 before sending.
                - We recommend sending images at one frame per second.
                - You must send audio data at least once before you send image data.
            """
            image_b64 = base64.b64encode(image_chunk).decode()
            event = {
                "type": "input_image_buffer.append",
                "image": image_b64
            }
            await self.send_event(event)
    
        async def create_response(self) -> None:
            """Request the API to generate a response. This is required only in Manual mode."""
            event = {
                "type": "response.create"
            }
            await self.send_event(event)
    
        async def cancel_response(self) -> None:
            """Cancel the current response."""
            event = {
                "type": "response.cancel"
            }
            await self.send_event(event)
    
        async def handle_interruption(self):
            """Handle a user interruption of the current response."""
            if not self._is_responding:
                return
            # 1. Cancel the current response.
            if self._current_response_id:
                await self.cancel_response()
    
            self._is_responding = False
            self._current_response_id = None
            self._current_item_id = None
    
        async def handle_messages(self) -> None:
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
                    if event_type == "error":
                        print(" Error: ", event['error'])
                        continue
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                    elif event_type == "input_audio_buffer.speech_started":
                        print("Speech start detected")
                        if self._is_responding:
                            print("Handling interruption")
                            await self.handle_interruption()
                    elif event_type == "input_audio_buffer.speech_stopped":
                        print("Speech end detected")
                    elif event_type == "response.text.delta":
                        if self.on_text_delta:
                            self.on_text_delta(event["delta"])
                    elif event_type == "response.audio.delta":
                        if self.on_audio_delta:
                            audio_bytes = base64.b64decode(event["delta"])
                            self.on_audio_delta(audio_bytes)
                    elif event_type == "conversation.item.input_audio_transcription.delta":
                        preview = event.get("text", "") + event.get("stash", "")
                        print(f"\rUser: {preview}", end='', flush=True)
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        print()
                        if self.on_input_transcript:
                            await asyncio.to_thread(self.on_input_transcript, transcript)
                        # The input transcript is complete, so buffered output transcript can now be flushed.
                        self._print_input_transcript = True
                    elif event_type == "response.audio_transcript.delta":
                        if self.on_output_transcript:
                            delta = event.get("delta", "")
                            if not self._print_input_transcript:
                                self._output_transcript_buffer += delta
                            else:
                                if self._output_transcript_buffer:
                                    await asyncio.to_thread(self.on_output_transcript, self._output_transcript_buffer)
                                    self._output_transcript_buffer = ""
                                await asyncio.to_thread(self.on_output_transcript, delta)
                    elif event_type == "response.audio_transcript.done":
                        print(f"assistant: {event.get('transcript', '')}")
                        self._print_input_transcript = False
                    elif event_type in self.extra_event_handlers:
                        self.extra_event_handlers[event_type](event)
            except websockets.exceptions.ConnectionClosed:
                print(" Connection closed")
            except Exception as e:
                print(" Error in message handling: ", str(e))
        async def close(self) -> None:
            """Close the WebSocket connection."""
            if self.ws:
                await self.ws.close()
  • Select an interaction mode

    • VAD mode (Voice Activity Detection, automatically detects the start and end of speech)

      The Realtime API detects when you start and stop speaking and generates a response.

    • Manual mode (press to speak, release to send)

      You control when to start and stop sending audio. After speaking, the client must send a message to the server to generate a response.
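    Under the hood, `OmniRealtimeClient.connect()` distinguishes the modes only through the `turn_detection` field of the `session.update` event: `None` (serialized as JSON `null`) for manual mode, or an object whose `type` is `server_vad` or `semantic_vad`. The condensed sketch below mirrors that branch; `turn_detection_for` is a hypothetical helper, and the threshold and timing values are copied from the client code above rather than recommended settings.

```python
import json
from typing import Optional


def turn_detection_for(mode: str) -> Optional[dict]:
    """Return the turn_detection value for session.update (hypothetical helper mirroring connect())."""
    if mode == "manual":
        # No server-side turn detection: the client sends commit and response.create itself.
        return None
    if mode in ("server_vad", "semantic_vad"):
        return {
            "type": mode,
            "threshold": 0.1,
            "prefix_padding_ms": 500,
            "silence_duration_ms": 900,
        }
    raise ValueError(f"Invalid turn detection mode: {mode}")


if __name__ == "__main__":
    for mode in ("manual", "semantic_vad"):
        event = {"type": "session.update", "session": {"turn_detection": turn_detection_for(mode)}}
        print(json.dumps(event))
```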

    VAD mode

    In the same directory as omni_realtime_client.py, create a file named vad_mode.py and copy the following code into it:

    vad_mode.py

    # -- coding: utf-8 --
    import os, asyncio, pyaudio, queue, threading
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    
    # Audio player class that handles interruptions
    class AudioPlayer:
        def __init__(self, pyaudio_instance, rate=24000):
            self.stream = pyaudio_instance.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
            self.queue = queue.Queue()
            self.stop_evt = threading.Event()
            self.interrupt_evt = threading.Event()
            threading.Thread(target=self._run, daemon=True).start()
    
        def _run(self):
            while not self.stop_evt.is_set():
                try:
                    data = self.queue.get(timeout=0.5)
                    if data is None: break
                    if not self.interrupt_evt.is_set(): self.stream.write(data)
                    self.queue.task_done()
                except queue.Empty: continue
    
        def add_audio(self, data): self.queue.put(data)
        def handle_interrupt(self): self.interrupt_evt.set(); self.queue.queue.clear()
        def stop(self): self.stop_evt.set(); self.queue.put(None); self.stream.stop_stream(); self.stream.close()
    
    # Record from the microphone and send the audio
    async def record_and_send(client):
        p = pyaudio.PyAudio()
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=3200)
        print("Recording started. Please speak...")
        try:
            while True:
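                # Read in a worker thread so the blocking call does not stall handle_messages() on the event loop.
                audio_data = await asyncio.to_thread(stream.read, 3200, exception_on_overflow=False)
                await client.stream_audio(audio_data)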
        finally:
            stream.stop_stream(); stream.close(); p.terminate()
    
    async def main():
        p = pyaudio.PyAudio()
        player = AudioPlayer(pyaudio_instance=p)
    
        client = OmniRealtimeClient(
            # This is the base_url for the Singapore region. The base_url for the China (Beijing) region is wss://dashscope.aliyuncs.com/api-ws/v1/realtime.
            base_url="wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime",
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="qwen3.5-omni-plus-realtime",
            voice="Ethan",
            instructions="You are Xiaoyun, a witty and humorous assistant.",
            # SEMANTIC_VAD is recommended for models like qwen3.5-omni-realtime.
            turn_detection_mode=TurnDetectionMode.SEMANTIC_VAD,
            on_text_delta=lambda t: print(f"\nassistant: {t}", end="", flush=True),
            on_audio_delta=player.add_audio,
        )
    
        await client.connect()
        print("Connection successful. Starting the real-time conversation...")
    
        # Run concurrently
        await asyncio.gather(client.handle_messages(), record_and_send(client))
    
    if __name__ == "__main__":
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("\nProgram exited.")

    Run vad_mode.py to start a real-time conversation through your microphone. The client streams microphone audio continuously, and the server detects when you start and stop speaking.
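    When the user talks over the assistant in VAD mode, the server sends `input_audio_buffer.speech_started` and `OmniRealtimeClient` replies with `response.cancel`. Audio already queued locally keeps playing unless the player is flushed as well; the `AudioPlayer` in vad_mode.py defines `handle_interrupt()` for this, but the example does not call it and never clears its interrupt flag. The sketch below shows one way to flush and later resume, assuming each new response begins with a `response.created` event (the event the client above uses to track the current response). `PlaybackQueue` is a hypothetical, hardware-free stand-in for `AudioPlayer`.

```python
import base64
import queue
from typing import List


class PlaybackQueue:
    """Hypothetical stand-in for AudioPlayer that drops queued audio on barge-in."""

    def __init__(self) -> None:
        self._chunks: "queue.Queue[bytes]" = queue.Queue()
        self._muted = False

    def on_event(self, event: dict) -> None:
        event_type = event.get("type")
        if event_type == "input_audio_buffer.speech_started":
            # The user is talking over the assistant: discard unplayed audio and
            # ignore late deltas from the response that is being cancelled.
            self._muted = True
            self._drain()
        elif event_type == "response.created":
            # Assumption: a new response starts here, so accept audio again.
            self._muted = False
        elif event_type == "response.audio.delta" and not self._muted:
            self._chunks.put(base64.b64decode(event["delta"]))

    def _drain(self) -> List[bytes]:
        drained = []
        while True:
            try:
                drained.append(self._chunks.get_nowait())
            except queue.Empty:
                return drained

    def take_all(self) -> List[bytes]:
        """Remove and return every queued chunk (a real player would write these to the speaker)."""
        return self._drain()


if __name__ == "__main__":
    delta = base64.b64encode(b"\x00\x00").decode()
    player = PlaybackQueue()
    player.on_event({"type": "response.created"})
    player.on_event({"type": "response.audio.delta", "delta": delta})
    player.on_event({"type": "input_audio_buffer.speech_started"})
    player.on_event({"type": "response.audio.delta", "delta": delta})  # late chunk from the cancelled response
    print(len(player.take_all()))  # 0
    player.on_event({"type": "response.created"})
    player.on_event({"type": "response.audio.delta", "delta": delta})
    print(len(player.take_all()))  # 1
```

    To wire this into `OmniRealtimeClient`, you would call `on_event` from `handle_messages()` directly: `extra_event_handlers` only receives event types that the `elif` chain does not already handle, so `speech_started` and `response.created` never reach it.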

    Manual mode

    In the same directory as omni_realtime_client.py, create a file named manual_mode.py and copy the following code into it:

    manual_mode.py

    # -- coding: utf-8 --
    import os
    import asyncio
    import time
    import threading
    import queue
    import pyaudio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    
    
    class AudioPlayer:
        """Real-time audio player class."""
    
        def __init__(self, sample_rate=24000, channels=1, sample_width=2):
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width  # 2 bytes for 16-bit
            self.audio_queue = queue.Queue()
            self.is_playing = False
            self.play_thread = None
            self.pyaudio_instance = None
            self.stream = None
            self._lock = threading.Lock()  # Lock for synchronized access to shared state.
            self._last_data_time = time.time()  # Time when audio data was last received.
            self._response_done = False  # Whether the server has signaled that the response is complete.
            self._waiting_for_response = False  # Whether the client is waiting for a server response.
            # Time of the last write to the output stream and duration of the chunk written, used to detect the end of playback more accurately.
            self._last_play_time = time.time()
            self._last_chunk_duration = 0.0
    
        def start(self):
            """Start the audio player."""
            with self._lock:
                if self.is_playing:
                    return
    
                self.is_playing = True
    
                try:
                    self.pyaudio_instance = pyaudio.PyAudio()
    
                    # Create an audio output stream.
                    self.stream = self.pyaudio_instance.open(
                        format=pyaudio.paInt16,  # 16-bit
                        channels=self.channels,
                        rate=self.sample_rate,
                        output=True,
                        frames_per_buffer=1024
                    )
    
                    # Start the playback thread.
                    self.play_thread = threading.Thread(target=self._play_audio)
                    self.play_thread.daemon = True
                    self.play_thread.start()
    
                    print("Audio player started")
                except Exception as e:
                    print(f"Failed to start audio player: {e}")
                    self._cleanup_resources()
                    raise
    
        def stop(self):
            """Stop the audio player."""
            with self._lock:
                if not self.is_playing:
                    return
    
                self.is_playing = False
    
            # Clear the queue.
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
            # Wait for the playback thread to finish. Wait outside the lock to avoid a deadlock.
            if self.play_thread and self.play_thread.is_alive():
                self.play_thread.join(timeout=2.0)
    
            # Acquire the lock again to clean up resources.
            with self._lock:
                self._cleanup_resources()
    
            print("Audio player stopped")
    
        def _cleanup_resources(self):
            """Clean up audio resources. This must be called within the lock."""
            try:
                # Close the audio stream.
                if self.stream:
                    if not self.stream.is_stopped():
                        self.stream.stop_stream()
                    self.stream.close()
                    self.stream = None
            except Exception as e:
                print(f"Error closing audio stream: {e}")
    
            try:
                if self.pyaudio_instance:
                    self.pyaudio_instance.terminate()
                    self.pyaudio_instance = None
            except Exception as e:
                print(f"Error terminating PyAudio: {e}")
    
        def add_audio_data(self, audio_data):
            """Add audio data to the playback queue."""
            if self.is_playing and audio_data:
                self.audio_queue.put(audio_data)
                with self._lock:
                    self._last_data_time = time.time()  # Update the time when the last data was received.
                    self._waiting_for_response = False  # Data received, no longer waiting.
    
        def stop_receiving_data(self):
            """Mark that no more new audio data will be received."""
            with self._lock:
                self._response_done = True
                self._waiting_for_response = False  # Response ended, no longer waiting.
    
        def prepare_for_next_turn(self):
            """Reset the player state for the next conversation turn."""
            with self._lock:
                self._response_done = False
                self._last_data_time = time.time()
                self._last_play_time = time.time()
                self._last_chunk_duration = 0.0
                self._waiting_for_response = True  # Start waiting for the next response.
    
            # Clear any remaining audio data from the previous turn.
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
        def is_finished_playing(self):
            """Check if all audio data has been played."""
            with self._lock:
                queue_size = self.audio_queue.qsize()
                time_since_last_data = time.time() - self._last_data_time
                time_since_last_play = time.time() - self._last_play_time
    
                # ---------------------- Smart end detection ----------------------
                # 1. Preferred method: If the server has marked completion and the playback queue is empty,
                #    wait for the most recent audio chunk to finish playing (chunk duration + 0.1s tolerance).
                if self._response_done and queue_size == 0:
                    min_wait = max(self._last_chunk_duration + 0.1, 0.5)  # Wait at least 0.5s.
                    if time_since_last_play >= min_wait:
                        return True
    
                # 2. Fallback method: If no new data has been received for over a second and the playback queue is empty.
                #    This logic serves as a safeguard if the server does not explicitly send `response.done`.
                if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0:
                    print("\n(No new audio received for a while, assuming playback is finished)")
                    return True
    
                return False
    
        def _play_audio(self):
            """Worker thread for playing audio data."""
            while True:
                # Check if it should stop.
                with self._lock:
                    if not self.is_playing:
                        break
                    stream_ref = self.stream  # Get a reference to the stream.
    
                try:
                    # Get audio data from the queue, with a timeout of 0.1 seconds.
                    audio_data = self.audio_queue.get(timeout=0.1)
    
                    # Check the status and stream validity again.
                    with self._lock:
                        if self.is_playing and stream_ref and not stream_ref.is_stopped():
                            try:
                                # Play the audio data.
                                stream_ref.write(audio_data)
                                # Update the latest playback information.
                                self._last_play_time = time.time()
                                self._last_chunk_duration = len(audio_data) / (
                                            self.channels * self.sample_width) / self.sample_rate
                            except Exception as e:
                                print(f"Error writing to audio stream: {e}")
                                break
    
                    # Mark this data block as processed.
                    self.audio_queue.task_done()
    
                except queue.Empty:
                    # Continue waiting if the queue is empty.
                    continue
                except Exception as e:
                    print(f"Error playing audio: {e}")
                    break
    
    
    class MicrophoneRecorder:
        """Real-time microphone recorder."""
    
        def __init__(self, sample_rate=16000, channels=1, chunk_size=3200):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """Recording worker thread."""
            # Continuously read data from the audio stream while _is_recording is True.
            while self._is_recording:
                try:
                    # Use exception_on_overflow=False to avoid crashing due to a buffer overflow.
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # When the stream is closed, a read operation may raise an error.
                    print(f"Error reading from recording stream, it might be closed: {e}")
                    break
    
        def start(self):
            """Start recording."""
            if self._is_recording:
                print("Recording is already in progress.")
                return
    
            self.frames = []
            self._is_recording = True
    
            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
    
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("Microphone recording started...")
            except Exception as e:
                print(f"Failed to start microphone: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """Stop recording and return the audio data."""
            if not self._is_recording:
                return None
    
            self._is_recording = False
    
            # Wait for the recording thread to exit safely.
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
    
            self._cleanup()
    
            print("Microphone recording stopped.")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """Safely clean up PyAudio resources."""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"Error closing audio stream: {e}")
    
            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"Error terminating PyAudio instance: {e}")
    
            self.stream = None
            self.pyaudio_instance = None
    
    
    async def interactive_test():
        """
        Interactive test for multi-turn conversations with audio and image support.
        """
        # ------------------- 1. Initialization and connection (one-time) -------------------
        # API keys for the Singapore and China (Beijing) regions are different. To get an API key, see https://www.alibabacloud.com/help/en/model-studio/get-api-key
        api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            print("Please set the DASHSCOPE_API_KEY environment variable.")
            return
    
        print("--- Real-time Multimodal Audio/Video Chat Client ---")
        print("Initializing audio player and client...")
    
        audio_player = AudioPlayer()
        audio_player.start()
    
        def on_audio_received(audio_data):
            audio_player.add_audio_data(audio_data)
    
        def on_response_done(event):
            print("\n(Received response end marker)")
            audio_player.stop_receiving_data()
    
        realtime_client = OmniRealtimeClient(
            # Base URL for the Singapore region; replace {WorkspaceId} with your workspace ID. For models in the China (Beijing) region, use wss://dashscope.aliyuncs.com/api-ws/v1/realtime.
            base_url="wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime",
            api_key=api_key,
            model="qwen3.5-omni-plus-realtime",
            voice="Ethan",
            instructions="You are Xiaoyun, a personal assistant. Please answer the user's questions accurately and in a friendly manner, always responding with a helpful attitude.", # Set the model's role.
            on_text_delta=lambda text: print(text, end="", flush=True),
            on_audio_delta=on_audio_received,
            turn_detection_mode=TurnDetectionMode.MANUAL,
            extra_event_handlers={"response.done": on_response_done}
        )
    
        message_handler_task = None
        try:
            await realtime_client.connect()
            print("Connected to the server. Enter 'q' or 'quit' to exit at any time.")
            message_handler_task = asyncio.create_task(realtime_client.handle_messages())
            await asyncio.sleep(0.5)
    
            turn_counter = 1
            # ------------------- 2. Multi-turn conversation loop -------------------
            while True:
                print(f"\n--- Turn {turn_counter} ---")
                audio_player.prepare_for_next_turn()
    
                recorded_audio = None
                image_paths = []
    
                # --- Get user input: Record from microphone ---
                loop = asyncio.get_running_loop()
                recorder = MicrophoneRecorder(sample_rate=16000)  # A 16 kHz sample rate is recommended for speech input.
    
                print("Ready to record. Press Enter to start recording (or enter 'q' to exit)...")
                user_input = await loop.run_in_executor(None, input)
                if user_input.strip().lower() in ['q', 'quit']:
                    print("User requested to exit...")
                    return
    
                try:
                    recorder.start()
                except Exception:
                    print("Could not start recording. Please check your microphone permissions and device. Skipping this turn.")
                    continue
    
                print("Recording... Press Enter again to stop recording.")
                await loop.run_in_executor(None, input)
    
                recorded_audio = recorder.stop()
    
                if not recorded_audio:
                    print("No valid audio was recorded. Please start this turn again.")
                    continue
    
                # --- Get image input (optional) ---
                # The image input feature is disabled by default. Uncomment the code below to enable it.
                # print("\nEnter the absolute path of an [image file] on each line (optional). When finished, enter 's' or press Enter to send the request.")
                # while True:
                #     path = input("Image path: ").strip()
                #     if path.lower() == 's' or path == '':
                #         break
                #     if path.lower() in ['q', 'quit']:
                #         print("User requested to exit...")
                #         return
                #
                #     if not os.path.isabs(path):
                #         print("Error: Please enter an absolute path.")
                #         continue
                #     if not os.path.exists(path):
                #         print(f"Error: File not found -> {path}")
                #         continue
                #     image_paths.append(path)
                #     print(f"Image added: {os.path.basename(path)}")
    
                # --- 3. Send data and get response ---
                print("\n--- Input Confirmation ---")
                print(f"Audio to process: 1 (from microphone), Images: {len(image_paths)}")
                print("------------------")
    
                # 3.1 Send the recorded audio.
                try:
                    print(f"Sending microphone recording ({len(recorded_audio)} bytes)")
                    await realtime_client.stream_audio(recorded_audio)
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"Failed to send microphone recording: {e}")
                    continue
    
                # 3.2 Send all image files.
                # The image input feature is disabled by default. Uncomment the code below to enable it.
                # for i, path in enumerate(image_paths):
                #     try:
                #         with open(path, "rb") as f:
                #             data = f.read()
                #         print(f"Sending image {i+1}: {os.path.basename(path)} ({len(data)} bytes)")
                #         await realtime_client.append_image(data)
                #         await asyncio.sleep(0.1)
                #     except Exception as e:
                #         print(f"Failed to send image {os.path.basename(path)}: {e}")
    
                # 3.3 Submit and wait for the response.
                print("Submitting all inputs, requesting server response...")
                await realtime_client.commit_audio_buffer()
                await realtime_client.create_response()
    
                print("Waiting for and playing server response audio...")
                start_time = time.time()
                max_wait_time = 60
                while not audio_player.is_finished_playing():
                    if time.time() - start_time > max_wait_time:
                        print(f"\nWait timed out ({max_wait_time} seconds). Moving to the next turn.")
                        break
                    await asyncio.sleep(0.2)
                
                print("\nAudio playback for this turn is complete!")
                turn_counter += 1
    
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("\nProgram was interrupted.")
        except Exception as e:
            print(f"An unhandled error occurred: {e}")
        finally:
            # ------------------- 4. Clean up resources -------------------
            print("\nClosing connection and cleaning up resources...")
            if message_handler_task and not message_handler_task.done():
                message_handler_task.cancel()
    
            if realtime_client.ws:
                try:
                    await realtime_client.close()
                    print("Connection closed.")
                except Exception as e:
                    print(f"Error closing connection: {e}")
    
            audio_player.stop()
            print("Program exited.")
    
    
    if __name__ == "__main__":
        try:
            asyncio.run(interactive_test())
        except KeyboardInterrupt:
            print("\nProgram was forcibly exited by the user.")

    Run manual_mode.py. Press Enter to start recording, and press Enter again to stop and send.
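    In manual mode, the client rather than the server decides when a turn ends: it appends the recorded audio (and any images), commits the buffer, and explicitly requests a response; the end of the reply is signaled by the `response.done` event that manual_mode.py listens for. The sketch below builds that message sequence as JSON strings without opening a connection. It assumes the event names `input_audio_buffer.append`, `input_image_buffer.append`, `input_audio_buffer.commit`, and `response.create`, with base64 payloads in `audio` and `image` fields; these mirror the `stream_audio()`, `append_image()`, `commit_audio_buffer()`, and `create_response()` calls in the demo, but confirm the exact names against the client events reference. `build_manual_turn` is a hypothetical helper.

```python
import base64
import json
import uuid
from typing import List, Sequence


def _event(event_type: str, **fields) -> str:
    """Serialize one client event; event_id is a client-chosen unique ID (assumed format)."""
    payload = {"event_id": f"event_{uuid.uuid4().hex}", "type": event_type, **fields}
    return json.dumps(payload)


# Hypothetical helper. Event and field names are assumptions; check the client events reference.
def build_manual_turn(pcm_chunks: Sequence[bytes], images: Sequence[bytes] = ()) -> List[str]:
    """Build the ordered messages for one manual-mode turn."""
    messages = []
    for chunk in pcm_chunks:
        messages.append(_event("input_audio_buffer.append",
                               audio=base64.b64encode(chunk).decode("ascii")))
    for image in images:
        messages.append(_event("input_image_buffer.append",
                               image=base64.b64encode(image).decode("ascii")))
    # With turn detection disabled, the client closes the turn itself.
    messages.append(_event("input_audio_buffer.commit"))
    messages.append(_event("response.create"))
    return messages


if __name__ == "__main__":
    for msg in build_manual_turn([b"\x00\x00" * 1600], images=[b"\xff\xd8\xff"]):
        print(json.loads(msg)["type"])
```

    Each string would be sent over the WebSocket connection in order. Appending images after the audio matches the order manual_mode.py uses.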

WebRTC

Python

  • Runtime environment

    Python 3.10 or later is required. Install the following dependencies:

    pip install aiortc aiohttp sounddevice numpy certifi av
  • Run the demo

    Create a Python file named webrtc_demo.py and paste the following code:

    webrtc_demo.py

    # Dependencies: pip install aiortc aiohttp sounddevice numpy certifi av
    import asyncio
    import json
    import os
    import queue
    import ssl
    import threading
    
    import aiohttp
    import certifi
    import numpy as np
    import sounddevice as sd
    from aiortc import RTCPeerConnection, RTCConfiguration, RTCSessionDescription
    from aiortc.contrib.media import MediaPlayer
    from av import AudioFrame
    
    # Replace with your API key, or set the DASHSCOPE_API_KEY environment variable
    API_KEY = os.getenv("DASHSCOPE_API_KEY", "your-api-key")
    MODEL = "qwen3.5-omni-plus-realtime"
    # Replace {endpoint} with the endpoint obtained from your account manager
    SIGNALING_URL = f"https://{{endpoint}}/api/v1/webrtc/realtime?model={MODEL}"
    
    
    # --------------- Audio frame parsing ---------------
    
    def _nb_channels(frame: AudioFrame) -> int:
        """Get the number of channels in an audio frame, compatible with different PyAV versions"""
        if hasattr(frame.layout, "nb_channels"):
            return int(frame.layout.nb_channels)
        ch = getattr(frame.layout, "channels", 1)
        if isinstance(ch, (tuple, list)):
            return len(ch)
        return int(ch)
    
    
    def audioframe_to_s16_samples(frame: AudioFrame) -> np.ndarray:
        """
        Convert an audio frame to an array of shape (samples, channels).
        Server audio frames are interleaved stereo, so a naive reshape misaligns the channels.
        Different aiortc/PyAV versions also return different array shapes for the same audio,
        so each known layout is handled explicitly.
        """
        arr = np.asarray(frame.to_ndarray())
        ch = _nb_channels(frame)
        samples = int(frame.samples)
    
        if arr.ndim == 2 and arr.shape[0] == ch and arr.shape[1] == samples:
            return arr.T.copy()
        if arr.ndim == 2 and arr.shape[0] == 1 and arr.shape[1] == samples * ch:
            return arr.reshape(-1).reshape(samples, ch).copy()
        if arr.ndim == 1 and arr.shape[0] == samples * ch:
            return arr.reshape(samples, ch).copy()
    
        flat = arr.reshape(-1)
        if ch > 0 and flat.size % ch == 0:
            return flat.reshape(flat.size // ch, ch).copy()
        raise ValueError(f"unexpected shape={arr.shape}, ch={ch}, samples={samples}")
    
    
    # --------------- Low-latency audio player ---------------
    
    class RemoteAudioPlayer:
        """
        Low-latency audio player that plays 5ms audio blocks to minimize delay.
        Supports voice interruption: clears the buffer when the user starts speaking,
        stopping playback of old model responses.
        Merges stereo server audio to mono (average of left and right channels) for playback.
        """
        def __init__(self, samplerate=48000, out_channels=1, blocksize=240, max_seconds=0.2):
            self.samplerate = samplerate
            self.out_channels = out_channels
            self.blocksize = blocksize
            self._q = queue.Queue(maxsize=max(5, int(max_seconds * samplerate / blocksize) + 5))
            self._lock = threading.Lock()
            self._rb_size = max(1, int(max_seconds * samplerate))
            self._rb = np.zeros((self._rb_size, out_channels), dtype=np.int16)
            self._rb_w = 0
            self._rb_r = 0
            self._rb_len = 0
            self._stream = None
            self._closed = False
    
        def start(self):
            if self._stream:
                return
    
            def callback(outdata, frames, _time, status):
                if self._closed:
                    outdata[:] = np.zeros((frames, self.out_channels), dtype=np.int16)
                    return
                while True:
                    try:
                        chunk = self._q.get_nowait()
                    except queue.Empty:
                        break
                    with self._lock:
                        self._write_rb(chunk)
                with self._lock:
                    out = self._read_rb(frames)
                outdata[:] = out
    
            self._stream = sd.OutputStream(
                samplerate=self.samplerate,
                channels=self.out_channels,
                dtype="int16",
                blocksize=self.blocksize,
                callback=callback,
            )
            self._stream.start()
    
        def clear(self):
            """Clear playback buffer for voice interruption"""
            try:
                while True:
                    self._q.get_nowait()
            except queue.Empty:
                pass
            with self._lock:
                self._rb_w = 0
                self._rb_r = 0
                self._rb_len = 0
                self._rb[:] = 0
    
        def _write_rb(self, chunk: np.ndarray):
            n = int(chunk.shape[0])
            if n <= 0:
                return
            overflow = max(0, self._rb_len + n - self._rb_size)
            if overflow > 0:
                self._rb_r = (self._rb_r + overflow) % self._rb_size
                self._rb_len -= overflow
            end = self._rb_size - self._rb_w
            if n <= end:
                self._rb[self._rb_w:self._rb_w + n] = chunk
            else:
                self._rb[self._rb_w:] = chunk[:end]
                self._rb[:n - end] = chunk[end:]
            self._rb_w = (self._rb_w + n) % self._rb_size
            self._rb_len += n
    
        def _read_rb(self, frames: int) -> np.ndarray:
            if self._rb_len <= 0:
                return np.zeros((frames, self.out_channels), dtype=np.int16)
            n = min(frames, self._rb_len)
            out = np.zeros((frames, self.out_channels), dtype=np.int16)
            end = self._rb_size - self._rb_r
            if n <= end:
                out[:n] = self._rb[self._rb_r:self._rb_r + n]
            else:
                out[:end] = self._rb[self._rb_r:]
                out[end:n] = self._rb[:n - end]
            self._rb_r = (self._rb_r + n) % self._rb_size
            self._rb_len -= n
            return out
    
        async def push_frame(self, frame: AudioFrame):
            """Receive audio frames, auto-merge channels and enqueue"""
            if self._closed:
                return
            pcm = audioframe_to_s16_samples(frame)
            in_ch = pcm.shape[1]
            if self.out_channels == 1:
                if in_ch == 1:
                    out = pcm
                else:
                    out = np.mean(pcm.astype(np.int32), axis=1).astype(np.int16).reshape(-1, 1)
            else:
                if in_ch == self.out_channels:
                    out = pcm
                elif in_ch == 1 and self.out_channels == 2:
                    out = np.repeat(pcm, 2, axis=1)
                else:
                    out = pcm[:, :self.out_channels]
            try:
                self._q.put_nowait(out)
            except queue.Full:
                try:
                    self._q.get_nowait()
                except queue.Empty:
                    pass
                try:
                    self._q.put_nowait(out)
                except queue.Full:
                    pass
    
        async def close(self):
            self._closed = True
            if self._stream:
                self._stream.stop()
                self._stream.close()
                self._stream = None
    
    
    # --------------- main ---------------
    
    async def main():
        pc = RTCPeerConnection(RTCConfiguration(iceServers=[]))
    
        # Initialize audio player (mono output, 5ms blocksize for low latency)
        speaker = RemoteAudioPlayer(samplerate=48000, out_channels=1, blocksize=240, max_seconds=0.2)
        speaker.start()
    
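        # Initialize the microphone. "none:0" selects no video device and audio device 0 on macOS (avfoundation).
        # On Linux, use an FFmpeg input such as MediaPlayer("default", format="pulse") or format="alsa".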
        mic = MediaPlayer("none:0", format="avfoundation",
                          options={"sample_rate": "48000", "channels": "1"})
        if not mic.audio:
            raise RuntimeError("No microphone detected. Check the avfoundation audio device index.")
        pc.addTrack(mic.audio)
    
        # Client creates DataChannel (name is customizable); server pushes events through a channel named "txt"
        pc.createDataChannel("oai-events")
    
        remote_dc = None
        got_first_txt_msg = False
    
        def make_session_update() -> dict:
            """Build session.update config: voice, audio format, VAD strategy, inference parameters"""
            return {
                "type": "session.update",
                "session": {
                    "modalities": ["text", "audio"],
                    "voice": "Tina",
                    "input_audio_format": "pcm",
                    "output_audio_format": "pcm",
                    "instructions": "You are a friendly AI assistant.",
                    "turn_detection": {"type": "server_vad", "threshold": 0.5, "silence_duration_ms": 800},
                    "max_tokens": 16384,
                    "temperature": 0.9,
                },
            }
    
        # Handle server-pushed DataChannel events
        @pc.on("datachannel")
        def on_datachannel(ch):
            nonlocal remote_dc, got_first_txt_msg
            print(f"[DC] Received server DataChannel: {ch.label}")
            if ch.label == "txt":
                remote_dc = ch
    
            @ch.on("message")
            def on_msg(msg):
                nonlocal got_first_txt_msg
                try:
                    evt = json.loads(msg)
                except Exception:
                    return
                print(f"[{ch.label}] {evt.get('type')}")
    
                # Clear playback buffer when user starts speaking (voice interruption)
                if isinstance(evt, dict) and evt.get("type") == "input_audio_buffer.speech_started":
                    speaker.clear()
                    print("[Playback] User speech detected, clearing buffer (interruption)")
    
                # Send session.update after receiving first message on txt channel
                if ch.label == "txt" and not got_first_txt_msg:
                    got_first_txt_msg = True
                    if remote_dc and remote_dc.readyState == "open":
                        remote_dc.send(json.dumps(make_session_update(), ensure_ascii=False))
                        print("[DC] session.update sent")
    
        # Receive server audio and play with low latency
        @pc.on("track")
        async def on_track(track):
            if track.kind == "audio":
                async def _play():
                    try:
                        while True:
                            frame = await track.recv()
                            await speaker.push_frame(frame)
                    except Exception:
                        pass
                asyncio.create_task(_play())
    
        @pc.on("iceconnectionstatechange")
        def on_ice():
            print(f"[ICE] {pc.iceConnectionState}")
    
        @pc.on("connectionstatechange")
        async def on_conn():
            print(f"[Connection] {pc.connectionState}")
            if pc.connectionState in ("failed", "closed", "disconnected"):
                await pc.close()
    
        # SDP exchange: create Offer and POST to signaling server, get Answer
        offer = await pc.createOffer()
        await pc.setLocalDescription(offer)
    
        async with aiohttp.ClientSession() as session:
            async with session.post(
                SIGNALING_URL,
                ssl=ssl.create_default_context(cafile=certifi.where()),
                data=offer.sdp.encode("utf-8"),
                headers={
                    "Content-Type": "application/sdp",
                    "Authorization": f"Bearer {API_KEY}",
                },
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if not resp.ok:
                    raise Exception(f"SDP exchange failed: {resp.status} {await resp.text()}")
                answer_sdp = await resp.text()
    
        await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type="answer"))
        print("SDP exchange complete, waiting for connection...")
    
        try:
            await asyncio.Event().wait()
        except (KeyboardInterrupt, asyncio.CancelledError):
            pass
        finally:
            print(f"\nExiting. Final state: connection={pc.connectionState}, ICE={pc.iceConnectionState}")
            await speaker.close()
            try:
                if mic and mic.audio:
                    mic.audio.stop()
            except Exception:
                pass
            await pc.close()
    
    asyncio.run(main())

    Run webrtc_demo.py to start a real-time voice conversation with the Qwen-Omni-Realtime model through your microphone. Microphone audio streams to the server automatically, and server-side VAD detects when you start and stop speaking.

JavaScript

  • Prerequisites

    • Use a modern browser that supports WebRTC (Chrome, Edge, Firefox, Safari, etc.).

    • The browser requires microphone permission.

    • Browser cross-origin (CORS) restrictions prevent the page from sending the connection request to the server directly, so you complete the SDP exchange by running a curl command in a terminal.

  • Run the demo

    Create an HTML file named webrtc_demo.html and paste the following code:

    webrtc_demo.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8" />
        <title>WebRTC Realtime Voice Chat</title>
        <style>
            * { box-sizing: border-box; margin: 0; padding: 0; }
            body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; background: #f5f7fa; color: #1d2129; padding: 24px; line-height: 1.6; }
    
            .container { max-width: 800px; margin: 0 auto; }
    
            h1 { font-size: 22px; font-weight: 600; margin-bottom: 20px; color: #1d2129; }
    
            /* Sticky top bar */
            .sticky-top { position: sticky; top: 0; z-index: 100; background: #f5f7fa; margin: 0 -24px 16px; padding: 12px 24px; border-bottom: 1px solid transparent; transition: border-color .2s; }
            .sticky-top.scrolled { border-bottom-color: #e5e6eb; }
    
            /* Toolbar */
            .toolbar { display: flex; align-items: center; gap: 10px; flex-wrap: wrap; margin-bottom: 12px; }
            .toolbar label { display: flex; align-items: center; gap: 6px; font-size: 13px; color: #4e5969; cursor: pointer; }
    
            /* Buttons */
            button { padding: 8px 18px; font-size: 13px; font-weight: 500; border: 1px solid #c9cdd4; border-radius: 6px; background: #fff; color: #1d2129; cursor: pointer; transition: all .15s; }
            button:hover:not(:disabled) { border-color: #165dff; color: #165dff; }
            button:disabled { opacity: .4; cursor: not-allowed; }
            .btn-primary { background: #165dff; border-color: #165dff; color: #fff; }
            .btn-primary:hover:not(:disabled) { background: #4080ff; border-color: #4080ff; color: #fff; }
            .btn-danger { border-color: #f53f3f; color: #f53f3f; }
            .btn-danger:hover:not(:disabled) { background: #f53f3f; color: #fff; }
    
            /* Status indicator */
            .status-bar { display: flex; align-items: center; gap: 8px; padding: 10px 14px; border-radius: 8px; background: #fff; border: 1px solid #e5e6eb; font-size: 13px; }
            .status-dot { width: 8px; height: 8px; border-radius: 50%; background: #c9cdd4; flex-shrink: 0; }
            .status-dot.connected { background: #00b42a; }
            .status-dot.connecting { background: #ff7d00; animation: pulse 1s infinite; }
            .status-dot.error { background: #f53f3f; }
            @keyframes pulse { 0%,100% { opacity: 1; } 50% { opacity: .4; } }
    
            /* SDP card */
            .card { background: #fff; border: 1px solid #e5e6eb; border-radius: 10px; padding: 16px; margin-bottom: 16px; }
            .card-title { font-size: 13px; font-weight: 600; color: #4e5969; margin-bottom: 8px; }
            .step-num { display: inline-flex; align-items: center; justify-content: center; width: 20px; height: 20px; border-radius: 50%; background: #165dff; color: #fff; font-size: 11px; font-weight: 600; margin-right: 6px; }
            .card-hint { font-size: 12px; color: #86909c; margin-top: 6px; }
    
            textarea { width: 100%; font-family: "SF Mono", "Fira Code", "Fira Mono", Menlo, Consolas, monospace; font-size: 12px; padding: 10px; border: 1px solid #e5e6eb; border-radius: 6px; resize: vertical; background: #f7f8fa; color: #1d2129; transition: border-color .15s; }
            textarea:focus { outline: none; border-color: #165dff; background: #fff; }
    
            /* Video */
            .video-section { margin-bottom: 16px; }
            .video-label { font-size: 13px; color: #86909c; margin-bottom: 6px; }
            video { width: 320px; max-width: 100%; background: #000; border-radius: 8px; display: block; }
    
            /* Events panel */
            .events-title { font-size: 14px; font-weight: 600; color: #1d2129; margin-bottom: 10px; }
            .events-container { display: flex; flex-direction: column; gap: 6px; }
            .event-item { background: #fff; border: 1px solid #e5e6eb; border-radius: 8px; overflow: hidden; }
            .event-header { display: flex; align-items: center; gap: 8px; padding: 8px 12px; cursor: pointer; user-select: none; font-size: 12px; }
            .event-header:hover { background: #f7f8fa; }
            .event-arrow { font-size: 14px; font-weight: 700; width: 18px; text-align: center; }
            .event-arrow.server { color: #00b42a; }
            .event-arrow.client { color: #165dff; }
            .event-label { color: #4e5969; }
            .event-time { color: #c9cdd4; margin-left: auto; font-size: 11px; }
            .event-body { display: none; padding: 10px 12px; background: #f7f8fa; border-top: 1px solid #e5e6eb; }
            .event-body pre { margin: 0; font-size: 11px; font-family: "SF Mono", Menlo, Consolas, monospace; color: #4e5969; white-space: pre-wrap; word-break: break-all; }
            .events-empty { font-size: 13px; color: #c9cdd4; padding: 16px 0; text-align: center; }
        </style>
    </head>
    <body>
    <div class="container">
        <h1>WebRTC Realtime Voice Chat</h1>
    
        <div class="sticky-top">
            <div class="toolbar">
                <button id="startBtn" class="btn-primary">Start Session</button>
                <button id="setAnswerBtn" disabled>Set Answer</button>
                <button id="endBtn" class="btn-danger" disabled>End Session</button>
                <button id="downloadBtn" disabled>Download Remote Audio</button>
                <label>
                    <input id="sendVideoCheckbox" type="checkbox" />
                    Enable Video
                </label>
            </div>
    
            <div class="status-bar">
                <span class="status-dot" id="statusDot"></span>
                <span id="statusText">Ready</span>
            </div>
        </div>
    
        <div class="card">
            <div class="card-title"><span class="step-num">1</span>Offer SDP</div>
            <div style="margin-bottom: 8px;">
                <button id="copyOfferBtn" disabled>Copy Offer SDP</button>
            </div>
            <textarea id="offerBox" rows="6" readonly placeholder="Auto-generated after clicking Start Session"></textarea>
            <div class="card-hint">Auto-generated after ICE gathering completes. Copy and send to the server via curl to get the Answer.</div>
        </div>
    
        <div class="card">
            <div class="card-title"><span class="step-num">2</span>curl Command</div>
            <div style="margin-bottom: 8px;">
                <button id="copyCurlBtn" disabled>Copy curl Command</button>
            </div>
            <textarea id="curlBox" rows="6" readonly placeholder="Auto-filled after Offer SDP is generated"></textarea>
            <div class="card-hint">Copy this command to your terminal. Paste the returned Answer SDP below.</div>
        </div>
    
        <div class="card">
            <div class="card-title"><span class="step-num">3</span>Answer SDP</div>
            <textarea id="answerBox" rows="6" placeholder="Paste the Answer SDP returned by curl here"></textarea>
            <div class="card-hint">After pasting, click Set Answer above to establish the connection.</div>
        </div>
    
        <div class="video-section" id="videoSection" style="display:none;">
            <div class="video-label">Local Video Preview</div>
            <video id="localVideo" autoplay playsinline muted></video>
        </div>
    
        <div class="events-title">Events (DataChannel)</div>
        <div id="events" class="events-container"></div>
    </div>
    
    <script>
        const eventsDiv = document.getElementById('events');
        const startBtn = document.getElementById('startBtn');
        const setAnswerBtn = document.getElementById('setAnswerBtn');
        const endBtn = document.getElementById('endBtn');
        const downloadBtn = document.getElementById('downloadBtn');
        const copyOfferBtn = document.getElementById('copyOfferBtn');
        const statusDot = document.getElementById('statusDot');
        const statusText = document.getElementById('statusText');
    
        const copyCurlBtn = document.getElementById('copyCurlBtn');
        const curlBox = document.getElementById('curlBox');
    
        const sendVideoCheckbox = document.getElementById('sendVideoCheckbox');
        const localVideo = document.getElementById('localVideo');
    
        const offerBox = document.getElementById('offerBox');
        const answerBox = document.getElementById('answerBox');
    
        let pc = null;
        let hiddenRemoteAudioEl = null;
    
        let mediaRecorder = null;
        let recordedChunks = [];
        let audioBlob = null;
    
        let localStream = null;
    
        let sendCanvas = null;
        let sendCanvasCtx = null;
        let sendCanvasStream = null;
        let sendRafId = 0;
    
        let gatedAudioTracks = [];
        let gatedVideoTracks = [];
        let audioSender = null;
        let videoSender = null;
        let audioTrack = null;
        let videoTrack = null;
    
        function setStatus(text, state) {
          statusText.textContent = text;
          statusDot.className = 'status-dot' + (state ? ' ' + state : '');
        }
    
        function gateMedia(on) {
          for (const t of gatedAudioTracks) t.enabled = !!on;
          for (const t of gatedVideoTracks) t.enabled = !!on;
        }
    
        function sendUpdate(channel) {
          const update = {
            event_id: `event_${Date.now()}`,
            type: "session.update",
            session: {
              input_audio_format: "pcm",
              input_audio_transcription: { model: "qwen3-asr-flash-realtime" },
              instructions: "You are a helpful assistant.",
              modalities: ["text", "audio"],
              output_audio_format: "pcm",
              smooth_output: false,
              turn_detection: {
                prefix_padding_ms: 500,
                silence_duration_ms: 800,
                threshold: 0.5,
                type: "server_vad",
              },
            },
          };
          if (channel && channel.readyState === "open") channel.send(JSON.stringify(update));
        }
    
        // ===== Events panel =====
        const events = [];
        function nowTs() { return new Date().toLocaleTimeString(); }
    
        function renderEvents() {
          eventsDiv.innerHTML = "";
          if (events.length === 0) {
            const empty = document.createElement("div");
            empty.className = "events-empty";
            empty.textContent = "Waiting for events...";
            eventsDiv.appendChild(empty);
            return;
          }
    
          for (const item of events) {
            const { event, timestamp } = item;
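            // Match exact client event types. Server events such as session.updated and
            // response.created also contain "update"/"create", so a substring test mislabels them.
            const isClient = ["session.update", "input_audio_buffer.append", "input_audio_buffer.commit",
              "input_audio_buffer.clear", "input_image_buffer.append", "response.create"].includes(event?.type);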
    
            const wrap = document.createElement("div");
            wrap.className = "event-item";
    
            const header = document.createElement("div");
            header.className = "event-header";
    
            const arrow = document.createElement("span");
            arrow.className = "event-arrow " + (isClient ? "client" : "server");
            arrow.textContent = isClient ? "↓" : "↑";
    
            const label = document.createElement("span");
            label.className = "event-label";
            const who = isClient ? "client" : "server";
            const type = event?.type ?? "message";
            label.textContent = `${who}: ${type}`;
    
            const time = document.createElement("span");
            time.className = "event-time";
            time.textContent = timestamp;
    
            const body = document.createElement("div");
            body.className = "event-body";
            const pre = document.createElement("pre");
            pre.textContent = JSON.stringify(event, null, 2);
            body.appendChild(pre);
    
            header.onclick = () => { body.style.display = body.style.display === "block" ? "none" : "block"; };
    
            header.appendChild(arrow);
            header.appendChild(label);
            header.appendChild(time);
            wrap.appendChild(header);
            wrap.appendChild(body);
    
            eventsDiv.appendChild(wrap);
          }
        }
    
        function clearUIEvents() { events.length = 0; renderEvents(); }
        function pushEventFromDataChannel(eventObj) {
          const ts = eventObj.timestamp || nowTs();
          if (!eventObj.timestamp) eventObj.timestamp = ts;
          events.unshift({ event: eventObj, timestamp: ts });
          renderEvents();
        }
    
        function normalizeSdpForSetRemote(sdp) {
          sdp = String(sdp).trim().replace(/\r?\n/g, "\r\n");
          if (!sdp.endsWith("\r\n")) sdp += "\r\n";
          return sdp;
        }
    
        // ===== WebRTC =====
        startBtn.onclick = () => startSession().catch(err => console.log("startSession error:", err));
        endBtn.onclick = () => endSession();
        setAnswerBtn.onclick = () => setRemoteAnswerFromUI().catch(err => console.log("setRemoteAnswer error:", err));
        copyOfferBtn.onclick = async () => {
          const txt = offerBox.value;
          if (!txt) return;
          await navigator.clipboard.writeText(txt);
          alert("Offer SDP copied");
        };
        copyCurlBtn.onclick = async () => {
          const txt = curlBox.value;
          if (!txt) return;
          await navigator.clipboard.writeText(txt);
          alert("curl command copied. Run it in your terminal.");
        };
        downloadBtn.onclick = () => {
          if (audioBlob) downloadBlob(audioBlob, 'remote-audio.webm');
          else alert('No audio recording available');
        };
    
        async function startSession() {
          if (pc) return;
    
          pc = new RTCPeerConnection({ iceServers: [] });
          clearUIEvents();
          setStatus('Requesting microphone access...', 'connecting');
    
          offerBox.value = "";
          answerBox.value = "";
          curlBox.value = "";
          setAnswerBtn.disabled = true;
          copyOfferBtn.disabled = true;
          copyCurlBtn.disabled = true;
    
          endBtn.disabled = false;
          downloadBtn.disabled = true;
    
          pc.onconnectionstatechange = () => {
            if (!pc) return;
            if (pc.connectionState === 'connected') {
              setStatus('Connected. Start speaking.', 'connected');
            } else if (["failed", "closed", "disconnected"].includes(pc.connectionState)) {
              console.log("onconnectionstatechange:", pc.connectionState);
              endSession(true);
            }
          };
    
          pc.ontrack = async (e) => {
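            // Fall back to a new stream if the remote track arrives without an associated stream.
            const stream = e.streams[0] || new MediaStream([e.track]);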
            ensureHiddenAudioEl();
            hiddenRemoteAudioEl.srcObject = stream;
            try { await hiddenRemoteAudioEl.play(); } catch {}
            startRecordingRemoteStream(stream);
          };
    
          const wantVideo = !!sendVideoCheckbox.checked;
    
          const localPreviewFps = 30;
          const sendFps = 2;
    
          const constraints = wantVideo
            ? {
                audio: true,
                video: {
                  facingMode: { ideal: "user" },
                  frameRate: { ideal: localPreviewFps, max: localPreviewFps },
                  width: { ideal: 640 },
                  height: { ideal: 480 },
                }
              }
            : { audio: true };
    
          localStream = await navigator.mediaDevices.getUserMedia(constraints);
    
          const videoSection = document.getElementById('videoSection');
          if (wantVideo) {
            localVideo.srcObject = localStream;
            localVideo.style.display = "block";
            videoSection.style.display = "";
            try { await localVideo.play(); } catch {}
          } else {
            localVideo.srcObject = null;
            localVideo.style.display = "none";
            videoSection.style.display = "none";
          }
    
          gatedAudioTracks = [];
          gatedVideoTracks = [];
    
          localStream.getAudioTracks().forEach(t => {
            pc.addTrack(t, localStream);
            gatedAudioTracks.push(t);
          });
    
          if (wantVideo) {
            if (sendRafId) cancelAnimationFrame(sendRafId);
            sendRafId = 0;
            if (sendCanvasStream) sendCanvasStream.getTracks().forEach(t => t.stop());
            sendCanvasStream = null;
            sendCanvasCtx = null;
            sendCanvas = null;
    
            const settings = localStream.getVideoTracks()[0].getSettings();
            sendCanvas = document.createElement("canvas");
            sendCanvas.width = settings.width || 640;
            sendCanvas.height = settings.height || 480;
            sendCanvasCtx = sendCanvas.getContext("2d", { alpha: false });
    
            sendCanvasStream = sendCanvas.captureStream(sendFps);
            const lowFpsTrack = sendCanvasStream.getVideoTracks()[0];
            pc.addTrack(lowFpsTrack, sendCanvasStream);
            gatedVideoTracks.push(lowFpsTrack);
    
            const pump = () => {
              if (!sendCanvasCtx || !sendCanvas) return;
              try { sendCanvasCtx.drawImage(localVideo, 0, 0, sendCanvas.width, sendCanvas.height); } catch {}
              sendRafId = requestAnimationFrame(pump);
            };
            sendRafId = requestAnimationFrame(pump);
          }
    
          gateMedia(false);
    
          audioSender = pc.getSenders().find(s => s.track?.kind === 'audio');
          videoSender = pc.getSenders().find(s => s.track?.kind === 'video');
          audioTrack = audioSender?.track;
          videoTrack = videoSender?.track;
    
          // Hold back outgoing media until session.created arrives (tracks are restored in handleDcMessage).
          await audioSender?.replaceTrack(null);
          await videoSender?.replaceTrack(null);
    
          const dc = pc.createDataChannel('oai-events');
    
          dc.onopen = () => console.log("DC open");
          dc.onmessage = (e) => {
            handleDcMessage(e.data, dc);
          };
    
          pc.ondatachannel = (event) => {
            const ch = event.channel;
            ch.onmessage = (e) => {
                handleDcMessage(e.data, ch);
            };
          };
    
          function handleDcMessage(data, channel) {
              let obj;
              try { obj = JSON.parse(data); }
              catch (err) {
                pushEventFromDataChannel({ type: "raw", data: String(data), parseError: String(err) });
                return;
              }
              pushEventFromDataChannel(obj);
    
              if (obj?.type === "session.created") {
                console.log("Session created, opening media gate.");
                gateMedia(true);
                if(audioSender) audioSender.replaceTrack(audioTrack);
                if(videoSender && videoTrack) videoSender.replaceTrack(videoTrack);
    
                sendUpdate(channel);
              }
          }
    
          pc.onicegatheringstatechange = () => {
            if (!pc) return;
            if (pc.iceGatheringState === "complete" && pc.localDescription?.sdp) {
              const sdp = pc.localDescription.sdp;
              offerBox.value = sdp;
              copyOfferBtn.disabled = false;
              setAnswerBtn.disabled = false;
    
              const escapedSdp = sdp.replace(/'/g, "'\\''");
              // Double quotes let the shell expand $DASHSCOPE_API_KEY; inside single quotes it would be sent literally.
              curlBox.value = `curl -X POST 'https://{endpoint}/api/v1/webrtc/realtime?model=qwen3.5-omni-plus-realtime' \\\n  -H 'Content-Type: application/sdp' \\\n  -H "Authorization: Bearer $DASHSCOPE_API_KEY" \\\n  --data-binary '${escapedSdp}'`;
              copyCurlBtn.disabled = false;
    
              setStatus('Offer SDP generated. Copy the curl command to your terminal to get the Answer SDP.', 'connecting');
              console.log("ICE Gathering Complete. Ready to set remote description.");
            }
          };
    
          const offer = await pc.createOffer();
          await pc.setLocalDescription(offer);
        }
    
        async function setRemoteAnswerFromUI() {
          if (!pc) return alert('Click "Start Session" first to generate the Offer.');
          const txt = answerBox.value.trim();
          if (!txt) return alert("Please paste the Answer SDP");
    
          const answerSdp = normalizeSdpForSetRemote(txt);
          try {
              await pc.setRemoteDescription({ type: 'answer', sdp: answerSdp });
              setStatus('Establishing connection...', 'connecting');
          } catch (e) {
              alert("Failed to set Answer: " + e.message);
              console.error(e);
          }
        }
    
        function endSession(silent = false) {
          if (sendRafId) cancelAnimationFrame(sendRafId);
          sendRafId = 0;
    
          if (sendCanvasStream) {
            sendCanvasStream.getTracks().forEach(t => t.stop());
          }
          sendCanvasStream = null;
          sendCanvasCtx = null;
          sendCanvas = null;
    
          try { if (mediaRecorder && mediaRecorder.state !== "inactive") mediaRecorder.stop(); } catch {}
          mediaRecorder = null;
    
          if (localStream) {
            localStream.getTracks().forEach(t => t.stop());
            localStream = null;
          }
          localVideo.srcObject = null;
          localVideo.style.display = "none";
          document.getElementById('videoSection').style.display = "none";
    
          if (pc) {
            try { pc.close(); } catch {}
            pc = null;
          }
    
          gatedAudioTracks = [];
          gatedVideoTracks = [];
    
          if (hiddenRemoteAudioEl) {
            try { hiddenRemoteAudioEl.pause(); } catch {}
            hiddenRemoteAudioEl.srcObject = null;
            hiddenRemoteAudioEl.remove();
            hiddenRemoteAudioEl = null;
          }
    
          endBtn.disabled = true;
          setAnswerBtn.disabled = true;
          copyOfferBtn.disabled = true;
          copyCurlBtn.disabled = true;
          downloadBtn.disabled = !audioBlob;
    
          setStatus('Disconnected', '');
          if (!silent) console.log("session ended");
        }
    
        function ensureHiddenAudioEl() {
          if (hiddenRemoteAudioEl) return;
          hiddenRemoteAudioEl = document.createElement("audio");
          hiddenRemoteAudioEl.autoplay = true;
          hiddenRemoteAudioEl.playsInline = true;
          hiddenRemoteAudioEl.muted = false;
          hiddenRemoteAudioEl.style.display = "none";
          document.body.appendChild(hiddenRemoteAudioEl);
        }
    
        function startRecordingRemoteStream(remoteStream) {
          const audioTracks = remoteStream.getAudioTracks();
          if (!audioTracks.length) return;
    
          const audioStream = new MediaStream(audioTracks);
          recordedChunks = [];
          audioBlob = null;
          downloadBtn.disabled = true;
    
          try {
            mediaRecorder = new MediaRecorder(audioStream, { mimeType: 'audio/webm' });
          } catch (err) {
            console.log("MediaRecorder create failed:", err);
            return;
          }
    
          mediaRecorder.ondataavailable = (e) => {
            if (e.data && e.data.size > 0) recordedChunks.push(e.data);
          };
    
          mediaRecorder.onstop = () => {
            audioBlob = new Blob(recordedChunks, { type: 'audio/webm' });
            downloadBtn.disabled = !audioBlob || audioBlob.size === 0;
          };
    
          mediaRecorder.start();
        }
    
        function downloadBlob(blob, filename) {
          const url = URL.createObjectURL(blob);
          const a = document.createElement('a');
          a.style.display = 'none';
          a.href = url;
          a.download = filename;
          document.body.appendChild(a);
          a.click();
          // Revoke on the next tick so the download is not cancelled in some browsers.
          setTimeout(() => URL.revokeObjectURL(url), 0);
          a.remove();
        }
    
        renderEvents();
    
        const stickyTop = document.querySelector('.sticky-top');
        window.addEventListener('scroll', () => {
          stickyTop.classList.toggle('scrolled', window.scrollY > 10);
        }, { passive: true });
    </script>
    </body>
    </html>

    Open this file in a browser and follow these steps:

    1. Click Start Session. The page automatically generates the Offer SDP and the corresponding curl command.

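    2. Click Copy curl Command and run the command in your terminal. Before running it, replace {endpoint} with the signaling endpoint for your region (the same endpoint that SIGNALING_URL points to in the Python demo) and make sure the DASHSCOPE_API_KEY environment variable is set. The output is the Answer SDP.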

    3. Paste the Answer SDP into the Answer SDP text box, then click Set Answer to establish the connection and start the voice chat.
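If you would rather script the curl step, the same request can be built with the Python standard library. This is a minimal sketch that mirrors the curl command the page generates: it POSTs the Offer SDP as application/sdp with a Bearer token and returns the response body as the Answer SDP. The helper names and the offer.sdp file name are hypothetical, and the signaling URL is left as a parameter because the page only shows the {endpoint} placeholder.

```python
import urllib.request


def build_sdp_request(signaling_url: str, offer_sdp: str, api_key: str) -> urllib.request.Request:
    """Build (without sending) the POST that exchanges an Offer SDP for an Answer SDP."""
    return urllib.request.Request(
        signaling_url,
        data=offer_sdp.encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/sdp",
            "Authorization": f"Bearer {api_key}",
        },
    )


def exchange_sdp(signaling_url: str, offer_sdp: str, api_key: str, timeout: float = 10.0) -> str:
    """Send the Offer SDP and return the Answer SDP from the response body."""
    req = build_sdp_request(signaling_url, offer_sdp, api_key)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


# Hypothetical usage: save the Offer SDP from the page to offer.sdp, replace {endpoint}, then:
#   url = "https://{endpoint}/api/v1/webrtc/realtime?model=qwen3.5-omni-plus-realtime"
#   offer = open("offer.sdp", encoding="utf-8").read()
#   print(exchange_sdp(url, offer, os.environ["DASHSCOPE_API_KEY"]))
```

Paste the printed text into the Answer SDP box exactly as you would the curl output.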

Interaction flow

VAD mode

Set session.turn_detection.type to "server_vad" or "semantic_vad" in session.update to enable VAD mode, which suits voice-call scenarios. WebSocket and WebRTC both support VAD mode and emit the same server events; they differ only in how audio and images are transmitted.

WebRTC supports only VAD mode, not Manual mode. With WebRTC, audio is sent directly over RTP on the audio track, so you do not send input_audio_buffer.append events; images are sent on the video track, and input_image_buffer.append events are not supported. Control commands and server events travel over the DataChannel and use the same event types as WebSocket.
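As a minimal sketch in Python, a session.update event that turns on VAD mode can be built as follows and sent as JSON over the WebSocket connection (or the DataChannel for WebRTC). The field names and values are taken from the browser demo's sendUpdate(); the helper name make_vad_session_update is hypothetical, and sending the padding, silence, and threshold fields only for server_vad is an assumption.

```python
import json
import time

# Turn-detection values copied from the browser demo's sendUpdate(); starting points only.
SERVER_VAD_DEFAULTS = {
    "prefix_padding_ms": 500,
    "silence_duration_ms": 800,
    "threshold": 0.5,
}


def make_vad_session_update(vad_type: str = "server_vad",
                            instructions: str = "You are a helpful assistant.") -> dict:
    """Build a session.update event that enables VAD mode."""
    if vad_type not in ("server_vad", "semantic_vad"):
        raise ValueError(f"VAD mode needs 'server_vad' or 'semantic_vad', got {vad_type!r}")
    turn_detection = {"type": vad_type}
    if vad_type == "server_vad":
        # Assumption: the tuning fields are attached only for server_vad in this sketch.
        turn_detection.update(SERVER_VAD_DEFAULTS)
    return {
        "event_id": f"event_{int(time.time() * 1000)}",
        "type": "session.update",
        "session": {
            "modalities": ["text", "audio"],
            "instructions": instructions,
            "input_audio_format": "pcm",
            "output_audio_format": "pcm",
            "turn_detection": turn_detection,
        },
    }


# Serialize before sending, e.g. ws.send(payload) with websocket-client.
payload = json.dumps(make_vad_session_update("semantic_vad"))
```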

The interaction flow is as follows:

  1. The client sends audio data. WebSocket sends it via input_audio_buffer.append events; WebRTC transmits it automatically via the audio track (RTP) without manually sending events.

  2. The server detects the start of speech and sends the input_audio_buffer.speech_started event via DataChannel (WebRTC) or WebSocket.

  3. The server detects the end of speech and sends the input_audio_buffer.speech_stopped event.

  4. The server commits the audio buffer and sends the input_audio_buffer.committed event.

  5. The server begins generating a response and sends conversation.item.created and related events. Over WebSocket, response audio arrives incrementally in response.audio.delta events; over WebRTC, it is transmitted directly on the audio track (RTP).

  6. During the response, the server returns incremental text transcription via response.audio_transcript.delta events, and sends the response.done event when the response is complete.
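The flow above can be sketched as a small dispatcher over parsed WebSocket server events. This is an illustration, not the SDK's handler: it assumes each message is already a dict from json.loads, that response.audio.delta carries base64 audio in delta (as in the SDK example on this page), and that response.audio_transcript.delta carries its text in delta (an assumption). TurnState and handle_server_event are hypothetical names.

```python
import base64
from dataclasses import dataclass, field


@dataclass
class TurnState:
    """Accumulates one assistant turn from VAD-mode server events."""
    transcript: list = field(default_factory=list)
    audio: bytearray = field(default_factory=bytearray)
    user_speaking: bool = False
    done: bool = False


def handle_server_event(event: dict, state: TurnState) -> TurnState:
    """Update the turn state from one parsed server event (WebSocket transport)."""
    etype = event.get("type")
    if etype == "input_audio_buffer.speech_started":
        state.user_speaking = True
    elif etype == "input_audio_buffer.speech_stopped":
        state.user_speaking = False
    elif etype == "response.audio_transcript.delta":
        # Assumption: the transcript fragment is in the "delta" field.
        state.transcript.append(event.get("delta", ""))
    elif etype == "response.audio.delta":
        # WebSocket only; over WebRTC the audio plays on the RTP track instead.
        state.audio.extend(base64.b64decode(event.get("delta", "")))
    elif etype == "response.done":
        state.done = True
    # Other events (committed, conversation.item.created, ...) need no state here.
    return state
```

After response.done, "".join(state.transcript) holds the assistant's text and state.audio holds the raw PCM bytes for that turn.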

Lifecycle

The client and server events at each stage are as follows.

Session initialization

  • Client event session.update: Session configuration.

  • Server event session.created: Session created.

  • Server event session.updated: Session configuration updated.

User audio input

  • Client event input_audio_buffer.append (WebSocket): Appends audio to the buffer.

  • Client event input_image_buffer.append (WebSocket): Appends an image to the buffer.

    WebRTC: Audio is transmitted automatically on the RTP audio track, and images on the video track, so these two events are not needed.

  • Server event input_audio_buffer.speech_started: Speech start detected.

  • Server event input_audio_buffer.speech_stopped: Speech end detected.

  • Server event input_audio_buffer.committed: Audio buffer committed.

Server audio output

  • Client events: None.

  • Server event response.created: Response generation started.

  • Server event response.output_item.added: New output item added to the response.

  • Server event conversation.item.created: Conversation item created.

  • Server event response.content_part.added: New content part added to the assistant message.

  • Server event response.audio_transcript.delta: Incrementally generated transcribed text.

  • Server event response.audio.delta: WebSocket: Incrementally generated audio from the model. WebRTC: Audio is transmitted directly on the RTP audio track; this event is not returned.

  • Server event response.audio_transcript.done: Text transcription completed.

  • Server event response.audio.done: Audio generation completed.

  • Server event response.content_part.done: Streaming of the assistant's text or audio content is complete.

  • Server event response.output_item.done: The assistant's entire output item has finished streaming.

  • Server event response.done: Response completed.

  • Server event conversation.item.input_audio_transcription.completed: User audio input transcription completed (requires enabling input_audio_transcription in session.update).

Manual mode

Set session.turn_detection to null in session.update to use Manual mode. The client then requests each response explicitly by sending input_audio_buffer.commit followed by response.create. Manual mode suits push-to-talk scenarios, such as voice messages in chat apps, and is available only over WebSocket.

The interaction flow is as follows:

  1. The client can send input_audio_buffer.append and input_image_buffer.append events at any time to append audio and images to the buffer.

    Send at least one input_audio_buffer.append event before sending any input_image_buffer.append event.

  2. The client sends the input_audio_buffer.commit event to commit the audio and image buffers, signaling to the server that all user input (audio and images) for the current turn has been sent.

  3. The server responds with an input_audio_buffer.committed event.

  4. The client sends the response.create event and waits for the server to return the model's output.

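  5. The server responds with a conversation.item.created event, followed by the response events (such as response.audio_transcript.delta and response.audio.delta), and ends the turn with response.done.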
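The client side of one Manual-mode turn can be sketched as an ordered list of JSON messages for the WebSocket connection. This is a minimal sketch under stated assumptions: the base64 payloads are placed in fields named audio and image, which you should confirm against the input_audio_buffer.append and input_image_buffer.append references; MANUAL_SESSION_UPDATE and manual_turn_events are hypothetical names. Sending all audio chunks before any image satisfies the ordering rule in step 1.

```python
import base64
import json

# Sent once after the connection opens: turn_detection = null selects Manual mode.
MANUAL_SESSION_UPDATE = {"type": "session.update", "session": {"turn_detection": None}}


def _b64(data: bytes) -> str:
    return base64.b64encode(data).decode("ascii")


def manual_turn_events(audio_chunks, image_jpegs=()):
    """Return JSON strings for one push-to-talk turn, in the order they should be sent."""
    if not audio_chunks:
        raise ValueError("At least one input_audio_buffer.append must precede any image.")
    # Assumption: payload field names are "audio" and "image" (base64 strings).
    events = [{"type": "input_audio_buffer.append", "audio": _b64(c)} for c in audio_chunks]
    events += [{"type": "input_image_buffer.append", "image": _b64(i)} for i in image_jpegs]
    # Commit the audio and image buffers together, then request the model's reply.
    events.append({"type": "input_audio_buffer.commit"})
    events.append({"type": "response.create"})
    return [json.dumps(e) for e in events]
```

After sending these messages, wait for input_audio_buffer.committed and then for the response events through response.done.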

Lifecycle

The client and server events at each stage are as follows.

Session initialization

  • Client event session.update: Session configuration.

  • Server event session.created: Session created.

  • Server event session.updated: Session configuration updated.

User audio input

  • Client event input_audio_buffer.append: Appends audio to the buffer.

  • Client event input_image_buffer.append: Appends an image to the buffer.

  • Client event input_audio_buffer.commit: Commits the audio and image buffers.

  • Client event response.create: Requests a model response.

  • Server event input_audio_buffer.committed: Audio buffer committed.

Server audio output

  • Client event input_audio_buffer.clear: Clears audio from the buffer.

  • Server event response.created: Response generation started.

  • Server event response.output_item.added: New output item added to the response.

  • Server event conversation.item.created: Conversation item created.

  • Server event response.content_part.added: New content part added to the assistant message item.

  • Server event response.audio_transcript.delta: Incrementally generated transcribed text.

  • Server event response.audio.delta: Incrementally generated audio from the model.

  • Server event response.audio_transcript.done: Text transcription completed.

  • Server event response.audio.done: Audio generation completed.

  • Server event response.content_part.done: Streaming of the assistant's text or audio content is complete.

  • Server event response.output_item.done: The assistant's entire output item has finished streaming.

  • Server event response.done: Response completed.

Web search

Web search lets the model use real-time data to answer questions about timely information, such as stock prices and weather. The model decides automatically whether a search is needed.

Only qwen3.5-omni-plus-realtime supports web search. It is disabled by default; enable it with session.update.
For billing, see the agent policy in billing rules.

Enable web search

Add the following parameters to the session.update event:

  • enable_search: Set to true to enable the web search feature.

  • search_options.enable_source: Set to true to include the sources of the search results in the response.

For more parameters, see session.update.
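The following minimal sketch shows where these two fields sit in a session.update event. It assumes they belong inside the event's `session` object, as in the WebSocket (Python) example on this page. Other session fields such as voice, modalities, and instructions are omitted. In practice, merge these fields into the same session.update you use to configure the session. `build_search_session_update` is a hypothetical helper name.

```python
import json


def build_search_session_update(enable_source: bool = True) -> str:
    """Build a session.update event that turns on web search (sketch)."""
    event = {
        "type": "session.update",
        "session": {
            # Turn on web search; the model decides per turn whether to search.
            "enable_search": True,
            "search_options": {
                # Include the sources of the search results in the response.
                "enable_source": enable_source,
            },
        },
    }
    return json.dumps(event)


if __name__ == "__main__":
    print(build_search_session_update())  # Send with ws.send(...) after connecting.
```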

Response format

When web search is enabled, the usage object in response.done includes a plugins field with search metering information:

{
    "usage": {
        "total_tokens": 2937,
        "input_tokens": 2554,
        "output_tokens": 383,
        "input_tokens_details": {
            "text_tokens": 2512,
            "audio_tokens": 42
        },
        "output_tokens_details": {
            "text_tokens": 90,
            "audio_tokens": 293
        },
        "plugins": {
            "search": {
                "count": 1,
                "strategy": "agent"
            }
        }
    }
}
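To read the search metering from a response.done event, follow the path the code examples on this page use: `event["response"]["usage"]["plugins"]["search"]`. The sample above shows only the usage object itself. Below is a minimal sketch under that assumption. `search_metering` is a hypothetical helper.

```python
from typing import Optional, Tuple


def search_metering(response_done: dict) -> Optional[Tuple[int, str]]:
    """Return (count, strategy) from a response.done event, or None if no search ran."""
    usage = response_done.get("response", {}).get("usage", {})
    # The plugins field is present only when web search metering applies.
    search = usage.get("plugins", {}).get("search")
    if not search:
        return None
    return search["count"], search["strategy"]
```

In the sample above, total_tokens (2937) equals input_tokens (2554) plus output_tokens (383). The search count is reported separately under plugins.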

Code example

Enable web search in a real-time conversation:

DashScope Python SDK

Pass the enable_search and search_options parameters in the update_session call:

import os
import base64
import time
import pyaudio
from dashscope.audio.qwen_omni import MultiModality, OmniRealtimeCallback, OmniRealtimeConversation
import dashscope

dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
# Singapore endpoint: replace {WorkspaceId} with your actual workspace ID.
url = 'wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime'
model = 'qwen3.5-omni-plus-realtime'
voice = 'Tina'

class SearchCallback(OmniRealtimeCallback):
    def __init__(self, pya):
        self.pya = pya
        self.out = None
    def on_open(self):
        self.out = self.pya.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
    def on_event(self, response):
        if response['type'] == 'response.audio.delta':
            self.out.write(base64.b64decode(response['delta']))
        elif response['type'] == 'conversation.item.input_audio_transcription.delta':
            preview = response.get('text', '') + response.get('stash', '')
            print(f"\r[User] {preview}", end='', flush=True)
        elif response['type'] == 'conversation.item.input_audio_transcription.completed':
            print(f"\r[User] {response['transcript']}")
        elif response['type'] == 'response.audio_transcript.done':
            print(f"[LLM] {response['transcript']}")
        elif response['type'] == 'response.done':
            usage = response.get('response', {}).get('usage', {})
            plugins = usage.get('plugins', {})
            if plugins.get('search'):
                print(f"[Search] count={plugins['search']['count']}, strategy={plugins['search']['strategy']}")

pya = pyaudio.PyAudio()
callback = SearchCallback(pya)
conv = OmniRealtimeConversation(model=model, callback=callback, url=url)
conv.connect()
conv.update_session(
    output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
    voice=voice,
    instructions="You are Xiaoyun, a personal assistant.",
    enable_search=True,
    search_options={'enable_source': True}
)
mic = pya.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True)
print("Web search is enabled. Speak into the microphone (Ctrl+C to exit)...")
try:
    while True:
        audio_data = mic.read(3200, exception_on_overflow=False)
        conv.append_audio(base64.b64encode(audio_data).decode())
        time.sleep(0.01)
except KeyboardInterrupt:
    # Release the connection and audio devices on Ctrl+C.
    conv.close()
    mic.close()
    if callback.out is not None:
        callback.out.close()
    pya.terminate()
    print("\nConversation ended.")

DashScope Java SDK

When calling updateSession, pass the web search configuration through the parameters map of OmniRealtimeConfig:

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class OmniSearch {
    static class SequentialAudioPlayer {
        private final SourceDataLine line;
        private final Queue<byte[]> audioQueue = new ConcurrentLinkedQueue<>();
        private final Thread playerThread;
        private final AtomicBoolean shouldStop = new AtomicBoolean(false);

        public SequentialAudioPlayer() throws LineUnavailableException {
            AudioFormat format = new AudioFormat(24000, 16, 1, true, false);
            line = AudioSystem.getSourceDataLine(format);
            line.open(format);
            line.start();
            playerThread = new Thread(() -> {
                while (!shouldStop.get()) {
                    byte[] audio = audioQueue.poll();
                    if (audio != null) {
                        line.write(audio, 0, audio.length);
                    } else {
                        try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                    }
                }
            }, "AudioPlayer");
            playerThread.start();
        }

        public void play(String base64Audio) {
            audioQueue.add(Base64.getDecoder().decode(base64Audio));
        }
        public void close() {
            shouldStop.set(true);
            try { playerThread.join(1000); } catch (InterruptedException ignored) {}
            line.drain();
            line.close();
        }
    }

    public static void main(String[] args) {
        try {
            SequentialAudioPlayer player = new SequentialAudioPlayer();
            AtomicBoolean shouldStop = new AtomicBoolean(false);

            OmniRealtimeParam param = OmniRealtimeParam.builder()
                    .model("qwen3.5-omni-plus-realtime")
                    .apikey(System.getenv("DASHSCOPE_API_KEY"))
                    .url("wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime")
                    .build();

            OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
                @Override public void onOpen() {
                    System.out.println("Connection established.");
                }
                @Override public void onClose(int code, String reason) {
                    System.out.println("Connection closed.");
                    shouldStop.set(true);
                }
                @Override public void onEvent(JsonObject event) {
                    String type = event.get("type").getAsString();
                    if ("response.audio.delta".equals(type)) {
                        player.play(event.get("delta").getAsString());
                    } else if ("response.audio_transcript.done".equals(type)) {
                        System.out.println("[LLM] " + event.get("transcript").getAsString());
                    } else if ("response.done".equals(type)) {
                        JsonObject response = event.getAsJsonObject("response");
                        if (response != null && response.has("usage")) {
                            JsonObject usage = response.getAsJsonObject("usage");
                            if (usage.has("plugins")) {
                                JsonObject plugins = usage.getAsJsonObject("plugins");
                                if (plugins.has("search")) {
                                    JsonObject search = plugins.getAsJsonObject("search");
                                    System.out.println("[Search] count=" + search.get("count").getAsInt()
                                            + ", strategy=" + search.get("strategy").getAsString());
                                }
                            }
                        }
                    }
                }
            });

            conversation.connect();
            conversation.updateSession(OmniRealtimeConfig.builder()
                    .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                    .voice("Tina")
                    .enableTurnDetection(true)
                    .enableInputAudioTranscription(true)
                    .parameters(Map.of(
                            "instructions", "You are Xiaoyun, a personal assistant.",
                            "enable_search", true,
                            "search_options", Map.of("enable_source", true)
                    ))
                    .build()
            );

            System.out.println("Web search is enabled. Start speaking (press Ctrl+C to exit)...");
            AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
            TargetDataLine mic = AudioSystem.getTargetDataLine(format);
            mic.open(format);
            mic.start();

            ByteBuffer buffer = ByteBuffer.allocate(3200);
            while (!shouldStop.get()) {
                int bytesRead = mic.read(buffer.array(), 0, buffer.capacity());
                if (bytesRead > 0) {
                    // Send only the bytes actually read, not the whole buffer.
                    byte[] chunk = Arrays.copyOf(buffer.array(), bytesRead);
                    conversation.appendAudio(Base64.getEncoder().encodeToString(chunk));
                }
                Thread.sleep(20);
            }

            conversation.close(1000, "Normal termination");
            player.close();
            mic.close();
        } catch (NoApiKeyException e) {
            System.err.println("API key not found: Set the DASHSCOPE_API_KEY environment variable.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

WebSocket (Python)

Add the enable_search and search_options fields to the JSON payload of the session.update event:

import json
import os
import websocket
import base64
import pyaudio
import threading

API_KEY = os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://{WorkspaceId}.ap-southeast-1.maas.aliyuncs.com/api-ws/v1/realtime?model=qwen3.5-omni-plus-realtime"

pya = pyaudio.PyAudio()
out_stream = pya.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)

def on_open(ws):
    ws.send(json.dumps({
        "type": "session.update",
        "session": {
            "modalities": ["text", "audio"],
            "voice": "Tina",
            "instructions": "You are Xiaoyun, a personal assistant.",
            "input_audio_format": "pcm",
            "output_audio_format": "pcm",
            # Turn on web search and return the sources of search results.
            "enable_search": True,
            "search_options": {
                "enable_source": True
            }
        }
    }))
    print("Web search is enabled. Speak into the microphone...")
    def send_audio():
        mic = pya.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True)
        try:
            while True:
                audio = mic.read(3200, exception_on_overflow=False)
                ws.send(json.dumps({
                    "type": "input_audio_buffer.append",
                    "audio": base64.b64encode(audio).decode()
                }))
        except Exception:
            mic.close()
    threading.Thread(target=send_audio, daemon=True).start()

def on_message(ws, message):
    event = json.loads(message)
    if event["type"] == "response.audio.delta":
        out_stream.write(base64.b64decode(event["delta"]))
    elif event["type"] == "response.audio_transcript.done":
        print(f"[LLM] {event['transcript']}")
    elif event["type"] == "response.done":
        usage = event.get("response", {}).get("usage", {})
        plugins = usage.get("plugins", {})
        if plugins.get("search"):
            print(f"[Search] count={plugins['search']['count']}, strategy={plugins['search']['strategy']}")

def on_error(ws, error):
    print(f"Error: {error}")

headers = ["Authorization: Bearer " + API_KEY]
ws = websocket.WebSocketApp(API_URL, header=headers, on_open=on_open, on_message=on_message, on_error=on_error)
ws.run_forever()

API reference

Billing and rate limits

Billing

Billing is token-based, metered by modality (audio, image, text). Check the Model Studio console for pricing.

Rules for converting audio and images to tokens

Audio

  • Qwen3.5-Omni-Realtime:

    • Input audio: total tokens = Audio duration (seconds) * 7

    • Output audio: total tokens = Audio duration (seconds) * 12.5

  • Qwen3-Omni-Flash-Realtime: Input and output audio use the same formula: total tokens = Audio duration (seconds) * 12.5

  • Qwen-Omni-Turbo-Realtime: Input and output audio use the same formula: total tokens = Audio duration (seconds) * 25

    Audio durations of less than 1 second are billed as 1 second.
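The audio rules above reduce to a per-second rate for each model and direction. Below is a minimal sketch. The dictionary keys are hypothetical family labels, not model IDs. The sketch applies the 1-second minimum only to Qwen-Omni-Turbo-Realtime, reading the note as attached to that bullet. If it applies to every model, move the `max()` call outside the `if`. Rounding of fractional token counts is not specified here, so the function returns the raw product.

```python
# Tokens per second of audio, taken from the rules above (keys are hypothetical labels).
AUDIO_TOKENS_PER_SECOND = {
    "qwen3.5-omni-realtime": {"input": 7, "output": 12.5},
    "qwen3-omni-flash-realtime": {"input": 12.5, "output": 12.5},
    "qwen-omni-turbo-realtime": {"input": 25, "output": 25},
}


def audio_tokens(family: str, direction: str, seconds: float) -> float:
    """Estimate audio tokens for one clip; direction is "input" or "output"."""
    if family == "qwen-omni-turbo-realtime":
        # Assumption: clips shorter than 1 second are billed as 1 second for this model.
        seconds = max(seconds, 1.0)
    return seconds * AUDIO_TOKENS_PER_SECOND[family][direction]
```

For example, 10 seconds of input audio to Qwen3.5-Omni-Realtime comes to 10 × 7 = 70 tokens. The same 10 seconds of output audio comes to 125 tokens.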

Image

  • Qwen3.5-Omni-Realtime: 1 token per 32x32 pixels.

  • Qwen3-Omni-Flash-Realtime: 1 token per 32x32 pixels.

  • Qwen-Omni-Turbo-Realtime: 1 token per 28x28 pixels.

An image consumes 4–1,280 tokens. Use the following code to estimate token consumption based on image dimensions and session duration:

# Install the Pillow library by running: pip install Pillow
from PIL import Image
import math

# For the Qwen-Omni-Turbo-Realtime model, the scaling factor is 28.
# factor = 28
# For the Qwen3-Omni-Flash-Realtime and Qwen3.5-Omni-Realtime models, the scaling factor is 32.
factor = 32

def token_calculate(image_path='', duration=10):
    """
    :param image_path: Image path
    :param duration: Session duration in seconds
    :return: Total tokens for the image based on session duration
    """
    if len(image_path) > 0:
        # Open the image file and read its original dimensions.
        with Image.open(image_path) as image:
            height = image.height
            width = image.width
        print(f"Image dimensions before scaling: height={height}, width={width}")
        # Adjust the height to a multiple of factor.
        h_bar = round(height / factor) * factor
        # Adjust the width to a multiple of factor.
        w_bar = round(width / factor) * factor
        # Lower limit for image tokens: 4 tokens.
        min_pixels = factor * factor * 4
        # Upper limit for image tokens: 1,280 tokens.
        max_pixels = 1280 * factor * factor
        # Scale the image to fit the pixel count limits.
        if h_bar * w_bar > max_pixels:
            # Calculate the scaling factor beta to avoid exceeding max_pixels.
            beta = math.sqrt((height * width) / max_pixels)
            # Recalculate the adjusted height to ensure it is an integer multiple of factor.
            h_bar = math.floor(height / beta / factor) * factor
            # Recalculate the adjusted width to ensure it is an integer multiple of factor.
            w_bar = math.floor(width / beta / factor) * factor
        elif h_bar * w_bar < min_pixels:
            # Calculate the scaling factor beta so the scaled image pixel count is not less than min_pixels.
            beta = math.sqrt(min_pixels / (height * width))
            # Recalculate the adjusted height to ensure it is an integer multiple of factor.
            h_bar = math.ceil(height * beta / factor) * factor
            # Recalculate the adjusted width to ensure it is an integer multiple of factor.
            w_bar = math.ceil(width * beta / factor) * factor
        print(f"Image dimensions after scaling: height={h_bar}, width={w_bar}")
        # Calculate image tokens.
        token = int((h_bar * w_bar) / (factor * factor))
        print(f"Token count after scaling: {token}")
        total_token = token * math.ceil(duration / 2)
        print(f"Total tokens: {total_token}")
        return total_token
    else:
        print("Error: image_path is empty. Cannot calculate tokens.")
        return 0

if __name__ == "__main__":
    total_token = token_calculate(image_path="xxx/test.jpg", duration=10)
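To check the arithmetic without an image file or Pillow, the same scaling rules can be applied directly to pixel dimensions. The sketch below mirrors `token_calculate` step by step, including its `math.ceil(duration / 2)` multiplier. `image_tokens` and `session_image_tokens` are hypothetical names, not part of any SDK.

```python
import math


def image_tokens(width: int, height: int, factor: int = 32) -> int:
    """Per-image tokens from pixel dimensions, mirroring token_calculate."""
    # Round each side to the nearest multiple of factor.
    h_bar = round(height / factor) * factor
    w_bar = round(width / factor) * factor
    min_pixels = 4 * factor * factor     # lower limit: 4 tokens
    max_pixels = 1280 * factor * factor  # upper limit: 1,280 tokens
    if h_bar * w_bar > max_pixels:
        # Shrink proportionally, rounding down to multiples of factor.
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = math.floor(height / beta / factor) * factor
        w_bar = math.floor(width / beta / factor) * factor
    elif h_bar * w_bar < min_pixels:
        # Enlarge proportionally, rounding up to multiples of factor.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = math.ceil(height * beta / factor) * factor
        w_bar = math.ceil(width * beta / factor) * factor
    return (h_bar * w_bar) // (factor * factor)


def session_image_tokens(width: int, height: int, duration: float, factor: int = 32) -> int:
    # Same multiplier as token_calculate: math.ceil(duration / 2).
    return image_tokens(width, height, factor) * math.ceil(duration / 2)
```

With factor 32, a 640x480 frame needs no scaling and costs 480 × 640 / 1024 = 300 tokens. A 1920x1080 frame exceeds the 1,280-token cap and is scaled to 1504x832, which costs 47 × 26 = 1,222 tokens.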

Rate limiting

For model rate limits, see Rate limits.

Error codes

If the model call fails and returns an error message, see Error codes for resolution.

Voice list

For a list of voices available for the Qwen-Omni-Realtime model, see Voices.