6.2 MCP Server & Client Roles

Resource provision, capability advertisement, session negotiation

Architecture Overview

The Model Context Protocol defines two primary roles: MCP Clients (model-side) and MCP Servers (resource-side). This client-server architecture enables clean separation of concerns, scalability, and flexible deployment patterns while maintaining security and performance.

Core Principle

MCP follows a client-server model where clients (AI models/applications) discover and consume capabilities provided by servers (resource providers), with standardized communication protocols ensuring interoperability across different implementations.

Client vs Server Responsibilities

🤖 MCP Client

Model/Application Side

  • Discovery: Find and connect to available MCP servers
  • Session Management: Establish and maintain connections
  • Authentication: Handle security credentials and tokens
  • Request Orchestration: Coordinate multi-server operations
  • Protocol Translation: Convert model requests to MCP format
  • Error Handling: Manage failures and retries gracefully

🔌 MCP Server

Resource Provider Side

  • Resource Exposure: Make data and tools available via MCP
  • Capability Advertisement: Announce available features
  • Request Processing: Execute operations securely
  • Access Control: Manage permissions and authorization
  • State Management: Handle session state and context
  • Performance Optimization: Efficient resource utilization

MCP Client Implementation

Core Client Architecture

import asyncio import json from typing import Dict, List, Optional, Any from dataclasses import dataclass @dataclass class MCPServer: name: str uri: str capabilities: List[str] status: str version: str class MCPClient: def __init__(self): self.servers: Dict[str, MCPServer] = {} self.connections: Dict[str, Any] = {} self.session_tokens: Dict[str, str] = {} async def discover_servers(self, discovery_endpoints: List[str]) -> List[MCPServer]: """Discover available MCP servers from registry endpoints""" discovered_servers = [] for endpoint in discovery_endpoints: try: response = await self._http_get(f"{endpoint}/mcp/servers") servers_data = json.loads(response) for server_info in servers_data["servers"]: server = MCPServer( name=server_info["name"], uri=server_info["uri"], capabilities=server_info["capabilities"], status=server_info["status"], version=server_info["version"] ) discovered_servers.append(server) self.servers[server.name] = server except Exception as e: print(f"Discovery failed for {endpoint}: {e}") return discovered_servers async def connect_to_server(self, server_name: str, auth_token: Optional[str] = None) -> bool: """Establish connection with authentication""" if server_name not in self.servers: raise ValueError(f"Server {server_name} not found") server = self.servers[server_name] try: # Establish WebSocket connection connection = await self._create_websocket_connection(server.uri) # Perform handshake handshake_msg = { "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "1.0", "clientInfo": { "name": "AI Assistant MCP Client", "version": "1.0.0" }, "capabilities": ["tools", "resources", "prompts"] }, "id": 1 } if auth_token: handshake_msg["params"]["auth"] = {"token": auth_token} await connection.send(json.dumps(handshake_msg)) response = await connection.recv() # Process handshake response handshake_result = json.loads(response) if "error" in handshake_result: raise Exception(f"Handshake failed: {handshake_result['error']}") self.connections[server_name] = connection return True except Exception as e: print(f"Failed to connect to {server_name}: {e}") return False

Resource and Tool Discovery

async def list_resources(self, server_name: str) -> List[Dict]: """Get available resources from a server""" connection = self.connections.get(server_name) if not connection: raise ConnectionError(f"Not connected to {server_name}") request = { "jsonrpc": "2.0", "method": "resources/list", "id": self._get_next_id() } await connection.send(json.dumps(request)) response = await connection.recv() result = json.loads(response) return result.get("result", {}).get("resources", []) async def list_tools(self, server_name: str) -> List[Dict]: """Get available tools from a server""" connection = self.connections.get(server_name) if not connection: raise ConnectionError(f"Not connected to {server_name}") request = { "jsonrpc": "2.0", "method": "tools/list", "id": self._get_next_id() } await connection.send(json.dumps(request)) response = await connection.recv() result = json.loads(response) return result.get("result", {}).get("tools", []) async def invoke_tool(self, server_name: str, tool_name: str, arguments: Dict) -> Any: """Execute a tool on the remote server""" connection = self.connections.get(server_name) if not connection: raise ConnectionError(f"Not connected to {server_name}") request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": tool_name, "arguments": arguments }, "id": self._get_next_id() } await connection.send(json.dumps(request)) response = await connection.recv() result = json.loads(response) if "error" in result: raise Exception(f"Tool execution failed: {result['error']}") return result.get("result")

MCP Server Implementation

Core Server Architecture

