"""OR-Tools CP-SAT constraint optimizer for GOAP plans.
Runs after A* to validate resource constraints, optimize multi-objective
trade-offs, and compute temporal schedules. When no constraints or objectives
exist, the CSP is skipped entirely (zero overhead).
Architecture:
A* planner → CSP optimizer → Optimized Plan
Resource validation uses a pure-Python fast path (no solver invocation).
CP-SAT is used for temporal scheduling and multi-plan optimization.
"""
from __future__ import annotations
import enum
import logging
import time
from dataclasses import dataclass, field
from datetime import timedelta
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Literal
from ortools.sat.python import cp_model as _cp_model
if TYPE_CHECKING:
from langgoap.planner.types import Plan
from langgoap.actions import ActionSpec
from langgoap.goals import ConstraintSpec, GoalSpec, SoftGoal
from langgoap.types import ObjectiveDirection
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Types
# ---------------------------------------------------------------------------
[docs]
class CSPStatus(str, enum.Enum):
    """Status of a CSP solve attempt.

    The enum mixes in ``str``, so every member compares equal to its
    lowercase string value (e.g. ``CSPStatus.OPTIMAL == "optimal"``).
    """

    # The CP-SAT solver reported an optimal solution.
    OPTIMAL = "optimal"
    # Constraints hold (pure-Python check) or the solver found a feasible,
    # not necessarily optimal, solution.
    FEASIBLE = "feasible"
    # A hard constraint is violated or the solver found no solution.
    INFEASIBLE = "infeasible"
    # Not assigned by any function visible in this module.
    ERROR = "error"
    # Nothing to check or schedule, so no work was done.
    SKIPPED = "skipped"
[docs]
@dataclass(frozen=True, slots=True)
class ResourceUsage:
    """Per-key resource breakdown for a plan.

    Instances are immutable (``frozen=True``) and slot-based.

    Attributes:
        key: Resource identifier (e.g. ``"tokens"``, ``"cost_usd"``).
        total: Total consumption across all actions.
        constraint_min: Lower bound from the constraint, or ``None``.
        constraint_max: Upper bound from the constraint, or ``None``.
        satisfied: Whether the total falls within the constraint bounds.
        level: Constraint level.

            * ``"hard"`` — the usage corresponds to a
              :class:`~langgoap.goals.ConstraintSpec` with
              ``level="hard"``.  Violation marks the plan
              :attr:`CSPStatus.INFEASIBLE`.
            * ``"soft"`` — the usage corresponds to a soft constraint.
              Violation contributes to the plan's soft score.
            * ``"info"`` (default) — informational only; the resource
              key has no matching ``ConstraintSpec``.  Violation is
              impossible because there is no bound to violate.
    """

    key: str
    total: float
    # Bounds default to None, meaning "no bound on that side".
    constraint_min: float | None = None
    constraint_max: float | None = None
    # Defaults describe an unconstrained, informational entry.
    satisfied: bool = True
    level: Literal["hard", "soft", "info"] = "info"
[docs]
@dataclass(frozen=True, slots=True)
class ScheduleEntry:
    """Per-action temporal slot in a schedule.

    Instances are immutable (``frozen=True``) and slot-based.  The class
    itself does not enforce ``end == start + duration``; the producer of the
    entry is responsible for supplying consistent values.

    Attributes:
        action_name: Name of the scheduled action.
        start: Start time offset from plan begin.
        duration: Duration of the action.
        end: End time (start + duration).
    """

    action_name: str
    start: timedelta
    duration: timedelta
    end: timedelta
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def compute_resource_totals(
    actions: tuple[ActionSpec, ...],
) -> dict[str, float]:
    """Accumulate per-key resource consumption over every action of a plan.

    Args:
        actions: Actions whose ``resources`` mappings are summed.  An action
            whose ``resources`` is ``None`` contributes nothing.

    Returns:
        Mapping from resource key to the summed value.  Keys appear in the
        order they are first encountered; an empty input yields ``{}``.
    """
    summed: dict[str, float] = {}
    for spec in actions:
        per_action = spec.resources
        if per_action is None:
            # Resource-free action: nothing to add.
            continue
        for resource_key, amount in per_action.items():
            summed[resource_key] = summed.get(resource_key, 0.0) + amount
    return summed
