5.4 Chaining Tools & Planning

Sequential vs parallel tool execution design patterns

Overview

Tool chaining enables AI agents to break down complex tasks into manageable steps, orchestrating multiple tools in sequence or parallel to achieve sophisticated workflows. This section explores design patterns for effective tool coordination and planning strategies.

Key Concepts

  • Sequential Chaining: Tools executed one after another with dependencies
  • Parallel Execution: Independent tools running concurrently
  • Conditional Routing: Dynamic path selection based on results
  • State Management: Maintaining context across tool invocations

Execution Patterns

1. Sequential Chain Pattern

Tool A
Tool B
Tool C
Result
class SequentialChain: def __init__(self, tools: List[Tool]): self.tools = tools self.state = {} async def execute(self, initial_input): """Execute tools sequentially, passing output to next input""" current_input = initial_input for i, tool in enumerate(self.tools): # Update state with current step self.state[f'step_{i}_input'] = current_input try: result = await tool.execute(current_input) self.state[f'step_{i}_output'] = result current_input = result except Exception as e: # Handle errors and potentially retry or skip if tool.is_optional: continue else: raise ChainExecutionError(f"Failed at step {i}: {e}") return current_input

2. Parallel Execution Pattern

Input
Tool A
Tool B
Tool C
Merge
class ParallelChain: def __init__(self, tools: List[Tool], merge_strategy: str = 'combine'): self.tools = tools self.merge_strategy = merge_strategy async def execute(self, input_data): """Execute tools in parallel and merge results""" # Create tasks for parallel execution tasks = [ asyncio.create_task(tool.execute(input_data)) for tool in self.tools ] # Wait for all tasks to complete results = await asyncio.gather(*tasks, return_exceptions=True) # Handle partial failures successful_results = [] errors = [] for i, result in enumerate(results): if isinstance(result, Exception): errors.append(f"Tool {i} failed: {result}") else: successful_results.append(result) # Merge successful results return self.merge_results(successful_results, errors) def merge_results(self, results, errors): if self.merge_strategy == 'combine': return {'results': results, 'errors': errors} elif self.merge_strategy == 'first_success': return results[0] if results else None elif self.merge_strategy == 'majority': return self.find_consensus(results)

LangChain Tool Chains

Simple Sequential Chain

from langchain.chains import SimpleSequentialChain from langchain.chains import LLMChain from langchain.prompts import PromptTemplate # Define individual chains synopsis_template = """You are a playwright. Given the title of play, write a synopsis for that title. Title: {title} Synopsis:""" synopsis_prompt = PromptTemplate( input_variables=["title"], template=synopsis_template ) synopsis_chain = LLMChain(llm=llm, prompt=synopsis_prompt) review_template = """You are a play critic. Given the synopsis of play, write a review for that play. Synopsis: {synopsis} Review:""" review_prompt = PromptTemplate( input_variables=["synopsis"], template=review_template ) review_chain = LLMChain(llm=llm, prompt=review_prompt) # Combine into sequential chain overall_chain = SimpleSequentialChain( chains=[synopsis_chain, review_chain], verbose=True ) # Execute the chain result = overall_chain.run("Romeo and Juliet")

Sequential Chain with Multiple Inputs/Outputs

from langchain.chains import SequentialChain # First chain: analyze the topic analysis_chain = LLMChain( llm=llm, prompt=analysis_prompt, output_key="analysis" ) # Second chain: create content based on analysis content_chain = LLMChain( llm=llm, prompt=content_prompt, output_key="content" ) # Third chain: review and improve review_chain = LLMChain( llm=llm, prompt=review_prompt, output_key="final_content" ) # Combine all chains sequential_chain = SequentialChain( chains=[analysis_chain, content_chain, review_chain], input_variables=["topic", "audience"], output_variables=["analysis", "content", "final_content"], verbose=True )

LangGraph Stateful Workflows

Graph-Based Tool Orchestration

