"""LangGraph observer node — routes the graph after each action.

After each action executes, the observer decides whether to:
- end (goal achieved, termination policy fired, replan budget exhausted,
  no plan, or hard fail),
- re-enter the planner (replan), or
- continue executing (more actions remain in the current plan).
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from langgoap.termination import TerminationPolicy
from langgraph.graph import END
from langgraph.types import Command
from langgoap.actions import ActionSpec
from langgoap.goals import GoalSpec, MultiGoal
from langgoap.graph.nodes._helpers import _planning_keys, logger
from langgoap.graph.state import ActionResult, GoapState
from langgoap.history import (
ExecutionRecord,
StoreExecutionHistory,
compute_goal_hash,
)
from langgoap.planner.types import Plan
from langgoap.state import PlanningState
from langgoap.tracing import NullTracer, PlanningTracer, SafeTracerProxy
from langgoap.types import ReplanStrategy
def _get_last_failed_action(state: GoapState) -> str | None:
"""Extract the action name from the last failed ActionResult in history."""
history: list[ActionResult] = state.get("execution_history", [])
for result in reversed(history):
if not result.success:
return result.action_name
return None
def _get_max_retries(action_name: str, actions: list[ActionSpec]) -> int:
"""Look up the max_retries value for *action_name*, defaulting to 0."""
for action in actions:
if action.name == action_name:
return action.max_retries
return 0
def _resolve_active_goal(state: GoapState) -> GoalSpec | None:
"""Resolve the goal that was active at termination (handles ``MultiGoal``)."""
raw_goal = state.get("goal")
if raw_goal is None:
return None
if isinstance(raw_goal, MultiGoal):
idx = state.get("current_subgoal_index", 0)
if idx >= len(raw_goal.goals):
idx = len(raw_goal.goals) - 1
return raw_goal.goals[idx]
return raw_goal
def _outcome_from_status(status: str) -> str | None:
"""Map a terminal status string to an ``ExecutionRecord`` outcome, or ``None``."""
if status == "goal_achieved":
return "success"
if status in {"failed", "no_plan", "error"}:
return "failed"
return None
def _is_goal_satisfied(goal: GoalSpec, world_state: dict[str, Any]) -> bool:
    """Return True only if every condition of *goal* holds in *world_state*.

    A condition holds when its key is present in ``world_state`` AND the
    stored value compares equal to the required one.

    Presence is tested explicitly because ``dict.get`` would yield ``None`` for
    an absent key. That would wrongly satisfy a condition whose required value
    is ``None``.

    :class:`~langgoap.planner.utility.NirvanaGoal` instances are never
    satisfied. Utility-AI agents keep acting without a completion criterion,
    and the loop ends only when the planner runs out of applicable actions.
    """
    from langgoap.planner.utility import is_nirvana

    if is_nirvana(goal):
        return False
    for key, required in goal.conditions.items():
        if key not in world_state:
            return False
        if not (world_state[key] == required):
            return False
    return True
def _route_multi_goal(
    raw_goal: MultiGoal, state: GoapState, world_state: dict[str, Any]
) -> Command[str] | None:
    """Handle ``MultiGoal`` bookkeeping after an action.

    Returns ``None`` while the active sub-goal (``current_subgoal_index``,
    default ``0``) is still unmet. The caller then applies the single-goal
    routing to that sub-goal. Otherwise a ``Command`` is returned:

    * If the index is already past the last sub-goal, the run ends with
      ``goal_achieved``.
    * In ``sequential`` mode with sub-goals remaining, the index advances and
      control returns to the planner with fresh per-sub-goal accounting. The
      plan, step, replan count, blacklist and failure counts are all reset.
    * In any other case (non-sequential modes such as ``any``, or the final
      sequential sub-goal), the run ends with ``goal_achieved``. In ``any``
      mode the planner already committed to one sub-goal, so the remaining
      alternatives are not pursued.
    """
    subgoals = raw_goal.goals
    position = state.get("current_subgoal_index", 0)
    exhausted = position >= len(subgoals)
    if not exhausted:
        if not _is_goal_satisfied(subgoals[position], world_state):
            # Active sub-goal still open: defer to single-goal routing.
            return None
        advance = raw_goal.mode == "sequential" and position + 1 < len(subgoals)
        if advance:
            fresh_subgoal_state = {
                "current_subgoal_index": position + 1,
                "plan": None,
                "current_step": 0,
                "replan_reason": "subgoal_achieved",
                "replan_count": 0,
                "blacklisted_actions": [],
                "action_failure_counts": {},
            }
            return Command(goto="planner", update=fresh_subgoal_state)
    return Command(goto=END, update={"status": "goal_achieved"})
def _route_replan_limit(goal: GoalSpec, state: GoapState) -> Command[str] | None:
"""Emit a terminal ``failed`` command when the replan budget is exhausted."""
replan_count: int = state.get("replan_count", 0)
if goal.policy.max_replans > 0 and replan_count >= goal.policy.max_replans:
return Command(
goto=END,
update={
"status": "failed",
"replan_reason": f"max_replans_exceeded ({goal.policy.max_replans})",
},
)
return None
class GoapObserver:
    """LangGraph node that decides the next step via ``Command`` routing.

    After each action execution the observer evaluates, in order:

    1. A termination policy fires -> END with status ``terminated``.
    2. No goal in state -> END with status ``error``.
    3. For a ``MultiGoal``: a satisfied sub-goal either advances to the next
       sequential sub-goal (-> planner) or ends the run (-> END).
    4. Goal achieved -> END with status ``goal_achieved``.
    5. Replan budget exhausted -> END with status ``failed``.
    6. Planner reported ``no_plan`` -> END with status ``no_plan``.
    7. Action failed -> update failure counts / blacklist and replan
       (or END with ``failed`` under ``ReplanStrategy.NEVER``).
    8. ``EVERY_ACTION`` replan strategy -> replan.
    9. ``ON_DEVIATION`` strategy and the planning-relevant world state
       differs from the plan's expected state for the step just executed
       -> replan.
    10. More actions remain -> continue executing.
    11. Plan exhausted but goal not met -> replan (or END with ``failed``
        under ``NEVER``).

    Args:
        actions: The available action specs. When provided, deviation
            detection compares only planning-relevant keys (those
            appearing in action preconditions/effects and goal
            conditions) rather than the entire world state. This
            prevents non-planning data (document lists, LLM responses)
            from triggering spurious replanning. They also supply the
            per-action ``max_retries`` used for blacklisting.
        tracer: Optional :class:`PlanningTracer` invoked on terminal
            states: ``on_goal_achieved`` for ``goal_achieved``, and
            ``on_plan_failed`` for ``failed`` / ``no_plan`` / ``error`` /
            ``terminated``.
            **Known limitation (v0.1.0):** mid-run ``MultiGoal``
            sub-goal completions do **not** emit tracer events. The
            tracer stays silent until the whole ``MultiGoal``
            terminates. A future ``on_subgoal_achieved`` hook is out
            of scope for v0.1.0.
        history: Optional :class:`StoreExecutionHistory`. One
            :class:`ExecutionRecord` is written when a run ends with
            ``goal_achieved`` (outcome ``"success"``) or with ``failed`` /
            ``no_plan`` / ``error`` (outcome ``"failed"``). Runs ending
            with ``terminated``, or without a goal, are not recorded.
            For ``MultiGoal`` runs the record captures the sub-goal
            that was active at termination.
        termination_policies: Optional policies consulted before any
            other routing. The first whose ``should_terminate`` returns a
            non-``None`` decision ends the run with status ``terminated``.
            A policy that raises is logged and skipped.
    """

    # END statuses that the tracer reports through ``on_plan_failed``.
    _TRACER_FAILURE_STATUSES = frozenset({"failed", "no_plan", "error", "terminated"})

    def __init__(
        self,
        actions: list[ActionSpec] | None = None,
        *,
        tracer: PlanningTracer | None = None,
        history: StoreExecutionHistory | None = None,
        termination_policies: list[TerminationPolicy] | None = None,
    ) -> None:
        self._actions = actions or []
        # See ``GoapExecutor.__init__`` for the SafeTracerProxy rationale.
        self._tracer: PlanningTracer = SafeTracerProxy(tracer or NullTracer())
        self._history = history
        # Copy so later mutation of the caller's sequence has no effect here.
        self._termination_policies: list[TerminationPolicy] = (
            list(termination_policies) if termination_policies else []
        )

    def _consult_termination(self, state: GoapState) -> Command[str] | None:
        """Return a terminal Command if any policy fires, else ``None``.

        Policies are consulted in order, and the first non-``None`` decision
        wins. Each policy call is wrapped in try/except so a misbehaving
        user policy never crashes the observer. Termination is a routing
        decision, not a place to leak exceptions.
        """
        for policy in self._termination_policies:
            try:
                decision = policy.should_terminate(state)
            except Exception as exc:
                logger.warning(
                    "Termination policy %r raised: %s — treating as no-op",
                    getattr(policy, "name", type(policy).__name__),
                    exc,
                )
                continue
            if decision is not None:
                logger.info(
                    "Early termination by %s: %s",
                    decision.policy_name,
                    decision.reason,
                )
                return Command(
                    goto=END,
                    update={
                        "status": "terminated",
                        "replan_reason": f"{decision.policy_name}: {decision.reason}",
                    },
                )
        return None

    def __call__(self, state: GoapState) -> Command[str]:
        """Route *state*, then fire the sync tracer and history hooks."""
        cmd = self._route(state)
        self._fire_sync_tracer(state, cmd)
        self._maybe_record_sync(state, cmd)
        return cmd

    async def acall(self, state: GoapState) -> Command[str]:
        """Async variant of :meth:`__call__` that awaits the async hooks."""
        cmd = self._route(state)
        await self._fire_async_tracer(state, cmd)
        await self._maybe_record_async(state, cmd)
        return cmd

    @classmethod
    def _terminal_tracer_event(
        cls, state: GoapState, cmd: Command[str]
    ) -> tuple[str, Any] | None:
        """Classify *cmd* for the tracer (shared by the sync and async paths).

        Returns ``("achieved", world_state)`` for an END with
        ``goal_achieved``. Returns ``("failed", reason)`` for an END whose
        status is in :attr:`_TRACER_FAILURE_STATUSES`; ``reason`` is the
        update's ``replan_reason``, falling back to the status itself.
        Returns ``None`` for every other command.
        """
        if cmd.goto != END:
            return None
        update = cmd.update or {}
        status = update.get("status")
        if status == "goal_achieved":
            return ("achieved", state.get("world_state", {}))
        if status in cls._TRACER_FAILURE_STATUSES:
            return ("failed", update.get("replan_reason") or status)
        return None

    def _fire_sync_tracer(self, state: GoapState, cmd: Command[str]) -> None:
        """Invoke the synchronous tracer hook matching a terminal *cmd*."""
        event = self._terminal_tracer_event(state, cmd)
        if event is None:
            return
        kind, payload = event
        if kind == "achieved":
            self._tracer.on_goal_achieved(payload)
        else:
            # NOTE(review): the second argument is always the constant 0.0;
            # confirm what PlanningTracer.on_plan_failed expects there.
            self._tracer.on_plan_failed(payload, 0.0)

    async def _fire_async_tracer(self, state: GoapState, cmd: Command[str]) -> None:
        """Await the asynchronous tracer hook matching a terminal *cmd*."""
        event = self._terminal_tracer_event(state, cmd)
        if event is None:
            return
        kind, payload = event
        if kind == "achieved":
            await self._tracer.aon_goal_achieved(payload)
        else:
            await self._tracer.aon_plan_failed(payload, 0.0)

    def _build_record(
        self, state: GoapState, cmd: Command[str]
    ) -> ExecutionRecord | None:
        """Return an ``ExecutionRecord`` for a terminal command, or ``None``.

        A record is built only when the command routes to ``END``, a goal is
        present, and the status maps to an outcome via
        ``_outcome_from_status``. No record is emitted for intermediate
        observer decisions. For ``MultiGoal`` runs, the record captures
        the sub-goal that was active at termination, so downstream
        analytics can attribute success or failure correctly.
        """
        if cmd.goto != END:
            return None
        goal = _resolve_active_goal(state)
        if goal is None:
            return None
        outcome = _outcome_from_status((cmd.update or {}).get("status", "unknown"))
        if outcome is None:
            return None
        plan_obj: Plan | None = state.get("plan")
        # Compare against None explicitly: Plan supports len(), so an empty
        # plan is falsy and would otherwise be treated as "no plan".
        if plan_obj is not None:
            plan_actions = tuple(plan_obj.action_names)
            planned_cost = plan_obj.total_cost
        else:
            plan_actions = ()
            planned_cost = 0.0
        # NOTE(review): no realized cost is tracked here, so actual_cost
        # mirrors the planned total cost. Confirm this is intended.
        return ExecutionRecord(
            goal_hash=compute_goal_hash(goal),
            goal_conditions=dict(goal.conditions),
            plan_actions=plan_actions,
            expected_cost=planned_cost,
            actual_cost=planned_cost,
            outcome=outcome,
            replan_count=state.get("replan_count", 0),
            timestamp=datetime.now(timezone.utc),
        )

    def _maybe_record_sync(self, state: GoapState, cmd: Command[str]) -> None:
        """Write a history record for a terminal *cmd* (best effort, sync)."""
        if self._history is None:
            return
        record = self._build_record(state, cmd)
        if record is None:
            return
        try:
            self._history.record(record)
        except Exception:  # pragma: no cover - defensive
            logger.exception("Failed to record execution history")

    async def _maybe_record_async(self, state: GoapState, cmd: Command[str]) -> None:
        """Write a history record for a terminal *cmd* (best effort, async)."""
        if self._history is None:
            return
        record = self._build_record(state, cmd)
        if record is None:
            return
        try:
            await self._history.arecord(record)
        except Exception:  # pragma: no cover - defensive
            logger.exception("Failed to record execution history")

    def _route(self, state: GoapState) -> Command[str]:
        """Compute the routing Command for *state*.

        The routing order is the one listed in the class docstring. No
        tracer or history hooks are fired here.
        """
        raw_goal: GoalSpec | MultiGoal | None = state.get("goal")
        world_state: dict[str, Any] = state.get("world_state", {})
        plan_obj: Plan | None = state.get("plan")
        current_step: int = state.get("current_step", 0)
        status: str = state.get("status", "")

        # Termination policies short-circuit everything else. A hit cost
        # ceiling or wall-clock budget must beat the goal-satisfied check
        # below, so that a run which overshoots its budget while incidentally
        # finishing still surfaces the termination reason.
        terminate_cmd = self._consult_termination(state)
        if terminate_cmd is not None:
            return terminate_cmd

        if raw_goal is None:
            return Command(
                goto=END,
                update={"status": "error", "replan_reason": "no goal specified"},
            )

        # ``MultiGoal`` dispatch: advance sub-goals, or fall through to the
        # single-goal logic for the still-unmet active sub-goal.
        if isinstance(raw_goal, MultiGoal):
            multi_cmd = _route_multi_goal(raw_goal, state, world_state)
            if multi_cmd is not None:
                return multi_cmd
            # _route_multi_goal already returned for an out-of-range index,
            # so this lookup is in bounds.
            goal = raw_goal.goals[state.get("current_subgoal_index", 0)]
        else:
            goal = raw_goal

        if _is_goal_satisfied(goal, world_state):
            return Command(goto=END, update={"status": "goal_achieved"})

        # NOTE(review): the replan budget is enforced on every observation,
        # not only when about to route back to the planner. If the planner
        # increments replan_count when it replans, a run that has just used
        # its last replan stops after the first action of the new plan.
        # Confirm this is intended.
        limit_cmd = _route_replan_limit(goal, state)
        if limit_cmd is not None:
            return limit_cmd

        if status == "no_plan":
            return Command(goto=END, update={"status": "no_plan"})

        never = goal.policy.replan_strategy == ReplanStrategy.NEVER

        if status == "action_failed":
            return self._route_action_failed(state, never)

        if goal.policy.replan_strategy == ReplanStrategy.EVERY_ACTION:
            return Command(
                goto="planner",
                update={"replan_reason": "every_action_replan"},
            )

        # Deviation checks need an executed step with a recorded expectation.
        if (
            not never
            and plan_obj is not None
            and current_step > 0
            and current_step <= len(plan_obj.expected_states)
            and goal.policy.replan_strategy == ReplanStrategy.ON_DEVIATION
        ):
            deviation_cmd = self._route_deviation_check(
                goal, plan_obj, world_state, current_step
            )
            if deviation_cmd is not None:
                return deviation_cmd

        if plan_obj is not None and current_step < len(plan_obj):
            return Command(goto="executor")

        # Plan exhausted without reaching the goal.
        if never:
            return Command(
                goto=END,
                update={"status": "failed", "replan_reason": "plan_exhausted"},
            )
        return Command(
            goto="planner",
            update={"replan_reason": "plan_exhausted"},
        )

    def _route_action_failed(self, state: GoapState, never: bool) -> Command[str]:
        """Handle the ``action_failed`` status.

        Under ``NEVER`` the run ends with ``failed``. Otherwise the last
        failed action's count in ``action_failure_counts`` is incremented.
        Once that count exceeds the action's ``max_retries``, the action is
        appended (once) to ``blacklisted_actions``, and the graph returns to
        the planner. If no failed action can be found in the history, the
        graph replans without touching the counters.
        """
        if never:
            # With NEVER, treat a failed action as a hard stop.
            return Command(
                goto=END,
                update={"status": "failed", "replan_reason": "action_failed"},
            )
        failed_name = _get_last_failed_action(state)
        if failed_name is None:
            return Command(
                goto="planner",
                update={"replan_reason": "action_failed"},
            )
        # Copy before mutating so the incoming state is left untouched.
        counts = dict(state.get("action_failure_counts", {}))
        counts[failed_name] = counts.get(failed_name, 0) + 1
        blacklist = list(state.get("blacklisted_actions", []))
        max_retries = _get_max_retries(failed_name, self._actions)
        if counts[failed_name] > max_retries and failed_name not in blacklist:
            blacklist.append(failed_name)
        return Command(
            goto="planner",
            update={
                "replan_reason": "action_failed",
                "action_failure_counts": counts,
                "blacklisted_actions": blacklist,
            },
        )

    def _route_deviation_check(
        self,
        goal: GoalSpec,
        plan_obj: Plan,
        world_state: dict[str, Any],
        current_step: int,
    ) -> Command[str] | None:
        """Return a replan command when planning-relevant state has deviated.

        The current world state is compared with
        ``plan_obj.expected_states[current_step - 1]``. Both sides are first
        restricted to the planning keys derived from the actions and *goal*.
        """
        # Compare only planning-relevant keys so that changes to execution
        # context (documents, generations, etc.) don't trigger replanning.
        pkeys = _planning_keys(self._actions, goal)
        planning_state = PlanningState.from_dict(world_state, keys=pkeys)
        expected_raw = plan_obj.expected_states[current_step - 1]
        # Filter the expected state to the same planning keys, so that
        # manually constructed plans (e.g. in tests) don't cause false
        # deviations from non-planning keys.
        expected = PlanningState.from_dict(expected_raw.to_dict(), keys=pkeys)
        if planning_state != expected:
            return Command(
                goto="planner",
                update={"replan_reason": "state_deviation"},
            )
        return None


__all__ = ["GoapObserver"]