# Source code for langgoap.graph.nodes.executor

"""LangGraph executor node \u2014 runs the current action in the plan."""

from __future__ import annotations

import asyncio
import inspect
import random
import time
from typing import Any

from langgoap.actions import ActionSpec
from langgoap.graph.nodes._execution import (
    _apply_result,
    _build_failure,
    _build_guard_block,
    _build_success,
    _check_human_approval,
    _prepare_execution,
)
from langgoap.graph.nodes._helpers import logger
from langgoap.graph.state import GoapState
from langgoap.guards import (
    ActionGuard,
    AsyncActionGuard,
    has_blocking_failure,
    run_guards_async,
    run_guards_sync,
)
from langgoap.planner.transitions import TransitionModel
from langgoap.tracing import NullTracer, PlanningTracer, SafeTracerProxy


class GoapExecutor:
    """LangGraph node that executes the current action in the plan.

    Runs ``plan.actions[current_step]``, updates world_state with the
    action's effects, and appends to execution_history.

    Both sync (:meth:`__call__`) and async (:meth:`acall`) paths share the
    same pre-condition checks, result building, and failure handling via
    the module-level helpers :func:`_prepare_execution`,
    :func:`_apply_result`, :func:`_build_success`, and
    :func:`_build_failure`.

    Args:
        tracer: Optional :class:`PlanningTracer` invoked around each action
            execution.
        guards: Optional list of :class:`~langgoap.guards.ActionGuard` or
            :class:`~langgoap.guards.AsyncActionGuard` objects evaluated
            *before* each action executes. A ``WARN``-severity failure is
            logged but execution continues. A ``BLOCK``-severity failure
            aborts the action and immediately blacklists it (equivalent to
            ``max_retries + 1`` failures), triggering replanning via the
            observer.
        transition_model: Optional
            :class:`~langgoap.planner.transitions.TransitionModel` that
            drives runtime effects when an action's ``execute`` callable
            returns ``None`` (or is absent). Dict returns from ``execute``
            remain authoritative. Pair with a seeded ``rng`` for
            reproducible stochastic rollouts.
        rng: Optional ``random.Random`` forwarded to
            ``transition_model.sample``. When omitted, a single
            ``random.Random()`` is created at construction time and reused
            for every action — callers who want true reproducibility
            should pass a seeded instance.
        actions: Optional list of live :class:`ActionSpec` objects. They
            are indexed by ``name`` and substituted for the plan's action
            of the same name before each execution (see :meth:`_rebind`).
            When omitted, the action taken from the plan is used as-is.
    """

    def __init__(
        self,
        *,
        tracer: PlanningTracer | None = None,
        guards: list[ActionGuard | AsyncActionGuard] | None = None,
        transition_model: TransitionModel | None = None,
        rng: random.Random | None = None,
        actions: list[ActionSpec] | None = None,
    ) -> None:
        # Wrap the tracer in SafeTracerProxy at construction so call
        # sites can invoke hooks directly (``self._tracer.on_X(...)``)
        # without per-call try/except scaffolding. Idempotent if the
        # caller already passed a SafeTracerProxy.
        self._tracer: PlanningTracer = SafeTracerProxy(tracer or NullTracer())
        self._guards: list[ActionGuard | AsyncActionGuard] = guards or []
        self._transition_model: TransitionModel | None = transition_model
        self._rng: random.Random = rng if rng is not None else random.Random()
        # Optional source-of-truth action map for rebinding fields the
        # serializer cannot round-trip (callable execute, Pydantic
        # type references on require_human_approval, human_input_key
        # strings, qos). Restored by :meth:`_rebind` before each
        # action runs.
        self._actions: dict[str, ActionSpec] = (
            {a.name: a for a in actions} if actions else {}
        )

    def _rebind(self, action: ActionSpec) -> ActionSpec:
        """Restore the live ``ActionSpec`` for ``action.name`` if known.

        Falls back to the supplied ``action`` when the executor was
        constructed without an action list (the legacy case). Mirrors the
        planner-side rebind for fields the serializer drops on the way
        through msgpack.
        """
        return self._actions.get(action.name, action)

    def __call__(self, state: GoapState) -> dict[str, Any]:
        """Execute the current plan step synchronously.

        Args:
            state: Graph state handed to :func:`_prepare_execution`.

        Returns:
            The dict returned by :func:`_prepare_execution` when it
            short-circuits; otherwise the guard-block, human-approval
            denial, success, or failure result for the action. Exceptions
            raised while running the action or applying its result are
            converted into a failure result rather than propagated.
        """
        prep = _prepare_execution(state)
        if isinstance(prep, dict):
            # Preparation already produced a state update; nothing to run.
            return prep
        world_state, plan_obj, current_step, action = prep
        action = self._rebind(action)

        logger.info(
            "Executing action %r (step %d/%d)",
            action.name,
            current_step + 1,
            len(plan_obj),
        )
        self._tracer.on_action_start(action, world_state)

        # Snapshot taken before guards or the action can touch world_state.
        state_before = dict(world_state)

        # GuardRails: run all registered guards before executing.
        if self._guards:
            guard_results = run_guards_sync(self._guards, action, world_state)
            if has_blocking_failure(guard_results):
                result = _build_guard_block(
                    action, guard_results, state_before, world_state
                )
                self._tracer.on_action_complete(result)
                return result

        # Human-in-the-loop gate: interrupt() raises GraphInterrupt on
        # first pass (checkpointer persists state); returns the resume
        # payload on the second pass.
        denial = _check_human_approval(action, world_state, state_before)
        if denial is not None:
            self._tracer.on_action_complete(denial)
            return denial

        try:
            raw = _execute_with_retries_sync(
                action,
                world_state,
                tracer=self._tracer,
                rng=self._rng,
            )
            _apply_result(
                raw,
                action,
                world_state,
                transition_model=self._transition_model,
                rng=self._rng,
            )
            result = _build_success(action, current_step, state_before, world_state)
        except Exception as e:
            result = _build_failure(action, e, state_before, world_state)

        self._tracer.on_action_complete(result)
        return result

    async def acall(self, state: GoapState) -> dict[str, Any]:
        """Async variant of the executor node.

        Uses :func:`async_execute_action` to resolve the best callable:

        1. ``action.aexecute`` (explicit async callable)
        2. ``action.execute`` if it is a coroutine function
        3. ``action.execute`` via ``loop.run_in_executor`` (sync in thread)
        4. No callable — declared effects applied
        """
        prep = _prepare_execution(state)
        if isinstance(prep, dict):
            # Preparation already produced a state update; nothing to run.
            return prep
        world_state, plan_obj, current_step, action = prep
        action = self._rebind(action)

        logger.info(
            "Executing action %r async (step %d/%d)",
            action.name,
            current_step + 1,
            len(plan_obj),
        )
        await self._tracer.aon_action_start(action, world_state)

        # Snapshot taken before guards or the action can touch world_state.
        state_before = dict(world_state)

        if self._guards:
            guard_results = await run_guards_async(self._guards, action, world_state)
            if has_blocking_failure(guard_results):
                result = _build_guard_block(
                    action, guard_results, state_before, world_state
                )
                await self._tracer.aon_action_complete(result)
                return result

        denial = _check_human_approval(action, world_state, state_before)
        if denial is not None:
            await self._tracer.aon_action_complete(denial)
            return denial

        try:
            raw = await _execute_with_retries_async(
                action,
                world_state,
                tracer=self._tracer,
                rng=self._rng,
            )
            _apply_result(
                raw,
                action,
                world_state,
                transition_model=self._transition_model,
                rng=self._rng,
            )
            result = _build_success(action, current_step, state_before, world_state)
        except Exception as e:
            result = _build_failure(action, e, state_before, world_state)

        await self._tracer.aon_action_complete(result)
        return result


