Source code for buffalo_panel.config.internal.loading

"""Load and serialize structured Buffalo Panel case specs."""

from __future__ import annotations

import json
from collections.abc import Mapping, Sequence
from dataclasses import MISSING, fields, is_dataclass
from pathlib import Path
from types import NoneType, UnionType
from typing import (
    Literal,
    TypeGuard,
    Union,
    cast,
    get_args,
    get_origin,
    get_type_hints,
)

import buffalo_wings.airfoil as bwa
import yaml

from buffalo_panel.config.internal.schema import PanelCaseSpec

type AirfoilSource = Literal["designation", "params"]
type ApproximateAirfoilSwitchResult = tuple[
    dict[str, object],
    bwa.AirfoilApproximateSwitchResult,
]

_VARIADIC_TUPLE_ARG_COUNT = 2
SUPPORTED_SUFFIXES = {".json", ".yaml", ".yml"}
AIRFOIL_SPEC_BY_TYPE: dict[str, type[object]] = {
    "biconvex": bwa.BiconvexAirfoilSpec,
    "biconvex_parabola": bwa.BiconvexParabolaAirfoilSpec,
    "circular_arc": bwa.CircularArcAirfoilSpec,
    "cst": bwa.CstAirfoilSpec,
    "ellipse": bwa.EllipseAirfoilSpec,
    "file": bwa.FileAirfoilSpec,
    "flat_plate": bwa.FlatPlateAirfoilSpec,
    "joukowski": bwa.JoukowskiAirfoilSpec,
    "naca4": bwa.Naca4AirfoilSpec,
    "naca4_modified": bwa.Naca4ModifiedAirfoilSpec,
    "naca5": bwa.Naca5AirfoilSpec,
    "naca5_modified": bwa.Naca5ModifiedAirfoilSpec,
    "naca6": bwa.Naca6AirfoilSpec,
    "naca6a": bwa.Naca6AAirfoilSpec,
    "naca16": bwa.Naca16AirfoilSpec,
    "parsec": bwa.ParsecAirfoilSpec,
    "points": bwa.PointsAirfoilSpec,
    "polygon": bwa.PolygonAirfoilSpec,
    "spline": bwa.SplineAirfoilSpec,
}


