リアルタイムワークフローでは、標準プロトコルを使用して、カスタムの大規模言語モデル (LLM) を接続できます。
自己開発 LLM の標準インターフェイス (OpenAI 仕様)
LLM API が OpenAI 仕様に準拠している場合、標準の OpenAI 構成を使用して、カスタム LLM サービスをワークフローに統合できます。現在、OpenAI 準拠サービスへのストリームベースのリクエストのみがサポートされています。
LLM ノードで、 [自社開発 (OpenAI 仕様)] を選択し、次のパラメーターを構成します:
名前 | タイプ | 必須 | 説明 | 値の例 |
ModelId | 文字列 | はい | 標準の OpenAI モデルフィールドです。モデル名を指定します。 | abc |
API-KEY | 文字列 | はい | 標準の OpenAI api_key フィールドです。API 認証情報を提供します。 | AUJH-pfnTNMPBm6iWXcJAcWsrscb5KYaLitQhHBLKrI |
ターゲットモデルの HTTPS アドレス | 文字列 | はい | 標準の OpenAI base_url フィールドです。ターゲットサービスのリクエストアドレスを指定します。 説明 Alibaba Cloud は、OpenAI 仕様の要件に従い、base_url に /chat/completions を自動的に追加します。パスがこのフォーマットと一致していることを確認してください。 | http://www.abc.com |
リアルタイムワークフローが実行されると、次のフォーマットで標準の OpenAI データをアセンブルします。その後、設定したカスタムモデルの HTTPS アドレスに POST リクエストを送信して結果を取得します。入力パラメーターは次のとおりです。
名前 | タイプ | 必須 | 説明 | 値の例 |
messages | 配列 | はい | 過去の会話コンテキストです。最大 20 件のコンテキストレコードが保持されます。配列の早い段階で表示されるレコードは、以前の質問または回答を表します。 説明 システムは、ユーザーの現在の入力と会話履歴を自動的に組み合わせ、大規模言語モデルに送信します。 | [{'role': 'user', 'content': 'What is the weather like today?'},{'role': 'assistant', 'content': 'The weather is sunny today.'},{'role': 'user', 'content': 'What about tomorrow?'}] |
model | 文字列 | はい | モデル名。 | abc |
stream | ブール値 | はい | ストリームモードを使用するかどうかを指定します。現在、ストリームのみがサポートされています。 | True |
extendData | オブジェクト | はい | 追加情報。 | {'instanceId':'68e00b6640e*****3e943332fee7','channelId':'123','sentenceId':'3',userData':'{"aaaa":"bbbb"}'} |
| 文字列 | はい | インスタンス ID。 | 68e00b6640e*****3e943332fee7 |
| 文字列 | はい | チャネル ID。 | 123 |
| Int | はい | Q&A ペア ID。 説明 同じユーザーの質問に対して、エージェントの応答は同じ ID を使用します。 | 3 |
| String | いいえ | 電話の発信者番号。 | 13800000001 |
| String | いいえ | 電話の着信者番号。 | 13800000002 |
| 文字列 | いいえ | インスタンスの開始時に渡した UserData ビジネスフィールドのデータ。 | {"aaaa":"bbbb"} |
カスタム LLM (OpenAI 仕様) サーバー
Python
import json
import time
from loguru import logger
from flask import Flask, request, jsonify, Response
app = Flask(__name__)
API_KEY = "YOURAPIKEY"
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completion():
# API キーを確認
auth_header = request.headers.get('Authorization')
if not auth_header or auth_header.split()[1] != API_KEY:
return jsonify({"error": "Unauthorized"}), 401
data = request.json
logger.info(f"data is {data}")
task_id = request.args.get('task_id')
room_id = request.args.get('room_id')
for header, value in request.headers.items():
logger.info(f"{header}: {value}")
# クエリパラメーターを出力
logger.info("\nQuery Parameters:")
for key, value in request.args.items():
logger.info(f"{key}: {value}")
logger.info(f"task_id: {task_id}, room_id: {room_id}")
stream = data.get('stream', False)
if stream:
return Response(generate_stream_response(data), content_type='text/event-stream')
else:
return jsonify(generate_response(data))
def generate_response(data):
response = "これはアナログ AI アシスタントの応答です。実際のアプリケーションでは、ここで実際の AI モデルを呼び出します。"
return {
"id": "chatcmpl-123",
"object": "chat.completion",
"created": int(time.time()),
"model": data['model'],
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": response
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": sum(len(m['content']) for m in data['messages']),
"completion_tokens": len(response),
"total_tokens": sum(len(m['content']) for m in data['messages']) + len(response)
}
}
def generate_stream_response(data):
response = "これはアナログ AI アシスタントのストリーム応答です。実際のアプリケーションでは、ここで実際の AI モデルを呼び出します。"
words = list(response)
for i, word in enumerate(words):
chunk = {
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": data['model'],
"choices": [{
"index": 0,
"delta": {
"content": word,
"tool_calls": [
{
"id": "call_abc123",
"type": "function",
"function": {
"name": "hangup",
"arguments": "\{\}"
}
}
]
},
"finish_reason": None if i < len(words) - 1 else "stop"
}]
}
logger.info(chunk)
yield f"data: {json.dumps(chunk)}\n\n"
time.sleep(0.1) # 処理時間をシミュレート
yield "data: [DONE]\n\n"
if __name__ == '__main__':
logger.info(f"Server is running with API_KEY: {API_KEY}")
app.run(port=8083, debug=True)