# Source code for pyforce.online.pbdw

# Online Phase: Parameterised Background Data-Weak (PBDW) Method - including regularized version
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 07 October 2025
# Latest Doc  Update: 07 October 2025

import warnings
import numpy as np
import pyvista as pv
from collections import namedtuple
import os
import scipy

from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from ..tools.write_read import ImportFunctionsList
from .online_base import OnlineDDROM, OnlineSensors

class PBDW(OnlineDDROM):
    r"""
    State estimation with the Parameterised Background Data-Weak (PBDW)
    method, including its regularized variant.

    Sensor measurements are combined with a reduced background space
    :math:`Z_N` to estimate the state.

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid
        The computational grid. It is used to define the spatial domain of the snapshots.
    gdim : int, optional (Default = 3)
        The geometric dimension of the grid. It can be either 2 or 3.
    varname : str, optional
        The name of the variable to be used for the POD. Default is 'u'.
    """

    def __init__(self, grid: pv.UnstructuredGrid, gdim: int = 3, varname: str = 'u'):
        super().__init__(grid, gdim=gdim, varname=varname)
        # Offline PBDW matrices (A, K, Z); built lazily by `compute_matrices`.
        self.matrix_K = None
        self.matrix_A = None
        self.matrix_Z = None
[docs] def set_basis(self, basis: FunctionsList = None, path_folder: str = None, **kwargs): """ Assign the basis functions of the reduced space :math:`Z_N` either from a FunctionsList object or by loading from a folder. Parameters ---------- basis : FunctionsList, optional An instance of `FunctionsList` containing the basis functions. path_folder : str, optional The path to the folder containing the basis functions. **kwargs : dict Additional keyword arguments to pass to the loading function. Returns ------- None """ if basis is not None: self._basis = basis elif path_folder is not None: _filename = os.path.join(path_folder, f'mf_{self.varname}') self._basis = ImportFunctionsList(_filename, **kwargs) else: raise ValueError("Either 'basis' or 'path_folder' must be provided.") self.N = len(self._basis)
[docs] def set_basis_sensors(self, sensors: FunctionsList = None, path_folder: str = None, **kwargs): """ Assign the basis sensors to the PBDW model either from a FunctionsList object or by loading from a folder. At the moment, they are the Riesz representation in :math:`L^2` of the functionals. Parameters ---------- sensors : FunctionsList, optional An instance of `FunctionsList` containing the sensor functions. path_folder : str, optional The path to the folder containing the sensor functions. **kwargs : dict Additional keyword arguments to pass to the loading function. Returns ------- None """ if sensors is not None: self.sensors = OnlineSensors(library=sensors, gdim=self.gdim, grid=self.grid) elif path_folder is not None: _filename = os.path.join(path_folder, f'ms_{self.varname}') self.sensors = OnlineSensors(library=ImportFunctionsList(_filename, **kwargs), gdim=self.gdim, grid=self.grid) else: raise ValueError("Either 'sensors' or 'path_folder' must be provided.")
[docs] def compute_matrices(self): r""" Compute the matrices K, A and Z used in the PBDW: given :math:`\{g_j\}_{j=1}^M` the basis sensors and :math:`\{\zeta_i\}_{i=1}^N` the basis functions .. math:: \mathbb{A}_{ij} = (g_i, g_j)_{L^2} \in\mathbb{R}^{M\times M} .. math:: \mathbb{K}_{ji} = (\zeta_i, g_j)_{L^2} \in\mathbb{R}^{M\times N} .. math:: \mathbb{Z}_{ij} = (\zeta_i, \zeta_j)_{L^2} \in\mathbb{R}^{N\times N} where :math:`(\cdot, \cdot)_{L^2}` is the :math:`L^2` inner product. """ M = len(self.sensors._library) assert M >= self.N, f"Number of sensors {M} must be larger than or equal to the number of basis functions {self.N}." self.matrix_A = self.sensors.action(self.sensors._library) # shape (M, M) self.matrix_K = self.sensors.action(self._basis) # shape (M, N) self.matrix_Z = np.zeros((self.N, self.N)) for i in range(self.N): for j in range(i, self.N): if i >= j: self.matrix_Z[i, j] = self.calculator.L2_inner_product(self._basis(i), self._basis(j)) else: self.matrix_Z[j, i] = self.matrix_Z[i, j]
[docs] def get_measurements(self, snaps: FunctionsList | np.ndarray, M: int = None, noise_std: float = 0.0): r""" This method extracts the measures from the input functions at the sensor locations. Parameters ---------- snaps : FunctionsList or np.ndarray Function object to extract the measures from. M : int, optional (Default = None) Number of sensors to use for the extraction. If `None`, all available sensors are used. noise_std : float, optional (Default = 0.0) Standard deviation of the Gaussian noise to be added to the measurements. Default is 0. Returns ------- measures : np.ndarray Measures :math:`\{y_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. """ if isinstance(snaps, FunctionsList): snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): if snaps.ndim == 1: snaps = np.atleast_2d(snaps).T else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") assert snaps.shape[0] == self._basis.fun_shape, "Input shape must match the magic functions shape." measures = self.sensors.action(snaps, M=M) # shape (M, Ns) if noise_std > 0.0: noise = np.random.normal(0, noise_std, measures.shape) measures += noise return measures
def _reconstruct(self, alpha_coeffs: np.ndarray, theta_coeffs: np.ndarray): r""" This method reconstructs the state from the reduced coefficients :math:`\{\alpha_n\}_{n=1}^N` and :math:`\{\theta_m\}_{m=1}^M` .. math:: u(\cdot;\,\boldsymbol{\mu}) = \sum_{n=1}^N \alpha_n(\boldsymbol{\mu}) \zeta_n(\cdot)+\sum_{m=1}^M \theta_m(\boldsymbol{\mu}) g_m(\cdot) given the basis functions :math:`\{\zeta_n\}_{n=1}^N` and the basis sensors :math:`\{g_m\}_{m=1}^M`. Parameters ---------- alpha_coeffs : np.ndarray Array of coefficients for the basis functions :math:`\{\alpha_n\}_{n=1}^N`, shaped :math:`(N,N_s)`. theta_coeffs : np.ndarray Array of coefficients for the basis sensors :math:`\{\theta_m\}_{m=1}^M`, shaped :math:`(M,N_s)`. Returns ------- estimation : FunctionsList An instance of `FunctionsList` containing the estimated state. """ assert alpha_coeffs.shape[0] <= self.N, "The number of coefficients must be less than or equal to the number of basis functions." assert theta_coeffs.shape[0] <= len(self.sensors._library), "The number of coefficients must be less than or equal to the number of sensors." alpha_coeffs = np.atleast_2d(alpha_coeffs) theta_coeffs = np.atleast_2d(theta_coeffs) assert alpha_coeffs.shape[1] == theta_coeffs.shape[1], "The number of columns of alpha_coeffs and theta_coeffs must be equal." estimation = FunctionsList(self.basis.fun_shape) for mu in range(alpha_coeffs.shape[1]): estimation.append( self.basis.lin_combine(alpha_coeffs[:, mu]) + self.sensors._library.lin_combine(theta_coeffs[:, mu]) ) return estimation def _solve_pbdw_linear_system(self, measures: np.ndarray, xi: float = 0.0): r""" Computes the reduced coefficients :math:`\{\alpha_n\}_{n=1}^{N}}` and :math:`\{\theta_m\}_{m=1}^{M}}` with as many basis sensors as the input measures :math:`M`. The following linear system is solved .. 
math:: \left[ \begin{array}{ccc} \xi \cdot M \cdot \mathbb{I} + \mathbb{A} & & \mathbb{K} \\ & & \\ \mathbb{K}^T & & 0 \end{array} \right] \cdot \left[ \begin{array}{c} \boldsymbol{\theta} \\ \\ \boldsymbol{\alpha} \end{array} \right] = \left[ \begin{array}{c} \mathbf{y} \\ \\ \mathbf{0} \end{array} \right] given :math:`\mathbf{y}\in\mathbb{R}^M` as the measurements vector. Parameters ---------- measures : np.ndarray Measurements vector :math:`\{y_m\}_{m=1}^{M_{max}}` of the function `u` at the sensors locations. xi : float, optional (Default = 0.0) Regularization parameter for the regularization. Default is 0.0 Returns ---------- beta_coeff : np.ndarray Array of coefficients for the interpolant :math:`\{\beta_m\}_{m=1}^{M_{max}}` """ assert measures.shape[0] >= len(self._basis), "The number of measures must be larger than or equal to the number of basis functions" assert measures.shape[0] <= len(self.sensors._library), "The number of measures must be less than or equal to the number of sensors" if self.matrix_A is None or self.matrix_K is None or self.matrix_Z is None: self.compute_matrices() _m = measures.shape[0] _A = self.matrix_A[:_m, :_m] + xi * _m * np.eye(_m) _K = self.matrix_K[:_m] _Z = self.matrix_Z # Assemble the full linear system sys_matrix = np.block([ [_A, _K], [_K.T, np.zeros((_Z.shape[0], _Z.shape[0]))] ]) # Right-hand side rhs = np.concatenate([measures, np.zeros((_Z.shape[0], measures.shape[1]))], axis=0) # Solve the linear system _coeffs = scipy.linalg.solve(sys_matrix, rhs) theta_coeff = _coeffs[:_m] alpha_coeff = _coeffs[_m:] return alpha_coeff, theta_coeff
[docs] def estimate(self, measures: np.ndarray, xi: float = 0.0): r""" Estimate the state using the PBDW given the measures at the sensor locations. Parameters ---------- measures : np.ndarray Measures :math:`\{y_m\}_{m=1}^{M}`, shaped :math:`(M, N_s)`. xi : float, optional (Default = 0.0) Regularization parameter for the regularization. Default is 0.0 Returns ------- estimation : FunctionsList An instance of `FunctionsList` containing the estimated state. """ if self.matrix_A is None or self.matrix_K is None or self.matrix_Z is None: self.compute_matrices() alpha_coeffs, theta_coeffs = self._solve_pbdw_linear_system(measures, xi=xi) estimation = self._reconstruct(alpha_coeffs, theta_coeffs) return estimation
def _reduce(self): r""" Not implemented for PBDW. """ warnings.warn("The '_reduce' method is not implemented for PBDW.") pass
    def compute_errors(self, snaps: FunctionsList | np.ndarray, Mmax: int = None, noise_std: float = 0.0, xi: float = 0.0, verbose: bool = False):
        r"""
        Computes the errors between the original snapshots and the reconstructed ones,
        for every number of sensors from :math:`N` (the basis size) up to `Mmax`.

        Parameters
        ----------
        snaps : FunctionsList or np.ndarray
            Original snapshots to compare with.
        Mmax : int, optional
            Maximum number of sensors and magic functions to use for the reconstruction.
            If None, all reduced basis is used.
        noise_std : float, optional (Default = 0.0)
            Standard deviation of the Gaussian noise to be added to the measurements. Default is 0.
        xi : float, optional (Default = 0.0)
            Regularization parameter for the regularization. Default is 0.0
        verbose : bool, optional
            If True, print progress messages. Default is False.

        Returns
        ----------
        mean_abs_err : np.ndarray
            Average absolute error measured in :math:`L^2`.
        mean_rel_err : np.ndarray
            Average relative error measured in :math:`L^2`.
        computational_time : dict
            Dictionary with the CPU time of the most relevant operations during the online phase.
        """
        # Normalize the input to a (fun_shape, Ns) numpy matrix.
        if isinstance(snaps, FunctionsList):
            Ns = len(snaps)
            assert snaps.fun_shape == self._basis.fun_shape, "The shape of the snapshots must match the shape of the magic functions."
            # Convert FunctionsList to numpy array for processing
            snaps = snaps.return_matrix()
        elif isinstance(snaps, np.ndarray):
            Ns = snaps.shape[1]
            assert snaps.shape[0] == self._basis.fun_shape, "The shape of the snapshots must match the shape of the magic functions."
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")
        if Mmax is None:
            Mmax = len(self.sensors)
        else:
            assert Mmax <= len(self.sensors), f"Mmax={Mmax} must be less than or equal to the number of sensors, {len(self.sensors)}."
        # Build the offline matrices lazily on first use.
        if self.matrix_A is None or self.matrix_K is None or self.matrix_Z is None:
            self.compute_matrices()
        # One column per sensor count mm in [N, Mmax], one row per snapshot.
        abs_err = np.zeros((Ns, Mmax - self.N + 1))
        rel_err = np.zeros((Ns, Mmax - self.N + 1))
        # Variables to store computational time, keyed by online-phase stage.
        computational_time = dict()
        computational_time['Measures'] = np.zeros((Ns, Mmax - self.N + 1))
        computational_time['StateEstimation'] = np.zeros((Ns, Mmax - self.N + 1))
        computational_time['Errors'] = np.zeros((Ns, Mmax - self.N + 1))
        timer = Timer()
        if verbose:
            print(f"Computing L2 norm of snapshot", end='\r')
        # Pre-compute each snapshot's L2 norm once (denominator of the relative
        # error); its cost is charged to every column of the 'Errors' row.
        _snap_norm = list()
        for mu_i in range(Ns):
            timer.start()
            _snap_norm.append( self.calculator.L2_norm(snaps[:, mu_i]) )
            computational_time['Errors'][mu_i, :] = timer.stop()
        if verbose:
            progressBar = LoopProgress(msg = f"Computing errors - {self.varname}", final = Mmax - self.N + 1)
        # Sweep the number of sensors from N (minimum admissible) to Mmax.
        for mm in range(self.N, Mmax+1):
            if verbose:
                progressBar.update(1, percentage = False)
            # Extract measures with the first mm sensors (optionally noisy).
            timer.start()
            _measures = self.get_measurements(snaps, M=mm, noise_std=noise_std) # shape (M, Ns)
            computational_time['Measures'][:, mm - self.N] = timer.stop()
            # Estimate all snapshots at once from the mm measures.
            timer.start()
            reconstructed_snaps = self.estimate(_measures, xi=xi)
            computational_time['StateEstimation'][:, mm - self.N] = timer.stop()
            # Per-snapshot absolute/relative L2 errors of the reconstruction.
            for mu_i in range(Ns):
                timer.start()
                _resid = snaps[:, mu_i] - reconstructed_snaps(mu_i)
                abs_err[mu_i, mm - self.N] = self.calculator.L2_norm(_resid)
                rel_err[mu_i, mm - self.N] = abs_err[mu_i, mm - self.N] / _snap_norm[mu_i]
                computational_time['Errors'][mu_i, mm - self.N] += timer.stop()
        # Average the errors over the snapshot set and pack the results.
        Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time'])
        _res = Results(mean_abs_err = abs_err.mean(axis = 0), mean_rel_err = rel_err.mean(axis = 0), computational_time = computational_time)
        return _res