"""Load and serialize structured Buffalo Panel case specs."""
from __future__ import annotations
import json
from collections.abc import Mapping, Sequence
from dataclasses import MISSING, fields, is_dataclass
from pathlib import Path
from types import NoneType, UnionType
from typing import (
Literal,
TypeGuard,
Union,
cast,
get_args,
get_origin,
get_type_hints,
)
import buffalo_wings.airfoil as bwa
import yaml
from buffalo_panel.config.internal.schema import PanelCaseSpec
type AirfoilSource = Literal["designation", "params"]
type ApproximateAirfoilSwitchResult = tuple[
dict[str, object],
bwa.AirfoilApproximateSwitchResult,
]
_VARIADIC_TUPLE_ARG_COUNT = 2
SUPPORTED_SUFFIXES = {".json", ".yaml", ".yml"}
AIRFOIL_SPEC_BY_TYPE: dict[str, type[object]] = {
"biconvex": bwa.BiconvexAirfoilSpec,
"biconvex_parabola": bwa.BiconvexParabolaAirfoilSpec,
"circular_arc": bwa.CircularArcAirfoilSpec,
"cst": bwa.CstAirfoilSpec,
"ellipse": bwa.EllipseAirfoilSpec,
"file": bwa.FileAirfoilSpec,
"flat_plate": bwa.FlatPlateAirfoilSpec,
"joukowski": bwa.JoukowskiAirfoilSpec,
"naca4": bwa.Naca4AirfoilSpec,
"naca4_modified": bwa.Naca4ModifiedAirfoilSpec,
"naca5": bwa.Naca5AirfoilSpec,
"naca5_modified": bwa.Naca5ModifiedAirfoilSpec,
"naca6": bwa.Naca6AirfoilSpec,
"naca6a": bwa.Naca6AAirfoilSpec,
"naca16": bwa.Naca16AirfoilSpec,
"parsec": bwa.ParsecAirfoilSpec,
"points": bwa.PointsAirfoilSpec,
"polygon": bwa.PolygonAirfoilSpec,
"spline": bwa.SplineAirfoilSpec,
}
[docs]
def load_panel_case(path: str | Path) -> PanelCaseSpec:
"""
Load a structured panel case from a JSON or YAML file.
Parameters
----------
path : str | Path
File path to a serialized Buffalo Panel case.
Supported suffixes are ``.json``, ``.yaml``, and ``.yml``.
Returns
-------
PanelCaseSpec
Typed case spec ready for ``solve_panel_case``.
Raises
------
ValueError
If the file suffix is unsupported, the parsed document is not a
mapping, or the payload cannot be converted into the case schema.
json.JSONDecodeError
If a JSON file is syntactically invalid.
yaml.YAMLError
If a YAML file is syntactically invalid.
"""
case_path = Path(path)
suffix = case_path.suffix.lower()
if suffix not in SUPPORTED_SUFFIXES:
msg = "Panel case files must use .json, .yaml, or .yml."
raise ValueError(msg)
text = case_path.read_text(encoding="utf-8")
raw_data = json.loads(text) if suffix == ".json" else yaml.safe_load(text)
if not isinstance(raw_data, Mapping):
msg = "Panel case file must contain a mapping at the document root."
raise ValueError(msg)
return panel_case_from_mapping(cast(Mapping[str, object], raw_data))
[docs]
def panel_case_from_mapping(data: Mapping[str, object]) -> PanelCaseSpec:
"""
Build a typed panel case spec from parsed mapping data.
Parameters
----------
data : Mapping[str, object]
Parsed JSON- or YAML-compatible payload.
Returns
-------
PanelCaseSpec
Typed case spec with nested Buffalo Panel and Buffalo Wings schema
dataclasses.
Raises
------
ValueError
If required fields are missing, unknown fields are present, or an
airfoil entry has an unsupported schema ``type`` discriminator.
"""
return _build_panel_case(data)
[docs]
def panel_case_to_mapping(spec: PanelCaseSpec) -> dict[str, object]:
"""
Convert a typed panel case spec into a YAML-friendly mapping.
Parameters
----------
spec : PanelCaseSpec
Typed panel case specification to serialize.
Returns
-------
dict[str, object]
Mapping representation suitable for YAML or JSON serialization.
Notes
-----
Fields whose values are ``None`` are omitted from the serialized mapping.
This keeps mutually exclusive schema branches such as airfoil
``designation`` versus ``params`` compact while preserving a valid payload
for ``panel_case_from_mapping``.
"""
serialized = _serialize_value(spec)
if not isinstance(serialized, dict):
raise TypeError("PanelCaseSpec must serialize to a mapping.")
return cast(dict[str, object], serialized)
def default_airfoil_mapping(
airfoil_type: str,
*,
source: AirfoilSource | None = None,
) -> dict[str, object]:
"""
Return one starter mapping for a Buffalo Wings airfoil family.
Parameters
----------
airfoil_type : str
Buffalo Wings airfoil-family discriminator.
source : AirfoilSource | None, optional
Preferred source branch when the family supports more than one.
Returns
-------
dict[str, object]
Serialized starter mapping for the requested family.
Raises
------
ValueError
If ``airfoil_type`` is unknown.
"""
spec = bwa.AirfoilFactory.default_spec(
airfoil_type,
source=source,
)
serialized = _serialize_value(spec)
if not isinstance(serialized, dict):
raise TypeError("Airfoil spec must serialize to a mapping.")
return cast(dict[str, object], serialized)
def exact_airfoil_source_choices(
airfoil_type: str,
) -> tuple[AirfoilSource, ...]:
"""Return the exact-switch source choices for one airfoil family."""
return bwa.AirfoilFactory.exact_source_choices(airfoil_type)
def airfoil_mapping_source(
mapping: Mapping[str, object],
) -> AirfoilSource | None:
"""Return which mutually exclusive source branch one mapping uses."""
return bwa.AirfoilFactory.mapping_source(mapping)
def switch_airfoil_source_exact(
mapping: Mapping[str, object],
target_source: AirfoilSource,
) -> dict[str, object]:
"""Switch one exact-only NACA family between designation and params."""
switched = bwa.AirfoilFactory.switch_source_exact(
mapping,
target_source,
)
serialized = _serialize_value(switched)
if not isinstance(serialized, dict):
raise TypeError("Switched airfoil spec must serialize to a mapping.")
return cast(dict[str, object], serialized)
def switch_airfoil_source_approx(
mapping: Mapping[str, object],
target_source: AirfoilSource,
) -> dict[str, object]:
"""Switch one airfoil entry between source branches approximately."""
switched = bwa.AirfoilFactory.switch_source_approx(
mapping,
target_source,
)
serialized = _serialize_value(switched)
if not isinstance(serialized, dict):
raise TypeError("Switched airfoil spec must serialize to a mapping.")
return cast(dict[str, object], serialized)
def switch_airfoil_source_approx_with_metadata(
mapping: Mapping[str, object],
target_source: AirfoilSource,
) -> ApproximateAirfoilSwitchResult:
"""Switch one airfoil entry approximately and return metadata."""
result = bwa.AirfoilFactory.switch_source_approx_result(
mapping,
target_source,
)
serialized = _serialize_value(result.spec)
if not isinstance(serialized, dict):
raise TypeError("Switched airfoil spec must serialize to a mapping.")
return cast(dict[str, object], serialized), result
[docs]
def dump_panel_case(
spec: PanelCaseSpec,
*,
file_type: Literal["json", "yaml"] = "yaml",
) -> str:
"""
Serialize a typed panel case spec to JSON or YAML text.
Parameters
----------
spec : PanelCaseSpec
Typed panel case specification to serialize.
file_type : Literal["json", "yaml"], optional
Output text format.
Returns
-------
str
Serialized JSON or YAML document text.
Raises
------
ValueError
If ``format`` is unsupported.
"""
payload = panel_case_to_mapping(spec)
if file_type == "json":
return json.dumps(payload, indent=2) + "\n"
if file_type == "yaml":
return yaml.safe_dump(payload, sort_keys=False)
raise ValueError("format must be 'json' or 'yaml'.")
[docs]
def save_panel_case(spec: PanelCaseSpec, path: str | Path) -> None:
"""
Serialize a typed panel case spec to a JSON or YAML file.
Parameters
----------
spec : PanelCaseSpec
Typed panel case specification to serialize.
path : str | Path
Destination file path.
Supported suffixes are ``.json``, ``.yaml``, and ``.yml``.
Raises
------
ValueError
If the file suffix is unsupported.
"""
case_path = Path(path)
suffix = case_path.suffix.lower()
if suffix not in SUPPORTED_SUFFIXES:
msg = "Panel case files must use .json, .yaml, or .yml."
raise ValueError(msg)
file_type: Literal["json", "yaml"] = "json" if suffix == ".json" else "yaml"
case_path.write_text(
dump_panel_case(spec, file_type=file_type), encoding="utf-8"
)
def _build_panel_case(data: Mapping[str, object]) -> PanelCaseSpec:
"""
Build the top-level case.
Parameters
----------
data : Mapping[str, object]
Raw mapping data for the panel case.
Returns
-------
PanelCaseSpec
The populated case specification.
Raises
------
ValueError
If the payload cannot be converted into ``PanelCaseSpec``.
"""
return _build_dataclass(PanelCaseSpec, dict(data))
def _serialize_value(value: object) -> object:
"""
Recursively convert schema values into YAML-friendly builtin containers.
Parameters
----------
value : object
Value from a typed schema tree.
Returns
-------
object
Serialized value composed only of builtin scalars, lists, and dicts.
"""
if is_dataclass(value):
serialized_items: dict[str, object] = {}
for field in fields(value):
field_value = getattr(value, field.name)
if field_value is None:
continue
serialized_items[field.name] = _serialize_value(field_value)
return serialized_items
if isinstance(value, Mapping):
value_map = cast(Mapping[str, object], value)
serialized_map: dict[str, object] = {}
for key, item in value_map.items():
serialized_map[key] = _serialize_value(item)
return serialized_map
if isinstance(value, list | tuple):
items = cast(Sequence[object], value)
return [_serialize_value(item) for item in items]
return value
def _build_dataclass[T](cls: type[T], data: Mapping[str, object]) -> T:
"""
Recursively convert mapping data into a dataclass instance.
Parameters
----------
cls : type[T]
The dataclass type to instantiate.
data : Mapping[str, object]
The raw data mapping for the dataclass fields.
Returns
-------
T
An instance of the dataclass.
Raises
------
TypeError
If ``cls`` is not a dataclass.
ValueError
If unknown fields are present or required fields are missing.
"""
if not is_dataclass(cls):
raise TypeError("cls must be a dataclass type.")
type_hints = get_type_hints(cls)
field_names = {field.name for field in fields(cls)}
unknown = set(data) - field_names
if unknown:
raise ValueError(
f"{cls.__name__} received unknown fields: {sorted(unknown)!r}."
)
kwargs: dict[str, object] = {}
for field in fields(cls):
if field.name not in data:
if field.default is MISSING and field.default_factory is MISSING:
raise ValueError(
f"{cls.__name__} missing required field {field.name!r}."
)
continue
value = _convert_value(
data[field.name],
type_hints[field.name],
path=f"{cls.__name__}.{field.name}",
)
_validate_constraints(
value, field.metadata, path=f"{cls.__name__}.{field.name}"
)
kwargs[field.name] = value
return cast(T, cls(**kwargs))
def _convert_value(value: object, target_type: object, *, path: str) -> object:
"""
Convert one parsed value to the requested schema field type.
Parameters
----------
value : object
The raw value to convert.
target_type : object
The expected type or annotation for the value.
path : str
Diagnostic path for error messages (e.g. "PanelCaseSpec.units").
Returns
-------
object
The converted value.
"""
origin = get_origin(target_type)
if origin is not None:
return _convert_origin_value(value, origin, get_args(target_type), path)
if isinstance(target_type, type) and is_dataclass(target_type):
return _convert_dataclass_value(
value,
cast(type[object], target_type),
path=path,
)
# ``get_type_hints`` returns runtime annotation objects that Pyright cannot
# fully narrow after the origin and dataclass checks above.
return _convert_scalar_value(
value,
target_type, # pyright: ignore[reportUnknownArgumentType]
path=path,
)
def _convert_origin_value(
value: object,
origin: object,
args: tuple[object, ...],
path: str,
) -> object:
"""
Convert values whose field annotation has a typing origin.
Parameters
----------
value : object
The raw value to convert.
origin : object
The origin type (e.g., list, dict, Union).
args : tuple[object, ...]
The arguments to the generic type (e.g., item types).
path : str
Diagnostic path for error messages.
Returns
-------
object
The converted value or the original value if the origin is unhandled.
"""
if origin is Literal:
return _convert_literal(value, args, path=path)
if origin is list:
return _convert_list(value, args, path=path)
if origin is tuple:
return _convert_tuple(value, args, path=path)
if origin is dict:
return _convert_dict(value, args, path=path)
if origin is UnionType or origin is Union:
return _convert_union(value, args, path=path)
return value
def _convert_literal(
value: object,
args: tuple[object, ...],
*,
path: str,
) -> object:
"""
Convert a value for a ``typing.Literal`` annotation.
Parameters
----------
value : object
The raw value.
args : tuple[object, ...]
The valid literal values.
path : str
Diagnostic path for error messages.
Returns
-------
object
The value if it is within the allowed literal values.
Raises
------
ValueError
If the value is not one of the allowed literals.
"""
if value not in args:
raise ValueError(f"{path} must be one of {args!r}.")
return value
def _convert_list(
value: object,
args: tuple[object, ...],
*,
path: str,
) -> list[object]:
"""
Convert a sequence value for a list annotation.
Parameters
----------
value : object
The raw sequence value.
args : tuple[object, ...]
The item type argument for the list.
path : str
Diagnostic path for error messages.
Returns
-------
list[object]
The list of converted items.
Raises
------
ValueError
If the value is not a sequence.
"""
if not _is_sequence(value):
raise ValueError(f"{path} must be a list.")
item_type = args[0] if args else object
return [
_convert_value(item, item_type, path=f"{path}[{index}]")
for index, item in enumerate(value)
]
def _convert_tuple(
value: object,
args: tuple[object, ...],
*,
path: str,
) -> tuple[object, ...]:
"""
Convert a sequence value for a tuple annotation.
Parameters
----------
value : object
The raw sequence value.
args : tuple[object, ...]
The item type arguments for the tuple.
path : str
Diagnostic path for error messages.
Returns
-------
tuple[object, ...]
The tuple of converted items.
Raises
------
ValueError
If the value is not a sequence or its length does not match
the tuple annotation.
"""
if not _is_sequence(value):
raise ValueError(f"{path} must be a tuple-compatible sequence.")
items = list(value)
if len(args) == _VARIADIC_TUPLE_ARG_COUNT and args[1] is Ellipsis:
return tuple(
_convert_value(item, args[0], path=f"{path}[{index}]")
for index, item in enumerate(items)
)
if len(items) != len(args):
raise ValueError(f"{path} must contain {len(args)} items.")
return tuple(
_convert_value(item, item_type, path=f"{path}[{index}]")
for index, (item, item_type) in enumerate(zip(items, args, strict=True))
)
def _convert_dict(
value: object,
args: tuple[object, ...],
*,
path: str,
) -> dict[object, object]:
"""
Convert a mapping value for a dict annotation.
Parameters
----------
value : object
The raw mapping value.
args : tuple[object, ...]
The key and value type arguments for the dict.
path : str
Diagnostic path for error messages.
Returns
-------
dict[object, object]
The dictionary with converted keys and values.
Raises
------
ValueError
If the value is not a mapping.
"""
if not isinstance(value, Mapping):
raise ValueError(f"{path} must be a mapping.")
value_map = cast(Mapping[object, object], value)
key_type = args[0] if args else object
value_type = args[1] if len(args) > 1 else object
return {
_convert_value(key, key_type, path=f"{path}.<key>"): _convert_value(
item,
value_type,
path=f"{path}[{key!r}]",
)
for key, item in value_map.items()
}
def _convert_dataclass_value(
value: object,
target_type: type[object],
*,
path: str,
) -> object:
"""
Convert a mapping value into a dataclass instance.
Parameters
----------
value : object
The raw mapping value.
target_type : type[object]
The dataclass type.
path : str
Diagnostic path for error messages.
Returns
-------
object
The instantiated dataclass.
Raises
------
ValueError
If the value is not a mapping.
"""
if not isinstance(value, Mapping):
raise ValueError(f"{path} must be a mapping.")
return _build_dataclass(
target_type,
cast(Mapping[str, object], value),
)
def _convert_scalar_value(
value: object,
target_type: object,
*,
path: str,
) -> object:
"""
Convert scalar JSON/YAML values to builtin schema scalar types.
Parameters
----------
value : object
The raw scalar value.
target_type : object
The expected builtin type (str, int, float, bool).
path : str
Diagnostic path for error messages.
Returns
-------
object
The converted scalar value.
Raises
------
ValueError
If the value cannot be converted to the target type.
"""
if target_type is str:
if isinstance(value, str):
return value
raise ValueError(f"{path} must be a string.")
if target_type is int:
if isinstance(value, int) and not isinstance(value, bool):
return value
raise ValueError(f"{path} must be an integer.")
if target_type is float:
if isinstance(value, int | float) and not isinstance(value, bool):
return float(value)
if isinstance(value, str):
try:
return float(value)
except ValueError:
pass
raise ValueError(f"{path} must be a number.")
if target_type is bool:
if isinstance(value, bool):
return value
raise ValueError(f"{path} must be a boolean.")
return value
def _convert_union(
value: object,
args: tuple[object, ...],
*,
path: str,
) -> object:
"""
Convert a value against a union field type.
Parameters
----------
value : object
The raw value.
args : tuple[object, ...]
The candidate types in the union.
path : str
Diagnostic path for error messages.
Returns
-------
object
The value converted to the first matching candidate type.
Raises
------
ValueError
If the value does not match any candidate type in the union.
"""
if value is None and NoneType in args:
return None
if isinstance(value, Mapping):
value_map = cast(Mapping[object, object], value)
airfoil_spec = _convert_airfoil_union(value_map, args)
if airfoil_spec is not None:
return airfoil_spec
value_object = value # pyright: ignore[reportUnknownVariableType]
errors: list[str] = []
for candidate in args:
if candidate is NoneType:
continue
try:
return _convert_value(
value_object, # pyright: ignore[reportUnknownArgumentType]
candidate,
path=path,
)
except (TypeError, ValueError) as exc:
errors.append(str(exc))
raise ValueError(f"{path} did not match any supported type: {errors!r}.")
def _convert_airfoil_union(
value: Mapping[object, object],
args: tuple[object, ...],
) -> bwa.AirfoilDefinitionSpec | None:
"""Resolve one embedded Buffalo Wings airfoil union by discriminator."""
spec_type = value.get("type")
if not isinstance(spec_type, str):
return None
spec_cls = AIRFOIL_SPEC_BY_TYPE.get(spec_type)
if spec_cls is None or spec_cls not in args:
raise ValueError(f"Unsupported airfoil type {spec_type!r}.")
return cast(
bwa.AirfoilDefinitionSpec,
_build_dataclass(spec_cls, cast(Mapping[str, object], value)),
)
def _is_sequence(value: object) -> TypeGuard[Sequence[object]]:
"""
Return whether ``value`` is a non-string sequence.
Parameters
----------
value : object
The value to check.
Returns
-------
TypeGuard[Sequence[object]]
True if the value is a sequence but not a string or bytes.
"""
return isinstance(value, Sequence) and not isinstance(value, str | bytes)
def _is_mapping(value: object) -> TypeGuard[Mapping[object, object]]:
"""
Return whether ``value`` is a mapping.
Parameters
----------
value : object
The value to check.
Returns
-------
TypeGuard[Mapping[object, object]]
True if the value is a mapping.
"""
return isinstance(value, Mapping)
def _validate_constraints(
value: object,
metadata: Mapping[str, object],
*,
path: str,
) -> None:
"""Check scalar and collection constraints from schema metadata."""
# Scalar range checks
if isinstance(value, int | float) and not isinstance(value, bool):
min_val = metadata.get("minimum")
if min_val is not None and value < cast(float, min_val):
raise ValueError(f"{path} must be >= {min_val}.")
max_val = metadata.get("maximum")
if max_val is not None and value > cast(float, max_val):
raise ValueError(f"{path} must be <= {max_val}.")
ex_min = metadata.get("exclusive_minimum")
if ex_min is not None and value <= cast(float, ex_min):
raise ValueError(f"{path} must be > {ex_min}.")
ex_max = metadata.get("exclusive_maximum")
if ex_max is not None and value >= cast(float, ex_max):
raise ValueError(f"{path} must be < {ex_max}.")
# Collection size checks
if _is_sequence(value):
min_items = metadata.get("min_items")
if min_items is not None and len(value) < cast(int, min_items):
raise ValueError(f"{path} must contain at least {min_items} items.")
if _is_mapping(value):
min_items = metadata.get("min_items")
if min_items is not None and len(value) < cast(int, min_items):
raise ValueError(f"{path} must contain at least {min_items} items.")