6.3 Integrating MCP with Agents
Routing requests, context expansion, secure tool boundaries
Integration Architecture
Integrating the Model Context Protocol with AI agents creates a powerful architecture where agents can dynamically discover, access, and utilize external resources and tools. This integration enables agents to extend their capabilities beyond their core training while maintaining security and control.
Key Benefits
MCP integration allows agents to access real-time data, execute external tools, expand their working context with external resources, and maintain secure boundaries between agent reasoning and external resource access.
Agent-MCP Architecture
AI Agent Layer
Reasoning • Planning • Decision Making
↓
MCP Integration Layer
Request Routing • Context Expansion • Security
↓
File System
Databases
APIs
Tools
Agent-MCP Bridge Implementation
Core Integration Bridge
import asyncio
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class RequestType(Enum):
    """Kinds of request an agent can send through the MCP bridge."""

    # Read a resource identified by ``parameters["resource_uri"]``.
    RESOURCE_READ = "resource_read"
    # Invoke the tool named by ``parameters["tool_name"]``.
    TOOL_CALL = "tool_call"
    # Ask the bridge to expand the agent's context.
    CONTEXT_EXPANSION = "context_expansion"
    # Declared for capability discovery; the bridge's dispatch in this file
    # has no branch for it and reports it as an unsupported request type.
    CAPABILITY_DISCOVERY = "capability_discovery"
@dataclass
class AgentRequest:
    """A single request issued by an agent and handled by the MCP bridge."""

    # Caller-chosen identifier; echoed back in results and error dicts.
    request_id: str
    # Identity of the requesting agent; used for rate limiting and policies.
    agent_id: str
    # Selects how the bridge executes the request.
    request_type: RequestType
    # Explicit server to use; a falsy value lets the router pick one.
    target_server: Optional[str]
    # Request payload. The code in this file reads "tool_name" and
    # "arguments" for tool calls and "resource_uri" for resource reads.
    parameters: Dict[str, Any]
    # Security metadata; only the "level" key is read in this file.
    security_context: Dict[str, Any]
    # NOTE(review): not consumed anywhere in this file -- presumably a
    # scheduling hint for code outside it.
    priority: int = 1