from langgraph.graph import StateGraph, END from typing import TypedDict, List class AgentState(TypedDict): messages: List[str] next_action: str context: dict results: dict def research_node(state: AgentState): """Conduct research using search tools""" query = state["messages"][-1] # Use search tool search_results = search_tool.invoke(query) return { "results": {**state["results"], "research": search_results}, "next_action": "analyze" } def analyze_node(state: AgentState): """Analyze research results""" research_data = state["results"]["research"] # Use analysis tool analysis = analysis_tool.invoke(research_data) return { "results": {**state["results"], "analysis": analysis}, "next_action": "synthesize" } def synthesize_node(state: AgentState): """Synthesize final response""" research = state["results"]["research"] analysis = state["results"]["analysis"] # Combine information synthesis = synthesis_tool.invoke({ "research": research, "analysis": analysis }) return { "results": {**state["results"], "final": synthesis}, "next_action": "end" } def route_next(state: AgentState): """Determine next node based on state""" action = state["next_action"] if action == "analyze": return "analyze" elif action == "synthesize": return "synthesize" else: return END # Build the graph workflow = StateGraph(AgentState) # Add nodes workflow.add_node("research", research_node) workflow.add_node("analyze", analyze_node) workflow.add_node("synthesize", synthesize_node) # Add edges workflow.set_entry_point("research") workflow.add_conditional_edges("research", route_next) workflow.add_conditional_edges("analyze", route_next) workflow.add_conditional_edges("synthesize", route_next) # Compile the graph app = workflow.compile()

Planning Strategies

Forward Planning

Start with the goal and work backward to determine required steps and tools.

  • Goal decomposition
  • Dependency analysis
  • Resource allocation
  • Timeline estimation
Reactive Planning

Adapt the plan dynamically based on intermediate results and changing conditions.

  • Result evaluation
  • Path adjustment
  • Error recovery
  • Opportunistic optimization
Hierarchical Planning

Break complex tasks into nested sub-plans with different levels of abstraction.

  • Multi-level decomposition
  • Abstract task modeling
  • Recursive planning
  • Context inheritance
Constraint-Based Planning

Consider resource constraints, dependencies, and requirements when planning execution.

  • Resource constraints
  • Temporal dependencies
  • Quality requirements
  • Cost optimization

Advanced Orchestration Patterns

1. Fan-out/Fan-in Pattern

class FanOutFanIn: async def execute(self, input_data): # Fan-out: distribute input to multiple processors chunks = self.split_input(input_data) # Process chunks in parallel tasks = [ self.process_chunk(chunk) for chunk in chunks ] results = await asyncio.gather(*tasks) # Fan-in: merge results return self.merge_results(results)

2. Pipeline with Checkpoints

class CheckpointPipeline: def __init__(self, tools, checkpoint_strategy='always'): self.tools = tools self.checkpoint_strategy = checkpoint_strategy self.checkpoints = {} async def execute(self, input_data, resume_from=None): start_index = resume_from or 0 current_data = input_data for i in range(start_index, len(self.tools)): tool = self.tools[i] # Execute tool current_data = await tool.execute(current_data) # Save checkpoint if needed if self.should_checkpoint(i): self.save_checkpoint(i, current_data) return current_data def save_checkpoint(self, step, data): self.checkpoints[step] = { 'data': data, 'timestamp': time.time() }

3. Conditional Branching

class ConditionalBranch: def __init__(self, condition_func, true_chain, false_chain): self.condition = condition_func self.true_chain = true_chain self.false_chain = false_chain async def execute(self, input_data): if await self.condition(input_data): return await self.true_chain.execute(input_data) else: return await self.false_chain.execute(input_data) # Example usage sentiment_branch = ConditionalBranch( condition_func=lambda data: analyze_sentiment(data) > 0.5, true_chain=positive_response_chain, false_chain=negative_response_chain )

Best Practices

State Management
  • Keep state minimal and focused
  • Use immutable state updates
  • Implement state validation
  • Plan for state recovery
Error Handling
  • Implement graceful degradation
  • Use circuit breakers
  • Plan alternative paths
  • Log execution traces
Performance
  • Optimize for parallel execution
  • Cache intermediate results
  • Use streaming when possible
  • Monitor execution metrics
Debugging
  • Add comprehensive logging
  • Implement visualization tools
  • Create step-by-step traces
  • Enable replay capabilities