5.4 Chaining Tools & Planning
Sequential vs parallel tool execution design patterns
Overview
Tool chaining enables AI agents to break down complex tasks into manageable steps, orchestrating multiple tools in sequence or parallel to achieve sophisticated workflows. This section explores design patterns for effective tool coordination and planning strategies.
Key Concepts
- Sequential Chaining: Tools executed one after another with dependencies
- Parallel Execution: Independent tools running concurrently
- Conditional Routing: Dynamic path selection based on results
- State Management: Maintaining context across tool invocations
Execution Patterns
1. Sequential Chain Pattern
Tool A → Tool B → Tool C → Result
class SequentialChain:
    """Run tools one after another, feeding each output to the next tool.

    Attributes:
        tools: Tools to run, in order. Each must expose an awaitable
            ``execute(input)``. A truthy ``is_optional`` attribute marks a
            tool whose failure is skipped instead of aborting the chain.
        state: Trace of the most recent run, keyed ``step_{i}_input``,
            ``step_{i}_output`` and, for skipped optional steps,
            ``step_{i}_error``.
    """

    def __init__(self, tools: List[Tool]):
        self.tools = tools
        self.state = {}

    async def execute(self, initial_input):
        """Execute tools sequentially, passing output to next input.

        Args:
            initial_input: Value handed to the first tool.

        Returns:
            The output of the last tool that succeeded, or ``initial_input``
            unchanged when there are no tools or every tool was optional
            and failed.

        Raises:
            ChainExecutionError: A non-optional tool raised. The original
                exception is attached as ``__cause__``.
        """
        # Start every run with a fresh trace so that a step skipped in this
        # run cannot expose an output recorded by an earlier run.
        self.state = {}
        current_input = initial_input
        for i, tool in enumerate(self.tools):
            self.state[f'step_{i}_input'] = current_input
            try:
                result = await tool.execute(current_input)
            except Exception as e:
                # A tool without an ``is_optional`` attribute is required.
                if not getattr(tool, 'is_optional', False):
                    raise ChainExecutionError(
                        f"Failed at step {i}: {e}"
                    ) from e
                # Optional step failed: record why it was skipped and hand
                # the unchanged input to the next tool.
                self.state[f'step_{i}_error'] = e
                continue
            self.state[f'step_{i}_output'] = result
            current_input = result
        return current_input
2. Parallel Execution Pattern
Input → Tool A | Tool B | Tool C (run in parallel) → Merge
class ParallelChain:
    """Run every tool concurrently on the same input and merge the outcomes.

    Attributes:
        tools: Tools to run. Each must expose an awaitable
            ``execute(input)``.
        merge_strategy: How successful results are merged:
            ``'combine'`` returns ``{'results': [...], 'errors': [...]}``,
            ``'first_success'`` returns the first successful result in tool
            order (``None`` if none succeeded), and ``'majority'`` returns
            the most frequent successful result.
    """

    def __init__(self, tools: List[Tool],
                 merge_strategy: str = 'combine'):
        self.tools = tools
        self.merge_strategy = merge_strategy

    async def execute(self, input_data):
        """Execute tools in parallel and merge results.

        Args:
            input_data: Value passed unchanged to every tool.

        Returns:
            Whatever ``merge_results`` produces for the configured strategy.

        Raises:
            ValueError: ``merge_strategy`` is not a known strategy.
        """
        tasks = [
            asyncio.create_task(tool.execute(input_data))
            for tool in self.tools
        ]
        # return_exceptions=True keeps one failing tool from discarding the
        # results of the others; failures come back as exception objects.
        results = await asyncio.gather(*tasks,
                                       return_exceptions=True)
        successful_results = []
        errors = []
        for i, result in enumerate(results):
            # BaseException rather than Exception: a cancelled task is
            # reported as CancelledError, which does not derive from
            # Exception and would otherwise be counted as a success.
            if isinstance(result, BaseException):
                errors.append(f"Tool {i} failed: {result}")
            else:
                successful_results.append(result)
        return self.merge_results(successful_results, errors)

    def merge_results(self, results, errors):
        """Merge successful results according to ``merge_strategy``.

        Args:
            results: Successful tool outputs, in tool order.
            errors: Human-readable failure messages.

        Raises:
            ValueError: ``merge_strategy`` is not a known strategy.
        """
        if self.merge_strategy == 'combine':
            return {'results': results, 'errors': errors}
        if self.merge_strategy == 'first_success':
            return results[0] if results else None
        if self.merge_strategy == 'majority':
            return self.find_consensus(results)
        # Fail loudly instead of silently returning None for a typo.
        raise ValueError(
            f"Unknown merge strategy: {self.merge_strategy!r}"
        )

    def find_consensus(self, results):
        """Return the most frequent result, or ``None`` if there are none.

        Results are compared with ``==`` so unhashable values (dicts, lists)
        are supported. Ties go to the result that appears first.
        """
        best = None
        best_count = 0
        for candidate in results:
            count = sum(1 for other in results if other == candidate)
            if count > best_count:
                best, best_count = candidate, count
        return best
