Source code for fdscore.ers_spectral

"""Spectral/random extreme-response spectra from acceleration PSD inputs.

This module adds a PSD-domain ERS workflow alongside the existing
time-domain ``compute_ers_time(...)`` route. The spectral workflow
interprets the ERS as an expected extreme response of a stationary
Gaussian process over a specified duration, rather than as the maximum
observed in a realized time record.
"""

from __future__ import annotations

import numpy as np

from .types import PSDParams, SDOFParams, ERSResult
from .grid import build_frequency_grid
from .validate import (
    ValidationError,
    _bool_flag_or_raise,
    _finite_positive_float_or_raise,
    ers_compat_dict,
    validate_sdof,
)
from .sdof_transfer import build_transfer_psd
from .psd_welch import compute_psd_welch
from ._psd_utils import clip_tiny_negative_psd_or_raise


_EULER_GAMMA = 0.5772156649015329
_EDGE_BANDWIDTH_SCALE = 2.0


def _gaussian_peak_factor_davenport(peak_rate_hz: np.ndarray, duration_s: float) -> np.ndarray:
    """Return the expected Gaussian maximum factor via Davenport/Gumbel."""
    n_peaks = np.maximum(np.asarray(peak_rate_hz, dtype=float) * float(duration_s), np.e)
    u = np.sqrt(2.0 * np.log(n_peaks))
    return u + (_EULER_GAMMA / u)


def _expected_max_from_response_psd(
    *,
    f_hz: np.ndarray,
    response_psd: np.ndarray,
    duration_s: float,
) -> np.ndarray:
    """Convert one-sided response PSDs into expected extreme maxima."""
    freq = np.asarray(f_hz, dtype=float).reshape(-1)
    psd = np.asarray(response_psd, dtype=float)

    if psd.ndim == 1:
        psd = psd.reshape(1, -1)
    if psd.ndim != 2 or psd.shape[1] != freq.size:
        raise ValidationError("response_psd must be a 1D or 2D array aligned with f_hz.")

    m0 = np.trapezoid(psd, freq, axis=1)
    m2 = np.trapezoid(((2.0 * np.pi * freq) ** 2)[None, :] * psd, freq, axis=1)

    out = np.zeros(psd.shape[0], dtype=float)
    positive = m0 > 0.0
    if not np.any(positive):
        return out

    peak_rate_hz = np.zeros_like(m0)
    peak_rate_hz[positive] = (1.0 / (2.0 * np.pi)) * np.sqrt(np.maximum(m2[positive] / m0[positive], 0.0))
    g = _gaussian_peak_factor_davenport(peak_rate_hz[positive], float(duration_s))
    out[positive] = g * np.sqrt(m0[positive])
    return out


def _integrals_b(h: np.ndarray, b: int, zeta: float) -> np.ndarray:
    """Return Lalanne integral ``I_b`` for the requested exponent."""
    alpha = 2.0 * np.sqrt(1.0 - zeta * zeta)
    beta = 2.0 * (1.0 - 2.0 * zeta * zeta)

    c0 = zeta / (np.pi * alpha)
    c1 = (h * h + alpha * h + 1.0) / (h * h - alpha * h + 1.0)
    c2 = (2.0 * h + alpha) / (2.0 * zeta)
    c3 = (2.0 * h - alpha) / (2.0 * zeta)
    c4 = 4.0 * zeta / np.pi
    c5 = np.arctan(c2) + np.arctan(c3)

    if b == 0:
        return c0 * np.log(c1) + (1.0 / np.pi) * c5
    if b == 2:
        return c0 * np.log(1.0 / c1) + (1.0 / np.pi) * c5
    if b == 4:
        i0 = c0 * np.log(c1) + (1.0 / np.pi) * c5
        i2 = -c0 * np.log(c1) + (1.0 / np.pi) * c5
        return c4 * h + beta * i2 - i0
    raise ValidationError("Supported Lalanne integral exponents are 0, 2, and 4.")