def build_dependency_graph(
    actions: tuple[ActionSpec, ...],
) -> dict[int, list[int]]:
    """Derive a precedence DAG from precondition/effect chains.

    For each precondition of action ``j`` the earlier actions are scanned
    backwards (``j-1`` down to ``0``); the first one that produces the
    condition becomes a predecessor of ``j`` and the scan for that
    precondition stops.

    What counts as "produces":

    * Static action: its ``effects`` map the precondition key to exactly the
      required value.
    * Dynamic-effect action (``has_dynamic_effects``): the key merely appears
      in its declared ``effect_keys``.  The value cannot be inspected
      statically, so this is deliberately over-inclusive — it may add edges
      the runtime would not need, but never drops a real one.  The schedule
      stays correct; it may only be less parallel than optimal.

    Args:
        actions: Plan actions in execution order.

    Returns:
        Mapping from every action index to the list of its predecessor
        indices (possibly empty), without duplicates, in the order the
        preconditions were examined.
    """
    predecessors: dict[int, list[int]] = {idx: [] for idx in range(len(actions))}
    for consumer_idx, consumer in enumerate(actions):
        if not consumer.preconditions:
            continue
        found = predecessors[consumer_idx]
        for cond_key, cond_value in consumer.preconditions.items():
            # Walk backwards so the nearest producer wins.
            for producer_idx in reversed(range(consumer_idx)):
                candidate = actions[producer_idx]
                if candidate.has_dynamic_effects:
                    is_producer = cond_key in (candidate.effect_keys or frozenset())
                else:
                    is_producer = candidate.effects.get(cond_key) == cond_value  # type: ignore[union-attr]
                if not is_producer:
                    continue
                if producer_idx not in found:
                    found.append(producer_idx)
                break  # closest producer found
    return predecessors
# Back-compat aliases: the underscore-prefixed names are bound to the same
# function objects as the public ones.  Prefer the public names in new code.
_compute_resource_totals = compute_resource_totals
_build_dependency_graph = build_dependency_graph
# ---------------------------------------------------------------------------
# Public API — validate_plan (pure-Python fast path)
# ---------------------------------------------------------------------------
def validate_plan(
    plan: Plan,
    goal: GoalSpec,
    *,
    scale: int = 1000,
) -> CSPMetadata:
    """Check a plan against the goal's constraints and, if needed, schedule it.

    Resource constraints are evaluated in pure Python.  CP-SAT is invoked
    (through :func:`schedule_plan`) only when every hard constraint holds and
    at least one action carries a duration.

    Args:
        plan: The plan to validate.
        goal: Goal specification with constraints and objectives.
        scale: Integer scaling factor for CP-SAT (float → int conversion).

    Returns:
        ``CSPMetadata`` describing the outcome:

        * ``SKIPPED`` when the goal has no constraints and its objectives
          are ``None``.
        * ``INFEASIBLE`` when a hard constraint is violated or scheduling
          is infeasible; an explanation is attached in that case.
        * ``FEASIBLE`` otherwise.
    """
    started = time.monotonic()

    def elapsed_ms() -> float:
        return (time.monotonic() - started) * 1000

    # Nothing to validate → bail out before any per-action work.
    if not goal.constraints and goal.objectives is None:
        return CSPMetadata(
            status=CSPStatus.SKIPPED,
            solver_time_ms=elapsed_ms(),
            scale_factor=scale,
        )

    totals = compute_resource_totals(plan.actions)
    usages, hard_ok = _compute_resource_usage(totals, goal)
    objective_values = _compute_objective_values(totals, plan, goal)
    resource_usage = tuple(usages)

    # Scheduling a plan already known to break a hard constraint would only
    # waste solver time, so it is attempted only for resource-feasible plans
    # that actually have timed actions.
    timing: CSPMetadata | None = None
    if hard_ok and any(step.duration is not None for step in plan.actions):
        timing = schedule_plan(plan, scale=scale)

    # An infeasible schedule overrides a passing resource check.
    schedule_failed = timing is not None and timing.status == CSPStatus.INFEASIBLE
    if hard_ok and not schedule_failed:
        outcome = CSPStatus.FEASIBLE
    else:
        outcome = CSPStatus.INFEASIBLE

    explanation = None
    if outcome == CSPStatus.INFEASIBLE:
        # Imported lazily, only on the infeasible path.
        from langgoap.planner.explain import explain_infeasibility

        explanation = explain_infeasibility(
            plan,
            goal,
            CSPMetadata(status=outcome, resource_usage=resource_usage),
        )

    return CSPMetadata(
        status=outcome,
        solver_time_ms=elapsed_ms(),
        resource_usage=resource_usage,
        objective_values=MappingProxyType(objective_values),
        schedule=timing.schedule if timing else (),
        makespan=timing.makespan if timing else None,
        plans_evaluated=1,
        scale_factor=scale,
        explanation=explanation,
    )
