Source code for fdscore.inverse_iterative_spectral

from __future__ import annotations

import numpy as np

from ._inversion_utils import (
    apply_edge_caps,
    blend_log_curves,
    build_edge_taper_weights,
    effective_smoothing_window_bins,
    iterative_param_usage,
    smooth_psd_log10,
)
from .types import FDSResult, PSDResult, IterativeInversionParams, SDOFParams, SNParams
from .validate import ValidationError, ensure_compat_inversion
from .sdof_transfer import build_transfer_psd
from .fds_spectral import compute_fds_spectral_psd


[docs] def invert_fds_iterative_spectral( target: FDSResult, *, f_psd_hz: np.ndarray, psd_seed: np.ndarray, duration_s: float, sn: SNParams, sdof: SDOFParams, p_scale: float, params: IterativeInversionParams = IterativeInversionParams(), ) -> PSDResult: r"""Iteratively synthesize a PSD that matches a target FDS with a spectral predictor. This routine solves the inverse problem "find an acceleration PSD whose predicted FDS matches ``target``" by repeatedly calling the spectral predictor :func:`fdscore.compute_fds_spectral_psd`. Algorithm --------- Let :math:`F_{target}` be the target damage spectrum and let :math:`F(P)` be the spectral predictor evaluated at candidate PSD :math:`P`. The method builds a PSD-to-oscillator influence matrix from the SDOF transfer model and converts it to a normalized redistribution matrix :math:`\alpha`. Oscillator-wise multiplicative gains are computed as .. math:: s_i = \left(\frac{F_{target, i}}{F_i(P)}\right)^{2 / k} and clipped to the interval defined by ``gain_min`` and ``gain_max``. Those gains are projected back to PSD bins through .. math:: u = \exp\left(\alpha^T \log(s)\right) The candidate PSD is then updated multiplicatively as .. math:: P \leftarrow P \, u^{\gamma} followed by optional smoothing, prior blending, and edge capping. The updated PSD is re-evaluated and scored by the median absolute log10-domain mismatch. The function always outputs a one-sided acceleration PSD on the user supplied ``f_psd_hz`` grid. Parameters ---------- target : FDSResult Target FDS result, expressed as damage versus oscillator frequency. f_psd_hz : numpy.ndarray Frequency grid in Hz for the synthesized acceleration PSD. psd_seed : numpy.ndarray Strictly positive seed PSD defined on ``f_psd_hz``. duration_s : float Exposure duration used by the spectral predictor. sn : SNParams S-N curve definition used by the predictor. sdof : SDOFParams Oscillator-grid definition and response metric used by the predictor. p_scale : float Response scale factor used by the predictor. It must be compatible with the way ``target`` was computed. params : IterativeInversionParams Iteration and regularization parameters. Returns ------- PSDResult Synthesized acceleration PSD on ``f_psd_hz``. The result metadata includes convergence diagnostics, reconstructed FDS traces, and explicit per-engine parameter usage. Notes ----- The update strategy implemented here is a library-specific multiplicative inversion heuristic. It is not claimed as a closed-form method from the fatigue literature. The convergence metric stored in ``meta["diagnostics"]["best_err"]`` is the median absolute log10-domain mismatch over bins where both target and predicted damage are positive: .. math:: median\left(\left|\log_{10} F_{pred} - \log_{10} F_{target}\right|\right) The main loop performs two predictor evaluations per iteration: one on the current PSD to derive oscillator-wise gains, and one after the multiplicative update to score the candidate that may become the new best solution. If post-smoothing and post-refinement are enabled, an additional spectral inversion pass is launched from the smoothed candidate using a reduced parameter set. The metadata distinguishes whether the best result came from the main loop, the post-smooth stage, or the post-refine stage. References ---------- Dirlik, T. (1985). Application of computers in fatigue analysis. Miner, M. A. (1945). "Cumulative Damage in Fatigue." Journal of Applied Mechanics, 12(3), A159-A164. """ if not np.isfinite(duration_s) or float(duration_s) <= 0: raise ValidationError("duration_s must be finite and > 0.") if not np.isfinite(p_scale) or float(p_scale) <= 0: raise ValidationError("p_scale must be finite and > 0.") if not np.isfinite(sdof.q) or float(sdof.q) <= 0: raise ValidationError("sdof.q must be finite and > 0.") f_psd = np.asarray(f_psd_hz, dtype=float).reshape(-1) P0 = np.asarray(psd_seed, dtype=float).reshape(-1) if f_psd.size < 2 or P0.size < 2 or f_psd.shape != P0.shape: raise ValidationError("f_psd_hz and psd_seed must be 1D arrays with same shape >= 2.") if not np.all(np.diff(f_psd) > 0): raise ValidationError("f_psd_hz must be strictly increasing.") if np.any(P0 <= 0) or not np.all(np.isfinite(P0)): raise ValidationError("psd_seed must be finite and strictly positive.") q = float(sdof.q) ensure_compat_inversion(target=target, metric=sdof.metric, q=q, p_scale=p_scale, sn=sn, sdof=sdof) f0 = np.asarray(target.f, dtype=float).reshape(-1) zeta = 1.0 / (2.0 * q) H = build_transfer_psd(f_psd_hz=f_psd, f0_hz=f0, zeta=zeta, metric=sdof.metric) B = np.abs(H) ** 2 B_eff = np.clip(B, 1e-300, None) if float(params.alpha_sharpness) != 1.0: B_eff = B_eff ** float(params.alpha_sharpness) alpha = B_eff / (B_eff.sum(axis=0) + 1e-30) sens = B_eff.sum(axis=0) sens_n = sens / (np.max(sens) + 1e-30) prior_w_sens = np.clip(1.0 - sens_n, 0.0, 1.0) ** float(max(params.prior_power, 0.0)) prior_w = np.clip(float(params.prior_blend), 0.0, 1.0) * prior_w_sens edge_w = build_edge_taper_weights(f_psd=f_psd, edge_hz=params.edge_anchor_hz) prior_w = np.clip(prior_w + np.clip(float(params.edge_anchor_blend), 0.0, 1.0) * edge_w, 0.0, 1.0) use_prior = bool(np.any(prior_w > 0.0)) floor = float(params.floor) P = np.clip(P0.copy(), floor, None) hist_err: list[float] = [] bestP = P.copy() bestErr = float("inf") bestF = None best_stage = "main_loop" def predictor(Pyy: np.ndarray) -> np.ndarray: fds = compute_fds_spectral_psd( f_psd_hz=f_psd, psd_baseacc=Pyy, duration_s=float(duration_s), sn=sn, sdof=sdof, p_scale=float(p_scale), ) return np.asarray(fds.damage, dtype=float) target_fds = np.asarray(target.damage, dtype=float) iters = int(params.iters) if iters <= 0: raise ValidationError("params.iters must be >= 1.") for it in range(iters): pred = predictor(P) safe = (target_fds > 0) & (pred > 0) s = np.ones_like(target_fds) if np.any(safe): s[safe] = (target_fds[safe] / pred[safe]) ** (2.0 / float(sn.slope_k)) s = np.clip(s, float(params.gain_min), float(params.gain_max)) u = np.exp(alpha.T @ np.log(s + 1e-30)) P *= u ** float(params.gamma) P = np.clip(P, floor, None) do_smooth = bool(params.smooth_enabled) and int(params.smooth_window_bins) > 1 if do_smooth: every = int(params.smooth_every_n_iters) if every > 0: do_smooth = ((it + 1) % every) == 0 if do_smooth: P = smooth_psd_log10(P, win=int(params.smooth_window_bins), floor=floor) P = np.clip(P, floor, None) if use_prior: P = blend_log_curves(cur=P, ref=P0, weight=prior_w, floor=floor) if (float(params.tail_cap_ratio) > 0.0) or (float(params.low_cap_ratio) > 0.0): P = apply_edge_caps( P=P, f_psd=f_psd, tail_cap_start_hz=float(params.tail_cap_start_hz), tail_cap_ratio=float(params.tail_cap_ratio), low_cap_ratio=float(params.low_cap_ratio), floor=floor, ) pred_eval = predictor(P) safe_eval = (target_fds > 0) & (pred_eval > 0) if np.any(safe_eval): err = float(np.median(np.abs(np.log10(pred_eval[safe_eval]) - np.log10(target_fds[safe_eval])))) else: err = float("inf") hist_err.append(err) if err < bestErr: bestErr = err bestP = P.copy() bestF = pred_eval.copy() best_stage = "main_loop" Pfin = bestP Ffin = bestF if bestF is not None else predictor(Pfin) err_fin = bestErr post_smooth_err: float | None = None post_refine_err: float | None = None post_refine_err_history: list[float] = [] post_refine_params_meta: dict[str, float | int] | None = None if int(params.post_smooth_window_bins) > 1: Ps = smooth_psd_log10(Pfin, win=int(params.post_smooth_window_bins), floor=floor) blend = float(np.clip(params.post_smooth_blend, 0.0, 1.0)) if blend >= 1.0: Pp = Ps elif blend <= 0.0: Pp = np.clip(Pfin, floor, None) else: Pp = np.exp((1.0 - blend) * np.log(np.clip(Pfin, floor, None)) + blend * np.log(np.clip(Ps, floor, None))) Fp = predictor(Pp) safe = (target_fds > 0) & (Fp > 0) if np.any(safe): err_p = float(np.median(np.abs(np.log10(Fp[safe]) - np.log10(target_fds[safe])))) else: err_p = float("inf") post_smooth_err = err_p if err_p <= err_fin: Pfin, Ffin, err_fin = Pp, Fp, err_p best_stage = "post_smooth" if int(params.post_refine_iters) > 0: refine_params = IterativeInversionParams( iters=int(params.post_refine_iters), gamma=float(params.post_refine_gamma), gain_min=float(params.post_refine_min), gain_max=float(params.post_refine_max), alpha_sharpness=float(params.alpha_sharpness), floor=floor, smooth_enabled=True, smooth_window_bins=int(params.post_smooth_window_bins), smooth_every_n_iters=1, prior_blend=float(params.prior_blend), prior_power=float(params.prior_power), edge_anchor_hz=float(params.edge_anchor_hz), edge_anchor_blend=float(params.edge_anchor_blend), tail_cap_start_hz=float(params.tail_cap_start_hz), tail_cap_ratio=float(params.tail_cap_ratio), low_cap_ratio=float(params.low_cap_ratio), post_smooth_window_bins=0, post_smooth_blend=0.0, post_refine_iters=0, post_refine_gamma=float(params.post_refine_gamma), post_refine_min=float(params.post_refine_min), post_refine_max=float(params.post_refine_max), ) post_refine_params_meta = { "iters": int(refine_params.iters), "gamma": float(refine_params.gamma), "gain_min": float(refine_params.gain_min), "gain_max": float(refine_params.gain_max), "smooth_window_bins": int(effective_smoothing_window_bins(refine_params.smooth_window_bins)), "post_refine_iters": int(refine_params.post_refine_iters), } Pref = invert_fds_iterative_spectral( target, f_psd_hz=f_psd, psd_seed=Pp, duration_s=float(duration_s), sn=sn, sdof=sdof, p_scale=float(p_scale), params=refine_params, ) pref_diag = Pref.meta.get("diagnostics", {}) post_refine_err = float(pref_diag.get("best_err", float("inf"))) post_refine_err_history = [float(x) for x in pref_diag.get("err_history", [])] if post_refine_err <= err_fin: Pfin = np.asarray(Pref.psd, dtype=float) Ffin = np.asarray(pref_diag.get("best_recon_fds", Ffin), dtype=float) err_fin = post_refine_err best_stage = "post_refine" meta = { "diagnostics": { "best_err": float(err_fin), "best_stage": best_stage, "err_history": hist_err, "err_history_scope": "main_loop_only", "post_smooth_err": post_smooth_err, "post_refine_err": post_refine_err, "post_refine_err_history": post_refine_err_history, "post_refine_params": post_refine_params_meta, "best_recon_fds": Ffin, "iters": int(params.iters), "predictor_evals_per_iteration": 2, }, "param_usage": iterative_param_usage(engine="spectral", params=params), "compat": target.meta.get("compat", {}), "provenance": {"source": "invert_fds_iterative_spectral"}, } return PSDResult(f=f_psd, psd=Pfin, meta=meta)