7.2 Core Agent Loop

Perceive → Reason → Act → Reflect cycle

The Agent Loop Foundation

The core agent loop represents the fundamental cycle that drives intelligent agent behavior. This cyclical process enables agents to continuously interact with their environment, make decisions, and improve their performance over time.

Core Loop Components

The agent loop consists of four essential phases: Perceive (gather information), Reason (process and plan), Act (execute decisions), and Reflect (learn and improve). This cycle repeats continuously to maintain agent autonomy and effectiveness.

The Four Phases

👁️ Perceive

Environmental Sensing

  • Gather sensory input from environment
  • Process incoming messages and data
  • Update knowledge base with new information
  • Identify changes and opportunities

Input: Raw environmental data, incoming messages, sensor readings

🧠 Reason

Decision Making & Planning

  • Analyze perceived information
  • Generate possible action plans
  • Evaluate options against goals
  • Select optimal course of action

Output: Action plan, decision rationale

⚡ Act

Action Execution

  • Execute planned actions
  • Use tools and resources
  • Communicate with other agents
  • Modify environment state

Result: Environmental changes, communications sent

🔄 Reflect

Learning & Improvement

  • Evaluate action outcomes
  • Update knowledge and beliefs
  • Learn from successes and failures
  • Adjust strategies for future cycles

Impact: Improved future decision making

Implementation Patterns

The core agent loop can be implemented in various ways depending on the agent framework and application requirements.

class CoreAgentLoop:
    """One-cycle driver that chains the four phase modules and a memory store."""

    def __init__(self):
        # One collaborator per phase, created in phase order, then the memory.
        self.perception_module = PerceptionModule()
        self.reasoning_module = ReasoningModule()
        self.action_module = ActionModule()
        self.reflection_module = ReflectionModule()
        self.memory = AgentMemory()

    def run_cycle(self):
        """Run a single Perceive -> Reason -> Act -> Reflect pass.

        Returns:
            Whatever the action module's ``execute`` call returned.
        """
        # Perceive: take an observation and record it immediately.
        observed = self.perception_module.perceive()
        self.memory.store_perception(observed)

        # Reason: plan from the observation plus the remembered context.
        plan = self.reasoning_module.reason(observed, self.memory.get_context())

        # Act: carry out the plan.
        outcome = self.action_module.execute(plan)

        # Reflect: review the observation, the plan and what came of it.
        review = self.reflection_module.reflect(observed, plan, outcome)

        # Persist the whole cycle as one record.
        self.memory.store_cycle(observed, plan, outcome, review)

        return outcome
class PerceptionModule:
    """Perceive phase: assembles one observation from four sources."""

    def perceive(self):
        """Return a dict with one entry per source, queried in a fixed order.

        The four source methods (``sense_environment``,
        ``check_internal_status``, ``receive_messages``,
        ``get_current_goals``) are not defined in this snippet and must be
        supplied elsewhere.
        """
        snapshot = {}
        snapshot['environment_state'] = self.sense_environment()
        snapshot['internal_state'] = self.check_internal_status()
        snapshot['messages'] = self.receive_messages()
        snapshot['goals'] = self.get_current_goals()
        return snapshot

class ReasoningModule:
    """Reason phase: turns an observation plus context into a chosen action."""

    def reason(self, perception, context):
        """Analyse the situation, enumerate candidates and return the chosen one.

        ``analyze_situation``, ``generate_options`` and ``evaluate_options``
        are called on ``self`` but are not defined in this snippet.
        """
        assessment = self.analyze_situation(perception, context)
        candidates = self.generate_options(assessment)
        # The context is consulted a second time when ranking the candidates.
        return self.evaluate_options(candidates, context)

class ActionModule:
    """Act phase: runs a plan and reports the outcome as a status dict."""

    def execute(self, action_plan):
        """Run ``perform_action`` and wrap its outcome.

        Returns:
            ``{'success': True, 'result': ...}`` when the action returns, or
            ``{'success': False, 'error': <message>}`` when it raises; the
            exception itself is not propagated.
        """
        try:
            outcome = self.perform_action(action_plan)
        except Exception as exc:
            return {'success': False, 'error': str(exc)}
        else:
            return {'success': True, 'result': outcome}