[docs] def load_panel_case(path: str | Path) -> PanelCaseSpec: """ Load a structured panel case from a JSON or YAML file. Parameters ---------- path : str | Path File path to a serialized Buffalo Panel case. Supported suffixes are ``.json``, ``.yaml``, and ``.yml``. Returns ------- PanelCaseSpec Typed case spec ready for ``solve_panel_case``. Raises ------ ValueError If the file suffix is unsupported, the parsed document is not a mapping, or the payload cannot be converted into the case schema. json.JSONDecodeError If a JSON file is syntactically invalid. yaml.YAMLError If a YAML file is syntactically invalid. """ case_path = Path(path) suffix = case_path.suffix.lower() if suffix not in SUPPORTED_SUFFIXES: msg = "Panel case files must use .json, .yaml, or .yml." raise ValueError(msg) text = case_path.read_text(encoding="utf-8") raw_data = json.loads(text) if suffix == ".json" else yaml.safe_load(text) if not isinstance(raw_data, Mapping): msg = "Panel case file must contain a mapping at the document root." raise ValueError(msg) return panel_case_from_mapping(cast(Mapping[str, object], raw_data))
[docs] def panel_case_from_mapping(data: Mapping[str, object]) -> PanelCaseSpec: """ Build a typed panel case spec from parsed mapping data. Parameters ---------- data : Mapping[str, object] Parsed JSON- or YAML-compatible payload. Returns ------- PanelCaseSpec Typed case spec with nested Buffalo Panel and Buffalo Wings schema dataclasses. Raises ------ ValueError If required fields are missing, unknown fields are present, or an airfoil entry has an unsupported schema ``type`` discriminator. """ return _build_panel_case(data)
[docs] def panel_case_to_mapping(spec: PanelCaseSpec) -> dict[str, object]: """ Convert a typed panel case spec into a YAML-friendly mapping. Parameters ---------- spec : PanelCaseSpec Typed panel case specification to serialize. Returns ------- dict[str, object] Mapping representation suitable for YAML or JSON serialization. Notes ----- Fields whose values are ``None`` are omitted from the serialized mapping. This keeps mutually exclusive schema branches such as airfoil ``designation`` versus ``params`` compact while preserving a valid payload for ``panel_case_from_mapping``. """ serialized = _serialize_value(spec) if not isinstance(serialized, dict): raise TypeError("PanelCaseSpec must serialize to a mapping.") return cast(dict[str, object], serialized)
def default_airfoil_mapping( airfoil_type: str, *, source: AirfoilSource | None = None, ) -> dict[str, object]: """ Return one starter mapping for a Buffalo Wings airfoil family. Parameters ---------- airfoil_type : str Buffalo Wings airfoil-family discriminator. source : AirfoilSource | None, optional Preferred source branch when the family supports more than one. Returns ------- dict[str, object] Serialized starter mapping for the requested family. Raises ------ ValueError If ``airfoil_type`` is unknown. """ spec = bwa.AirfoilFactory.default_spec( airfoil_type, source=source, ) serialized = _serialize_value(spec) if not isinstance(serialized, dict): raise TypeError("Airfoil spec must serialize to a mapping.") return cast(dict[str, object], serialized) def exact_airfoil_source_choices( airfoil_type: str, ) -> tuple[AirfoilSource, ...]: """Return the exact-switch source choices for one airfoil family.""" return bwa.AirfoilFactory.exact_source_choices(airfoil_type) def airfoil_mapping_source( mapping: Mapping[str, object], ) -> AirfoilSource | None: """Return which mutually exclusive source branch one mapping uses.""" return bwa.AirfoilFactory.mapping_source(mapping) def switch_airfoil_source_exact( mapping: Mapping[str, object], target_source: AirfoilSource, ) -> dict[str, object]: """Switch one exact-only NACA family between designation and params.""" switched = bwa.AirfoilFactory.switch_source_exact( mapping, target_source, ) serialized = _serialize_value(switched) if not isinstance(serialized, dict): raise TypeError("Switched airfoil spec must serialize to a mapping.") return cast(dict[str, object], serialized) def switch_airfoil_source_approx( mapping: Mapping[str, object], target_source: AirfoilSource, ) -> dict[str, object]: """Switch one airfoil entry between source branches approximately.""" switched = bwa.AirfoilFactory.switch_source_approx( mapping, target_source, ) serialized = _serialize_value(switched) if not isinstance(serialized, dict): raise TypeError("Switched airfoil spec must serialize to a mapping.") return cast(dict[str, object], serialized) def switch_airfoil_source_approx_with_metadata( mapping: Mapping[str, object], target_source: AirfoilSource, ) -> ApproximateAirfoilSwitchResult: """Switch one airfoil entry approximately and return metadata.""" result = bwa.AirfoilFactory.switch_source_approx_result( mapping, target_source, ) serialized = _serialize_value(result.spec) if not isinstance(serialized, dict): raise TypeError("Switched airfoil spec must serialize to a mapping.") return cast(dict[str, object], serialized), result
[docs] def dump_panel_case( spec: PanelCaseSpec, *, file_type: Literal["json", "yaml"] = "yaml", ) -> str: """ Serialize a typed panel case spec to JSON or YAML text. Parameters ---------- spec : PanelCaseSpec Typed panel case specification to serialize. file_type : Literal["json", "yaml"], optional Output text format. Returns ------- str Serialized JSON or YAML document text. Raises ------ ValueError If ``format`` is unsupported. """ payload = panel_case_to_mapping(spec) if file_type == "json": return json.dumps(payload, indent=2) + "\n" if file_type == "yaml": return yaml.safe_dump(payload, sort_keys=False) raise ValueError("format must be 'json' or 'yaml'.")
[docs] def save_panel_case(spec: PanelCaseSpec, path: str | Path) -> None: """ Serialize a typed panel case spec to a JSON or YAML file. Parameters ---------- spec : PanelCaseSpec Typed panel case specification to serialize. path : str | Path Destination file path. Supported suffixes are ``.json``, ``.yaml``, and ``.yml``. Raises ------ ValueError If the file suffix is unsupported. """ case_path = Path(path) suffix = case_path.suffix.lower() if suffix not in SUPPORTED_SUFFIXES: msg = "Panel case files must use .json, .yaml, or .yml." raise ValueError(msg) file_type: Literal["json", "yaml"] = "json" if suffix == ".json" else "yaml" case_path.write_text( dump_panel_case(spec, file_type=file_type), encoding="utf-8" )
def _build_panel_case(data: Mapping[str, object]) -> PanelCaseSpec: """ Build the top-level case. Parameters ---------- data : Mapping[str, object] Raw mapping data for the panel case. Returns ------- PanelCaseSpec The populated case specification. Raises ------ ValueError If the payload cannot be converted into ``PanelCaseSpec``. """ return _build_dataclass(PanelCaseSpec, dict(data)) def _serialize_value(value: object) -> object: """ Recursively convert schema values into YAML-friendly builtin containers. Parameters ---------- value : object Value from a typed schema tree. Returns ------- object Serialized value composed only of builtin scalars, lists, and dicts. """ if is_dataclass(value): serialized_items: dict[str, object] = {} for field in fields(value): field_value = getattr(value, field.name) if field_value is None: continue serialized_items[field.name] = _serialize_value(field_value) return serialized_items if isinstance(value, Mapping): value_map = cast(Mapping[str, object], value) serialized_map: dict[str, object] = {} for key, item in value_map.items(): serialized_map[key] = _serialize_value(item) return serialized_map if isinstance(value, list | tuple): items = cast(Sequence[object], value) return [_serialize_value(item) for item in items] return value def _build_dataclass[T](cls: type[T], data: Mapping[str, object]) -> T: """ Recursively convert mapping data into a dataclass instance. Parameters ---------- cls : type[T] The dataclass type to instantiate. data : Mapping[str, object] The raw data mapping for the dataclass fields. Returns ------- T An instance of the dataclass. Raises ------ TypeError If ``cls`` is not a dataclass. ValueError If unknown fields are present or required fields are missing. """ if not is_dataclass(cls): raise TypeError("cls must be a dataclass type.") type_hints = get_type_hints(cls) field_names = {field.name for field in fields(cls)} unknown = set(data) - field_names if unknown: raise ValueError( f"{cls.__name__} received unknown fields: {sorted(unknown)!r}." ) kwargs: dict[str, object] = {} for field in fields(cls): if field.name not in data: if field.default is MISSING and field.default_factory is MISSING: raise ValueError( f"{cls.__name__} missing required field {field.name!r}." ) continue value = _convert_value( data[field.name], type_hints[field.name], path=f"{cls.__name__}.{field.name}", ) _validate_constraints( value, field.metadata, path=f"{cls.__name__}.{field.name}" ) kwargs[field.name] = value return cast(T, cls(**kwargs)) def _convert_value(value: object, target_type: object, *, path: str) -> object: """ Convert one parsed value to the requested schema field type. Parameters ---------- value : object The raw value to convert. target_type : object The expected type or annotation for the value. path : str Diagnostic path for error messages (e.g. "PanelCaseSpec.units"). Returns ------- object The converted value. """ origin = get_origin(target_type) if origin is not None: return _convert_origin_value(value, origin, get_args(target_type), path) if isinstance(target_type, type) and is_dataclass(target_type): return _convert_dataclass_value( value, cast(type[object], target_type), path=path, ) # ``get_type_hints`` returns runtime annotation objects that Pyright cannot # fully narrow after the origin and dataclass checks above. return _convert_scalar_value( value, target_type, # pyright: ignore[reportUnknownArgumentType] path=path, ) def _convert_origin_value( value: object, origin: object, args: tuple[object, ...], path: str, ) -> object: """ Convert values whose field annotation has a typing origin. Parameters ---------- value : object The raw value to convert. origin : object The origin type (e.g., list, dict, Union). args : tuple[object, ...] The arguments to the generic type (e.g., item types). path : str Diagnostic path for error messages. Returns ------- object The converted value or the original value if the origin is unhandled. """ if origin is Literal: return _convert_literal(value, args, path=path) if origin is list: return _convert_list(value, args, path=path) if origin is tuple: return _convert_tuple(value, args, path=path) if origin is dict: return _convert_dict(value, args, path=path) if origin is UnionType or origin is Union: return _convert_union(value, args, path=path) return value def _convert_literal( value: object, args: tuple[object, ...], *, path: str, ) -> object: """ Convert a value for a ``typing.Literal`` annotation. Parameters ---------- value : object The raw value. args : tuple[object, ...] The valid literal values. path : str Diagnostic path for error messages. Returns ------- object The value if it is within the allowed literal values. Raises ------ ValueError If the value is not one of the allowed literals. """ if value not in args: raise ValueError(f"{path} must be one of {args!r}.") return value def _convert_list( value: object, args: tuple[object, ...], *, path: str, ) -> list[object]: """ Convert a sequence value for a list annotation. Parameters ---------- value : object The raw sequence value. args : tuple[object, ...] The item type argument for the list. path : str Diagnostic path for error messages. Returns ------- list[object] The list of converted items. Raises ------ ValueError If the value is not a sequence. """ if not _is_sequence(value): raise ValueError(f"{path} must be a list.") item_type = args[0] if args else object return [ _convert_value(item, item_type, path=f"{path}[{index}]") for index, item in enumerate(value) ] def _convert_tuple( value: object, args: tuple[object, ...], *, path: str, ) -> tuple[object, ...]: """ Convert a sequence value for a tuple annotation. Parameters ---------- value : object The raw sequence value. args : tuple[object, ...] The item type arguments for the tuple. path : str Diagnostic path for error messages. Returns ------- tuple[object, ...] The tuple of converted items. Raises ------ ValueError If the value is not a sequence or its length does not match the tuple annotation. """ if not _is_sequence(value): raise ValueError(f"{path} must be a tuple-compatible sequence.") items = list(value) if len(args) == _VARIADIC_TUPLE_ARG_COUNT and args[1] is Ellipsis: return tuple( _convert_value(item, args[0], path=f"{path}[{index}]") for index, item in enumerate(items) ) if len(items) != len(args): raise ValueError(f"{path} must contain {len(args)} items.") return tuple( _convert_value(item, item_type, path=f"{path}[{index}]") for index, (item, item_type) in enumerate(zip(items, args, strict=True)) ) def _convert_dict( value: object, args: tuple[object, ...], *, path: str, ) -> dict[object, object]: """ Convert a mapping value for a dict annotation. Parameters ---------- value : object The raw mapping value. args : tuple[object, ...] The key and value type arguments for the dict. path : str Diagnostic path for error messages. Returns ------- dict[object, object] The dictionary with converted keys and values. Raises ------ ValueError If the value is not a mapping. """ if not isinstance(value, Mapping): raise ValueError(f"{path} must be a mapping.") value_map = cast(Mapping[object, object], value) key_type = args[0] if args else object value_type = args[1] if len(args) > 1 else object return { _convert_value(key, key_type, path=f"{path}.<key>"): _convert_value( item, value_type, path=f"{path}[{key!r}]", ) for key, item in value_map.items() } def _convert_dataclass_value( value: object, target_type: type[object], *, path: str, ) -> object: """ Convert a mapping value into a dataclass instance. Parameters ---------- value : object The raw mapping value. target_type : type[object] The dataclass type. path : str Diagnostic path for error messages. Returns ------- object The instantiated dataclass. Raises ------ ValueError If the value is not a mapping. """ if not isinstance(value, Mapping): raise ValueError(f"{path} must be a mapping.") return _build_dataclass( target_type, cast(Mapping[str, object], value), ) def _convert_scalar_value( value: object, target_type: object, *, path: str, ) -> object: """ Convert scalar JSON/YAML values to builtin schema scalar types. Parameters ---------- value : object The raw scalar value. target_type : object The expected builtin type (str, int, float, bool). path : str Diagnostic path for error messages. Returns ------- object The converted scalar value. Raises ------ ValueError If the value cannot be converted to the target type. """ if target_type is str: if isinstance(value, str): return value raise ValueError(f"{path} must be a string.") if target_type is int: if isinstance(value, int) and not isinstance(value, bool): return value raise ValueError(f"{path} must be an integer.") if target_type is float: if isinstance(value, int | float) and not isinstance(value, bool): return float(value) if isinstance(value, str): try: return float(value) except ValueError: pass raise ValueError(f"{path} must be a number.") if target_type is bool: if isinstance(value, bool): return value raise ValueError(f"{path} must be a boolean.") return value def _convert_union( value: object, args: tuple[object, ...], *, path: str, ) -> object: """ Convert a value against a union field type. Parameters ---------- value : object The raw value. args : tuple[object, ...] The candidate types in the union. path : str Diagnostic path for error messages. Returns ------- object The value converted to the first matching candidate type. Raises ------ ValueError If the value does not match any candidate type in the union. """ if value is None and NoneType in args: return None if isinstance(value, Mapping): value_map = cast(Mapping[object, object], value) airfoil_spec = _convert_airfoil_union(value_map, args) if airfoil_spec is not None: return airfoil_spec value_object = value # pyright: ignore[reportUnknownVariableType] errors: list[str] = [] for candidate in args: if candidate is NoneType: continue try: return _convert_value( value_object, # pyright: ignore[reportUnknownArgumentType] candidate, path=path, ) except (TypeError, ValueError) as exc: errors.append(str(exc)) raise ValueError(f"{path} did not match any supported type: {errors!r}.") def _convert_airfoil_union( value: Mapping[object, object], args: tuple[object, ...], ) -> bwa.AirfoilDefinitionSpec | None: """Resolve one embedded Buffalo Wings airfoil union by discriminator.""" spec_type = value.get("type") if not isinstance(spec_type, str): return None spec_cls = AIRFOIL_SPEC_BY_TYPE.get(spec_type) if spec_cls is None or spec_cls not in args: raise ValueError(f"Unsupported airfoil type {spec_type!r}.") return cast( bwa.AirfoilDefinitionSpec, _build_dataclass(spec_cls, cast(Mapping[str, object], value)), ) def _is_sequence(value: object) -> TypeGuard[Sequence[object]]: """ Return whether ``value`` is a non-string sequence. Parameters ---------- value : object The value to check. Returns ------- TypeGuard[Sequence[object]] True if the value is a sequence but not a string or bytes. """ return isinstance(value, Sequence) and not isinstance(value, str | bytes) def _is_mapping(value: object) -> TypeGuard[Mapping[object, object]]: """ Return whether ``value`` is a mapping. Parameters ---------- value : object The value to check. Returns ------- TypeGuard[Mapping[object, object]] True if the value is a mapping. """ return isinstance(value, Mapping) def _validate_constraints( value: object, metadata: Mapping[str, object], *, path: str, ) -> None: """Check scalar and collection constraints from schema metadata.""" # Scalar range checks if isinstance(value, int | float) and not isinstance(value, bool): min_val = metadata.get("minimum") if min_val is not None and value < cast(float, min_val): raise ValueError(f"{path} must be >= {min_val}.") max_val = metadata.get("maximum") if max_val is not None and value > cast(float, max_val): raise ValueError(f"{path} must be <= {max_val}.") ex_min = metadata.get("exclusive_minimum") if ex_min is not None and value <= cast(float, ex_min): raise ValueError(f"{path} must be > {ex_min}.") ex_max = metadata.get("exclusive_maximum") if ex_max is not None and value >= cast(float, ex_max): raise ValueError(f"{path} must be < {ex_max}.") # Collection size checks if _is_sequence(value): min_items = metadata.get("min_items") if min_items is not None and len(value) < cast(int, min_items): raise ValueError(f"{path} must contain at least {min_items} items.") if _is_mapping(value): min_items = metadata.get("min_items") if min_items is not None and len(value) < cast(int, min_items): raise ValueError(f"{path} must contain at least {min_items} items.")