LangChain Tool Chains
Simple Sequential Chain
from langchain.chains import SimpleSequentialChain
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
# Stage 1: turn a play title into a synopsis.
synopsis_template = """You are a playwright. Given the title of play,
write a synopsis for that title.
Title: {title}
Synopsis:"""
synopsis_prompt = PromptTemplate(
    template=synopsis_template,
    input_variables=["title"],
)
synopsis_chain = LLMChain(prompt=synopsis_prompt, llm=llm)

# Stage 2: turn that synopsis into a review.
review_template = """You are a play critic. Given the synopsis of play,
write a review for that play.
Synopsis: {synopsis}
Review:"""
review_prompt = PromptTemplate(
    template=review_template,
    input_variables=["synopsis"],
)
review_chain = LLMChain(prompt=review_prompt, llm=llm)

# Wire the two stages together in list order; verbose=True is passed so the
# intermediate steps are printed while the chain runs.
overall_chain = SimpleSequentialChain(
    verbose=True,
    chains=[synopsis_chain, review_chain],
)

# Run the combined chain on a single title.
result = overall_chain.run("Romeo and Juliet")
Sequential Chain with Multiple Inputs/Outputs
from langchain.chains import SequentialChain
# Step 1 - analyze the topic; its result is published under "analysis".
analysis_chain = LLMChain(
    output_key="analysis",
    prompt=analysis_prompt,
    llm=llm,
)

# Step 2 - create content based on the analysis; published under "content".
content_chain = LLMChain(
    output_key="content",
    prompt=content_prompt,
    llm=llm,
)

# Step 3 - review and improve; published under "final_content".
review_chain = LLMChain(
    output_key="final_content",
    prompt=review_prompt,
    llm=llm,
)

# Combine the three steps. The chain takes "topic" and "audience" as inputs
# and exposes every step's output key in its result.
sequential_chain = SequentialChain(
    input_variables=["topic", "audience"],
    output_variables=["analysis", "content", "final_content"],
    chains=[analysis_chain, content_chain, review_chain],
    verbose=True,
)
LangGraph Stateful Workflows
Graph-Based Tool Orchestration
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
class AgentState(TypedDict):
    """State dictionary read and updated by the workflow's node functions."""

    # Message list; research_node uses the last entry as its search query.
    messages: List[str]
    # Routing hint written by each node and read by route_next.
    next_action: str
    # Not read or written by any node shown in this section.
    context: dict
    # Tool outputs accumulated by the nodes under the keys "research",
    # "analysis" and "final".
    results: dict
def research_node(state: AgentState):
    """Conduct research using search tools.

    Uses the last entry of ``state["messages"]`` as the query for
    ``search_tool``.

    Args:
        state: Current workflow state. ``messages`` must be non-empty.

    Returns:
        A partial state update: ``results`` extended with the search output
        under ``"research"``, and ``next_action`` set to ``"analyze"``.

    Raises:
        IndexError: ``state["messages"]`` is empty.
    """
    query = state["messages"][-1]
    search_results = search_tool.invoke(query)
    # This is the entry node, so "results" may not exist yet when the caller
    # seeded the workflow with only "messages"; start from an empty dict
    # instead of raising KeyError.
    previous_results = state.get("results") or {}
    return {
        "results": {**previous_results, "research": search_results},
        "next_action": "analyze",
    }
