# Online Phase: Failing sensors
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 24 October 2025
# Latest Doc Update: 24 October 2025
import numpy as np
import pyvista as pv
from collections import namedtuple
import os
import scipy
from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from ..tools.write_read import ImportFunctionsList
from .online_base import OnlineDDROM, OnlineSensors
from .geim import GEIM
from .pbdw import PBDW
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
[docs]
class FailingGEIM(GEIM):
"""
Class for the online phase of GEIM with failing sensors.
Inherits from GEIM class.
Parameters
----------
grid : pv.UnstructuredGrid
The computational grid.
gdim : int, optional
Geometrical dimension of the problem (default is 3).
varname : str, optional
Name of the variable in the grid (default is 'u').
"""
def __init__(self, grid: pv.UnstructuredGrid, gdim: int = 3, varname: str = 'u'):
super().__init__(grid, gdim=gdim, varname=varname)
[docs]
def get_measurements(self, snaps: FunctionsList | np.ndarray, M: int = None,
noise_std: float = 0.0,
drift_dict: dict = None) -> np.ndarray:
r"""
This method extracts the measures from the input functions at the magic sensors locations. It allows also to add Gaussian noise to the measurements.
Moreover, the drift failure of the sensor can be simulated by providing a drift_dict with the following keys:
- 'kappa' : float
Shift from the average value of the sensor.
- 'rho' : float, optional
High frequency oscillation amplitude.
- 'idx_failed' : list of int, optional
List of indices of the failed sensors.
- 'mu_failure' : int, optional (Default = 0)
Starting index of the failure in the snapshots.
Parameters
----------
snaps : FunctionsList or np.ndarray
Function object to extract the measures from.
M : int, optional (Default = None)
Number of sensors to use for the extraction. If `None`, all available magic sensors are used.
noise_std : float, optional (Default = 0.0)
Standard deviation of the Gaussian noise to be added to the measurements. Default is 0.
drift_dict : dict, optional (Default = None)
Dictionary containing the parameters for the drift failure simulation.
Returns
-------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`.
"""
if isinstance(snaps, FunctionsList):
snaps = snaps.return_matrix()
elif isinstance(snaps, np.ndarray):
if snaps.ndim == 1:
snaps = np.atleast_2d(snaps).T
else:
raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the magic functions shape."
measures = self.sensors.action(snaps, M=M) # shape (M, Ns)
if noise_std > 0.0:
noise = np.random.normal(0, noise_std, measures.shape)
measures += noise
# Handle sensor drift failure
if drift_dict is not None:
assert 'kappa' in drift_dict, "'drift_dict' must contain the key 'kappa'."
kappa = drift_dict['kappa']
assert 'rho' in drift_dict, "'drift_dict' must contain the key 'rho'."
rho = drift_dict['rho']
assert 'idx_failed' in drift_dict, "'drift_dict' must contain the key 'idx_failed'."
idx_failed = drift_dict['idx_failed']
assert min(idx_failed) >= 0 and max(idx_failed) < measures.shape[0], f"'idx_failed' must contain indices between 0 and {measures.shape[0]-1}."
if 'mu_failure' in drift_dict:
mu_failure = drift_dict['mu_failure']
else:
mu_failure = 0
drift = np.random.normal(kappa, rho, size=measures[idx_failed, mu_failure:].shape)
# Apply the drift to the failed sensors
measures[idx_failed, mu_failure:] += drift
return measures
def _solve_hardfailure_trgeim_linear_system(self, measures: np.ndarray,
regularization_params: dict,
hard_failure_dict: dict):
r"""
Computes the reduced coefficients :math:`\{\beta_m\}_{m=1}^{M}}` with as many magic functions/points (synthetic) as the input measures :math:`M`
.. math::
y_m = u(\vec{x}_m) \qquad m = 1, \dots, M_{max}
from the following Tikhonov-regularized linear system
.. math::
(\mathbb{B}^T\mathbb{B} + \lambda \mathbb{T}^T\mathbb{T})\boldsymbol{\beta} = \mathbb{B}^T\mathbf{y}+\lambda \mathbb{T}^T\mathbb{T}\langle \boldsymbol{\beta}\rangle
where :math:`\mathbb{T}` is the Tikhonov matrix, :math:`\langle \boldsymbol{\beta}\rangle` is the mean of the training reduced coefficients, and :math:`\lambda` is the regularization parameter.
This method is able to handle hard failures by removing the failed sensors from the linear system after the failure occurs.
Parameters
----------
measures : np.ndarray
Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations.
regularization_params : dict
Dictionary containing the regularization parameters. It must contain the following keys:
- 'type': str, the type of regularization.
- 'lambda': float, the regularization parameter.
hard_failure_dict : dict
Dictionary containing the parameters for the hard failure simulation. It must contain the following keys:
- 'idx_failed': list of int, list of indices of the failed sensors.
- 'mu_failure': int, starting index of the failure in the snapshots.
Returns
----------
beta_coeff : np.ndarray
Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}`
"""
assert measures.shape[0] <= len(self._basis), "The number of measures must be equal to the number of magic functions"
if self.matrix_B is None:
self.compute_B_matrix()
if self.tikhonov is None:
raise ValueError("Tikhonov matrices not set. Please call 'set_tikhonov_matrices' method first.")
# Extract quantities for the linear system
_m = measures.shape[0]
lambda_reg = regularization_params['lambda']
_B = self.matrix_B[:_m, :_m]
_T = self.tikhonov['T'][:_m, :_m]
_beta_mean = self.tikhonov['beta_mean'][:_m]
# Assemble the linear system
A = _B.T @ _B + lambda_reg * (_T.T @ _T)
rhs = _B.T @ measures[:_m] + lambda_reg * (_T.T @ _T @ _beta_mean[:, np.newaxis])
beta_coeff = np.zeros((_m, measures.shape[1]))
for mu in range(measures.shape[1]):
if mu < hard_failure_dict['mu_failure']:
beta_coeff[:, mu] = scipy.linalg.solve(A, rhs[:, mu])
else:
_rhs = np.delete(rhs[:, mu], hard_failure_dict['idx_failed'], axis=0)
_A = np.delete(
np.delete(A, hard_failure_dict['idx_failed'], axis=1),
hard_failure_dict['idx_failed'], axis=0
)
_beta_sol = scipy.linalg.solve(_A, _rhs)
idx_sol = 0
for i in range(_m):
if i in hard_failure_dict['idx_failed']:
beta_coeff[i, mu] = 0.0
else:
beta_coeff[i, mu] = _beta_sol[idx_sol]
idx_sol += 1
return beta_coeff
[docs]
def estimate(self, measures: np.ndarray,
regularization_params: dict = None,
hard_failure_dict: dict = None
):
r"""
Estimate the state using the GEIM given the measures at the sensor locations.
Parameters
----------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^{M_{max}}`, shaped :math:`(M_{max},N_s)`.
regularization_params : dict, optional (Default = None)
Dictionary containing the regularization parameters. If `None`, no regularization is applied.
At the moment, the only supported regularization is Tikhonov from `Introini et al. (2023) <https://doi.org/10.1016/j.cma.2022.115773>`_.
hard_failure_dict : dict, optional (Default = None)
Dictionary containing the parameters for the hard failure simulation.
If provided, the method will handle hard failures by removing the corresponding sensors from the estimation process.
Returns
-------
estimation : FunctionsList
An instance of `FunctionsList` containing the estimated state.
"""
assert measures.shape[0] <= len(self.sensors), f"The number of measures {measures.shape[0]} must be less than or equal to the number of magic sensors {len(self.sensors)}."
if self.matrix_B is None:
self.compute_B_matrix()
if regularization_params is not None and hard_failure_dict is None:
if regularization_params['type'] == 'tikhonov':
beta_coeff = self._solve_trgeim_linear_system(measures, regularization_params)
else:
raise ValueError(f"Regularization type {regularization_params['type']} not recognized: available types: 'tikhonov'.")
elif hard_failure_dict is not None:
assert regularization_params is not None, "Regularization parameters must be provided for hard failure handling."
assert regularization_params['type'] == 'tikhonov', "Only Tikhonov regularization is supported for hard failure handling."
beta_coeff = self._solve_hardfailure_trgeim_linear_system(measures, regularization_params, hard_failure_dict)
else:
beta_coeff = self._solve_geim_linear_system(measures)
estimation = self._reconstruct(beta_coeff)
return estimation
[docs]
class FailingPBDW(PBDW):
"""
Class for the online phase of PBDW with failing sensors.
Inherits from PBDW class.
Parameters
----------
grid : pv.UnstructuredGrid
The computational grid.
gdim : int, optional
Geometrical dimension of the problem (default is 3).
varname : str, optional
Name of the variable in the grid (default is 'u').
"""
def __init__(self, grid: pv.UnstructuredGrid, gdim: int = 3, varname: str = 'u'):
super().__init__(grid, gdim=gdim, varname=varname)
[docs]
def get_measurements(self, snaps: FunctionsList | np.ndarray, M: int = None,
noise_std: float = 0.0, drift_dict: dict = None):
r"""
This method extracts the measures from the input functions at the magic sensors locations. It allows also to add Gaussian noise to the measurements.
Moreover, the drift failure of the sensor can be simulated by providing a drift_dict with the following keys:
- 'kappa' : float
Shift from the average value of the sensor.
- 'rho' : float, optional
High frequency oscillation amplitude.
- 'idx_failed' : list of int, optional
List of indices of the failed sensors.
- 'mu_failure' : int, optional (Default = 0)
Starting index of the failure in the snapshots.
Parameters
----------
snaps : FunctionsList or np.ndarray
Function object to extract the measures from.
M : int, optional (Default = None)
Number of sensors to use for the extraction. If `None`, all available magic sensors are used.
noise_std : float, optional (Default = 0.0)
Standard deviation of the Gaussian noise to be added to the measurements. Default is 0.
drift_dict : dict, optional (Default = None)
Dictionary containing the parameters for the drift failure simulation.
Returns
-------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`.
"""
if isinstance(snaps, FunctionsList):
snaps = snaps.return_matrix()
elif isinstance(snaps, np.ndarray):
if snaps.ndim == 1:
snaps = np.atleast_2d(snaps).T
else:
raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the magic functions shape."
measures = self.sensors.action(snaps, M=M) # shape (M, Ns)
if noise_std > 0.0:
noise = np.random.normal(0, noise_std, measures.shape)
measures += noise
# Handle sensor drift failure
if drift_dict is not None:
assert 'kappa' in drift_dict, "'drift_dict' must contain the key 'kappa'."
kappa = drift_dict['kappa']
assert 'rho' in drift_dict, "'drift_dict' must contain the key 'rho'."
rho = drift_dict['rho']
assert 'idx_failed' in drift_dict, "'drift_dict' must contain the key 'idx_failed'."
idx_failed = drift_dict['idx_failed']
assert min(idx_failed) >= 0 and max(idx_failed) < measures.shape[0], f"'idx_failed' must contain indices between 0 and {measures.shape[0]-1}."
if 'mu_failure' in drift_dict:
mu_failure = drift_dict['mu_failure']
else:
mu_failure = 0
drift = np.random.normal(kappa, rho, size=measures[idx_failed, mu_failure:].shape)
# Apply the drift to the failed sensors
measures[idx_failed, mu_failure:] += drift
return measures
def _solve_hardfailure_pbdw_linear_system(self, measures: np.ndarray, xi: float = 0.0,
hard_failure_dict: dict = None):
r"""
Computes the reduced coefficients :math:`\{\alpha_n\}_{n=1}^{N}}` and :math:`\{\theta_m\}_{m=1}^{M}}` with as many basis sensors as the input measures :math:`M`. The following linear system is solved
.. math::
\left[
\begin{array}{ccc}
\xi \cdot M \cdot \mathbb{I} + \mathbb{A} & & \mathbb{K} \\ & & \\
\mathbb{K}^T & & 0
\end{array}
\right] \cdot
\left[
\begin{array}{c}
\boldsymbol{\theta} \\ \\ \boldsymbol{\alpha}
\end{array}
\right] =
\left[
\begin{array}{c}
\mathbf{y} \\ \\ \mathbf{0}
\end{array}
\right]
given :math:`\mathbf{y}\in\mathbb{R}^M` as the measurements vector.
This method is able to handle hard failures by removing the failed sensors from the linear system after the failure occurs.
Parameters
----------
measures : np.ndarray
Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations.
xi : float, optional (Default = 0.0)
Regularization parameter for the regularization. Default is 0.0
Returns
----------
beta_coeff : np.ndarray
Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}`
"""
assert measures.shape[0] >= len(self._basis), "The number of measures must be larger than or equal to the number of basis functions"
assert measures.shape[0] <= len(self.sensors._library), "The number of measures must be less than or equal to the number of sensors"
if self.matrix_A is None or self.matrix_K is None or self.matrix_Z is None:
self.compute_matrices()
_m = measures.shape[0]
_A = self.matrix_A[:_m, :_m] + xi * _m * np.eye(_m)
_K = self.matrix_K[:_m]
_Z = self.matrix_Z
# Assemble the full linear system
sys_matrix = np.block([
[_A, _K],
[_K.T, np.zeros((_Z.shape[0], _Z.shape[0]))]
])
# Right-hand side
rhs = np.concatenate([measures, np.zeros((_Z.shape[0], measures.shape[1]))], axis=0)
# Solve the linear system
_coeffs = np.zeros((_A.shape[0] + _Z.shape[0], measures.shape[1]))
for mu in range(measures.shape[1]):
if mu < hard_failure_dict['mu_failure']:
_coeffs[:, mu] = scipy.linalg.solve(sys_matrix, rhs[:, mu])
else:
_rhs = np.delete(rhs[:, mu], hard_failure_dict['idx_failed'], axis=0)
_sys_matrix = np.delete(
np.delete(sys_matrix, hard_failure_dict['idx_failed'], axis=1),
hard_failure_dict['idx_failed'], axis=0
)
_coeff_sol = scipy.linalg.solve(_sys_matrix, _rhs)
idx_sol = 0
for i in range(_A.shape[0] + _Z.shape[0]):
if i in hard_failure_dict['idx_failed']:
_coeffs[i, mu] = 0.0
else:
_coeffs[i, mu] = _coeff_sol[idx_sol]
idx_sol += 1
theta_coeff = _coeffs[:_m]
alpha_coeff = _coeffs[_m:]
return alpha_coeff, theta_coeff
[docs]
def estimate(self, measures: np.ndarray, xi: float = 0.0,
hard_failure_dict: dict = None):
r"""
Estimate the state using the PBDW given the measures at the sensor locations.
Parameters
----------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^{M}`, shaped :math:`(M, N_s)`.
xi : float, optional (Default = 0.0)
Regularization parameter for the regularization. Default is 0.0
hard_failure_dict : dict, optional (Default = None)
Dictionary containing the parameters for the hard failure simulation.
If provided, the method will handle hard failures by removing the corresponding sensors from the estimation process.-
Returns
-------
estimation : FunctionsList
An instance of `FunctionsList` containing the estimated state.
"""
if self.matrix_A is None or self.matrix_K is None or self.matrix_Z is None:
self.compute_matrices()
if hard_failure_dict is not None:
alpha_coeffs, theta_coeffs = self._solve_hardfailure_pbdw_linear_system(measures, xi=xi, hard_failure_dict=hard_failure_dict)
else:
alpha_coeffs, theta_coeffs = self._solve_pbdw_linear_system(measures, xi=xi)
estimation = self._reconstruct(alpha_coeffs, theta_coeffs)
return estimation