class ReflectionModule:
    """Reflect phase: scores the finished cycle and derives follow-up knowledge."""

    def reflect(self, perception, action_plan, action_result):
        """Return a dict with keys 'effectiveness', 'knowledge_updates', 'insights'.

        Each value comes from a method called on ``self``
        (``evaluate_effectiveness``, ``update_knowledge``,
        ``generate_insights``); none of them is defined in this snippet.
        """
        # How well did the plan work, judged by its result?
        score = self.evaluate_effectiveness(action_plan, action_result)

        # Revise what is known, given the observation, the result and the score.
        updates = self.update_knowledge(perception, action_result, score)

        # Distil insights from the score and the revisions.
        lessons = self.generate_insights(score, updates)

        return {
            'effectiveness': score,
            'knowledge_updates': updates,
            'insights': lessons,
        }

Continuous Loop Operation

Agents typically run the core loop continuously, adapting their behavior based on environmental changes and learning from experience.

class ContinuousAgent:
    """Runs a CoreAgentLoop repeatedly until a termination condition holds.

    This class calls several hooks it does not define; a subclass must
    provide them: ``has_urgent_tasks``, ``get_pause_duration``,
    ``all_goals_achieved``, ``received_stop_signal`` and ``handle_error``.
    """

    # Maximum number of completed cycles; None means "no limit".  Declared at
    # class level so should_terminate() never trips over a missing attribute,
    # while subclasses and instances remain free to override it.
    max_cycles = None

    def __init__(self, max_cycles=None):
        """Create the agent in the stopped state.

        Args:
            max_cycles: Optional cap on the number of completed cycles.  When
                omitted, the class-level default (no limit unless a subclass
                overrides it) applies.
        """
        self.loop = CoreAgentLoop()
        self.running = False
        self.cycle_count = 0
        if max_cycles is not None:
            self.max_cycles = max_cycles

    def start(self):
        """Run cycles until ``stop()`` clears ``self.running``.

        Any exception raised inside an iteration is passed to
        ``handle_error``; the loop then carries on with the next iteration.
        """
        # Function-scope import keeps this snippet runnable on its own.
        import time

        self.running = True

        while self.running:
            try:
                # Execute one cycle; only completed cycles are counted.
                self.loop.run_cycle()
                self.cycle_count += 1

                # Optional: pause between cycles
                if self.should_pause():
                    time.sleep(self.get_pause_duration())

                # Check for termination conditions
                if self.should_terminate():
                    self.stop()

            except Exception as e:
                self.handle_error(e)

    def stop(self):
        """Clear the running flag and ask the reflection module for a final pass."""
        self.running = False
        self.loop.reflection_module.final_reflection()

    def should_pause(self):
        """Return True when there are no urgent tasks, i.e. a pause is acceptable."""
        return not self.has_urgent_tasks()

    def should_terminate(self):
        """Return a truthy value when the loop should end.

        The conditions are checked in order and short-circuit: all goals
        achieved, the cycle limit reached (only when ``max_cycles`` is not
        None), or a stop signal received.
        """
        limit = self.max_cycles
        return (
            self.all_goals_achieved() or
            # ">=" so that exactly max_cycles cycles run, not max_cycles + 1.
            (limit is not None and self.cycle_count >= limit) or
            self.received_stop_signal()
        )

Advanced Loop Patterns

🔄 Interruptible Loop

Allows external interruption for urgent events

def interruptible_cycle(self):
    """Run the cycle phase by phase, polling for an interrupt before each one.

    Returns the result of ``handle_interrupt()`` as soon as an interrupt is
    seen; otherwise falls through (returning None) after the reasoning phase,
    which is as far as this excerpt goes.
    """
    interrupted = self.has_interrupt()

    if not interrupted:
        perception = self.perceive()
        # Poll again: an interrupt may have arrived while perceiving.
        interrupted = self.has_interrupt()

    if interrupted:
        return self.handle_interrupt()

    plan = self.reason(perception)
    # ... continue with interrupt checks

⚡ Parallel Processing

Run phases concurrently when possible

async def parallel_cycle(self):
    """Run one cycle, overlapping perception with reflection on the last cycle.

    Both independent phases are scheduled as tasks and awaited together;
    reasoning and acting then run one after the other.  Nothing is returned.
    """
    # Schedule both independent phases before waiting on either.
    sensing = asyncio.create_task(self.perceive_async())
    reviewing = asyncio.create_task(self.reflect_on_previous_cycle())
    observed, _previous_review = await asyncio.gather(sensing, reviewing)

    # Acting needs the plan, and planning needs the observation.
    plan = await self.reason_async(observed)
    await self.act_async(plan)
