すべてのプロダクト
Search
ドキュメントセンター

OpenAPI Explorer:OpenAPI MCP サーバーと AI エージェントの統合

最終更新日:Jan 22, 2026

この Topic では、OAuth 2.0 権限付与フローを使用してエージェントを OpenAPI MCP サーバーに接続する方法を説明します。OAuth コールバックの処理や、一般的なエージェントフレームワークとの統合のためのサンプルコードを提供します。

OAuth 権限付与フローのカスタマイズ

以下のサンプルコードは、MCP 公式ソースコードに基づいており、OAuth 権限付与フローを実装しています。handle_redirect および handle_callback 関数内のコードを修正することで、ユーザーエクスペリエンスをカスタマイズできます。

重要

InMemoryTokenStorage クラスはデモ目的でのみ提供されています。本番環境では、不正アクセスを防ぐために、OAuth トークンに対して安全で永続的なストレージメカニズムを使用する必要があります。

# oauth_handler.py
import asyncio
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
from urllib.parse import parse_qs, urlparse


class InMemoryTokenStorage(TokenStorage):
    """デモ用のインメモリトークンストレージ実装。"""

    def __init__(self):
        self.tokens: OAuthToken | None = None
        self.client_info: OAuthClientInformationFull | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """保存されているトークンを取得します。"""
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """トークンを保存します。"""
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """保存されているクライアント情報を取得します。"""
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """クライアント情報を保存します。"""
        self.client_info = client_info


class CallbackHandler(BaseHTTPRequestHandler):
    """OAuth コールバック用の HTTP ハンドラ。"""
    
    def __init__(self, callback_server, *args, **kwargs):
        self.callback_server = callback_server
        super().__init__(*args, **kwargs)
    
    def do_GET(self):
        """OAuth コールバックの GET リクエストを処理します。"""
        try:
            # コールバック URL からパラメーターを解析します。
            parsed_url = urlparse(self.path)
            params = parse_qs(parsed_url.query)
            
            if 'code' in params:
                # 認証コードを取得します。
                code = params['code'][0]
                state = params.get('state', [None])[0]
                
                # 結果を保存します。
                self.callback_server.auth_code = code
                self.callback_server.auth_state = state
                self.callback_server.auth_received = True
                
                # 成功ページを返します。
                self.send_response(200)
                self.send_header('Content-type', 'text/html; charset=utf-8')
                self.end_headers()
                
                success_html = """<!DOCTYPE html>
                <html lang="ja-JP">
                <head>
                    <meta charset="UTF-8">
                    <meta name="viewport" content="width=device-width, initial-scale=1.0">
                    <title>権限付与に成功しました</title>
                </head>
                <body>
                    <h1>権限付与に成功しました。</h1>
                    <p>アプリケーションに戻ることができます。</p>
                    <p>このウィンドウは  秒後に自動的に閉じます。</p>
                    <button onclick="window.close()">閉じる</button>
                    <script>
                        let count = 3;
                        const el = document.getElementById('countdown');
                        const timer = setInterval(() => {
                            count--;
                            el.textContent = count;
                            if (count <= 0) {
                                clearInterval(timer);
                                window.close();
                            }
                        }, 1000);
                    </script>
                </body>
                </html>
                """
                self.wfile.write(success_html.encode('utf-8'))
                
            elif 'error' in params:
                # エラーを処理します。
                error = params['error'][0]
                error_description = params.get('error_description', ['Unknown error'])[0]
                
                self.callback_server.auth_error = f"{error}: {error_description}"
                self.callback_server.auth_received = True
                
                # エラーメッセージを返します。
                self.send_response(400)
                self.send_header('Content-type', 'text/html; charset=utf-8')
                self.end_headers()
                
                error_html = f"""<!DOCTYPE html>
                <html lang=\"ja-JP\">
                <head>
                    <meta charset=\"UTF-8\">
                    <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
                    <title>権限付与に失敗しました</title>
                </head>
                <body>
                    <h1>権限付与に失敗しました</h1>
                    <p>権限付与プロセス中にエラーが発生しました。</p>
                    <p>{error}</p>
                    <p>{error_description}</p>
                    <button onclick="window.close()">閉じる</button>
                </body>
                </html>
                """
                self.wfile.write(error_html.encode('utf-8'))
            
        except Exception as e:
            self.callback_server.auth_error = str(e)
            self.callback_server.auth_received = True
            
            self.send_response(500)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()
            
            internal_error_html = f"""<!DOCTYPE html>
            <html lang=\"ja-JP\">
            <head>
                <meta charset=\"UTF-8\">
                <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
                <title>サーバーエラー</title>
            </head>
            <body>
                <h1>内部サーバーエラー</h1>
                <p>サーバーで内部エラーが発生し、リクエストを完了できませんでした。</p>
                <pre>{str(e)}</pre>
                <button onclick="window.close()">閉じる</button>
            </body>
            </html>
            """
            self.wfile.write(internal_error_html.encode('utf-8'))
    
    def log_message(self, format, *args):
        """ログ出力を抑制します"""
        pass


