Source code for fdscore.metrics

from __future__ import annotations

from typing import Sequence

import numpy as np

from .types import PSDResult, PSDMetricsResult
from .validate import ValidationError
from ._psd_utils import clip_tiny_negative_psd_or_raise

G0 = 9.80665
EULER_GAMMA = 0.5772156649015329
DEFAULT_BANDS_HZ: tuple[tuple[float, float], ...] = (
    (5.0, 20.0),
    (20.0, 80.0),
    (80.0, 200.0),
    (200.0, 400.0),
)


def _integrate_trapz(y: np.ndarray, x: np.ndarray) -> float:
    if y.size == 0 or x.size == 0:
        return 0.0

    trapz_fn = getattr(np, "trapezoid", None)
    if trapz_fn is not None:
        return float(trapz_fn(y, x))

    dx = np.diff(x)
    avg = 0.5 * (y[1:] + y[:-1])
    return float(np.sum(avg * dx))


def _rms_from_psd(psd: np.ndarray, freq_hz: np.ndarray) -> float:
    area = _integrate_trapz(np.clip(psd, 0.0, None), freq_hz)
    return float(np.sqrt(max(area, 0.0)))


def _spectral_moment(psd: np.ndarray, freq_hz: np.ndarray, order: int) -> float:
    w = 2.0 * np.pi * freq_hz
    return _integrate_trapz((w**order) * np.clip(psd, 0.0, None), freq_hz)


def _gaussian_peak_statistics(psd: np.ndarray, freq_hz: np.ndarray, duration_s: float) -> tuple[float, float, float, bool, float]:
    m0 = _spectral_moment(psd, freq_hz, order=0)
    m2 = _spectral_moment(psd, freq_hz, order=2)
    if m0 <= 0.0 or m2 <= 0.0 or duration_s <= 0.0:
        return float("nan"), float("nan"), float("nan"), False, float("nan")

    nu0 = (1.0 / (2.0 * np.pi)) * np.sqrt(m2 / m0)
    n_eff_raw = float(nu0) * float(duration_s)
    floor_applied = bool(n_eff_raw < np.e)
    n_eff = max(n_eff_raw, np.e)
    u = np.sqrt(2.0 * np.log(n_eff))
    if u <= 0.0:
        return float("nan"), float(nu0), float(n_eff), floor_applied, float(n_eff_raw)
    peak_factor = float(u + (EULER_GAMMA / u))
    return peak_factor, float(nu0), float(n_eff), floor_applied, float(n_eff_raw)


