Go
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
// Endpoint and output location shared by the functions in this sample.
const (
wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket server endpoint
outputFile = "output.mp3" // path of the file that receives the audio
)
// main runs one streaming text-to-speech session: it truncates the output
// file, opens the WebSocket, starts the receiver goroutine, sends run-task,
// waits for task-started, streams the text with continue-task commands, sends
// finish-task and finally waits for the receiver goroutine to end.
func main() {
	// The API key is read from the environment. It can be hard-coded here
	// instead (apiKey := "your_api_key"), but that is discouraged in
	// production because it increases the risk of leaking the key.
	apiKey := os.Getenv("DASHSCOPE_API_KEY")

	// Start from an empty output file; audio chunks are appended to it.
	if err := clearOutputFile(outputFile); err != nil {
		fmt.Println("清空輸出檔案失敗:", err)
		return
	}

	// Open the WebSocket connection.
	conn, err := connectWebSocket(apiKey)
	if err != nil {
		fmt.Println("串連WebSocket失敗:", err)
		return
	}
	defer closeConnection(conn)

	// Start the goroutine that receives events and audio.
	done, taskStarted := startResultReceiver(conn)

	// Send the run-task command.
	taskID, err := sendRunTaskCmd(conn)
	if err != nil {
		fmt.Println("發送run-task指令失敗:", err)
		return
	}

	// Wait for the task-started event. The receiver goroutine closes done when
	// it stops (read error, task-failed, ...); without watching it this loop
	// would spin forever if the task never starts.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for !*taskStarted {
		select {
		case <-done:
			fmt.Println("未收到task-started事件,接收已結束")
			return
		case <-ticker.C:
		}
	}

	// Send the text to synthesize.
	if err := sendContinueTaskCmd(conn, taskID); err != nil {
		fmt.Println("發送待合成文本失敗:", err)
		return
	}

	// Send the finish-task command.
	if err := sendFinishTaskCmd(conn, taskID); err != nil {
		fmt.Println("發送finish-task指令失敗:", err)
		return
	}

	// Wait until the receiver goroutine has finished.
	<-done
}
// dialer is the WebSocket dialer used by connectWebSocket; it is the
// gorilla/websocket package default.
var dialer = websocket.DefaultDialer
// The structs below describe the JSON messages exchanged with the server.

