# Source code for pyforce.offline.pod

# Offline Phase: Singular Value Decomposition and Proper Orthogonal Decomposition
# Author: Stefano Riva, PhD Student, NRG, Politecnico di Milano
# Latest Code Update: 07 October 2025
# Latest Doc  Update: 07 October 2025

import numpy as np
import scipy
import pyvista as pv
import matplotlib.pyplot as plt
from sklearn.utils.extmath import randomized_svd
from collections import namedtuple
import os

from ..tools.functions_list import FunctionsList
from ..tools.backends import IntegralCalculator, LoopProgress, Timer
from .offline_base import OfflineDDROM
import warnings

class rSVD(OfflineDDROM):
    r"""
    A class to perform randomized Singular Value Decomposition (rSVD) on a list
    of snapshots :math:`u(\mathbf{x};\,\boldsymbol{\mu})` dependent on some
    parameter :math:`\boldsymbol{\mu}\in\mathcal{P}\subset\mathbb{R}^p`.

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid
        The grid on which the rSVD is performed. It is used to define the
        spatial domain of the snapshots.
    gdim : int, optional (Default = 3)
        The geometric dimension of the grid. It can be either 2 or 3.
    varname : str, optional (default='u')
        The name of the variable to be used for the rSVD. Default is 'u'.
    """

    def fit(self, train_snaps: FunctionsList, rank: int, verbose: bool = False, **kwargs):
        r"""
        Perform the randomized SVD on the training snapshots.

        Parameters
        ----------
        train_snaps : FunctionsList
            The training snapshots used to compute the rSVD modes.
        rank : int
            The rank for the truncated SVD.
        verbose : bool, optional
            If True, print progress messages. Default is False.
        kwargs : dict, optional
            Additional keyword arguments forwarded to the SVD solver
            (`sklearn.utils.extmath.randomized_svd`).
        """
        if verbose:
            print('Computing ' + self.varname + ' SVD', end='\r')

        _time = Timer()
        _time.start()

        # Ignore runtime warnings from sklearn
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            # Compute the randomized SVD
            _U, _S, _ = randomized_svd(train_snaps.return_matrix(),
                                       n_components=rank, **kwargs)

        self.singular_values = _S
        self.svd_modes = FunctionsList(train_snaps.fun_shape)
        self.svd_modes.build_from_matrix(_U)

        elapsed_time = _time.stop()
        if verbose:
            print(f"SVD of {self.varname} snapshots calculated in {elapsed_time:.6f} seconds (cpu).")

    def plot_sing_vals(self):
        """
        Simple method to plot the singular values: in linear scale, residual
        energy and cumulative energy.

        Returns
        -------
        fig : matplotlib.figure.Figure
            The figure containing the plots of the singular values, residual
            energy, and cumulative energy.
        """
        fontsize = 17
        fig, axs = plt.subplots(1, 3, figsize=(16, 4), sharex=True)
        _Nplot = np.arange(1, len(self.singular_values) + 1)

        axs[0].plot(_Nplot, self.singular_values, 'r', label='Singular Values')
        axs[0].set_ylabel(r'Singular Values: $\sigma_r$', fontsize=fontsize)

        # The residual energy at full rank is exactly 0, which cannot be drawn
        # on a log axis: hence the [:-1] slicing of the plotted points.
        axs[1].semilogy(_Nplot[:-1],
                        1 - np.cumsum(self.singular_values[:-1]**2) / np.sum(self.singular_values**2),
                        'b', label='Cumulative residual energy')
        axs[1].set_ylabel(r'Residual energy: $1 - \frac{\sum_{k=1}^r \sigma_k^2}{\sum_{n=1}^{r_{max}} \sigma_n^2}$', fontsize=fontsize)

        axs[2].plot(_Nplot,
                    np.cumsum(self.singular_values**2) / np.sum(self.singular_values**2),
                    'g', label='Cumulative energy')
        axs[2].set_ylabel(r'Cumulative energy: $\frac{\sum_{k=1}^r \sigma_k^2}{\sum_{n=1}^{N_s} \sigma_n^2}$', fontsize=fontsize)

        for ax in axs:
            ax.set_xlabel(r'Rank $r$', fontsize=fontsize)
            ax.grid(which='both', linestyle='--', linewidth=0.5)
            ax.tick_params(axis='both', which='major', labelsize=fontsize - 2)

        fig.suptitle(f'Singular Values - {self.varname}', y=1.02, fontsize=fontsize + 3)
        plt.tight_layout()
        return fig

    def _project(self, u: np.ndarray, N: int):
        r"""
        Projects the function `u` onto the first `N` SVD modes to obtain the
        reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`. The projection is
        done using the inner product in :math:`l_2` (euclidean product).

        Parameters
        ----------
        u : np.ndarray
            Function object to project onto the reduced space of dimension `N`.
        N : int
            Dimension of the reduced space, modes to be used.

        Returns
        -------
        coeffs : np.ndarray
            Modal SVD coefficients of `u`, :math:`\{\alpha_k\}_{k=1}^N`.
        """
        coeffs = self.svd_modes.return_matrix()[:, :N].T @ u
        return coeffs

    def reduce(self, snaps: FunctionsList | np.ndarray, N: int = None):
        r"""
        The reduced coefficients :math:`\{\alpha_k\}_{k=1}^N` of the snapshots
        using `N` modes :math:`\{\psi_k\}_{k=1}^N` are computed using
        projection in :math:`l_2`.

        Parameters
        ----------
        snaps : FunctionsList or np.ndarray
            Function object to project onto the reduced space of dimension `N`.
        N : int, optional
            Dimension of the reduced space, modes to be used. If None, all
            modes are used.

        Returns
        -------
        coeff : np.ndarray
            Modal SVD coefficients, shaped :math:`(N, N_s)`.
        """
        if N is None:
            N = len(self.svd_modes)
        else:
            assert N <= len(self.svd_modes), "N must be less than or equal to the number of SVD modes."

        if isinstance(snaps, FunctionsList):
            snaps = snaps.return_matrix()
        elif isinstance(snaps, np.ndarray):
            if snaps.ndim == 1:
                snaps = np.atleast_2d(snaps).T # shape (Nh, 1)
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")

        assert snaps.shape[0] == self.svd_modes.fun_shape, "Input shape must match the SVD modes shape."

        coeffs = np.zeros((N, snaps.shape[1]))
        for nn in range(coeffs.shape[1]):
            coeffs[:, nn] = self._project(snaps[:, nn], N)
        return coeffs

    def reconstruct(self, coeffs: np.ndarray):
        r"""
        This method reconstructs the function `u` from the reduced coefficients
        :math:`\{\alpha_k\}_{k=1}^N` using the SVD modes
        :math:`\{\psi_k\}_{k=1}^N`.

        .. math::
            u(\cdot;\,\boldsymbol{\mu}) = \sum_{k=1}^N \alpha_k(\boldsymbol{\mu}) \psi_k(\cdot)

        Parameters
        ----------
        coeffs : np.ndarray
            Reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`, shaped
            :math:`(N,N_s)`; a 1D array of length :math:`N` is treated as a
            single snapshot.

        Returns
        -------
        u : FunctionsList
            Reconstructed functions.
        """
        assert coeffs.shape[0] <= len(self.svd_modes), "The number of coefficients must be less than or equal to the number of SVD modes."
        if coeffs.ndim == 1:
            # A 1D vector of N coefficients describes ONE snapshot: reshape to
            # an (N, 1) column. (np.atleast_2d would yield a (1, N) row and the
            # loop below would wrongly build N one-term snapshots.)
            coeffs = coeffs.reshape(-1, 1)
        reconstructed_snaps = FunctionsList(self.svd_modes.fun_shape)
        for nn in range(coeffs.shape[1]):
            reconstructed_snaps.append(self.svd_modes.lin_combine(coeffs[:, nn]))
        return reconstructed_snaps

    def compute_errors(self, snaps: FunctionsList | np.ndarray, Nmax: int = None, verbose: bool = False):
        r"""
        Computes the errors between the original snapshots and the
        reconstructed ones.

        Parameters
        ----------
        snaps : FunctionsList or np.ndarray
            Original snapshots to compare with.
        Nmax : int, optional
            Maximum number of modes to use for the reconstruction. If None,
            all modes are used.
        verbose : bool, optional
            If True, print progress messages. Default is False.

        Returns
        ----------
        mean_abs_err : np.ndarray
            Average absolute error measured in :math:`L^2`.
        mean_rel_err : np.ndarray
            Average relative error measured in :math:`L^2`.
        computational_time : dict
            Dictionary with the CPU time of the most relevant operations
            during the online phase.
        """
        if isinstance(snaps, FunctionsList):
            Ns = len(snaps)
            assert snaps.fun_shape == self.svd_modes.fun_shape, "The shape of the snapshots must match the shape of the SVD modes."
            # Convert FunctionsList to numpy array for processing
            snaps = snaps.return_matrix()
        elif isinstance(snaps, np.ndarray):
            Ns = snaps.shape[1]
            assert snaps.shape[0] == self.svd_modes.fun_shape, "The shape of the snapshots must match the shape of the SVD modes."
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")

        if Nmax is None:
            Nmax = len(self.svd_modes)
        else:
            assert Nmax <= len(self.svd_modes), f"Nmax={Nmax} must be less than or equal to the number of SVD modes, {len(self.svd_modes)}."

        abs_err = np.zeros((Ns, Nmax))
        rel_err = np.zeros((Ns, Nmax))

        # Variables to store computational time
        computational_time = dict()
        computational_time['StateEstimation'] = np.zeros((Ns, Nmax))
        computational_time['Errors'] = np.zeros((Ns, Nmax))
        timer = Timer()

        if verbose:
            print("Computing L2 norm of snapshot", end='\r')
        _snap_norm = list()
        for mu_i in range(Ns):
            timer.start()
            _snap_norm.append(self.calculator.L2_norm(snaps[:, mu_i]))
            computational_time['Errors'][mu_i, :] = timer.stop()

        if verbose:
            progressBar = LoopProgress(msg=f"Computing errors (SVD-projection) - {self.varname}", final=Nmax)

        for nn in range(Nmax):
            if verbose:
                progressBar.update(1, percentage=False)
            timer.start()
            _coeffs = self.reduce(snaps, nn + 1)  # shape (N, Ns)
            reconstructed_snaps = self.reconstruct(_coeffs)
            computational_time['StateEstimation'][:, nn] = timer.stop()

            for mu_i in range(Ns):
                timer.start()
                _resid = snaps[:, mu_i] - reconstructed_snaps(mu_i)
                abs_err[mu_i, nn] = self.calculator.L2_norm(_resid)
                rel_err[mu_i, nn] = abs_err[mu_i, nn] / _snap_norm[mu_i]
                computational_time['Errors'][mu_i, nn] += timer.stop()

        Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time'])
        _res = Results(mean_abs_err=abs_err.mean(axis=0),
                       mean_rel_err=rel_err.mean(axis=0),
                       computational_time=computational_time)
        return _res

    def save(self, path_folder: str, **kwargs):
        r"""
        Save the SVD modes and the singular values to a specified path.

        Parameters
        ----------
        path_folder : str
            The folder path where the model will be saved.
        **kwargs : dict
            Additional keyword arguments for saving options.
        """
        os.makedirs(path_folder, exist_ok=True)
        # Save the SVD modes
        self.svd_modes.store(f'SVDmode_{self.varname}',
                             filename=os.path.join(path_folder, f'SVDmode_{self.varname}'),
                             **kwargs)
        np.save(os.path.join(path_folder, f'sing_vals_{self.varname}.npy'), self.singular_values)