def analyze_node(state: AgentState):
    """Analyze research results.

    Passes ``state["results"]["research"]`` to ``analysis_tool`` and returns
    a partial state update: the existing results plus an ``"analysis"``
    entry, with ``next_action`` set to ``"synthesize"``.
    """
    collected = state["results"]
    analysis = analysis_tool.invoke(collected["research"])
    # Copy rather than mutate so the incoming state dict is left untouched.
    updated_results = dict(collected, analysis=analysis)
    return {"results": updated_results, "next_action": "synthesize"}
def synthesize_node(state: AgentState):
    """Synthesize final response.

    Hands the ``"research"`` and ``"analysis"`` results to
    ``synthesis_tool`` and returns a partial state update: the existing
    results plus a ``"final"`` entry, with ``next_action`` set to ``"end"``.
    """
    prior = state["results"]
    # Only these two entries are forwarded to the synthesis tool.
    payload = {key: prior[key] for key in ("research", "analysis")}
    final = synthesis_tool.invoke(payload)
    return {"results": {**prior, "final": final}, "next_action": "end"}
def route_next(state: AgentState):
    """Determine next node based on state.

    Returns ``"analyze"`` or ``"synthesize"`` when ``state["next_action"]``
    names one of those nodes; any other value returns ``END``.
    """
    action = state["next_action"]
    # The routable action names are identical to the node names.
    return action if action in ("analyze", "synthesize") else END
# Assemble the workflow graph over AgentState.
workflow = StateGraph(AgentState)

# Node name -> handler, in registration order.
_NODES = {
    "research": research_node,
    "analyze": analyze_node,
    "synthesize": synthesize_node,
}
for _name, _handler in _NODES.items():
    workflow.add_node(_name, _handler)

# Execution starts at the research node.
workflow.set_entry_point("research")

# After every node, route_next picks the successor (or END).
for _name in _NODES:
    workflow.add_conditional_edges(_name, route_next)

# Compile the graph into a runnable application.
app = workflow.compile()
Planning Strategies
Forward Planning
Plan the whole workflow before execution begins: start from the goal and decompose it into the required steps and tools, in the order they must run.
- Goal decomposition
- Dependency analysis
- Resource allocation
- Timeline estimation
Reactive Planning
Adapt the plan dynamically based on intermediate results and changing conditions.
- Result evaluation
- Path adjustment
- Error recovery
- Opportunistic optimization
Hierarchical Planning
Break complex tasks into nested sub-plans with different levels of abstraction.
- Multi-level decomposition
- Abstract task modeling
- Recursive planning
- Context inheritance
Constraint-Based Planning
Consider resource constraints, dependencies, and requirements when planning execution.
- Resource constraints
- Temporal dependencies
- Quality requirements
- Cost optimization
Advanced Orchestration Patterns
1. Fan-out/Fan-in Pattern
class FanOutFanIn:
    """Split the input, process the pieces concurrently, then merge them.

    ``split_input``, ``process_chunk`` and ``merge_results`` are called on
    ``self`` but are not defined in this class; they must be supplied
    elsewhere (for example by a subclass).
    """

    async def execute(self, input_data):
        """Fan the input out into chunks and fan the results back in.

        Returns whatever ``merge_results`` returns for the list of
        per-chunk results, which is in chunk order.
        """
        # Fan-out: one coroutine per chunk, all awaited together.
        pieces = self.split_input(input_data)
        processed = await asyncio.gather(*map(self.process_chunk, pieces))
        # Fan-in: collapse the per-chunk results into one value.
        return self.merge_results(processed)
