Source code for buffalo_panel.app.tui.internal.simulation_tree

"""Tree widget for browsing Buffalo Panel simulation configurations."""

from __future__ import annotations

from dataclasses import dataclass, is_dataclass
from typing import Literal

from textual.widgets import Tree
from textual.widgets._tree import (
    TreeNode,  # pyright: ignore[reportPrivateImportUsage]
)

from buffalo_panel.config import (
    PanelCaseSpec,
    SchemaClassTree,
    SchemaFieldTree,
    schema_tree,
)
from buffalo_panel.config.internal.loading import (
    AIRFOIL_SPEC_BY_TYPE,
    exact_airfoil_source_choices,
)

from .simulation_data import (
    AIRFOIL_ENTRY_FIELD_NAME,
    AIRFOIL_ENTRY_PATH_LENGTH,
    AIRFOIL_SOURCE_FIELD_NAME,
    AIRFOIL_SWITCH_SOURCE_COUNT,
    TWO_DIMENSIONAL_TUPLE_LENGTH,
    NodePath,
    SimulationDataManager,
    SimulationValue,
)

# A schema node is either a class-level schema tree or a single field tree.
type SchemaNode = SchemaClassTree | SchemaFieldTree
# Closed set of roles a visible tree node can take.
type NodeKind = Literal["field", "list_item", "mapping_entry"]


