"""Convenience builder for GOAP execution graphs.

``GoapGraph`` assembles a LangGraph ``StateGraph`` with planner, executor,
and observer nodes wired together for the GOAP execution loop.
"""
from __future__ import annotations

import random
from typing import TYPE_CHECKING, Any, cast

from langchain_core.language_models import BaseChatModel
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langgraph.checkpoint.base import BaseCheckpointSaver
from langgraph.graph import START, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import Checkpointer

from langgoap.actions import ActionSpec
from langgoap.conditions import AsyncConditionResolver, ConditionResolver
from langgoap.goals import GoalSpec, MultiGoal
from langgoap.graph.nodes import GoapExecutor, GoapObserver, GoapPlanner
from langgoap.graph.state import GoapState
from langgoap.guards import ActionGuard, AsyncActionGuard
from langgoap.history import StoreExecutionHistory
from langgoap.planner.transitions import TransitionModel
from langgoap.sensors import AsyncSensor, Sensor
from langgoap.serde import install_langgoap_serde
from langgoap.tracing import PlanningTracer

if TYPE_CHECKING:
    from langgoap.interpreter import GoalInterpreter
    from langgoap.planner.strategy import PlanningStrategy
    from langgoap.stuck import StuckHandler
    from langgoap.termination import TerminationPolicy
