Source code for fdscore.fds_time

from __future__ import annotations

import numpy as np

from .types import SNParams, SDOFParams, FDSResult, FDSTimePlan
from .grid import build_frequency_grid
from ._time_plan import validate_time_plan_compatibility
from .validate import (
    ValidationError,
    _finite_positive_float_or_raise,
    _validate_nyquist_with_info,
    compat_dict,
    resolve_p_scale,
    validate_nyquist,
    validate_sdof,
    validate_sn,
)
from .preprocess import preprocess_signal
from .sdof_transfer import build_transfer_matrix
from .rainflow_damage import miner_damage_from_matrix
from ._fds_incremental import fds_incremental


def _fds_from_signal_fft(
    y: np.ndarray,
    *,
    fs: float,
    f0: np.ndarray,
    zeta: float,
    metric: str,
    k: float,
    c: float,
    p_scale: float,
    batch_size: int = 64,
    amplitude_from_range: bool = True,
    H: np.ndarray | None = None,
) -> np.ndarray:
    y = np.asarray(y, dtype=float)
    n = int(y.size)
    yf = np.fft.rfft(y)

    if H is None:
        H = build_transfer_matrix(fs=float(fs), n=n, f0_hz=f0, zeta=float(zeta), metric=metric)  # type: ignore[arg-type]

    out = np.zeros(H.shape[0], dtype=float)
    bs = max(1, int(batch_size))

    for i0 in range(0, H.shape[0], bs):
        i1 = min(i0 + bs, H.shape[0])
        resp = np.fft.irfft(H[i0:i1] * yf[None, :], n=n, axis=1)
        resp *= float(p_scale)

        dmg_batch = miner_damage_from_matrix(
            resp,
            k=float(k),
            c=float(c),
            amplitude_from_range=bool(amplitude_from_range),
        )
        out[i0:i1] = np.asarray(dmg_batch, dtype=float)

    return out


