# Source code for langgoap.history

"""Execution history persistence on top of LangGraph ``BaseStore``.

Ships the long-deferred ``DESIGN.md`` §14.4.  ``StoreExecutionHistory``
records one :class:`ExecutionRecord` per planner-executor loop
completion and answers two queries:

* ``query_by_goal(goal_hash, limit)`` — most recent executions for a
  given goal
* ``query_failures(action_name, limit)`` — most recent executions that
  ended in ``"failed"`` and included ``action_name`` in the plan

Both queries and their async counterparts use **only** ``store.get()``
/ ``store.aget()``.  ``BaseStore.search()`` is *deliberately not
called* because ``AsyncPostgresStore`` and ``RedisStore`` interpret
``search`` as a vector-similarity query that requires an embedder —
forcing every history user to configure one would be unacceptable.

The implementation instead maintains two reverse indexes:

* goal index at ``("langgoap", "goal_index", goal_hash)`` → bounded
  newest-first list of execution UUIDs
* failure index at ``("langgoap", "failure_index", action_name)`` →
  same structure, populated only when ``outcome == "failed"``

Both indexes are bounded (``index_limit``, default 100) so their read
cost stays constant and their write cost is a single ``get`` + single
``put``.  Older IDs drop off the index but the underlying records are
left in the primary namespace, reachable via direct UUID lookup.

Trade-offs accepted for v0.1.0:

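* **Read amplification** — one index ``get()`` plus one ``get()`` per
  returned record.  Execution history is diagnostic, not a hot path.
* **Write amplification** — one primary write plus one goal-index
  read-modify-write per record; failed records additionally cost one
  failure-index read-modify-write per action in the plan.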
* **Race conditions** — concurrent writers to the same ``goal_hash``
  can lose an ID if they read-modify-write the index simultaneously.
  ``GoapObserver`` records exactly once at the end of the loop so the
  race window is small; a transactional index is post-v0.1.0.
"""

from __future__ import annotations

import hashlib
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Mapping

from langgraph.store.base import BaseStore

from langgoap.goals import GoalSpec

# Primary namespace: one item per execution, keyed by a ``uuid4().hex``
# string, whose value is ``ExecutionRecord.to_dict()``.
_EXECUTIONS_NS = ("langgoap", "executions")
# Reverse index keyed by goal hash; value is ``{"execution_ids": [...]}``
# with the newest ID first, capped at the history's ``index_limit``.
_GOAL_INDEX_NS = ("langgoap", "goal_index")
# Reverse index keyed by action name, same value shape as the goal index;
# only written for records whose ``outcome == "failed"``.
_FAILURE_INDEX_NS = ("langgoap", "failure_index")


def compute_goal_hash(goal: GoalSpec) -> str:
    """Return a stable short hash of a goal's conditions.

    The hash is deterministic across runs (unlike Python's built-in
    ``hash``) and short enough to read in logs. It is the same key used
    by :class:`StoreExecutionHistory` to file records under its goal
    reverse-index, so callers can look up the history of a goal by
    passing the result to :meth:`StoreExecutionHistory.query_by_goal`.

    Condition keys are stringified and the pairs sorted before encoding,
    so goals whose conditions differ only in insertion order hash the
    same. Values JSON cannot encode natively fall back to ``str``.

    Returns:
        The first 16 hex characters of the SHA-256 digest of the
        canonical JSON encoding of the sorted condition pairs.

    Example::

        history = StoreExecutionHistory(store)
        records = history.query_by_goal(compute_goal_hash(goal))
    """
    pairs = sorted(
        (str(name), target) for name, target in goal.conditions.items()
    )
    canonical_json = json.dumps(pairs, default=str, sort_keys=True)
    full_digest = hashlib.sha256(canonical_json.encode()).hexdigest()
    return full_digest[:16]
[docs] @dataclass(frozen=True, slots=True) class ExecutionRecord: """One planner → executor → observer loop outcome. Attributes: goal_hash: Stable identifier for the goal (user-chosen or derived from ``GoalSpec``). goal_conditions: The goal's target condition dict, stored for later diagnostics. plan_actions: Tuple of action names in execution order. expected_cost: The plan's ``total_cost`` at planning time. actual_cost: Realised cost after execution. Equal to ``expected_cost`` for purely deterministic runs; may differ when dynamic costs change mid-execution. outcome: One of ``"success"``, ``"failed"``, ``"partial"``. Open string so new tracer implementations can introduce extra states without churning the schema. replan_count: Number of replans that happened during execution. timestamp: UTC timestamp at record creation. metadata: Optional free-form dict for tracer-specific extras (token counts, provider, model name, etc.). Never required by the indexing code. """ goal_hash: str goal_conditions: Mapping[str, Any] plan_actions: tuple[str, ...] 
expected_cost: float actual_cost: float outcome: str replan_count: int timestamp: datetime metadata: Mapping[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: """Serialise to a plain ``dict`` suitable for ``BaseStore.put``.""" return { "goal_hash": self.goal_hash, "goal_conditions": dict(self.goal_conditions), "plan_actions": list(self.plan_actions), "expected_cost": self.expected_cost, "actual_cost": self.actual_cost, "outcome": self.outcome, "replan_count": self.replan_count, "timestamp": self.timestamp.isoformat(), "metadata": dict(self.metadata), } @classmethod def from_dict(cls, data: Mapping[str, Any]) -> ExecutionRecord: """Deserialise from the format produced by :meth:`to_dict`.""" return cls( goal_hash=data["goal_hash"], goal_conditions=dict(data.get("goal_conditions", {})), plan_actions=tuple(data.get("plan_actions", ())), expected_cost=float(data["expected_cost"]), actual_cost=float(data["actual_cost"]), outcome=data["outcome"], replan_count=int(data.get("replan_count", 0)), timestamp=datetime.fromisoformat(data["timestamp"]), metadata=dict(data.get("metadata", {})), )
def _item_value(item: Any) -> Any: """Unwrap ``BaseStore.get`` / ``aget`` return values. LangGraph's store protocol wraps values in a ``SearchItem`` / ``Item`` object with a ``.value`` attribute. Some alternate implementations (and older versions) return the raw value directly. Normalise both shapes. """ if item is None: return None return getattr(item, "value", item)
class StoreExecutionHistory:
    """Execution history backed by a LangGraph :class:`BaseStore`.

    Args:
        store: Any ``BaseStore`` implementation. ``get``/``put`` are the
            only methods invoked (plus ``aget``/``aput`` on the async
            path). ``search()`` is **never** called — see module
            docstring for the rationale.
        index_limit: Maximum number of execution IDs kept in a single
            reverse-index entry. Defaults to 100 which is enough for most
            diagnostic use cases without incurring large per-write
            overhead. Must be at least 1.

    Raises:
        ValueError: If ``index_limit`` is less than 1.

    Example::

        store = InMemoryStore()
        history = StoreExecutionHistory(store)
        execution_id = history.record(ExecutionRecord(...))
        recent = history.query_by_goal("goal_abc", limit=5)
    """

    def __init__(self, store: BaseStore, *, index_limit: int = 100) -> None:
        # Truncating to a bound below 1 always yields an empty list, which
        # would leave every reverse index permanently empty — fail loudly.
        if index_limit < 1:
            raise ValueError(f"index_limit must be >= 1, got {index_limit!r}")
        self._store = store
        self._index_limit = index_limit

    @staticmethod
    def goal_hash_for(goal: GoalSpec) -> str:
        """Return the goal-index key this history would file ``goal`` under.

        Thin wrapper around :func:`compute_goal_hash` exposed as an
        instance-accessible helper so callers can write
        ``history.query_by_goal(history.goal_hash_for(goal))`` without
        importing the module-level function explicitly.
        """
        return compute_goal_hash(goal)

    # ------------------------------------------------------------------
    # Sync API
    # ------------------------------------------------------------------

    def record(self, record: ExecutionRecord) -> str:
        """Persist ``record`` and update the reverse indexes.

        The record is written to the primary namespace first, then its ID
        is pushed onto the goal index and — for failed runs — onto the
        failure index of every distinct action in the plan.

        Returns:
            The generated execution ID (a UUID hex string) the record was
            stored under, usable for direct lookup in the primary
            namespace after it has dropped off the bounded indexes.
        """
        execution_id = uuid.uuid4().hex
        self._store.put(_EXECUTIONS_NS, execution_id, record.to_dict())
        self._push_id(_GOAL_INDEX_NS, record.goal_hash, execution_id)
        for action_name in self._failure_index_keys(record):
            self._push_id(_FAILURE_INDEX_NS, action_name, execution_id)
        return execution_id

    def query_by_goal(self, goal_hash: str, limit: int = 10) -> list[ExecutionRecord]:
        """Return up to ``limit`` most-recent records for ``goal_hash``."""
        ids = self._read_ids(_GOAL_INDEX_NS, goal_hash)
        return self._fetch_records(ids, limit)

    def query_failures(
        self, action_name: str, limit: int = 10
    ) -> list[ExecutionRecord]:
        """Return up to ``limit`` most-recent failed runs that touched ``action_name``."""
        ids = self._read_ids(_FAILURE_INDEX_NS, action_name)
        return self._fetch_records(ids, limit)

    # ------------------------------------------------------------------
    # Async API (full parity with the sync methods)
    # ------------------------------------------------------------------

    async def arecord(self, record: ExecutionRecord) -> str:
        """Async variant of :meth:`record`; returns the execution ID."""
        execution_id = uuid.uuid4().hex
        await self._store.aput(_EXECUTIONS_NS, execution_id, record.to_dict())
        await self._apush_id(_GOAL_INDEX_NS, record.goal_hash, execution_id)
        for action_name in self._failure_index_keys(record):
            await self._apush_id(_FAILURE_INDEX_NS, action_name, execution_id)
        return execution_id

    async def aquery_by_goal(
        self, goal_hash: str, limit: int = 10
    ) -> list[ExecutionRecord]:
        """Async variant of :meth:`query_by_goal`."""
        ids = await self._aread_ids(_GOAL_INDEX_NS, goal_hash)
        return await self._afetch_records(ids, limit)

    async def aquery_failures(
        self, action_name: str, limit: int = 10
    ) -> list[ExecutionRecord]:
        """Async variant of :meth:`query_failures`."""
        ids = await self._aread_ids(_FAILURE_INDEX_NS, action_name)
        return await self._afetch_records(ids, limit)

    # ------------------------------------------------------------------
    # Internal index helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _failure_index_keys(record: ExecutionRecord) -> list[str]:
        """Return the distinct action names whose failure index ``record`` joins.

        Empty unless ``record.outcome == "failed"``. Repeated action names
        are collapsed (first-occurrence order kept) so a plan that runs
        the same action twice does not file the execution ID twice under
        that action, which would make :meth:`query_failures` return the
        same record twice.
        """
        if record.outcome != "failed":
            return []
        return list(dict.fromkeys(record.plan_actions))

    @staticmethod
    def _ids_from_item(item: Any) -> list[str]:
        """Extract the ID list from a reverse-index store item.

        Shared by the sync and async read paths; a missing or empty item
        yields an empty list.
        """
        value = _item_value(item)
        if not value:
            return []
        return list(value.get("execution_ids", []))

    @staticmethod
    def _record_from_item(item: Any) -> ExecutionRecord | None:
        """Decode a primary-namespace store item, or ``None`` if it is absent."""
        value = _item_value(item)
        return None if value is None else ExecutionRecord.from_dict(value)

    def _read_ids(self, namespace: tuple[str, ...], key: str) -> list[str]:
        return self._ids_from_item(self._store.get(namespace, key))

    async def _aread_ids(self, namespace: tuple[str, ...], key: str) -> list[str]:
        return self._ids_from_item(await self._store.aget(namespace, key))

    def _bounded_id_list(self, ids: list[str], execution_id: str) -> list[str]:
        """Return a new list with ``execution_id`` first, capped at ``index_limit``.

        Shared between sync and async push paths so the bounding logic
        (most-recent-first ordering, ``index_limit`` cap) cannot diverge
        across the two surfaces. ``ids`` itself is not modified.
        """
        return [execution_id, *ids][: self._index_limit]

    def _push_id(self, namespace: tuple[str, ...], key: str, execution_id: str) -> None:
        # Read-modify-write; see the module docstring for the race caveat.
        ids = self._bounded_id_list(self._read_ids(namespace, key), execution_id)
        self._store.put(namespace, key, {"execution_ids": ids})

    async def _apush_id(
        self, namespace: tuple[str, ...], key: str, execution_id: str
    ) -> None:
        ids = self._bounded_id_list(await self._aread_ids(namespace, key), execution_id)
        await self._store.aput(namespace, key, {"execution_ids": ids})

    def _fetch_records(self, ids: list[str], limit: int) -> list[ExecutionRecord]:
        """Load up to ``limit`` records for ``ids``, preserving index order.

        IDs whose primary record is missing are skipped and do not count
        toward ``limit``. Each execution is returned at most once even if
        the index lists its ID more than once.
        """
        results: list[ExecutionRecord] = []
        seen: set[str] = set()
        for execution_id in ids:
            if len(results) >= limit:
                break
            if execution_id in seen:
                continue
            seen.add(execution_id)
            found = self._record_from_item(self._store.get(_EXECUTIONS_NS, execution_id))
            if found is not None:
                results.append(found)
        return results

    async def _afetch_records(
        self, ids: list[str], limit: int
    ) -> list[ExecutionRecord]:
        """Async variant of :meth:`_fetch_records` with identical semantics."""
        results: list[ExecutionRecord] = []
        seen: set[str] = set()
        for execution_id in ids:
            if len(results) >= limit:
                break
            if execution_id in seen:
                continue
            seen.add(execution_id)
            found = self._record_from_item(
                await self._store.aget(_EXECUTIONS_NS, execution_id)
            )
            if found is not None:
                results.append(found)
        return results
# Public API of this module; underscore-prefixed helpers and namespace
# constants are internal.
__all__ = ["ExecutionRecord", "StoreExecutionHistory", "compute_goal_hash"]