Failing Sensors

State estimation methods typically assumes input measurements to be perfect, or only polluted by random noise. However, in practical applications, sensors may fail or provide erroneous data due to various reasons such as hardware malfunctions, environmental conditions, or communication issues. This notebook addresses this problem following the approach proposed in Riva et al., 2025.

Both the GEIM and the PBDW methods are extended to handle failing sensors. Two kind of failures are considered:

  • Hard failures: the sensor stops providing data altogether.

  • Drift failures: the sensor provides erroneous data, e.g., biased measurements, given \(\mathbf{y}\) as the true measurement vector (already including “standard noise”), the faulty sensor provides \(\mathbf{y}_{\text{faulty}} = \mathbf{y} + \mathcal{N}(\kappa, \rho^2)\) where \(\mathcal{N}(\kappa, \rho^2)\) is a Gaussian random variable with mean \(\kappa\) and variance \(\rho^2\).

In this notebook, we are going to consider a MP simulation of a MSFR, during the Unprotected Loss Of Flow (ULOFF) transient.

Import OpenFOAM case

Using pyforce functions we can import directly the OpenFOAM case.

[ ]:
from pyforce.tools.write_read import ReadFromOF

of = ReadFromOF('../Datasets/MSFR-ULOFF')
grid = of.mesh()

# import pyvista as pv

# pl = pv.Plotter()
# pl.add_mesh(grid, color='lightgrey', show_edges=True)
# pl.add_axes()
# pl.add_bounding_box()
# pl.show(jupyter_backend='pythreejs')

Let us load the temperature field

[2]:
field = 'T'

dataset, times = of.import_field(field)
Importing T using pyvista - 100.00%

Let us scale it to [0,1] range for numerical stability

[3]:
from pyforce.tools.scalers import MinMaxScaler

scaler = MinMaxScaler(global_scale=True)
scaler.fit(dataset)

snapshots = scaler.transform(dataset)

Let us split the data into train and test

[4]:
from pyforce.tools.functions_list import train_test_split

train_times, test_times, train_snaps, test_snaps = train_test_split(
    times, snapshots, test_size=0.2, random_state=42
)

# Sorting the training snapshots according to time
import numpy as np
_argsort = np.argsort(train_times)
train_times = np.array(train_times)[_argsort]
train_snaps.sort(_argsort)

# Sorting the testing snapshots according to time
_argsort = np.argsort(test_times)
test_times = np.array(test_times)[_argsort]
test_snaps.sort(_argsort)

GEIM

We first test the GEIM method with failing sensors.

[5]:
geim_on_abs_err = dict()
geim_on_rel_err = dict()

noise_std = 0.02 # 2% noise

geim_estimation = dict()

Offline Phase

At first, the GEIM magic functions and sensors are computed in the offline phase.

[6]:
from pyforce.offline.geim import GEIM as GEIMoffline

geim_off = GEIMoffline(grid, varname=field)

Sensors will be placed everywhere in the domain.

[7]:
sens_params = {
    's': 0.025, # spread of the Gaussian kernels - hyperparameter to be tuned according to the physical system
    'xm_list': grid.cell_centers().points[::20].tolist() # optional (sampling every 10 points to speed up the process)
}

res_geim_offline = geim_off.fit(train_snaps, Mmax = 20, sensor_params=sens_params,
                                verbose=True)
Creating Gaussian Sensor Library: 100.000 / 100.00% - 0.000567 s/it
GEIM Iteration 20/20 - MaxAbsErr: 1.0234e-03 - MaxRelErr: 4.1995e-03

Let us plot the sensors just placed

[8]:
from matplotlib import patches

fig = geim_off.plot_sensors(M=20, levels=40, fig_length=6, fontsize_sens=10,
                                color_sens='blue', cmap='Blues', view='xz')

def get_core_msfr_geometry(ax):

    rect_blanket = patches.Rectangle((1.13, -1.88/2), 0.7, 1.88, linewidth=1, facecolor='white',
                                     edgecolor='black')
    ax.add_patch(rect_blanket)

get_core_msfr_geometry(fig.get_axes()[0])

ax = fig.get_axes()[0]
ax.set_aspect('equal')
../../_images/Tutorials_Advanced_05_failing_sensors_16_0.png

Online Phase: baseline

The baseline is the reconstruction using noisy data, but with all the sensors working. The TR-GEIM version is used.

[9]:
from pyforce.online.geim import GEIM as GEIMonline