// Header is the "header" object of a message. The command builders in this
// file fill Action, TaskID and Streaming; handleEvent and handleTaskFailed
// read Event and ErrorMessage from messages sent by the server.
type Header struct {
Action string `json:"action"`
TaskID string `json:"task_id"`
Streaming string `json:"streaming"`
Event string `json:"event"`
ErrorCode string `json:"error_code,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
Attributes map[string]interface{} `json:"attributes"`
}
// Payload is the "payload" object of a message. run-task fills the task
// description and Parameters; continue-task only sets Input.Text.
type Payload struct {
TaskGroup string `json:"task_group"`
Task string `json:"task"`
Function string `json:"function"`
Model string `json:"model"`
Parameters Params `json:"parameters"`
Input Input `json:"input"`
}
// Params holds the synthesis settings sent in the run-task command
// (see generateRunTaskCmd for the values used by this sample).
type Params struct {
TextType string `json:"text_type"`
Voice string `json:"voice"`
Format string `json:"format"`
SampleRate int `json:"sample_rate"`
Volume int `json:"volume"`
Rate int `json:"rate"`
Pitch int `json:"pitch"`
EnableSSML bool `json:"enable_ssml"`
}
// Input carries the text to synthesize; it is left empty in the run-task and
// finish-task commands.
type Input struct {
Text string `json:"text"`
}
// Event is the top-level JSON message: this file uses it both to encode
// outgoing commands and to decode incoming text messages.
type Event struct {
Header Header `json:"header"`
Payload Payload `json:"payload"`
}
// connectWebSocket dials wsURL with the bearer token and the
// X-DashScope-DataInspection header set and returns the open connection.
//
// It returns an error without dialing when apiKey is empty. A dial error is
// wrapped with the target URL and, when the server answered the handshake,
// the HTTP status. The error is not printed here: the caller reports it, so
// logging it in both places would show it twice.
func connectWebSocket(apiKey string) (*websocket.Conn, error) {
	if apiKey == "" {
		return nil, fmt.Errorf("api key is empty: set DASHSCOPE_API_KEY")
	}

	header := make(http.Header)
	header.Set("X-DashScope-DataInspection", "enable")
	header.Set("Authorization", "bearer "+apiKey)

	conn, resp, err := dialer.Dial(wsURL, header)
	if err != nil {
		if resp != nil {
			// A failed handshake still carries the HTTP response; its status
			// (for example 401) is usually the most useful diagnostic.
			return nil, fmt.Errorf("dial %s: %w (HTTP status %s)", wsURL, err, resp.Status)
		}
		return nil, fmt.Errorf("dial %s: %w", wsURL, err)
	}
	return conn, nil
}
// sendRunTaskCmd builds a run-task command and writes it to conn as a text
// message. It returns the generated task ID together with any write error; if
// the command cannot be built it returns an empty ID and that error.
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
	payload, id, genErr := generateRunTaskCmd()
	if genErr != nil {
		return "", genErr
	}
	writeErr := conn.WriteMessage(websocket.TextMessage, []byte(payload))
	return id, writeErr
}
// generateRunTaskCmd creates a fresh task ID and returns the JSON text of the
// run-task command, the task ID, and the marshalling error (if any).
func generateRunTaskCmd() (string, string, error) {
	id := uuid.New().String()

	var cmd Event
	cmd.Header.Action = "run-task"
	cmd.Header.TaskID = id
	cmd.Header.Streaming = "duplex"

	cmd.Payload.TaskGroup = "audio"
	cmd.Payload.Task = "tts"
	cmd.Payload.Function = "SpeechSynthesizer"
	cmd.Payload.Model = "cosyvoice-v3-flash"
	cmd.Payload.Parameters = Params{
		TextType:   "PlainText",
		Voice:      "longanyang",
		Format:     "mp3",
		SampleRate: 22050,
		Volume:     50,
		Rate:       1,
		Pitch:      1,
		// With enable_ssml set to true only one continue-task command may be
		// sent; otherwise the server reports
		// "Text request limit violated, expected 1.".
		EnableSSML: false,
	}
	cmd.Payload.Input = Input{}

	encoded, err := json.Marshal(cmd)
	return string(encoded), id, err
}
// sendContinueTaskCmd sends the sample text, one continue-task command per
// line, and stops at the first build or write error.
func sendContinueTaskCmd(conn *websocket.Conn, taskID string) error {
	lines := [...]string{"床前明月光", "疑是地上霜", "舉頭望明月", "低頭思故鄉"}
	for i := 0; i < len(lines); i++ {
		cmd, err := generateContinueTaskCmd(lines[i], taskID)
		if err != nil {
			return err
		}
		if err := conn.WriteMessage(websocket.TextMessage, []byte(cmd)); err != nil {
			return err
		}
	}
	return nil
}
// generateContinueTaskCmd returns the JSON text of a continue-task command
// that carries text for the task identified by taskID.
func generateContinueTaskCmd(text string, taskID string) (string, error) {
	var cmd Event
	cmd.Header.Action = "continue-task"
	cmd.Header.TaskID = taskID
	cmd.Header.Streaming = "duplex"
	cmd.Payload.Input.Text = text

	encoded, err := json.Marshal(cmd)
	return string(encoded), err
}
// startResultReceiver starts a goroutine that reads messages from conn until
// a read fails, an audio chunk cannot be written, or handleEvent reports that
// the task is over. Binary messages are appended to outputFile; text messages
// are decoded as Event and passed to handleEvent.
//
// It returns a channel that is closed when the goroutine exits and a pointer
// to a flag that handleEvent sets when task-started arrives.
// NOTE(review): the flag is written by the goroutine and read by the caller
// without synchronization.
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
	var started bool
	finished := make(chan struct{})

	receive := func() {
		defer close(finished)
		for {
			kind, data, readErr := conn.ReadMessage()
			if readErr != nil {
				fmt.Println("解析伺服器訊息失敗:", readErr)
				return
			}

			if kind != websocket.BinaryMessage {
				// Text message: decode it and dispatch on the event name.
				var evt Event
				if jsonErr := json.Unmarshal(data, &evt); jsonErr != nil {
					fmt.Println("解析事件失敗:", jsonErr)
					continue
				}
				if handleEvent(conn, evt, &started) {
					return
				}
				continue
			}

			// Binary message: a chunk of the audio stream.
			if writeErr := writeBinaryDataToFile(data, outputFile); writeErr != nil {
				fmt.Println("寫入位元據失敗:", writeErr)
				return
			}
		}
	}
	go receive()

	return finished, &started
}
// handleEvent reacts to one server event and reports whether the receiver
// should stop: true for task-finished and task-failed, false otherwise.
// task-started sets *taskStarted; result-generated is ignored; any other
// event is printed as unexpected.
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
	name := event.Header.Event

	if name == "task-started" {
		fmt.Println("收到task-started事件")
		*taskStarted = true
		return false
	}
	if name == "result-generated" {
		// Nothing to do for result-generated.
		return false
	}
	if name == "task-finished" {
		fmt.Println("任務完成")
		return true
	}
	if name == "task-failed" {
		handleTaskFailed(event, conn)
		return true
	}

	fmt.Printf("預料之外的事件:%v\n", event)
	return false
}
// handleTaskFailed prints the error message carried by a task-failed event,
// or a generic notice when the event has no message. conn is not used.
func handleTaskFailed(event Event, conn *websocket.Conn) {
	reason := event.Header.ErrorMessage
	if reason == "" {
		fmt.Println("未知原因導致任務失敗")
		return
	}
	fmt.Printf("任務失敗:%s\n", reason)
}
// closeConnection closes conn; a nil connection is ignored.
func closeConnection(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	conn.Close()
}
// writeBinaryDataToFile appends data to the file at filePath, creating the
// file (mode 0644) if it does not exist.
//
// It returns the first error from opening, writing or closing the file. The
// Close error is reported because, on a write path, it can be the only sign
// that the data did not reach the disk.
func writeBinaryDataToFile(data []byte, filePath string) error {
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	if _, err := file.Write(data); err != nil {
		// The write error is the one worth reporting; the Close result is
		// deliberately dropped here.
		_ = file.Close()
		return err
	}
	return file.Close()
}
// sendFinishTaskCmd builds the finish-task command for taskID and writes it
// to conn as a text message.
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
	cmd, buildErr := generateFinishTaskCmd(taskID)
	if buildErr != nil {
		return buildErr
	}
	return conn.WriteMessage(websocket.TextMessage, []byte(cmd))
}
// generateFinishTaskCmd returns the JSON text of the finish-task command for
// taskID; the payload carries an empty input.
func generateFinishTaskCmd(taskID string) (string, error) {
	var cmd Event
	cmd.Header.Action = "finish-task"
	cmd.Header.TaskID = taskID
	cmd.Header.Streaming = "duplex"
	cmd.Payload.Input = Input{}

	encoded, err := json.Marshal(cmd)
	return string(encoded), err
}
// clearOutputFile truncates the file at filePath to zero length, creating it
// (mode 0644) if it does not exist.
//
// os.WriteFile opens the file write-only with create and truncate, exactly
// like the previous hand-written OpenFile call, and also reports the Close
// error that used to be ignored.
func clearOutputFile(filePath string) error {
	return os.WriteFile(filePath, nil, 0644)
}
C#
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
/// <summary>
/// Sample client that synthesizes speech over a duplex WebSocket session:
/// run-task, wait for task-started, one continue-task per text line,
/// finish-task, then wait for the receive loop to end. Binary frames are
/// appended to <see cref="OutputFilePath"/>.
/// </summary>
class Program {
    // The API key is read from the environment. It can be hard-coded instead
    // (private const string ApiKey = "your_api_key"), but that is discouraged
    // in production because it increases the risk of leaking the key.
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    // WebSocket server address.
    private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/";
    // Output file path.
    private const string OutputFilePath = "output.mp3";

    // WebSocket client.
    private static ClientWebSocket _webSocket = new ClientWebSocket();
    // Cancellation source shared by every send/receive call.
    private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    // Task ID of the current session.
    private static string? _taskId;
    // Completed when task-started arrives; faulted or cancelled when the task
    // can no longer start, so Main never waits forever. Continuations run
    // asynchronously so that completing it does not run Main's code on the
    // receive loop.
    private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Runs the whole synthesis session and reports the outcome on the console.</summary>
    static async Task Main(string[] args) {
        try {
            // Empty the output file.
            ClearOutputFile(OutputFilePath);

            // Connect to the WebSocket service.
            await ConnectToWebSocketAsync(WebSocketUrl);

            // Start the receive loop.
            Task receiveTask = ReceiveMessagesAsync();

            // Send the run-task command.
            _taskId = GenerateTaskId();
            await SendRunTaskCommandAsync(_taskId);

            // Wait for the task-started event (throws if the task failed or
            // the receive loop ended first).
            await _taskStartedTcs.Task;

            // Send the text, one continue-task command per line.
            string[] texts = {
                "床前明月光",
                "疑是地上霜",
                "舉頭望明月",
                "低頭思故鄉"
            };
            foreach (string text in texts) {
                await SendContinueTaskCommandAsync(text);
            }

            // Send the finish-task command.
            await SendFinishTaskCommandAsync(_taskId);

            // Wait for the receive loop to finish.
            await receiveTask;
            Console.WriteLine("任務完成,串連已關閉。");
        } catch (OperationCanceledException) {
            Console.WriteLine("任務被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"發生錯誤:{ex.Message}");
        } finally {
            _cancellationTokenSource.Cancel();
            _webSocket.Dispose();
        }
    }

    /// <summary>Truncates the output file if it already exists.</summary>
    private static void ClearOutputFile(string filePath) {
        if (File.Exists(filePath)) {
            File.WriteAllText(filePath, string.Empty);
            Console.WriteLine("輸出檔案已清空。");
        } else {
            Console.WriteLine("輸出檔案不存在,無需清空。");
        }
    }

    /// <summary>Opens the WebSocket connection unless it is already connecting or open.</summary>
    private static async Task ConnectToWebSocketAsync(string url) {
        var uri = new Uri(url);
        if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
            return;
        }

        // Request headers for the WebSocket handshake.
        _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");

        try {
            await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
            Console.WriteLine("已成功串連到WebSocket服務。");
        } catch (OperationCanceledException) {
            Console.WriteLine("WebSocket串連被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"WebSocket串連失敗: {ex.Message}");
            throw;
        }
    }

    /// <summary>Sends the run-task command that opens the synthesis task.</summary>
    private static async Task SendRunTaskCommandAsync(string taskId) {
        var command = CreateCommand("run-task", taskId, "duplex", new {
            task_group = "audio",
            task = "tts",
            function = "SpeechSynthesizer",
            model = "cosyvoice-v3-flash",
            parameters = new {
                text_type = "PlainText",
                voice = "longanyang",
                format = "mp3",
                sample_rate = 22050,
                volume = 50,
                rate = 1,
                pitch = 1,
                // With enable_ssml set to true only one continue-task command
                // may be sent; otherwise the server reports
                // "Text request limit violated, expected 1.".
                enable_ssml = false
            },
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已發送run-task指令。");
    }

    /// <summary>Sends one continue-task command carrying <paramref name="text"/>.</summary>
    /// <exception cref="InvalidOperationException">The task ID has not been set.</exception>
    private static async Task SendContinueTaskCommandAsync(string text) {
        if (_taskId == null) {
            throw new InvalidOperationException("任務ID未初始化。");
        }

        var command = CreateCommand("continue-task", _taskId, "duplex", new {
            input = new {
                text
            }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已發送continue-task指令。");
    }

    /// <summary>Sends the finish-task command.</summary>
    private static async Task SendFinishTaskCommandAsync(string taskId) {
        var command = CreateCommand("finish-task", taskId, "duplex", new {
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已發送finish-task指令。");
    }

    /// <summary>Sends <paramref name="message"/> as one UTF-8 text message; a cancelled send is only logged.</summary>
    private static async Task SendJsonMessageAsync(string message) {
        var buffer = Encoding.UTF8.GetBytes(message);
        try {
            await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
        } catch (OperationCanceledException) {
            Console.WriteLine("訊息發送被取消。");
        }
    }

    /// <summary>
    /// Receive loop: dispatches server events until the task finishes or
    /// fails, or the socket leaves the Open state.
    /// </summary>
    private static async Task ReceiveMessagesAsync() {
        try {
            while (_webSocket.State == WebSocketState.Open) {
                // Dispose each parsed document at the end of the iteration.
                using JsonDocument? response = await ReceiveMessageAsync();
                if (response == null) {
                    continue;
                }
                if (!response.RootElement.TryGetProperty("header", out JsonElement header) ||
                    !header.TryGetProperty("event", out JsonElement eventElement)) {
                    continue;
                }

                switch (eventElement.GetString()) {
                    case "task-started":
                        Console.WriteLine("任務已啟動。");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("任務已完成。");
                        _cancellationTokenSource.Cancel();
                        return;
                    case "task-failed": {
                        string? errorMessage = header.TryGetProperty("error_message", out JsonElement errorElement)
                            ? errorElement.GetString()
                            : null;
                        Console.WriteLine("任務失敗:" + errorMessage);
                        // If the task failed before it started, Main is still
                        // waiting for task-started: fail that wait instead of
                        // leaving it pending forever.
                        _taskStartedTcs.TrySetException(new InvalidOperationException("任務失敗:" + errorMessage));
                        _cancellationTokenSource.Cancel();
                        return;
                    }
                    default:
                        // result-generated can be handled here.
                        break;
                }
            }
        } finally {
            // No effect once task-started was seen; otherwise it releases Main
            // when the loop ends for any other reason (close, cancellation,
            // error).
            _taskStartedTcs.TrySetCanceled();
        }
    }

    /// <summary>
    /// Receives one complete WebSocket message. Binary fragments are appended
    /// to the output file and yield null; a text message is reassembled from
    /// all of its fragments and parsed as JSON. Returns null on close or
    /// cancellation as well.
    /// </summary>
    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);

        try {
            // A message may span several frames; collect text fragments until
            // EndOfMessage so JSON larger than the buffer is not parsed
            // truncated (and multi-byte characters are not split).
            using var textMessage = new MemoryStream();
            WebSocketReceiveResult result;
            do {
                result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);

                if (result.MessageType == WebSocketMessageType.Close) {
                    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                    return null;
                }

                if (result.MessageType == WebSocketMessageType.Binary) {
                    // Audio data: append this fragment to the output file.
                    Console.WriteLine("接收到位元據...");
                    using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                        fileStream.Write(buffer, 0, result.Count);
                    }
                } else {
                    textMessage.Write(buffer, 0, result.Count);
                }
            } while (!result.EndOfMessage);

            if (result.MessageType == WebSocketMessageType.Binary) {
                return null;
            }

            string message = Encoding.UTF8.GetString(textMessage.GetBuffer(), 0, (int)textMessage.Length);
            return JsonDocument.Parse(message);
        } catch (OperationCanceledException) {
            Console.WriteLine("訊息接收被取消。");
            return null;
        }
    }

    /// <summary>Returns a new 32-character hexadecimal task ID.</summary>
    private static string GenerateTaskId() {
        // The "N" format is already exactly 32 hex digits without dashes.
        return Guid.NewGuid().ToString("N");
    }

    /// <summary>Serializes a command with the given header fields and payload to JSON.</summary>
    private static string CreateCommand(string action, string taskId, string streaming, object payload) {
        var command = new {
            header = new {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}
PHP
範例程式碼目錄結構為:
my-php-project/
├── composer.json
├── vendor/
└── index.php
composer.json內容如下,相關依賴的版本號碼請根據實際情況自行決定:
{
"require": {
"react/event-loop": "^1.3",
"react/socket": "^1.11",
"react/stream": "^1.2",
"react/http": "^1.1",
"ratchet/pawl": "^0.4"
},
"autoload": {
"psr-4": {
"App\\": "src/"
}
}
}
index.php內容如下:
<?php
require __DIR__ . '/vendor/autoload.php';
use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;
// The API key is read from the environment. It can be hard-coded instead
// ($api_key = "your_api_key"), but that is discouraged in production because
// it increases the risk of leaking the key.
$api_key = getenv("DASHSCOPE_API_KEY");
if ($api_key === false || $api_key === '') {
    fwrite(STDERR, "DASHSCOPE_API_KEY environment variable is not set.\n");
    exit(1);
}
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket server address
$output_file = 'output.mp3'; // output file path

$loop = Loop::get();

if (file_exists($output_file)) {
    // Empty the file.
    file_put_contents($output_file, '');
}

// Custom socket connector. Certificate verification stays enabled: the bearer
// API key travels over this connection, so an unverified peer could steal it.
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        'verify_peer' => true,
        'verify_peer_name' => true,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
    echo "串連到WebSocket伺服器\n";

    // Generate the task ID.
    $taskId = generateTaskId();

    // Send the run-task command.
    sendRunTaskMessage($conn, $taskId);

    // Sends every continue-task command followed by finish-task; invoked by
    // handleEvent once task-started arrives.
    $sendContinueTask = function() use ($conn, $loop, $taskId) {
        // Text to send.
        $texts = ["床前明月光", "疑是地上霜", "舉頭望明月", "低頭思故鄉"];
        $continueTaskCount = 0;
        foreach ($texts as $text) {
            $continueTaskMessage = json_encode([
                "header" => [
                    "action" => "continue-task",
                    "task_id" => $taskId,
                    "streaming" => "duplex"
                ],
                "payload" => [
                    "input" => [
                        "text" => $text
                    ]
                ]
            ]);
            echo "準備發送continue-task指令: " . $continueTaskMessage . "\n";
            $conn->send($continueTaskMessage);
            $continueTaskCount++;
        }
        echo "發送的continue-task指令個數為:" . $continueTaskCount . "\n";
        // Send the finish-task command.
        sendFinishTaskMessage($conn, $taskId);
    };

    // Whether task-started has been received.
    $taskStarted = false;

    // Incoming messages.
    $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
        if ($msg->isBinary()) {
            // Append the audio data to the local file.
            file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
        } else {
            // Text message: decode and dispatch on the event name.
            $response = json_decode($msg->getPayload(), true);

            if (isset($response['header']['event'])) {
                handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
            } else {
                echo "未知的訊息格式\n";
            }
        }
    });

    // Connection closed.
    $conn->on('close', function($code = null, $reason = null) {
        echo "串連已關閉\n";
        if ($code !== null) {
            echo "關閉代碼: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "關閉原因:" . $reason . "\n";
        }
    });
}, function ($e) {
    echo "無法串連:{$e->getMessage()}\n";
});

$loop->run();
/**
 * Generate a task ID: 16 random bytes rendered as 32 hexadecimal characters.
 * @return string
 */
function generateTaskId(): string {
    $randomBytes = random_bytes(16);
    return bin2hex($randomBytes);
}
/**
 * Send the run-task command that opens the synthesis task.
 * @param $conn   connection object; its send() method is called once
 * @param $taskId task ID placed in the command header
 */
function sendRunTaskMessage($conn, $taskId) {
    $header = [
        "action" => "run-task",
        "task_id" => $taskId,
        "streaming" => "duplex"
    ];
    $parameters = [
        "text_type" => "PlainText",
        "voice" => "longanyang",
        "format" => "mp3",
        "sample_rate" => 22050,
        "volume" => 50,
        "rate" => 1,
        "pitch" => 1,
        // With enable_ssml set to true only one continue-task command may be
        // sent; otherwise the server reports
        // "Text request limit violated, expected 1.".
        "enable_ssml" => false
    ];
    $payload = [
        "task_group" => "audio",
        "task" => "tts",
        "function" => "SpeechSynthesizer",
        "model" => "cosyvoice-v3-flash",
        "parameters" => $parameters,
        // Cast so the empty input encodes as {} rather than [].
        "input" => (object) []
    ];

    $encoded = json_encode(["header" => $header, "payload" => $payload]);
    echo "準備發送run-task指令: " . $encoded . "\n";
    $conn->send($encoded);
    echo "run-task指令已發送\n";
}
/**
 * Read a whole audio file into memory.
 * Prints a notice when the file cannot be read and returns false in that case.
 * @param string $filePath
 * @return bool|string file contents, or false on failure
 */
function readAudioFile(string $filePath) {
    $contents = file_get_contents($filePath);
    if (false === $contents) {
        echo "無法讀取音頻檔案\n";
    }
    return $contents;
}
/**
 * Split audio data into consecutive chunks of at most $chunkSize bytes.
 * @param string $data
 * @param int $chunkSize
 * @return array list of chunks in their original order
 */
function splitAudioData(string $data, int $chunkSize): array {
    $chunks = str_split($data, $chunkSize);
    return $chunks;
}
/**
 * Send the finish-task command that ends the synthesis task.
 * @param $conn   connection object; its send() method is called once
 * @param $taskId task ID placed in the command header
 */
function sendFinishTaskMessage($conn, $taskId) {
    $header = [
        "action" => "finish-task",
        "task_id" => $taskId,
        "streaming" => "duplex"
    ];
    // Cast so the empty input encodes as {} rather than [].
    $payload = ["input" => (object) []];

    $encoded = json_encode(["header" => $header, "payload" => $payload]);
    echo "準備發送finish-task指令: " . $encoded . "\n";
    $conn->send($encoded);
    echo "finish-task指令已發送\n";
}
/**
 * Handle one server event.
 *
 * task-started triggers the continue-task sender; task-finished and
 * task-failed close the connection exactly once (the earlier version closed
 * it again from a 1-second timer or a trailing check); an "error" event
 * closes the connection only when the task has not started yet.
 *
 * @param $conn             connection object; close() is called on terminal events
 * @param $response         decoded message, expected to contain ['header']['event']
 * @param $sendContinueTask callable that sends the text and finish-task
 * @param $loop             event loop (kept for interface compatibility; unused)
 * @param $taskId           task ID (unused here)
 * @param $taskStarted      by-reference flag set to true on task-started
 */
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
    $event = $response['header']['event'] ?? '';

    switch ($event) {
        case 'task-started':
            echo "任務開始,發送continue-task指令...\n";
            $taskStarted = true;
            // Send the continue-task commands.
            $sendContinueTask();
            break;
        case 'result-generated':
            // Nothing to do for result-generated.
            break;
        case 'task-finished':
            echo "任務完成\n";
            $conn->close();
            echo "用戶端關閉串連\n";
            break;
        case 'task-failed':
            echo "任務失敗\n";
            // The error fields may be absent; avoid undefined-index notices.
            echo "錯誤碼:" . ($response['header']['error_code'] ?? '') . "\n";
            echo "錯誤資訊:" . ($response['header']['error_message'] ?? '') . "\n";
            $conn->close();
            break;
        case 'error':
            echo "錯誤:" . ($response['payload']['message'] ?? '') . "\n";
            // An error before task-started means the task will never run.
            if (!$taskStarted) {
                $conn->close();
            }
            break;
        default:
            echo "未知事件:" . $event . "\n";
            break;
    }
}
Node.js
需安裝相關依賴:
npm install ws
npm install uuid
範例程式碼如下:
const WebSocket = require('ws');
const fs = require('fs');
const uuid = require('uuid').v4;
// The API key is read from the environment. It can be hard-coded instead
// (apiKey = 'your_api_key'), but that is discouraged in production because it
// increases the risk of leaking the key.
const apiKey = process.env.DASHSCOPE_API_KEY;
if (!apiKey) {
  // Without this check the request would be sent with "bearer undefined".
  console.error('DASHSCOPE_API_KEY environment variable is not set.');
  process.exit(1);
}
// WebSocket server address
const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/';
// Output file path
const outputFilePath = 'output.mp3';

// Empty the output file
fs.writeFileSync(outputFilePath, '');

// Create the WebSocket client
const ws = new WebSocket(url, {
  headers: {
    Authorization: `bearer ${apiKey}`,
    'X-DashScope-DataInspection': 'enable'
  }
});

// Set to true once the server reports task-started
let taskStarted = false;
// Task ID used in every command of this session
let taskId = uuid();
// Once the connection is open, send the run-task command that starts the
// synthesis task.
ws.on('open', function handleOpen() {
  console.log('已串連到WebSocket伺服器');

  const header = {
    action: 'run-task',
    task_id: taskId,
    streaming: 'duplex'
  };

  const parameters = {
    text_type: 'PlainText',
    voice: 'longanyang', // voice
    format: 'mp3', // audio format
    sample_rate: 22050, // sample rate
    volume: 50, // volume
    rate: 1, // speech rate
    pitch: 1, // pitch
    // Whether SSML is enabled. With enable_ssml set to true only one
    // continue-task command may be sent; otherwise the server reports
    // "Text request limit violated, expected 1.".
    enable_ssml: false
  };

  const payload = {
    task_group: 'audio',
    task: 'tts',
    function: 'SpeechSynthesizer',
    model: 'cosyvoice-v3-flash',
    parameters: parameters,
    input: {}
  };

  ws.send(JSON.stringify({ header: header, payload: payload }));
  console.log('已發送run-task訊息');
});
// Audio chunks are appended to the output file through this stream.
const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });

// Handle incoming messages: binary frames are audio, text frames are JSON
// events.
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    // Append the audio data to the file
    fileStream.write(data);
    return;
  }

  // A malformed frame must not throw out of the event handler and crash the
  // process, so parsing is guarded.
  let message;
  try {
    message = JSON.parse(data.toString());
  } catch (err) {
    console.error('JSON parse failed:', err.message);
    return;
  }
  const event = message && message.header && message.header.event;

  // Close the socket and flush the file once the task is over.
  const finish = () => {
    ws.close();
    fileStream.end(() => {
      console.log('檔案流已關閉');
    });
  };

  switch (event) {
    case 'task-started':
      taskStarted = true;
      console.log('任務已開始');
      // Send the continue-task commands
      sendContinueTasks(ws);
      break;
    case 'task-finished':
      console.log('任務已完成');
      finish();
      break;
    case 'task-failed':
      console.error('任務失敗:', message.header.error_message);
      finish();
      break;
    default:
      // result-generated can be handled here
      break;
  }
});
/**
 * Schedule the continue-task commands (one per second) followed by the
 * finish-task command one second after the last text.
 *
 * Each timer re-checks that the task has started and that the socket is
 * still open: the connection may already be closed (for example after
 * task-failed) when a timer fires, and sending on a closed socket throws.
 *
 * @param {WebSocket} ws - the connected WebSocket client
 */
function sendContinueTasks(ws) {
  const texts = [
    '床前明月光,',
    '疑是地上霜。',
    '舉頭望明月,',
    '低頭思故鄉。'
  ];

  // True while commands can still be delivered.
  const canSend = () => taskStarted && ws.readyState === ws.OPEN;

  texts.forEach((text, index) => {
    setTimeout(() => {
      if (canSend()) {
        const continueTaskMessage = JSON.stringify({
          header: {
            action: 'continue-task',
            task_id: taskId,
            streaming: 'duplex'
          },
          payload: {
            input: {
              text: text
            }
          }
        });
        ws.send(continueTaskMessage);
        console.log(`已發送continue-task,文本:${text}`);
      }
    }, index * 1000); // one command per second
  });

  // Send the finish-task command
  setTimeout(() => {
    if (canSend()) {
      const finishTaskMessage = JSON.stringify({
        header: {
          action: 'finish-task',
          task_id: taskId,
          streaming: 'duplex'
        },
        payload: {
          input: {}
        }
      });
      ws.send(finishTaskMessage);
      console.log('已發送finish-task');
    }
  }, texts.length * 1000 + 1000); // one second after the last continue-task
}
// Report when the connection has been closed.
ws.on('close', () => {
  console.log('已斷開與WebSocket伺服器的串連');
});

// Without an 'error' listener, a connection or handshake failure is emitted
// as an unhandled 'error' event and terminates the process with an uncaught
// exception.
ws.on('error', (err) => {
  console.error('WebSocket error:', err.message);
});
Java
如您使用Java程式設計語言,建議採用Java DashScope SDK進行開發,詳情請參見Java SDK。
以下是Java WebSocket的呼叫範例。在執行範例前,請確保已匯入以下依賴:
Java-WebSocket
jackson-databind
推薦您使用Maven或Gradle管理依賴包,其配置如下:
pom.xml
<dependencies>
<!-- WebSocket Client -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
build.gradle
// 省略其它代碼
dependencies {
// WebSocket Client
implementation 'org.java-websocket:Java-WebSocket:1.5.3'
// JSON Processing
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
}
// 省略其它代碼
Java代碼如下:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
/**
 * Sample WebSocket client for streaming speech synthesis: it sends run-task
 * on connect, sends the text and finish-task when task-started arrives,
 * appends binary audio frames to a local file, and closes the connection on
 * task-finished or task-failed.
 */
public class TTSWebSocketClient extends WebSocketClient {
    /** Shared JSON mapper; one instance is reused instead of one per message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String taskId = UUID.randomUUID().toString();
    private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3";
    /** Written in onMessage and polled by main from another thread, hence volatile. */
    private volatile boolean taskFinished = false;

    public TTSWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("串連成功");

        // Send the run-task command.
        // With enable_ssml set to true only one continue-task command may be
        // sent; otherwise the server reports
        // "Text request limit violated, expected 1.".
        String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v3-flash\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longanyang\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1, \"enable_ssml\": false }, \"input\": {} }}";
        send(runTaskCommand);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(String message) {
        System.out.println("收到服務端返回的訊息:" + message);
        try {
            // Parse the JSON message.
            Map<String, Object> messageMap = MAPPER.readValue(message, Map.class);

            if (messageMap.containsKey("header")) {
                Map<String, Object> header = (Map<String, Object>) messageMap.get("header");

                if (header.containsKey("event")) {
                    String event = (String) header.get("event");

                    if ("task-started".equals(event)) {
                        System.out.println("收到服務端返回的task-started事件");

                        List<String> texts = Arrays.asList(
                                "床前明月光,疑是地上霜",
                                "舉頭望明月,低頭思故鄉"
                        );

                        for (String text : texts) {
                            // Send a continue-task command.
                            sendContinueTask(text);
                        }

                        // Send the finish-task command.
                        sendFinishTask();
                    } else if ("task-finished".equals(event)) {
                        System.out.println("收到服務端返回的task-finished事件");
                        taskFinished = true;
                        closeConnection();
                    } else if ("task-failed".equals(event)) {
                        System.out.println("任務失敗:" + message);
                        closeConnection();
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("出現異常:" + e.getMessage());
        }
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("收到的二進位音頻資料大小為:" + message.remaining());

        // Append the audio chunk to the output file.
        try (FileOutputStream fos = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[message.remaining()];
            message.get(buffer);
            fos.write(buffer);
            System.out.println("音頻資料已寫入本地檔案" + outputFile + "中");
        } catch (IOException e) {
            System.err.println("音頻資料寫入本地檔案失敗:" + e.getMessage());
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("串連關閉:" + reason + " (" + code + ")");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("報錯:" + ex.getMessage());
        ex.printStackTrace();
    }

    /**
     * Sends one continue-task command carrying {@code text}.
     *
     * The command is serialized with the JSON mapper rather than concatenated
     * by hand, so quotes, backslashes and control characters in the text are
     * escaped and cannot produce invalid JSON.
     *
     * @throws IOException if the command cannot be serialized
     */
    private void sendContinueTask(String text) throws IOException {
        Map<String, Object> header = new LinkedHashMap<>();
        header.put("action", "continue-task");
        header.put("task_id", taskId);
        header.put("streaming", "duplex");

        Map<String, Object> input = new LinkedHashMap<>();
        input.put("text", text);

        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("input", input);

        Map<String, Object> command = new LinkedHashMap<>();
        command.put("header", header);
        command.put("payload", payload);

        send(MAPPER.writeValueAsString(command));
    }

    /** Sends the finish-task command. */
    private void sendFinishTask() {
        String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}";
        send(command);
    }

    /** Closes the connection unless it is already closed. */
    private void closeConnection() {
        if (!isClosed()) {
            close();
        }
    }

    public static void main(String[] args) {
        try {
            String apiKey = System.getenv("DASHSCOPE_API_KEY");
            if (apiKey == null || apiKey.isEmpty()) {
                System.err.println("請設定 DASHSCOPE_API_KEY 環境變數");
                return;
            }

            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "bearer " + apiKey);
            TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope.aliyuncs.com/api-ws/v1/inference/"), headers);

            client.connect();

            // Poll until the connection is closed or the task has finished.
            while (!client.isClosed() && !client.taskFinished) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.err.println("串連WebSocket服務失敗:" + e.getMessage());
            e.printStackTrace();
        }
    }
}
Python
如您使用Python程式設計語言,建議採用Python DashScope SDK進行開發,詳情請參見Python SDK。
以下是Python WebSocket的呼叫範例。在執行範例前,請確保透過如下方式安裝依賴:
pip uninstall websocket-client
pip uninstall websocket
pip install websocket-client
重要:請不要將執行範例程式碼的Python檔案命名為“websocket.py”,否則該檔案會遮蔽同名的websocket模組而報錯(AttributeError: module 'websocket' has no attribute 'WebSocketApp'. Did you mean: 'WebSocket'?)。
import websocket
import json
import uuid
import os
import time
class TTSClient:
    """WebSocket client for one streaming speech-synthesis task.

    On connect it sends run-task; when task-started arrives it sends the text
    with continue-task commands followed by finish-task; binary messages are
    appended to a local audio file; task-finished / task-failed close the
    connection.
    """

    def __init__(self, api_key, uri):
        """
        Initialize a TTSClient instance.

        Args:
            api_key (str): API key used for authentication.
            uri (str): WebSocket service address.
        """
        self.api_key = api_key  # replace with your API key
        self.uri = uri  # replace with your WebSocket address
        self.task_id = str(uuid.uuid4())  # unique task ID
        self.output_file = f"output_{int(time.time())}.mp3"  # output audio file path
        self.ws = None  # WebSocketApp instance, created in run()
        self.task_started = False  # whether task-started was received
        self.task_finished = False  # whether task-finished / task-failed was received

    def on_open(self, ws):
        """
        Callback for an established connection.

        Sends the run-task command that starts the synthesis task.
        """
        print("WebSocket 已串連")

        # Build the run-task command.
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v3-flash",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longanyang",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1,
                    # With enable_ssml set to True only one continue-task
                    # command may be sent; otherwise the server reports
                    # "Text request limit violated, expected 1.".
                    "enable_ssml": False
                },
                "input": {}
            }
        }

        # Send the run-task command.
        ws.send(json.dumps(run_task_cmd))
        print("已發送 run-task 指令")

    def on_message(self, ws, message):
        """
        Callback for an incoming message.

        Text messages are parsed as JSON events; anything else is treated as
        binary audio data and appended to the output file.
        """
        if isinstance(message, str):
            # JSON text message.
            try:
                msg_json = json.loads(message)
                print(f"收到 JSON 訊息: {msg_json}")

                if "header" in msg_json:
                    header = msg_json["header"]

                    if "event" in header:
                        event = header["event"]

                        if event == "task-started":
                            print("任務已啟動")
                            self.task_started = True

                            # Send the continue-task commands.
                            texts = [
                                "床前明月光,疑是地上霜",
                                "舉頭望明月,低頭思故鄉"
                            ]

                            for text in texts:
                                self.send_continue_task(text)

                            # Send finish-task after all the text.
                            self.send_finish_task()

                        elif event == "task-finished":
                            print("任務已完成")
                            self.task_finished = True
                            self.close(ws)

                        elif event == "task-failed":
                            # The error message is a field of the header, not
                            # of the top-level message.
                            error_msg = header.get("error_message", "未知錯誤")
                            print(f"任務失敗: {error_msg}")
                            self.task_finished = True
                            self.close(ws)

            except json.JSONDecodeError as e:
                print(f"JSON 解析失敗: {e}")
        else:
            # Binary message (audio data).
            print(f"收到二進位訊息,大小: {len(message)} 位元組")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"已將音頻資料寫入本地檔案{self.output_file}中")

    def on_error(self, ws, error):
        """Callback for errors."""
        print(f"WebSocket 出錯: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Callback for a closed connection."""
        print(f"WebSocket 已關閉: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """Send a continue-task command carrying the text to synthesize."""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }

        self.ws.send(json.dumps(cmd))
        print(f"已發送 continue-task 指令,常值內容: {text}")

    def send_finish_task(self):
        """Send the finish-task command that ends the synthesis task."""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }

        self.ws.send(json.dumps(cmd))
        print("已發送 finish-task 指令")

    def close(self, ws):
        """Close the connection if it is still connected."""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("已主動關閉串連")

    def run(self):
        """Create the WebSocketApp and run it until the connection ends."""
        # Request headers (authentication).
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }

        # Create the WebSocketApp instance.
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        print("正在監聽 WebSocket 訊息...")
        self.ws.run_forever()  # blocks while the connection is being served
# Example usage
if __name__ == "__main__":
    # If the API key is not configured as an environment variable, set
    # API_KEY to your API key here instead.
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    if not API_KEY:
        # Without this check the request would be sent with "bearer None".
        raise SystemExit("DASHSCOPE_API_KEY environment variable is not set.")
    SERVER_URI = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"  # replace with your WebSocket address

    client = TTSClient(API_KEY, SERVER_URI)
    client.run()