Planning Layer: Strategic reasoning, long-term goals, resource allocation
Execution Layer: Task decomposition, plan monitoring, coordination
Reactive Layer: Immediate responses, safety behaviors, reflex actions

Three-Layer Architecture Implementation

import asyncio
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any, Optional, Callable


class LayerPriority(Enum):
    """Identifies the three layers; the value is copied into message priority."""

    REACTIVE = 1
    EXECUTION = 2
    PLANNING = 3


@dataclass
class LayerMessage:
    """Envelope for a message passed from one layer to another."""

    source_layer: str
    target_layer: str
    message_type: str
    content: Dict[str, Any]
    priority: int
    timestamp: float


class Layer(ABC):
    """Abstract base shared by the reactive, execution and planning layers."""

    def __init__(self, name: str, agent_id: str):
        self.name = name
        self.agent_id = agent_id
        self.active = True  # _run_layer keeps looping while this is True
        self.message_queue = asyncio.Queue()
        self.performance_metrics = {}
        self.layer_state = {}

    @abstractmethod
    async def process(self, input_data: Dict) -> Dict:
        """Handle one batch of input and return this layer's results."""

    @abstractmethod
    def get_layer_type(self) -> LayerPriority:
        """Return the LayerPriority member identifying this layer."""

    async def send_message(self, target_layer: str, message_type: str, content: Dict):
        """Build a message addressed to another layer and return it.

        Nothing is enqueued or delivered here; the message is only
        constructed, stamped with this layer's priority value and the current
        time, and handed back to the caller.
        """
        message = LayerMessage(
            source_layer=self.name,
            target_layer=target_layer,
            message_type=message_type,
            content=content,
            priority=self.get_layer_type().value,
            timestamp=time.time(),
        )
        # This would be handled by the architecture coordinator
        return message


class ReactiveLayer(Layer):
    """Lowest layer: maps input straight to registered behaviors.

    ``process`` relies on ``_calculate_activation`` and
    ``_is_critical_situation``, which are not defined in this listing.
    """

    def __init__(self, agent_id: str):
        super().__init__("reactive", agent_id)
        self.behaviors = {}
        self.inhibition_rules = {}
        self.activation_threshold = 0.5

    def get_layer_type(self) -> LayerPriority:
        return LayerPriority.REACTIVE

    def add_behavior(self, name: str, condition: Callable, action: Callable, priority: int):
        """Register (or replace) a behavior under ``name``.

        Args:
            name: Key under which the behavior is stored.
            condition: Called with the input data; a truthy result makes the
                behavior a candidate.
            action: Awaited with the input data when the behavior wins
                arbitration, so it must return an awaitable.
            priority: Larger values win arbitration.
        """
        self.behaviors[name] = {
            "condition": condition,
            "action": action,
            "priority": priority,
            "active": True,
        }

    async def process(self, input_data: Dict) -> Dict:
        """Pick the strongest applicable behavior and run its action.

        Returns:
            A dict with "action", "behavior_used" and "confidence" when a
            behavior fired, or ``{"action": None, "behavior_used": None}``
            when none qualified.
        """
        activated_behaviors = []

        # Check all behavior conditions
        for name, behavior in self.behaviors.items():
            if behavior["active"] and behavior["condition"](input_data):
                activation_strength = self._calculate_activation(input_data, behavior)
                if activation_strength > self.activation_threshold:
                    activated_behaviors.append({
                        "name": name,
                        "behavior": behavior,
                        "strength": activation_strength,
                    })

        # Apply subsumption and arbitration
        winning_behavior = self._arbitrate_behaviors(activated_behaviors)

        if winning_behavior:
            result = await winning_behavior["behavior"]["action"](input_data)

            # Send urgent messages to upper layers if needed
            if self._is_critical_situation(input_data):
                await self.send_message(
                    "execution",
                    "urgent_situation",
                    {"situation": input_data, "response": result},
                )

            return {
                "action": result,
                "behavior_used": winning_behavior["name"],
                "confidence": winning_behavior["strength"],
            }

        return {"action": None, "behavior_used": None}

    def _arbitrate_behaviors(self, activated_behaviors: List[Dict]) -> Optional[Dict]:
        """Return the candidate with the highest (priority, strength), or None.

        On a tie the candidate that appears first in the list wins.  The
        caller's list is left untouched.
        """
        if not activated_behaviors:
            return None

        # max() returns the first maximal element and does not reorder the input.
        return max(
            activated_behaviors,
            key=lambda x: (x["behavior"]["priority"], x["strength"]),
        )


