Source code for langgoap.tracing.multi

"""Fan-out :class:`~langgoap.tracing.PlanningTracer` composer."""

from __future__ import annotations

import logging
from typing import Any

from langgoap.tracing._protocol import PlanningTracer

# Module-level logger obtained under the "langgoap.tracing" name.
logger = logging.getLogger("langgoap.tracing")


class MultiTracer:
    """Fan a single event out to multiple tracers.

    Each inner tracer is invoked in sequence; exceptions raised by an
    individual tracer are caught and logged so one broken tracer cannot
    break the planner or stop other tracers from receiving the event.
    """

    def __init__(self, tracers: list[PlanningTracer]) -> None:
        """Store a private copy of *tracers*.

        The list is copied, so mutating the caller's list afterwards does
        not change which tracers receive events.
        """
        self._tracers = list(tracers)

    @property
    def reflections(self) -> list[Any]:
        """Aggregate reflections from every inner tracer that exposes them.

        Enables :class:`~langgoap.graph.nodes.GoapPlanner` to surface
        reflection context into :class:`~langgoap.graph.state.GoapState`
        transparently when a :class:`~langgoap.reflexion.ReflexionTracer`
        is composed inside a ``MultiTracer``.

        Returns:
            A new list holding the items of each inner tracer's
            ``reflections`` attribute, in tracer order. Tracers that lack
            the attribute, or whose attribute is ``None``, contribute
            nothing.
        """
        result: list[Any] = []
        for t in self._tracers:
            found = getattr(t, "reflections", None)
            # A missing attribute and an explicit ``None`` are treated the
            # same way; extending with ``None`` would raise TypeError.
            if found is not None:
                result.extend(found)
        return result

    @staticmethod
    def _log_failure(tracer: Any, method: str, exc: Exception) -> None:
        """Emit the warning recorded when *tracer* fails while handling *method*."""
        logger.warning(
            "Tracer %r raised during %s: %s",
            type(tracer).__name__,
            method,
            exc,
        )

    def _fan_sync(self, method: str, *args: Any) -> None:
        """Call ``method(*args)`` on every inner tracer, in order.

        The attribute lookup happens inside the ``try`` as well, so a tracer
        that does not define *method* is logged and skipped like any other
        failure.
        """
        for t in self._tracers:
            try:
                getattr(t, method)(*args)
            except Exception as exc:
                self._log_failure(t, method, exc)

    async def _fan_async(self, method: str, *args: Any) -> None:
        """Await ``method(*args)`` on every inner tracer, one after another.

        Tracers are awaited sequentially (not gathered); a failure in one is
        logged and the loop moves on to the next.
        """
        for t in self._tracers:
            try:
                await getattr(t, method)(*args)
            except Exception as exc:
                self._log_failure(t, method, exc)

    # ------------------------------------------------------------------
    # Synchronous hooks: each forwards its arguments unchanged to the
    # same-named method on every inner tracer via ``_fan_sync``.
    # ------------------------------------------------------------------

    def on_plan_start(self, goal: Any, state: Any, strategy_name: str) -> None:
        self._fan_sync("on_plan_start", goal, state, strategy_name)

    def on_plan_complete(self, plan: Any, duration_ms: float) -> None:
        self._fan_sync("on_plan_complete", plan, duration_ms)

    def on_plan_failed(self, reason: str, duration_ms: float) -> None:
        self._fan_sync("on_plan_failed", reason, duration_ms)

    def on_action_start(self, action: Any, state: Any) -> None:
        self._fan_sync("on_action_start", action, state)

    def on_action_complete(self, result: Any) -> None:
        self._fan_sync("on_action_complete", result)

    def on_action_retry(
        self,
        action: Any,
        attempt: int,
        exception: BaseException,
        backoff_ms: float,
    ) -> None:
        self._fan_sync("on_action_retry", action, attempt, exception, backoff_ms)

    def on_strategy_chosen(self, strategy_name: str) -> None:
        self._fan_sync("on_strategy_chosen", strategy_name)

    def on_replan(self, reason: str, new_plan: Any) -> None:
        self._fan_sync("on_replan", reason, new_plan)

    def on_goal_achieved(self, final_state: Any) -> None:
        self._fan_sync("on_goal_achieved", final_state)

    def on_sensor_complete(self, sensor_name: str, updates: Any) -> None:
        self._fan_sync("on_sensor_complete", sensor_name, updates)

    def on_search_expand(
        self,
        node_id: int,
        state: Any,
        g: float,
        h: float,
        f: float,
        parent_id: int | None,
        action_name: str | None,
    ) -> None:
        self._fan_sync(
            "on_search_expand", node_id, state, g, h, f, parent_id, action_name
        )

    def on_search_dead_end(self, reason: str, detail: dict[str, Any]) -> None:
        self._fan_sync("on_search_dead_end", reason, detail)

    def on_search_complete(
        self, nodes_explored: int, duration_ms: float, found: bool
    ) -> None:
        self._fan_sync("on_search_complete", nodes_explored, duration_ms, found)

    # ------------------------------------------------------------------
    # Asynchronous hooks: each forwards its arguments unchanged to the
    # same-named ``aon_*`` method on every inner tracer via ``_fan_async``.
    # ------------------------------------------------------------------

    async def aon_plan_start(self, goal: Any, state: Any, strategy_name: str) -> None:
        await self._fan_async("aon_plan_start", goal, state, strategy_name)

    async def aon_plan_complete(self, plan: Any, duration_ms: float) -> None:
        await self._fan_async("aon_plan_complete", plan, duration_ms)

    async def aon_plan_failed(self, reason: str, duration_ms: float) -> None:
        await self._fan_async("aon_plan_failed", reason, duration_ms)

    async def aon_action_start(self, action: Any, state: Any) -> None:
        await self._fan_async("aon_action_start", action, state)

    async def aon_action_complete(self, result: Any) -> None:
        await self._fan_async("aon_action_complete", result)

    async def aon_action_retry(
        self,
        action: Any,
        attempt: int,
        exception: BaseException,
        backoff_ms: float,
    ) -> None:
        await self._fan_async(
            "aon_action_retry", action, attempt, exception, backoff_ms
        )

    async def aon_strategy_chosen(self, strategy_name: str) -> None:
        await self._fan_async("aon_strategy_chosen", strategy_name)

    async def aon_replan(self, reason: str, new_plan: Any) -> None:
        await self._fan_async("aon_replan", reason, new_plan)

    async def aon_goal_achieved(self, final_state: Any) -> None:
        await self._fan_async("aon_goal_achieved", final_state)

    async def aon_sensor_complete(self, sensor_name: str, updates: Any) -> None:
        await self._fan_async("aon_sensor_complete", sensor_name, updates)

    async def aon_search_expand(
        self,
        node_id: int,
        state: Any,
        g: float,
        h: float,
        f: float,
        parent_id: int | None,
        action_name: str | None,
    ) -> None:
        await self._fan_async(
            "aon_search_expand", node_id, state, g, h, f, parent_id, action_name
        )

    async def aon_search_dead_end(self, reason: str, detail: dict[str, Any]) -> None:
        await self._fan_async("aon_search_dead_end", reason, detail)

    async def aon_search_complete(
        self, nodes_explored: int, duration_ms: float, found: bool
    ) -> None:
        await self._fan_async("aon_search_complete", nodes_explored, duration_ms, found)
# Public names exported by a star-import of this module.
__all__ = ["MultiTracer"]