def _no_attempts_error(action: ActionSpec, max_attempts: Any) -> RuntimeError:
    """Build the error raised when ``qos.max_attempts`` permits no attempt."""
    return RuntimeError(
        f"Action {action.name!r} has qos.max_attempts={max_attempts!r}; "
        "at least one attempt is required."
    )


def _execute_with_retries_sync(
    action: ActionSpec,
    world_state: dict[str, Any],
    *,
    tracer: PlanningTracer,
    rng: random.Random | None,
) -> Any:
    """Run ``action`` with the sync retry loop dictated by ``action.qos``.

    When ``action.qos`` is ``None`` this is a single attempt — preserving
    the legacy non-retry semantics. Otherwise the executor re-invokes the
    action up to ``qos.max_attempts`` times, sleeping
    ``qos.compute_backoff_ms(attempt)`` between attempts and emitting
    ``on_action_retry`` before each sleep. The final attempt is never
    followed by a retry event or a sleep, whatever ``qos.should_retry``
    answers for it.

    Args:
        action: Action to run; ``execute``, ``aexecute``, ``qos`` and
            ``name`` are read from it.
        world_state: Mapping passed to ``action.execute``.
        tracer: Receives ``on_action_retry`` before each backoff sleep.
        rng: Random source forwarded to ``qos.compute_backoff_ms``; a fresh
            ``random.Random()`` is used when ``None``.

    Returns:
        The raw result of the successful invocation, or ``None`` when the
        action has no ``execute`` callable.

    Raises:
        RuntimeError: If the action is async-only or has a coroutine
            ``execute``, or if ``qos.max_attempts`` is less than 1.
        Exception: The last exception raised by ``action.execute`` when it
            is not retried or every attempt is exhausted.
    """
    if action.aexecute is not None and action.execute is None:
        raise RuntimeError(
            f"Action {action.name!r} is async-only (aexecute is set, "
            "execute is None). Use GoapGraph.ainvoke() or "
            "compiled.ainvoke() to run async actions."
        )
    if action.execute is not None and inspect.iscoroutinefunction(action.execute):
        raise RuntimeError(
            f"Action {action.name!r} has an async execute callable. "
            "Use GoapGraph.ainvoke() or compiled.ainvoke() instead."
        )

    qos = action.qos
    max_attempts = qos.max_attempts if qos is not None else 1
    backoff_rng = rng if rng is not None else random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            if action.execute is not None:
                return action.execute(world_state)
            return None
        except Exception as exc:
            # should_retry is consulted on every failure, but the last
            # attempt always re-raises: there is nothing left to wait for.
            if (
                qos is None
                or not qos.should_retry(exc, attempt)
                or attempt >= max_attempts
            ):
                raise
            backoff_ms = qos.compute_backoff_ms(attempt, rng=backoff_rng)
            tracer.on_action_retry(action, attempt, exc, backoff_ms)
            time.sleep(backoff_ms / 1000.0)
    # Only reachable when max_attempts < 1, i.e. the loop never ran.
    raise _no_attempts_error(action, max_attempts)