class POD(OfflineDDROM):
    r"""
    A class to perform the POD on a list of snapshots
    :math:`u(\mathbf{x};\,\boldsymbol{\mu})` dependent on some parameter
    :math:`\boldsymbol{\mu}\in\mathcal{P}\subset\mathbb{R}^p`.

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid
        The grid on which the POD is performed. It is used to define the
        spatial domain of the snapshots.
    gdim : int, optional (Default = 3)
        The geometric dimension of the grid. It can be either 2 or 3.
    varname : str, optional
        The name of the variable to be used for the POD. Default is 'u'.
    """

    def fit(self, train_snaps: FunctionsList, verbose: bool = False):
        r"""
        This method is used to obtain the eigendecomposition of the
        correlation matrix :math:`C\in\mathbb{R}^{N_s\times N_s}` from a list
        of snapshots as `FunctionsList`.

        .. math::
            C_{ij} = \left(u(\cdot;\,\boldsymbol{\mu}_i),\,u(\cdot;\,\boldsymbol{\mu}_j)\right)_{L^2}\qquad i,j = 1, \dots, N_s

        .. math::
            C \boldsymbol{\eta_n} = \lambda_n \boldsymbol{\eta_n}\qquad\qquad\qquad n = 1, \dots, N_s

        The eigenvalues :math:`\lambda_n` and eigenvectors
        :math:`\boldsymbol{\eta_n}` are then computed.

        Parameters
        ----------
        train_snaps : FunctionsList
            The training snapshots used to compute the POD modes.
        verbose : bool, optional
            If True, print progress messages. Default is False.
        """
        Ns = len(train_snaps)

        # Calculate the correlation matrix (symmetric: only the upper triangle
        # is computed, then mirrored).
        if verbose:
            progressBar = LoopProgress(msg="Computing " + self.varname + ' correlation matrix', final=Ns)
        _time = Timer()
        _time.start()
        corr_matr = np.zeros((Ns, Ns))
        for ii in range(Ns):
            for jj in range(ii, Ns):
                corr_matr[ii, jj] = self.calculator.L2_inner_product(train_snaps(ii), train_snaps(jj))
                corr_matr[jj, ii] = corr_matr[ii, jj]
            if verbose:
                progressBar.update(1, percentage=False)

        # Keep only the non-negative eigenvalues (C is positive semi-definite).
        eigenvalues, eigenvectors = scipy.linalg.eigh(corr_matr, subset_by_value=[0, np.inf])

        # Sort eigenvalues and eigenvectors in descending order
        sorted_indexes = np.argsort(eigenvalues)[::-1]
        self.eigenvalues = eigenvalues[sorted_indexes]
        self.eigenvectors = eigenvectors[:, sorted_indexes]

        elapsed_time = _time.stop()
        if verbose:
            print(f"Eigenvalues calculated in {elapsed_time:.6f} seconds.")

    def plot_eigenvalues(self):
        """
        Simple method to plot the eigenvalues of the POD modes: in linear
        scale, residual and cumulative energy.

        Returns
        -------
        fig : matplotlib.figure.Figure
            The figure containing the plots of the eigenvalues, residual
            energy, and cumulative energy.
        """
        fig, axs = plt.subplots(1, 3, figsize=(16, 4), sharex=True)
        _Nplot = np.arange(1, len(self.eigenvalues) + 1)

        axs[0].plot(_Nplot, self.eigenvalues, 'r', label='Eigenvalues')
        axs[0].set_ylabel(r'Eigenvalues $\lambda_r$')

        axs[1].semilogy(_Nplot,
                        1 - np.cumsum(self.eigenvalues) / np.sum(self.eigenvalues),
                        'b', label='Cumulative residual energy')
        axs[1].set_ylabel(r'Residual energy $1 - \frac{\sum_{k=1}^r \lambda_k}{\sum_{n=1}^{N_s} \lambda_n}$')

        axs[2].plot(_Nplot,
                    np.cumsum(self.eigenvalues) / np.sum(self.eigenvalues),
                    'g', label='Cumulative energy')
        axs[2].set_ylabel(r'Cumulative energy $\frac{\sum_{k=1}^r \lambda_k}{\sum_{n=1}^{N_s} \lambda_n}$')

        for ax in axs:
            ax.set_xlabel(r'Rank $r$')
            ax.grid(which='both', linestyle='--', linewidth=0.5)

        plt.tight_layout()
        return fig

    def gram_schmidt(self, fun: np.ndarray):
        r"""
        Perform a step of the Gram-Schmidt process on POD modes
        :math:`\{\psi_k\}_{k=1}^r` adding `fun` :math:`=f` to enforce the
        orthonormality in :math:`L^2`

        .. math::
            \psi_{r+1} = f - \sum_{k=1}^r \frac{(f, \psi_k)_{L^2}}{(\psi_k, \psi_k)_{L^2}}\psi_k

        Parameters
        ----------
        fun : np.ndarray
            Function to add to the POD basis.

        Returns
        -------
        normalised_fun : np.ndarray
            Orthonormalised function :math:`\psi_{r+1}` with respect to the
            POD basis :math:`\{\psi_k\}_{k=1}^r`.
        """
        ii = len(self.pod_modes)
        # Defining the summation term: projection of `fun` on each existing
        # mode. (The original loop ran to ii+1 with a dead `jj < ii` guard;
        # range(ii) is equivalent.)
        _rescaling = list()
        for jj in range(ii):
            _rescaling.append(
                self.calculator.L2_inner_product(fun, self.pod_modes(jj)) /
                self.calculator.L2_inner_product(self.pod_modes(jj), self.pod_modes(jj)) *
                self.pod_modes(jj))

        # Computing the orthonormalised function
        normalised_fun = fun - sum(_rescaling)
        return normalised_fun / self.calculator.L2_norm(normalised_fun)

    def compute_basis(self, train_snaps: FunctionsList, rank: int, normalise: bool = False):
        r"""
        Computes the POD modes.

        To enforce the orthonormality in :math:`L^2`, the Gram-Schmidt
        procedure can be used; if the number of modes to be used is high the
        numerical error in the eigendecomposition may be too large and the
        orthonormality is lost.

        Parameters
        ----------
        train_snaps : FunctionsList
            List of snapshots onto which the POD is performed.
        rank : int
            Integer input indicating the number of modes to define.
        normalise : boolean, optional (Default = False)
            If True, the Gram-Schmidt procedure is used to normalise the POD
            modes.
        """
        self.pod_modes = FunctionsList(train_snaps.fun_shape)
        for rr in range(rank):
            _mode = train_snaps.lin_combine(self.eigenvectors[:, rr] / np.sqrt(self.eigenvalues[rr]))
            if normalise:
                # Perform Gram-Schmidt process to ensure orthonormality
                self.pod_modes.append(self.gram_schmidt(_mode))
            else:
                self.pod_modes.append(_mode)

    def _project(self, u: np.ndarray, N: int):
        r"""
        Projects the function `u` onto the first `N` POD modes to obtain the
        reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`. The projection is
        done using the inner product in :math:`L_2`, i.e.

        .. math::
            \alpha_k(\boldsymbol{\mu}) = (u(\cdot;\,\boldsymbol{\mu}), \,\psi_k)_{L^2}\qquad k = 1, \dots, N

        Parameters
        ----------
        u : np.ndarray
            Function object to project onto the reduced space of dimension `N`.
        N : int
            Dimension of the reduced space, modes to be used.

        Returns
        -------
        coeffs : np.ndarray
            Modal POD coefficients of `u`, :math:`\{\alpha_k\}_{k=1}^N`.
        """
        coeffs = np.zeros(N)
        for nn in range(N):
            # NOTE(review): the rest of this file accesses FunctionsList
            # entries with call syntax `pod_modes(nn)`; the original used
            # `pod_modes[nn]` here — made consistent with the other methods.
            coeffs[nn] = self.calculator.L2_inner_product(u, self.pod_modes(nn))
        return coeffs

    def reduce(self, snaps: FunctionsList | np.ndarray, N: int = None):
        r"""
        The reduced coefficients :math:`\{\alpha_k\}_{k=1}^N` of the snapshots
        using `N` modes :math:`\{\psi_k\}_{k=1}^N` are computed using
        projection in :math:`L_2`.

        Parameters
        ----------
        snaps : FunctionsList or np.ndarray
            Function object to project onto the reduced space of dimension `N`.
        N : int, optional
            Dimension of the reduced space, modes to be used. If None, all
            modes are used.

        Returns
        -------
        coeff : np.ndarray
            Modal POD coefficients, shaped :math:`(N, N_s)`.
        """
        if N is None:
            N = len(self.pod_modes)
        else:
            assert N <= len(self.pod_modes), "N must be less than or equal to the number of POD modes."

        if isinstance(snaps, FunctionsList):
            snaps = snaps.return_matrix()
        elif isinstance(snaps, np.ndarray):
            if snaps.ndim == 1:
                snaps = np.atleast_2d(snaps).T # shape (Nh, 1)
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")

        assert snaps.shape[0] == self.pod_modes.fun_shape, "Input shape must match the POD modes shape."

        coeffs = np.zeros((N, snaps.shape[1]))
        for nn in range(coeffs.shape[1]):
            coeffs[:, nn] = self._project(snaps[:, nn], N)
        return coeffs

    def reconstruct(self, coeffs: np.ndarray):
        r"""
        This method reconstructs the function `u` from the reduced
        coefficients :math:`\{\alpha_k\}_{k=1}^N` using the POD modes
        :math:`\{\psi_k\}_{k=1}^N`.

        .. math::
            u(\cdot;\,\boldsymbol{\mu}) = \sum_{k=1}^N \alpha_k(\boldsymbol{\mu}) \psi_k(\cdot)

        Parameters
        ----------
        coeffs : np.ndarray
            Reduced coefficients :math:`\{\alpha_k\}_{k=1}^N`, shaped
            :math:`(N,N_s)`; a 1D array of length :math:`N` is treated as a
            single snapshot.

        Returns
        -------
        u : FunctionsList
            Reconstructed functions.
        """
        assert coeffs.shape[0] <= len(self.pod_modes), "The number of coefficients must be less than or equal to the number of POD modes."
        if coeffs.ndim == 1:
            # A 1D vector of N coefficients describes ONE snapshot: reshape to
            # an (N, 1) column. (np.atleast_2d would yield a (1, N) row and the
            # loop below would wrongly build N one-term snapshots.)
            coeffs = coeffs.reshape(-1, 1)
        reconstructed_snaps = FunctionsList(self.pod_modes.fun_shape)
        for nn in range(coeffs.shape[1]):
            reconstructed_snaps.append(self.pod_modes.lin_combine(coeffs[:, nn]))
        return reconstructed_snaps

    def compute_errors(self, snaps: FunctionsList | np.ndarray, Nmax: int = None, verbose: bool = False):
        r"""
        Computes the errors between the original snapshots and the
        reconstructed ones.

        Parameters
        ----------
        snaps : FunctionsList or np.ndarray
            Original snapshots to compare with.
        Nmax : int, optional
            Maximum number of modes to use for the reconstruction. If None,
            all modes are used.
        verbose : bool, optional
            If True, print progress messages. Default is False.

        Returns
        ----------
        mean_abs_err : np.ndarray
            Average absolute error measured in :math:`L^2`.
        mean_rel_err : np.ndarray
            Average relative error measured in :math:`L^2`.
        computational_time : dict
            Dictionary with the CPU time of the most relevant operations
            during the online phase.
        """
        if isinstance(snaps, FunctionsList):
            Ns = len(snaps)
            assert snaps.fun_shape == self.pod_modes.fun_shape, "The shape of the snapshots must match the shape of the POD modes."
            # Convert FunctionsList to numpy array for processing
            snaps = snaps.return_matrix()
        elif isinstance(snaps, np.ndarray):
            Ns = snaps.shape[1]
            assert snaps.shape[0] == self.pod_modes.fun_shape, "The shape of the snapshots must match the shape of the POD modes."
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")

        if Nmax is None:
            Nmax = len(self.pod_modes)
        else:
            assert Nmax <= len(self.pod_modes), f"Nmax={Nmax} must be less than or equal to the number of POD modes, {len(self.pod_modes)}."

        abs_err = np.zeros((Ns, Nmax))
        rel_err = np.zeros((Ns, Nmax))

        # Variables to store computational time
        computational_time = dict()
        computational_time['StateEstimation'] = np.zeros((Ns, Nmax))
        computational_time['Errors'] = np.zeros((Ns, Nmax))
        timer = Timer()

        if verbose:
            print("Computing L2 norm of snapshot", end='\r')
        _snap_norm = list()
        for mu_i in range(Ns):
            timer.start()
            _snap_norm.append(self.calculator.L2_norm(snaps[:, mu_i]))
            computational_time['Errors'][mu_i, :] = timer.stop()

        if verbose:
            progressBar = LoopProgress(msg=f"Computing errors (POD-projection) - {self.varname}", final=Nmax)

        for nn in range(Nmax):
            if verbose:
                progressBar.update(1, percentage=False)
            timer.start()
            _coeffs = self.reduce(snaps, nn + 1)  # shape (N, Ns)
            reconstructed_snaps = self.reconstruct(_coeffs)
            computational_time['StateEstimation'][:, nn] = timer.stop()

            for mu_i in range(Ns):
                timer.start()
                _resid = snaps[:, mu_i] - reconstructed_snaps(mu_i)
                abs_err[mu_i, nn] = self.calculator.L2_norm(_resid)
                rel_err[mu_i, nn] = abs_err[mu_i, nn] / _snap_norm[mu_i]
                computational_time['Errors'][mu_i, nn] += timer.stop()

        Results = namedtuple('Results', ['mean_abs_err', 'mean_rel_err', 'computational_time'])
        _res = Results(mean_abs_err=abs_err.mean(axis=0),
                       mean_rel_err=rel_err.mean(axis=0),
                       computational_time=computational_time)
        return _res

    def save(self, path_folder: str, **kwargs):
        r"""
        Save the POD modes and the eigenvalues to a specified path.

        Parameters
        ----------
        path_folder : str
            The folder path where the model will be saved.
        **kwargs : dict
            Additional keyword arguments for saving options.
        """
        os.makedirs(path_folder, exist_ok=True)
        # Save the POD modes
        self.pod_modes.store(f'PODmode_{self.varname}',
                             filename=os.path.join(path_folder, f'PODmode_{self.varname}'),
                             **kwargs)
        np.save(os.path.join(path_folder, f'eigenvalues_{self.varname}.npy'), self.eigenvalues)
