"""Goal and constraint specifications for GOAP planning."""
from __future__ import annotations
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Callable, Literal, Mapping, Sequence
from langgoap.types import ObjectiveDirection, ReplanStrategy
if TYPE_CHECKING:
from langgoap.constraints import BuilderOutput
from langgoap.planner.metrics import PlanQualityMetric
[docs]
@dataclass(frozen=True, slots=True)
class ConstraintSpec:
"""A resource/budget constraint for CSP optimization.
Constraints can be ``"hard"`` (the default) or ``"soft"``. A hard
constraint violation marks the plan ``INFEASIBLE``; a soft violation
contributes a penalty to the plan's :class:`~langgoap.score.HardSoftScore`
but does not disqualify the plan.
Attributes:
key: The resource key this constraint applies to
(e.g. ``"total_tokens"``, ``"cost_usd"``).
max: Upper bound. ``None`` means no upper bound.
min: Lower bound. ``None`` means no lower bound.
weight: Penalty weight per unit of violation (default 1.0).
Used by the CSP optimizer and the Score hierarchy.
level: ``"hard"`` (default) or ``"soft"``. Hard constraints
mark the plan infeasible on violation; soft constraints
subtract ``weight * violation_amount`` from the plan's
soft score.
"""
key: str
max: float | None = None
min: float | None = None
weight: float = 1.0
level: Literal["hard", "soft"] = "hard"
def __post_init__(self) -> None:
if self.min is not None and self.max is not None and self.min > self.max:
raise ValueError(
f"ConstraintSpec(key={self.key!r}): min ({self.min}) must be "
f"<= max ({self.max})."
)
if self.level not in ("hard", "soft"):
raise ValueError(
f"ConstraintSpec(key={self.key!r}): level must be "
f"'hard' or 'soft', got {self.level!r}."
)
@dataclass(frozen=True, slots=True)
class SoftGoal:
"""An optional goal condition with a utility weight (oversubscription planning).
Hard goal conditions (:attr:`GoalSpec.conditions`) *must* be satisfied for
a plan to be valid. Soft goals are optional — the planner prefers plans
that achieve them but will not fail if they cannot be satisfied.
Attributes:
conditions: Target conditions for this soft goal. Accepted as a
plain ``dict``; silently wrapped in :class:`~types.MappingProxyType`.
weight: Non-negative utility weight (default ``1.0``). Higher values
make this goal more valuable; the planner maximises the total
achieved weight across all soft goals when selecting among plans.
name: Optional human-readable identifier used in logging and
:class:`~langgoap.planner.csp.CSPMetadata` objective keys.
Defaults to ``str(conditions)`` when empty.
"""
conditions: MappingProxyType[str, Any] = field(
default_factory=lambda: MappingProxyType({})
)
weight: float = 1.0
name: str = ""
def __post_init__(self) -> None:
if not isinstance(self.conditions, MappingProxyType):
object.__setattr__(
self, "conditions", MappingProxyType(dict(self.conditions))
)
if self.weight < 0:
raise ValueError(f"SoftGoal.weight must be >= 0, got {self.weight}")
@property
def label(self) -> str:
"""Return name if set, else a compact conditions string."""
return self.name or str(dict(self.conditions))
@dataclass(frozen=True, slots=True)
class GoalPolicy:
    """Bundle of "how to pursue it" settings attached to a :class:`GoalSpec`.

    Keeping replanning behaviour and multi-goal priority in their own small
    dataclass lets the :class:`GoalSpec` constructor describe *what* the goal
    is, while this object describes *how* the planner should chase it.

    Attributes:
        replan_strategy: When the observer should trigger replanning
            during execution. Defaults to ``ReplanStrategy.ON_DEVIATION``.
        priority: Tie-breaker used in multi-goal scenarios when
            :class:`MultiGoal` chooses the next sub-goal; a higher number
            means a more important goal. Defaults to ``0``.
        max_replans: Upper limit on replanning cycles before the observer
            gives up, protecting against endless replan loops when an
            action keeps failing or the world state never converges.
            ``0`` disables the limit. Defaults to ``10``.

    Raises:
        ValueError: If ``max_replans`` is negative.
    """

    replan_strategy: ReplanStrategy = ReplanStrategy.ON_DEVIATION
    priority: int = 0
    max_replans: int = 10

    def __post_init__(self) -> None:
        """Reject a negative replan budget (zero is the "no limit" sentinel)."""
        negative_budget = self.max_replans < 0
        if negative_budget:
            raise ValueError("max_replans must be >= 0 (0 disables the limit)")