from abc import ABC, abstractmethod from typing import Dict, List, Any, Callable import websockets import json class MCPTool(ABC): def __init__(self, name: str, description: str, schema: Dict): self.name = name self.description = description self.schema = schema @abstractmethod async def execute(self, arguments: Dict) -> Any: pass class MCPResource: def __init__(self, uri: str, name: str, description: str, mime_type: str): self.uri = uri self.name = name self.description = description self.mime_type = mime_type class MCPServer: def __init__(self, name: str, version: str): self.name = name self.version = version self.tools: Dict[str, MCPTool] = {} self.resources: Dict[str, MCPResource] = {} self.clients: Dict[str, Any] = {} self.capabilities = ["tools", "resources"] def register_tool(self, tool: MCPTool): """Register a tool to be exposed via MCP""" self.tools[tool.name] = tool print(f"Registered tool: {tool.name}") def register_resource(self, resource: MCPResource): """Register a resource to be exposed via MCP""" self.resources[resource.uri] = resource print(f"Registered resource: {resource.name}") async def start_server(self, host: str = "localhost", port: int = 8080): """Start the MCP server""" print(f"Starting MCP server {self.name} v{self.version} on {host}:{port}") async def handle_client(websocket, path): client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}" self.clients[client_id] = websocket try: async for message in websocket: await self._handle_message(client_id, message) except websockets.exceptions.ConnectionClosed: print(f"Client {client_id} disconnected") finally: del self.clients[client_id] await websockets.serve(handle_client, host, port) async def _handle_message(self, client_id: str, message: str): """Process incoming MCP messages""" try: request = json.loads(message) method = request.get("method") params = request.get("params", {}) request_id = request.get("id") # Route to appropriate handler if method == "initialize": response = await self._handle_initialize(params) elif method == "tools/list": response = await self._handle_list_tools() elif method == "tools/call": response = await self._handle_tool_call(params) elif method == "resources/list": response = await self._handle_list_resources() elif method == "resources/read": response = await self._handle_read_resource(params) else: response = { "error": { "code": -32601, "message": f"Method not found: {method}" } } # Send response response_msg = { "jsonrpc": "2.0", "id": request_id } response_msg.update(response) websocket = self.clients[client_id] await websocket.send(json.dumps(response_msg)) except Exception as e: error_response = { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Internal error: {str(e)}" }, "id": request.get("id") if 'request' in locals() else None } websocket = self.clients[client_id] await websocket.send(json.dumps(error_response))

Message Handlers

async def _handle_initialize(self, params: Dict) -> Dict: """Handle client initialization""" client_info = params.get("clientInfo", {}) protocol_version = params.get("protocolVersion") if protocol_version != "1.0": return { "error": { "code": -32602, "message": f"Unsupported protocol version: {protocol_version}" } } return { "result": { "protocolVersion": "1.0", "serverInfo": { "name": self.name, "version": self.version }, "capabilities": self.capabilities } } async def _handle_list_tools(self) -> Dict: """Return list of available tools""" tools_list = [ { "name": tool.name, "description": tool.description, "inputSchema": tool.schema } for tool in self.tools.values() ] return {"result": {"tools": tools_list}} async def _handle_tool_call(self, params: Dict) -> Dict: """Execute a tool call""" tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name not in self.tools: return { "error": { "code": -32602, "message": f"Tool not found: {tool_name}" } } try: tool = self.tools[tool_name] result = await tool.execute(arguments) return { "result": { "content": [ { "type": "text", "text": str(result) } ] } } except Exception as e: return { "error": { "code": -32603, "message": f"Tool execution failed: {str(e)}" } }

Session Management & Negotiation

Connection Lifecycle

Discovery
Connection
Handshake
Authentication
Operation
Cleanup

Capability Negotiation

class CapabilityNegotiator: def __init__(self): self.supported_capabilities = { "resources": {"version": "1.0", "features": ["subscribe", "list", "read"]}, "tools": {"version": "1.0", "features": ["call", "list"]}, "prompts": {"version": "1.0", "features": ["get", "list"]}, "logging": {"version": "1.0", "features": ["setLevel"]} } def negotiate(self, client_capabilities: List[str], server_capabilities: List[str]) -> Dict: """Determine mutually supported capabilities""" mutual_capabilities = [] for capability in client_capabilities: if capability in server_capabilities: if capability in self.supported_capabilities: mutual_capabilities.append({ "name": capability, **self.supported_capabilities[capability] }) return { "negotiated": mutual_capabilities, "session_id": self._generate_session_id() } def _generate_session_id(self) -> str: import uuid return str(uuid.uuid4())

Key Capabilities by Role

🔍 Client Discovery
  • Server registry integration
  • Dynamic capability detection
  • Health monitoring
  • Load balancing
🔐 Server Security
  • Authentication & authorization
  • Rate limiting
  • Input validation
  • Audit logging
Performance
  • Connection pooling
  • Request batching
  • Caching strategies
  • Streaming responses
🔄 Reliability
  • Automatic reconnection
  • Circuit breakers
  • Graceful degradation
  • Transaction support

Implementation Best Practices

Client Best Practices

  • Connection Management: Use connection pools and implement proper cleanup
  • Error Handling: Implement exponential backoff and circuit breakers
  • Discovery: Cache server information and refresh periodically
  • Authentication: Securely store and rotate credentials
  • Monitoring: Track connection health and performance metrics

Server Best Practices

  • Resource Management: Implement proper resource limits and cleanup
  • Security: Validate all inputs and implement proper authorization
  • Performance: Use async operations and implement caching
  • Monitoring: Provide health endpoints and detailed logging
  • Documentation: Clearly document all capabilities and schemas