この Topic では、OAuth 2.0 権限付与フローを使用してエージェントを OpenAPI MCP サーバーに接続する方法を説明します。OAuth コールバックの処理や、一般的なエージェントフレームワークとの統合のためのサンプルコードを提供します。
OAuth 権限付与フローのカスタマイズ
以下のサンプルコードは、MCP 公式ソースコードに基づいており、OAuth 権限付与フローを実装しています。handle_redirect および handle_callback 関数内のコードを修正することで、ユーザーエクスペリエンスをカスタマイズできます。
重要
InMemoryTokenStorage クラスはデモ目的でのみ提供されています。本番環境では、不正アクセスを防ぐために、OAuth トークンに対して安全で永続的なストレージメカニズムを使用する必要があります。
# oauth_handler.py
import asyncio
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
from urllib.parse import parse_qs, urlparse
class InMemoryTokenStorage(TokenStorage):
"""デモ用のインメモリトークンストレージ実装。"""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_info: OAuthClientInformationFull | None = None
async def get_tokens(self) -> OAuthToken | None:
"""保存されているトークンを取得します。"""
return self.tokens
async def set_tokens(self, tokens: OAuthToken) -> None:
"""トークンを保存します。"""
self.tokens = tokens
async def get_client_info(self) -> OAuthClientInformationFull | None:
"""保存されているクライアント情報を取得します。"""
return self.client_info
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""クライアント情報を保存します。"""
self.client_info = client_info
class CallbackHandler(BaseHTTPRequestHandler):
"""OAuth コールバック用の HTTP ハンドラ。"""
def __init__(self, callback_server, *args, **kwargs):
self.callback_server = callback_server
super().__init__(*args, **kwargs)
def do_GET(self):
"""OAuth コールバックの GET リクエストを処理します。"""
try:
# コールバック URL からパラメーターを解析します。
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
if 'code' in params:
# 認証コードを取得します。
code = params['code'][0]
state = params.get('state', [None])[0]
# 結果を保存します。
self.callback_server.auth_code = code
self.callback_server.auth_state = state
self.callback_server.auth_received = True
# 成功ページを返します。
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
success_html = """<!DOCTYPE html>
<html lang="ja-JP">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>権限付与に成功しました</title>
</head>
<body>
<h1>権限付与に成功しました。</h1>
<p>アプリケーションに戻ることができます。</p>
<p>このウィンドウは 秒後に自動的に閉じます。</p>
<button onclick="window.close()">閉じる</button>
<script>
let count = 3;
const el = document.getElementById('countdown');
const timer = setInterval(() => {
count--;
el.textContent = count;
if (count <= 0) {
clearInterval(timer);
window.close();
}
}, 1000);
</script>
</body>
</html>
"""
self.wfile.write(success_html.encode('utf-8'))
elif 'error' in params:
# エラーを処理します。
error = params['error'][0]
error_description = params.get('error_description', ['Unknown error'])[0]
self.callback_server.auth_error = f"{error}: {error_description}"
self.callback_server.auth_received = True
# エラーメッセージを返します。
self.send_response(400)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
error_html = f"""<!DOCTYPE html>
<html lang=\"ja-JP\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>権限付与に失敗しました</title>
</head>
<body>
<h1>権限付与に失敗しました</h1>
<p>権限付与プロセス中にエラーが発生しました。</p>
<p>{error}</p>
<p>{error_description}</p>
<button onclick="window.close()">閉じる</button>
</body>
</html>
"""
self.wfile.write(error_html.encode('utf-8'))
except Exception as e:
self.callback_server.auth_error = str(e)
self.callback_server.auth_received = True
self.send_response(500)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
internal_error_html = f"""<!DOCTYPE html>
<html lang=\"ja-JP\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>サーバーエラー</title>
</head>
<body>
<h1>内部サーバーエラー</h1>
<p>サーバーで内部エラーが発生し、リクエストを完了できませんでした。</p>
<pre>{str(e)}</pre>
<button onclick="window.close()">閉じる</button>
</body>
</html>
"""
self.wfile.write(internal_error_html.encode('utf-8'))
def log_message(self, format, *args):
"""ログ出力を抑制します"""
pass
class CallbackServer:
"""OAuth コールバックサーバー"""
def __init__(self, port=3000):
self.port = port
self.server = None
self.thread = None
self.auth_code = None
self.auth_state = None
self.auth_error = None
self.auth_received = False
def start(self):
"""コールバックサーバーを起動します"""
handler = lambda *args, **kwargs: CallbackHandler(self, *args, **kwargs)
self.server = HTTPServer(('localhost', self.port), handler)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
print(f"OAuth callback server started, listening on port {self.port}")
def stop(self):
"""コールバックサーバーを停止します"""
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join(timeout=1)
print("OAuth callback server stopped")
async def wait_for_callback(self, timeout=300):
"""OAuth コールバックを待ちます"""
start_time = asyncio.get_event_loop().time()
while not self.auth_received:
if asyncio.get_event_loop().time() - start_time > timeout:
raise TimeoutError("Timed out waiting for OAuth callback")
await asyncio.sleep(0.1)
if self.auth_error:
raise Exception(f"OAuth authorization failed: {self.auth_error}")
return self.auth_code, self.auth_state
# グローバルコールバックサーバーインスタンス。
_callback_server = None
async def handle_redirect(auth_url: str) -> None:
"""OAuth 権限付与のためにブラウザを自動的に開きます""""
global _callback_server
# コールバックサーバーを起動します。
if _callback_server is None:
_callback_server = CallbackServer(port=3000)
_callback_server.start()
print(f "Opening browser for OAuth authorization...")
print(f"Authorization URL: {auth_url}")
# ブラウザを自動的に開きます。
webbrowser.open(auth_url)
async def handle_callback() -> tuple[str, str | None]:
"""OAuth コールバックを自動的に処理します"""
global _callback_server
if _callback_server is None:
raise Exception("Callback server is not started")
print("Waiting for OAuth authorization to complete...")
try:
# コールバックを待ちます。
code, state = await _callback_server.wait_for_callback()
print("OAuth authorization successful!")
return code, state
except Exception as e:
print(f"OAuth authorization failed: {e}")
raise
finally:
# サーバーの状態をクリーンアップしますが、再利用のためにサーバーは実行し続けます。
_callback_server.auth_code = None
_callback_server.auth_state = None
_callback_server.auth_error = None
_callback_server.auth_received = FalseAI エージェントへの OpenAPI MCP サーバーの統合
OAuth ハンドラを作成したら、MCP サーバーをエージェントフレームワークと統合できます。このセクションでは、AgentScope と LangGraph の例を示します。
AgentScope
AgentScope は Alibaba のオープンソースのエージェントフレームワークです。 エージェントツール管理、エージェントの長期記憶制御、インテリジェントな検索拡張生成 (RAG) などの特徴をサポートしています。
# -*- coding: utf-8 -*-
"""ReAct エージェントの例のメインエントリポイント。"""
import asyncio
import os
from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
Toolkit,
execute_shell_command,
execute_python_code,
view_text_file,
)
from agentscope.mcp import HttpStatelessClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback
# openai ベース
# .env から読み込み
load_dotenv()
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"
memory_token_storage = InMemoryTokenStorage()
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="AgentScopeExampleClient",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=memory_token_storage,
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
stateless_client = HttpStatelessClient(
# MCP サーバーの名前。
name="mcp_services_stateless",
transport="streamable_http",
url=server_url,
auth=oauth_provider,
)
async def main() -> None:
"""ReAct エージェントの例のメインエントリポイント。"""
toolkit = Toolkit()
# toolkit.register_tool_function(execute_shell_command)
# toolkit.register_tool_function(execute_python_code)
# toolkit.register_tool_function(view_text_file)
await toolkit.register_mcp_client(stateless_client)
agent = ReActAgent(
name="AlibabaCloudOpsAgent",
sys_prompt="あなたは Alibaba Cloud O&M アシスタントです。ECS、RDS、VPC などのさまざまな Alibaba Cloud サービスを駆使して、私のリクエストに応えるのが得意です。",
model=DashScopeChatModel(
api_key=os.environ.get("DASHSCOPE_API_KEY"),
model_name="qwen3-max-preview",
enable_thinking=False,
stream=True,
),
formatter=DashScopeChatFormatter(),
toolkit=toolkit,
memory=InMemoryMemory(),
)
user = UserAgent("User")
msg = None
while True:
msg = await user(msg)
if msg.get_text_content() == "exit":
break
msg = await agent(msg)
asyncio.run(main())LangGraph
LangGraph は、ステートフルな AI エージェントワークフローを構築、デプロイ、管理するために設計されたオープンソースのフレームワークです。
import asyncio
import sys
from dotenv import load_dotenv
import os
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect
# openai ベース
# .env から読み込み
load_dotenv()
async def make_agent():
model = init_chat_model(model=os.getenv("OPENAI_MODEL"), api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), model_provider='openai')
# MCP サーバーの URL を指定します。
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="Example MCP Client",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=InMemoryTokenStorage(),
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
mcp_client = MultiServerMCPClient(
{
"resourcecenter": {
"url": server_url,
"transport": "streamable_http",
"auth": oauth_provider
}
}
)
tools = await mcp_client.get_tools()
agent = create_react_agent(
model=model,
tools=tools,
prompt="You are a helpful assistant"
)
return agent
async def chat_loop():
"""チャットループ"""
# エージェントを作成します。
print("Initializing AI assistant...")
agent = await make_agent()
print("AI assistant is ready! Enter 'quit' or 'exit' to leave.\n")
# チャット履歴。
messages = []
while True:
try:
# ユーザー入力を取得します。
user_input=input ("User: ").strip()
# 終了コマンドを確認します。
if user_input.lower() in ['quit', 'exit', 'leave']:
print("Goodbye!")
break
# 空の入力をスキップします。
if not user_input:
continue
# ユーザーメッセージを履歴に追加します。
messages.append({"role": "user", "content": user_input})
print("AI: ", end="", flush=True)
# エージェントを呼び出します。
response = await agent.ainvoke(
{"messages": messages},
{"recursion_limit": 50}
)
# AI 応答を抽出します。
ai_response = response["messages"][-1].content
print(ai_response)
# AI 応答を履歴に追加します。
messages.append({"role": "assistant", "content": ai_response})
print() # 空白行で区切ります。
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"Error message: {e}")
print("Please try again...\n")
async def main():
await chat_loop()
# main 関数を実行します。
if __name__ == "__main__":
asyncio.run(main())