[docs]
@dataclass(frozen=True, slots=True)
class GoalSpec:
    """Specification of a GOAP goal.

    Attributes:
        conditions: Required world state for the goal to be satisfied.
            Any mapping is accepted; it is copied and stored as an
            immutable MappingProxyType.
        policy: :class:`GoalPolicy` bundling the replanning strategy,
            multi-goal priority, and replan budget. Defaults to
            ``GoalPolicy()`` (ON_DEVIATION, priority 0, max_replans 10).
        objectives: Optional optimization objectives for the CSP phase.
            Maps resource/metric name to an :class:`~langgoap.types.ObjectiveDirection`.
            Example: ``{"cost_usd": Minimize, "quality": Maximize}``.
            An empty mapping is normalised to ``None``.
        constraints: Optional hard resource/budget constraints for the CSP phase.
            Lists are coerced to tuples.
        soft_goals: Optional soft (penalty-weighted) goal predicates.
            Lists are coerced to tuples.
        metrics: Optional plan-quality metrics consumed by the CSP phase.
            Lists are coerced to tuples.
        value: Utility weight consulted by ``MultiGoal(mode='best_value')``.
            Either a static float or a callable resolved against world
            state. Defaults to ``1.0``.
    """

    conditions: MappingProxyType[str, Any] = field(
        default_factory=lambda: MappingProxyType({})
    )
    policy: GoalPolicy = field(default_factory=GoalPolicy)
    objectives: MappingProxyType[str, ObjectiveDirection] | None = None
    constraints: tuple[ConstraintSpec, ...] = field(default_factory=tuple)
    soft_goals: tuple[SoftGoal, ...] = field(default_factory=tuple)
    metrics: tuple["PlanQualityMetric", ...] = field(default_factory=tuple)
    value: float | Callable[[Mapping[str, Any]], float] = 1.0
    """Utility weight consulted by ``MultiGoal(mode='best_value')``.
    Either a static float or a callable resolved against world state.
    Defaults to ``1.0`` so existing code is unaffected.
    """

    def __post_init__(self) -> None:
        """Normalise caller-friendly inputs into the immutable stored forms."""
        # The class is frozen, so every normalisation goes through
        # object.__setattr__.
        # Accept plain dicts from callers and silently wrap conditions.
        object.__setattr__(
            self,
            "conditions",
            MappingProxyType(dict(self.conditions)),
        )
        # Normalise objectives: plain dict → MappingProxyType, empty → None.
        if self.objectives is not None:
            obj_dict = dict(self.objectives)
            if not obj_dict:
                # Empty objectives dict is semantically equivalent to no objectives;
                # normalise to None so _needs_csp() and CSP checks behave correctly.
                object.__setattr__(self, "objectives", None)
            elif not isinstance(self.objectives, MappingProxyType):
                object.__setattr__(self, "objectives", MappingProxyType(obj_dict))
        # Normalise the sequence-valued fields: accept list or tuple from callers.
        for field_name in ("constraints", "soft_goals", "metrics"):
            current = getattr(self, field_name)
            if not isinstance(current, tuple):
                object.__setattr__(self, field_name, tuple(current))

    def __repr__(self) -> str:
        """Return a compact repr that lists only fields differing from defaults."""
        parts = [f"conditions={dict(self.conditions)!r}"]
        if self.policy != GoalPolicy():
            parts.append(f"policy={self.policy!r}")
        if self.objectives:
            parts.append(f"objectives={dict(self.objectives)!r}")
        if self.constraints:
            parts.append(f"constraints={self.constraints!r}")
        if self.soft_goals:
            parts.append(f"soft_goals={self.soft_goals!r}")
        # metrics and value also distinguish one goal from another; show them
        # whenever they are set so two different specs never print identically.
        if self.metrics:
            parts.append(f"metrics={self.metrics!r}")
        if self.value != 1.0:
            parts.append(f"value={self.value!r}")
        return f"GoalSpec({', '.join(parts)})"

    @classmethod
    def from_builder(
        cls,
        conditions: dict[str, Any] | None = None,
        builder_output: BuilderOutput | None = None,
        **kwargs: Any,
    ) -> GoalSpec:
        """Construct a GoalSpec from a :class:`~langgoap.constraints.BuilderOutput`.

        Args:
            conditions: Goal conditions. Forwarded to the ``conditions``
                field. ``None`` is treated as an empty mapping.
            builder_output: Output of
                :meth:`~langgoap.constraints.ConstraintBuilder.build`.
                ``None`` produces a GoalSpec with no constraints or
                objectives, equivalent to ``GoalSpec(conditions=...)``.
            **kwargs: Any other ``GoalSpec`` fields (e.g.
                ``policy=GoalPolicy(...)``, ``soft_goals=...``,
                ``metrics=...``). ``constraints`` and ``objectives`` are
                supplied by this method, so passing them here raises
                ``TypeError`` (duplicate keyword argument).

        Returns:
            A new ``GoalSpec`` with the builder's constraints and
            objectives wired into the standard fields.
        """
        constraints: tuple[ConstraintSpec, ...] = ()
        objectives: MappingProxyType[str, ObjectiveDirection] | None = None
        if builder_output is not None:
            constraints = builder_output.constraints
            # Falsy (empty / None) builder objectives stay as None.
            if builder_output.objectives:
                objectives = builder_output.objectives
        return cls(
            conditions=MappingProxyType(dict(conditions or {})),
            constraints=constraints,
            objectives=objectives,
            **kwargs,
        )

    @classmethod
    def per_entity(
        cls,
        entity_ids: Sequence[str],
        conditions: Mapping[str, Any],
        *,
        mode: Literal["sequential", "any", "best_value"] = "any",
        **goal_kwargs: Any,
    ) -> "MultiGoal":
        """Build a :class:`MultiGoal` with one :class:`GoalSpec` per entity.

        The ``conditions`` mapping is treated as a template: every
        string key and every string value is formatted with
        :py:meth:`str.format`, substituting the placeholder
        ``{entity}`` with each entity id in turn. Non-string values
        pass through unchanged so callers can parameterise e.g.
        ``{"safe_from_{entity}": True}`` or
        ``{"target_{entity}": (x, y)}``.

        Because :py:meth:`str.format` is used, any other brace field in
        a key or string value is also interpreted as a format field.

        Args:
            entity_ids: Non-empty sequence of entity identifiers. The
                returned :class:`MultiGoal` holds one child goal per
                id, in input order.
            conditions: Template mapping. At least one key or string
                value must contain ``{entity}`` — otherwise every
                generated child would be identical, which is almost
                certainly a caller mistake.
            mode: Forwarded to :class:`MultiGoal`; any mode that class
                accepts (``"sequential"``, ``"any"``, ``"best_value"``)
                is allowed. Defaults to ``"any"`` because the canonical
                use case (per-adversary escape goals) is "satisfy
                whichever is cheapest". Use ``"sequential"`` when the
                per-entity ordering encodes execution intent.
            **goal_kwargs: Forwarded verbatim to every :class:`GoalSpec`
                child (e.g. ``policy=GoalPolicy(...)``, ``soft_goals=``,
                ``metrics=``).

        Returns:
            A :class:`MultiGoal` whose ``goals`` tuple has one
            :class:`GoalSpec` per entity id, in input order.

        Raises:
            ValueError: If ``entity_ids`` is empty, or if no key /
                string value in ``conditions`` contains the
                ``{entity}`` placeholder.
        """
        ids = list(entity_ids)
        if not ids:
            raise ValueError("GoalSpec.per_entity requires at least one entity id.")
        template_has_placeholder = any(
            "{entity}" in k or (isinstance(v, str) and "{entity}" in v)
            for k, v in conditions.items()
        )
        if not template_has_placeholder:
            raise ValueError(
                "GoalSpec.per_entity requires '{entity}' in at least one "
                "condition key or string value; otherwise every child "
                "goal would be identical. Got conditions="
                f"{dict(conditions)!r}."
            )
        children: list[GoalSpec] = []
        for eid in ids:
            # Instantiate the template for this entity: keys are always
            # formatted, values only when they are strings.
            formatted: dict[str, Any] = {
                k.format(entity=eid): (
                    v.format(entity=eid) if isinstance(v, str) else v
                )
                for k, v in conditions.items()
            }
            children.append(cls(conditions=MappingProxyType(formatted), **goal_kwargs))
        return MultiGoal(goals=tuple(children), mode=mode)
