"""Tree widget for browsing Buffalo Panel simulation configurations."""
from __future__ import annotations
from dataclasses import dataclass, is_dataclass
from typing import Literal
from textual.widgets import Tree
from textual.widgets._tree import (
TreeNode, # pyright: ignore[reportPrivateImportUsage]
)
from buffalo_panel.config import (
PanelCaseSpec,
SchemaClassTree,
SchemaFieldTree,
schema_tree,
)
from buffalo_panel.config.internal.loading import (
AIRFOIL_SPEC_BY_TYPE,
exact_airfoil_source_choices,
)
from .simulation_data import (
AIRFOIL_ENTRY_FIELD_NAME,
AIRFOIL_ENTRY_PATH_LENGTH,
AIRFOIL_SOURCE_FIELD_NAME,
AIRFOIL_SWITCH_SOURCE_COUNT,
TWO_DIMENSIONAL_TUPLE_LENGTH,
NodePath,
SimulationDataManager,
SimulationValue,
)
# Either flavour of schema node: a class-level tree or a single field tree.
type SchemaNode = SchemaClassTree | SchemaFieldTree
# Closed set of roles a visible tree node can take; stored in
# ``SimulationNodeData.kind``.
type NodeKind = Literal["field", "list_item", "mapping_entry"]
@dataclass(frozen=True, slots=True)
class SimulationNodeData:
"""Structured metadata stored on each visible tree node."""
path: NodePath
"""Canonical location of the node value inside simulation data."""
schema: SchemaNode
"""Schema node describing the field or object at ``path``."""
kind: NodeKind
"""Whether the node represents a field, list item, or mapping entry."""
key_label: str | None = None
"""User-facing label for renamable mapping keys when relevant."""
@property
def is_mapping_entry(self) -> bool:
"""Return whether the node represents one mapping entry key."""
return self.kind == "mapping_entry"
class SimulationTree(Tree[SimulationNodeData]):
"""Widget for navigating the simulation configuration hierarchy."""
def rebuild(
    self,
    manager: SimulationDataManager,
    cursor_path: NodePath | None = None,
) -> None:
    """Rebuild every node from ``manager.data`` while keeping view state.

    The paths of the nodes that are expanded before the rebuild are
    recorded and re-expanded afterwards; the root is always expanded.

    Args:
        manager: Data manager whose ``data`` is rendered into the tree.
        cursor_path: Optional path to put the cursor back on. The move is
            scheduled with ``call_after_refresh`` instead of run inline.
    """
    # The snapshot has to be taken before the old children are removed.
    previously_open = self._get_expanded_paths()
    top = self.root
    top.remove_children()
    case_schema = schema_tree(PanelCaseSpec)
    self._populate_tree(top, manager.data, case_schema, (), manager)
    self._restore_expansion_state(previously_open)
    top.expand()
    if cursor_path is None:
        return
    self.call_after_refresh(self._restore_cursor_path, cursor_path)
def _populate_tree(
    self,
    parent: TreeNode[SimulationNodeData],
    data: SimulationValue,
    schema_node: SchemaNode | None,
    path: NodePath,
    manager: SimulationDataManager,
) -> None:
    """Recursively populate the tree from schema-aware case data.

    Dispatches on the schema/value pairing and delegates to one helper
    per shape:

    * class schema + dict value -> one child per schema field;
    * list value -> one child per item;
    * field schema + dict value -> one child per mapping key.

    Any other pairing adds no children.

    Args:
        parent: Node that receives the new children.
        data: Value located at ``path``.
        schema_node: Schema describing ``data``, or ``None`` if unknown.
        path: Location of ``data`` inside the simulation data.
        manager: Data manager forwarded to every node-creation call.
    """
    if isinstance(schema_node, SchemaClassTree):
        # A class schema only knows how to describe a dict value; any
        # other value is left without children.
        if isinstance(data, dict):
            self._populate_object_fields(
                parent, data, schema_node, path, manager
            )
        return
    if isinstance(data, list):
        self._populate_sequence_items(
            parent, data, schema_node, path, manager
        )
        return
    if isinstance(data, dict) and isinstance(schema_node, SchemaFieldTree):
        self._populate_mapping_entries(
            parent, data, schema_node, path, manager
        )

def _populate_object_fields(
    self,
    parent: TreeNode[SimulationNodeData],
    data: dict[str, SimulationValue],
    schema_node: SchemaClassTree,
    path: NodePath,
    manager: SimulationDataManager,
) -> None:
    """Add one child per schema field that is also present in ``data``.

    The ``"type"`` field is added first, then the synthetic airfoil source
    selector (when applicable), then the remaining fields in schema order.
    """
    field_map = {f.name: f for f in schema_node.fields}

    def add_field(name: str) -> None:
        # Only fields known to the schema AND present in the data are shown.
        if name not in field_map or name not in data:
            return
        field_schema = field_map[name]
        child_value = data[name]
        self._add_tree_node(
            parent=parent,
            label=str(field_schema.metadata.get("label", name)),
            value=child_value,
            node_data=SimulationNodeData(
                path=(*path, name),
                schema=field_schema,
                kind="field",
            ),
            # Children are described by the nested object schema when the
            # field has one, possibly overridden by the value's own "type".
            child_schema=self._resolve_mapping_schema(
                child_value,
                field_schema.object_schema or field_schema,
            ),
            manager=manager,
        )

    add_field("type")
    self._add_airfoil_source_node(parent, schema_node, path, manager)
    for field_info in schema_node.fields:
        if field_info.name != "type":
            add_field(field_info.name)

