"""Event-oriented shock detection for transient time histories.
This module locates discrete shock events in a one-dimensional signal and
returns window metadata suitable for event-wise SRS or PVSS analysis.
Detection is threshold based and is performed on a preprocessed signal
using ``scipy.signal.find_peaks``.
"""
from __future__ import annotations
import math
import numpy as np
from scipy.signal import find_peaks
from ._shock_signal import preprocess_shock_signal
from .types import ShockEvent, ShockEventSet
from .validate import ValidationError
def _validate_event_detector_inputs(
*,
x: np.ndarray,
fs: float,
detrend: str,
polarity: str,
threshold_reference: str,
threshold_multiplier: float,
threshold_value: float | None,
min_separation_s: float,
window_s: float | None,
) -> np.ndarray:
x = np.asarray(x, dtype=float)
if x.ndim != 1 or x.size < 4:
raise ValidationError("x must be a 1D array with length >= 4.")
if not np.all(np.isfinite(x)):
raise ValidationError("x must contain only finite values.")
if not np.isfinite(fs) or float(fs) <= 0.0:
raise ValidationError("fs must be finite and > 0.")
preprocess_shock_signal(np.zeros(4, dtype=float), detrend=detrend)
if polarity not in ("abs", "pos", "neg"):
raise ValidationError("polarity must be one of: 'abs', 'pos', 'neg'.")
if threshold_reference not in ("rms", "std", "peak"):
raise ValidationError("threshold_reference must be one of: 'rms', 'std', 'peak'.")
if not np.isfinite(threshold_multiplier) or float(threshold_multiplier) <= 0.0:
raise ValidationError("threshold_multiplier must be finite and > 0.")
if threshold_value is not None and (not np.isfinite(threshold_value) or float(threshold_value) <= 0.0):
raise ValidationError("threshold_value must be finite and > 0 when provided.")
if not np.isfinite(min_separation_s) or float(min_separation_s) < 0.0:
raise ValidationError("min_separation_s must be finite and >= 0.")
if window_s is not None and (not np.isfinite(window_s) or float(window_s) <= 0.0):
raise ValidationError("window_s must be finite and > 0 when provided.")
return x
def _event_threshold(
detector: np.ndarray,
*,
threshold_reference: str,
threshold_multiplier: float,
threshold_value: float | None,
) -> float:
if threshold_value is not None:
return float(threshold_value)
if threshold_reference == "rms":
ref = math.sqrt(float(np.mean(detector * detector)))
elif threshold_reference == "std":
ref = float(np.std(detector))
else:
ref = max(0.0, float(np.max(detector)))
return float(threshold_multiplier) * ref
[docs]
def detect_shock_events(
x: np.ndarray,
fs: float,
*,
detrend: str = "median",
polarity: str = "abs",
threshold_reference: str = "rms",
threshold_multiplier: float = 5.0,
threshold_value: float | None = None,
min_separation_s: float = 0.05,
window_s: float | None = None,
) -> ShockEventSet:
"""Detect discrete shock events in a 1D acceleration time history.
Parameters
----------
x : numpy.ndarray
One-dimensional acceleration time history.
fs : float
Sampling rate in Hz.
detrend : {"linear", "mean", "median", "none"}, optional
Preprocessing mode applied before peak detection.
polarity : {"abs", "pos", "neg"}, optional
Whether detection is performed on absolute, positive-only, or
negative-only peaks.
threshold_reference : {"rms", "std", "peak"}, optional
Reference statistic used when ``threshold_value`` is not provided.
threshold_multiplier : float, optional
Positive multiplier applied to the selected threshold reference.
threshold_value : float or None, optional
Explicit absolute threshold. When provided, it overrides the
reference-based threshold calculation.
min_separation_s : float, optional
Minimum temporal separation enforced between detected peaks.
window_s : float or None, optional
Total extraction-window duration centered on each detected peak.
When omitted, the detector derives the window from
``min_separation_s``.
Returns
-------
object
Detected events returned as a ``ShockEventSet``, together with
sampling metadata and detector settings.
Notes
-----
This is an axis-first detector intended for event-oriented shock
workflows. It is most useful when a long recording contains multiple
isolated transients and each transient should be analyzed with a
dedicated local spectrum.
The stored event windows are index based. They can therefore be used
directly to slice the original signal without recomputing detection
state.
"""
x = _validate_event_detector_inputs(
x=x,
fs=fs,
detrend=detrend,
polarity=polarity,
threshold_reference=threshold_reference,
threshold_multiplier=threshold_multiplier,
threshold_value=threshold_value,
min_separation_s=min_separation_s,
window_s=window_s,
)
y = preprocess_shock_signal(x, detrend=detrend)
if polarity == "abs":
detector = np.abs(y)
elif polarity == "pos":
detector = y
else:
detector = -y
threshold = _event_threshold(
detector,
threshold_reference=threshold_reference,
threshold_multiplier=threshold_multiplier,
threshold_value=threshold_value,
)
distance = max(1, int(math.floor(float(min_separation_s) * float(fs))) + 1)
peaks, _ = find_peaks(detector, height=threshold, distance=distance)
if window_s is None:
half_window = max(0, int(math.floor(float(min_separation_s) * float(fs) / 2.0)))
else:
half_window = max(0, int(math.floor(float(window_s) * float(fs) / 2.0)))
events: list[ShockEvent] = []
for idx in peaks.tolist():
start = max(0, idx - half_window)
stop = min(int(y.size), idx + half_window + 1)
peak_value = float(y[idx])
events.append(
ShockEvent(
peak_index=int(idx),
start_index=int(start),
stop_index=int(stop),
peak_time_s=float(idx / float(fs)),
start_time_s=float(start / float(fs)),
stop_time_s=float((stop - 1) / float(fs)),
peak_value=peak_value,
peak_abs=abs(peak_value),
polarity="pos" if peak_value >= 0.0 else "neg",
)
)
meta = {
"source": "detect_shock_events",
"detrend": detrend,
"polarity": polarity,
"threshold_reference": threshold_reference,
"threshold_multiplier": float(threshold_multiplier),
"threshold_value": None if threshold_value is None else float(threshold_value),
"effective_threshold": float(threshold),
"min_separation_s": float(min_separation_s),
"window_s": float(window_s) if window_s is not None else float(min_separation_s),
}
return ShockEventSet(events=tuple(events), fs=float(fs), n_samples=int(y.size), meta=meta)