# GoapGraph is the public builder class of this module.
class GoapGraph:
    """Builder that produces a compiled LangGraph for GOAP execution.

    Supports two usage styles:

    **Explicit compile + invoke** (for power users who need the compiled graph)::

        graph = GoapGraph(actions=[action1, action2, ...])
        compiled = graph.compile(checkpointer=saver)
        result = compiled.invoke({
            "goal": GoalSpec(conditions={"done": True}),
            "world_state": {"a": True},
        })

    **Convenience invoke** (single-shot, no persistence)::

        graph = GoapGraph(actions=[action1, action2, ...])
        result = graph.invoke(
            goal=GoalSpec(conditions={"done": True}),
            world_state={"a": True},
        )

    **Custom planning strategy**::

        from langgoap.planner.strategy import LazyDecompositionStrategy

        graph = GoapGraph(
            actions=[action1, action2, ...],
            strategy=LazyDecompositionStrategy(lookahead=2),
        )
        result = graph.invoke(goal=goal, world_state={})

    The graph structure is::

        START → planner → executor → observer ──→ END
                   ↑                     │
                   └─────────────────────┘

    Args:
        actions: The action library available to the planner and executor.
        strategy: Optional :class:`~langgoap.planner.strategy.PlanningStrategy`.
            Defaults to the A\\* strategy wired inside :class:`GoapPlanner`.
        tracer: Optional :class:`~langgoap.tracing.PlanningTracer` for
            planning/execution observability.
        history: Optional :class:`~langgoap.history.StoreExecutionHistory`
            for persisting execution traces.
        sensors: Optional list of :class:`~langgoap.sensors.Sensor` /
            :class:`~langgoap.sensors.AsyncSensor` run before planning.
        guards: Optional list of :class:`~langgoap.guards.ActionGuard` /
            :class:`~langgoap.guards.AsyncActionGuard` evaluated before each
            action executes.
        resolvers: Optional list of dynamic condition resolvers applied to
            precondition evaluation.
        record_expansions: Forwarded to :class:`GoapPlanner`; enables node
            expansion trace recording for post-hoc analysis.
        transition_model: Optional
            :class:`~langgoap.planner.transitions.TransitionModel` threaded
            into :class:`GoapExecutor`. When provided, actions with no
            ``execute`` callable (or those returning ``None``) have their
            runtime effects drawn from ``transition_model.sample``. Use
            the same model you passed to :class:`MCTSStrategy` so plan-time
            and runtime share one noise distribution.
        rng: Optional ``random.Random`` forwarded to the executor for
            reproducible sampling.
        stuck_handlers: Optional list of
            :class:`~langgoap.stuck.StuckHandler` instances forwarded to
            :class:`GoapPlanner`.
        max_stuck_iterations: Forwarded to :class:`GoapPlanner` alongside
            ``stuck_handlers``. Defaults to ``3``.
        termination_policies: Optional list of
            :class:`~langgoap.termination.TerminationPolicy` instances
            forwarded to :class:`GoapObserver`.
    """

    def __init__(
        self,
        actions: list[ActionSpec],
        *,
        strategy: PlanningStrategy | None = None,
        tracer: PlanningTracer | None = None,
        history: StoreExecutionHistory | None = None,
        sensors: list[Sensor | AsyncSensor] | None = None,
        guards: list[ActionGuard | AsyncActionGuard] | None = None,
        resolvers: list[ConditionResolver | AsyncConditionResolver] | None = None,
        record_expansions: bool = False,
        transition_model: TransitionModel | None = None,
        rng: random.Random | None = None,
        stuck_handlers: list[StuckHandler] | None = None,
        max_stuck_iterations: int = 3,
        termination_policies: list[TerminationPolicy] | None = None,
    ) -> None:
        """Record the configuration; nodes are only built in :meth:`compile`.

        See the class docstring for the meaning of each argument. The
        ``actions`` list is stored by reference, not copied.
        """
        self.actions = actions
        self._strategy = strategy
        self._tracer = tracer
        self._history = history
        self._sensors = sensors
        self._guards = guards
        self._resolvers = resolvers
        self._record_expansions = record_expansions
        self._transition_model = transition_model
        self._rng = rng
        self._stuck_handlers = stuck_handlers
        self._max_stuck_iterations = max_stuck_iterations
        self._termination_policies = termination_policies

    def compile(
        self,
        checkpointer: Checkpointer | None = None,
        store: Any = None,
        *,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
    ) -> CompiledStateGraph:
        """Build and compile the GOAP StateGraph.

        Args:
            checkpointer: Optional LangGraph checkpointer for persistence.
                When provided, the checkpointer's serde is swapped for a
                LangGOAP-aware subclass (see :mod:`langgoap.serde`) so that
                frozen dataclasses using ``MappingProxyType`` fields
                round-trip through msgpack correctly.
            store: Optional LangGraph store for shared state.
            interrupt_before: Optional list of node names to interrupt
                before. Forwarded to ``StateGraph.compile``. Use
                ``["executor"]`` to gate each action execution behind a
                human-in-the-loop checkpoint boundary, then resume via
                ``compiled.invoke(None, config=...)``.
            interrupt_after: Optional list of node names to interrupt
                after. Forwarded to ``StateGraph.compile``.

        Returns:
            A compiled StateGraph ready for invocation.
        """
        # Install LangGOAP's MappingProxyType-aware serde on the
        # checkpointer so frozen dataclasses serialize cleanly. Skipped
        # silently when no checkpointer is provided — no-ops don't need
        # a custom serde because state never hits disk.
        # isinstance narrows Checkpointer (= BaseCheckpointSaver | None |
        # Literal[False]) to BaseCheckpointSaver, satisfying
        # install_langgoap_serde's parameter type without a cast or
        # type: ignore.
        if isinstance(checkpointer, BaseCheckpointSaver):
            install_langgoap_serde(
                checkpointer,
                extra_allowed_types=self._approval_form_types(),
            )

        builder = StateGraph(GoapState)

        planner = GoapPlanner(
            self.actions,
            strategy=self._strategy,
            tracer=self._tracer,
            sensors=self._sensors,
            resolvers=self._resolvers,
            record_expansions=self._record_expansions,
            stuck_handlers=self._stuck_handlers,
            max_stuck_iterations=self._max_stuck_iterations,
        )
        executor = GoapExecutor(
            tracer=self._tracer,
            guards=self._guards,
            transition_model=self._transition_model,
            rng=self._rng,
            actions=self.actions,
        )
        observer = GoapObserver(
            self.actions,
            tracer=self._tracer,
            history=self._history,
            termination_policies=self._termination_policies,
        )

        # All three nodes are wrapped with RunnableLambda so LangGraph
        # dispatches to the async variant under ``ainvoke`` and the sync
        # variant under ``invoke``. Without this wiring, the planner
        # and observer would only ever see the sync ``__call__`` path
        # and their async tracer hooks would never fire.
        builder.add_node(
            "planner",
            RunnableLambda(func=planner.__call__, afunc=planner.acall),
        )
        builder.add_node(
            "executor",
            RunnableLambda(func=executor.__call__, afunc=executor.acall),
        )
        builder.add_node(
            "observer",
            RunnableLambda(func=observer.__call__, afunc=observer.acall),
        )

        # Wire edges
        builder.add_edge(START, "planner")
        builder.add_edge("planner", "executor")
        builder.add_edge("executor", "observer")
        # Observer uses Command(goto=...) for routing — no explicit edges needed

        # The interrupt lists are forwarded only when the caller supplied
        # them, so StateGraph.compile's own defaults apply otherwise.
        compile_kwargs: dict[str, Any] = {
            "checkpointer": checkpointer,
            "store": store,
        }
        if interrupt_before is not None:
            compile_kwargs["interrupt_before"] = interrupt_before
        if interrupt_after is not None:
            compile_kwargs["interrupt_after"] = interrupt_after
        return builder.compile(**compile_kwargs)

    def invoke(
        self,
        goal: GoalSpec | MultiGoal,
        world_state: dict[str, Any] | None = None,
        config: RunnableConfig | None = None,
    ) -> GoapState:
        """Convenience method: compile and invoke the graph in one call.

        Suitable for single-shot executions that don't need persistence or
        time-travel. For repeated invocations with the same compiled graph,
        use :meth:`compile` directly.

        Args:
            goal: The goal to achieve. May be a single :class:`GoalSpec`
                or a :class:`MultiGoal` wrapping several sub-goals.
            world_state: Initial world state (defaults to an empty dict).
            config: Optional LangGraph run configuration
                (e.g. ``{"configurable": {"thread_id": "..."}}``)

        Returns:
            The final :class:`~langgoap.graph.state.GoapState` after the
            GOAP loop completes.
        """
        compiled = self.compile()
        final_state = compiled.invoke(
            self._initial_state(goal, world_state), config=config
        )
        # The compiled graph's invoke() is typed as returning dict[str, Any];
        # cast to GoapState because we own the graph's output schema entirely.
        # The quoted form keeps the cast purely static (nothing is looked up
        # at runtime).
        return cast("GoapState", final_state)

    async def ainvoke(
        self,
        goal: GoalSpec | MultiGoal,
        world_state: dict[str, Any] | None = None,
        config: RunnableConfig | None = None,
    ) -> GoapState:
        """Async convenience method: compile and invoke the graph.

        Identical to :meth:`invoke` but uses ``ainvoke`` on the compiled
        graph, enabling native async execution of action callables.

        Args:
            goal: The goal to achieve. May be a single :class:`GoalSpec`
                or a :class:`MultiGoal` wrapping several sub-goals.
            world_state: Initial world state (defaults to an empty dict).
            config: Optional LangGraph run configuration.

        Returns:
            The final :class:`~langgoap.graph.state.GoapState` after the
            GOAP loop completes.
        """
        compiled = self.compile()
        final_state = await compiled.ainvoke(
            self._initial_state(goal, world_state), config=config
        )
        return cast("GoapState", final_state)

    def invoke_nl(
        self,
        request: str,
        world_state: dict[str, Any] | None = None,
        *,
        llm: BaseChatModel,
        config: RunnableConfig | None = None,
        structured_output_kwargs: dict[str, Any] | None = None,
    ) -> GoapState:
        """Interpret a natural language request and execute the GOAP loop.

        Convenience method that creates a
        :class:`~langgoap.interpreter.GoalInterpreter`, converts the request
        to a :class:`~langgoap.goals.GoalSpec`, and invokes the graph in one
        call.

        Args:
            request: Natural language description of the goal.
            world_state: Initial world state (defaults to an empty dict).
            llm: LangChain chat model for goal interpretation.
            config: Optional LangGraph run configuration.
            structured_output_kwargs: Forwarded to
                :class:`~langgoap.interpreter.GoalInterpreter` and then to
                ``llm.with_structured_output()``. Use
                ``{"method": "function_calling"}`` for OpenAI when the
                conditions dict must stay open-ended.

        Returns:
            The final :class:`~langgoap.graph.state.GoapState` after the
            GOAP loop completes.
        """
        interpreter = self._build_interpreter(llm, structured_output_kwargs)
        # ``world_state`` is handed to the interpreter exactly as received
        # (possibly ``None``); only ``invoke`` substitutes the empty dict.
        goal = interpreter.interpret(request, world_state=world_state)
        return self.invoke(goal=goal, world_state=world_state, config=config)

    async def ainvoke_nl(
        self,
        request: str,
        world_state: dict[str, Any] | None = None,
        *,
        llm: BaseChatModel,
        config: RunnableConfig | None = None,
        structured_output_kwargs: dict[str, Any] | None = None,
    ) -> GoapState:
        """Async variant of :meth:`invoke_nl`.

        Args:
            request: Natural language description of the goal.
            world_state: Initial world state (defaults to an empty dict).
            llm: LangChain chat model for goal interpretation.
            config: Optional LangGraph run configuration.
            structured_output_kwargs: Forwarded to
                :class:`~langgoap.interpreter.GoalInterpreter` and then to
                ``llm.with_structured_output()``. Use
                ``{"method": "function_calling"}`` for OpenAI when the
                conditions dict must stay open-ended.

        Returns:
            The final :class:`~langgoap.graph.state.GoapState` after the
            GOAP loop completes.
        """
        interpreter = self._build_interpreter(llm, structured_output_kwargs)
        goal = await interpreter.ainterpret(request, world_state=world_state)
        return await self.ainvoke(goal=goal, world_state=world_state, config=config)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _approval_form_types(self) -> tuple[type, ...]:
        """Collect classes referenced by ``require_human_approval``.

        Harvests user-defined Pydantic form classes referenced from
        ``ActionSpec.require_human_approval`` so the LangGOAP serde's
        allowlist permits them through msgpack round-trip. Without
        this, the checkpointer would block deserialization and the
        form-binding gate would silently disappear after a resume.

        Only values that are classes are collected; anything else (or a
        missing attribute) is ignored. Each class appears once, in first-seen
        order, even when several actions share the same form.
        """
        forms: list[type] = []
        for action in self.actions:
            form = getattr(action, "require_human_approval", None)
            # Identity comparison: de-duplication must not depend on a
            # metaclass-defined ``__eq__`` / ``__hash__``.
            if isinstance(form, type) and not any(form is seen for seen in forms):
                forms.append(form)
        return tuple(forms)

    @staticmethod
    def _initial_state(
        goal: GoalSpec | MultiGoal,
        world_state: dict[str, Any] | None,
    ) -> GoapState:
        """Build the input state shared by :meth:`invoke` and :meth:`ainvoke`.

        A falsy ``world_state`` (``None`` or an empty dict) is replaced by a
        fresh empty dict; any other dict is passed through by reference.
        """
        return {
            "goal": goal,
            "world_state": world_state or {},
        }

    def _build_interpreter(
        self,
        llm: BaseChatModel,
        structured_output_kwargs: dict[str, Any] | None,
    ) -> GoalInterpreter:
        """Create the goal interpreter used by the ``*_nl`` entry points.

        When ``structured_output_kwargs`` is ``None`` the value is taken from
        ``_default_structured_output_kwargs(llm)``; an explicitly supplied
        dict (even an empty one) is forwarded unchanged.
        """
        # Imported at call time rather than at module import time.
        # NOTE(review): presumably this avoids a circular import with
        # langgoap.interpreter — confirm before hoisting.
        from langgoap.interpreter import (
            GoalInterpreter,
            _default_structured_output_kwargs,
        )

        if structured_output_kwargs is None:
            structured_output_kwargs = _default_structured_output_kwargs(llm)
        return GoalInterpreter(
            llm=llm,
            actions=self.actions,
            structured_output_kwargs=structured_output_kwargs,
        )