async def _execute_with_retries_async(
    action: ActionSpec,
    world_state: dict[str, Any],
    *,
    tracer: PlanningTracer,
    rng: random.Random | None,
) -> Any:
    """Async sibling of :func:`_execute_with_retries_sync`.

    Each attempt awaits :func:`async_execute_action`; between attempts it
    awaits ``tracer.aon_action_retry`` and then ``asyncio.sleep`` for the
    backoff computed by ``qos.compute_backoff_ms``. The final attempt
    re-raises immediately without a retry event or sleep.

    Raises:
        RuntimeError: If ``qos.max_attempts`` is less than 1.
        Exception: The last exception raised by the action when it is not
            retried or every attempt is exhausted.
    """
    qos = action.qos
    max_attempts = qos.max_attempts if qos is not None else 1
    backoff_rng = rng if rng is not None else random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            return await async_execute_action(action, world_state)
        except Exception as exc:
            # should_retry is consulted on every failure, but the last
            # attempt always re-raises: there is nothing left to wait for.
            if (
                qos is None
                or not qos.should_retry(exc, attempt)
                or attempt >= max_attempts
            ):
                raise
            backoff_ms = qos.compute_backoff_ms(attempt, rng=backoff_rng)
            await tracer.aon_action_retry(action, attempt, exc, backoff_ms)
            await asyncio.sleep(backoff_ms / 1000.0)
    # Only reachable when max_attempts < 1, i.e. the loop never ran.
    raise _no_attempts_error(action, max_attempts)


async def async_execute_action(action: ActionSpec, world_state: dict[str, Any]) -> Any:
    """Resolve and await the best async callable for *action*.

    Resolution priority:

    1. ``action.aexecute`` — explicit async callable (highest priority)
    2. ``action.execute`` if it is a coroutine function
    3. ``action.execute`` run in the default thread-pool executor
       (sync → async)
    4. No callable — returns ``None``; caller applies declared effects

    This is a public helper so that advanced callers (e.g. custom
    executors, test utilities) can reuse the resolution logic without
    duplicating it.
    """
    if action.aexecute is not None:
        return await action.aexecute(world_state)
    if action.execute is not None:
        if inspect.iscoroutinefunction(action.execute):
            return await action.execute(world_state)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, action.execute, world_state)
    return None


__all__ = ["GoapExecutor", "async_execute_action"]