# Online Phase: Empirical Interpolation Method (EIM)
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 07 October 2025
# Latest Doc Update: 07 October 2025
import numpy as np
import pyvista as pv
from collections import namedtuple
import os
import scipy
from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from ..tools.write_read import ImportFunctionsList
from .online_base import OnlineDDROM
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
[docs]
class EIM(OnlineDDROM):
r"""
A class to estimate the state using the EIM.
This class implements the empirical interpolation method (EIM) to estimate the state using measurements.
Parameters
----------
grid : pyvista.UnstructuredGrid
The grid on which the POD is performed. It is used to define the spatial domain of the snapshots.
gdim : int, optional (Default = 3)
The geometric dimension of the grid. It can be either 2 or 3.
varname : str, optional
The name of the variable to be used for the POD. Default is 'u'.
"""
def __init__(self, grid: pv.UnstructuredGrid, gdim: int = 3, varname: str = 'u'):
super().__init__(grid, gdim=gdim, varname=varname)
self.matrix_B = None
self.tikhonov = None # Dictionary to store tikhonov regularization matrices
self.train_beta_coeffs = None # Training reduced coefficients for Tikhonov regularization
[docs]
def set_basis(self, basis = None, path_folder: str = None, **kwargs):
"""
Assign the magic functions to the EIM model either from a FunctionsList object or by loading from a folder.
Parameters
----------
basis : FunctionsList, optional
An instance of `FunctionsList` containing the basis functions.
path_folder : str, optional
The path to the folder containing the basis functions.
**kwargs : dict
Additional keyword arguments to pass to the loading function.
Returns
-------
None
"""
if basis is not None:
self._basis = basis
elif path_folder is not None:
_filename = os.path.join(path_folder, f'mf_{self.varname}')
self._basis = ImportFunctionsList(_filename, **kwargs)
else:
raise ValueError("Either 'basis' or 'path_folder' must be provided.")
[docs]
def set_magic_points(self, magic_points: dict = None, path_folder: str = None):
"""
Set the magic points for the EIM.
Parameters
----------
magic_points : list of int
A list containing the indices of the magic points.
"""
if magic_points is not None:
self.magic_points = magic_points
elif path_folder is not None:
_filename = os.path.join(path_folder, f'magic_points_{self.varname}.npy')
self.magic_points = np.load(_filename, allow_pickle=True).item()
else:
raise ValueError("Either 'magic_points' or 'path_folder' must be provided.")
[docs]
def compute_B_matrix(self):
r"""
Compute the matrix B used in the EIM: this matrix is the evluation of the magic functions at the magic points.
.. math::
B_{ij} = q_j(x_i) \qquad i,j = 1, \ldots, M
where :math:`\{q_j\}_{j=1}^M` are the magic functions and :math:`\{x_i\}_{i=1}^M` are the magic points.
"""
M = len(self.magic_points['idx'])
assert M == len(self._basis), f"Number of magic points {M} must be equal to the number of magic functions {len(self._basis)}."
self.matrix_B = self._basis.return_matrix()[self.magic_points['idx']]
[docs]
def set_tikhonov_matrices(self, beta_coeffs: np.ndarray = None, train_snaps: FunctionsList | np.ndarray = None):
r"""
This method is used to compute the matrices and vectors needed for the Tikhonov regularization, following `Introini et al. (2023) <https://doi.org/10.1016/j.cma.2022.115773>`_, from the training reduced coefficients.
Given the training reduced coefficients :math:`\{\beta_m(\boldsymbol{\mu}_i)\}_{m=1}^{M}, i=1,\ldots,N_{train}` it computes the following:
.. math::
\langle \boldsymbol{\beta} \rangle = \sum_{i=1}^{N_{train}} \boldsymbol{\beta}(\boldsymbol{\mu}_i) \in\mathbb{R}^{M}
.. math::
\sigma_{\beta_j}^2 = \frac{1}{N_{train}-1}\sum_{i=1}^{N_{train}} (\beta_j(\boldsymbol{\mu}_i) - \langle \beta_j \rangle)^2 \qquad j=1,\ldots,M
from which the Tikhonov matrix is defined:
.. math::
\mathbb{T} = \mathrm{diag}\left(\frac{1}{|\sigma_{\beta_1}|}, \ldots, \frac{1}{|\sigma_{\beta_M}|}\right) \in\mathbb{R}^{M\times M}
The training coefficients can be provided as input or they can be computed from the training snapshots using the `_reduce` method.
Parameters
----------
beta_coeffs : np.ndarray, optional
The training reduced coefficients, shaped :math:`(M, N_{train})`. If not provided, they will be computed from the training snapshots.
train_snaps : FunctionsList or np.ndarray, optional
The training snapshots to compute the reduced coefficients from. Required if `beta_coeffs` is not provided.
"""
if beta_coeffs is not None:
self.train_beta_coeffs = beta_coeffs
elif train_snaps is not None:
self.train_beta_coeffs = self._reduce(train_snaps)
else:
raise ValueError("Either 'beta_coeffs' or 'train_snaps' must be provided.")
self.tikhonov = {
'beta_mean': np.mean(self.train_beta_coeffs, axis=1), # shape (M,)
'beta_std': np.std(self.train_beta_coeffs, axis=1)
}
# Compute Tikhonov matrix
self.tikhonov['T'] = np.diag(1.0 / (self.tikhonov['beta_std'] + 1e-16)) # shape (M, M)
[docs]
def get_measurements(self, snaps: FunctionsList | np.ndarray, M: int = None,
noise_std: float = 0.0):
r"""
This method extracts the measures from the input functions at the magic points locations.
Parameters
----------
snaps : FunctionsList or np.ndarray
Function object to extract the measures from.
M : int, optional (Default = None)
Number of magic points to use for the extraction. If `None`, all available magic points are used.
noise_std : float, optional (Default = 0.0)
Standard deviation of the Gaussian noise to be added to the measurements. Default is 0.
Returns
-------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`.
"""
if isinstance(snaps, FunctionsList):
snaps = snaps.return_matrix()
elif isinstance(snaps, np.ndarray):
if snaps.ndim == 1:
snaps = np.atleast_2d(snaps).T
else:
raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the magic functions shape."
measures = snaps[self.magic_points['idx'][:M]]
if noise_std > 0.0:
noise = np.random.normal(0, noise_std, measures.shape)
measures += noise
return measures
def _reconstruct(self, coeffs: np.ndarray):
r"""
This method reconstructs the state from the reduced coefficients :math:`\{\beta_m\}_{m=1}^M` using the magic functions :math:`\{q_m\}_{m=1}^M`.
.. math::
u(\cdot;\,\boldsymbol{\mu}) = \sum_{m=1}^M \beta_m(\boldsymbol{\mu}) q_m(\cdot)
Parameters
----------
coeffs : np.ndarray
Reduced coefficients :math:`\{\beta_m\}_{m=1}^M`, shaped :math:`(M,N_s)`.
Returns
-------
estimation : FunctionsList
An instance of `FunctionsList` containing the estimated state.
"""
assert coeffs.shape[0] <= len(self.basis), "The number of coefficients must be less than or equal to the number of POD modes."
coeffs = np.atleast_2d(coeffs)
estimation = FunctionsList(self.basis.fun_shape)
for nn in range(coeffs.shape[1]):
estimation.append(
self.basis.lin_combine(coeffs[:, nn])
)
return estimation
def _solve_eim_linear_system(self, measures: np.ndarray):
r"""
Computes the reduced coefficients :math:`\{\beta_m\}_{m=1}^{M}}` with as many magic functions/points (synthetic) as the input measures :math:`M`.
.. math::
y_m = u(\vec{x}_m) \qquad m = 1, \dots, M_{max}
Parameters
----------
measures : np.ndarray
Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations.
Returns
----------
beta_coeff : np.ndarray
Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}`
"""
assert measures.shape[0] <= len(self._basis), "The number of measures must be equal to the number of magic functions"
if self.matrix_B is None:
self.compute_B_matrix()
_m = measures.shape[0]
beta_coeff = scipy.linalg.solve(self.matrix_B[:_m, :_m], measures[:_m], lower=True)
return beta_coeff
def _solve_treim_linear_system(self, measures: np.ndarray, regularization_params: dict):
r"""
Computes the reduced coefficients :math:`\{\beta_m\}_{m=1}^{M}}` with as many magic functions/points (synthetic) as the input measures :math:`M`
.. math::
y_m = u(\vec{x}_m) \qquad m = 1, \dots, M_{max}
from the following Tikhonov-regularized linear system
.. math::
(\mathbb{B}^T\mathbb{B} + \lambda \mathbb{T}^T\mathbb{T})\boldsymbol{\beta} = \mathbb{B}^T\mathbf{y}+\lambda \mathbb{T}^T\mathbb{T}\langle \boldsymbol{\beta}\rangle
where :math:`\mathbb{T}` is the Tikhonov matrix, :math:`\langle \boldsymbol{\beta}\rangle` is the mean of the training reduced coefficients, and :math:`\lambda` is the regularization parameter.
Parameters
----------
measures : np.ndarray
Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations.
regularization_params : dict
Dictionary containing the regularization parameters. It must contain the following keys:
- 'type': str, the type of regularization.
- 'lambda': float, the regularization parameter.
Returns
----------
beta_coeff : np.ndarray
Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}`
"""
assert measures.shape[0] <= len(self._basis), "The number of measures must be equal to the number of magic functions"
if self.matrix_B is None:
self.compute_B_matrix()
if self.tikhonov is None:
raise ValueError("Tikhonov matrices not set. Please call 'set_tikhonov_matrices' method first.")
# Extract quantities for the linear system
_m = measures.shape[0]
lambda_reg = regularization_params['lambda']
_B = self.matrix_B[:_m, :_m]
_T = self.tikhonov['T'][:_m, :_m]
_beta_mean = self.tikhonov['beta_mean'][:_m]
# Solve the linear system
A = _B.T @ _B + lambda_reg * (_T.T @ _T)
rhs = _B.T @ measures[:_m] + lambda_reg * (_T.T @ _T @ _beta_mean[:, np.newaxis])
beta_coeff = scipy.linalg.solve(A, rhs)
return beta_coeff
[docs]
def estimate(self, measures: np.ndarray, M: int = None,
regularization_params: dict = None):
r"""
Estimate the state using the EIM given the measures at the magic points locations.
Parameters
----------
measures : np.ndarray
Measures :math:`\{y_m\}_{m=1}^{M_{max}}`, shaped :math:`(M_{max},N_s)`.
M : int, optional (Default = None)
Number of magic points to use for the extraction. If `None`, all available magic points are used.
regularization_params : dict, optional (Default = None)
Dictionary containing the regularization parameters. If `None`, no regularization is applied.
At the moment, the only supported regularization is Tikhonov from `Introini et al. (2023) <https://doi.org/10.1016/j.cma.2022.115773>`_.
Returns
-------
estimation : FunctionsList
An instance of `FunctionsList` containing the estimated state.
"""
if M is None:
M = len(self.magic_points['idx'])
else:
assert M <= len(self.magic_points['idx']), "M cannot be larger than the number of magic points available"
assert measures.shape[0] <= M, f"The number of measures {measures.shape[0]} must be less than or equal to the number of magic points {M}."
if self.matrix_B is None:
self.compute_B_matrix()
if regularization_params is not None:
if regularization_params['type'] == 'tikhonov':
beta_coeff = self._solve_treim_linear_system(measures, regularization_params)
else:
raise ValueError(f"Regularization type {regularization_params['type']} not recognized: available types: 'tikhonov'.")
else:
beta_coeff = self._solve_eim_linear_system(measures)
estimation = self._reconstruct(beta_coeff)
return estimation
def _reduce(self, snaps: FunctionsList | np.ndarray, M: int = None):
r"""
This method can be used to generate the reduced coefficients based on the magic functions and points, by extracting the measures from the input functions and solve the associated EIM linear system.
Parameters
----------
snaps : FunctionsList or np.ndarray
Function object to project onto the reduced space of dimension `M`.
M : int, optional (Default = None)
Number of magic functions/points to use for the projection. If `None`, all available magic functions/points are used.
Returns
-------
beta_coeff : np.ndarray
Reduced coefficients :math:`\{\beta_m\}_{m=1}^M`, shaped :math:`(M,N_s)`.
"""
if M is None:
M = len(self._basis)
else:
assert M <= len(self._basis), "M cannot be larger than the number of magic functions available"
if isinstance(snaps, FunctionsList):
snaps = snaps.return_matrix()
elif isinstance(snaps, np.ndarray):
if snaps.ndim == 1:
snaps = np.atleast_2d(snaps).T # shape (Nh, 1)
else:
raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the SVD modes shape."
if self.matrix_B is None:
self.compute_B_matrix()
measures = self.get_measurements(snaps, M)
return self._solve_eim_linear_system(measures)
[docs]
def compute_errors(self, snaps: FunctionsList | np.ndarray, Mmax: int = None,
noise_std: float = 0.0, regularization_params: dict = None,
verbose: bool = False):
r"""
Computes the errors between the original snapshots and the reconstructed ones.
Parameters
----------
snaps : FunctionsList or np.ndarray
Original snapshots to compare with.
Mmax : int, optional
Maximum number of sensors and magic functions to use for the reconstruction. If None, all reduced basis is used.
noise_std : float, optional (Default = 0.0)
Standard deviation of the Gaussian noise to be added to the measurements. Default is 0.
regularization_params : dict, optional (Default = None)
Dictionary containing the regularization parameters. If `None`, no regularization is applied.
At the moment, the only supported regularization is Tikhonov from `Introini et al. (2023) <https://doi.org/10.1016/j.cma.2022.115773>`_.
verbose : bool, optional
If True, print progress messages. Default is False.
Returns
----------
mean_abs_err : np.ndarray
Average absolute error measured in :math:`L^2`.
mean_rel_err : np.ndarray
Average relative error measured in :math:`L^2`.
computational_time : dict
Dictionary with the CPU time of the most relevant operations during the online phase.
"""
if isinstance(snaps, FunctionsList):
Ns = len(snaps)
assert snaps.fun_shape == self._basis.fun_shape, "The shape of the snapshots must match the shape of the magic functions."
# Convert FunctionsList to numpy array for processing
snaps = snaps.return_matrix()
elif isinstance(snaps, np.ndarray):
Ns = snaps.shape[1]
assert snaps.shape[0] == self._basis.fun_shape, "The shape of the snapshots must match the shape of the magic functions."
else:
raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
if Mmax is None:
Mmax = len(self._basis)
else:
assert Mmax <= len(self._basis), f"Mmax={Mmax} must be less than or equal to the number of magic functions, {len(self._basis)}."
if self.matrix_B is None:
self.compute_B_matrix()
abs_err = np.zeros((Ns, Mmax))
rel_err = np.zeros((Ns, Mmax))
# Variables to store computational time
computational_time = dict()
computational_time['Measures'] = np.zeros((Ns, Mmax))
computational_time['StateEstimation'] = np.zeros((Ns, Mmax))
computational_time['Errors'] = np.zeros((Ns, Mmax))
timer = Timer()
if verbose:
print(f"Computing L2 norm of snapshot", end='\r')
_snap_norm = list()
for mu_i in range(Ns):
timer.start()
_snap_norm.append(
self.calculator.L2_norm(snaps[:, mu_i])
)
computational_time['Errors'][mu_i, :] = timer.stop()
if verbose:
progressBar = LoopProgress(msg = f"Computing errors - {self.varname}", final = Mmax)
for mm in range(Mmax):
if verbose:
progressBar.update(1, percentage = False)
# Extract measures
timer.start()
_measures = self.get_measurements(snaps, M=mm+1, noise_std=noise_std) # shape (M, Ns)
computational_time['Measures'][:, mm] = timer.stop()
timer.start()
reconstructed_snaps = self.estimate(_measures, M=mm+1, regularization_params=regularization_params)
computational_time['StateEstimation'][:, mm] = timer.stop()
for mu_i in range(Ns):
timer.start()
_resid = snaps[:, mu_i] - reconstructed_snaps(mu_i)
abs_err[mu_i, mm] = self.calculator.L2_norm(_resid)
rel_err[mu_i, mm] = abs_err[mu_i, mm] / _snap_norm[mu_i]
computational_time['Errors'][mu_i, mm] += timer.stop()
Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time'])
_res = Results(mean_abs_err = abs_err.mean(axis = 0), mean_rel_err = rel_err.mean(axis = 0), computational_time = computational_time)
return _res