def _rms_sum_lalanne(
    *,
    f0_hz: np.ndarray,
    f_psd_hz: np.ndarray,
    psd_baseacc: np.ndarray,
    zeta: float,
    motion: str,
) -> np.ndarray:
    """Compute Lalanne RMS sums over a piecewise-constant PSD."""
    f0 = np.asarray(f0_hz, dtype=float).reshape(-1)
    f_psd = np.asarray(f_psd_hz, dtype=float).reshape(-1)
    pyy = np.asarray(psd_baseacc, dtype=float)

    if pyy.ndim == 1:
        pyy_rows = pyy.reshape(1, -1)
    elif pyy.ndim == 2:
        pyy_rows = pyy
    else:
        raise ValidationError("psd_baseacc must be a 1D or 2D array aligned with f_psd_hz.")

    if pyy_rows.shape[1] != f_psd.size:
        raise ValidationError("psd_baseacc must be aligned with f_psd_hz.")
    if pyy_rows.shape[0] not in (1, f0.size):
        raise ValidationError(
            "2D psd_baseacc inputs must have either one row or one row per oscillator in f0_hz."
        )

    df = float(f_psd[1] - f_psd[0])
    f1 = f_psd - 0.5 * df
    f2 = f_psd + 0.5 * df
    f1[0] = f_psd[0]
    f2[-1] = f_psd[-1]

    b = {"rel_disp": 0, "rel_vel": 2, "rel_acc": 4}[motion]
    h1 = f1[:, None] / f0[None, :]
    h2 = f2[:, None] / f0[None, :]
    delta = _integrals_b(h2, b, zeta) - _integrals_b(h1, b, zeta)
    return np.sum(delta.T * pyy_rows, axis=1)


def _expected_max_acc_lalanne(
    *,
    f_psd_hz: np.ndarray,
    psd_baseacc: np.ndarray,
    f0_hz: np.ndarray,
    zeta: float,
    duration_s: float,
) -> np.ndarray:
    """Compute acceleration ERS via the Lalanne relative-displacement backbone.

    This formulation uses relative-displacement and relative-velocity RMS
    values to estimate the expected extreme acceleration response through
    the classical random-vibration ERS relationship

    ``ERS = (2*pi*f_n)^2 * z_rms * g(n0, T)``.
    """
    f0 = np.asarray(f0_hz, dtype=float).reshape(-1)
    c0 = np.pi / (4.0 * float(zeta))
    c_disp = c0 / (((2.0 * np.pi) ** 4) * (f0**3))
    c_vel = c0 / (((2.0 * np.pi) ** 2) * f0)

    z_rms2 = _rms_sum_lalanne(
        f0_hz=f0,
        f_psd_hz=f_psd_hz,
        psd_baseacc=psd_baseacc,
        zeta=float(zeta),
        motion="rel_disp",
    ) * c_disp
    dz_rms2 = _rms_sum_lalanne(
        f0_hz=f0,
        f_psd_hz=f_psd_hz,
        psd_baseacc=psd_baseacc,
        zeta=float(zeta),
        motion="rel_vel",
    ) * c_vel

    z_rms = np.sqrt(np.maximum(z_rms2, 0.0))
    dz_rms = np.sqrt(np.maximum(dz_rms2, 0.0))

    out = np.zeros_like(f0)
    positive = z_rms > 0.0
    if not np.any(positive):
        return out

    n0 = np.zeros_like(f0)
    n0[positive] = (1.0 / np.pi) * dz_rms[positive] / z_rms[positive]
    g = _gaussian_peak_factor_davenport(n0[positive], float(duration_s))
    out[positive] = ((2.0 * np.pi * f0[positive]) ** 2) * z_rms[positive] * g
    return out