def _compute_resource_usage(
    totals: dict[str, float], goal: GoalSpec
) -> tuple[list[ResourceUsage], bool]:
    """Compare resource totals with the goal's constraints.

    Args:
        totals: Summed resource consumption per key.
        goal: Goal whose ``constraints`` are checked.  When several
            constraints share a key, the last one wins.

    Returns:
        A pair ``(usages, all_hard_satisfied)``.  ``usages`` lists one entry
        per constrained key (a key missing from ``totals`` counts as 0.0),
        followed by an ``"info"`` entry for every total without a constraint.
        ``all_hard_satisfied`` is ``False`` as soon as any ``"hard"``
        constraint is out of bounds; soft violations are recorded but do not
        affect the flag.
    """
    by_key: dict[str, ConstraintSpec] = {spec.key: spec for spec in goal.constraints}
    report: list[ResourceUsage] = []
    hard_ok = True

    for key, spec in by_key.items():
        consumed = totals.get(key, 0.0)
        above_max = spec.max is not None and consumed > spec.max
        below_min = spec.min is not None and consumed < spec.min
        within_bounds = not (above_max or below_min)
        if spec.level == "hard" and not within_bounds:
            hard_ok = False
        report.append(
            ResourceUsage(
                key=key,
                total=consumed,
                constraint_min=spec.min,
                constraint_max=spec.max,
                satisfied=within_bounds,
                level=spec.level,
            )
        )

    # Unconstrained resources are reported for information only.
    report.extend(
        ResourceUsage(key=key, total=consumed, level="info")
        for key, consumed in totals.items()
        if key not in by_key
    )
    return report, hard_ok
def _compute_objective_values(
totals: dict[str, float], plan: Plan, goal: GoalSpec
) -> dict[str, float]:
"""Collect objective resource totals and evaluate soft goals."""
obj_values: dict[str, float] = {}
if goal.objectives is not None:
for obj_key in goal.objectives:
obj_values[obj_key] = totals.get(obj_key, 0.0)
if goal.soft_goals and plan.expected_states:
final_state = plan.expected_states[-1]
for sg in goal.soft_goals:
achieved = final_state.satisfies(sg.conditions)
obj_values[f"soft_goal:{sg.label}"] = sg.weight if achieved else 0.0
return obj_values
# ---------------------------------------------------------------------------
# Public API — schedule_plan (CP-SAT temporal scheduling)
# ---------------------------------------------------------------------------
def schedule_plan(
    plan: Plan,
    *,
    scale: int = 1000,
) -> CSPMetadata:
    """Build and solve a CP-SAT model that assigns a start time to each action.

    One interval per action is created, precedence constraints are taken
    from the dependency graph, and the makespan is minimized.

    Args:
        plan: The plan to schedule.
        scale: Temporal resolution in ticks per second.  With the default
            of 1000 each tick is one millisecond; smaller values are
            coarser, larger values finer.

    Returns:
        ``CSPMetadata`` whose status is

        * ``SKIPPED`` when no action has a duration (the solver is not run),
        * ``INFEASIBLE`` when the solver reports neither an optimal nor a
          feasible solution,
        * ``OPTIMAL`` / ``FEASIBLE`` otherwise, with the schedule entries and
          makespan filled in.
    """
    started = time.monotonic()

    has_timed_action = any(step.duration is not None for step in plan.actions)
    if not has_timed_action:
        return CSPMetadata(
            status=CSPStatus.SKIPPED,
            solver_time_ms=(time.monotonic() - started) * 1000,
            scale_factor=scale,
        )

    model = _cp_model.CpModel()
    tick_durations = _scaled_durations_ms(plan, scale)
    start_vars, makespan_var = _build_schedule_vars(model, plan, tick_durations)

    solver = _cp_model.CpSolver()
    outcome = solver.solve(model)
    # Timing covers model construction and solving, not result decoding.
    elapsed = (time.monotonic() - started) * 1000

    if outcome == _cp_model.OPTIMAL:
        status = CSPStatus.OPTIMAL
    elif outcome == _cp_model.FEASIBLE:
        status = CSPStatus.FEASIBLE
    else:
        return CSPMetadata(
            status=CSPStatus.INFEASIBLE,
            solver_time_ms=elapsed,
            scale_factor=scale,
        )

    entries = _decode_schedule(plan, solver, start_vars, tick_durations, scale)
    makespan_ticks = solver.value(makespan_var)
    return CSPMetadata(
        status=status,
        solver_time_ms=elapsed,
        schedule=tuple(entries),
        # ticks / scale = seconds; × 1000 converts to milliseconds.
        makespan=timedelta(milliseconds=makespan_ticks * 1000 / scale),
        scale_factor=scale,
    )
