Source code for langgoap.viz.ascii

"""ASCII-art renderers for Plan objects.

These renderers never touch the terminal directly; they return plain
strings so callers can log, pipe to files, or embed them in notebooks.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from langgoap.planner.csp import build_dependency_graph

if TYPE_CHECKING:
    from langgoap.planner.csp import CSPMetadata, ScheduleEntry
    from langgoap.planner.explain import (
        InfeasibilityExplanation,
        ResourceShortfall,
    )
    from langgoap.planner.types import Plan


def _render_schedule_block(plan: Plan, schedule: Sequence[ScheduleEntry]) -> list[str]:
    """Emit the scheduled-action block, grouping same-start entries."""
    by_start: dict[float, list[int]] = {}
    for i, entry in enumerate(schedule):
        by_start.setdefault(entry.start.total_seconds(), []).append(i)
    out: list[str] = []
    for start_t in sorted(by_start):
        indices = by_start[start_t]
        if len(indices) > 1:
            out.append(f"├── t={start_t:g}s (parallel):")
            for i in indices:
                dur = schedule[i].duration.total_seconds()
                out.append(
                    f"│     • [{i}] {plan.actions[i].name}  " f"duration={dur:g}s"
                )
        else:
            i = indices[0]
            dur = schedule[i].duration.total_seconds()
            dur_str = f"  duration={dur:g}s" if dur > 0 else ""
            out.append(f"├── t={start_t:g}s  [{i}] {plan.actions[i].name}{dur_str}")
    return out


def _render_linear_block(plan: Plan) -> list[str]:
    """Emit the unscheduled linear tree with precondition-based deps."""
    deps = build_dependency_graph(plan.actions)
    out: list[str] = []
    for i, action in enumerate(plan.actions):
        dep_indices = deps[i]
        dep_str = f"  (after: {dep_indices})" if dep_indices else ""
        out.append(f"├── [{i}] {action.name}{dep_str}")
    return out


def _render_resources_block(csp: CSPMetadata) -> list[str]:
    """Emit the resource-usage section plus makespan (if present)."""
    out = ["", "Resources:"]
    for usage in csp.resource_usage:
        bound = ""
        if usage.constraint_max is not None:
            bound = f" / {usage.constraint_max:g}"
        elif usage.constraint_min is not None:
            bound = f" >= {usage.constraint_min:g}"
        marker = "OK" if usage.satisfied else "VIOLATED"
        out.append(f"  {usage.key}: {usage.total:g}{bound}  [{marker}]")
    if csp.makespan is not None:
        out.append(f"  makespan: {csp.makespan.total_seconds():g}s")
    return out


def _format_shortfall(sf: ResourceShortfall) -> str | None:
    """Format a single resource shortfall entry, or None if unbounded."""
    if sf.available_max is not None:
        return (
            f"    - {sf.key}: {sf.required:g} / {sf.available_max:g} "
            f"(overrun: {sf.overrun:g})"
        )
    if sf.available_min is not None:
        return (
            f"    - {sf.key}: {sf.required:g} >= {sf.available_min:g} "
            f"(shortfall: {sf.overrun:g})"
        )
    return None


def _render_infeasibility_block(explanation: InfeasibilityExplanation) -> list[str]:
    """Emit the infeasibility-explanation section."""
    out = ["", "Infeasibility Explanation:"]
    if explanation.conflicting_constraints:
        out.append("  Conflicting constraints:")
        for c in explanation.conflicting_constraints:
            bound = ""
            if c.max is not None:
                bound = f"max={c.max:g}"
            if c.min is not None:
                bound = f"min={c.min:g}" if not bound else f"{bound}, min={c.min:g}"
            out.append(f"    - {c.key} ({bound}, weight={c.weight:g})")
    if explanation.resource_shortfalls:
        out.append("  Resource shortfalls:")
        for sf in explanation.resource_shortfalls:
            line = _format_shortfall(sf)
            if line is not None:
                out.append(line)
    if explanation.suggestion:
        out.append(f"  Suggestion: {explanation.suggestion}")
    return out


[docs] def render_ascii( plan: Plan, *, show_resources: bool = True, show_schedule: bool = True, ) -> str: """Render a Plan as a top-to-bottom ASCII tree. Each action is one line prefixed with its index. Dependency arrows are drawn to the left. When a schedule is attached, entries at the same start time are grouped into a ``┬── parallel:`` block. Args: plan: Plan to render. show_resources: If True, append a resource usage table. show_schedule: If True, render parallel groups from the schedule. Returns: ASCII string representation of the plan. """ if len(plan) == 0: return "Plan: (empty — goal already satisfied)\n" lines: list[str] = [ f"Plan ({len(plan)} step{'s' if len(plan) != 1 else ''}, " f"cost={plan.total_cost:g})" ] csp = plan.metadata.csp schedule = ( csp.schedule if (show_schedule and csp is not None and csp.schedule) else None ) if schedule is not None and len(schedule) == len(plan.actions): lines.extend(_render_schedule_block(plan, schedule)) else: lines.extend(_render_linear_block(plan)) if show_resources and csp is not None and csp.resource_usage: lines.extend(_render_resources_block(csp)) if csp is not None and csp.explanation is not None: lines.extend(_render_infeasibility_block(csp.explanation)) return "\n".join(lines) + "\n"
[docs] def render_ascii_gantt(plan: Plan, *, width: int = 60) -> str: """Render the plan's schedule as an ASCII Gantt chart. Args: plan: Plan whose ``metadata.csp.schedule`` will be rendered. width: Number of characters for the time axis (excluding labels). Returns: ASCII Gantt block as a plain string. Raises: ValueError: If the plan has no schedule. """ csp = plan.metadata.csp if csp is None or not csp.schedule: raise ValueError( "render_ascii_gantt requires a plan with a CSP schedule " "(plan.metadata.csp.schedule is empty)." ) if width < 10: raise ValueError(f"width must be >= 10, got {width}") max_end = max(entry.end.total_seconds() for entry in csp.schedule) if max_end <= 0: max_end = 1.0 scale = width / max_end name_width = max(len(entry.action_name) for entry in csp.schedule) lines: list[str] = [] lines.append(f"{'action'.ljust(name_width)} |{'-' * width}| " f"0..{max_end:g}s") for entry in csp.schedule: start_col = int(entry.start.total_seconds() * scale) end_col = max(start_col + 1, int(entry.end.total_seconds() * scale)) bar = [" "] * width for col in range(start_col, min(end_col, width)): bar[col] = "#" lines.append( f"{entry.action_name.ljust(name_width)} |{''.join(bar)}| " f"{entry.start.total_seconds():g}-{entry.end.total_seconds():g}s" ) return "\n".join(lines) + "\n"