geim_on = GEIMonline(grid, varname=field)
geim_on.set_basis(geim_off.magic_functions)
geim_on.set_magic_sensors(geim_off.magic_sensors.library)

# Assemble matrix for online phase
geim_on.compute_B_matrix()

# Regularisation matrix
geim_on.set_tikhonov_matrices(train_snaps=train_snaps)

Let us compute the errors on the test set

[10]:
_res = geim_on.compute_errors(test_snaps, noise_std = noise_std,
                            regularization_params = {'type': 'tikhonov', 'lambda': noise_std**2},
                             verbose=True)

geim_on_abs_err['baseline'] = _res[0]
geim_on_rel_err['baseline'] = _res[1]
Computing errors - T: 20.000 / 20.00 - 0.038041 s/it

Let us also estimate the field using all the available sensors (no failing sensors yet)

[11]:
_measures = geim_on.get_measurements(test_snaps, noise_std=noise_std)
geim_estimation['baseline'] = geim_on.estimate(_measures,
                                               regularization_params = {'type': 'tikhonov', 'lambda': noise_std**2} )

Let us make a plot of the results

[12]:
from matplotlib import pyplot as plt
from matplotlib import cm
def plot_contour(ax, domain, _snap,
                 cmap = cm.jet, levels = 20):
    snap = _snap

    cont = ax.tricontourf(*domain.T, snap, cmap = cmap, levels = levels)

    # Add Blanket
    rect_blanket = patches.Rectangle((1.13, -1.88/2), 0.7, 1.88, linewidth=1, facecolor='white',
                                     edgecolor='black',
                                    zorder=3)

    ax.add_patch(rect_blanket)

    ax.set_xticks([])
    ax.set_yticks([])

    return cont

nodes = grid.cell_centers().points[:, [0, 2]] # x-z plane

from IPython.display import clear_output
for tt in range(0, len(test_times), 10):

    fig, axs = plt.subplots(1, 3, figsize=(6*3,5))

    c = plot_contour(axs[0], nodes, test_snaps[tt],levels = np.linspace(0, 1, 50))
    plot_contour(axs[1], nodes, geim_estimation['baseline'][tt], levels = np.linspace(0, 1, 50))

    fig.colorbar(c, ax=axs[0])
    fig.colorbar(c, ax=axs[1])

    # Residual
    residual = np.abs(test_snaps[tt] - geim_estimation['baseline'][tt])
    c = plot_contour(axs[2], nodes, residual, levels = np.linspace(0, np.max(residual), 50), cmap=cm.inferno)
    fig.colorbar(c, ax=axs[2])

    # Labels
    axs[0].set_title('FOM')
    axs[1].set_title('TR-GEIM Baseline')
    axs[2].set_title('Residual')

    fig.suptitle(field+' at time = %.2f s' % (test_times[tt]), fontsize=16)

    plt.show()
    clear_output(wait=True)
    plt.close(fig)
../../_images/Tutorials_Advanced_05_failing_sensors_24_0.png

Online Phase: failing sensors

Now, we simulate failing sensors. We consider both hard failures and drift failures.

The class is initialized as before.

[13]:
from pyforce.online.failure_sens import FailingGEIM

failing_geim_on = FailingGEIM(grid, varname=field)

failing_geim_on.set_basis(geim_off.magic_functions)
failing_geim_on.set_magic_sensors(geim_off.magic_sensors.library)

# Assemble matrix for online phase
failing_geim_on.compute_B_matrix()

# Regularisation matrix
failing_geim_on.set_tikhonov_matrices(train_snaps=train_snaps)

Drift failures are simulated by adding a bias and variance to the measurements of the failed sensors.

Let us visualize the effect of the drift on measurements

[14]:
kappa_drift = 0.4 # 10% drift
rho_drift = 0.05 # 5% variance

drift_dict = {
    'kappa': kappa_drift,
    'rho': rho_drift,
    'idx_failed': [0,5,10],
    'mu_failure': 100
}

clean_meas = failing_geim_on.get_measurements(snapshots, noise_std=1e-12)
drifted_meas = failing_geim_on.get_measurements(snapshots, noise_std=1e-12, drift_dict=drift_dict)

fig, axs = plt.subplots(1, len(drift_dict['idx_failed']), figsize=(5 * len(drift_dict['idx_failed']), 4))