def _extend_psd_high_edge_auto(
    *,
    f_psd_hz: np.ndarray,
    psd_baseacc: np.ndarray,
    fn_hz: float,
    zeta: float,
    nyquist_hz: float,
    bandwidth_scale: float = _EDGE_BANDWIDTH_SCALE,
) -> tuple[np.ndarray, np.ndarray]:
    """Extend a cropped PSD tail with a raised-cosine taper.

    The completion span is proportional to the oscillator bandwidth:

    ``delta_f = bandwidth_scale * zeta * f_n``.

    The added tail starts at the last available PSD point and decays
    smoothly to zero over that span, clipped at ``nyquist_hz``.
    """
    f = np.asarray(f_psd_hz, dtype=float).reshape(-1)
    p = np.asarray(psd_baseacc, dtype=float).reshape(-1)

    if f.size < 2 or f.shape != p.shape:
        raise ValidationError("f_psd_hz and psd_baseacc must be aligned 1D arrays.")
    if float(nyquist_hz) <= float(f[-1]):
        return f, p

    delta_f = float(bandwidth_scale) * float(zeta) * float(fn_hz)
    if delta_f <= 0.0:
        return f, p

    f_stop = min(float(nyquist_hz), float(f[-1]) + delta_f)
    if f_stop <= float(f[-1]):
        return f, p

    df = float(f[1] - f[0])
    extra_f = np.arange(float(f[-1]) + df, f_stop + 0.5 * df, df)
    if extra_f.size == 0:
        return f, p

    x = (extra_f - float(f[-1])) / (f_stop - float(f[-1]))
    taper = 0.5 * (1.0 + np.cos(np.pi * x))
    extra_p = float(p[-1]) * taper
    return np.concatenate([f, extra_f]), np.concatenate([p, extra_p])


