6.3 Integrating MCP with Agents

Routing requests, context expansion, secure tool boundaries

Integration Architecture

Integrating the Model Context Protocol with AI agents creates a powerful architecture where agents can dynamically discover, access, and utilize external resources and tools. This integration enables agents to extend their capabilities beyond their core training while maintaining security and control.

Key Benefits

MCP integration allows agents to access real-time data, execute external tools, enrich their working context with external information, and maintain secure boundaries between agent reasoning and external resource access.

Agent-MCP Architecture

AI Agent Layer: Reasoning • Planning • Decision Making
        ↓
MCP Integration Layer: Request Routing • Context Expansion • Security
        ↓
External Resources (via MCP servers): File System • Databases • APIs • Tools

Agent-MCP Bridge Implementation

Core Integration Bridge

import asyncio
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class SecurityError(Exception):
    """Raised when a request is rejected by security validation."""


class RequestType(Enum):
    """Kinds of request an agent can send through the bridge."""

    RESOURCE_READ = "resource_read"
    TOOL_CALL = "tool_call"
    CONTEXT_EXPANSION = "context_expansion"
    CAPABILITY_DISCOVERY = "capability_discovery"


@dataclass
class AgentRequest:
    """A single request issued by an agent and processed by the bridge."""

    request_id: str
    agent_id: str
    request_type: RequestType
    target_server: Optional[str]  # explicit server name, or None to let the router pick
    parameters: Dict[str, Any]
    security_context: Dict[str, Any]
    priority: int = 1