for i, idx in enumerate(drift_dict['idx_failed']):
    axs[i].plot(clean_meas[idx], label='Clean Measurement', color='blue')
    axs[i].plot(drifted_meas[idx], label='Drifted Measurement', color='red', linestyle='--')
    axs[i].set_title(f'Sensor {idx} Measurements')
    axs[i].set_xlabel('Time Step')
    axs[i].set_ylabel('Measurement Value')
    axs[i].legend()
../../_images/Tutorials_Advanced_05_failing_sensors_28_0.png

Let us compute the test errors with drift failures on a single sensor. The total number of sensors used is 20 as the number of basis and the influence of the failed index is investigated.

[15]:
geim_on_abs_err['drift'] = list()
geim_on_rel_err['drift'] = list()
geim_estimation['drift'] = list()

snaps_norm = list()
for _snap in test_snaps:
    snaps_norm.append(failing_geim_on.calculator.L2_norm(_snap))

for kk in range(len(failing_geim_on.basis)):

    print(f"Simulating drift failure on sensor {kk+1}/{len(failing_geim_on.basis)}", end='\r')

    drift_dict = {
        'kappa': kappa_drift,
        'rho': rho_drift,
        'idx_failed': [kk],
        'mu_failure': 0
    }

    drifted_mea = failing_geim_on.get_measurements(test_snaps, noise_std=noise_std, drift_dict=drift_dict)
    geim_estimation['drift'].append(failing_geim_on.estimate(   drifted_mea,
                                                                regularization_params={'type': 'tikhonov', 'lambda': noise_std**2}))

    _abs_err = list()
    _rel_err = list()
    for tt in range(len(test_snaps)):
        _abs_err.append(
            failing_geim_on.calculator.L2_norm(geim_estimation['drift'][-1](tt) - test_snaps(tt))
        )
        _rel_err.append(
            _abs_err[-1] / snaps_norm[tt]
        )

    geim_on_abs_err['drift'].append(np.mean(_abs_err))
    geim_on_rel_err['drift'].append(np.mean(_rel_err))
Simulating drift failure on sensor 20/20

Hard failures are simulated by removing the measurements of the failed sensors.

[16]:
geim_on_abs_err['hard'] = list()
geim_on_rel_err['hard'] = list()
geim_estimation['hard'] = list()

snaps_norm = list()
for _snap in test_snaps:
    snaps_norm.append(failing_geim_on.calculator.L2_norm(_snap))

for kk in range(len(failing_geim_on.basis)):

    print(f"Simulating hard failure on sensor {kk+1}/{len(failing_geim_on.basis)}", end='\r')

    hard_failure_dict = {
        'mu_failure': 0,
        'idx_failed': [kk]
    }

    # Get measurements
    measures = failing_geim_on.get_measurements(test_snaps, noise_std=noise_std)

    # Estimate the state
    geim_estimation['hard'].append(
        failing_geim_on.estimate(   measures,
                                    regularization_params = {'type': 'tikhonov', 'lambda': noise_std**2},
                                    hard_failure_dict=hard_failure_dict)
                                    )

    # Compute errors

    _abs_err = list()
    _rel_err = list()
    for tt in range(len(test_snaps)):
        _abs_err.append(
            failing_geim_on.calculator.L2_norm(geim_estimation['hard'][-1](tt) - test_snaps(tt))
        )
        _rel_err.append(
            _abs_err[-1] / snaps_norm[tt]
        )

    geim_on_abs_err['hard'].append(np.mean(_abs_err))
    geim_on_rel_err['hard'].append(np.mean(_rel_err))
Simulating hard failure on sensor 20/20

Let us plot the errors obtained with drift failures, with respect to the baseline (no failing sensors)

[17]:
fig, axs = plt.subplots(1, 2, figsize=(12,5))

# Baseline
axs[0].axhline(geim_on_abs_err['baseline'][-1], color='red', linestyle='--', label='Baseline')
axs[1].axhline(geim_on_rel_err['baseline'][-1], color='red', linestyle='--', label='Baseline')

# Drift failures
for failure in ['drift', 'hard']:
    axs[0].plot(geim_on_abs_err[failure], marker='o', label=failure + ' Failures')
    axs[1].plot(geim_on_rel_err[failure], marker='o', label=failure + ' Failures')

for ax in axs:
    ax.legend()
    ax.grid()
    ax.set_xticks(range(0, len(failing_geim_on.basis)+1, 2))
    ax.set_xlabel('Failed Sensor Index')
    ax.set_yscale('log')

