6.2 MCP Server & Client Roles
Resource provision, capability advertisement, session negotiation
Architecture Overview
The Model Context Protocol defines two primary roles: MCP Clients (model-side) and MCP Servers (resource-side). This client-server architecture enables clean separation of concerns, scalability, and flexible deployment patterns while maintaining security and performance.
Core Principle
MCP follows a client-server model where clients (AI models/applications) discover and consume capabilities provided by servers (resource providers), with standardized communication protocols ensuring interoperability across different implementations.
Client vs Server Responsibilities
🤖 MCP Client
Model/Application Side
- Discovery: Find and connect to available MCP servers
- Session Management: Establish and maintain connections
- Authentication: Handle security credentials and tokens
- Request Orchestration: Coordinate multi-server operations
- Protocol Translation: Convert model requests to MCP format
- Error Handling: Manage failures and retries gracefully
🔌 MCP Server
Resource Provider Side
- Resource Exposure: Make data and tools available via MCP
- Capability Advertisement: Announce available features
- Request Processing: Execute operations securely
- Access Control: Manage permissions and authorization
- State Management: Handle session state and context
- Performance Optimization: Efficient resource utilization
MCP Client Implementation
Core Client Architecture
import asyncio
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
@dataclass
class MCPServer:
name: str
uri: str
capabilities: List[str]
status: str
version: str
class MCPClient:
def __init__(self):
self.servers: Dict[str, MCPServer] = {}
self.connections: Dict[str, Any] = {}
self.session_tokens: Dict[str, str] = {}
async def discover_servers(self, discovery_endpoints: List[str]) -> List[MCPServer]:
"""Discover available MCP servers from registry endpoints"""
discovered_servers = []
for endpoint in discovery_endpoints:
try:
response = await self._http_get(f"{endpoint}/mcp/servers")
servers_data = json.loads(response)
for server_info in servers_data["servers"]:
server = MCPServer(
name=server_info["name"],
uri=server_info["uri"],
capabilities=server_info["capabilities"],
status=server_info["status"],
version=server_info["version"]
)
discovered_servers.append(server)
self.servers[server.name] = server
except Exception as e:
print(f"Discovery failed for {endpoint}: {e}")
return discovered_servers
async def connect_to_server(self, server_name: str, auth_token: Optional[str] = None) -> bool:
"""Establish connection with authentication"""
if server_name not in self.servers:
raise ValueError(f"Server {server_name} not found")
server = self.servers[server_name]
try:
# Establish WebSocket connection
connection = await self._create_websocket_connection(server.uri)
# Perform handshake
handshake_msg = {
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "1.0",
"clientInfo": {
"name": "AI Assistant MCP Client",
"version": "1.0.0"
},
"capabilities": ["tools", "resources", "prompts"]
},
"id": 1
}
if auth_token:
handshake_msg["params"]["auth"] = {"token": auth_token}
await connection.send(json.dumps(handshake_msg))
response = await connection.recv()
# Process handshake response
handshake_result = json.loads(response)
if "error" in handshake_result:
raise Exception(f"Handshake failed: {handshake_result['error']}")
self.connections[server_name] = connection
return True
except Exception as e:
print(f"Failed to connect to {server_name}: {e}")
return False
Resource and Tool Discovery
async def list_resources(self,
server_name: str) -> List[Dict]:
"""Get available resources from a server"""
connection = self.connections.get(server_name)
if not connection:
raise ConnectionError(f"Not connected to {server_name}")
request = {
"jsonrpc": "2.0",
"method": "resources/list",
"id": self._get_next_id()
}
await connection.send(json.dumps(request))
response = await connection.recv()
result = json.loads(response)
return result.get("result", {}).get("resources", [])
async def list_tools(self, server_name: str) -> List[Dict]:
"""Get available tools from a server"""
connection = self.connections.get(server_name)
if not connection:
raise ConnectionError(f"Not connected to {server_name}")
request = {
"jsonrpc": "2.0",
"method": "tools/list",
"id": self._get_next_id()
}
await connection.send(json.dumps(request))
response = await connection.recv()
result = json.loads(response)
return result.get("result", {}).get("tools", [])
async def invoke_tool(self, server_name: str,
tool_name: str, arguments: Dict) -> Any:
"""Execute a tool on the remote server"""
connection = self.connections.get(server_name)
if not connection:
raise ConnectionError(f"Not connected to {server_name}")
request = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
},
"id": self._get_next_id()
}
await connection.send(json.dumps(request))
response = await connection.recv()
result = json.loads(response)
if "error" in result:
raise Exception(f"Tool execution failed: {result['error']}")
return result.get("result")
MCP Server Implementation
Core Server Architecture
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Callable
import websockets
import json
class MCPTool(ABC):
def __init__(self, name: str, description: str, schema: Dict):
self.name = name
self.description = description
self.schema = schema
@abstractmethod
async def execute(self, arguments: Dict) -> Any:
pass
class MCPResource:
def __init__(self, uri: str, name: str, description: str, mime_type: str):
self.uri = uri
self.name = name
self.description = description
self.mime_type = mime_type
class MCPServer:
def __init__(self, name: str, version: str):
self.name = name
self.version = version
self.tools: Dict[str, MCPTool] = {}
self.resources: Dict[str, MCPResource] = {}
self.clients: Dict[str, Any] = {}
self.capabilities = ["tools", "resources"]
def register_tool(self, tool: MCPTool):
"""Register a tool to be exposed via MCP"""
self.tools[tool.name] = tool
print(f"Registered tool: {tool.name}")
def register_resource(self, resource: MCPResource):
"""Register a resource to be exposed via MCP"""
self.resources[resource.uri] = resource
print(f"Registered resource: {resource.name}")
async def start_server(self, host: str = "localhost", port: int = 8080):
"""Start the MCP server"""
print(f"Starting MCP server {self.name} v{self.version} on {host}:{port}")
async def handle_client(websocket, path):
client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}"
self.clients[client_id] = websocket
try:
async for message in websocket:
await self._handle_message(client_id, message)
except websockets.exceptions.ConnectionClosed:
print(f"Client {client_id} disconnected")
finally:
del self.clients[client_id]
await websockets.serve(handle_client, host, port)
async def _handle_message(self, client_id: str, message: str):
"""Process incoming MCP messages"""
try:
request = json.loads(message)
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
# Route to appropriate handler
if method == "initialize":
response = await self._handle_initialize(params)
elif method == "tools/list":
response = await self._handle_list_tools()
elif method == "tools/call":
response = await self._handle_tool_call(params)
elif method == "resources/list":
response = await self._handle_list_resources()
elif method == "resources/read":
response = await self._handle_read_resource(params)
else:
response = {
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
# Send response
response_msg = {
"jsonrpc": "2.0",
"id": request_id
}
response_msg.update(response)
websocket = self.clients[client_id]
await websocket.send(json.dumps(response_msg))
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
},
"id": request.get("id") if 'request' in locals() else None
}
websocket = self.clients[client_id]
await websocket.send(json.dumps(error_response))
Message Handlers
async def _handle_initialize(self, params: Dict) -> Dict:
"""Handle client initialization"""
client_info = params.get("clientInfo", {})
protocol_version = params.get("protocolVersion")
if protocol_version != "1.0":
return {
"error": {
"code": -32602,
"message": f"Unsupported protocol version: {protocol_version}"
}
}
return {
"result": {
"protocolVersion": "1.0",
"serverInfo": {
"name": self.name,
"version": self.version
},
"capabilities": self.capabilities
}
}
async def _handle_list_tools(self) -> Dict:
"""Return list of available tools"""
tools_list = [
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.schema
}
for tool in self.tools.values()
]
return {"result": {"tools": tools_list}}
async def _handle_tool_call(self, params: Dict) -> Dict:
"""Execute a tool call"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name not in self.tools:
return {
"error": {
"code": -32602,
"message": f"Tool not found: {tool_name}"
}
}
try:
tool = self.tools[tool_name]
result = await tool.execute(arguments)
return {
"result": {
"content": [
{
"type": "text",
"text": str(result)
}
]
}
}
except Exception as e:
return {
"error": {
"code": -32603,
"message": f"Tool execution failed: {str(e)}"
}
}
Session Management & Negotiation
Connection Lifecycle
Discovery
→
Connection
→
Handshake
→
Authentication
→
Operation
→
Cleanup
Capability Negotiation
class CapabilityNegotiator:
def __init__(self):
self.supported_capabilities = {
"resources": {"version": "1.0", "features": ["subscribe", "list", "read"]},
"tools": {"version": "1.0", "features": ["call", "list"]},
"prompts": {"version": "1.0", "features": ["get", "list"]},
"logging": {"version": "1.0", "features": ["setLevel"]}
}
def negotiate(self, client_capabilities: List[str], server_capabilities: List[str]) -> Dict:
"""Determine mutually supported capabilities"""
mutual_capabilities = []
for capability in client_capabilities:
if capability in server_capabilities:
if capability in self.supported_capabilities:
mutual_capabilities.append({
"name": capability,
**self.supported_capabilities[capability]
})
return {
"negotiated": mutual_capabilities,
"session_id": self._generate_session_id()
}
def _generate_session_id(self) -> str:
import uuid
return str(uuid.uuid4())
Key Capabilities by Role
Client Discovery
- Server registry integration
- Dynamic capability detection
- Health monitoring
- Load balancing
Server Security
- Authentication & authorization
- Rate limiting
- Input validation
- Audit logging
Performance
- Connection pooling
- Request batching
- Caching strategies
- Streaming responses
Reliability
- Automatic reconnection
- Circuit breakers
- Graceful degradation
- Transaction support
Implementation Best Practices
Client Best Practices
- Connection Management: Use connection pools and implement proper cleanup
- Error Handling: Implement exponential backoff and circuit breakers
- Discovery: Cache server information and refresh periodically
- Authentication: Securely store and rotate credentials
- Monitoring: Track connection health and performance metrics
Server Best Practices
- Resource Management: Implement proper resource limits and cleanup
- Security: Validate all inputs and implement proper authorization
- Performance: Use async operations and implement caching
- Monitoring: Provide health endpoints and detailed logging
- Documentation: Clearly document all capabilities and schemas