def _populate_sequence_items(
    self,
    parent: TreeNode[SimulationNodeData],
    data: list[SimulationValue],
    schema_node: SchemaNode | None,
    path: NodePath,
    manager: SimulationDataManager,
) -> None:
    """Add one ``list_item`` child per element of ``data``.

    Labels are picked in this order: ``"x"``/``"y"`` for the leading items
    of a two-element tuple field, the value of the configured label field
    for dict items that carry it, and ``"[index]"`` otherwise.
    """
    if isinstance(schema_node, SchemaFieldTree):
        item_schema: SchemaNode | None = (
            schema_node.item_schema or schema_node
        )
        metadata = schema_node.metadata
    else:
        item_schema = schema_node
        metadata = {}
    if item_schema is None:
        # Without any schema the items cannot be described; add nothing.
        return

    label_field = metadata.get("collection_item_label_field")
    tuple_labels: tuple[str, ...] = ()
    if (
        metadata.get("value_kind") == "tuple"
        and metadata.get("tuple_length") == TWO_DIMENSIONAL_TUPLE_LENGTH
    ):
        tuple_labels = ("x", "y")

    for index, child_value in enumerate(data):
        if index < len(tuple_labels):
            label = tuple_labels[index]
        elif (
            isinstance(child_value, dict)
            and isinstance(label_field, str)
            and label_field in child_value
        ):
            label = str(child_value[label_field])
        else:
            label = f"[{index}]"
        self._add_tree_node(
            parent=parent,
            label=label,
            value=child_value,
            node_data=SimulationNodeData(
                path=(*path, index),
                schema=item_schema,
                kind="list_item",
            ),
            child_schema=item_schema,
            manager=manager,
        )

def _populate_mapping_entries(
    self,
    parent: TreeNode[SimulationNodeData],
    data: dict[str, SimulationValue],
    schema_node: SchemaFieldTree,
    path: NodePath,
    manager: SimulationDataManager,
) -> None:
    """Add one ``mapping_entry`` child per key of ``data``."""
    # Both values are the same for every entry, so compute them once.
    key_label = str(schema_node.metadata.get("collection_key_label", "Name"))
    entry_fallback = schema_node.item_schema or schema_node
    for key, child_value in data.items():
        resolved_schema = self._resolve_mapping_schema(
            child_value, entry_fallback
        )
        self._add_tree_node(
            parent=parent,
            label=str(key),
            value=child_value,
            node_data=SimulationNodeData(
                path=(*path, key),
                schema=resolved_schema,
                kind="mapping_entry",
                key_label=key_label,
            ),
            child_schema=resolved_schema,
            manager=manager,
        )
def _add_tree_node(
    self,
    *,
    parent: TreeNode[SimulationNodeData],
    label: str,
    value: SimulationValue,
    node_data: SimulationNodeData,
    child_schema: SchemaNode | None,
    manager: SimulationDataManager,
) -> None:
    """Attach ``value`` under ``parent`` as a leaf or a collapsed branch.

    ``manager.is_branch_value`` decides which. Branches are populated
    recursively, but only when a ``child_schema`` is available.
    """
    if not manager.is_branch_value(value):
        parent.add_leaf(label, data=node_data)
        return
    branch = parent.add(label, data=node_data, expand=False)
    if child_schema is None:
        return
    self._populate_tree(branch, value, child_schema, node_data.path, manager)
def _get_expanded_paths(self) -> set[NodePath]:
    """Collect the data paths of every node that is currently expanded.

    Nodes without attached data (``data is None``) are skipped, but their
    children are still visited.
    """
    open_paths: set[NodePath] = set()
    pending: list[TreeNode[SimulationNodeData]] = [self.root]
    while pending:
        current = pending.pop()
        pending += current.children
        payload = current.data
        if payload is not None and current.is_expanded:
            open_paths.add(payload.path)
    return open_paths
def _restore_expansion_state(self, expanded_paths: set[NodePath]) -> None:
    """Re-expand every node whose data path is in ``expanded_paths``."""
    pending: list[TreeNode[SimulationNodeData]] = [self.root]
    while pending:
        current = pending.pop()
        pending += current.children
        payload = current.data
        if payload is None or payload.path not in expanded_paths:
            continue
        current.expand()