axs[0].set_ylabel('Mean Absolute Error')
axs[1].set_ylabel('Mean Relative Error')

fig.suptitle('GEIM with Failing Sensors', fontsize=16)
[17]:
Text(0.5, 0.98, 'GEIM with Failing Sensors')
../../_images/Tutorials_Advanced_05_failing_sensors_34_1.png

Let us make a plot of the reconstructions

[51]:
from IPython.display import clear_output

ncols = len(geim_estimation)+1
nrows = 2

_idx_failed_to_plot = 0

for tt in range(0, len(test_times), 10):

    fig, axs = plt.subplots(nrows, ncols, figsize=(6*ncols, 5 * nrows))

    c = plot_contour(axs[0,0], nodes, test_snaps[tt],levels = np.linspace(0, 1, 50))
    fig.colorbar(c, ax=axs[0,0])
    axs[0,0].set_title('FOM')

    for i, (key, estimation) in enumerate(geim_estimation.items()):

        if key == 'baseline':
            _est = estimation
        else:
            _est = estimation[_idx_failed_to_plot]

        c = plot_contour(axs[0,i+1], nodes, _est[tt], levels = np.linspace(0, 1, 50))
        fig.colorbar(c, ax=axs[0, i+1])

        # Residual
        residual = np.abs(test_snaps[tt] - _est[tt])
        c = plot_contour(axs[1,i+1], nodes, residual, levels = np.linspace(0, np.max(residual), 50), cmap=cm.inferno)
        fig.colorbar(c, ax=axs[1,i+1])

        axs[0, i+1].set_title(f'GEIM - {key} Failures')

    fig.suptitle(field+' - Failed Sensor Index: %d at time = %.2f s' % (_idx_failed_to_plot, test_times[tt]), fontsize=16)
    axs[1,0].axis('off')

    plt.show()
    clear_output(wait=True)
    plt.close(fig)
../../_images/Tutorials_Advanced_05_failing_sensors_36_0.png

PBDW

We now test the PBDW method with failing sensors. We are going to use the POD basis for the background space, and the SGREEDY sensors for the update space.

[18]:
pbdw_on_abs_err = dict()
pbdw_on_rel_err = dict()

pbdw_estimation = dict()

Offline Phase

At first, the POD basis for the background space is computed in the offline phase.

[19]:
from pyforce.offline.pod import POD

pod_off = POD(grid, varname=field)
pod_off.fit(train_snaps, verbose=True)
Computing T correlation matrix: 220.000 / 220.00 - 0.005787 s/it
Eigenvalues calculated in 1.286165 seconds.

Let us plot the singular values decay

[20]:
pod_off.plot_eigenvalues().show()
/var/folders/rq/_c546f811656mn_2__gt6l_00000gn/T/ipykernel_2058/311611219.py:1: UserWarning: FigureCanvasAgg is non-interactive, and thus cannot be shown
  pod_off.plot_eigenvalues().show()
../../_images/Tutorials_Advanced_05_failing_sensors_42_1.png

Let us compute the POD modes

[21]:
pod_off.compute_basis(train_snaps, rank=10)

Let us place the sensors using the SGREED algorithm on the POD basis

[22]:
from pyforce.offline.sgreedy import SGREEDY

sens_params = {
    's': 0.025, # spread of the Gaussian kernels - hyperparameter to be tuned according to the physical system
    'xm_list': grid.cell_centers().points[::20].tolist() # optional (sampling every 10 points to speed up the process)
}

sgreedy_off = SGREEDY(grid, varname=field)
sgreedy_off.fit(pod_off.pod_modes, Mmax=20, verbose=True,
                sensor_params=sens_params)
Creating Gaussian Sensor Library: 100.000 / 100.00% - 0.000581 s/it
SGREEDY: m = 19, n = 10 | beta_n,m = 0.117150

Let us plot the sensors

[23]:
fig = sgreedy_off.plot_sensors(M=20, levels=40, fig_length=6, fontsize_sens=10,
                                color_sens='blue', cmap='Blues', view='xz')

get_core_msfr_geometry(fig.get_axes()[0])

ax = fig.get_axes()[0]
ax.set_aspect('equal')
/Users/sriva/miniconda3/envs/pyforce-env/lib/python3.10/site-packages/pyforce/offline/sgreedy.py:267: UserWarning: The following kwargs were not used by contour: 'show_edges'
  sc = axs.tricontourf(*nodes.T, _cumulative, cmap=cmap, levels=levels, show_edges=False)
