Cost-bounded research agent
Enforce a hard USD and wall-clock cap on an LLM-driven research loop using LangGOAP’s composable early-termination policies.
An open-ended research agent (search → summarize → follow up → synthesize → revise) can quietly burn through $50 before anyone notices. This notebook wires up three policies that together guarantee the run halts as soon as either budget runs out:
- MaxCostPolicy(usd=5.00) reads world_state['total_cost_usd'] and halts when it crosses the cap.
- MaxWallClockPolicy(seconds=600) reads the wall_clock_started_at anchor that the planner stamps on its first invocation.
- FirstOfPolicy(...) composes both: terminate when any inner policy fires.
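To make the composition concrete, here is a minimal, library-free sketch of the semantics described above. The classes CostCap, WallClockCap, and FirstOf are hypothetical stand-ins, not LangGOAP's implementation; only the total_cost_usd and wall_clock_started_at keys come from this notebook. The flat state dict, the `>=` comparisons, and the explicit `now` argument are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class CostCap:
    """Stand-in for a cost policy (hypothetical, not the library class)."""
    usd: float

    def check(self, state: dict[str, Any], now: float) -> Optional[str]:
        spent = float(state.get("total_cost_usd", 0.0))
        if spent >= self.usd:  # assumption: reaching the cap counts as exhausted
            return f"cost cap ${self.usd:.2f} reached (spent=${spent:.2f})"
        return None


@dataclass
class WallClockCap:
    """Stand-in for a wall-clock policy (hypothetical)."""
    seconds: float

    def check(self, state: dict[str, Any], now: float) -> Optional[str]:
        started = state.get("wall_clock_started_at")
        if started is None:  # no anchor stamped yet, so nothing to measure
            return None
        elapsed = now - started
        if elapsed >= self.seconds:
            return f"wall-clock cap {self.seconds:.0f}s reached (elapsed={elapsed:.0f}s)"
        return None


class FirstOf:
    """Return the reason from the first inner policy that fires, else None."""

    def __init__(self, *policies: Any) -> None:
        self.policies = policies

    def check(self, state: dict[str, Any], now: float) -> Optional[str]:
        for policy in self.policies:
            reason = policy.check(state, now)
            if reason is not None:
                return reason
        return None


composite = FirstOf(CostCap(usd=5.00), WallClockCap(seconds=600))
start = 1_000.0  # fixed fake clock so the example is deterministic

under_budget = {"total_cost_usd": 3.20, "wall_clock_started_at": start}
over_cost = {"total_cost_usd": 5.40, "wall_clock_started_at": start}

print(composite.check(under_budget, now=start + 30))   # no policy fires
print(composite.check(over_cost, now=start + 30))      # cost cap fires
print(composite.check(under_budget, now=start + 601))  # wall-clock cap fires
```

Passing the clock in as an argument keeps the sketch deterministic; a real policy would presumably read the current time itself.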
The actions in this notebook simulate LLM cost emission
deterministically. To swap in real LLM calls, replace each
action’s execute with a ChatOpenAI invocation wrapped in a
callback handler that accumulates
response.usage_metadata['total_tokens'] * cost_per_token into
world_state['total_cost_usd']. The exact pattern is shown at
the end of the notebook.
Backed by tests/integration/test_early_termination.py.
Domain: five research steps, each with its own USD cost
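| Action | Cost (USD) | Effect |
|---|---|---|
| gather_sources | $0.50 | sources_found = True |
| summarise | $1.20 | summaries_drafted = True |
| cross_check | $1.50 | cross_checked = True |
| draft_report | $1.80 | report_drafted = True |
| polish_report | $0.40 | report_polished = True |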
Total if every step runs: $5.40 — the $5.00 cap is enough for
the first four steps but cuts off polish_report. The agent must
produce the best possible result within budget.
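The arithmetic behind that cut-off can be checked with a running total. The sketch below works in integer cents to sidestep floating-point rounding, and it assumes the cap counts as exhausted once the running total reaches it. The helper name first_step_at_cap is introduced here for illustration only.

```python
from itertools import accumulate
from typing import Optional

# Per-step costs listed above, converted to integer cents.
STEP_COSTS_CENTS = {
    "gather_sources": 50,
    "summarise": 120,
    "cross_check": 150,
    "draft_report": 180,
    "polish_report": 40,
}
CAP_CENTS = 500

# Running total after each step, in execution order.
running = list(accumulate(STEP_COSTS_CENTS.values()))
print(running)  # [50, 170, 320, 500, 540]


def first_step_at_cap(costs: dict[str, int], cap: int) -> Optional[str]:
    """Return the step whose cost brings the running total up to the cap."""
    total = 0
    for name, cents in costs.items():
        total += cents
        if total >= cap:  # assumption: reaching the cap exhausts the budget
            return name
    return None


print(first_step_at_cap(STEP_COSTS_CENTS, CAP_CENTS))  # draft_report
```

The running total hits exactly 500 cents after draft_report, so every later step, here only polish_report, falls outside the budget.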
from typing import Any

from langgoap import (
    ActionSpec,
    FirstOfPolicy,
    GoalSpec,
    GoapGraph,
    MaxCostPolicy,
    MaxWallClockPolicy,
)


def research_step(name: str, dollars: float, effect_key: str) -> Any:
    """Build an action that simulates an LLM-backed research step.

    The action accumulates ``total_cost_usd`` into world state; this
    is exactly the shape ``MaxCostPolicy`` reads. In production a
    LangChain ``BaseCallbackHandler`` populates this key from
    ``response.usage_metadata`` on every LLM call.
    """

    def execute(world_state: dict[str, Any]) -> dict[str, Any]:
        spent = float(world_state.get('total_cost_usd', 0.0))
        return {
            effect_key: True,
            'total_cost_usd': spent + dollars,
            'last_step': name,
        }

    return execute
actions = [
    ActionSpec(
        name='gather_sources',
        preconditions={},
        effects={'sources_found': True, 'total_cost_usd': 0.0},
        execute=research_step('gather_sources', 0.50, 'sources_found'),
    ),
    ActionSpec(
        name='summarise',
        preconditions={'sources_found': True},
        effects={'summaries_drafted': True, 'total_cost_usd': 0.0},
        execute=research_step('summarise', 1.20, 'summaries_drafted'),
    ),
    ActionSpec(
        name='cross_check',
        preconditions={'summaries_drafted': True},
        effects={'cross_checked': True, 'total_cost_usd': 0.0},
        execute=research_step('cross_check', 1.50, 'cross_checked'),
    ),
    ActionSpec(
        name='draft_report',
        preconditions={'cross_checked': True},
        effects={'report_drafted': True, 'total_cost_usd': 0.0},
        execute=research_step('draft_report', 1.80, 'report_drafted'),
    ),
    ActionSpec(
        name='polish_report',
        preconditions={'report_drafted': True},
        effects={'report_polished': True, 'total_cost_usd': 0.0},
        execute=research_step('polish_report', 0.40, 'report_polished'),
    ),
]

goal = GoalSpec(conditions={'report_polished': True})

print(f'Total cost if every step runs: $'
      f'{sum([0.50, 1.20, 1.50, 1.80, 0.40]):.2f}')
Total cost if every step runs: $5.40
Without termination policies: the agent overshoots the budget
The default agent runs every step and spends $5.40, $0.40 over budget. No alarm; the goal is achieved.
graph_unbounded = GoapGraph(actions)
result = graph_unbounded.invoke(goal=goal, world_state={})

executed = [
    h.action_name for h in result['execution_history'] if h.success
]
print(f"status: {result['status']}")
print(f"steps: {executed}")
print(f"spent: ${result['world_state']['total_cost_usd']:.2f}")
status: goal_achieved
steps: ['gather_sources', 'summarise', 'cross_check', 'draft_report', 'polish_report']
spent: $5.40
With FirstOfPolicy(MaxCost($5.00), MaxWallClock(10min))
The composite halts on whichever budget runs out first. Cost
accumulates faster than wall-clock here, so MaxCostPolicy fires
after draft_report ($5.00 spent). polish_report is skipped.
Final status is terminated — distinct from goal_achieved so
downstream tooling knows the writeup is partial.
graph_bounded = GoapGraph(
    actions,
    termination_policies=[
        FirstOfPolicy(
            MaxCostPolicy(usd=5.00),
            MaxWallClockPolicy(seconds=600),
        ),
    ],
)
result = graph_bounded.invoke(goal=goal, world_state={})

executed = [
    h.action_name for h in result['execution_history'] if h.success
]
print(f"status: {result['status']}")
print(f"steps run: {executed}")
print(f"spent: ${result['world_state']['total_cost_usd']:.2f}")
print(f"reason: {result.get('replan_reason')}")
status: terminated
steps run: ['gather_sources', 'summarise', 'cross_check', 'draft_report']
spent: $5.00
reason: MaxCostPolicy: Cost budget of $5.0000 exceeded (spent=$5.0000)
Surface a partial-but-honest summary on termination
When status == 'terminated', your wrapping code knows the writeup is
incomplete and can render an honest progress report — what shipped,
what got skipped, why it stopped. This is the production pattern:
the agent never lies about its budget.
ALL_STEPS = [
    'gather_sources',
    'summarise',
    'cross_check',
    'draft_report',
    'polish_report',
]


def render_partial_report(result: dict[str, Any]) -> str:
    executed = [
        h.action_name for h in result['execution_history'] if h.success
    ]
    skipped = [s for s in ALL_STEPS if s not in executed]
    spent = result['world_state'].get('total_cost_usd', 0.0)
    lines = [
        f"## Competitor-Analysis Writeup ({result['status']})",
        '',
        f'- Spent: ${spent:.2f}',
        f'- Completed steps: {executed}',
    ]
    if skipped:
        lines.append(f'- Skipped (over budget): {skipped}')
        lines.append(
            f' Termination reason: {result.get("replan_reason")!r}'
        )
    return '\n'.join(lines)


print(render_partial_report(result))
## Competitor-Analysis Writeup (terminated)
- Spent: $5.00
- Completed steps: ['gather_sources', 'summarise', 'cross_check', 'draft_report']
- Skipped (over budget): ['polish_report']
Termination reason: 'MaxCostPolicy: Cost budget of $5.0000 exceeded (spent=$5.0000)'
Production pattern: wire a real LLM via a callback handler
The actions above simulate cost; for a real research agent each step
calls an LLM and the cost is whatever the model reports. LangChain’s
BaseCallbackHandler is the natural integration point: convert the
token usage reported on each response into dollars, add it to
total_cost_usd, and MaxCostPolicy reads it without any further wiring.
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI

# OpenAI gpt-4o-mini pricing as of 2026-04; adjust per your provider.
INPUT_PER_1K = 0.00015
OUTPUT_PER_1K = 0.0006


class CostAccumulator(BaseCallbackHandler):
    """Add the dollar cost of every finished LLM call to world state."""

    def __init__(self, world_state: dict) -> None:
        self.world_state = world_state

    def on_llm_end(self, response, **_):
        # llm_output can be None (for example when no usage is reported),
        # so fall back to an empty dict instead of raising AttributeError.
        usage = (response.llm_output or {}).get('token_usage', {})
        cost = (
            usage.get('prompt_tokens', 0) / 1000 * INPUT_PER_1K
            + usage.get('completion_tokens', 0) / 1000 * OUTPUT_PER_1K
        )
        spent = self.world_state.get('total_cost_usd', 0.0)
        self.world_state['total_cost_usd'] = spent + cost


def real_summarise(world_state):
    accumulator = CostAccumulator(world_state)
    llm = ChatOpenAI(model='gpt-4o-mini', temperature=0,
                     callbacks=[accumulator])
    sources = world_state['sources']
    response = llm.invoke(f'Summarise these sources:\n{sources}')
    # .get() avoids a KeyError if the handler never recorded any usage.
    return {'summaries_drafted': True,
            'summary_text': response.content,
            'total_cost_usd': world_state.get('total_cost_usd', 0.0)}
The same MaxCostPolicy(usd=5.00) works unchanged, because the policy
only reads total_cost_usd.
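For a sense of scale, the amount the handler adds per call can be worked out by hand. This sketch reuses the two per-1K rates from the cell above; the function name call_cost_usd and the token counts are made up for illustration.

```python
INPUT_PER_1K = 0.00015   # USD per 1,000 prompt tokens (rate from the cell above)
OUTPUT_PER_1K = 0.0006   # USD per 1,000 completion tokens


def call_cost_usd(prompt_tokens: int, completion_tokens: int) -> float:
    """Dollar cost of one LLM call, using the same formula as the handler."""
    return (
        prompt_tokens / 1000 * INPUT_PER_1K
        + completion_tokens / 1000 * OUTPUT_PER_1K
    )


# Hypothetical call: 12,000 prompt tokens and 800 completion tokens.
cost = call_cost_usd(12_000, 800)
print(f"${cost:.5f}")  # $0.00228

# How many calls of that size fit under a $5.00 cap.
calls_within_cap = int(5.00 // cost)
print(calls_within_cap)  # 2192
```

At these rates a single call is a fraction of a cent, so with a model in this price range the $5.00 cap would mostly guard against loops that issue thousands of calls.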
Mix and match termination policies
Beyond cost and wall-clock, LangGOAP ships:
- MaxActionsPolicy(N): terminate after N executed actions.
- MaxTokensPolicy(N): reads total_tokens from world state.
- MaxLLMCallsPolicy(N): reads llm_call_count from world state.
- OnStuckPolicy(): graceful (error=False) termination when the planner emits no_plan. Useful with utility-style agents.
- FirstOfPolicy(*policies): terminate when ANY inner policy fires.
- AllOfPolicy(*policies): terminate only when EVERY inner policy fires.
Custom policies satisfy a tiny TerminationPolicy Protocol — a
name attribute and a should_terminate(state) method returning
EarlyTermination | None. See langgoap/termination.py for the
source.
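As an illustration of that Protocol's shape, the sketch below defines a custom policy that stops the run once an abort flag appears in world state. To stay runnable without LangGOAP installed, it declares local stand-ins for TerminationPolicy and EarlyTermination. The fields on the stand-in EarlyTermination (policy, reason, error), the state['world_state'] layout, and the abort_requested key are all assumptions, so check langgoap/termination.py for the real definitions before adapting it.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable


@dataclass(frozen=True)
class EarlyTermination:
    """Local stand-in; the real class's fields may differ."""
    policy: str
    reason: str
    error: bool = False


@runtime_checkable
class TerminationPolicy(Protocol):
    """Local stand-in mirroring the shape described above."""
    name: str

    def should_terminate(self, state: dict[str, Any]) -> Optional[EarlyTermination]:
        ...


class AbortFlagPolicy:
    """Hypothetical custom policy: stop when world state carries an abort flag."""
    name = "AbortFlagPolicy"

    def should_terminate(self, state: dict[str, Any]) -> Optional[EarlyTermination]:
        # Assumption: the planner state exposes world state under 'world_state'.
        world = state.get("world_state", {})
        if world.get("abort_requested"):
            return EarlyTermination(
                policy=self.name,
                reason="abort_requested was set in world state",
            )
        return None


policy = AbortFlagPolicy()
assert isinstance(policy, TerminationPolicy)  # structural match, no inheritance

print(policy.should_terminate({"world_state": {}}))
print(policy.should_terminate({"world_state": {"abort_requested": True}}))
```

Because the Protocol is structural, the policy class does not need to inherit from anything; it only has to expose the name attribute and the should_terminate method.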
Next steps
- See basics/termination_policies.ipynb for the same policies in isolation on a synthetic workload.
- Attach CostAccumulator, a LangChain BaseCallbackHandler, to a real model to drive MaxCostPolicy from live LLM usage rather than per-action constants.
- Compose MaxCostPolicy with MaxWallClockPolicy and MaxLLMCallsPolicy via FirstOfPolicy for layered safety nets.