class CallbackServer:
    """OAuth コールバックサーバー"""
    
    def __init__(self, port=3000):
        self.port = port
        self.server = None
        self.thread = None
        self.auth_code = None
        self.auth_state = None
        self.auth_error = None
        self.auth_received = False
    
    def start(self):
        """コールバックサーバーを起動します"""
        handler = lambda *args, **kwargs: CallbackHandler(self, *args, **kwargs)
        self.server = HTTPServer(('localhost', self.port), handler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        print(f"OAuth callback server started, listening on port {self.port}")
    
    def stop(self):
        """コールバックサーバーを停止します"""
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.thread:
            self.thread.join(timeout=1)
        print("OAuth callback server stopped")
    
    async def wait_for_callback(self, timeout=300):
        """OAuth コールバックを待ちます"""
        start_time = asyncio.get_event_loop().time()
        
        while not self.auth_received:
            if asyncio.get_event_loop().time() - start_time > timeout:
                raise TimeoutError("Timed out waiting for OAuth callback")
            await asyncio.sleep(0.1)
        
        if self.auth_error:
            raise Exception(f"OAuth authorization failed: {self.auth_error}")
        
        return self.auth_code, self.auth_state


# グローバルコールバックサーバーインスタンス。
_callback_server = None


async def handle_redirect(auth_url: str) -> None:
    """OAuth 権限付与のためにブラウザを自動的に開きます""""
    global _callback_server
    
    # コールバックサーバーを起動します。
    if _callback_server is None:
        _callback_server = CallbackServer(port=3000)
        _callback_server.start()
    
    print(f "Opening browser for OAuth authorization...")
    print(f"Authorization URL: {auth_url}")
    
    # ブラウザを自動的に開きます。
    webbrowser.open(auth_url)


async def handle_callback() -> tuple[str, str | None]:
    """OAuth コールバックを自動的に処理します"""
    global _callback_server
    
    if _callback_server is None:
        raise Exception("Callback server is not started")
    
    print("Waiting for OAuth authorization to complete...")
    
    try:
        # コールバックを待ちます。
        code, state = await _callback_server.wait_for_callback()
        print("OAuth authorization successful!")
        return code, state
    
    except Exception as e:
        print(f"OAuth authorization failed: {e}")
        raise
    
    finally:
        # サーバーの状態をクリーンアップしますが、再利用のためにサーバーは実行し続けます。
        _callback_server.auth_code = None
        _callback_server.auth_state = None
        _callback_server.auth_error = None
        _callback_server.auth_received = False

AI エージェントへの OpenAPI MCP サーバーの統合

OAuth ハンドラを作成したら、MCP サーバーをエージェントフレームワークと統合できます。このセクションでは、AgentScope と LangGraph の例を示します。

AgentScope

AgentScope は Alibaba のオープンソースのエージェントフレームワークです。 エージェントツール管理エージェントの長期記憶制御、インテリジェントな検索拡張生成 (RAG) などの特徴をサポートしています。

# -*- coding: utf-8 -*-
"""ReAct エージェントの例のメインエントリポイント。"""
import asyncio
import os

from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
    Toolkit,
    execute_shell_command,
    execute_python_code,
    view_text_file,
)
from agentscope.mcp import HttpStatelessClient

from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback


# openai ベース   
# .env から読み込み
load_dotenv()

server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"

memory_token_storage = InMemoryTokenStorage()