def _scaled_durations_ms(plan: Plan, scale: int) -> list[int]:
"""Return each action's scaled integer duration; 0 for durationless actions."""
out: list[int] = []
for a in plan.actions:
if a.duration is not None:
out.append(max(int(a.duration.total_seconds() * scale), 0))
else:
out.append(0)
return out
def _build_schedule_vars(
    model: Any, plan: Plan, durations_ms: list[int]
) -> tuple[list[Any], Any]:
    """Populate ``model`` with scheduling variables, constraints, and objective.

    Steps, in order:

    1. One start variable and one fixed-size interval per action.
    2. A precedence constraint for every edge of the dependency graph
       (a successor starts no earlier than its predecessor ends).
    3. A makespan variable bounded below by every action's end, which is
       then minimized.

    This function adds no no-overlap constraint, so actions without a
    dependency between them may be scheduled concurrently.

    Args:
        model: CP-SAT model to populate (mutated in place).
        plan: Plan whose actions are scheduled.
        durations_ms: Scaled integer duration per action, index-aligned with
            ``plan.actions``.

    Returns:
        ``(starts, makespan)``: the start variables in action order and the
        makespan variable.
    """
    # Running every action back to back always fits within this horizon.
    horizon = sum(durations_ms) + 1

    starts: list[Any] = []
    for idx, _action in enumerate(plan.actions):
        ticks = durations_ms[idx]
        begin = model.new_int_var(0, horizon, f"start_{idx}")
        model.new_interval_var(begin, ticks, begin + ticks, f"interval_{idx}")
        starts.append(begin)

    # Precedence constraints from dependency graph
    dependency_graph = build_dependency_graph(plan.actions)
    for successor, producers in dependency_graph.items():
        for producer in producers:
            model.add(starts[successor] >= starts[producer] + durations_ms[producer])

    # Objective: minimize makespan
    makespan = model.new_int_var(0, horizon, "makespan")
    for idx, begin in enumerate(starts):
        model.add(makespan >= begin + durations_ms[idx])
    model.minimize(makespan)
    return starts, makespan
def _decode_schedule(
    plan: Plan,
    solver: Any,
    starts: list[Any],
    durations_ms: list[int],
    scale: int,
) -> list[ScheduleEntry]:
    """Turn solved start variables into ``ScheduleEntry`` objects.

    Args:
        plan: Plan whose actions were scheduled.
        solver: Solved CP-SAT solver, queried via ``solver.value``.
        starts: Start variables, index-aligned with ``plan.actions``.
        durations_ms: Scaled integer durations, index-aligned with
            ``plan.actions``.
        scale: Ticks per second used when the model was built.

    Returns:
        One entry per action, in plan order, with tick values converted back
        to ``timedelta``.
    """

    def to_delta(ticks: int) -> timedelta:
        # ticks / scale = seconds; × 1000 converts to milliseconds.
        return timedelta(milliseconds=ticks * 1000 / scale)

    entries: list[ScheduleEntry] = []
    for idx, action in enumerate(plan.actions):
        begin_ticks = solver.value(starts[idx])
        length_ticks = durations_ms[idx]
        entries.append(
            ScheduleEntry(
                action_name=action.name,
                start=to_delta(begin_ticks),
                duration=to_delta(length_ticks),
                end=to_delta(begin_ticks + length_ticks),
            )
        )
    return entries
# ---------------------------------------------------------------------------
# Public API — optimize_plans (CP-SAT multi-plan selection)
# ---------------------------------------------------------------------------
def optimize_plans(
    plans: list[Plan],
    goal: GoalSpec,
    *,
    scale: int = 1000,
) -> tuple[Plan, CSPMetadata]:
    """Pick the best candidate plan with a CP-SAT selection model.

    The model has one boolean selector per plan (exactly one is true), one
    integer variable per resource key carrying the selected plan's scaled
    total, hard bounds as constraints, and an objective combining
    ``GoalSpec.objectives``, soft-constraint penalties, and soft-goal rewards.

    Args:
        plans: Candidate plans to evaluate.
        goal: Goal with constraints and objectives.
        scale: Scaling factor for float → int conversion.

    Returns:
        Tuple of (selected plan, CSPMetadata).  When the solver finds no
        solution, the first plan is returned together with ``INFEASIBLE``
        metadata.

    Raises:
        ValueError: If plans list is empty.
    """
    if not plans:
        raise ValueError("No plans to optimize")

    started = time.monotonic()
    model = _cp_model.CpModel()
    candidate_count = len(plans)

    totals_per_plan: list[dict[str, float]] = [
        compute_resource_totals(candidate.actions) for candidate in plans
    ]
    resource_keys: set[str] = set()
    for totals in totals_per_plan:
        resource_keys.update(totals)

    selectors, resource_vars = _build_selection_vars(
        model, candidate_count, totals_per_plan, resource_keys, scale
    )

    constraints_by_key: dict[str, ConstraintSpec] = {
        spec.key: spec for spec in goal.constraints
    }
    # May add zero-valued variables to ``resource_vars`` for constrained keys
    # that no plan uses, so it must run before the objective is assembled.
    penalties = _wire_constraints(
        model, resource_vars, constraints_by_key, totals_per_plan, scale
    )
    _set_optimization_objective(
        model, selectors, resource_vars, penalties, plans, goal, scale
    )

    solver = _cp_model.CpSolver()
    outcome = solver.solve(model)
    elapsed = (time.monotonic() - started) * 1000

    if outcome == _cp_model.OPTIMAL:
        status = CSPStatus.OPTIMAL
    elif outcome == _cp_model.FEASIBLE:
        status = CSPStatus.FEASIBLE
    else:
        # No selectable plan: fall back to the first candidate.
        return plans[0], CSPMetadata(
            status=CSPStatus.INFEASIBLE,
            solver_time_ms=elapsed,
            plans_evaluated=candidate_count,
            scale_factor=scale,
        )

    winner_idx = _decode_selection(solver, selectors, candidate_count)
    winner_totals = totals_per_plan[winner_idx]
    usages = _build_usage_list(resource_keys, constraints_by_key, winner_totals)

    objective_values: dict[str, float] = {}
    if goal.objectives:
        objective_values.update(
            (name, winner_totals.get(name, 0.0)) for name in goal.objectives
        )

    return plans[winner_idx], CSPMetadata(
        status=status,
        solver_time_ms=elapsed,
        resource_usage=tuple(usages),
        objective_values=MappingProxyType(objective_values),
        plans_evaluated=candidate_count,
        scale_factor=scale,
    )