class ExecutionLayer(Layer):
    """Middle layer: steps through the current plan and monitors it.

    Depends on names not defined in this listing: ``PlanLibrary`` and the
    methods ``_process_layer_messages``, ``_assess_plan_progress``,
    ``_detect_environment_changes``, ``_execute_step`` and
    ``_process_next_task``.
    """

    def __init__(self, agent_id: str):
        super().__init__("execution", agent_id)
        self.current_plan = None
        self.task_queue = []
        self.monitoring_active = True
        self.plan_library = PlanLibrary()

    def get_layer_type(self) -> LayerPriority:
        return LayerPriority.EXECUTION

    async def process(self, input_data: Dict) -> Dict:
        """Monitor the current plan, execute its next step, then serve the task queue.

        Returns:
            A dict that may contain "monitoring", "execution" and
            "task_processing", depending on which stages ran.
        """
        results = {}

        # Handle messages from other layers
        await self._process_layer_messages()

        # Monitor current plan execution
        if self.current_plan:
            monitoring_result = await self._monitor_plan_execution(input_data)
            results["monitoring"] = monitoring_result

            if monitoring_result.get("needs_replanning"):
                await self.send_message(
                    "planning",
                    "replan_request",
                    {"current_plan": self.current_plan, "situation": input_data},
                )

        # Execute next step in current plan
        if self.current_plan and not self.current_plan.get("completed"):
            execution_result = await self._execute_plan_step(input_data)
            results["execution"] = execution_result

        # Process task queue
        if self.task_queue:
            task_result = await self._process_next_task()
            results["task_processing"] = task_result

        return results

    async def _monitor_plan_execution(self, input_data: Dict) -> Dict:
        """Assess plan progress and decide whether replanning is needed.

        Replanning is requested when progress reports "stuck", the
        environment reports "significant_change", or the reported
        "success_probability" (default 1.0) is below 0.3.
        """
        if not self.current_plan:
            return {"status": "no_plan"}

        plan_progress = self._assess_plan_progress()
        environment_changes = self._detect_environment_changes(input_data)

        needs_replanning = (
            plan_progress.get("stuck", False)
            or environment_changes.get("significant_change", False)
            or plan_progress.get("success_probability", 1.0) < 0.3
        )

        return {
            "progress": plan_progress,
            "environment_changes": environment_changes,
            "needs_replanning": needs_replanning,
            "status": "monitoring",
        }

    async def _execute_plan_step(self, input_data: Dict) -> Dict:
        """Run the plan's current step and advance the step index on success.

        Returns:
            A status dict: "plan_completed" once the index passes the last
            step, otherwise "step_completed", "step_failed" or
            "execution_error" (when the step raised).
        """
        current_step = self.current_plan.get("current_step", 0)
        steps = self.current_plan.get("steps", [])

        if current_step >= len(steps):
            self.current_plan["completed"] = True
            return {"status": "plan_completed"}

        step = steps[current_step]

        try:
            # Execute the step
            step_result = await self._execute_step(step, input_data)

            if step_result.get("success"):
                self.current_plan["current_step"] = current_step + 1
                return {
                    "status": "step_completed",
                    "step": step,
                    "result": step_result,
                }
            else:
                return {
                    "status": "step_failed",
                    "step": step,
                    "error": step_result.get("error"),
                }

        except Exception as e:
            return {
                "status": "execution_error",
                "step": step,
                "error": str(e),
            }