def find_node_by_path(
    self, path: NodePath
) -> TreeNode[SimulationNodeData] | None:
    """Look up the node whose stored path equals ``path``.

    The tree is walked from the root with an explicit stack; the first
    match encountered is returned, or ``None`` when nothing matches.
    """
    pending: list[TreeNode[SimulationNodeData]] = [self.root]
    while pending:
        candidate = pending.pop()
        payload = candidate.data
        if payload is not None and payload.path == path:
            return candidate
        pending += candidate.children
    return None
def _add_airfoil_source_node(
    self,
    parent: TreeNode[SimulationNodeData],
    schema_node: SchemaClassTree,
    path: NodePath,
    manager: SimulationDataManager,
) -> None:
    """Add a synthetic "Source" leaf under switchable airfoil entries.

    Nothing is added unless ``path`` addresses an airfoil entry and that
    airfoil type offers at least ``AIRFOIL_SWITCH_SOURCE_COUNT`` source
    choices.
    """
    airfoil_type = self._airfoil_entry_type(path, manager)
    if airfoil_type is None:
        return
    available = exact_airfoil_source_choices(airfoil_type)
    if len(available) < AIRFOIL_SWITCH_SOURCE_COUNT:
        return
    selector = SimulationNodeData(
        path=(*path, AIRFOIL_SOURCE_FIELD_NAME),
        schema=self._airfoil_source_schema(schema_node),
        kind="field",
    )
    parent.add_leaf("Source", data=selector)
@staticmethod
def _airfoil_entry_type(
    path: NodePath, manager: SimulationDataManager
) -> str | None:
    """Read the ``"type"`` of the airfoil entry addressed by ``path``.

    ``path`` qualifies only when it has ``AIRFOIL_ENTRY_PATH_LENGTH``
    elements shaped like
    ``("geometry", "bodies", <int>, AIRFOIL_ENTRY_FIELD_NAME)``.

    Returns:
        The entry's ``"type"`` string, or ``None`` when the path does not
        qualify, the stored value is not a dict, or the type is not a
        string.
    """
    addresses_airfoil = (
        len(path) == AIRFOIL_ENTRY_PATH_LENGTH
        and path[0] == "geometry"
        and path[1] == "bodies"
        and isinstance(path[2], int)
        and path[3] == AIRFOIL_ENTRY_FIELD_NAME
    )
    if not addresses_airfoil:
        return None
    entry = manager.get_value(path)
    declared = entry.get("type") if isinstance(entry, dict) else None
    return declared if isinstance(declared, str) else None
def _restore_cursor_path(self, path: NodePath) -> None:
    """Put the cursor back on the node stored at ``path``, if it exists.

    The node's ancestors are expanded first, the cursor is moved without
    animation, and the app is asked to update its details pane for the
    node.
    """
    target = self.find_node_by_path(path)
    if target is not None:
        self._expand_node_ancestors(target)
        self.move_cursor(target, animate=False)
        # Note: app is set by Textual when widget is mounted.
        self.app._update_details_pane(target)  # type: ignore[attr-defined]
@staticmethod
def _expand_node_ancestors(node: TreeNode[SimulationNodeData]) -> None:
    """Expand every ancestor of ``node``, outermost ancestor first."""
    chain: list[TreeNode[SimulationNodeData]] = []
    ancestor = node.parent
    while ancestor is not None:
        chain.append(ancestor)
        ancestor = ancestor.parent
    # The chain was collected innermost-first, so popping from the end
    # walks it from the outermost ancestor down to the direct parent.
    while chain:
        chain.pop().expand()
@staticmethod
def _resolve_mapping_schema(
    value: SimulationValue,
    fallback: SchemaNode,
) -> SchemaNode:
    """Pick the schema for a value that may carry its own ``"type"`` tag.

    When ``value`` is a dict whose ``"type"`` string maps to a dataclass
    in ``AIRFOIL_SPEC_BY_TYPE``, that dataclass's schema tree is returned;
    in every other case ``fallback`` is returned unchanged.
    """
    if not isinstance(value, dict):
        return fallback
    spec_type = value.get("type")
    if not isinstance(spec_type, str):
        return fallback
    candidate = AIRFOIL_SPEC_BY_TYPE.get(spec_type)
    if candidate is None or not is_dataclass(candidate):
        return fallback
    return schema_tree(candidate)
@staticmethod
def _airfoil_source_schema(
    parent_schema: SchemaClassTree,
) -> SchemaFieldTree:
    """Create the schema field backing the synthetic "Source" selector.

    The field is marked ``synthetic`` in its metadata and records the
    class name of ``parent_schema`` under ``parent_schema``.
    """
    long_help = (
        "Select whether this airfoil is defined by its encoded "
        "designation string or by explicit parameters."
    )
    brief_help = (
        "Choose whether this airfoil uses designation or "
        "explicit parameters."
    )
    selector_metadata = {
        "label": "Source",
        "short_help": brief_help,
        "choices": ("designation", "params"),
        "synthetic": True,
        "parent_schema": parent_schema.class_name,
    }
    return SchemaFieldTree(
        name=AIRFOIL_SOURCE_FIELD_NAME,
        type_name="Literal['designation', 'params']",
        optional=False,
        docstring=long_help,
        metadata=selector_metadata,
    )