Source code for fdscore.shock_rolling

"""Rolling shock-spectrum evaluation on detected event windows.

This module computes SRS or PVSS values for each previously detected
shock-event window and stacks the results into a time-indexed matrix.
The returned object is useful for comparing transient severity across a
sequence of discrete shocks.
"""

from __future__ import annotations

import numpy as np

from .shock import compute_pvss_time, compute_srs_time
from .types import RollingERSResult, SDOFParams, ShockEventSet
from .validate import ValidationError


def _validate_rolling_inputs(
    *,
    x: np.ndarray,
    fs: float,
    events: ShockEventSet,
    peak_mode: str,
) -> np.ndarray:
    x = np.asarray(x, dtype=float)
    if x.ndim != 1 or x.size < 4:
        raise ValidationError("x must be a 1D array with length >= 4.")
    if not np.all(np.isfinite(x)):
        raise ValidationError("x must contain only finite values.")
    if not np.isfinite(fs) or float(fs) <= 0.0:
        raise ValidationError("fs must be finite and > 0.")
    if not isinstance(events, ShockEventSet):
        raise ValidationError("events must be a ShockEventSet.")
    if peak_mode not in ("abs", "pos", "neg"):
        raise ValidationError("rolling shock spectra currently support peak_mode 'abs', 'pos', or 'neg'.")
    if not np.isclose(float(events.fs), float(fs), rtol=0.0, atol=1e-12):
        raise ValidationError("events.fs must match fs.")
    if int(events.n_samples) != int(x.size):
        raise ValidationError("events.n_samples must match len(x).")
    for i, ev in enumerate(events.events):
        if int(ev.stop_index) - int(ev.start_index) < 4:
            raise ValidationError(
                f"Shock event window at index {i} must contain at least 4 samples for rolling shock spectra."
            )
    return x


def _rolling_from_events(
    *,
    x: np.ndarray,
    fs: float,
    sdof: SDOFParams,
    events: ShockEventSet,
    detrend: str,
    strict_nyquist: bool,
    peak_mode: str,
    spectrum_fn,
    source: str,
    ers_kind: str,
) -> RollingERSResult:
    x = _validate_rolling_inputs(x=x, fs=fs, events=events, peak_mode=peak_mode)

    rows: list[np.ndarray] = []
    centers: list[float] = []
    f_ref: np.ndarray | None = None

    for ev in events.events:
        seg = x[int(ev.start_index):int(ev.stop_index)]
        spec = spectrum_fn(
            seg,
            fs,
            sdof,
            detrend=detrend,
            strict_nyquist=strict_nyquist,
            peak_mode=peak_mode,
        )
        if f_ref is None:
            f_ref = np.asarray(spec.f, dtype=float)
        else:
            if spec.f.shape != f_ref.shape or not np.allclose(spec.f, f_ref, rtol=0.0, atol=1e-12):
                raise ValidationError("Incompatible rolling frequency grid across event windows.")
        rows.append(np.asarray(spec.response, dtype=float))
        centers.append(float(ev.peak_time_s))

    if f_ref is None:
        # No events: still expose the validated oscillator grid shape via an empty stack.
        empty = spectrum_fn(
            np.zeros(max(4, min(8, int(x.size))), dtype=float),
            fs,
            sdof,
            detrend="none",
            strict_nyquist=strict_nyquist,
            peak_mode=peak_mode,
        )
        f_ref = np.asarray(empty.f, dtype=float)
        response = np.zeros((0, f_ref.size), dtype=float)
        t_center = np.zeros(0, dtype=float)
    else:
        response = np.vstack(rows)
        t_center = np.asarray(centers, dtype=float)

    meta = {
        "source": source,
        "metric": sdof.metric,
        "q": float(sdof.q),
        "peak_mode": peak_mode,
        "ers_kind": ers_kind,
        "n_windows": int(response.shape[0]),
        "event_detector": dict(events.meta or {}),
        "provenance": {
            "detrend": detrend,
            "strict_nyquist": bool(strict_nyquist),
        },
    }
    return RollingERSResult(f=f_ref, t_center_s=t_center, response=response, meta=meta)


[docs] def compute_rolling_srs_time( x: np.ndarray, fs: float, sdof: SDOFParams, events: ShockEventSet, *, detrend: str = "median", strict_nyquist: bool = True, peak_mode: str = "abs", ) -> RollingERSResult: """Compute SRS on each detected shock-event window. Parameters ---------- x : numpy.ndarray One-dimensional source signal containing the detected events. fs : float Sampling rate in Hz. sdof : SDOFParams Oscillator-grid definition. For this wrapper, ``sdof.metric`` must be ``"acc"``. events : object Detected event windows that define the rolling segments. The input must be a ``ShockEventSet`` instance. detrend : {"linear", "mean", "median", "none"}, optional Preprocessing mode applied independently to each event window. strict_nyquist : bool, optional Whether oscillator frequencies at or above Nyquist should raise an error instead of being clipped. peak_mode : {"abs", "pos", "neg"}, optional Peak convention used for each event-local spectrum. Returns ------- object Rolling result returned as a ``RollingERSResult``. Each row corresponds to one detected event and each column corresponds to one oscillator frequency. Notes ----- Rolling shock spectra are currently event-window based rather than fixed-hop or sliding-window based. The time coordinate stored in the result corresponds to the detected peak time of each event. If no events are present, the function still returns a valid empty matrix with the validated oscillator frequency grid. """ return _rolling_from_events( x=x, fs=fs, sdof=sdof, events=events, detrend=detrend, strict_nyquist=strict_nyquist, peak_mode=peak_mode, spectrum_fn=compute_srs_time, source="compute_rolling_srs_time", ers_kind="shock_response_spectrum", )
[docs] def compute_rolling_pvss_time( x: np.ndarray, fs: float, sdof: SDOFParams, events: ShockEventSet, *, detrend: str = "median", strict_nyquist: bool = True, peak_mode: str = "abs", ) -> RollingERSResult: """Compute PVSS on each detected shock-event window. Parameters ---------- x : numpy.ndarray One-dimensional source signal containing the detected events. fs : float Sampling rate in Hz. sdof : SDOFParams Oscillator-grid definition. For this wrapper, ``sdof.metric`` must be ``"pv"``. events : object Detected event windows that define the rolling segments. The input must be a ``ShockEventSet`` instance. detrend : {"linear", "mean", "median", "none"}, optional Preprocessing mode applied independently to each event window. strict_nyquist : bool, optional Whether oscillator frequencies at or above Nyquist should raise an error instead of being clipped. peak_mode : {"abs", "pos", "neg"}, optional Peak convention used for each event-local spectrum. Returns ------- object Rolling result returned as a ``RollingERSResult``. Each row corresponds to one detected event and each column corresponds to one oscillator frequency. Notes ----- Rolling shock spectra are currently event-window based rather than fixed-hop or sliding-window based. The time coordinate stored in the result corresponds to the detected peak time of each event. If no events are present, the function still returns a valid empty matrix with the validated oscillator frequency grid. """ return _rolling_from_events( x=x, fs=fs, sdof=sdof, events=events, detrend=detrend, strict_nyquist=strict_nyquist, peak_mode=peak_mode, spectrum_fn=compute_pvss_time, source="compute_rolling_pvss_time", ers_kind="pseudo_velocity_shock_spectrum", )