class HierarchicalSVD(rSVD):
    r"""
    A class to perform hierarchical Singular Value Decomposition (hSVD) from
    `Iwen and Ong (2016) <https://epubs.siam.org/doi/10.1137/140971500>`_.

    This class inherits from the `rSVD` class and extends its functionality to
    perform hSVD and update the SVD modes accordingly. This method is
    particularly useful for large datasets where a hierarchical approach can
    improve computational efficiency, especially when dealing with parametric
    problems.

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid
        The grid on which the hSVD is performed. It is used to define the spatial domain of the snapshots.
    gdim : int, optional (Default = 3)
        The geometric dimension of the grid. It can be either 2 or 3.
    varname : str, optional (default='u')
        The name of the variable to be used for the hSVD. Default is 'u'.
    """

    def __init__(self, grid, gdim=3, varname='u', **kwargs):
        super().__init__(grid, gdim, varname, **kwargs)
        # Modes/singular values start empty: they are built incrementally by
        # successive calls to `update`.
        self.svd_modes = None
        self.singular_values = None

    def update(self, train_snaps: FunctionsList = None,
               new_modes: FunctionsList = None, new_sing_vals: np.ndarray = None,
               rank: int = None, **kwargs):
        r"""
        This method updates the SVD modes and singular values using the
        hierarchical SVD approach, either using new training snapshots or
        directly providing new modes and singular values.

        Parameters
        ----------
        train_snaps : FunctionsList, optional
            New training snapshots used to compute the new SVD modes and
            singular values. If not given, both `new_modes` and
            `new_sing_vals` must be provided.
        new_modes : FunctionsList, optional
            New SVD modes to be used for the update.
        new_sing_vals : np.ndarray, optional
            New singular values to be used for the update.
        rank : int, optional
            Rank of the truncated SVDs used in the update. If None, falls back
            to the current number of singular values or, on the first call, to
            the number of training snapshots.
        """
        # Resolve the working rank for this update step.
        if rank is not None:
            _rank = rank
        else:
            _rank = len(self.singular_values) if self.singular_values is not None else len(train_snaps)

        if train_snaps is not None:
            # Ignore runtime warnings from sklearn
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=RuntimeWarning)
                # Perform rSVD on the new training snapshots
                _U_new, _S_new, _ = randomized_svd(train_snaps.return_matrix(),
                                                   n_components=_rank, **kwargs)
            new_modes = FunctionsList(train_snaps.fun_shape)
            new_modes.build_from_matrix(_U_new)
            new_sing_vals = _S_new
        else:
            assert new_modes is not None and new_sing_vals is not None, "Either train_snaps or both new_modes and new_sing_vals must be provided."

        if self.svd_modes is None:
            # First update, simply assign the new modes and singular values
            self.svd_modes = new_modes
            self.singular_values = new_sing_vals
        else:
            # Ignore runtime warnings from sklearn
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=RuntimeWarning)
                # Merge step of hierarchical SVD: stack the two scaled bases
                # (U @ diag(S)) side by side and take an rSVD of the result.
                _A = np.hstack([self.svd_modes.return_matrix() @ np.diag(self.singular_values),
                                new_modes.return_matrix() @ np.diag(new_sing_vals)])
                _U_updated, _S_updated, _ = randomized_svd(_A, n_components=_rank, **kwargs)

            # Update the SVD modes and singular values
            self.svd_modes = FunctionsList(self.svd_modes.fun_shape)
            self.svd_modes.build_from_matrix(_U_updated)
            self.singular_values = _S_updated
