Cost-bounded research agent

Enforce a hard USD and wall-clock cap on an LLM-driven research loop using LangGOAP’s composable early-termination policies.

An open-ended research agent (search → summarize → follow up → synthesize → revise) can quietly burn through $50 before anyone notices. This notebook wires up three policies that together guarantee the run halts as soon as either budget runs out:

  • MaxCostPolicy(usd=5.00) reads world_state['total_cost_usd'] and halts once spending reaches or exceeds the cap.

  • MaxWallClockPolicy(seconds=600) reads the wall_clock_started_at anchor that the planner stamps on its first invocation.

  • FirstOfPolicy(...) composes both: terminate when any inner policy fires.
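
The two budget checks reduce to simple comparisons against world state. The sketch below is not LangGOAP's implementation: the helper names `cost_exhausted` and `wall_clock_exhausted` are hypothetical, and it assumes that `wall_clock_started_at` holds a `time.time()`-style epoch timestamp. The `>=` comparison is inferred from the run output later in this notebook, where $5.00 spent trips a $5.00 cap.

```python
import time
from typing import Any, Optional


def cost_exhausted(world_state: dict[str, Any], cap_usd: float) -> bool:
    """True once accumulated spend reaches the cap (hypothetical helper)."""
    return float(world_state.get('total_cost_usd', 0.0)) >= cap_usd


def wall_clock_exhausted(
    world_state: dict[str, Any],
    limit_seconds: float,
    now: Optional[float] = None,
) -> bool:
    """True once the time elapsed since the start anchor reaches the limit.

    Assumes the anchor is an epoch timestamp. A missing anchor is treated
    as "not started yet", so nothing has elapsed.
    """
    started_at = world_state.get('wall_clock_started_at')
    if started_at is None:
        return False
    current = time.time() if now is None else now
    return current - started_at >= limit_seconds


state = {'total_cost_usd': 3.20, 'wall_clock_started_at': 1_000.0}

# "First of" semantics: stop as soon as either check is true.
should_stop = cost_exhausted(state, 5.00) or wall_clock_exhausted(
    state, 600, now=1_650.0
)
print(should_stop)  # True: 650 s have elapsed against a 600 s limit
```

Passing `now` explicitly keeps the wall-clock check deterministic in tests; production code would let it default to the current time.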

The actions in this notebook simulate LLM cost emission deterministically. To swap in real LLM calls, replace each action’s execute with a ChatOpenAI invocation whose callback handler converts the reported token usage into dollars and adds the result to world_state['total_cost_usd']. The pattern is shown at the end of the notebook.

Backed by tests/integration/test_early_termination.py.

Domain: five research steps, each with its own USD cost

| Action | Cost (USD) | Effect |
|---|---|---|
| gather_sources | $0.50 | sources_found |
| summarise | $1.20 | summaries_drafted |
| cross_check | $1.50 | cross_checked |
| draft_report | $1.80 | report_drafted |
| polish_report | $0.40 | report_polished |

Total if every step runs: $5.40. The first four steps cost exactly $5.00, so the cap is reached after draft_report and polish_report never runs. The agent must produce the best possible result within budget.
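
The arithmetic behind that cut-off can be checked with a running total. This sketch works in integer cents to sidestep floating-point rounding at the exact $5.00 boundary. The `>=` comparison is an assumption inferred from the run output later in the notebook, where $5.00 spent trips a $5.00 cap.

```python
from itertools import accumulate

# Step costs from the table above, in integer cents.
STEP_COSTS_CENTS = {
    'gather_sources': 50,
    'summarise': 120,
    'cross_check': 150,
    'draft_report': 180,
    'polish_report': 40,
}
CAP_CENTS = 500

names = list(STEP_COSTS_CENTS)
running = list(accumulate(STEP_COSTS_CENTS.values()))

# Index of the first step whose running total reaches the cap.
cutoff = next(i for i, total in enumerate(running) if total >= CAP_CENTS)

print(running)              # [50, 170, 320, 500, 540]
print(names[:cutoff + 1])   # steps that run before the cap trips
print(names[cutoff + 1:])   # ['polish_report']
```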

from collections.abc import Callable
from typing import Any

from langgoap import (
    ActionSpec,
    FirstOfPolicy,
    GoalSpec,
    GoapGraph,
    MaxCostPolicy,
    MaxWallClockPolicy,
)


def research_step(
    name: str, dollars: float, effect_key: str
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Build the ``execute`` callable for a simulated research step.

    The action accumulates ``total_cost_usd`` into world state — this
    is exactly the shape ``MaxCostPolicy`` reads.  In production a
    LangChain ``BaseCallbackHandler`` populates this key from
    ``response.usage_metadata`` on every LLM call.
    """

    def execute(world_state: dict[str, Any]) -> dict[str, Any]:
        spent = float(world_state.get('total_cost_usd', 0.0))
        return {
            effect_key: True,
            'total_cost_usd': spent + dollars,
            'last_step': name,
        }

    return execute


actions = [
    ActionSpec(
        name='gather_sources',
        preconditions={},
        effects={'sources_found': True, 'total_cost_usd': 0.0},
        execute=research_step('gather_sources', 0.50, 'sources_found'),
    ),
    ActionSpec(
        name='summarise',
        preconditions={'sources_found': True},
        effects={'summaries_drafted': True, 'total_cost_usd': 0.0},
        execute=research_step('summarise', 1.20, 'summaries_drafted'),
    ),
    ActionSpec(
        name='cross_check',
        preconditions={'summaries_drafted': True},
        effects={'cross_checked': True, 'total_cost_usd': 0.0},
        execute=research_step('cross_check', 1.50, 'cross_checked'),
    ),
    ActionSpec(
        name='draft_report',
        preconditions={'cross_checked': True},
        effects={'report_drafted': True, 'total_cost_usd': 0.0},
        execute=research_step('draft_report', 1.80, 'report_drafted'),
    ),
    ActionSpec(
        name='polish_report',
        preconditions={'report_drafted': True},
        effects={'report_polished': True, 'total_cost_usd': 0.0},
        execute=research_step('polish_report', 0.40, 'report_polished'),
    ),
]

goal = GoalSpec(conditions={'report_polished': True})

print(f'Total cost if every step runs: $'
      f'{sum([0.50, 1.20, 1.50, 1.80, 0.40]):.2f}')
Total cost if every step runs: $5.40

Without termination policies: the agent overshoots the budget

With no termination policies attached, the agent runs every step and spends $5.40, which is $0.40 over budget. Nothing flags the overrun, and the run still reports goal_achieved.

graph_unbounded = GoapGraph(actions)

result = graph_unbounded.invoke(goal=goal, world_state={})

executed = [
    h.action_name for h in result['execution_history'] if h.success
]
print(f"status: {result['status']}")
print(f"steps:  {executed}")
print(f"spent:  ${result['world_state']['total_cost_usd']:.2f}")
status: goal_achieved
steps:  ['gather_sources', 'summarise', 'cross_check', 'draft_report', 'polish_report']
spent:  $5.40

With FirstOfPolicy(MaxCost($5.00), MaxWallClock(10min))

The composite halts on whichever budget runs out first. The simulated steps make no real LLM calls, so the cost budget is exhausted long before the 10-minute limit: MaxCostPolicy fires after draft_report, when spending reaches $5.00, and polish_report is skipped. The final status is terminated, which is distinct from goal_achieved so that downstream tooling knows the writeup is partial.

graph_bounded = GoapGraph(
    actions,
    termination_policies=[
        FirstOfPolicy(
            MaxCostPolicy(usd=5.00),
            MaxWallClockPolicy(seconds=600),
        ),
    ],
)

result = graph_bounded.invoke(goal=goal, world_state={})

executed = [
    h.action_name for h in result['execution_history'] if h.success
]
print(f"status:        {result['status']}")
print(f"steps run:     {executed}")
print(f"spent:         ${result['world_state']['total_cost_usd']:.2f}")
print(f"reason:        {result.get('replan_reason')}")
status:        terminated
steps run:     ['gather_sources', 'summarise', 'cross_check', 'draft_report']
spent:         $5.00
reason:        MaxCostPolicy: Cost budget of $5.0000 exceeded (spent=$5.0000)

Surface a partial-but-honest summary on termination

When status == 'terminated', your wrapping code knows the writeup is incomplete and can render an honest progress report — what shipped, what got skipped, why it stopped. This is the production pattern: the agent never lies about its budget.

ALL_STEPS = [
    'gather_sources',
    'summarise',
    'cross_check',
    'draft_report',
    'polish_report',
]


def render_partial_report(result: dict[str, Any]) -> str:
    executed = [
        h.action_name for h in result['execution_history'] if h.success
    ]
    skipped = [s for s in ALL_STEPS if s not in executed]
    spent = result['world_state'].get('total_cost_usd', 0.0)
    lines = [
        f"## Competitor-Analysis Writeup ({result['status']})",
        '',
        f'- Spent: ${spent:.2f}',
        f'- Completed steps: {executed}',
    ]
    if skipped:
        lines.append(f'- Skipped (over budget): {skipped}')
        lines.append(
            f'  Termination reason: {result.get("replan_reason")!r}'
        )
    return '\n'.join(lines)


print(render_partial_report(result))
## Competitor-Analysis Writeup (terminated)

- Spent: $5.00
- Completed steps: ['gather_sources', 'summarise', 'cross_check', 'draft_report']
- Skipped (over budget): ['polish_report']
  Termination reason: 'MaxCostPolicy: Cost budget of $5.0000 exceeded (spent=$5.0000)'
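
For downstream tooling, the same information can travel as structured data rather than Markdown. This is a minimal sketch that assumes the result shape used above (`status`, `execution_history` entries with `action_name` and `success`, `world_state`, `replan_reason`). The `run_payload` helper, the `complete` field, and the `SimpleNamespace` stand-ins for history entries are hypothetical.

```python
import json
from types import SimpleNamespace
from typing import Any

ALL_STEPS = ['gather_sources', 'summarise', 'cross_check',
             'draft_report', 'polish_report']


def run_payload(result: dict[str, Any]) -> dict[str, Any]:
    """Condense a run result into a JSON-friendly dict (hypothetical helper)."""
    executed = [
        h.action_name for h in result['execution_history'] if h.success
    ]
    spent = float(result['world_state'].get('total_cost_usd', 0.0))
    return {
        # Only a goal_achieved run counts as a complete writeup.
        'complete': result['status'] == 'goal_achieved',
        'status': result['status'],
        'spent_usd': round(spent, 2),
        'executed': executed,
        'skipped': [s for s in ALL_STEPS if s not in executed],
        'reason': result.get('replan_reason'),
    }


# Stand-in for the bounded run shown above; not produced by LangGOAP here.
fake_result = {
    'status': 'terminated',
    'execution_history': [
        SimpleNamespace(action_name=name, success=True)
        for name in ALL_STEPS[:4]
    ],
    'world_state': {'total_cost_usd': 5.0},
    'replan_reason': (
        'MaxCostPolicy: Cost budget of $5.0000 exceeded (spent=$5.0000)'
    ),
}

payload = run_payload(fake_result)
print(json.dumps(payload, indent=2))
```

A consumer can then gate on `payload['complete']` instead of parsing the rendered report.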

Production pattern: wire a real LLM via a callback handler

The actions above simulate cost; in a real research agent each step calls an LLM and the cost is whatever the model reports. LangChain’s BaseCallbackHandler is the natural integration point: convert the token usage reported on each LLM response into dollars, add it to total_cost_usd, and MaxCostPolicy reads it without any further wiring.

from typing import Any

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI

# OpenAI gpt-4o-mini pricing as of 2026-04; adjust per your provider.
INPUT_PER_1K = 0.00015   # USD per 1,000 prompt tokens
OUTPUT_PER_1K = 0.0006   # USD per 1,000 completion tokens


class CostAccumulator(BaseCallbackHandler):
    """Add the dollar cost of every finished LLM call to world state."""

    def __init__(self, world_state: dict[str, Any]) -> None:
        self.world_state = world_state

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # llm_output may be None for some providers, so fall back to {}.
        usage = (response.llm_output or {}).get('token_usage') or {}
        cost = (
            usage.get('prompt_tokens', 0) / 1000 * INPUT_PER_1K
            + usage.get('completion_tokens', 0) / 1000 * OUTPUT_PER_1K
        )
        spent = self.world_state.get('total_cost_usd', 0.0)
        self.world_state['total_cost_usd'] = spent + cost


def real_summarise(world_state: dict[str, Any]) -> dict[str, Any]:
    accumulator = CostAccumulator(world_state)
    llm = ChatOpenAI(model='gpt-4o-mini', temperature=0,
                     callbacks=[accumulator])
    # Assumes a real gather_sources step stored the raw text under 'sources'.
    sources = world_state['sources']
    response = llm.invoke(f'Summarise these sources:\n{sources}')
    return {'summaries_drafted': True,
            'summary_text': response.content,
            # .get() avoids a KeyError if no usage was reported.
            'total_cost_usd': world_state.get('total_cost_usd', 0.0)}

The same MaxCostPolicy(usd=5.00) works unchanged because the policy only reads total_cost_usd.
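
Recent langchain-core versions also attach token counts to the returned message as `usage_metadata`, a dict with `input_tokens`, `output_tokens` and `total_tokens`. The sketch below shows the cost arithmetic on that shape as an alternative to reading `llm_output`. `cost_from_usage` is a hypothetical helper, and the rates are the illustrative per-1K prices from the cell above, not authoritative pricing.

```python
from typing import Any, Mapping, Optional

# Illustrative per-1K-token rates copied from the cell above.
INPUT_PER_1K = 0.00015
OUTPUT_PER_1K = 0.0006


def cost_from_usage(usage: Optional[Mapping[str, Any]]) -> float:
    """Convert a usage_metadata-style mapping to USD (hypothetical helper).

    Returns 0.0 when the provider reported no usage at all.
    """
    if not usage:
        return 0.0
    return (
        usage.get('input_tokens', 0) / 1000 * INPUT_PER_1K
        + usage.get('output_tokens', 0) / 1000 * OUTPUT_PER_1K
    )


usage = {'input_tokens': 2000, 'output_tokens': 500, 'total_tokens': 2500}
cost = cost_from_usage(usage)
print(f'${cost:.6f}')  # 2 * 0.00015 + 0.5 * 0.0006 = $0.000600
```

In an action, `cost_from_usage(response.usage_metadata)` could be added to the running `total_cost_usd`; whether `usage_metadata` is populated depends on the provider and model, so the zero fallback matters.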

Mix and match termination policies

Beyond cost and wall-clock, LangGOAP ships:

  • MaxActionsPolicy(N) — terminate after N executed actions.

  • MaxTokensPolicy(N) — read total_tokens from world state.

  • MaxLLMCallsPolicy(N) — read llm_call_count from world state.

  • OnStuckPolicy() — graceful (error=False) termination when the planner emits no_plan. Useful with utility-style agents.

  • FirstOfPolicy(*policies) — terminate when ANY inner fires.

  • AllOfPolicy(*policies) — terminate only when EVERY inner fires.
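
The difference between the two combinators comes down to `any` versus `all`. The following is an illustrative sketch on plain callables, not LangGOAP's classes; `first_of`, `all_of`, `over_cost`, `over_calls` and the `Check` alias are hypothetical names, and the 20-call limit is made up for the example.

```python
from typing import Any, Callable

Check = Callable[[dict[str, Any]], bool]


def first_of(*checks: Check) -> Check:
    """Fire when at least one inner check fires."""
    return lambda state: any(check(state) for check in checks)


def all_of(*checks: Check) -> Check:
    """Fire only when every inner check fires."""
    return lambda state: all(check(state) for check in checks)


def over_cost(state: dict[str, Any]) -> bool:
    return state.get('total_cost_usd', 0.0) >= 5.00


def over_calls(state: dict[str, Any]) -> bool:
    return state.get('llm_call_count', 0) >= 20


state = {'total_cost_usd': 5.00, 'llm_call_count': 4}
print(first_of(over_cost, over_calls)(state))  # True: the cost check fires
print(all_of(over_cost, over_calls)(state))    # False: call count is under 20
```

A "first of" composite suits hard safety caps, while an "all of" composite only stops once every budget is spent, which makes it the looser of the two.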

Custom policies satisfy a tiny TerminationPolicy Protocol — a name attribute and a should_terminate(state) method returning EarlyTermination | None. See langgoap/termination.py for the source.
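
As a rough illustration of that Protocol shape, the sketch below defines a local stand-in for `EarlyTermination` (the real class in langgoap/termination.py may carry different fields) and a hypothetical `MaxFailuresPolicy`. The `failure_count` key and the assumption that `state` exposes `world_state` as a nested dict are illustrative, not confirmed by the library.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol


@dataclass(frozen=True)
class EarlyTermination:
    """Local stand-in; not imported from langgoap, fields are assumed."""
    reason: str
    error: bool = False


class TerminationPolicy(Protocol):
    """The shape described above: a name plus should_terminate(state)."""
    name: str

    def should_terminate(
        self, state: dict[str, Any]
    ) -> Optional[EarlyTermination]: ...


class MaxFailuresPolicy:
    """Hypothetical policy: stop once too many actions have failed."""

    def __init__(self, limit: int) -> None:
        self.name = 'MaxFailuresPolicy'
        self.limit = limit

    def should_terminate(
        self, state: dict[str, Any]
    ) -> Optional[EarlyTermination]:
        # Assumed state layout: counters live under state['world_state'].
        failures = state.get('world_state', {}).get('failure_count', 0)
        if failures >= self.limit:
            return EarlyTermination(
                reason=f'{self.name}: {failures} failures (limit {self.limit})'
            )
        return None


# Structural typing: no inheritance from TerminationPolicy is needed.
policy: TerminationPolicy = MaxFailuresPolicy(limit=3)
below = policy.should_terminate({'world_state': {'failure_count': 1}})
at_limit = policy.should_terminate({'world_state': {'failure_count': 3}})
print(below)            # None
print(at_limit.reason)  # MaxFailuresPolicy: 3 failures (limit 3)
```

Before using a policy like this with GoapGraph, the stand-in dataclass would need to be swapped for the library's own EarlyTermination, checked against its actual constructor.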

Next steps

  • See basics/termination_policies.ipynb for the same policies in isolation on a synthetic workload.

  • Attach CostAccumulator to real LangChain model calls so that MaxCostPolicy is driven by live LLM usage rather than per-action constants.

  • Compose MaxCostPolicy with MaxWallClockPolicy and MaxLLMCallsPolicy via FirstOfPolicy for layered safety nets.