"""CSV export helpers for surface post-processing outputs."""
from __future__ import annotations
import csv
from collections.abc import Sequence
from pathlib import Path
from typing import Literal
import numpy as np
from buffalo_panel.post.internal.results import (
PanelSolution2D,
SurfaceQuantities2D,
)
from buffalo_panel.type_aliases import FloatArray, IntArray
# User-facing quantity names accepted by the CSV export. One request may
# expand to several CSV columns: "velocity" becomes the tangent and normal
# velocity columns, while "cp" and "panel_lift_coefficient" map to one column.
type SurfaceQuantityRequest = Literal[
    "velocity",
    "cp",
    "panel_lift_coefficient",
]
# Names of the per-panel data columns that can appear in the exported CSV
# header, after the fixed body_id / surface_side / x / s_from_le columns.
type SurfaceColumnName = Literal[
    "tangent_velocity",
    "normal_velocity",
    "cp",
    "panel_lift_coefficient",
]
[docs]
def export_surface_quantities_csv(
    results: PanelSolution2D,
    path: str | Path,
    *,
    case_name: str,
    quantities: Sequence[SurfaceQuantityRequest],
) -> None:
    """
    Write the requested surface quantities of one solution to a CSV file.

    The file starts with a ``#``-prefixed metadata preamble (case name and
    the integrated ``cl_pressure``, ``cl_circulation``, ``cd_pressure`` and
    ``cm_pressure`` values in ``.16e`` format), followed by one blank line,
    a header row and one data row per surface panel.

    Parameters
    ----------
    results : PanelSolution2D
        Runtime post-processing solution to export.
    path : str | Path
        Output CSV path. The file is opened in ``"w"`` mode.
    case_name : str
        Human-readable case name written to the metadata preamble.
    quantities : Sequence[SurfaceQuantityRequest]
        Requested surface quantities to expand into CSV columns.
    """
    destination = Path(path)

    # Columns and rows are fully built before the file is opened, so any
    # error raised while assembling them happens before anything is written.
    column_names = _expanded_surface_columns(quantities)
    data_rows = _surface_rows(results, columns=column_names)
    header = ["body_id", "surface_side", "x", "s_from_le", *column_names]

    # Integrated coefficients, in the order they appear in the preamble.
    coefficient_labels = (
        "cl_pressure",
        "cl_circulation",
        "cd_pressure",
        "cm_pressure",
    )

    # newline="" disables newline translation, as the csv module requires.
    # NOTE(review): the preamble lines end in "\n" whereas csv.writer's
    # default line terminator is "\r\n", so the file mixes line endings.
    with destination.open("w", encoding="utf-8", newline="") as handle:
        handle.write(f"# case_name: {case_name}\n")
        for label in coefficient_labels:
            coefficient = getattr(results.integrated, label)
            handle.write(f"# {label}: {coefficient:.16e}\n")
        handle.write("\n")

        table = csv.writer(handle)
        table.writerow(header)
        table.writerows(data_rows)
def _expanded_surface_columns(
quantities: Sequence[SurfaceQuantityRequest],
) -> list[SurfaceColumnName]:
"""Expand user-facing surface quantity requests into CSV columns."""
columns: list[SurfaceColumnName] = []
for quantity in quantities:
if quantity == "velocity":
columns.extend(["tangent_velocity", "normal_velocity"])
elif quantity == "cp":
columns.append("cp")
else:
columns.append("panel_lift_coefficient")
return columns
def _surface_rows(
    results: PanelSolution2D,
    *,
    columns: list[SurfaceColumnName],
) -> list[list[str | float]]:
    """
    Assemble the CSV data rows: lower-surface rows first, then upper.

    The body panel indices are split at the index of the minimum of
    ``geometry.x``. Indices below it form the lower branch and are reversed
    before row building; the remaining indices form the upper branch and
    keep their order.

    Parameters
    ----------
    results : PanelSolution2D
        Solution providing ``geometry`` and the per-panel surface data.
    columns : list[SurfaceColumnName]
        Data columns to append to every row.

    Returns
    -------
    list[list[str | float]]
        Lower-branch rows followed by upper-branch rows.
    """
    geometry = results.geometry
    panel_ids = np.asarray(geometry.body_panel_indices, dtype=np.int32)

    # NOTE(review): panel indices are compared against an index into
    # geometry.x; presumably both share one ordering -- confirm.
    split_index = int(np.argmin(geometry.x))
    on_lower = panel_ids < split_index

    rows_lower = _surface_rows_for_side(
        results,
        columns=columns,
        # Reversed so the lower branch is walked in the opposite direction
        # to its storage order.
        panel_indices=panel_ids[on_lower][::-1],
        surface_side="lower",
    )
    rows_upper = _surface_rows_for_side(
        results,
        columns=columns,
        panel_indices=panel_ids[~on_lower],
        surface_side="upper",
    )
    return [*rows_lower, *rows_upper]
def _surface_rows_for_side(
results: PanelSolution2D,
*,
columns: list[SurfaceColumnName],
panel_indices: IntArray,
surface_side: str,
) -> list[list[str | float]]:
"""Build CSV rows for one surface branch ordered from LE to TE."""
geometry = results.geometry
surface = results.surface
rows: list[list[str | float]] = []
s_running = 0.0
for raw_index in np.asarray(panel_indices, dtype=np.int32).tolist():
panel_index = int(raw_index)
s_from_le = s_running + 0.5 * float(geometry.length[panel_index])
row: list[str | float] = [
geometry.geometry_name,
surface_side,
float(surface.x[panel_index]),
s_from_le,
]
for column in columns:
row.append(_surface_quantity_value(surface, column, panel_index))
rows.append(row)
s_running += float(geometry.length[panel_index])
return rows
def _surface_quantity_value(
surface: SurfaceQuantities2D,
quantity: SurfaceColumnName,
panel_index: int,
) -> float:
"""Return one surface quantity value for the requested panel index."""
if quantity == "tangent_velocity":
if not surface.supports_surface_velocity:
raise ValueError(
"Surface tangent velocity is not available for this solution."
)
values: FloatArray = surface.tangent_velocity
elif quantity == "normal_velocity":
if not surface.supports_surface_velocity:
raise ValueError(
"Surface normal velocity is not available for this solution."
)
values = surface.normal_velocity
elif quantity == "cp":
if not surface.supports_surface_pressure:
raise ValueError(
"Surface pressure coefficient is not available for this "
"solution."
)
values = surface.cp
else:
if surface.panel_lift_coefficient is None:
raise ValueError(
"Panel lift coefficient is not available for this solution."
)
values = surface.panel_lift_coefficient
return float(values[panel_index])