def _build_selection_vars(
model: Any,
n: int,
plan_resources: list[dict[str, float]],
all_keys: set[str],
scale: int,
) -> tuple[list[Any], dict[str, Any]]:
"""Create boolean plan selectors plus one IntVar per resource key.
The resource IntVars are constrained so each one equals the weighted
sum of ``selected[i] * scaled_value[i]`` — effectively a dispatch
table that yields the chosen plan's resource total.
"""
selected = [model.new_bool_var(f"plan_{i}") for i in range(n)]
model.add_exactly_one(selected)
resource_vars: dict[str, Any] = {}
for key in all_keys:
scaled_values = [int(res.get(key, 0.0) * scale) for res in plan_resources]
rv = model.new_int_var(0, max(scaled_values) + 1, f"resource_{key}")
model.add(rv == sum(selected[i] * scaled_values[i] for i in range(n)))
resource_vars[key] = rv
return selected, resource_vars
def _compute_big_m(
plan_resources: list[dict[str, float]],
constraint_map: dict[str, ConstraintSpec],
scale: int,
) -> int:
"""Return a safe Big-M bound for soft-violation slack variables."""
max_scaled = 1
for res in plan_resources:
for v in res.values():
scaled = int(abs(v) * scale) + 1
if scaled > max_scaled:
max_scaled = scaled
# Also account for explicit bounds when larger than observed totals.
for constraint in constraint_map.values():
if constraint.max is not None:
max_scaled = max(max_scaled, int(abs(constraint.max) * scale) + 1)
if constraint.min is not None:
max_scaled = max(max_scaled, int(abs(constraint.min) * scale) + 1)
return max_scaled * 2 + 1
def _wire_constraints(
    model: Any,
    resource_vars: dict[str, Any],
    constraint_map: dict[str, ConstraintSpec],
    plan_resources: list[dict[str, float]],
    scale: int,
) -> list[Any]:
    """Translate goal constraints into CP-SAT constraints and penalty terms.

    * ``"hard"`` constraints become direct bounds on the resource variable.
    * Any other level is treated as soft: each declared bound gets a
      non-negative slack variable (capped by the Big-M value) forced to be at
      least the amount of violation; the slack times the scaled constraint
      weight is returned as a penalty term.

    Args:
        model: CP-SAT model to populate (mutated in place).
        resource_vars: Resource variables by key.  Mutated: a constrained key
            that no plan uses gets a variable fixed to 0.
        constraint_map: Constraints by resource key.
        plan_resources: Per-plan resource totals, used to size the Big-M.
        scale: Float → int scaling factor.

    Returns:
        Weighted slack expressions for the caller to add to the objective.
    """
    penalties: list[Any] = []
    slack_cap = _compute_big_m(plan_resources, constraint_map, scale)

    for key, spec in constraint_map.items():
        if key not in resource_vars:
            # Constrained key not present in any plan — model it as a zero var.
            resource_vars[key] = model.new_int_var(0, 0, f"resource_{key}")
        resource = resource_vars[key]

        if spec.level == "hard":
            if spec.max is not None:
                model.add(resource <= int(spec.max * scale))
            if spec.min is not None:
                model.add(resource >= int(spec.min * scale))
        else:
            # Soft bound: violations allowed but penalized.  The weight is
            # floored at 1 so a violation always carries some cost.
            penalty_weight = max(int(spec.weight * scale), 1)
            if spec.max is not None:
                over = model.new_int_var(0, slack_cap, f"soft_viol_max_{key}")
                model.add(over >= resource - int(spec.max * scale))
                penalties.append(over * penalty_weight)
            if spec.min is not None:
                under = model.new_int_var(0, slack_cap, f"soft_viol_min_{key}")
                model.add(under >= int(spec.min * scale) - resource)
                penalties.append(under * penalty_weight)
    return penalties