[docs] def compute_ers_spectral_psd( f_psd_hz: np.ndarray, psd_baseacc: np.ndarray, *, duration_s: float, sdof: SDOFParams, nyquist_hz: float | None = None, edge_correction: bool = True, ) -> ERSResult: r"""Compute a spectral/random ERS from a one-sided acceleration PSD. This workflow interprets the ERS as the expected extreme response of a stationary Gaussian process over ``duration_s``. It is therefore a different quantity from :func:`fdscore.compute_ers_time`, which extracts the maximum observed in a realized time history. For generic metrics, the response PSD is built as .. math:: P_{resp}(f; f_n) = \left| H(f; f_n) \right|^2 P_{base}(f) and the response moments .. math:: m_0 = \int P_{resp}(f) \, df .. math:: m_2 = \int (2\pi f)^2 P_{resp}(f) \, df are used to estimate the peak rate and the expected Gaussian maximum. For ``sdof.metric="acc"``, the implementation uses a Lalanne-style relative-displacement/random-peak formulation that is better aligned with classical random-vibration ERS practice for acceleration spectra. Other metrics use the exact response-PSD moment route. Parameters ---------- f_psd_hz : numpy.ndarray One-sided PSD frequency grid in Hz. psd_baseacc : numpy.ndarray One-sided base-acceleration PSD defined on ``f_psd_hz``. duration_s : float Exposure duration associated with the expected extreme response. sdof : SDOFParams Oscillator-grid definition and response metric. nyquist_hz : float or None, optional Original Nyquist limit of the underlying time-history sampling. This is used only when ``edge_correction=True`` and the PSD has been cropped below that limit. edge_correction : bool, optional Whether to apply an automatic high-frequency edge correction for cropped PSDs. The correction is a per-oscillator raised-cosine taper over a bandwidth proportional to ``zeta * f_n`` and is a no-op when the PSD already reaches ``nyquist_hz``. Returns ------- ERSResult Spectral/random ERS evaluated on the oscillator grid defined by ``sdof``. Notes ----- This route assumes that the PSD is an adequate descriptor of the environment and that Gaussian extreme-value statistics are an acceptable approximation for the response process. The high-frequency edge correction exists because PSD exports are often cropped below the original Nyquist frequency for plotting or exchange. Near the top of the oscillator grid, that cropping can artificially suppress the response moments unless the missing tail is reconstructed. References ---------- Davenport, A. G. (1964). Note on the Distribution of the Largest Value of a Random Function with Application to Gust Loading. Lalanne, C. Mechanical Vibration and Shock Analysis, Volume 3 (Random Vibration). """ validate_sdof(sdof) if sdof.metric not in ("pv", "disp", "vel", "acc"): raise ValidationError("sdof.metric must be one of: 'pv','disp','vel','acc'.") duration_s = _finite_positive_float_or_raise(duration_s, field="duration_s") edge_correction = _bool_flag_or_raise(edge_correction, field="edge_correction") nyquist_val = None if nyquist_hz is None else _finite_positive_float_or_raise(nyquist_hz, field="nyquist_hz") f_psd = np.asarray(f_psd_hz, dtype=float).reshape(-1) Pyy = np.asarray(psd_baseacc, dtype=float).reshape(-1) if f_psd.size < 2 or Pyy.size < 2 or f_psd.shape != Pyy.shape: raise ValidationError("f_psd_hz and psd_baseacc must be 1D arrays of the same length >= 2.") if not (np.all(np.isfinite(f_psd)) and np.all(np.isfinite(Pyy))): raise ValidationError("PSD inputs must be finite.") if not np.all(np.diff(f_psd) > 0): raise ValidationError("f_psd_hz must be strictly increasing.") if float(f_psd[0]) < 0.0: raise ValidationError("f_psd_hz must be >= 0.") Pyy = clip_tiny_negative_psd_or_raise(Pyy, label="psd_baseacc") f0 = build_frequency_grid(sdof) zeta = 1.0 / (2.0 * float(sdof.q)) if sdof.metric == "acc": response = _expected_max_acc_lalanne( f_psd_hz=f_psd, psd_baseacc=Pyy, f0_hz=f0, zeta=zeta, duration_s=duration_s, ) response_model = "lalanne_relative_displacement" else: H = build_transfer_psd(f_psd_hz=f_psd, f0_hz=f0, zeta=zeta, metric=sdof.metric) P_resp = (np.abs(H) ** 2) * Pyy[None, :] response = _expected_max_from_response_psd(f_hz=f_psd, response_psd=P_resp, duration_s=duration_s) response_model = "exact_response_psd_moments" edge_applied = False corrected_count = 0 if edge_correction and nyquist_val is not None and float(f_psd[-1]) < nyquist_val: corrected = response.copy() if sdof.metric == "acc": grouped_ext: dict[int, list[tuple[int, np.ndarray]]] = {} f_ext_by_group: dict[int, np.ndarray] = {} for i, fn in enumerate(f0): f_ext, P_ext = _extend_psd_high_edge_auto( f_psd_hz=f_psd, psd_baseacc=Pyy, fn_hz=float(fn), zeta=float(zeta), nyquist_hz=float(nyquist_val), ) extra_count = int(f_ext.size - f_psd.size) if extra_count <= 0: continue grouped_ext.setdefault(extra_count, []).append((i, P_ext)) f_ext_by_group[extra_count] = f_ext for extra_count, members in grouped_ext.items(): indices = np.asarray([idx for idx, _ in members], dtype=int) p_matrix = np.vstack([p_ext for _, p_ext in members]) corrected[indices] = _expected_max_acc_lalanne( f_psd_hz=f_ext_by_group[extra_count], psd_baseacc=p_matrix, f0_hz=f0[indices], zeta=zeta, duration_s=duration_s, ) corrected_count += int(indices.size) else: for i, fn in enumerate(f0): f_ext, P_ext = _extend_psd_high_edge_auto( f_psd_hz=f_psd, psd_baseacc=Pyy, fn_hz=float(fn), zeta=float(zeta), nyquist_hz=float(nyquist_val), ) if f_ext.size == f_psd.size: continue H_ext = build_transfer_psd( f_psd_hz=f_ext, f0_hz=np.asarray([fn], dtype=float), zeta=zeta, metric=sdof.metric, ) P_resp_ext = (np.abs(H_ext) ** 2) * P_ext[None, :] corrected[i] = _expected_max_from_response_psd( f_hz=f_ext, response_psd=P_resp_ext, duration_s=duration_s, )[0] corrected_count += 1 response = corrected edge_applied = corrected_count > 0 meta = { "compat": ers_compat_dict( metric=sdof.metric, q=sdof.q, peak_mode="expected_gaussian_max", engine="spectral_random_psd", ers_kind="random_extreme_response_spectrum", ), "metric": sdof.metric, "q": float(sdof.q), "zeta": float(zeta), "peak_mode": "expected_gaussian_max", "provenance": { "source": "compute_ers_spectral_psd", "duration_s": float(duration_s), "peak_model": "gaussian_davenport", "response_model": response_model, "edge_correction_enabled": bool(edge_correction), "edge_correction_applied": bool(edge_applied), "edge_correction_mode": "auto_bandwidth_taper", "edge_bandwidth_scale": float(_EDGE_BANDWIDTH_SCALE), "edge_corrected_oscillator_count": int(corrected_count), "input_psd_fmax_hz": float(f_psd[-1]), "nyquist_hz": None if nyquist_val is None else float(nyquist_val), }, } return ERSResult(f=f0, response=np.asarray(response, dtype=float), meta=meta)
[docs] def compute_ers_spectral_time( x: np.ndarray, fs: float, *, sdof: SDOFParams, psd: PSDParams, duration_s: float | None = None, edge_correction: bool = True, ) -> ERSResult: r"""Compute a spectral/random ERS from a time history through Welch. This convenience route first estimates a one-sided acceleration PSD with :func:`fdscore.compute_psd_welch` and then delegates to :func:`fdscore.compute_ers_spectral_psd`. Parameters ---------- x : numpy.ndarray One-dimensional base-acceleration time history. fs : float Sampling rate in Hz. sdof : SDOFParams Oscillator-grid definition and response metric. psd : PSDParams PSD-estimation settings passed to ``compute_psd_welch(...)``. duration_s : float or None, optional Exposure duration associated with the expected extreme response. If ``None``, uses ``len(x) / fs``. edge_correction : bool, optional Whether to enable the automatic high-frequency edge correction when the internally estimated PSD is cropped below Nyquist. Returns ------- ERSResult Spectral/random ERS evaluated from the internally estimated PSD. Notes ----- This function is a convenience wrapper. It is not equivalent to :func:`fdscore.compute_ers_time`, which operates on the realized time history directly and extracts the maximum observed response. """ if not bool(psd.onesided): raise ValidationError("compute_ers_spectral_time requires PSDParams.onesided=True.") fs = _finite_positive_float_or_raise(fs, field="fs") x_arr = np.asarray(x, dtype=float) if x_arr.ndim != 1: raise ValidationError("x must be a 1D array.") if duration_s is None: duration_s = float(x_arr.size) / float(fs) else: duration_s = _finite_positive_float_or_raise(duration_s, field="duration_s") f_psd, Pyy = compute_psd_welch(x_arr, fs=fs, psd=psd) out = compute_ers_spectral_psd( f_psd_hz=f_psd, psd_baseacc=Pyy, duration_s=float(duration_s), sdof=sdof, nyquist_hz=float(fs) / 2.0, edge_correction=edge_correction, ) meta = dict(out.meta) provenance = dict(meta.get("provenance", {})) provenance["source"] = "compute_ers_spectral_time" provenance["fs_hz"] = float(fs) provenance["psd_method"] = str(psd.method) provenance["psd_window"] = str(psd.window) provenance["psd_nperseg"] = None if psd.nperseg is None else int(psd.nperseg) provenance["psd_noverlap"] = None if psd.noverlap is None else int(psd.noverlap) provenance["psd_detrend"] = str(psd.detrend) provenance["psd_fmin_hz"] = None if psd.fmin is None else float(psd.fmin) provenance["psd_fmax_hz"] = None if psd.fmax is None else float(psd.fmax) meta["provenance"] = provenance return ERSResult(f=np.asarray(out.f, dtype=float), response=np.asarray(out.response, dtype=float), meta=meta)