Topik ini menjelaskan cara menghubungkan agen ke server OpenAPI MCP menggunakan alur otorisasi OAuth 2.0, serta menyediakan contoh kode untuk menangani callback OAuth dan mengintegrasikannya dengan framework agen populer.
Kustomisasi alur otorisasi OAuth
Kode contoh berikut, yang didasarkan pada kode sumber MCP resmi, mengimplementasikan alur otorisasi OAuth. Anda dapat menyesuaikan pengalaman pengguna dengan memodifikasi kode dalam fungsi handle_redirect dan handle_callback.
Kelas InMemoryTokenStorage hanya disediakan untuk tujuan demonstrasi. Di lingkungan produksi, Anda harus menggunakan mekanisme penyimpanan persisten yang aman untuk token OAuth guna mencegah akses tidak sah.
# oauth_handler.py
import asyncio
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
from urllib.parse import parse_qs, urlparse
class InMemoryTokenStorage(TokenStorage):
"""Implementasi penyimpanan token demo di memori."""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_info: OAuthClientInformationFull | None = None
async def get_tokens(self) -> OAuthToken | None:
"""Mendapatkan token yang tersimpan."""
return self.tokens
async def set_tokens(self, tokens: OAuthToken) -> None:
"""Menyimpan token."""
self.tokens = tokens
async def get_client_info(self) -> OAuthClientInformationFull | None:
"""Mendapatkan informasi klien yang tersimpan."""
return self.client_info
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""Menyimpan informasi klien."""
self.client_info = client_info
class CallbackHandler(BaseHTTPRequestHandler):
"""Penangan HTTP untuk callback OAuth."""
def __init__(self, callback_server, *args, **kwargs):
self.callback_server = callback_server
super().__init__(*args, **kwargs)
def do_GET(self):
"""Menangani permintaan GET untuk callback OAuth."""
try:
# Mengurai parameter dari URL callback.
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
if 'code' in params:
# Mendapatkan kode otorisasi.
code = params['code'][0]
state = params.get('state', [None])[0]
# Menyimpan hasilnya.
self.callback_server.auth_code = code
self.callback_server.auth_state = state
self.callback_server.auth_received = True
# Mengembalikan halaman sukses.
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
success_html = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Otorisasi berhasil</title>
</head>
<body>
<h1>Otorisasi berhasil.</h1>
<p>Anda sekarang dapat kembali ke aplikasi Anda. </p>
<p>Jendela ini akan otomatis ditutup dalam detik. </p>
<button onclick="window.close()">Close</button>
<script>
let count = 3;
const el = document.getElementById('countdown');
const timer = setInterval(() => {
count--;
el.textContent = count;
if (count <= 0) {
clearInterval(timer);
window.close();
}
}, 1000);
</script>
</body>
</html>
"""
self.wfile.write(success_html.encode('utf-8'))
elif 'error' in params:
# Menangani error.
error = params['error'][0]
error_description = params.get('error_description', ['Unknown error'])[0]
self.callback_server.auth_error = f"{error}: {error_description}"
self.callback_server.auth_received = True
# Mengembalikan pesan error.
self.send_response(400)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
error_html = f"""<!DOCTYPE html>
<html lang=\"zh-CN\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>Otorisasi gagal</title>
</head>
<body>
<h1>Otorisasi gagal</h1>
<p>Terjadi kesalahan selama proses otorisasi. </p>
<p>{error}</p>
<p>{error_description}</p>
<button onclick="window.close()">Close</button>
</body>
</html>
"""
self.wfile.write(error_html.encode('utf-8'))
except Exception as e:
self.callback_server.auth_error = str(e)
self.callback_server.auth_received = True
self.send_response(500)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
internal_error_html = f"""<!DOCTYPE html>
<html lang=\"zh-CN\">
<head>
<meta charset=\"UTF-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">
<title>Error server</title>
</head>
<body>
<h1>Error server internal</h1>
<p>Server mengalami error internal dan tidak dapat menyelesaikan permintaan Anda. </p>
<pre>{str(e)}</pre>
<button onclick="window.close()">Close</button>
</body>
</html>
"""
self.wfile.write(internal_error_html.encode('utf-8'))
def log_message(self, format, *args):
"""Menonaktifkan output log"""
pass
class CallbackServer:
"""Server callback OAuth"""
def __init__(self, port=3000):
self.port = port
self.server = None
self.thread = None
self.auth_code = None
self.auth_state = None
self.auth_error = None
self.auth_received = False
def start(self):
"""Memulai server callback"""
handler = lambda *args, **kwargs: CallbackHandler(self, *args, **kwargs)
self.server = HTTPServer(('localhost', self.port), handler)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
print(f"Server callback OAuth dimulai, mendengarkan pada port {self.port}")
def stop(self):
"""Menghentikan server callback"""
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join(timeout=1)
print("Server callback OAuth dihentikan")
async def wait_for_callback(self, timeout=300):
"""Menunggu callback OAuth"""
start_time = asyncio.get_event_loop().time()
while not self.auth_received:
if asyncio.get_event_loop().time() - start_time > timeout:
raise TimeoutError("Waktu tunggu callback OAuth habis")
await asyncio.sleep(0.1)
if self.auth_error:
raise Exception(f"Otorisasi OAuth gagal: {self.auth_error}")
return self.auth_code, self.auth_state
# Instans global server callback.
_callback_server = None
async def handle_redirect(auth_url: str) -> None:
"""Secara otomatis membuka browser untuk otorisasi OAuth"""
global _callback_server
# Memulai server callback.
if _callback_server is None:
_callback_server = CallbackServer(port=3000)
_callback_server.start()
print(f"Membuka browser untuk otorisasi OAuth...")
print(f"URL otorisasi: {auth_url}")
# Secara otomatis membuka browser.
webbrowser.open(auth_url)
async def handle_callback() -> tuple[str, str | None]:
"""Secara otomatis menangani callback OAuth"""
global _callback_server
if _callback_server is None:
raise Exception("Server callback belum dimulai")
print("Menunggu otorisasi OAuth selesai...")
try:
# Menunggu callback.
code, state = await _callback_server.wait_for_callback()
print("Otorisasi OAuth berhasil!")
return code, state
except Exception as e:
print(f"Otorisasi OAuth gagal: {e}")
raise
finally:
# Membersihkan status server, tetapi tetap menjalankan server untuk digunakan kembali.
_callback_server.auth_code = None
_callback_server.auth_state = None
_callback_server.auth_error = None
_callback_server.auth_received = FalseIntegrasikan server OpenAPI MCP ke dalam agen AI
Setelah memiliki penangan OAuth, Anda dapat mengintegrasikan server MCP dengan framework agen. Bagian ini menyediakan contoh integrasi untuk AgentScope dan LangGraph.
AgentScope
AgentScope adalah framework agen open-source dari Alibaba yang mendukung fitur seperti manajemen tool agen, kontrol memori jangka panjang agen, dan Generasi yang Diperkaya dengan Pengambilan Data (RAG) cerdas.
# -*- coding: utf-8 -*-
"""Titik masuk utama contoh agen ReAct."""
import asyncio
import os
from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
Toolkit,
execute_shell_command,
execute_python_code,
view_text_file,
)
from agentscope.mcp import HttpStatelessClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback
# openai base
# baca dari .env
load_dotenv()
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"
memory_token_storage = InMemoryTokenStorage()
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="AgentScopeExampleClient",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=memory_token_storage,
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
stateless_client = HttpStatelessClient(
# Nama server MCP.
name="mcp_services_stateless",
transport="streamable_http",
url=server_url,
auth=oauth_provider,
)
async def main() -> None:
"""Titik masuk utama untuk contoh agen ReAct."""
toolkit = Toolkit()
# toolkit.register_tool_function(execute_shell_command)
# toolkit.register_tool_function(execute_python_code)
# toolkit.register_tool_function(view_text_file)
await toolkit.register_mcp_client(stateless_client)
agent = ReActAgent(
name="AlibabaCloudOpsAgent",
sys_prompt="Anda adalah asisten O&M Alibaba Cloud. Anda ahli dalam menggunakan berbagai layanan Alibaba Cloud, seperti ECS, RDS, dan VPC, untuk memenuhi permintaan saya.",
model=DashScopeChatModel(
api_key=os.environ.get("DASHSCOPE_API_KEY"),
model_name="qwen3-max-preview",
enable_thinking=False,
stream=True,
),
formatter=DashScopeChatFormatter(),
toolkit=toolkit,
memory=InMemoryMemory(),
)
user = UserAgent("User")
msg = None
while True:
msg = await user(msg)
if msg.get_text_content() == "exit":
break
msg = await agent(msg)
asyncio.run(main())LangGraph
LangGraph adalah framework open-source yang dirancang untuk membangun, menerapkan, dan mengelola alur kerja agen AI stateful.
import asyncio
import sys
from dotenv import load_dotenv
import os
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect
# openai base
# baca dari .env
load_dotenv()
async def make_agent():
model = init_chat_model(model=os.getenv("OPENAI_MODEL"), api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), model_provider='openai')
# Tentukan URL server MCP.
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"
oauth_provider = OAuthClientProvider(
server_url=server_url,
client_metadata=OAuthClientMetadata(
client_name="Example MCP Client",
redirect_uris=[AnyUrl("http://localhost:3000/callback")],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=None,
),
storage=InMemoryTokenStorage(),
redirect_handler=handle_redirect,
callback_handler=handle_callback,
)
mcp_client = MultiServerMCPClient(
{
"resourcecenter": {
"url": server_url,
"transport": "streamable_http",
"auth": oauth_provider
}
}
)
tools = await mcp_client.get_tools()
agent = create_react_agent(
model=model,
tools=tools,
prompt="You are a helpful assistant"
)
return agent
async def chat_loop():
"""Loop obrolan"""
# Membuat agen.
print("Menginisialisasi asisten AI...")
agent = await make_agent()
print("Asisten AI siap! Masukkan 'quit' atau 'exit' untuk keluar.\n")
# Riwayat obrolan.
messages = []
while True:
try:
# Mendapatkan input pengguna.
user_input=input ("User: ").strip()
# Memeriksa perintah keluar.
if user_input.lower() in ['quit', 'exit', 'leave']:
print("Sampai jumpa!")
break
# Melewatkan input kosong.
if not user_input:
continue
# Menambahkan pesan pengguna ke riwayat.
messages.append({"role": "user", "content": user_input})
print("AI: ", end="", flush=True)
# Memanggil agen.
response = await agent.ainvoke(
{"messages": messages},
{"recursion_limit": 50}
)
# Mengambil respons AI.
ai_response = response["messages"][-1].content
print(ai_response)
# Menambahkan respons AI ke riwayat.
messages.append({"role": "assistant", "content": ai_response})
print() # Memisahkan dengan baris kosong.
except KeyboardInterrupt:
print("\n\nSampai jumpa!")
break
except Exception as e:
print(f"Pesan error: {e}")
print("Silakan coba lagi...\n")
async def main():
await chat_loop()
# Menjalankan fungsi utama.
if __name__ == "__main__":
asyncio.run(main())