def _set_optimization_objective(
model: Any,
selected: list[Any],
resource_vars: dict[str, Any],
soft_violation_terms: list[Any],
plans: list[Plan],
goal: GoalSpec,
scale: int,
) -> None:
"""Combine hard objectives, soft-constraint penalties, and soft-goal rewards.
Falls back to plain total-cost minimisation when no objectives, soft
constraints, or soft goals are declared.
"""
objective_terms: list[Any] = []
if goal.objectives:
for obj_key, direction in goal.objectives.items():
if obj_key not in resource_vars:
continue
if direction == ObjectiveDirection.MINIMIZE:
# Minimizing → positive coefficient (solver minimizes)
objective_terms.append(resource_vars[obj_key])
else:
# Maximizing → negative coefficient (solver minimizes -value)
objective_terms.append(-resource_vars[obj_key])
# Soft goals: reward weighted achievement (binary per plan × weight).
soft_goal_terms: list[Any] = []
if goal.soft_goals:
for sg in goal.soft_goals:
weight_scaled = max(int(sg.weight * scale), 1)
for i, p in enumerate(plans):
achieved = bool(
p.expected_states and p.expected_states[-1].satisfies(sg.conditions)
)
if achieved:
# Subtract (negative in minimise) to reward achievement
soft_goal_terms.append(-selected[i] * weight_scaled)
combined_terms = objective_terms + soft_violation_terms + soft_goal_terms
if combined_terms:
model.minimize(sum(combined_terms))
else:
# No objectives or soft violations — prefer lower total cost.
cost_terms = [
selected[i] * int(plans[i].total_cost * scale) for i in range(len(plans))
]
model.minimize(sum(cost_terms))
def _decode_selection(solver: Any, selected: list[Any], n: int) -> int:
"""Return the index of the plan the solver picked (defaults to 0)."""
for i in range(n):
if solver.value(selected[i]):
return i
return 0
def _build_usage_list(
    all_keys: set[str],
    constraint_map: dict[str, ConstraintSpec],
    chosen_resources: dict[str, float],
) -> list[ResourceUsage]:
    """Materialize ResourceUsage entries from the chosen plan's totals.

    The ``satisfied`` flag always reflects the actual total versus both
    declared bounds.  Keys declared in constraints but absent from every
    plan are reported with ``total=0.0`` and checked against ``min`` *and*
    ``max``, so a negative upper bound is correctly flagged as violated.

    Args:
        all_keys: Every resource key appearing in any candidate plan.
        constraint_map: Constraints by resource key.
        chosen_resources: Resource totals of the selected plan.

    Returns:
        Entries for ``all_keys`` in sorted key order (``"info"`` level when
        unconstrained), followed by entries for constraint-only keys in
        ``constraint_map`` order.
    """

    def within_bounds(total: float, cspec: ConstraintSpec | None) -> bool:
        if cspec is None:
            return True
        if cspec.max is not None and total > cspec.max:
            return False
        if cspec.min is not None and total < cspec.min:
            return False
        return True

    def make_entry(
        key: str, total: float, cspec: ConstraintSpec | None
    ) -> ResourceUsage:
        return ResourceUsage(
            key=key,
            total=total,
            constraint_min=cspec.min if cspec else None,
            constraint_max=cspec.max if cspec else None,
            satisfied=within_bounds(total, cspec),
            level=cspec.level if cspec else "info",
        )

    usage_list = [
        make_entry(key, chosen_resources.get(key, 0.0), constraint_map.get(key))
        for key in sorted(all_keys)
    ]
    # Constrained keys that no plan consumes: report a zero total.
    usage_list.extend(
        make_entry(key, 0.0, cspec)
        for key, cspec in constraint_map.items()
        if key not in all_keys
    )
    return usage_list