2. Pipeline with Checkpoints
class CheckpointPipeline:
    """Run tools in order, optionally saving each step's output.

    Attributes:
        tools: Tools to run, in order. Each must expose an awaitable
            ``execute(input)``.
        checkpoint_strategy: ``'always'`` saves a checkpoint after every
            step; ``'never'`` saves none.
        checkpoints: Saved checkpoints keyed by step index, each a dict
            with ``'data'`` (the step's output) and ``'timestamp'``
            (``time.time()`` at save).
    """

    def __init__(self, tools, checkpoint_strategy='always'):
        self.tools = tools
        self.checkpoint_strategy = checkpoint_strategy
        self.checkpoints = {}

    async def execute(self, input_data, resume_from=None):
        """Run the pipeline, optionally starting from a later step.

        Args:
            input_data: Value fed to the first tool that runs. When
                resuming, the caller supplies the data to resume with;
                saved checkpoints are not read back automatically.
            resume_from: Index of the first tool to run. ``None`` (the
                default) starts at the beginning.

        Returns:
            The output of the last tool, or ``input_data`` unchanged when
            no tool runs.

        Raises:
            ValueError: ``checkpoint_strategy`` is not a known strategy.
        """
        start_index = 0 if resume_from is None else resume_from
        current_data = input_data
        for i in range(start_index, len(self.tools)):
            current_data = await self.tools[i].execute(current_data)
            if self.should_checkpoint(i):
                self.save_checkpoint(i, current_data)
        return current_data

    def should_checkpoint(self, step):
        """Return whether a checkpoint is saved after ``step``.

        Raises:
            ValueError: ``checkpoint_strategy`` is not a known strategy.
        """
        if self.checkpoint_strategy == 'always':
            return True
        if self.checkpoint_strategy == 'never':
            return False
        raise ValueError(
            f"Unknown checkpoint strategy: {self.checkpoint_strategy!r}"
        )

    def save_checkpoint(self, step, data):
        """Record ``data`` as the checkpoint for ``step``."""
        self.checkpoints[step] = {
            'data': data,
            'timestamp': time.time(),
        }
3. Conditional Branching
class ConditionalBranch:
    """Route the input to one of two chains based on a condition.

    Attributes:
        condition: Callable taking the input and returning a truthy/falsy
            value, or an awaitable that resolves to one.
        true_chain: Chain executed when the condition is truthy.
        false_chain: Chain executed when the condition is falsy.
    """

    def __init__(self, condition_func, true_chain, false_chain):
        self.condition = condition_func
        self.true_chain = true_chain
        self.false_chain = false_chain

    async def execute(self, input_data):
        """Evaluate the condition and run the matching chain.

        Args:
            input_data: Value passed to the condition and then to the
                chosen chain.

        Returns:
            The result of the chosen chain's ``execute``.
        """
        import inspect  # local import: the file's import block is not in view

        outcome = self.condition(input_data)
        # Accept both plain callables (e.g. a lambda returning a bool) and
        # coroutine functions; awaiting a plain bool would raise TypeError.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        chosen = self.true_chain if outcome else self.false_chain
        return await chosen.execute(input_data)
# Example usage
# The condition is a coroutine function so that ConditionalBranch.execute
# can await its result; the bool returned by a plain lambda is not awaitable.
async def _is_positive_sentiment(data):
    """Return True when the sentiment score of ``data`` exceeds 0.5."""
    return analyze_sentiment(data) > 0.5


sentiment_branch = ConditionalBranch(
    condition_func=_is_positive_sentiment,
    true_chain=positive_response_chain,
    false_chain=negative_response_chain,
)
Best Practices
State Management
- Keep state minimal and focused
- Use immutable state updates
- Implement state validation
- Plan for state recovery
Error Handling
- Implement graceful degradation
- Use circuit breakers
- Plan alternative paths
- Log execution traces
Performance
- Optimize for parallel execution
- Cache intermediate results
- Use streaming when possible
- Monitor execution metrics
Debugging
- Add comprehensive logging
- Implement visualization tools
- Create step-by-step traces
- Enable replay capabilities