Source code for pyforce.online.indirect_reconstruction

# Indirect Reconstruction algorithm tools
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 08 October 2025
# Latest Doc  Update: 08 October 2025

import numpy as np
from scipy.optimize import brute, shgo, differential_evolution
from collections import namedtuple
import pyvista as pv

from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from ..tools.functions_list import FunctionsList
from .online_base import OnlineDDROM, SurrogateModelWrapper, OnlineSensors

# Define Objective Function
[docs] def objective(mu: np.ndarray, matrix_B: np.ndarray, target_measure: np.ndarray, maps: SurrogateModelWrapper): r""" This function evaluates the standard loss function :math:`\mathcal{L}_{PE}` to be minimized during the Parameter Estimation phase .. math:: \mathcal{L}_{PE}(\boldsymbol{\mu}) = \|B\boldsymbol{\beta}(\boldsymbol{\mu}) - \mathbf{y}\|_2^2 \qquad\qquad \text{ given }\mathcal{F}_m(\boldsymbol{\mu}) = \beta_m(\boldsymbol{\mu}) given the maps :math:`\mathcal{F}_m:\boldsymbol{\mu} \rightarrow \beta_m` (:math:`m = 1, \dots, M`). Parameters ---------- mu : np.ndarray The input parameter vector :math:`\boldsymbol{\mu} \in \mathbb{R}^{p}`. matrix_B : np.ndarray The (G)EIM matrix :math:`B \in \mathbb{R}^{M \times M}`. target_measure : np.ndarray The target measurement vector :math:`\mathbf{y} \in \mathbb{R}^{M}`. maps : SurrogateModelWrapper The surrogate model wrapper containing the maps :math:`\mathcal{F}_m` from parameters to (G)EIM coefficients. """ M = len(target_measure) assert matrix_B.shape == (M, M), f"Matrix B shape {matrix_B.shape} must be (M, M) with M = {M}." beta = maps.predict(mu)[:M] # shape (M,) return (np.linalg.norm( np.dot(matrix_B, beta) - target_measure))**2
# Parameter Estimation using scipy optimization tools
[docs] class ParameterEstimation(): def __init__(self, grid: pv.UnstructuredGrid, maps: SurrogateModelWrapper, bnds: list, gdim: int = 3): r""" A class to perform Parameter Estimation (PE) exploiting GEIM magic functions and sensors and numerical optimisation starting from a set of measurements :math:`\mathbf{y}\in\mathbb{R}^M` .. math:: \hat{\boldsymbol{\mu}} = \text{arg}\,\min\limits_{\boldsymbol{\mu}\in\mathcal{D}} \mathcal{L}_{PE}(\boldsymbol{\mu}) Parameters ---------- grid : pv.UnstructuredGrid The computational grid where the solution is defined. maps : SurrogateModelWrapper A surrogate model wrapper containing the maps :math:`\mathcal{F}_m` from parameters to (G)EIM coefficients. bnds : list A list of tuples defining the bounds for each parameter in the optimisation problem. gdim : int, optional (Default = 3) The geometric dimension of the problem (2D or 3D). """ self.grid = grid self.gdim = gdim # Store surrogate model -> parameters to (G)EIM coefficients self.maps = maps # Store bounds self.p = len(bnds) # Number of parameters self.bnds = bnds # Initialize (G)EIM matrix self.matrix_B = None
[docs] def set_magic_functions(self, basis: FunctionsList = None, path_folder: str = None, **kwargs): """ Assign the magic functions to the GEIM model either from a FunctionsList object or by loading from a folder. Parameters ---------- basis : FunctionsList, optional An instance of `FunctionsList` containing the basis functions. path_folder : str, optional The path to the folder containing the basis functions. **kwargs : dict Additional keyword arguments to pass to the loading function. Returns ------- None """ if basis is not None: self._basis = basis elif path_folder is not None: _filename = os.path.join(path_folder, f'mf_{self.varname}') self._basis = ImportFunctionsList(_filename, **kwargs) else: raise ValueError("Either 'basis' or 'path_folder' must be provided.")
[docs] def set_magic_sensors(self, sensors: FunctionsList = None, path_folder: str = None, **kwargs): """ Assign the magic sensors to the GEIM model either from a FunctionsList object or by loading from a folder. Parameters ---------- sensors : FunctionsList, optional An instance of `FunctionsList` containing the sensor functions. path_folder : str, optional The path to the folder containing the sensor functions. **kwargs : dict Additional keyword arguments to pass to the loading function. Returns ------- None """ if sensors is not None: self.sensors = OnlineSensors(library=sensors, gdim=self.gdim, grid=self.grid) elif path_folder is not None: _filename = os.path.join(path_folder, f'ms_{self.varname}') self.sensors = OnlineSensors(library=ImportFunctionsList(_filename, **kwargs), gdim=self.gdim, grid=self.grid) else: raise ValueError("Either 'sensors' or 'path_folder' must be provided.")
[docs] def compute_B_matrix(self): r""" Compute the matrix B used in the GEIM: this matrix is the evaluation of the magic functions at the magic sensors. .. math:: B_{ij} = v_i(q_j) \qquad i,j = 1, \ldots, M where :math:`\{q_j\}_{j=1}^M` are the magic functions and :math:`\{v_i\}_{i=1}^M` are the magic sensors. """ M = len(self.sensors._library) self.Mmax = M assert M == len(self._basis), f"Number of magic sensors {M} must be equal to the number of magic functions {len(self._basis)}." self.matrix_B = self.sensors.action(self._basis)
[docs] def get_measurements(self, snaps: FunctionsList | np.ndarray, M: int = None, noise_std: float = 0.0): r""" This method extracts the measures from the input functions at the magic sensors locations. Parameters ---------- snaps : FunctionsList or np.ndarray Function object to extract the measures from. M : int, optional (Default = None) Number of sensors to use for the extraction. If `None`, all available magic sensors are used. noise_std : float, optional (Default = 0.0) Standard deviation of the Gaussian noise to be added to the measurements. Default is 0. Returns ------- measures : np.ndarray Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. """ if isinstance(snaps, FunctionsList): snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): if snaps.ndim == 1: snaps = np.atleast_2d(snaps).T else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the magic functions shape." measures = self.sensors.action(snaps, M=M) # shape (M, Ns) if noise_std > 0.0: noise = np.random.normal(0, noise_std, measures.shape) measures += noise return measures
[docs] def optimise(self, measure: np.ndarray, use_brute = True, grid_elem = 10): """ This auxiliary function performs the optimisation process given an input collection of measurements. Two options are available: - **brute force method** for finding the first guess of the parameter estimation + Least squares - **differential evolution method** for finding the first guess of the parameter estimation + Least squares Parameters ---------- measure : np.ndarray Measurement vector of `M` elements, array-like. use_brute : bool, optional (Default = True) brute force method for finding the first guess of the parameter estimation grid_elem : int, optional (Default = 10) Number of elements in the grid for the brute force method Returns ---------- solution : np.ndarray Solution of the optimisation (after least squares) guess : np.ndarray Solution of the optimisation (before least squares) """ M = len(measure) assert(M <= self.Mmax) if use_brute: # defining bounds for optimisation opt_bnds = list() for param in range(self.p): delta_mu = (self.bnds[param][1] - self.bnds[param][0]) / grid_elem opt_bnds.append(slice(self.bnds[param][0], self.bnds[param][1], delta_mu)) sol = brute(objective, opt_bnds, args=(self.matrix_B[:M, :M], measure, self.maps)) else: sol = differential_evolution(objective, self.bnds, args=(self.matrix_B[:M, :M], measure, self.maps)).x return sol
[docs] def compute_errors(self, test_param: np.ndarray, test_snaps: FunctionsList | np.ndarray, M: int, noise_std = 0.0, use_brute = True, grid_elem = 10, verbose = False): r""" The absolute and relative error of the PE phase, using different measurements coming from the test set, are computed. Parameters ---------- test_param : np.ndarray The test parameter set :math:`\{\boldsymbol{\mu}^{(n)}\}_{n=1}^{N_{test}}`, shaped :math:`(N_{test}, p)`. test_snaps : FunctionsList or np.ndarray The test snapshots set :math:`\{u^{(n)}\}_{n=1}^{N_{test}}`. M : int Number of sensors to use for the generation of the measures. noise_std : float, optional (Default = 0.0) Standard deviation of the Gaussian noise to be added to the measurements. use_brute : bool, optional (Default = True) If `True`, the brute force method is used for the initial guess in the optimisation. If `False`, differential evolution is used. grid_elem : int, optional (Default = 10) Number of elements in the grid for the brute force method. verbose : bool, optional (Default = False) If `True`, progress information is printed to the console. Returns ------- mean_abs_err : np.ndarray Average absolute error of the parameter estimation mean_rel_err : np.ndarray Average relative error of the parameter estimation computational_time : dict Dictionary with the CPU time of the most relevant operations during the online phase. mu_PE : list List containing the estimated parameters after least squares at varying number of measurements mu_PE_guess : list List containing the estimated parameters before least squares at varying number of measurements """ Ns = len(test_snaps) if verbose: bar = LoopProgress(msg = f"Computing PE errors", final = Ns) mu_PE = np.zeros((Ns, M, self.p)) # Variables to store computational time computational_time = dict() computational_time['Measures'] = np.zeros((Ns, M)) computational_time['Optimisation'] = np.zeros((Ns, M)) timer = Timer() # Loop over test parameters for idx_mu in range(Ns): for mm in range(M): # Get measurements timer.start() measure = self.get_measurements( test_snaps(idx_mu), M=mm+1, noise_std=noise_std ) computational_time['Measures'][idx_mu, mm] = timer.stop() # Perform optimisation timer.start() mu_PE[idx_mu, mm, :] = self.optimise( measure=measure, use_brute=use_brute, grid_elem=grid_elem ) computational_time['Optimisation'][idx_mu, mm] = timer.stop() if verbose: bar.update(1) # Compute errors abs_err = np.abs(mu_PE - test_param[:, np.newaxis, :]).mean(axis=0) # shape (M, p) rel_err = (np.abs(mu_PE - test_param[:, np.newaxis, :]) / (np.abs(test_param[:, np.newaxis, :]) + 1e-12)).mean(axis=0) # shape (M, p) # Store results Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time', 'mu_PE']) return Results(mean_abs_err=abs_err, mean_rel_err=rel_err, computational_time=computational_time, mu_PE=mu_PE)