[docs] @dataclass(frozen=True, slots=True) class SimulationNodeData: """Structured metadata stored on each visible tree node.""" path: NodePath """Canonical location of the node value inside simulation data.""" schema: SchemaNode """Schema node describing the field or object at ``path``.""" kind: NodeKind """Whether the node represents a field, list item, or mapping entry.""" key_label: str | None = None """User-facing label for renamable mapping keys when relevant.""" @property def is_mapping_entry(self) -> bool: """Return whether the node represents one mapping entry key.""" return self.kind == "mapping_entry"
class SimulationTree(Tree[SimulationNodeData]):
    """Textual tree widget that mirrors the simulation configuration.

    Every node added by this widget carries a :class:`SimulationNodeData`
    payload recording where its value lives in the simulation data and which
    schema node describes it.
    """

    def rebuild(
        self,
        manager: SimulationDataManager,
        cursor_path: NodePath | None = None,
    ) -> None:
        """Discard all nodes and rebuild them from ``manager.data``.

        Nodes whose paths were expanded before the rebuild are expanded
        again, and the root is always expanded.  When ``cursor_path`` is
        given, restoring the cursor is scheduled with ``call_after_refresh``.
        """
        previously_expanded = self._get_expanded_paths()
        self.root.remove_children()

        case_schema = schema_tree(PanelCaseSpec)
        self._populate_tree(self.root, manager.data, case_schema, (), manager)

        self._restore_expansion_state(previously_expanded)
        self.root.expand()

        if cursor_path is None:
            return
        self.call_after_refresh(self._restore_cursor_path, cursor_path)

    def _populate_tree(
        self,
        parent: TreeNode[SimulationNodeData],
        data: SimulationValue,
        schema_node: SchemaNode | None,
        path: NodePath,
        manager: SimulationDataManager,
    ) -> None:
        """Add the children of ``parent`` for ``data``, guided by the schema.

        Three shapes are handled: a class schema over a dict (one child per
        schema field present in the data), a list (one child per item), and
        a dict described by a field schema (one child per key).
        """
        # --- Class schema: one child per declared field present in data ---
        if isinstance(schema_node, SchemaClassTree):
            if not isinstance(data, dict):
                return
            fields_by_name = {info.name: info for info in schema_node.fields}

            def attach_field(name: str) -> None:
                # Skip names the schema does not declare or the data lacks.
                if not (name in fields_by_name and name in data):
                    return
                info = fields_by_name[name]
                value = data[name]
                caption = str(info.metadata.get("label", name))
                resolved = self._resolve_mapping_schema(
                    value,
                    info.object_schema or info,
                )
                self._add_tree_node(
                    parent=parent,
                    label=caption,
                    value=value,
                    node_data=SimulationNodeData(
                        path=(*path, name),
                        schema=info,
                        kind="field",
                    ),
                    child_schema=resolved,
                    manager=manager,
                )

            # "type" goes first, then the synthetic airfoil source selector,
            # then every remaining field in schema order.
            attach_field("type")
            self._add_airfoil_source_node(parent, schema_node, path, manager)
            for info in schema_node.fields:
                if info.name == "type":
                    continue
                attach_field(info.name)
            return

        if isinstance(schema_node, SchemaFieldTree):
            item_schema = schema_node.item_schema
            collection_meta = schema_node.metadata
        else:
            item_schema = None
            collection_meta = {}

        # --- List: one child per item ---
        if isinstance(data, list):
            element_schema = item_schema or schema_node
            if element_schema is None:
                return
            label_field = collection_meta.get("collection_item_label_field")
            is_pair = (
                collection_meta.get("value_kind") == "tuple"
                and collection_meta.get("tuple_length")
                == TWO_DIMENSIONAL_TUPLE_LENGTH
            )
            # Two-element tuples get axis names instead of index labels.
            axis_labels: tuple[str, ...] = ("x", "y") if is_pair else ()

            for position, element in enumerate(data):
                if position < len(axis_labels):
                    caption = axis_labels[position]
                elif (
                    isinstance(element, dict)
                    and isinstance(label_field, str)
                    and label_field in element
                ):
                    caption = str(element[label_field])
                else:
                    caption = f"[{position}]"
                self._add_tree_node(
                    parent=parent,
                    label=caption,
                    value=element,
                    node_data=SimulationNodeData(
                        path=(*path, position),
                        schema=element_schema,
                        kind="list_item",
                    ),
                    child_schema=element_schema,
                    manager=manager,
                )
            return

        # --- Mapping described by a field schema: one child per key ---
        if not (
            isinstance(data, dict) and isinstance(schema_node, SchemaFieldTree)
        ):
            return
        key_caption = str(collection_meta.get("collection_key_label", "Name"))
        entry_fallback = item_schema or schema_node
        for key, entry_value in data.items():
            entry_schema = self._resolve_mapping_schema(
                entry_value,
                entry_fallback,
            )
            self._add_tree_node(
                parent=parent,
                label=str(key),
                value=entry_value,
                node_data=SimulationNodeData(
                    path=(*path, key),
                    schema=entry_schema,
                    kind="mapping_entry",
                    key_label=key_caption,
                ),
                child_schema=entry_schema,
                manager=manager,
            )

    def _add_tree_node(
        self,
        *,
        parent: TreeNode[SimulationNodeData],
        label: str,
        value: SimulationValue,
        node_data: SimulationNodeData,
        child_schema: SchemaNode | None,
        manager: SimulationDataManager,
    ) -> None:
        """Attach one node under ``parent`` and descend into branch values.

        Values the manager reports as branches become collapsed nodes whose
        children are populated when ``child_schema`` is available; every
        other value becomes a leaf.
        """
        if not manager.is_branch_value(value):
            parent.add_leaf(label, data=node_data)
            return

        branch = parent.add(label, data=node_data, expand=False)
        if child_schema is None:
            return
        self._populate_tree(
            branch, value, child_schema, node_data.path, manager
        )

    def _get_expanded_paths(self) -> set[NodePath]:
        """Collect the paths of every expanded node that carries data."""
        collected: set[NodePath] = set()
        pending: list[TreeNode[SimulationNodeData]] = [self.root]
        while pending:
            current = pending.pop()
            if current.is_expanded and current.data is not None:
                collected.add(current.data.path)
            pending.extend(current.children)
        return collected

    def _restore_expansion_state(self, expanded_paths: set[NodePath]) -> None:
        """Expand every node whose path appears in ``expanded_paths``."""
        pending: list[TreeNode[SimulationNodeData]] = [self.root]
        while pending:
            current = pending.pop()
            payload = current.data
            if payload is not None and payload.path in expanded_paths:
                current.expand()
            pending.extend(current.children)

    def find_node_by_path(
        self, path: NodePath
    ) -> TreeNode[SimulationNodeData] | None:
        """Find a node whose stored path equals ``path``.

        The tree is walked depth-first from the root and the first match met
        is returned; ``None`` is returned when no node matches.
        """
        pending: list[TreeNode[SimulationNodeData]] = [self.root]
        while pending:
            current = pending.pop()
            payload = current.data
            if payload is not None and payload.path == path:
                return current
            pending.extend(current.children)
        return None

    def _add_airfoil_source_node(
        self,
        parent: TreeNode[SimulationNodeData],
        schema_node: SchemaClassTree,
        path: NodePath,
        manager: SimulationDataManager,
    ) -> None:
        """Add the synthetic "Source" leaf for airfoils that offer a switch.

        Nothing is added unless ``path`` is an airfoil-entry path and the
        number of source choices for its type reaches
        ``AIRFOIL_SWITCH_SOURCE_COUNT``.
        """
        family = self._airfoil_entry_type(path, manager)
        if family is None:
            return
        choices = exact_airfoil_source_choices(family)
        if len(choices) < AIRFOIL_SWITCH_SOURCE_COUNT:
            return

        parent.add_leaf(
            "Source",
            data=SimulationNodeData(
                path=(*path, AIRFOIL_SOURCE_FIELD_NAME),
                schema=self._airfoil_source_schema(schema_node),
                kind="field",
            ),
        )

    @staticmethod
    def _airfoil_entry_type(
        path: NodePath, manager: SimulationDataManager
    ) -> str | None:
        """Return the ``type`` string stored at an airfoil-entry path.

        ``None`` is returned when ``path`` does not have the shape
        ``("geometry", "bodies", <int>, AIRFOIL_ENTRY_FIELD_NAME)``, when the
        value there is not a dict, or when its ``type`` is not a string.
        """
        if len(path) != AIRFOIL_ENTRY_PATH_LENGTH:
            return None
        points_at_airfoil = (
            path[0] == "geometry"
            and path[1] == "bodies"
            and isinstance(path[2], int)
            and path[3] == AIRFOIL_ENTRY_FIELD_NAME
        )
        if not points_at_airfoil:
            return None

        entry = manager.get_value(path)
        if not isinstance(entry, dict):
            return None
        declared_type = entry.get("type")
        return declared_type if isinstance(declared_type, str) else None

    def _restore_cursor_path(self, path: NodePath) -> None:
        """Move the cursor back to ``path`` if such a node still exists."""
        target = self.find_node_by_path(path)
        if target is None:
            return
        self._expand_node_ancestors(target)
        self.move_cursor(target, animate=False)
        # Note: app is set by Textual when widget is mounted.
        self.app._update_details_pane(target)  # type: ignore[attr-defined]

    @staticmethod
    def _expand_node_ancestors(node: TreeNode[SimulationNodeData]) -> None:
        """Expand every ancestor of ``node``, outermost ancestor first."""
        lineage: list[TreeNode[SimulationNodeData]] = []
        ancestor = node.parent
        while ancestor is not None:
            lineage.append(ancestor)
            ancestor = ancestor.parent
        # The list runs from nearest parent to root; pop from the end so the
        # root-most ancestor is expanded first.
        while lineage:
            lineage.pop().expand()

    @staticmethod
    def _resolve_mapping_schema(
        value: SimulationValue,
        fallback: SchemaNode,
    ) -> SchemaNode:
        """Pick the schema for a value that may carry a ``type`` tag.

        When ``value`` is a dict whose string ``type`` maps to a dataclass in
        ``AIRFOIL_SPEC_BY_TYPE``, the schema tree of that dataclass is
        returned; in every other case ``fallback`` is returned.
        """
        if not isinstance(value, dict):
            return fallback
        spec_type = value.get("type")
        if not isinstance(spec_type, str):
            return fallback
        candidate = AIRFOIL_SPEC_BY_TYPE.get(spec_type)
        if candidate is None or not is_dataclass(candidate):
            return fallback
        return schema_tree(candidate)

    @staticmethod
    def _airfoil_source_schema(
        parent_schema: SchemaClassTree,
    ) -> SchemaFieldTree:
        """Create the schema field backing the synthetic "Source" selector."""
        long_help = (
            "Select whether this airfoil is defined by its encoded "
            "designation string or by explicit parameters."
        )
        brief_help = (
            "Choose whether this airfoil uses designation or "
            "explicit parameters."
        )
        return SchemaFieldTree(
            name=AIRFOIL_SOURCE_FIELD_NAME,
            type_name="Literal['designation', 'params']",
            optional=False,
            docstring=long_help,
            metadata={
                "label": "Source",
                "short_help": brief_help,
                "choices": ("designation", "params"),
                "synthetic": True,
                "parent_schema": parent_schema.class_name,
            },
        )