在您使用MCP官方源碼開發符合OAuth 2.0規範的授權流程,並準備在Agent中整合OpenAPI MCP Server時,本文將為您提供全面且詳盡的指導。
自訂OAuth授權流程
範例程式碼基於MCP官方源碼實現了OAuth授權流程。如需修改OAuth授權流程,請調整handle_redirect和handle_callback中的相關代碼。
重要
在生產環境中,應根據實際情況妥善儲存OAuth Token。本文將以InMemoryTokenStorage為例進行示範。
# oauth_handler.py
import asyncio
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
from urllib.parse import parse_qs, urlparse
class InMemoryTokenStorage(TokenStorage):
"""Demo In-memory token storage implementation."""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_info: OAuthClientInformationFull | None = None
async def get_tokens(self) -> OAuthToken | None:
"""Get stored tokens."""
return self.tokens
async def set_tokens(self, tokens: OAuthToken) -> None:
"""Store tokens."""
self.tokens = tokens
async def get_client_info(self) -> OAuthClientInformationFull | None:
"""Get stored client information."""
return self.client_info
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""Store client information."""
self.client_info = client_info
class CallbackHandler(BaseHTTPRequestHandler):
"""HTTP handler for OAuth callback."""
def __init__(self, callback_server, *args, **kwargs):
self.callback_server = callback_server
super().__init__(*args, **kwargs)
def do_GET(self):
"""Handle GET request for OAuth callback."""
try:
# 解析回調URL中的參數
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
if 'code' in params:
# 擷取授權碼
code = params['code'][0]
state = params.get('state', [None])[0]
# 儲存結果
self.callback_server.auth_code = code
self.callback_server.auth_state = state
self.callback_server.auth_received = True
# 返回成功頁面
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
success_html = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>授權成功</title>
</head>
<body>
<h1>授權成功</h1>
<p>您已完成授權,可以返回應用繼續使用。</p>
<p>視窗將在 <span id="countdown">3</span> 秒後自動關閉。</p>
<button onclick="window.close()">立即關閉</button>
<script>
let count = 3;
const el = document.getElementById('countdown');
const timer = setInterval(() => {
count--;
el.textContent = count;
if (count <= 0) {
clearInterval(timer);
window.close();
}
}, 1000);
</script>
</body>
</html>
"""
self.wfile.write(success_html.encode('utf-8'))
elif 'error' in params:
# 處理錯誤
error = params['error'][0]
error_description = params.get('error_description', ['Unknown error'])[0]
self.callback_server.auth_error = f"{error}: {error_description}"
self.callback_server.auth_received = True
# 返回錯誤頁面
self.send_response(400)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
error_html = f"""<!DOCTYPE html>
<html lang=\"zh-CN\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>授權失敗</title>
</head>
<body>
<h1>授權失敗</h1>
<p>在授權過程中發生錯誤。</p>
<p><strong>錯誤碼:</strong>{error}</p>
<p><strong>錯誤描述:</strong>{error_description}</p>
<button onclick=\"window.close()\">關閉視窗</button>
</body>
</html>
"""
self.wfile.write(error_html.encode('utf-8'))
except Exception as e:
self.callback_server.auth_error = str(e)
self.callback_server.auth_received = True
self.send_response(500)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
internal_error_html = f"""<!DOCTYPE html>
<html lang=\"zh-CN\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>伺服器錯誤</title>
</head>
<body>
<h1>伺服器內部錯誤</h1>
<p>抱歉,伺服器遇到了一個內部錯誤,無法完成您的請求。</p>
<pre>{str(e)}</pre>
<button onclick=\"window.close()\">關閉視窗</button>
</body>
</html>
"""
self.wfile.write(internal_error_html.encode('utf-8'))
def log_message(self, format, *args):
"""靜默日誌輸出"""
pass
class CallbackServer:
"""OAuth 回調伺服器"""
def __init__(self, port=3000):
self.port = port
self.server = None
self.thread = None
self.auth_code = None
self.auth_state = None
self.auth_error = None
self.auth_received = False
def start(self):
"""啟動回調伺服器"""
handler = lambda *args, **kwargs: CallbackHandler(self, *args, **kwargs)
self.server = HTTPServer(('localhost', self.port), handler)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
print(f"OAuth 回調伺服器已啟動,監聽連接埠 {self.port}")
def stop(self):
"""停止回調伺服器"""
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join(timeout=1)
print("OAuth 回調伺服器已停止")
async def wait_for_callback(self, timeout=300):
"""等待OAuth回調"""
start_time = asyncio.get_event_loop().time()
while not self.auth_received:
if asyncio.get_event_loop().time() - start_time > timeout:
raise TimeoutError("等待OAuth回調逾時")
await asyncio.sleep(0.1)
if self.auth_error:
raise Exception(f"OAuth授權失敗: {self.auth_error}")
return self.auth_code, self.auth_state
# 全域回調伺服器執行個體
_callback_server = None
async def handle_redirect(auth_url: str) -> None:
"""自動開啟瀏覽器進行OAuth授權"""
global _callback_server
# 啟動回調伺服器
if _callback_server is None:
_callback_server = CallbackServer(port=3000)
_callback_server.start()
print(f"正在開啟瀏覽器進行OAuth授權...")
print(f"授權URL: {auth_url}")
# 自動開啟瀏覽器
webbrowser.open(auth_url)
async def handle_callback() -> tuple[str, str | None]:
"""自動處理OAuth回調"""
global _callback_server
if _callback_server is None:
raise Exception("回調伺服器未啟動")
print("等待OAuth授權完成...")
try:
# 等待回調
code, state = await _callback_server.wait_for_callback()
print("OAuth授權成功!")
return code, state
except Exception as e:
print(f"OAuth授權失敗: {e}")
raise
finally:
# 清理伺服器狀態,但保持伺服器運行以便重用
_callback_server.auth_code = None
_callback_server.auth_state = None
_callback_server.auth_error = None
_callback_server.auth_received = False在Agent中整合MCP
本文將利用主流Agent架構,通過OAuth認證實現與OpenAPI MCP Server的串連,並結合大模型與MCP Tools完成對阿里雲資源的操作。
AgentScope
AgentScope是阿里巴巴開源的Agent架構,支援智能體工具管理、智能體長期記憶控制和智能化RAG等。
# -*- coding: utf-8 -*-
"""The main entry point of the ReAct agent example."""
import asyncio
import os
from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
Toolkit,
execute_shell_command,
execute_python_code,
view_text_file,
)
from agentscope.mcp import HttpStatelessClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback
# openai base
# read from .env
load_dotenv()
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"
memory_token_storage = InMemoryTokenStorage()
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="AgentScopeExampleClient",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=memory_token_storage,
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
stateless_client = HttpStatelessClient(
# 用於標識 MCP 的名稱
name="mcp_services_stateless",
transport="streamable_http",
url=server_url,
auth=oauth_provider,
)
async def main() -> None:
"""The main entry point for the ReAct agent example."""
toolkit = Toolkit()
# toolkit.register_tool_function(execute_shell_command)
# toolkit.register_tool_function(execute_python_code)
# toolkit.register_tool_function(view_text_file)
await toolkit.register_mcp_client(stateless_client)
agent = ReActAgent(
name="AlibabaCloudOpsAgent",
sys_prompt="你是阿里雲營運助手,善於使用各種阿里雲產品如 ECS、RDS、VPC 等,完成我的需求。",
model=DashScopeChatModel(
api_key=os.environ.get("DASHSCOPE_API_KEY"),
model_name="qwen3-max-preview",
enable_thinking=False,
stream=True,
),
formatter=DashScopeChatFormatter(),
toolkit=toolkit,
memory=InMemoryMemory(),
)
user = UserAgent("User")
msg = None
while True:
msg = await user(msg)
if msg.get_text_content() == "exit":
break
msg = await agent(msg)
asyncio.run(main())LangGraph
LangGraph是一個用於構建、管理和部署長期運行、有狀態智能體的底層編排架構。
import asyncio
import sys
from dotenv import load_dotenv
import os
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect
# openai base
# read from .env
load_dotenv()
async def make_agent():
model = init_chat_model(model=os.getenv("OPENAI_MODEL"), api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), model_provider='openai')
# 使用與 MCP 服務相同的伺服器 URL
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="Example MCP Client",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=InMemoryTokenStorage(),
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
mcp_client = MultiServerMCPClient(
{
"resourcecenter": {
"url": server_url,
"transport": "streamable_http",
"auth": oauth_provider
}
}
)
tools = await mcp_client.get_tools()
agent = create_react_agent(
model=model,
tools=tools,
prompt="You are a helpful assistant"
)
return agent
async def chat_loop():
"""對話迴圈"""
# 建立代理
print("正在初始化AI助手...")
agent = await make_agent()
print("AI助手已就緒!輸入 'quit' 或 'exit' 退出\n")
# 對話歷史
messages = []
while True:
try:
# 擷取使用者輸入
user_input = input("使用者: ").strip()
# 檢查退出命令
if user_input.lower() in ['quit', 'exit', '退出']:
print("再見!")
break
# 跳過空輸入
if not user_input:
continue
# 添加使用者訊息到歷史
messages.append({"role": "user", "content": user_input})
print("AI: ", end="", flush=True)
# 調用代理
response = await agent.ainvoke(
{"messages": messages},
{"recursion_limit": 50}
)
# 提取AI回複
ai_response = response["messages"][-1].content
print(ai_response)
# 添加AI回複到歷史
messages.append({"role": "assistant", "content": ai_response})
print() # 空行分隔
except KeyboardInterrupt:
print("\n\n再見!")
break
except Exception as e:
print(f"錯誤: {e}")
print("請重試...\n")
async def main():
await chat_loop()
# 運行主函數
if __name__ == "__main__":
asyncio.run(main())