../../_images/Tutorials_Advanced_05_failing_sensors_48_1.png

Online Phase: baseline

The baseline is the reconstruction using noisy data, but with all the sensors working.

[24]:
from pyforce.online.pbdw import PBDW as PBDWonline

pbdw_on = PBDWonline(grid, varname=field)
pbdw_on.set_basis(pod_off.pod_modes)
pbdw_on.set_basis_sensors(sgreedy_off.sensors.library)

# Assemble matrix for online phase
pbdw_on.compute_matrices()

Let us compute the errors on the test set

[25]:
_res = pbdw_on.compute_errors(  test_snaps, noise_std = noise_std,
                                verbose=True)

pbdw_on_abs_err['baseline'] = _res[0]
pbdw_on_rel_err['baseline'] = _res[1]
Computing errors - T: 11.000 / 11.00 - 0.054644 s/it

Let us also estimate the field using all the available sensors (no failing sensors yet)

[26]:
_measures = pbdw_on.get_measurements(test_snaps, noise_std=noise_std)
pbdw_estimation['baseline'] = pbdw_on.estimate(_measures)

Let us make a plot of the results

[27]:
from IPython.display import clear_output
for tt in range(0, len(test_times), 10):

    fig, axs = plt.subplots(1, 3, figsize=(6*3,5))

    c = plot_contour(axs[0], nodes, test_snaps[tt],levels = np.linspace(0, 1, 50))
    plot_contour(axs[1], nodes, pbdw_estimation['baseline'][tt], levels = np.linspace(0, 1, 50))

    fig.colorbar(c, ax=axs[0])
    fig.colorbar(c, ax=axs[1])

    # Residual
    residual = np.abs(test_snaps[tt] - pbdw_estimation['baseline'][tt])
    c = plot_contour(axs[2], nodes, residual, levels = np.linspace(0, np.max(residual), 50), cmap=cm.inferno)
    fig.colorbar(c, ax=axs[2])

    # Labels
    axs[0].set_title('FOM')
    axs[1].set_title('PBDW Baseline')
    axs[2].set_title('Residual')

    fig.suptitle(field+' at time = %.2f s' % (test_times[tt]), fontsize=16)

    plt.show()
    clear_output(wait=True)
    plt.close(fig)
../../_images/Tutorials_Advanced_05_failing_sensors_56_0.png

Online Phase: failing sensors

Now, we simulate failing sensors. We consider both hard failures and drift failures.

The class is initialized as before.

[28]:
from pyforce.online.failure_sens import FailingPBDW

failing_pbdw_on = FailingPBDW(grid, varname=field)

failing_pbdw_on.set_basis(pod_off.pod_modes)
failing_pbdw_on.set_basis_sensors(sgreedy_off.sensors.library)

# Assemble matrix for online phase
failing_pbdw_on.compute_matrices()

Drift failures are simulated by adding a bias and variance to the measurements of the failed sensors. Let us compute the test errors with drift failures on a single sensor. The total number of sensors used is 20 as the number of basis and the influence of the failed index is investigated.

[36]:
pbdw_on_abs_err['drift'] = list()
pbdw_on_rel_err['drift'] = list()
pbdw_estimation['drift'] = list()

snaps_norm = list()
for _snap in test_snaps:
    snaps_norm.append(failing_pbdw_on.calculator.L2_norm(_snap))

for kk in range(len(failing_pbdw_on.sensors)):

    print(f"Simulating drift failure on sensor {kk+1}/{len(failing_pbdw_on.sensors)}", end='\r')

    drift_dict = {
        'kappa': kappa_drift,
        'rho': rho_drift,
        'idx_failed': [kk],
        'mu_failure': 0
    }

    drifted_mea = failing_pbdw_on.get_measurements(test_snaps, noise_std=noise_std, drift_dict=drift_dict)
    pbdw_estimation['drift'].append(failing_pbdw_on.estimate(   drifted_mea ))

    _abs_err = list()
    _rel_err = list()
    for tt in range(len(test_snaps)):
        _abs_err.append(
            failing_pbdw_on.calculator.L2_norm(pbdw_estimation['drift'][-1](tt) - test_snaps(tt))
        )
        _rel_err.append(
            _abs_err[-1] / snaps_norm[tt]
        )

    pbdw_on_abs_err['drift'].append(np.mean(_abs_err))
    pbdw_on_rel_err['drift'].append(np.mean(_rel_err))