# ---------------------------------------------------------------------------
# Public API — pareto_plans (Pareto-optimal plan enumeration)
# ---------------------------------------------------------------------------
def _dominates(
    a_obj: dict[str, float],
    b_obj: dict[str, float],
    objectives: "MappingProxyType[str, Any]",
) -> bool:
    """Tell whether plan A Pareto-dominates plan B.

    A dominates B when it is no worse on every objective and strictly better
    on at least one.  For ``MINIMIZE`` a lower value is better; for any other
    direction a higher value is better.  A value missing from either dict is
    read as 0.0.

    Args:
        a_obj: Objective values of plan A.
        b_obj: Objective values of plan B.
        objectives: Mapping from objective key to its direction.

    Returns:
        ``True`` if A dominates B; ``False`` otherwise, including when
        ``objectives`` is empty or the plans tie everywhere.
    """
    from langgoap.types import ObjectiveDirection

    strictly_better_somewhere = False
    for key, direction in objectives.items():
        ours = a_obj.get(key, 0.0)
        theirs = b_obj.get(key, 0.0)
        if direction == ObjectiveDirection.MINIMIZE:
            worse, better = ours > theirs, ours < theirs
        else:  # MAXIMIZE
            worse, better = ours < theirs, ours > theirs
        if worse:
            # Losing on a single objective rules out dominance.
            return False
        if better:
            strictly_better_somewhere = True
    # A strict win implies at least one objective was compared.
    return strictly_better_somewhere
def pareto_plans(
    plans: list["Plan"],
    goal: GoalSpec,
    *,
    scale: int = 1000,
    max_frontier: int | None = None,
) -> list[tuple["Plan", CSPMetadata]]:
    """Enumerate the Pareto-optimal subset of candidate plans.

    From a finite set of candidate plans, returns the non-dominated subset
    (Pareto frontier) ordered by primary objective value.  A plan A
    *dominates* plan B when A is at least as good on **all** objectives and
    strictly better on **at least one**.

    Plans that :func:`validate_plan` reports as ``INFEASIBLE`` are excluded
    before dominance is computed.

    When ``goal.objectives`` is empty or ``None``, no objective can
    distinguish the plans, so every feasible plan is returned (subject to
    *max_frontier*) sorted by plan cost.

    Args:
        plans: Candidate plans.  Must not be empty.
        goal: Goal spec with constraints and objectives.
        scale: Integer scaling factor for resource computations
            (forwarded to :func:`validate_plan`).
        max_frontier: Cap on the number of returned plans; the first
            ``max_frontier`` plans in the result ordering are kept.
            ``None`` returns all of them.

    Returns:
        A list of ``(Plan, CSPMetadata)`` tuples.  With objectives, these are
        the non-dominated plans ordered by the primary (first) objective,
        with metadata stamped with the aggregate elapsed time and the number
        of candidates.  Without objectives, these are the feasible plans
        ordered by ``total_cost`` with their per-plan validation metadata.
        Empty when every plan is infeasible.

    Raises:
        ValueError: When *plans* is empty.
    """
    if not plans:
        raise ValueError("pareto_plans: plans list must not be empty")

    t0 = time.monotonic()
    feasible = _collect_feasible_plans(plans, goal, scale=scale)
    if not feasible:
        return []

    if not goal.objectives:
        # No objectives — all feasible plans are non-dominated; sort by cost.
        # NOTE(review): this path returns the per-plan metadata unmodified
        # (no aggregate time / plan-count stamping) — confirm that is intended.
        by_cost = sorted(feasible, key=lambda item: item[0].total_cost)
        return by_cost if max_frontier is None else by_cost[:max_frontier]

    # Objective values are only needed for dominance, so they are collected
    # after the no-objectives shortcut.
    plan_obj_values = _collect_objective_values(feasible, goal)
    frontier_items = _pareto_frontier(feasible, plan_obj_values, goal.objectives)
    frontier = _sort_by_primary_objective(frontier_items, goal.objectives)
    if max_frontier is not None:
        frontier = frontier[:max_frontier]

    elapsed_ms = (time.monotonic() - t0) * 1000
    return _annotate_metadata(frontier, len(plans), elapsed_ms, scale)
def _collect_feasible_plans(
    plans: list["Plan"], goal: GoalSpec, *, scale: int
) -> list[tuple["Plan", CSPMetadata]]:
    """Run :func:`validate_plan` on each plan and drop the infeasible ones.

    Args:
        plans: Candidate plans.
        goal: Goal to validate against.
        scale: Scaling factor forwarded to :func:`validate_plan`.

    Returns:
        ``(plan, metadata)`` pairs, in input order, for every plan whose
        validation status is anything other than ``INFEASIBLE``.
    """
    validated = (
        (candidate, validate_plan(candidate, goal, scale=scale)) for candidate in plans
    )
    return [
        (candidate, meta)
        for candidate, meta in validated
        if meta.status != CSPStatus.INFEASIBLE
    ]