# Public alias for GoalSpec. Use ``Goal`` in user-facing code.
# This binds a second name to the very same class object (it is not a
# subclass), so ``Goal(...)`` builds a ``GoalSpec`` instance and
# ``isinstance(x, GoalSpec)`` checks accept it unchanged.
Goal = GoalSpec
[docs]
@dataclass(frozen=True, slots=True)
class MultiGoal:
"""A composite goal made up of one or more :class:`GoalSpec` children.
Two execution modes are supported:
* ``"sequential"`` (default) — the observer plans and executes
``goals[0]`` to completion, then uses the resulting world state
as the starting state for ``goals[1]``, and so on. This is the
right model when sub-goals represent successive stages of a
workflow (e.g. "collect data → analyse → publish").
* ``"any"`` — the planner plans each sub-goal independently and
the observer picks the lowest-:class:`~langgoap.score.Score`
feasible plan. Useful when several possible goals are
acceptable and the system should chase the cheapest one.
Recursive (HTN-style) decomposition where a sub-goal is itself a
``MultiGoal`` is **out of scope for v0.1.0** — only flat
composition is supported and :meth:`__post_init__` rejects
non-:class:`GoalSpec` children with a clear ``ValueError``.
In ``"any"`` mode, ties on plan cost are broken by list order:
the first sub-goal whose plan has the lowest cost wins. This
makes the order of ``goals`` semantically meaningful when
several candidates are equally cheap.
Args:
goals: Non-empty sequence of :class:`GoalSpec` children.
Accepts tuples or lists; a list is coerced to a tuple.
mode: Either ``"sequential"`` or ``"any"``.
"""
goals: tuple[GoalSpec, ...]
mode: Literal["sequential", "any", "best_value"] = "sequential"
def __post_init__(self) -> None:
if not isinstance(self.goals, tuple):
object.__setattr__(self, "goals", tuple(self.goals))
if not self.goals:
raise ValueError("MultiGoal must contain at least one GoalSpec")
if self.mode not in ("sequential", "any", "best_value"):
raise ValueError(
"MultiGoal.mode must be 'sequential', 'any', or 'best_value'; "
f"got {self.mode!r}"
)
for i, g in enumerate(self.goals):
if not isinstance(g, GoalSpec):
raise ValueError(
f"MultiGoal.goals[{i}] must be a GoalSpec, got "
f"{type(g).__name__}. Nested MultiGoal (HTN-style "
"decomposition) is not supported in v0.1.0."
)