oauth_provider = OAuthClientProvider(
    server_url=server_url,
    client_metadata=OAuthClientMetadata(
        client_name="AgentScopeExampleClient",
        redirect_uris=[AnyUrl("http://localhost:3000/callback")],
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        scope=None,
    ),
    storage=memory_token_storage,
    redirect_handler=handle_redirect,
    callback_handler=handle_callback,
)

stateless_client = HttpStatelessClient(
    # MCP サーバーの名前。
    name="mcp_services_stateless",
    transport="streamable_http",
    url=server_url,
    auth=oauth_provider,
)

async def main() -> None:
    """ReAct エージェントの例のメインエントリポイント。"""
    toolkit = Toolkit()
    # toolkit.register_tool_function(execute_shell_command)
    # toolkit.register_tool_function(execute_python_code)
    # toolkit.register_tool_function(view_text_file)
    await toolkit.register_mcp_client(stateless_client)

    agent = ReActAgent(
        name="AlibabaCloudOpsAgent",
        sys_prompt="あなたは Alibaba Cloud O&M アシスタントです。ECS、RDS、VPC などのさまざまな Alibaba Cloud サービスを駆使して、私のリクエストに応えるのが得意です。",
        model=DashScopeChatModel(
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model_name="qwen3-max-preview",
            enable_thinking=False,
            stream=True,
        ),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
        memory=InMemoryMemory(),
    )
    user = UserAgent("User")

    msg = None
    while True:
        msg = await user(msg)
        if msg.get_text_content() == "exit":
            break
        msg = await agent(msg)


asyncio.run(main())

LangGraph

LangGraph は、ステートフルな AI エージェントワークフローを構築、デプロイ、管理するために設計されたオープンソースのフレームワークです。

import asyncio
import sys
from dotenv import load_dotenv
import os
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl 

from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect


# openai ベース   
# .env から読み込み
load_dotenv()

async def make_agent():
    model = init_chat_model(model=os.getenv("OPENAI_MODEL"), api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), model_provider='openai')

    # MCP サーバーの URL を指定します。
    server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"
    
    oauth_provider = OAuthClientProvider(
        server_url=server_url,
        client_metadata=OAuthClientMetadata(
            client_name="Example MCP Client",
            redirect_uris=[AnyUrl("http://localhost:3000/callback")],
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
            scope=None,
        ),
        storage=InMemoryTokenStorage(),
        redirect_handler=handle_redirect,
        callback_handler=handle_callback,
    )

    
    mcp_client = MultiServerMCPClient(
        {
            "resourcecenter": {
                "url": server_url,
                "transport": "streamable_http",
                "auth": oauth_provider
            }
        }
    )
    tools = await mcp_client.get_tools()
    
    agent = create_react_agent(
        model=model,  
        tools=tools,  
        prompt="You are a helpful assistant"  
    )

    return agent

async def chat_loop():
    """チャットループ"""
    # エージェントを作成します。
    print("Initializing AI assistant...")
    agent = await make_agent()
    print("AI assistant is ready! Enter 'quit' or 'exit' to leave.\n")
    
    # チャット履歴。
    messages = []
    
    while True:
        try:
            # ユーザー入力を取得します。
            user_input=input ("User: ").strip()
            
            # 終了コマンドを確認します。
            if user_input.lower() in ['quit', 'exit', 'leave']:
                print("Goodbye!")
                break
            
            # 空の入力をスキップします。
            if not user_input:
                continue
            
            # ユーザーメッセージを履歴に追加します。
            messages.append({"role": "user", "content": user_input})
            
            print("AI: ", end="", flush=True)
            
            # エージェントを呼び出します。
            response = await agent.ainvoke(
                {"messages": messages},
                {"recursion_limit": 50}
            )
            
            # AI 応答を抽出します。
            ai_response = response["messages"][-1].content
            print(ai_response)
            
            # AI 応答を履歴に追加します。
            messages.append({"role": "assistant", "content": ai_response})
            
            print() # 空白行で区切ります。
            
        except KeyboardInterrupt:
            print("\n\nGoodbye!")
            break
        except Exception as e:
            print(f"Error message: {e}")
            print("Please try again...\n")

async def main():
    await chat_loop()

# main 関数を実行します。
if __name__ == "__main__":
    asyncio.run(main())