Source code for pyforce.offline.eim

# Offline Phase: Empirical Interpolation Method (EIM) - including discrete empirical interpolation method (DEIM)
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 07 October 2025
# Latest Doc  Update: 07 October 2025

import numpy as np
import scipy
from collections import namedtuple
import os
import warnings

import scipy.linalg

from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from .offline_base import OfflineDDROM

[docs] class EIM(OfflineDDROM): r""" A class implementing the Empirical Interpolation Method (EIM) for dimensionality reduction and sensor placement. Parameters ---------- grid: pv.UnstructuredGrid The computational grid. gdim: int, optional (default=3) The geometric dimension of the problem. It can be either 2 or 3. varname: str, optional (default='u') The name of the variable to analyse. Default is 'u'. """
[docs] def fit(self, train_snaps: FunctionsList, Mmax: int, _xm_idx : list = None, verbose= False): r""" The greedy algorithm chooses the magic functions and magic points by minimising the reconstruction error. Parameters ---------- train_snaps : FunctionsList List of snapshots serving as training set. Mmax : int Integer input indicating the maximum number of functions and sensors to define _xm_idx : list, optional (default = None) User-defined available indices for the points (sensors), if `None` the indices are all mesh elements. verbose : boolean, optional (Default = False) If `True`, print of the progress is enabled. Returns ---------- maxAbsErr : np.ndarray Maximum absolute error measured in :math:`L^\infty` (max norm) maxRelErr : np.ndarray Maximum relative error measured in :math:`L^\infty` (max norm) beta_coeff : np.ndarray Matrix of the reduced coefficients :math:`\{ \beta_m \}_{m=1}^M`, obtained by greedy procedure """ # Select if the user provided cell field data or point field data if train_snaps.fun_shape == self.grid.n_cells: self.field_type = 'cell' self.Nh = self.grid.n_cells nodes = self.grid.cell_centers().points elif train_snaps.fun_shape == self.grid.n_points: self.field_type = 'point' self.Nh = self.grid.n_points nodes = self.grid.points else: if train_snaps.fun_shape * self.gdim == self.grid.n_cells or train_snaps.fun_shape * self.gdim == self.grid.n_points: raise ValueError(f"The shape of the input functions ({train_snaps.fun_shape}) seems to be a vector field, not supported: only scalar fields are allowed.") raise ValueError(f"The shape of the input functions ({train_snaps.fun_shape}) does not match the number of cells ({self.grid.n_cells}) or points ({self.grid.n_points}) of the grid.") Ns_train = len(train_snaps) # Generate sensor library if _xm_idx is None: xm = np.arange(0, self.Nh, 1, dtype=int) else: xm = np.asarray(_xm_idx, dtype=int) # Initialize arrays beta_coeff = np.zeros((Mmax, len(train_snaps))) self.magic_functions = FunctionsList(dofs = train_snaps.fun_shape) self.magic_points = { 'idx': list(), 'points': list() } self.generating_fun = list() # Initialize errors maxAbsErr = np.zeros(Mmax) # Convert snaps to matrix snaps_matrix = train_snaps.return_matrix() # Generating the first magic function and associated point from the function maximizing the L_inf norm mm = 0 self.generating_fun.append( np.abs(snaps_matrix[xm]).max(axis=0).argmax() ) self.magic_points['idx'].append( xm[np.abs(snaps_matrix[xm, self.generating_fun[mm]]).argmax()] ) self.magic_points['points'].append( nodes[self.magic_points['idx'][mm], :] ) self.magic_functions.append( snaps_matrix[:, self.generating_fun[mm]] / snaps_matrix[self.magic_points['idx'][mm], self.generating_fun[mm]] ) # Generate the first interpolant self.matrix_B = np.zeros((Mmax, Mmax)) self.matrix_B[mm, mm] = self.magic_functions[mm][self.magic_points['idx'][mm]] beta_coeff[mm] = snaps_matrix[self.magic_points['idx'][mm]] interpolants = self._reconstruct_from_coeffs(beta_coeff[mm].reshape(1, -1)) # Main Loop for mm in range(1, Mmax): residuals_matrix = snaps_matrix - interpolants.return_matrix() # Find the next point with maximum residual self.generating_fun.append( np.abs(residuals_matrix[xm]).max(axis=0).argmax() ) # Assign maximum errors maxAbsErr[mm-1] = np.abs(residuals_matrix[xm, self.generating_fun[mm]]).max() if verbose: print(f'EIM Iteration {(mm+1)+0:03} | Abs Err (Linfty): {maxAbsErr[mm-1]:.2e}', end="\r") # Find the next magic point self.magic_points['idx'].append( xm[np.abs(residuals_matrix[xm, self.generating_fun[mm]]).argmax()] ) self.magic_points['points'].append( nodes[self.magic_points['idx'][mm], :] ) # Generate the next magic function self.magic_functions.append( residuals_matrix[:, self.generating_fun[mm]] / residuals_matrix[self.magic_points['idx'][mm], self.generating_fun[mm]] ) # Update matrix B self.matrix_B[:mm+1, :mm+1] = self.magic_functions.return_matrix()[self.magic_points['idx']] # Solve for the reduced coefficients for mu_i in range(Ns_train): beta_coeff[:mm+1, mu_i] = self._solve_eim_linear_system(snaps_matrix[self.magic_points['idx'], mu_i]) # Update the interpolant interpolants = self._reconstruct_from_coeffs(beta_coeff[:mm+1]) # Final error assignment residuals_matrix = snaps_matrix - interpolants.return_matrix() self.generating_fun.append( np.abs(residuals_matrix[xm]).max(axis=0).argmax() ) maxAbsErr[mm] = np.abs(residuals_matrix[xm, self.generating_fun[-1]]).max() if verbose: print(f' Iteration {(mm+1)+0:03} | Abs Err (Linfty): {maxAbsErr[mm-1]:.2e} - EIM done') return maxAbsErr, beta_coeff
[docs] def compute_lebesgue_constant(self): r""" This method computes the Lebesgue constant associated with the selected magic points. The procedure exploits the Lagrange basis functions, depicted in `Tiglio and Villanueva (2021) <https://iopscience.iop.org/article/10.1088/1361-6382/abf894>`_. Returns ------- Lambda : float The Lebesgue constant. """ if not hasattr(self, 'magic_functions'): raise ValueError("The model has not been fitted yet. Please call the 'fit' method before computing the Lebesgue constant.") Lambda = list() for mm in range(1, len(self.magic_functions)+1): _mf = self.magic_functions.return_matrix()[:,:mm] _matrix_B = self.matrix_B[:mm, :mm] lagrange_fun = scipy.linalg.solve(_matrix_B, _mf.T, lower=True).T # sum of absolute values at each point leb_at_x = np.sum(np.abs(lagrange_fun), axis=1) # max over domain Lambda.append(np.max(leb_at_x)) return Lambda
def _reconstruct_from_coeffs(self, coeffs: np.ndarray): r""" This method reconstructs the function `u` from the reduced coefficients :math:`\{\beta_m\}_{m=1}^M` using the magic functions :math:`\{q_m\}_{m=1}^M`. .. math:: u(\cdot;\,\boldsymbol{\mu}) = \sum_{k=1}^N \beta_k(\boldsymbol{\mu}) q_k(\cdot) Parameters ---------- coeffs : np.ndarray Reduced coefficients :math:`\{\beta_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. Returns ------- u : FunctionsList Reconstructed functions. """ assert coeffs.shape[0] <= len(self.magic_functions), "The number of coefficients must be lower to the number of magic functions" coeffs = np.atleast_2d(coeffs) reconstructed_snaps = FunctionsList(dofs = self.Nh) for nn in range(coeffs.shape[1]): reconstructed_snaps.append( self.magic_functions.lin_combine(coeffs[:, nn]) ) return reconstructed_snaps def _solve_eim_linear_system(self, measures: np.ndarray): r""" Computes the reduced coefficients :math:`\{\beta_m\}_{m=1}^{M}}` with as many magic functions/points (synthetic) as the input measures :math:`M`. .. math:: y_m = u(\vec{x}_m) \qquad m = 1, \dots, M_{max} Parameters ---------- measures : np.ndarray Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations. Returns ---------- beta_coeff : np.ndarray Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}` """ assert measures.shape[0] <= len(self.magic_functions), "The number of measures must be equal to the number of magic functions" _m = measures.shape[0] beta_coeff = scipy.linalg.solve(self.matrix_B[:_m, :_m], measures[:_m], lower = True) return beta_coeff
[docs] def reconstruct(self, measures: np.ndarray): r""" This method reconstructs the state by obtaining the reduced coefficients :math:`\{\beta_m\}_{m=1}^M` from the input measures :math:`\{y_m\}_{m=1}^M` by solving the EIM linear system .. math:: \mathbb{B} \boldsymbol{\beta} = \mathbf{y} where - :math:`\mathbb{B}` is the matrix of magic functions - :math:`\boldsymbol{\beta}` are the reduced coefficients - :math:`\mathbf{y}` are the input measures Parameters ---------- measures : np.ndarray Input measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. Returns ------- u : FunctionsList Reconstructed functions. """ beta_coeff = self._solve_eim_linear_system(measures) return self._reconstruct_from_coeffs(beta_coeff)
[docs] def reduce(self, snaps: FunctionsList | np.ndarray, M: int = None): r""" This method can be used to generate the reduced coefficients based on the magic functions and points, by extracting the measures from the input functions and solve the associated EIM linear system. Parameters ---------- snaps : FunctionsList or np.ndarray Function object to project onto the reduced space of dimension `M`. M : int, optional (Default = None) Number of magic functions/points to use for the projection. If `None`, all available magic functions/points are used. Returns ------- beta_coeff : np.ndarray Reduced coefficients :math:`\{\beta_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. """ if M is None: M = len(self.magic_functions) else: assert M <= len(self.magic_functions), "M cannot be larger than the number of magic functions available" if isinstance(snaps, FunctionsList): snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): if snaps.ndim == 1: snaps = np.atleast_2d(snaps).T # shape (Nh, 1) else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") assert snaps.shape[0] == self.magic_functions.fun_shape, "Input shape must match the SVD modes shape." measures = snaps[self.magic_points['idx'][:M]] return self._solve_eim_linear_system(measures)
def _get_measures(self, snaps: FunctionsList | np.ndarray, M: int = None): r""" This method extracts the measures from the input functions at the magic points locations. Parameters ---------- snaps : FunctionsList or np.ndarray Function object to extract the measures from. M : int, optional (Default = None) Number of magic points to use for the extraction. If `None`, all available magic points Returns ------- measures : np.ndarray Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. """ if isinstance(snaps, FunctionsList): snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): if snaps.ndim == 1: snaps = np.atleast_2d(snaps).T else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") assert snaps.shape[0] == self.magic_functions.fun_shape, "Input shape must match the SVD modes shape." measures = snaps[self.magic_points['idx'][:M]] return measures
[docs] def compute_errors(self, snaps: FunctionsList | np.ndarray, Mmax: int = None, verbose: bool = False): r""" Computes the errors between the original snapshots and the reconstructed ones. Parameters ---------- snaps : FunctionsList or np.ndarray Original snapshots to compare with. Mmax : int, optional Maximum number of sensors and magic functions to use for the reconstruction. If None, all reduced basis is used. verbose : bool, optional If True, print progress messages. Default is False. Returns ---------- mean_abs_err : np.ndarray Average absolute error measured in :math:`L^2`. mean_rel_err : np.ndarray Average relative error measured in :math:`L^2`. computational_time : dict Dictionary with the CPU time of the most relevant operations during the online phase. """ if isinstance(snaps, FunctionsList): Ns = len(snaps) assert snaps.fun_shape == self.magic_functions.fun_shape, "The shape of the snapshots must match the shape of the magic functions." # Convert FunctionsList to numpy array for processing snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): Ns = snaps.shape[1] assert snaps.shape[0] == self.magic_functions.fun_shape, "The shape of the snapshots must match the shape of the magic functions." else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") if Mmax is None: Mmax = len(self.magic_functions) else: assert Mmax <= len(self.magic_functions), f"Mmax={Mmax} must be less than or equal to the number of magic functions, {len(self.magic_functions)}." abs_err = np.zeros((Ns, Mmax)) rel_err = np.zeros((Ns, Mmax)) # Variables to store computational time computational_time = dict() computational_time['Measurements'] = np.zeros((Ns, Mmax)) computational_time['StateEstimation'] = np.zeros((Ns, Mmax)) computational_time['Errors'] = np.zeros((Ns, Mmax)) timer = Timer() if verbose: print(f"Computing L2 norm of snapshot", end='\r') _snap_norm = list() for mu_i in range(Ns): timer.start() _snap_norm.append( self.calculator.L2_norm(snaps[:, mu_i]) ) computational_time['Errors'][mu_i, :] = timer.stop() if verbose: progressBar = LoopProgress(msg = f"Computing errors - {self.varname}", final = Mmax) for mm in range(Mmax): if verbose: progressBar.update(1, percentage = False) timer.start() _measures = self._get_measures(snaps, M=mm+1) # shape (M, Ns) computational_time['Measurements'][:, mm] = timer.stop() timer.start() reconstructed_snaps = self.reconstruct(_measures) computational_time['StateEstimation'][:, mm] = timer.stop() for mu_i in range(Ns): timer.start() _resid = snaps[:, mu_i] - reconstructed_snaps(mu_i) abs_err[mu_i, mm] = self.calculator.L2_norm(_resid) rel_err[mu_i, mm] = abs_err[mu_i, mm] / _snap_norm[mu_i] computational_time['Errors'][mu_i, mm] += timer.stop() Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time']) _res = Results(mean_abs_err = abs_err.mean(axis = 0), mean_rel_err = rel_err.mean(axis = 0), computational_time = computational_time) return _res
[docs] def save(self, path_folder: str, **kwargs): r""" Save the magic functions and points to a specified path. Parameters ---------- path_folder : str The folder path where the model will be saved. **kwargs : dict Additional keyword arguments for saving options. """ os.makedirs(path_folder, exist_ok=True) # Save the magic functions self.magic_functions.store( f'mf_{self.varname}', filename=os.path.join(path_folder, f'mf_{self.varname}'), **kwargs) np.save(os.path.join(path_folder, f'magic_points_{self.varname}.npy'), self.magic_points)
[docs] def deim(basis_functions: FunctionsList, Mmax: int, varname: str = 'u', _xm_idx : list = None, path_folder: str = None): r""" This function implements the Discrete Empirical Interpolation Method (DEIM) for selecting magic points from a given set of basis functions. Parameters ---------- basis_functions : FunctionsList The list of basis functions from which to select magic points. Mmax : int The maximum number of magic points to select. varname : str, optional The name of the variable for which the magic points are being selected. Default is 'u'. _xm_idx : list, optional A list of indices to consider for magic point selection. If None, all indices are considered. path_folder : str, optional The folder path where the selected magic points will be saved. If None, the points are simply returned. Returns ------- magic_pt : np.ndarray An array of selected magic point indices. P : np.ndarray The DEIM selection matrix (observation operator). """ modes = basis_functions.return_matrix() # Define available indices if _xm_idx is None: xm = np.arange(modes.shape[0], dtype=int) else: xm = _xm_idx # Initialize DEIM arrays magic_pt = np.zeros(Mmax, dtype=int) P = np.zeros((Mmax, modes.shape[0])) # First DEIM point p = np.argmax(np.abs(modes[xm, 0])) P[0, xm[p]] = 1 magic_pt[0] = xm[p] # DEIM loop for j in range(1, Mmax): print(f'DEIM iteration {j+1}/{Mmax}', end='\r') # Avoid warnings for numerical issues with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) # Solve linear system PtU = (P[:j] @ modes[:, :j]).reshape(j,j) y = modes[magic_pt[:j], j] c = np.linalg.solve(PtU, y) # Compute the residual r = modes[xm, j] - modes[xm, :j] @ c # Select the next DEIM point p = np.argmax(np.abs(r)) P[j, xm[p]] = 1 magic_pt[j] = xm[p] if path_folder is not None: os.makedirs(path_folder, exist_ok=True) np.save(os.path.join(path_folder, f'magic_points_deim_{varname}.npy'), magic_pt) return magic_pt, P