Source code for pyforce.online.pod

# Online Phase: Proper Orthogonal Decomposition
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 07 October 2025
# Latest Doc  Update: 07 October 2025

import numpy as np
import pyvista as pv
from collections import namedtuple
import os

from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from ..tools.write_read import ImportFunctionsList
from .online_base import OnlineDDROM, SurrogateModelWrapper

[docs] class POD(OnlineDDROM): r""" A class to estimate the state using the POD modes. This class implements the pure projection method to estimate the state using the POD modes and allows for the integration with surrogate models for the reduced dynamics (e.g., interpolation, machine learning, etc.), adopting the scheme of the POD with Interpolation (Demo et al, 2019 - https://doi.org/10.48550/arXiv.1905.05982). Parameters ---------- grid : pyvista.UnstructuredGrid The grid on which the POD is performed. It is used to define the spatial domain of the snapshots. gdim : int, optional (Default = 3) The geometric dimension of the grid. It can be either 2 or 3. varname : str, optional The name of the variable to be used for the POD. Default is 'u'. """
[docs] def set_basis(self, basis: FunctionsList = None, path_folder: str = None, **kwargs): """ Assign the basis functions to the POD model either from a FunctionsList object or by loading from a folder. Parameters ---------- basis : FunctionsList, optional An instance of `FunctionsList` containing the basis functions. path_folder : str, optional The path to the folder containing the basis functions. **kwargs : dict Additional keyword arguments to pass to the loading function. Returns ------- None """ if basis is not None: self._basis = basis elif path_folder is not None: _filename = os.path.join(path_folder, f'PODmode_{self.varname}') self._basis = ImportFunctionsList(_filename, **kwargs) else: raise ValueError("Either 'basis' or 'path_folder' must be provided.")
def _reconstruct(self, coeffs: np.ndarray): r""" This method reconstructs the state from the reduced coefficients :math:`\{\alpha_k\}_{k=1}^N` using the POD modes :math:`\{\psi_k\}_{k=1}^N`. .. math:: u(\cdot;\,\boldsymbol{\mu}) = \sum_{k=1}^N \alpha_k(\boldsymbol{\mu}) \psi_k(\cdot) Parameters ---------- coeffs : np.ndarray Reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`, shaped :math:`(N,N_s)`. Returns ------- estimation : FunctionsList An instance of `FunctionsList` containing the estimated state. """ assert coeffs.shape[0] <= len(self.basis), "The number of coefficients must be less than or equal to the number of POD modes." coeffs = np.atleast_2d(coeffs) estimation = FunctionsList(self.basis.fun_shape) for nn in range(coeffs.shape[1]): estimation.append( self.basis.lin_combine(coeffs[:, nn]) ) return estimation
[docs] def estimate(self, coeff_model: SurrogateModelWrapper, input_vector: np.ndarray): r""" This method can be adopted to estimate the state using a surrogate model :math:`\{\mathcal{F}_k\}_{k=1}^N` (i.e., `coeff_model`) for the reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`, given the input vector :math:`\mathbf{z}`, such that .. math:: \alpha_k(\mathbf{z}) = \mathcal{F}_k(\mathbf{z}) Parameters ---------- coeff_model : SurrogateModelWrapper The surrogate model to use for estimating the coefficients. input_vector : np.ndarray The input vector for which to estimate the state. Returns ------- estimation : FunctionsList An instance of `FunctionsList` containing the estimated state. """ coeffs = coeff_model.predict(input_vector) return self._reconstruct(coeffs)
def _project(self, u: np.ndarray, N: int): r""" Projects the function `u` onto the first `N` POD modes to obtain the reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`. The projection is done using the inner product in :math:`L_2`, i.e. .. math:: \alpha_k(\boldsymbol{\mu}) = (u(\cdot;\,\boldsymbol{\mu}), \,\psi_k)_{L^2}\qquad k = 1, \dots, N Parameters ---------- u : np.ndarray Function object to project onto the reduced space of dimension `N`. N : int Dimension of the reduced space, modes to be used. Returns ------- coeffs : np.ndarray Modal POD coefficients of `u`, :math:`\{\alpha_k\}_{k=1}^N`. """ coeffs = np.zeros(N) for nn in range(N): coeffs[nn] = self.calculator.L2_inner_product(u, self.basis[nn]) return coeffs def _reduce(self, snaps: FunctionsList | np.ndarray, N: int = None): r""" The reduced coefficients :math:`\{\alpha_k\}_{k=1}^N` of the snapshots using `N` modes :math:`\{\psi_k\}_{k=1}^N` are computed using projection in :math:`L_2`. Parameters ---------- snaps : FunctionsList or np.ndarray Function object to project onto the reduced space of dimension `N`. N : int Dimension of the reduced space, modes to be used. Returns ------- coeff : np.ndarray Modal POD coefficients of `u`, :math:`\{\alpha_k\}_{k=1}^N`. """ if N is None: N = len(self.basis) else: assert N <= len(self.basis), "N must be less than or equal to the number of POD modes." if isinstance(snaps, FunctionsList): snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): if snaps.ndim == 1: snaps = np.atleast_2d(snaps).T # shape (Nh, 1) else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") assert snaps.shape[0] == self.basis.fun_shape, "Input shape must match the POD modes shape." coeffs = np.zeros((N, snaps.shape[1])) for nn in range(coeffs.shape[1]): coeffs[:, nn] = self._project(snaps[:, nn], N) return coeffs
[docs] def compute_errors(self, snaps: FunctionsList | np.ndarray, coeff_model: SurrogateModelWrapper, input_vector: np.ndarray, verbose: bool = False): r""" Computes the errors between the original snapshots and the reconstructed ones. Parameters ---------- snaps : FunctionsList or np.ndarray Original snapshots to compare with. coeff_model : SurrogateModelWrapper The surrogate model to use for estimating the coefficients. input_vector : np.ndarray The input vector for which to estimate the state. verbose : bool, optional If True, print progress messages. Default is False. Returns ---------- mean_abs_err : np.ndarray Average absolute error measured in :math:`L^2`. mean_rel_err : np.ndarray Average relative error measured in :math:`L^2`. computational_time : dict Dictionary with the CPU time of the most relevant operations during the online phase. """ if isinstance(snaps, FunctionsList): Ns = len(snaps) assert snaps.fun_shape == self.basis.fun_shape, "The shape of the snapshots must match the shape of the POD modes." # Convert FunctionsList to numpy array for processing snaps = snaps.return_matrix() elif isinstance(snaps, np.ndarray): Ns = snaps.shape[1] assert snaps.shape[0] == self.basis.fun_shape, "The shape of the snapshots must match the shape of the POD modes." else: raise TypeError("Input must be a FunctionsList or a numpy ndarray.") Nmax = len(coeff_model.models) abs_err = np.zeros((Ns, Nmax)) rel_err = np.zeros((Ns, Nmax)) # Variables to store computational time computational_time = dict() computational_time['StateEstimation'] = np.zeros((Ns, Nmax)) computational_time['Errors'] = np.zeros((Ns, Nmax)) timer = Timer() if verbose: print(f"Computing L2 norm of snapshot", end='\r') _snap_norm = list() for mu_i in range(Ns): timer.start() _snap_norm.append( self.calculator.L2_norm(snaps[:, mu_i]) ) computational_time['Errors'][mu_i, :] = timer.stop() if verbose: progressBar = LoopProgress(msg = f"Computing errors (POD-I) - {self.varname}", final = Nmax) estimated_coeffs = coeff_model.predict(input_vector) for nn in range(Nmax): if verbose: progressBar.update(1, percentage = False) timer.start() reconstructed_snaps = self._reconstruct(estimated_coeffs[:nn+1, :]) # shape (fun_shape, Ns) computational_time['StateEstimation'][:, nn] = timer.stop() for mu_i in range(Ns): timer.start() _resid = snaps[:, mu_i] - reconstructed_snaps(mu_i) abs_err[mu_i, nn] = self.calculator.L2_norm(_resid) rel_err[mu_i, nn] = abs_err[mu_i, nn] / _snap_norm[mu_i] computational_time['Errors'][mu_i, nn] += timer.stop() Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time']) _res = Results(mean_abs_err = abs_err.mean(axis = 0), mean_rel_err = rel_err.mean(axis = 0), computational_time = computational_time) return _res