class PlanningLayer(Layer):
    """Top layer: maintains goals and produces plans.

    Depends on names not defined in this listing: ``WorldModel``,
    ``Planner``, ``ResourceManager`` and the methods
    ``_process_layer_messages``, ``_manage_goals``,
    ``_should_create_new_plan``, ``_create_strategic_plan``,
    ``_analyze_replanning_need`` and ``_extract_relevant_goals``.
    """

    def __init__(self, agent_id: str):
        super().__init__("planning", agent_id)
        self.goals = []
        self.world_model = WorldModel()
        self.planner = Planner()
        self.resource_manager = ResourceManager()

    def get_layer_type(self) -> LayerPriority:
        return LayerPriority.PLANNING

    async def process(self, input_data: Dict) -> Dict:
        """Update the world model, answer replan requests, manage goals and resources.

        Returns:
            A dict with "goal_management" and "resource_planning", plus
            "replanning" and/or "strategic_planning" when those stages ran.
        """
        results = {}

        # Update world model with new information
        await self.world_model.update(input_data)

        # Handle messages from other layers
        messages = await self._process_layer_messages()

        # Handle replanning requests; tolerate a handler that returns None.
        for message in messages or ():
            if message.message_type == "replan_request":
                new_plan = await self._handle_replan_request(message.content)
                await self.send_message(
                    "execution",
                    "new_plan",
                    {"plan": new_plan},
                )
                results["replanning"] = new_plan

        # Goal management
        goal_updates = await self._manage_goals(input_data)
        results["goal_management"] = goal_updates

        # Resource planning
        resource_allocation = await self.resource_manager.plan_allocation(
            self.goals,
            self.world_model.get_state(),
        )
        results["resource_planning"] = resource_allocation

        # Strategic planning for new goals
        if self._should_create_new_plan():
            new_strategic_plan = await self._create_strategic_plan()
            results["strategic_planning"] = new_strategic_plan

        return results

    async def _handle_replan_request(self, request_content: Dict) -> Dict:
        """Produce a replacement plan for a request sent by the execution layer.

        Args:
            request_content: Dict read for the keys "current_plan" and
                "situation".
        """
        current_plan = request_content.get("current_plan")
        situation = request_content.get("situation")

        # Analyze why replanning is needed
        analysis = self._analyze_replanning_need(current_plan, situation)

        # Generate new plan based on analysis
        new_plan = await self.planner.create_plan(
            goals=self._extract_relevant_goals(current_plan),
            world_state=self.world_model.get_state(),
            constraints=analysis.get("constraints", []),
        )

        return new_plan


class LayeredAgentArchitecture:
    """Owns the three layers and runs each one, plus a coordinator, as a task.

    Depends on names not defined in this listing: ``MessageBus`` and the
    methods ``_get_layer_input``, ``_handle_layer_output``,
    ``_monitor_layer_health`` and ``_resolve_layer_conflicts``.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.reactive_layer = ReactiveLayer(agent_id)
        self.execution_layer = ExecutionLayer(agent_id)
        self.planning_layer = PlanningLayer(agent_id)
        self.message_bus = MessageBus()
        self.coordination_active = True

    async def start(self):
        """Start all layers and the coordinator, then wait for them to finish."""
        # Start each layer in separate tasks
        tasks = [
            asyncio.create_task(self._run_layer(self.reactive_layer)),
            asyncio.create_task(self._run_layer(self.execution_layer)),
            asyncio.create_task(self._run_layer(self.planning_layer)),
            asyncio.create_task(self._coordinate_layers()),
        ]

        await asyncio.gather(*tasks)

    @staticmethod
    def _layer_interval(layer: Layer) -> float:
        """Return the pause, in seconds, between iterations of ``layer``."""
        if isinstance(layer, ReactiveLayer):
            return 0.01  # at most ~100 iterations/s for reactive
        if isinstance(layer, ExecutionLayer):
            return 0.1  # at most ~10 iterations/s for execution
        return 1.0  # planning layer: at most ~1 iteration/s

    async def _run_layer(self, layer: Layer):
        """Run a single layer until its ``active`` flag is cleared.

        Errors raised while fetching input, processing or handling output are
        printed and the loop continues.  The pause sits outside the ``try`` so
        a failing iteration still yields to the event loop instead of
        spinning and starving the other tasks.
        """
        while layer.active:
            try:
                # Get input appropriate for this layer
                layer_input = await self._get_layer_input(layer)

                # Process input
                result = await layer.process(layer_input)

                # Handle layer output
                await self._handle_layer_output(layer, result)

            except Exception as e:
                print(f"Error in {layer.name} layer: {e}")

            # Layer-specific sleep interval, applied after success and failure alike
            await asyncio.sleep(self._layer_interval(layer))

    async def _coordinate_layers(self):
        """Pump the message bus and housekeeping while coordination is active.

        As in ``_run_layer``, the pause is outside the ``try`` so that a
        persistent error cannot turn this into a busy loop.
        """
        while self.coordination_active:
            try:
                # Handle inter-layer messages
                await self.message_bus.process_messages()

                # Monitor layer health (the return value is not used here)
                self._monitor_layer_health()

                # Resolve conflicts between layers
                await self._resolve_layer_conflicts()

            except Exception as e:
                print(f"Error in coordination: {e}")

            await asyncio.sleep(0.05)  # at most ~20 iterations/s for coordination