class AgentMCPBridge:
    """Bridge between one agent and the MCP servers it is connected to.

    Every request is validated by the security manager, routed to a server,
    executed according to its type, and post-processed by the context manager.

    NOTE: ``_should_connect_to_server``, ``_read_resource`` and
    ``_expand_context`` are called below but are not defined in this
    excerpt; they must be provided for the corresponding paths to work.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.mcp_client = None  # created in initialize()
        self.request_router = RequestRouter()
        self.context_manager = ContextManager()
        self.security_manager = SecurityManager()
        self.active_sessions: Dict[str, Any] = {}

    async def initialize(self, mcp_config: Dict):
        """Initialize MCP connections and discover capabilities.

        Args:
            mcp_config: Configuration mapping; only the optional
                ``"discovery_endpoints"`` list is read here.
        """
        from mcp_client import MCPClient

        self.mcp_client = MCPClient()

        # Discover and connect to MCP servers
        discovery_endpoints = mcp_config.get("discovery_endpoints", [])
        servers = await self.mcp_client.discover_servers(discovery_endpoints)

        # Establish connections based on agent needs
        for server in servers:
            if not self._should_connect_to_server(server):
                continue

            auth_token = await self.security_manager.get_auth_token(server.name)
            connected = await self.mcp_client.connect_to_server(
                server.name, auth_token
            )
            if not connected:
                continue

            # Register what the server offers so the router can target it
            tools = await self.mcp_client.list_tools(server.name)
            resources = await self.mcp_client.list_resources(server.name)
            self.request_router.register_server_capabilities(
                server.name, tools, resources
            )

    async def process_agent_request(self, request: AgentRequest) -> Any:
        """Process an agent request through MCP.

        Returns:
            The context manager's processed result on success. On any
            failure (security rejection, routing error, unsupported type,
            execution error) a dict with ``"error"``, ``"request_id"`` and
            ``"timestamp"`` keys is returned instead of raising.
        """
        try:
            # Security validation
            if not await self.security_manager.validate_request(request):
                raise SecurityError("Request failed security validation")

            # Route request to appropriate server
            target_server = await self.request_router.route_request(request)

            # Execute request based on type. Handlers are looked up lazily,
            # per branch, so a missing handler only affects its own type.
            if request.request_type == RequestType.TOOL_CALL:
                result = await self._execute_tool_call(target_server, request)
            elif request.request_type == RequestType.RESOURCE_READ:
                result = await self._read_resource(target_server, request)
            elif request.request_type == RequestType.CONTEXT_EXPANSION:
                result = await self._expand_context(target_server, request)
            else:
                raise ValueError(f"Unsupported request type: {request.request_type}")

            # Post-process and contextualize result
            return await self.context_manager.process_result(result, request)

        except Exception as e:
            # Boundary handler: the agent receives an error payload rather
            # than an exception. The timestamp is the event loop's clock.
            return {
                "error": str(e),
                "request_id": request.request_id,
                "timestamp": asyncio.get_running_loop().time(),
            }

    async def _execute_tool_call(self, server_name: str, request: AgentRequest) -> Any:
        """Execute a tool call through MCP.

        Reads ``"tool_name"`` and ``"arguments"`` from ``request.parameters``
        and forwards them, plus an ``"_agent_context"`` entry, to the client.
        """
        tool_name = request.parameters.get("tool_name")

        # Copy before decorating: writing into the caller's dict would leak
        # "_agent_context" into the request and into any retry that reuses it.
        tool_args = dict(request.parameters.get("arguments") or {})
        tool_args["_agent_context"] = {
            "agent_id": request.agent_id,
            "request_id": request.request_id,
            "security_level": request.security_context.get("level", "standard"),
        }

        return await self.mcp_client.invoke_tool(server_name, tool_name, tool_args)

Request Router Implementation

class RequestRouter:
    """Chooses which registered MCP server should handle a request."""

    def __init__(self):
        self.server_capabilities: Dict[str, Dict] = {}
        self.routing_rules: List[Dict] = []
        self.load_balancer = LoadBalancer()

    def register_server_capabilities(self, server_name: str, tools: List, resources: List):
        """Register what each server can provide.

        Tools are indexed by their ``"name"`` entry and resources by their
        ``"uri"`` entry; a previous registration for the same server is
        replaced.
        """
        self.server_capabilities[server_name] = {
            "tools": {tool["name"]: tool for tool in tools},
            "resources": {res["uri"]: res for res in resources},
            "last_updated": asyncio.get_event_loop().time(),
        }

    async def route_request(self, request: AgentRequest) -> str:
        """Determine the best server to handle the request.

        Returns:
            The name of the selected server.

        Raises:
            ValueError: If the explicitly targeted server cannot handle the
                request, or if no registered server can.
        """
        # If target server specified, validate and use it
        if request.target_server:
            if self._can_handle_request(request.target_server, request):
                return request.target_server
            raise ValueError(f"Server {request.target_server} cannot handle request")

        # Find servers that can handle the request, with their scores
        candidates = [
            (server_name, self._score_server(server_name, request))
            for server_name in self.server_capabilities
            if self._can_handle_request(server_name, request)
        ]
        if not candidates:
            raise ValueError("No server available to handle request")

        # Only servers sharing the top score are equally good choices.
        # Lower-scored servers must never be handed to the load balancer,
        # otherwise it may pick a worse server than the best one available.
        top_score = max(score for _, score in candidates)
        tied = [name for name, score in candidates if score == top_score]
        if len(tied) > 1:
            return self.load_balancer.select_server(tied)
        return tied[0]

    def _can_handle_request(self, server_name: str, request: AgentRequest) -> bool:
        """Check if server has required capabilities.

        NOTE(review): only TOOL_CALL and RESOURCE_READ are recognised; any
        other request type (including CONTEXT_EXPANSION) is reported as not
        handleable, so such requests can never be routed. Confirm intended.
        """
        capabilities = self.server_capabilities.get(server_name, {})

        if request.request_type == RequestType.TOOL_CALL:
            tool_name = request.parameters.get("tool_name")
            return tool_name in capabilities.get("tools", {})

        if request.request_type == RequestType.RESOURCE_READ:
            resource_uri = request.parameters.get("resource_uri")
            return resource_uri in capabilities.get("resources", {})

        return False

    def _score_server(self, server_name: str, request: AgentRequest) -> float:
        """Score server based on various factors (higher is better).

        Currently only the load reported by the load balancer is used.
        """
        score = 1.0

        # Factor in server load
        load = self.load_balancer.get_server_load(server_name)
        score *= (1.0 - load)

        # Factor in geographical proximity
        # Factor in historical performance
        # Factor in security requirements

        return score

Context Expansion & Management

Dynamic Context Expansion

class ContextManager:
    """Expands agent context with MCP-provided data and post-processes results.

    NOTE: the ``_fetch_*``, ``_execute_database_query``,
    ``_estimate_token_count``, ``_prioritize_and_truncate_context`` and
    ``_format_*_result`` helpers are called below but are not defined in
    this excerpt.
    """

    def __init__(self):
        self.context_cache: Dict[str, Any] = {}
        self.expansion_strategies: Dict[str, Callable] = {}
        self.max_context_size = 10000  # tokens

    async def expand_context_for_agent(self, agent_request: str,
                                       mcp_servers: List[str]) -> str:
        """Dynamically expand agent context using MCP resources.

        Args:
            agent_request: The agent's request text.
            mcp_servers: Server names passed through to the fetch helpers.

        Returns:
            The request text, followed by an "Additional Context" section
            when extra context was found and fits in ``max_context_size``;
            otherwise whatever the truncation helper returns.
        """
        expanded_context = agent_request
        context_additions = []

        # Analyze request to identify context expansion opportunities
        expansion_needs = self._analyze_context_needs(agent_request)

        for need in expansion_needs:
            if need["type"] == "file_reference":
                content = await self._fetch_file_content(need["path"], mcp_servers)
                if content:
                    context_additions.append(f"File {need['path']}:\n{content}")

            elif need["type"] == "database_query":
                results = await self._execute_database_query(need["query"], mcp_servers)
                if results:
                    context_additions.append(f"Query results:\n{results}")

            elif need["type"] == "api_data":
                data = await self._fetch_api_data(need["endpoint"], mcp_servers)
                if data:
                    context_additions.append(f"API data:\n{data}")

        # Merge context additions efficiently
        if context_additions:
            additional_context = (
                "\n\n--- Additional Context ---\n" + "\n\n".join(context_additions)
            )

            # Check context size limits
            if self._estimate_token_count(expanded_context + additional_context) <= self.max_context_size:
                expanded_context += additional_context
            else:
                # Prioritize and truncate context
                expanded_context = self._prioritize_and_truncate_context(
                    expanded_context, context_additions
                )

        return expanded_context

    def _analyze_context_needs(self, request: str) -> List[Dict]:
        """Analyze request to identify what context might be helpful.

        Returns:
            A list of need dicts, in this order: one ``file_reference`` per
            ``file:``/``path:``/``document:`` match, then at most one
            ``database_query`` and at most one ``api_data`` entry.
        """
        import re

        needs = []

        # Look for file path patterns (case-sensitive, as written)
        file_patterns = re.findall(r'(?:file|path|document):\s*([^\s]+)', request)
        for path in file_patterns:
            needs.append({"type": "file_reference", "path": path})

        # Keyword checks below are case-insensitive; lowercase once.
        lowered = request.lower()

        # Look for database/query patterns
        # NOTE(review): the whole request text is passed on as the "query".
        if re.search(r'\b(?:database|query|table|sql)\b', lowered):
            needs.append({"type": "database_query", "query": request})

        # Look for API/service patterns
        # NOTE(review): the whole request text is passed on as the "endpoint".
        if re.search(r'\b(?:api|service|endpoint|http)\b', lowered):
            needs.append({"type": "api_data", "endpoint": request})

        return needs

    async def process_result(self, result: Any, request: AgentRequest) -> Dict:
        """Post-process MCP results for agent consumption.

        Returns:
            A dict holding the raw result under ``"original_result"``, the
            request id, an event-loop timestamp and metadata. A
            ``"formatted_result"`` entry is added for tool calls and
            resource reads only. The dict is also stored in
            ``context_cache`` under ``"<agent_id>:<request_id>"``.
        """
        processed = {
            "original_result": result,
            "request_id": request.request_id,
            "timestamp": asyncio.get_running_loop().time(),
            "metadata": {},
        }

        # Add context metadata
        processed["metadata"]["source_type"] = request.request_type.value
        processed["metadata"]["agent_id"] = request.agent_id

        # Format result based on type
        if request.request_type == RequestType.TOOL_CALL:
            processed["formatted_result"] = self._format_tool_result(result)
        elif request.request_type == RequestType.RESOURCE_READ:
            processed["formatted_result"] = self._format_resource_result(result)

        # Cache result for future reference
        cache_key = f"{request.agent_id}:{request.request_id}"
        self.context_cache[cache_key] = processed

        return processed

Security & Tool Boundaries

🔐 Authentication & Authorization
  • Agent identity verification
  • Resource access control
  • Role-based permissions
  • Token management
🛡️ Input Validation
  • Request sanitization
  • Parameter validation
  • Schema enforcement
  • Injection prevention
⚠️ Boundary Enforcement
  • Resource sandboxing
  • Execution limits
  • Network restrictions
  • File system boundaries
📊 Monitoring & Auditing
  • Request logging
  • Access tracking
  • Anomaly detection
  • Compliance reporting

Security Manager Implementation

class SecurityManager:
    """Validates agent requests and supplies server auth tokens.

    NOTE: ``_validate_agent_identity``, ``_check_authorization`` and
    ``_validate_input_parameters`` are called below but are not defined in
    this excerpt.
    """

    def __init__(self):
        self.access_policies: Dict[str, Dict] = {}
        self.agent_credentials: Dict[str, Dict] = {}
        self.audit_logger = AuditLogger()
        self.rate_limiter = RateLimiter()

    async def validate_request(self, request: AgentRequest) -> bool:
        """Comprehensive request validation.

        Checks run in order: rate limit, agent identity, authorization,
        input parameters, resource boundaries. The first failing check is
        written to the audit log and stops validation.

        Returns:
            True only if every check passes. Any exception raised by a
            check is logged as ``"validation_error"`` and yields False.
        """
        try:
            # Rate limiting check
            if not await self.rate_limiter.check_rate_limit(request.agent_id):
                return await self._deny("rate_limit_exceeded", request)

            # Agent authentication
            if not self._validate_agent_identity(request):
                return await self._deny("authentication_failed", request)

            # Authorization check
            if not self._check_authorization(request):
                return await self._deny("authorization_failed", request)

            # Input validation
            if not self._validate_input_parameters(request):
                return await self._deny("input_validation_failed", request)

            # Resource boundary check
            if not self._check_resource_boundaries(request):
                return await self._deny("boundary_violation", request)

            # Log successful validation
            await self.audit_logger.log_access_request(request)
            return True

        except Exception as e:
            # Fail closed: an error while validating is treated as a denial.
            await self.audit_logger.log_security_event(
                "validation_error", request, error=str(e)
            )
            return False

    async def _deny(self, event: str, request: AgentRequest) -> bool:
        """Record a failed check in the audit log and return False."""
        await self.audit_logger.log_security_event(event, request)
        return False

    def _check_resource_boundaries(self, request: AgentRequest) -> bool:
        """Enforce resource access boundaries.

        Resource reads must match an ``"allowed_resources"`` prefix and tool
        calls must name an entry of ``"allowed_tools"`` in the agent's
        policy. Other request types are not restricted here.
        """
        agent_policy = self.access_policies.get(request.agent_id, {})

        if request.request_type == RequestType.RESOURCE_READ:
            return self._is_resource_allowed(
                request.parameters.get("resource_uri"),
                agent_policy.get("allowed_resources", []),
            )

        if request.request_type == RequestType.TOOL_CALL:
            tool_name = request.parameters.get("tool_name")
            return tool_name in agent_policy.get("allowed_tools", [])

        return True

    @staticmethod
    def _is_resource_allowed(resource_uri, allowed_resources) -> bool:
        """Return True if ``resource_uri`` falls under an allowed prefix."""
        # A missing or non-string URI can never be inside a boundary.
        if not isinstance(resource_uri, str):
            return False

        # A literal ".." segment could step outside an allowed prefix
        # ("allowed/../secret" still starts with "allowed/"), so reject it.
        # NOTE(review): percent-encoded segments are not decoded here.
        if ".." in resource_uri.replace("\\", "/").split("/"):
            return False

        # startswith accepts a tuple of prefixes; an empty tuple is False.
        return resource_uri.startswith(tuple(allowed_resources))

    async def get_auth_token(self, server_name: str) -> Optional[str]:
        """Get authentication token for MCP server.

        Returns:
            The ``"token"`` stored for ``server_name`` in
            ``agent_credentials``, or None when there is none.
        """
        # Implementation depends on your auth system
        return self.agent_credentials.get(server_name, {}).get("token")

Common Integration Patterns

🔄 Reactive Integration

Event-Driven Pattern

  • Agents respond to external events via MCP
  • Real-time data streams
  • Webhook integrations
  • Asynchronous processing

🎯 Proactive Integration

Agent-Initiated Pattern

  • Agents actively seek information
  • Scheduled data fetching
  • Context pre-loading
  • Predictive resource access

Example: File Analysis Agent

class FileAnalysisAgent:
    """Example agent that analyzes a directory through MCP tool calls.

    NOTE: ``_generate_request_id`` and ``_generate_summary`` are called
    below but are not defined in this excerpt.
    """

    AGENT_ID = "file-analysis-agent"

    def __init__(self):
        self.mcp_bridge = AgentMCPBridge(self.AGENT_ID)
        self.analysis_tools = ["file_scanner", "content_analyzer", "security_checker"]

    async def analyze_directory(self, directory_path: str) -> Dict:
        """Analyze all files in a directory using MCP tools.

        Returns:
            A dict with ``"files"`` (one analysis per listed file),
            ``"summary"`` and ``"security_issues"``. If the directory
            listing itself failed, an ``"error"`` entry carries the message
            and ``"files"`` stays empty.
        """
        results = {"files": [], "summary": {}, "security_issues": []}

        # Step 1: List files using MCP resource
        listing = await self._call_tool(
            "filesystem-server",
            "list_directory",
            {"path": directory_path, "recursive": True},
        )
        if isinstance(listing, dict) and "error" in listing:
            # Surface the failure instead of reporting an empty directory.
            results["error"] = listing["error"]

        # Step 2: Analyze each file
        for file_info in self._payload(listing).get("files", []):
            file_analysis = await self._analyze_single_file(file_info["path"])
            results["files"].append(file_analysis)

            # Aggregate security issues
            results["security_issues"].extend(
                file_analysis.get("security_issues", [])
            )

        # Step 3: Generate summary
        results["summary"] = self._generate_summary(results["files"])

        return results

    async def _analyze_single_file(self, file_path: str) -> Dict:
        """Analyze a single file using multiple MCP tools.

        Returns:
            A dict with the file path, the bridge responses for the content
            analysis and the security scan, the issues extracted from the
            scan, and an event-loop timestamp.
        """
        # Content analysis
        content_analysis = await self._call_tool(
            "analysis-server", "analyze_content", {"file_path": file_path}
        )

        # Security scan
        security_analysis = await self._call_tool(
            "security-server",
            "security_scan",
            {"file_path": file_path},
            security_level="high",
        )

        return {
            "path": file_path,
            "analysis": {
                "content": content_analysis,
                "security": security_analysis,
            },
            "security_issues": self._payload(security_analysis).get("issues", []),
            "timestamp": asyncio.get_running_loop().time(),
        }

    async def _call_tool(self, server: str, tool_name: str, arguments: Dict,
                         security_level: str = "standard") -> Any:
        """Send one TOOL_CALL request through the bridge and return its response."""
        request = AgentRequest(
            request_id=self._generate_request_id(),
            agent_id=self.AGENT_ID,
            request_type=RequestType.TOOL_CALL,
            target_server=server,
            parameters={"tool_name": tool_name, "arguments": arguments},
            security_context={"level": security_level},
        )
        return await self.mcp_bridge.process_agent_request(request)

    @staticmethod
    def _payload(response: Any) -> Dict:
        """Extract the tool's own output from a bridge response.

        The bridge wraps tool output under ``"original_result"`` and
        reports failures as a dict with an ``"error"`` key. Error
        responses and non-dict payloads yield an empty dict; a response
        without the wrapper is used as-is.
        """
        if not isinstance(response, dict) or "error" in response:
            return {}
        inner = response.get("original_result", response)
        return inner if isinstance(inner, dict) else {}

Integration Best Practices

Design Principles

  • Loose Coupling: Keep agent logic separate from MCP resource access
  • Security First: Always validate and sanitize external data
  • Graceful Degradation: Handle MCP server failures elegantly
  • Context Awareness: Expand context intelligently, not blindly
  • Performance Optimization: Cache results and batch requests when possible

Common Pitfalls to Avoid

  • Over-fetching: Don't retrieve more data than necessary
  • Security Bypass: Never skip security validation for "trusted" sources
  • Blocking Operations: Use async operations for all MCP calls
  • Error Propagation: Handle MCP errors without crashing the agent
  • Resource Leaks: Always clean up connections and resources