Source code for langgoap.viz.dot

"""Graphviz DOT renderer for Plan objects."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING, Sequence

from langgoap.planner.csp import build_dependency_graph

if TYPE_CHECKING:
    from langgoap.planner.csp import CSPMetadata, ScheduleEntry
    from langgoap.planner.types import Plan


_ID_SAFE = re.compile(r"[^A-Za-z0-9_]")


def _node_id(index: int, name: str) -> str:
    return f"a{index}_{_ID_SAFE.sub('_', name)}"


def _escape_dot(text: str) -> str:
    return text.replace("\\", "\\\\").replace('"', '\\"')


[docs] def render_dot( plan: Plan, *, show_resources: bool = True, show_schedule: bool = True, ) -> str: """Render a Plan as a Graphviz DOT digraph. Args: plan: Plan to render. show_resources: If True, append a legend node with resource usage. show_schedule: If True and the plan has a schedule, group actions with equal start time into clusters. Returns: DOT source code as a plain string. """ if len(plan) == 0: return 'digraph Plan {\n rankdir=TB;\n empty [label="(empty plan)"];\n}\n' lines: list[str] = [ "digraph Plan {", " rankdir=TB;", ' node [shape=box, style="rounded,filled", fillcolor="#e8f4ff"];', ] node_ids = [_node_id(i, a.name) for i, a in enumerate(plan.actions)] csp = plan.metadata.csp schedule = ( csp.schedule if (show_schedule and csp is not None and csp.schedule) else None ) if schedule is not None and len(schedule) == len(plan.actions): lines.extend(_emit_scheduled_nodes(plan, node_ids, schedule)) else: lines.extend(_emit_unscheduled_nodes(plan, node_ids)) lines.extend(_emit_edges(plan, node_ids)) if show_resources and csp is not None and csp.resource_usage: lines.append(_emit_resource_legend(csp)) lines.append("}") return "\n".join(lines) + "\n"
def _emit_scheduled_nodes( plan: Plan, node_ids: list[str], schedule: Sequence[ScheduleEntry] ) -> list[str]: """Emit nodes grouped by start time, wrapping parallel groups in clusters.""" by_start: dict[float, list[int]] = {} for i, entry in enumerate(schedule): by_start.setdefault(entry.start.total_seconds(), []).append(i) out: list[str] = [] for cluster_idx, start_t in enumerate(sorted(by_start)): indices = by_start[start_t] if len(indices) > 1: out.append(f" subgraph cluster_{cluster_idx} {{") out.append(f' label="t={start_t:g}s";') out.append(' style="dashed";') for i in indices: label = _format_label(plan, i, schedule[i]) out.append(f' {node_ids[i]} [label="{label}"];') out.append(" }") else: i = indices[0] label = _format_label(plan, i, schedule[i]) out.append(f' {node_ids[i]} [label="{label}"];') return out def _emit_unscheduled_nodes(plan: Plan, node_ids: list[str]) -> list[str]: """Emit one node per action, annotating non-default costs.""" out: list[str] = [] for i, action in enumerate(plan.actions): label = _escape_dot(action.name) cost = action.get_cost({}) if cost != 1.0: label = f"{label}\\ncost={cost:g}" out.append(f' {node_ids[i]} [label="{label}"];') return out def _emit_edges(plan: Plan, node_ids: list[str]) -> list[str]: """Emit dependency edges, falling back to a dashed sequential chain.""" deps = build_dependency_graph(plan.actions) out: list[str] = [] has_any_edge = False for j in range(len(plan.actions)): for i in deps[j]: out.append(f" {node_ids[i]} -> {node_ids[j]};") has_any_edge = True if not has_any_edge and len(plan.actions) > 1: for i in range(len(plan.actions) - 1): out.append(f" {node_ids[i]} -> {node_ids[i + 1]} [style=dashed];") return out def _emit_resource_legend(csp: CSPMetadata) -> str: """Build the resource-usage legend node as a single DOT statement.""" legend_lines = ["Resources:"] for usage in csp.resource_usage: bound = "" if usage.constraint_max is not None: bound = f" / {usage.constraint_max:g}" elif usage.constraint_min is not None: bound = f" >= {usage.constraint_min:g}" status = "OK" if usage.satisfied else "VIOLATED" legend_lines.append(f"{usage.key}: {usage.total:g}{bound} [{status}]") legend = "\\l".join(_escape_dot(line) for line in legend_lines) + "\\l" return ' legend [shape=note, fillcolor="#fff8dc", ' f'label="{legend}"];' def _format_label(plan: Plan, index: int, schedule_entry: ScheduleEntry) -> str: action = plan.actions[index] label = _escape_dot(action.name) duration_s = schedule_entry.duration.total_seconds() if duration_s > 0: label = f"{label}\\nduration={duration_s:g}s" return label