def _acceleration_to_velocity_displacement_psd(psd_acc_ms2: np.ndarray, freq_hz: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    w = 2.0 * np.pi * freq_hz
    psd_v = np.zeros_like(psd_acc_ms2, dtype=float)
    psd_d = np.zeros_like(psd_acc_ms2, dtype=float)
    valid = w > 0.0
    psd_v[valid] = psd_acc_ms2[valid] / (w[valid] ** 2)
    psd_d[valid] = psd_acc_ms2[valid] / (w[valid] ** 4)
    return psd_v, psd_d


def _normalize_acc_unit(unit: str) -> str:
    u = str(unit).strip().lower()
    if u in ("g",):
        return "g"
    if u in ("m/s2", "m/s^2"):
        return "m/s2"
    raise ValidationError("Unsupported acceleration unit. Use 'g' or 'm/s2'.")


def _infer_acc_to_m_s2(*, acc_unit: str | None, acc_to_m_s2: float | None) -> tuple[float, str]:
    if acc_to_m_s2 is not None:
        k = float(acc_to_m_s2)
        if not np.isfinite(k) or k <= 0:
            raise ValidationError("acc_to_m_s2 must be finite and > 0.")
        return k, "scale"

    if acc_unit is not None:
        unit = _normalize_acc_unit(acc_unit)
        return (G0 if unit == "g" else 1.0), unit

    raise ValidationError(
        "Acceleration unit is required for PSD metrics. Provide acc_unit='g'|'m/s2' "
        "or acc_to_m_s2."
    )


def _validate_bands_hz(bands_hz: Sequence[tuple[float, float]]) -> tuple[tuple[float, float], ...]:
    if len(bands_hz) == 0:
        raise ValidationError("bands_hz must not be empty.")
    out: list[tuple[float, float]] = []
    for lo, hi in bands_hz:
        flo = float(lo)
        fhi = float(hi)
        if not (np.isfinite(flo) and np.isfinite(fhi) and flo >= 0 and fhi > flo):
            raise ValidationError("Each frequency band must satisfy finite 0 <= f_lo < f_hi.")
        out.append((flo, fhi))
    return tuple(out)


def _format_band_edge(freq_hz: float) -> str:
    val = float(freq_hz)
    rounded = round(val)
    if np.isclose(val, float(rounded), rtol=0.0, atol=1e-12):
        return str(int(rounded))
    label = f"{val:.6g}".replace("-", "m").replace(".", "p")
    return label.replace("+", "")


def _make_band_key(f_lo: float, f_hi: float) -> str:
    return f"rms_g_{_format_band_edge(f_lo)}_{_format_band_edge(f_hi)}Hz"


[docs] def compute_psd_metrics( psd: PSDResult | np.ndarray, *, f_hz: np.ndarray | None = None, duration_s: float | None = None, acc_unit: str | None = None, acc_to_m_s2: float | None = None, bands_hz: Sequence[tuple[float, float]] = DEFAULT_BANDS_HZ, ) -> PSDMetricsResult: r"""Compute scalar summary metrics from an acceleration PSD. The routine derives RMS, Gaussian peak estimates, velocity and displacement metrics, and band-limited RMS values from a one-sided acceleration PSD. Parameters ---------- psd : PSDResult or numpy.ndarray Either a ``PSDResult`` or a one-dimensional PSD array. f_hz : numpy.ndarray or None Frequency vector in Hz when ``psd`` is passed as a raw array. Must be omitted when ``psd`` is a ``PSDResult``. duration_s : float or None Exposure duration used for Gaussian peak estimation. If ``None``, peak-related outputs are returned as ``nan``. acc_unit : str or None Acceleration unit of the PSD values. Supported values are ``"g"`` and ``"m/s2"``. acc_to_m_s2 : float or None Explicit conversion factor from the PSD acceleration unit to :math:`m/s^2`. If provided, overrides ``acc_unit``. bands_hz : collections.abc.Sequence Sequence of ``(f_lo, f_hi)`` frequency-band pairs used to compute band-limited RMS acceleration in ``g``. Returns ------- PSDMetricsResult Structured set of scalar metrics including acceleration RMS and peak, zero-upcrossing rate, effective cycles, velocity metrics, displacement metrics, and per-band RMS values. Notes ----- RMS values are obtained by integrating the one-sided PSD. Velocity and displacement PSDs are derived from acceleration through division by :math:`\omega^2` and :math:`\omega^4`, respectively, for nonzero angular frequency. When ``duration_s`` is provided, the peak factor is estimated with a Gaussian extreme-value approximation of Davenport form: .. math:: \nu_0 = \frac{1}{2 \pi} \sqrt{\frac{m_2}{m_0}} .. math:: n_{eff} = \nu_0 T .. math:: u = \sqrt{2 \ln(n_{eff})} .. math:: peak\_factor \approx u + \frac{\gamma_E}{u} where :math:`m_0` and :math:`m_2` are spectral moments and :math:`\gamma_E` is the Euler-Mascheroni constant. The implementation floors :math:`n_{eff}` at :math:`e` to avoid a singular or negative argument in the logarithm for very short exposures or very low zero-upcrossing rates. These peak estimates assume a stationary Gaussian response process. They should be interpreted cautiously for strongly non-stationary environments, deterministic transients, or markedly non-Gaussian signals. References ---------- Davenport, A. G. (1964). "Note on the Distribution of the Largest Value of a Random Function with Application to Gust Loading." *Proceedings of the Institution of Civil Engineers*, 28(2), 187-196. """ if isinstance(psd, PSDResult): if f_hz is not None: raise ValidationError("f_hz must be None when psd is a PSDResult.") freq = np.asarray(psd.f, dtype=float).reshape(-1) p = np.asarray(psd.psd, dtype=float).reshape(-1) else: if f_hz is None: raise ValidationError("f_hz is required when psd is a raw array.") freq = np.asarray(f_hz, dtype=float).reshape(-1) p = np.asarray(psd, dtype=float).reshape(-1) if freq.ndim != 1 or p.ndim != 1 or freq.shape != p.shape or freq.size < 2: raise ValidationError("PSD inputs must be 1D arrays of the same length >= 2.") if not (np.all(np.isfinite(freq)) and np.all(np.isfinite(p))): raise ValidationError("PSD inputs must be finite.") if not np.all(np.diff(freq) > 0): raise ValidationError("Frequency vector must be strictly increasing.") if np.any(freq < 0): raise ValidationError("Frequency vector must be >= 0.") p = clip_tiny_negative_psd_or_raise(p, label="PSD input") bands = _validate_bands_hz(bands_hz) band_keys = [_make_band_key(f_lo, f_hi) for f_lo, f_hi in bands] if len(set(band_keys)) != len(band_keys): raise ValidationError("bands_hz must map to unique output keys in band_rms_g.") if duration_s is not None: if not np.isfinite(duration_s) or float(duration_s) <= 0.0: raise ValidationError("duration_s must be finite and > 0 when provided.") dur = float(duration_s) else: dur = float("nan") acc_scale, resolved_unit = _infer_acc_to_m_s2(acc_unit=acc_unit, acc_to_m_s2=acc_to_m_s2) psd_acc_ms2 = p * (float(acc_scale) ** 2) psd_acc_g2 = psd_acc_ms2 / (G0**2) rms_acc_m_s2 = _rms_from_psd(psd_acc_ms2, freq) rms_acc_g = rms_acc_m_s2 / G0 if np.isfinite(dur): peak_factor, nu0, n_eff, n_eff_floor_applied, n_eff_raw = _gaussian_peak_statistics(psd_acc_ms2, freq, dur) else: peak_factor, nu0, n_eff, n_eff_floor_applied, n_eff_raw = float("nan"), float("nan"), float("nan"), False, float("nan") peak_acc_m_s2 = float(rms_acc_m_s2 * peak_factor) if np.isfinite(peak_factor) else float("nan") peak_acc_g = peak_acc_m_s2 / G0 if np.isfinite(peak_acc_m_s2) else float("nan") psd_v, psd_d = _acceleration_to_velocity_displacement_psd(psd_acc_ms2, freq) rms_vel_m_s = _rms_from_psd(psd_v, freq) rms_disp_m = _rms_from_psd(psd_d, freq) if np.isfinite(dur): peak_factor_v, nu0_v, n_eff_v, n_eff_floor_applied_v, n_eff_raw_v = _gaussian_peak_statistics(psd_v, freq, dur) peak_factor_d, nu0_d, n_eff_d, n_eff_floor_applied_d, n_eff_raw_d = _gaussian_peak_statistics(psd_d, freq, dur) else: peak_factor_v, nu0_v, n_eff_v, n_eff_floor_applied_v, n_eff_raw_v = float("nan"), float("nan"), float("nan"), False, float("nan") peak_factor_d, nu0_d, n_eff_d, n_eff_floor_applied_d, n_eff_raw_d = float("nan"), float("nan"), float("nan"), False, float("nan") peak_vel_m_s = float(rms_vel_m_s * peak_factor_v) if np.isfinite(peak_factor_v) else float("nan") peak_disp_m = float(rms_disp_m * peak_factor_d) if np.isfinite(peak_factor_d) else float("nan") peak_disp_mm = float(peak_disp_m * 1000.0) if np.isfinite(peak_disp_m) else float("nan") disp_pk_pk_mm = float(2.0 * peak_disp_mm) if np.isfinite(peak_disp_mm) else float("nan") band_rms_g: dict[str, float] = {} band_coverage: dict[str, dict[str, float | bool | None]] = {} for f_lo, f_hi in bands: key = _make_band_key(f_lo, f_hi) mask = (freq >= f_lo) & (freq <= f_hi) if np.any(mask): freq_band = freq[mask] band_rms_g[key] = _rms_from_psd(psd_acc_g2[mask], freq_band) covered_lo = float(freq_band[0]) covered_hi = float(freq_band[-1]) partial = bool((covered_lo > (f_lo + 1e-12)) or (covered_hi < (f_hi - 1e-12))) coverage_fraction = float(max(covered_hi - covered_lo, 0.0) / (f_hi - f_lo)) band_coverage[key] = { "requested_hz": (float(f_lo), float(f_hi)), "covered_hz": (covered_lo, covered_hi), "has_data": True, "partial_coverage": partial, "coverage_fraction": coverage_fraction, } else: band_rms_g[key] = float("nan") band_coverage[key] = { "requested_hz": (float(f_lo), float(f_hi)), "covered_hz": None, "has_data": False, "partial_coverage": False, "coverage_fraction": 0.0, } out_meta = { "source": "compute_psd_metrics", "input_acc_unit": resolved_unit, "acc_to_m_s2": float(acc_scale), "duration_s": dur, "bands_hz": [tuple(map(float, b)) for b in bands], "band_coverage": band_coverage, "peak_statistics": { "acc": { "zero_upcrossing_hz": float(nu0), "effective_cycles_raw": float(n_eff_raw), "effective_cycles_used": float(n_eff), "n_eff_floor_applied": bool(n_eff_floor_applied), }, "vel": { "zero_upcrossing_hz": float(nu0_v), "effective_cycles_raw": float(n_eff_raw_v), "effective_cycles_used": float(n_eff_v), "n_eff_floor_applied": bool(n_eff_floor_applied_v), }, "disp": { "zero_upcrossing_hz": float(nu0_d), "effective_cycles_raw": float(n_eff_raw_d), "effective_cycles_used": float(n_eff_d), "n_eff_floor_applied": bool(n_eff_floor_applied_d), }, }, } return PSDMetricsResult( rms_acc_g=float(rms_acc_g), rms_acc_m_s2=float(rms_acc_m_s2), peak_acc_g=float(peak_acc_g), peak_acc_m_s2=float(peak_acc_m_s2), peak_factor=float(peak_factor), zero_upcrossing_hz=float(nu0), effective_cycles=float(n_eff), rms_vel_m_s=float(rms_vel_m_s), peak_vel_m_s=float(peak_vel_m_s), rms_disp_mm=float(rms_disp_m * 1000.0), peak_disp_mm=float(peak_disp_mm), disp_pk_pk_mm=float(disp_pk_pk_mm), band_rms_g=band_rms_g, meta=out_meta, )