[docs] def prepare_fds_time_plan( *, fs: float, n_samples: int, sdof: SDOFParams, strict_nyquist: bool = True, ) -> FDSTimePlan: r"""Precompute the FFT-domain transfer data for repeated FDS evaluations. A time-domain FDS call repeatedly uses the same oscillator grid, damping, and FFT-bin transfer matrix when the sampling configuration is fixed. ``FDSTimePlan`` stores that reusable state so that repeated calls can skip transfer-matrix reconstruction. Parameters ---------- fs: Sampling rate in Hz. n_samples: Number of samples in the time histories that will reuse this plan. sdof: Oscillator-grid definition and chosen response metric. strict_nyquist: If ``True``, frequencies above Nyquist raise ``ValidationError``. If ``False``, the frequency grid is truncated to the valid Nyquist range. Returns ------- FDSTimePlan Reusable transfer plan containing the validated oscillator grid, the implied damping ratio, and the complex FFT-domain transfer matrix. Notes ----- The stored matrix ``H`` is materialized as ``complex128`` with shape ``(len(f0), n_fft_bins)``. Memory therefore scales approximately as .. math:: len(f_0) \times n_{fft\_bins} \times 16 \text{ bytes} so plans trade memory for speed. For example, 400 oscillators and a 4 s signal sampled at 1 kHz require roughly 12 MB for the matrix alone. """ validate_sdof(sdof) if sdof.metric not in ("pv", "disp", "vel", "acc"): raise ValidationError("sdof.metric must be one of: 'pv','disp','vel','acc'.") fs = _finite_positive_float_or_raise(fs, field="fs") if not isinstance(n_samples, (int, np.integer)) or int(n_samples) < 4: raise ValidationError("n_samples must be an integer >= 4.") f0 = build_frequency_grid(sdof) f0 = validate_nyquist(f0, fs=fs, strict=bool(strict_nyquist)) zeta = 1.0 / (2.0 * float(sdof.q)) H = build_transfer_matrix( fs=fs, n=int(n_samples), f0_hz=f0, zeta=float(zeta), metric=sdof.metric, ) return FDSTimePlan( fs=fs, n_samples=int(n_samples), f=np.asarray(f0, dtype=float), zeta=float(zeta), metric=sdof.metric, H=np.asarray(H), )
[docs] def compute_fds_time( x: np.ndarray, fs: float, sn: SNParams, sdof: SDOFParams, *, p_scale: float | None = None, detrend: str = "linear", strict_nyquist: bool = True, batch_size: int = 64, plan: FDSTimePlan | None = None, engine: str = "incremental", zoh_r_max: float = 0.2, ) -> FDSResult: r"""Compute a time-domain Fatigue Damage Spectrum from an input history. The returned spectrum contains Miner damage evaluated independently for each SDOF oscillator in the grid defined by ``sdof``. The result also carries a compatibility signature in ``meta["compat"]`` so that downstream operations, especially inversion, can verify that the same fatigue conventions were used. Pipeline -------- The computation follows this sequence for a base-acceleration time history ``x``: 1. Optionally preprocess the signal with ``preprocess_signal(...)`` using the requested ``detrend`` mode. 2. Validate the oscillator grid against Nyquist and derive the shared damping ratio and S-N parameters. 3. Evaluate the oscillator bank with the selected internal engine: * ``engine="incremental"`` integrates each SDOF oscillator sample-by-sample using exact ZOH state-transition matrices. For oscillators close to Nyquist, the input is adaptively upsampled to control ZOH attenuation error. * ``engine="fft"`` builds or reuses the FFT-domain transfer matrix, applies it to ``rfft(x)``, and reconstructs each oscillator response with ``irfft`` in batches. 4. Apply ASTM-style rainflow counting and Miner's linear damage rule to each oscillator response or response reversal stream. 5. Return ``FDSResult`` together with compatibility and provenance metadata describing the selected engine and preprocessing choices. Parameters ---------- x : numpy.ndarray One-dimensional base-acceleration time history. fs : float Sampling rate in Hz. sn : SNParams S-N curve definition used for Miner damage accumulation. sdof : SDOFParams Oscillator-grid definition and response metric. p_scale : float or None Optional scale factor applied to each oscillator response before rainflow counting. In physical workflows this represents the stress-response proportionality used to convert response to the fatigue-driving quantity. detrend : str Optional preprocessing mode passed to ``preprocess_signal(...)``. Supported values are ``"linear"``, ``"mean"``, and ``"none"``. strict_nyquist : bool If ``True``, oscillator frequencies above Nyquist raise ``ValidationError``. If ``False``, the grid is truncated to the valid Nyquist range and the truncation is recorded in the result metadata. batch_size : int Number of oscillators processed per inverse FFT batch. Only used when ``engine="fft"``. plan : FDSTimePlan or None Optional precomputed transfer plan created by :func:`prepare_fds_time_plan`. Only used when ``engine="fft"``. engine : {"incremental", "fft"} Internal computation engine. ``"incremental"`` (default) Sample-by-sample SDOF integration via exact ZOH state-transition matrices. Rainflow counting is performed online during integration so the full ``(n_osc, n_samples)`` response matrix is never materialised. Provides super-linear speedup over ``"fft"`` for long signals and low memory overhead regardless of signal length. ``"fft"`` Original FFT-based engine. Applies the continuous SDOF frequency response function to the signal spectrum and reconstructs each oscillator response with ``irfft``. Use this engine when exact bit-for-bit reproducibility with pre-existing results is required. .. note:: The two engines use different SDOF discretisation schemes (ZOH vs. continuous FRF on discrete FFT bins) and will therefore produce slightly different damage values, particularly for oscillators above roughly ``0.3 * fs / 2``. For the typical configuration of ``fs = 1000 Hz`` and ``fmax = 400 Hz`` the discrepancy in Miner damage is below 5 % across the full grid and below 1 % below 150 Hz. zoh_r_max : float Maximum tolerated ``f0 / Nyquist_effective`` ratio for the ``"incremental"`` engine. Controls the adaptive upsampling that corrects the ZOH attenuation error for high-frequency oscillators. Ignored when ``engine="fft"``. Smaller values yield higher accuracy at the cost of larger upsample factors for oscillators near the top of the frequency grid. Larger values prioritise throughput. * ``0.30`` - max error about 0.5 dB, upsample up to 3x * ``0.20`` - max error about 0.1 dB, upsample up to 4x *(default)* * ``0.15`` - max error about 0.05 dB, upsample up to 6x Returns ------- FDSResult Damage spectrum on the validated oscillator grid. ``meta["compat"]`` records the fatigue and response conventions required by downstream operations. Notes ----- The computation assumes a linear SDOF transfer from base acceleration to the selected metric and evaluates fatigue on the reconstructed response histories. For fixed ``slope_k``, the absolute damage level scales globally as .. math:: \frac{p_{scale}^k}{N_{ref} S_{ref}^k} Therefore ``p_scale``, ``ref_stress``, and ``ref_cycles`` change the magnitude of ``damage(f)`` but not its relative shape. When only the spectral shape and a compatible FDS-to-PSD inversion matter, a normalized S-N definition together with ``p_scale=1.0`` is usually sufficient. The choice of ``detrend`` can materially affect low-frequency damage, especially for displacement- and pseudo-velocity-based metrics, because offsets and slow drifts are amplified by the low-frequency dynamics of the oscillator bank. If ``p_scale`` is omitted, ``p_scale=1.0`` is assumed only for normalized S-N parameters with ``ref_stress=1`` and ``ref_cycles=1``. Physical workflows must pass ``p_scale`` explicitly. References ---------- ASTM E1049-85(2017). Standard Practices for Cycle Counting in Fatigue Analysis. Miner, M. A. (1945). "Cumulative Damage in Fatigue." Journal of Applied Mechanics, 12(3), A159-A164. Crandall, S. H., & Mark, W. D. (1963). Random Vibrations in Mechanical Systems. Academic Press. """ validate_sn(sn) validate_sdof(sdof) if sdof.metric not in ("pv", "disp", "vel", "acc"): raise ValidationError("sdof.metric must be one of: 'pv','disp','vel','acc'.") if engine not in ("incremental", "fft"): raise ValidationError("engine must be one of: 'incremental', 'fft'.") p_scale_resolved = resolve_p_scale(p_scale=p_scale, sn=sn) if detrend not in ("linear", "mean", "none"): raise ValidationError("detrend must be one of: 'linear', 'mean', 'none'.") if not isinstance(batch_size, int) or batch_size <= 0: raise ValidationError("batch_size must be an int > 0.") fs = _finite_positive_float_or_raise(fs, field="fs") x = np.asarray(x, dtype=float) if x.ndim != 1 or x.size < 4: raise ValidationError("x must be a 1D array with length >= 4.") if not np.all(np.isfinite(x)): raise ValidationError("x must contain only finite values.") f_requested = build_frequency_grid(sdof) f0, nyquist_info = _validate_nyquist_with_info(f_requested, fs=fs, strict=strict_nyquist) zeta = 1.0 / (2.0 * float(sdof.q)) k = float(sn.slope_k) c = float(sn.ref_cycles) * (float(sn.ref_stress) ** k) y = preprocess_signal(x, mode=detrend) if engine == "incremental": damage = fds_incremental( y, fs=fs, f0=f0, zeta=float(zeta), metric=sdof.metric, k=float(k), c=float(c), p_scale=float(p_scale_resolved), amplitude_from_range=bool(sn.amplitude_from_range), zoh_r_max=float(zoh_r_max), ) engine_tag = "time_rainflow_incremental_zoh_numba" provenance_extra: dict = {"zoh_r_max": float(zoh_r_max)} else: # engine == "fft" if plan is None: H = build_transfer_matrix(fs=fs, n=int(y.size), f0_hz=f0, zeta=float(zeta), metric=sdof.metric) # type: ignore[arg-type] else: H = validate_time_plan_compatibility( plan=plan, fs=fs, n_samples=int(y.size), f0=f0, zeta=float(zeta), metric=sdof.metric, ) damage = _fds_from_signal_fft( y, fs=fs, f0=f0, zeta=float(zeta), metric=sdof.metric, k=float(k), c=float(c), p_scale=float(p_scale_resolved), batch_size=int(batch_size), amplitude_from_range=bool(sn.amplitude_from_range), H=H, ) engine_tag = "time_rainflow_fft_numba" provenance_extra = { "batch_size": int(batch_size), "transfer_plan": bool(plan is not None), } meta = { "compat": compat_dict( sn=sn, metric=sdof.metric, q=sdof.q, p_scale=p_scale_resolved, engine=engine_tag, ), "provenance": { "source": "compute_fds_time", "engine": engine, "detrend": detrend, **provenance_extra, **nyquist_info, }, } return FDSResult(f=f0, damage=damage, meta=meta)