def _collect_objective_values(
    feasible: list[tuple["Plan", CSPMetadata]], goal: GoalSpec
) -> list[dict[str, float]]:
    """Build one objective-value dict per feasible plan.

    Each dict starts as a copy of the plan's ``meta.objective_values``.  Any
    objective key missing from it is filled in from the plan's resource
    totals (0.0 when the plan does not use that resource).

    Args:
        feasible: ``(plan, metadata)`` pairs.
        goal: Goal whose ``objectives`` define the required keys.

    Returns:
        Objective dicts index-aligned with ``feasible``.
    """
    collected: list[dict[str, float]] = []
    for candidate, meta in feasible:
        values: dict[str, float] = dict(meta.objective_values)
        if goal.objectives:
            fallback_totals = compute_resource_totals(candidate.actions)
            for name in goal.objectives:
                # Keep values already present; only fill in the gaps.
                values.setdefault(name, fallback_totals.get(name, 0.0))
        collected.append(values)
    return collected
def _pareto_frontier(
    feasible: list[tuple["Plan", CSPMetadata]],
    plan_obj_values: list[dict[str, float]],
    objectives: "MappingProxyType[str, Any]",
) -> list[tuple[tuple["Plan", CSPMetadata], dict[str, float]]]:
    """Filter feasible plans down to the non-dominated ones.

    A plan is discarded when some other plan that has not itself been marked
    dominated dominates it according to :func:`_dominates`.

    Args:
        feasible: ``(plan, metadata)`` pairs.
        plan_obj_values: Objective dicts index-aligned with ``feasible``.
        objectives: Mapping from objective key to its direction.

    Returns:
        ``(item, objective_values)`` pairs for the surviving plans, in input
        order.
    """
    count = len(feasible)
    is_dominated = [False] * count
    for target in range(count):
        # Rivals already marked dominated are skipped.
        is_dominated[target] = any(
            _dominates(plan_obj_values[rival], plan_obj_values[target], objectives)
            for rival in range(count)
            if rival != target and not is_dominated[rival]
        )
    return [
        (feasible[idx], plan_obj_values[idx])
        for idx, flagged in enumerate(is_dominated)
        if not flagged
    ]
def _sort_by_primary_objective(
    frontier_items: list[tuple[tuple["Plan", CSPMetadata], dict[str, float]]],
    objectives: "MappingProxyType[str, Any]",
) -> list[tuple["Plan", CSPMetadata]]:
    """Order frontier items by the first declared objective.

    The sort is ascending unless the primary objective's direction is
    ``MAXIMIZE``, in which case it is descending.  A missing value is read
    as 0.0.  ``frontier_items`` is sorted in place.

    Args:
        frontier_items: ``(item, objective_values)`` pairs; reordered in place.
        objectives: Non-empty mapping from objective key to direction; its
            first entry is the primary objective.

    Returns:
        The ``(plan, metadata)`` items in sorted order, without the
        objective dicts.
    """
    from langgoap.types import ObjectiveDirection

    primary_key, primary_dir = next(iter(objectives.items()))

    def primary_value(entry: tuple[Any, dict[str, float]]) -> float:
        _item, values = entry
        return values.get(primary_key, 0.0)

    descending = primary_dir == ObjectiveDirection.MAXIMIZE
    frontier_items.sort(key=primary_value, reverse=descending)
    return [item for item, _values in frontier_items]
def _annotate_metadata(
    frontier: list[tuple["Plan", CSPMetadata]],
    plans_evaluated: int,
    elapsed_ms: float,
    scale: int,
) -> list[tuple["Plan", CSPMetadata]]:
    """Rebuild each plan's metadata with run-wide timing and plan count.

    A new ``CSPMetadata`` is created per plan: status, resource usage,
    objective values, schedule, makespan, and explanation are copied from
    the existing metadata, while ``solver_time_ms``, ``plans_evaluated``,
    and ``scale_factor`` come from the arguments.

    Args:
        frontier: ``(plan, metadata)`` pairs to annotate.
        plans_evaluated: Number of candidate plans considered overall.
        elapsed_ms: Aggregate elapsed time in milliseconds.
        scale: Scaling factor recorded on the metadata.

    Returns:
        New ``(plan, metadata)`` pairs in the same order as ``frontier``.
    """
    return [
        (
            candidate,
            CSPMetadata(
                status=meta.status,
                solver_time_ms=elapsed_ms,
                resource_usage=meta.resource_usage,
                objective_values=meta.objective_values,
                schedule=meta.schedule,
                makespan=meta.makespan,
                plans_evaluated=plans_evaluated,
                scale_factor=scale,
                explanation=meta.explanation,
            ),
        )
        for candidate, meta in frontier
    ]