class AgentMCPBridge:
    """Bridge between one agent and the MCP servers it may use.

    Wraps a request router, a context manager and a security manager around
    a single MCP client. ``initialize`` must be awaited before
    ``process_agent_request``: until then ``mcp_client`` is ``None`` and no
    server capabilities are registered with the router.

    NOTE(review): ``_should_connect_to_server``, ``_read_resource``,
    ``_expand_context`` and ``SecurityError`` are referenced below but are not
    defined in this excerpt; they are presumably provided elsewhere.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.mcp_client = None  # created in initialize()
        self.request_router = RequestRouter()
        self.context_manager = ContextManager()
        self.security_manager = SecurityManager()
        self.active_sessions: Dict[str, Any] = {}

    async def initialize(self, mcp_config: Dict):
        """Connect to discovered MCP servers and register their capabilities.

        Args:
            mcp_config: Configuration mapping; only the optional
                ``"discovery_endpoints"`` entry is read here.
        """
        # Function-scope import: the client package is only needed once the
        # bridge is actually initialized.
        from mcp_client import MCPClient

        self.mcp_client = MCPClient()

        discovery_endpoints = mcp_config.get("discovery_endpoints", [])
        servers = await self.mcp_client.discover_servers(discovery_endpoints)

        for server in servers:
            if not self._should_connect_to_server(server):
                continue

            auth_token = await self.security_manager.get_auth_token(server.name)
            connected = await self.mcp_client.connect_to_server(
                server.name, auth_token
            )
            if not connected:
                continue

            # Only servers we are connected to are made routable.
            tools = await self.mcp_client.list_tools(server.name)
            resources = await self.mcp_client.list_resources(server.name)
            self.request_router.register_server_capabilities(
                server.name, tools, resources
            )

    async def process_agent_request(self, request: AgentRequest) -> Any:
        """Validate, route and execute one agent request.

        Returns:
            On success, the dict produced by the context manager's
            ``process_result``. On any failure (security rejection, routing
            error, unsupported request type, or an exception raised while
            executing the request) a dict with ``"error"``, ``"request_id"``
            and ``"timestamp"`` keys is returned instead of raising, so
            callers must check for ``"error"``.
        """
        try:
            if not await self.security_manager.validate_request(request):
                raise SecurityError("Request failed security validation")

            target_server = await self.request_router.route_request(request)

            if request.request_type == RequestType.TOOL_CALL:
                result = await self._execute_tool_call(target_server, request)
            elif request.request_type == RequestType.RESOURCE_READ:
                result = await self._read_resource(target_server, request)
            elif request.request_type == RequestType.CONTEXT_EXPANSION:
                result = await self._expand_context(target_server, request)
            else:
                raise ValueError(f"Unsupported request type: {request.request_type}")

            return await self.context_manager.process_result(result, request)
        except Exception as e:
            # Deliberate catch-all at the agent boundary: failures are turned
            # into an error payload rather than propagated to the agent.
            return {
                "error": str(e),
                "request_id": request.request_id,
                # Event-loop clock (not wall-clock time).
                "timestamp": asyncio.get_running_loop().time(),
            }

    async def _execute_tool_call(self, server_name: str, request: AgentRequest) -> Any:
        """Invoke the requested tool on ``server_name`` and return its result.

        The tool arguments are extended with an ``"_agent_context"`` entry
        carrying the agent id, request id and security level (``"standard"``
        when the request does not specify one).
        """
        tool_name = request.parameters.get("tool_name")
        # Copy before injecting context so the caller's request (and any dict
        # it shares between requests) is never mutated.
        tool_args = dict(request.parameters.get("arguments") or {})

        tool_args["_agent_context"] = {
            "agent_id": request.agent_id,
            "request_id": request.request_id,
            "security_level": request.security_context.get("level", "standard"),
        }

        return await self.mcp_client.invoke_tool(server_name, tool_name, tool_args)
Request Router Implementation
class RequestRouter:
    """Chooses which registered MCP server should handle a request."""

    def __init__(self):
        # server name -> {"tools": {...}, "resources": {...}, "last_updated": t}
        self.server_capabilities: Dict[str, Dict] = {}
        self.routing_rules: List[Dict] = []
        self.load_balancer = LoadBalancer()

    def register_server_capabilities(self, server_name: str, tools: List, resources: List):
        """Record the tools and resources a server offers.

        Tools are indexed by their ``"name"`` entry and resources by their
        ``"uri"`` entry. Registering the same server again replaces its
        previous record.
        """
        self.server_capabilities[server_name] = {
            "tools": {tool["name"]: tool for tool in tools},
            "resources": {res["uri"]: res for res in resources},
            # Event-loop clock (not wall-clock time).
            "last_updated": asyncio.get_event_loop().time(),
        }

    async def route_request(self, request: "AgentRequest") -> str:
        """Return the name of the server that should handle ``request``.

        Raises:
            ValueError: if the explicitly requested server cannot handle the
                request, or if no registered server can.
        """
        # An explicit target is honoured only if it really has the capability.
        if request.target_server:
            if self._can_handle_request(request.target_server, request):
                return request.target_server
            raise ValueError(f"Server {request.target_server} cannot handle request")

        candidates = [
            (server_name, self._score_server(server_name, request))
            for server_name in self.server_capabilities
            if self._can_handle_request(server_name, request)
        ]
        if not candidates:
            raise ValueError("No server available to handle request")

        # Highest score first.
        candidates.sort(key=lambda item: item[1], reverse=True)

        # When the two best servers tie, defer to the load balancer, offering
        # it the top three candidates.
        if len(candidates) > 1 and candidates[0][1] == candidates[1][1]:
            return self.load_balancer.select_server(
                [name for name, _ in candidates[:3]]
            )
        return candidates[0][0]

    def _can_handle_request(self, server_name: str, request: "AgentRequest") -> bool:
        """Return True if ``server_name`` advertises what ``request`` needs.

        Unknown servers have no capabilities and therefore never match.
        """
        capabilities = self.server_capabilities.get(server_name, {})

        if request.request_type == RequestType.TOOL_CALL:
            tool_name = request.parameters.get("tool_name")
            return tool_name in capabilities.get("tools", {})
        if request.request_type == RequestType.RESOURCE_READ:
            resource_uri = request.parameters.get("resource_uri")
            return resource_uri in capabilities.get("resources", {})

        # NOTE(review): every other request type (including context
        # expansion) is reported as unhandleable, so routing such a request
        # always raises ValueError -- confirm this is intended.
        return False

    def _score_server(self, server_name: str, request: "AgentRequest") -> float:
        """Score a server for ``request``; higher is better.

        Only the load reported by the load balancer is taken into account.
        """
        score = 1.0
        load = self.load_balancer.get_server_load(server_name)
        score *= 1.0 - load
        # Not implemented in this example: geographical proximity,
        # historical performance, security requirements.
        return score
Context Expansion & Management
Dynamic Context Expansion
class ContextManager:
    """Expands agent context with MCP data and post-processes MCP results.

    NOTE(review): the ``_fetch_*``, ``_execute_database_query``,
    ``_estimate_token_count``, ``_prioritize_and_truncate_context`` and
    ``_format_*`` helpers are called below but are not defined in this
    excerpt; they are presumably provided elsewhere.
    """

    def __init__(self):
        # Processed results keyed by "<agent_id>:<request_id>". Entries are
        # never evicted in this class.
        self.context_cache: Dict[str, Any] = {}
        self.expansion_strategies: Dict[str, Callable] = {}
        self.max_context_size = 10000  # tokens

    async def expand_context_for_agent(self, agent_request: str, mcp_servers: List[str]) -> str:
        """Return ``agent_request`` extended with context fetched via MCP.

        The request text is scanned for hints (file references, database
        terms, API terms); for each hint the matching fetch helper is awaited
        and non-empty results are appended under an "Additional Context"
        header. If the combined text would exceed ``max_context_size`` the
        truncation helper decides what is kept. With no usable additions the
        request is returned unchanged.
        """
        expanded_context = agent_request
        context_additions = []

        for need in self._analyze_context_needs(agent_request):
            need_type = need["type"]
            if need_type == "file_reference":
                content = await self._fetch_file_content(need["path"], mcp_servers)
                if content:
                    context_additions.append(f"File {need['path']}:\n{content}")
            elif need_type == "database_query":
                results = await self._execute_database_query(need["query"], mcp_servers)
                if results:
                    context_additions.append(f"Query results:\n{results}")
            elif need_type == "api_data":
                data = await self._fetch_api_data(need["endpoint"], mcp_servers)
                if data:
                    context_additions.append(f"API data:\n{data}")

        if not context_additions:
            return expanded_context

        additional_context = (
            "\n\n--- Additional Context ---\n" + "\n\n".join(context_additions)
        )
        if self._estimate_token_count(expanded_context + additional_context) <= self.max_context_size:
            expanded_context += additional_context
        else:
            # Over budget: let the helper prioritise and truncate.
            expanded_context = self._prioritize_and_truncate_context(
                expanded_context, context_additions
            )
        return expanded_context

    def _analyze_context_needs(self, request: str) -> List[Dict]:
        """Scan ``request`` for hints about which extra context to fetch.

        Returns a list of dicts, in this order:

        * one ``{"type": "file_reference", "path": ...}`` per
          ``file:``/``path:``/``document:`` reference (case-sensitive);
        * at most one ``{"type": "database_query", "query": request}`` if a
          database-related word appears (case-insensitive);
        * at most one ``{"type": "api_data", "endpoint": request}`` if an
          API-related word appears (case-insensitive).
        """
        import re

        needs = []

        # Raw strings: the patterns contain regex escapes such as \s and \b.
        file_patterns = re.findall(r'(?:file|path|document):\s*([^\s]+)', request)
        for path in file_patterns:
            needs.append({"type": "file_reference", "path": path})

        lowered = request.lower()
        if re.search(r'\b(?:database|query|table|sql)\b', lowered):
            needs.append({"type": "database_query", "query": request})

        if re.search(r'\b(?:api|service|endpoint|http)\b', lowered):
            needs.append({"type": "api_data", "endpoint": request})

        return needs

    async def process_result(self, result: Any, request: "AgentRequest") -> Dict:
        """Wrap a raw MCP result for agent consumption and cache it.

        Returns:
            A dict with ``"original_result"``, ``"request_id"``,
            ``"timestamp"`` and ``"metadata"`` (``source_type``,
            ``agent_id``). Tool calls and resource reads additionally get a
            ``"formatted_result"`` entry; other request types do not.
        """
        processed = {
            "original_result": result,
            "request_id": request.request_id,
            # Event-loop clock (not wall-clock time).
            "timestamp": asyncio.get_running_loop().time(),
            "metadata": {
                "source_type": request.request_type.value,
                "agent_id": request.agent_id,
            },
        }

        if request.request_type == RequestType.TOOL_CALL:
            processed["formatted_result"] = self._format_tool_result(result)
        elif request.request_type == RequestType.RESOURCE_READ:
            processed["formatted_result"] = self._format_resource_result(result)

        cache_key = f"{request.agent_id}:{request.request_id}"
        self.context_cache[cache_key] = processed
        return processed
Security & Tool Boundaries
Authentication & Authorization
- Agent identity verification
- Resource access control
- Role-based permissions
- Token management
Input Validation
- Request sanitization
- Parameter validation
- Schema enforcement
- Injection prevention
Boundary Enforcement
- Resource sandboxing
- Execution limits
- Network restrictions
- File system boundaries
Monitoring & Auditing
- Request logging
- Access tracking
- Anomaly detection
- Compliance reporting
Security Manager Implementation
class SecurityManager:
    """Validates agent requests and supplies MCP server credentials.

    NOTE(review): ``_validate_agent_identity``, ``_check_authorization`` and
    ``_validate_input_parameters`` are called below but are not defined in
    this excerpt; they are presumably provided elsewhere.
    """

    def __init__(self):
        # agent_id -> {"allowed_resources": [...], "allowed_tools": [...]}
        self.access_policies: Dict[str, Dict] = {}
        # server name -> {"token": ...}
        self.agent_credentials: Dict[str, Dict] = {}
        self.audit_logger = AuditLogger()
        self.rate_limiter = RateLimiter()

    async def validate_request(self, request: "AgentRequest") -> bool:
        """Run every security check on ``request``; return True only if all pass.

        Checks run in order: rate limit, identity, authorization, input
        validation, resource boundaries. The first failing check is written
        to the audit log and stops validation. An exception raised by any
        check is logged as ``"validation_error"`` and treated as a rejection
        (fail closed).
        """
        try:
            if not await self.rate_limiter.check_rate_limit(request.agent_id):
                await self.audit_logger.log_security_event(
                    "rate_limit_exceeded", request
                )
                return False

            if not self._validate_agent_identity(request):
                await self.audit_logger.log_security_event(
                    "authentication_failed", request
                )
                return False

            if not self._check_authorization(request):
                await self.audit_logger.log_security_event(
                    "authorization_failed", request
                )
                return False

            if not self._validate_input_parameters(request):
                await self.audit_logger.log_security_event(
                    "input_validation_failed", request
                )
                return False

            if not self._check_resource_boundaries(request):
                await self.audit_logger.log_security_event(
                    "boundary_violation", request
                )
                return False

            await self.audit_logger.log_access_request(request)
            return True
        except Exception as e:
            await self.audit_logger.log_security_event(
                "validation_error", request, error=str(e)
            )
            return False

    def _check_resource_boundaries(self, request: "AgentRequest") -> bool:
        """Return True if the agent's policy permits the requested target.

        Resource reads must start with one of the policy's
        ``"allowed_resources"`` prefixes; tool calls must name a tool listed
        in ``"allowed_tools"``. An agent without a policy is allowed neither.
        """
        agent_policy = self.access_policies.get(request.agent_id, {})

        if request.request_type == RequestType.RESOURCE_READ:
            resource_uri = request.parameters.get("resource_uri")
            # A missing or non-string URI can never be inside a boundary;
            # deny explicitly instead of failing on ``.startswith``.
            if not isinstance(resource_uri, str):
                return False
            allowed_resources = agent_policy.get("allowed_resources", [])
            # NOTE(review): plain prefix matching -- "file:///data" also
            # admits "file:///data-private" and un-normalised paths
            # containing "..". Normalise URIs before relying on this.
            return any(
                resource_uri.startswith(allowed) for allowed in allowed_resources
            )

        if request.request_type == RequestType.TOOL_CALL:
            tool_name = request.parameters.get("tool_name")
            return tool_name in agent_policy.get("allowed_tools", [])

        # NOTE(review): other request types are not boundary-checked and pass
        # by default -- confirm this fail-open behaviour is intended.
        return True

    async def get_auth_token(self, server_name: str) -> Optional[str]:
        """Return the stored token for ``server_name``, or None if there is none."""
        return self.agent_credentials.get(server_name, {}).get("token")
Common Integration Patterns
🔄 Reactive Integration
Event-Driven Pattern
- Agents respond to external events via MCP
- Real-time data streams
- Webhook integrations
- Asynchronous processing
🎯 Proactive Integration
Agent-Initiated Pattern
- Agents actively seek information
- Scheduled data fetching
- Context pre-loading
- Predictive resource access
Example: File Analysis Agent
class FileAnalysisAgent:
    """Example agent that analyses the files of a directory via MCP tools.

    NOTE(review): ``_generate_request_id`` and ``_generate_summary`` are
    called below but are not defined in this excerpt; they are presumably
    provided elsewhere.
    """

    AGENT_ID = "file-analysis-agent"

    def __init__(self):
        self.mcp_bridge = AgentMCPBridge(self.AGENT_ID)
        self.analysis_tools = ["file_scanner", "content_analyzer", "security_checker"]

    @staticmethod
    def _payload(response: Any) -> Dict:
        """Return the tool's own result dict from a bridge response.

        The bridge wraps tool output under ``"original_result"``; a response
        without that key (for example the bridge's error dict) is returned
        as is, and anything that is not a dict yields ``{}``.
        """
        if not isinstance(response, dict):
            return {}
        payload = response.get("original_result", response)
        return payload if isinstance(payload, dict) else {}

    def _tool_request(self, server: str, tool_name: str, arguments: Dict,
                      level: str = "standard") -> "AgentRequest":
        """Build a TOOL_CALL request for this agent."""
        return AgentRequest(
            request_id=self._generate_request_id(),
            agent_id=self.AGENT_ID,
            request_type=RequestType.TOOL_CALL,
            target_server=server,
            parameters={"tool_name": tool_name, "arguments": arguments},
            security_context={"level": level},
        )

    async def analyze_directory(self, directory_path: str) -> Dict:
        """Analyze all files in a directory using MCP tools.

        Returns:
            A dict with ``"files"`` (one analysis per listed file),
            ``"summary"`` and ``"security_issues"`` (issues of all files
            combined). If the directory listing fails or lists nothing,
            ``"files"`` and ``"security_issues"`` are empty.
        """
        results = {"files": [], "summary": {}, "security_issues": []}

        # Step 1: list files using the filesystem server's MCP tool.
        list_request = self._tool_request(
            "filesystem-server",
            "list_directory",
            {"path": directory_path, "recursive": True},
        )
        file_list = await self.mcp_bridge.process_agent_request(list_request)

        # Step 2: analyze each file (sequentially) and collect its issues.
        for file_info in self._payload(file_list).get("files", []):
            file_analysis = await self._analyze_single_file(file_info["path"])
            results["files"].append(file_analysis)
            results["security_issues"].extend(
                file_analysis.get("security_issues", [])
            )

        # Step 3: generate summary.
        results["summary"] = self._generate_summary(results["files"])
        return results

    async def _analyze_single_file(self, file_path: str) -> Dict:
        """Run content analysis and a security scan on one file.

        Returns:
            A dict with ``"path"``, ``"analysis"`` (the full bridge responses
            under ``"content"`` and ``"security"``, including any error
            payload), ``"security_issues"`` and ``"timestamp"``.
        """
        content_request = self._tool_request(
            "analysis-server", "analyze_content", {"file_path": file_path}
        )
        content_analysis = await self.mcp_bridge.process_agent_request(content_request)

        security_request = self._tool_request(
            "security-server", "security_scan", {"file_path": file_path}, level="high"
        )
        security_analysis = await self.mcp_bridge.process_agent_request(security_request)

        return {
            "path": file_path,
            "analysis": {"content": content_analysis, "security": security_analysis},
            "security_issues": self._payload(security_analysis).get("issues", []),
            # Event-loop clock (not wall-clock time).
            "timestamp": asyncio.get_running_loop().time(),
        }
Integration Best Practices
Design Principles
- Loose Coupling: Keep agent logic separate from MCP resource access
- Security First: Always validate and sanitize external data
- Graceful Degradation: Handle MCP server failures elegantly
- Context Awareness: Expand context intelligently, not blindly
- Performance Optimization: Cache results and batch requests when possible
Common Pitfalls to Avoid
- Over-fetching: Don't retrieve more data than necessary
- Security Bypass: Never skip security validation for "trusted" sources
- Blocking Operations: Use async operations for all MCP calls
- Error Propagation: Handle MCP errors without crashing the agent
- Resource Leaks: Always clean up connections and resources