Simulating drift failure on sensor 20/20

Hard failures are simulated by removing the measurements of the failed sensors.

[37]:
pbdw_on_abs_err['hard'] = list()
pbdw_on_rel_err['hard'] = list()
pbdw_estimation['hard'] = list()

snaps_norm = list()
for _snap in test_snaps:
    snaps_norm.append(failing_pbdw_on.calculator.L2_norm(_snap))

for kk in range(len(failing_pbdw_on.sensors)):

    print(f"Simulating hard failure on sensor {kk+1}/{len(failing_pbdw_on.sensors)}", end='\r')

    hard_failure_dict = {
        'mu_failure': 0,
        'idx_failed': [kk]
    }

    # Get measurements
    measures = failing_pbdw_on.get_measurements(test_snaps, noise_std=noise_std)

    # Estimate the state
    pbdw_estimation['hard'].append(
        failing_pbdw_on.estimate(   measures,
                                    hard_failure_dict=hard_failure_dict)
                                    )

    # Compute errors

    _abs_err = list()
    _rel_err = list()
    for tt in range(len(test_snaps)):
        _abs_err.append(
            failing_pbdw_on.calculator.L2_norm(pbdw_estimation['hard'][-1](tt) - test_snaps(tt))
        )
        _rel_err.append(
            _abs_err[-1] / snaps_norm[tt]
        )

    pbdw_on_abs_err['hard'].append(np.mean(_abs_err))
    pbdw_on_rel_err['hard'].append(np.mean(_rel_err))
Simulating hard failure on sensor 20/20

Let us plot the errors obtained with drift failures, with respect to the baseline (no failing sensors)

[38]:
fig, axs = plt.subplots(1, 2, figsize=(12,5))

# Baseline
axs[0].axhline(pbdw_on_abs_err['baseline'][-1], color='red', linestyle='--', label='Baseline')
axs[1].axhline(pbdw_on_rel_err['baseline'][-1], color='red', linestyle='--', label='Baseline')

# Drift failures
for failure in ['drift', 'hard']:
    axs[0].plot(pbdw_on_abs_err[failure], marker='o', label=failure + ' Failures')
    axs[1].plot(pbdw_on_rel_err[failure], marker='o', label=failure + ' Failures')

for ax in axs:
    ax.legend()
    ax.grid()
    ax.set_xticks(range(0, len(failing_geim_on.basis)+1, 2))
    ax.set_xlabel('Failed Sensor Index')
    ax.set_yscale('log')

axs[0].set_ylabel('Mean Absolute Error')
axs[1].set_ylabel('Mean Relative Error')

fig.suptitle('PBDW with Failing Sensors', fontsize=16)
[38]:
Text(0.5, 0.98, 'PBDW with Failing Sensors')
../../_images/Tutorials_Advanced_05_failing_sensors_64_1.png

Let us plot the reconstructions

[52]:
from IPython.display import clear_output

ncols = len(pbdw_estimation)+1
nrows = 2

_idx_failed_to_plot = 0

for tt in range(0, len(test_times), 10):

    fig, axs = plt.subplots(nrows, ncols, figsize=(6*ncols, 5 * nrows))

    c = plot_contour(axs[0,0], nodes, test_snaps[tt],levels = np.linspace(0, 1, 50))
    fig.colorbar(c, ax=axs[0,0])
    axs[0,0].set_title('FOM')

    for i, (key, estimation) in enumerate(pbdw_estimation.items()):

        if key == 'baseline':
            _est = estimation
        else:
            _est = estimation[_idx_failed_to_plot]

        c = plot_contour(axs[0,i+1], nodes, _est[tt], levels = np.linspace(0, 1, 50))
        fig.colorbar(c, ax=axs[0, i+1])

        # Residual
        residual = np.abs(test_snaps[tt] - _est[tt])
        c = plot_contour(axs[1,i+1], nodes, residual, levels = np.linspace(0, np.max(residual), 50), cmap=cm.inferno)
        fig.colorbar(c, ax=axs[1,i+1])

        axs[0, i+1].set_title(f'PBDW - {key} Failures')

    fig.suptitle(field+' - Failed Sensor Index: %d at time = %.2f s' % (_idx_failed_to_plot, test_times[tt]), fontsize=16)
    axs[1,0].axis('off')

    plt.show()
    clear_output(wait=True)
    plt.close(fig)
../../_images/Tutorials_Advanced_05_failing_sensors_66_0.png