class IncrementalSVD(rSVD):
    r"""
    A class to perform incremental Singular Value Decomposition (iSVD) from
    `Brand (2002) <https://link.springer.com/chapter/10.1007/3-540-47969-4_47>`_.

    This class inherits from the `rSVD` class and extends its functionality to
    perform iSVD and update the SVD modes accordingly. This method is
    particularly useful for streaming data or when new snapshots become
    available over time.

    Parameters
    ----------
    grid : pyvista.UnstructuredGrid
        The grid on which the iSVD is performed. It is used to define the spatial domain of the snapshots.
    gdim : int, optional (Default = 3)
        The geometric dimension of the grid. It can be either 2 or 3.
    varname : str, optional (default='u')
        The name of the variable to be used for the iSVD. Default is 'u'.
    """

    def __init__(self, grid, gdim=3, varname='u', **kwargs):
        super().__init__(grid, gdim, varname, **kwargs)
        # Modes/singular values are created by `fit` and then refined by
        # successive calls to `update`.
        self.svd_modes = None
        self.singular_values = None

    def fit(self, train_snaps: FunctionsList, rank: int, verbose: bool = False, **kwargs):
        r"""
        This method is used to perform the randomized SVD on the training
        snapshots, storing the right singular vectors (`Vh`) as well, which
        are needed by the incremental updates.

        Parameters
        ----------
        train_snaps : FunctionsList
            The training snapshots used to compute the rSVD modes.
        rank : int
            The rank for the truncated SVD.
        verbose : bool, optional
            If True, print progress messages. Default is False.
        kwargs : dict, optional
            Additional keyword arguments for the SVD solver.
        """
        # Bookkeeping used by `update`/`_check_svd`: number of snapshots,
        # spatial dimension and truncation rank.
        self.Ns = len(train_snaps)
        self.Nh = train_snaps.fun_shape
        self.rank = rank

        if verbose:
            print('Computing ' + self.varname + ' SVD', end='\r')
        _time = Timer()
        _time.start()

        # Ignore runtime warnings from sklearn
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            _U, _S, _Vh = randomized_svd(train_snaps.return_matrix(),
                                         n_components=rank, **kwargs)

        self.singular_values = _S
        self.svd_modes = FunctionsList(train_snaps.fun_shape)
        self.svd_modes.build_from_matrix(_U)
        # Right singular vectors, kept to extend the decomposition when new
        # snapshots arrive.
        self.Vh = _Vh

        elapsed_time = _time.stop()
        if verbose:
            print(f"SVD of {self.varname} snapshots calculated in {elapsed_time:.6f} seconds (cpu).")

    def update(self, new_snap: FunctionsList | np.ndarray):
        r"""
        This method updates the SVD modes and singular values using the
        incremental SVD approach with a new snapshot (or batch of snapshots).

        Parameters
        ----------
        new_snap : FunctionsList or np.ndarray
            New snapshot(s) to fold into the decomposition; a 1D array is
            treated as a single snapshot of shape `(Nh, 1)`.
        """
        if isinstance(new_snap, FunctionsList):
            new_snap = new_snap.return_matrix()
        elif isinstance(new_snap, np.ndarray):
            if new_snap.ndim == 1:
                new_snap = np.atleast_2d(new_snap).T # shape (Nh, 1)
        else:
            raise TypeError("Input must be a FunctionsList or a numpy ndarray.")

        # Step 1: eigen-decom (projection onto the existing SVD modes)
        L, num_new_snaps = self._eigen_decomposition(new_snap)

        # Step 2: non-orthogonal components over the existing SVD modes with QR
        J, K = self._qr_decomposition(L, new_snap)

        # Step 3: new SVD of the small core matrix
        Uplus, Splus, Vhplus = self._compute_new_svd(L, K)

        # Step 4: update the SVD (rotate the augmented bases by the core SVD)
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            self.svd_modes = FunctionsList(snap_matrix = np.hstack([self.svd_modes.return_matrix(), J]) @ Uplus)
            self.singular_values = Splus
            # Block-diagonal extension of V followed by rotation with Vplus.
            tmp_V = np.vstack([
                np.hstack([self.Vh.T, np.zeros((self.Vh.shape[1], J.shape[1]))]),
                np.hstack([np.zeros((J.shape[1], self.Vh.shape[0])), np.eye(J.shape[1])])])
            self.Vh = (tmp_V @ Vhplus.T).T

        # Update number of snapshots and check SVD dimensions
        self.Ns += num_new_snaps
        self._check_svd()

    def _eigen_decomposition(self, new_data: np.ndarray):
        """
        Project the new data onto the existing SVD modes (step 1 of the
        incremental SVD algorithm); returns the projection coefficients
        `L` of shape `(rank, num_new_snaps)` and the number of new snapshots.
        """
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            L = self.svd_modes.return_matrix().T @ new_data
        num_new_snaps = new_data.shape[1]
        assert L.shape[0] == self.rank
        assert L.shape[1] == num_new_snaps
        return L, num_new_snaps

    def _qr_decomposition(self, L: np.ndarray, new_data: np.ndarray):
        """
        QR-decompose the component of the new data orthogonal to the current
        modes (step 2): `H = new_data - U @ L`, with `J` the new orthonormal
        directions and `K` the triangular factor.
        """
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            H = new_data - self.svd_modes.return_matrix() @ L
            J, K = np.linalg.qr(H)
        return J, K

    def _compute_new_svd(self, L: np.ndarray, K: np.ndarray):
        """
        SVD of the small core matrix `Q = [[diag(S), L], [0, K]]` (step 3),
        truncated back to `self.rank`.
        """
        Q = np.vstack([np.hstack([np.diag(self.singular_values), L]),
                       np.hstack([np.zeros((K.shape[0], self.singular_values.shape[0])), K])])
        # Ignore runtime warnings from sklearn
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            Uplus, Splus, Vhplus = randomized_svd(Q, n_components=self.rank)
        return Uplus, Splus, Vhplus

    def _check_svd(self):
        # Sanity checks on the dimensions of the updated decomposition.
        assert self.svd_modes.return_matrix().shape[0] == self.Nh
        assert self.svd_modes.return_matrix().shape[1] == self.rank
        assert self.singular_values.shape[0] == self.rank
        assert self.Vh.shape[0] == self.rank
        assert self.Vh.shape[1] == self.Ns