Add layer to perform RBF interpolation in reduced order modeling (#315)

* add RBF implementation in pytorch (in layers)
* add POD-RBF example (baseline for nonintrusive closure)
* Add POD only and POD+RBF implementation
* add POD-RBF in tutorial 8
This commit is contained in:
Anna Ivagnes
2024-08-12 14:46:22 +02:00
committed by GitHub
parent 16261c9baf
commit d4ced3a7d7
14 changed files with 1111 additions and 224 deletions

View File

@@ -82,6 +82,7 @@ Layers
Proper Orthogonal Decomposition <layers/pod.rst>
Periodic Boundary Condition Embedding <layers/pbc_embedding.rst>
Fourier Feature Embedding <layers/fourier_embedding.rst>
Radial Basis Function Interpolation <layers/rbf_layer.rst>
Adaptive Activation Functions
-------------------------------

View File

@@ -43,4 +43,4 @@ Supervised Learning
:titlesonly:
Unstructured convolutional autoencoder via continuous convolution<tutorials/tutorial4/tutorial.rst>
POD-NN for reduced order modeling<tutorials/tutorial8/tutorial.rst>
POD-RBF and POD-NN for reduced order modeling<tutorials/tutorial8/tutorial.rst>

View File

@@ -0,0 +1,7 @@
RBFBlock
======================
.. currentmodule:: pina.model.layers.rbf_layer
.. autoclass:: RBFBlock
:members:
:show-inheritance:

View File

@@ -1,18 +1,20 @@
Tutorial: Reduced order model (PODNN) for parametric problems
===============================================================
Tutorial: Reduced order model (POD-RBF or POD-NN) for parametric problems
=========================================================================
The tutorial aims to show how to employ the **PINA** library in order to
apply a reduced order modeling technique [1]. Such methodologies have
several similarities with machine learning approaches, since the main
goal consists of predicting the solution of differential equations
goal consists in predicting the solution of differential equations
(typically parametric PDEs) in a real-time fashion.
In particular we are going to use the Proper Orthogonal Decomposition
with Neural Network (PODNN) [2], which basically perform a dimensional
reduction using the POD approach, approximating the parametric solution
manifold (at the reduced space) using a NN. In this example, we use a
simple multilayer perceptron, but the plenty of different archiutectures
can be plugged as well.
with either Radial Basis Function Interpolation(POD-RBF) or Neural
Network (POD-NN) [2]. Here we basically perform a dimensional reduction
using the POD approach, and approximating the parametric solution
manifold (at the reduced space) using an interpolation (RBF) or a
regression technique (NN). In this example, we use a simple multilayer
perceptron, but the plenty of different architectures can be plugged as
well.
References
^^^^^^^^^^
@@ -38,7 +40,7 @@ minimum PINA version to run this tutorial is the ``0.1``.
from pina.geometry import CartesianDomain
from pina.problem import ParametricProblem
from pina.model.layers import PODBlock
from pina.model.layers import PODBlock, RBFBlock
from pina import Condition, LabelTensor, Trainer
from pina.model import FeedForward
from pina.solvers import SupervisedSolver
@@ -48,7 +50,7 @@ minimum PINA version to run this tutorial is the ``0.1``.
.. parsed-literal::
We are using PINA version 0.1
We are using PINA version 0.1.1
We exploit the `Smithers <www.github.com/mathLab/Smithers>`__ library to
@@ -60,7 +62,7 @@ snapshots of the velocity (along :math:`x`, :math:`y`, and the
magnitude) and pressure fields, and the corresponding parameter values.
To visually check the snapshots, lets plot also the data points and the
reference solution: this is the expected output of the neural network.
reference solution: this is the expected output of our model.
.. code:: ipython3
@@ -73,13 +75,14 @@ reference solution: this is the expected output of the neural network.
ax.set_title(f'$\mu$ = {p[0]:.2f}')
.. image:: tutorial_files/tutorial_5_1.png
.. image:: tutorial_files/tutorial_5_0.png
The *snapshots* - aka the numerical solutions computed for several
parameters - and the corresponding parameters are the only data we need
to train the model, in order to predict for any new test parameter the
solution. To properly validate the accuracy, we initially split the 500
to train the model, in order to predict the solution for any new test
parameter. To properly validate the accuracy, we initially split the 500
snapshots into the training dataset (90% of the original data) and the
testing one (the reamining 10%). It must be said that, to plug the
snapshots into **PINA**, we have to cast them to ``LabelTensor``
@@ -111,15 +114,92 @@ methodology), just defining a simple *input-output* condition.
parameter_domain = CartesianDomain({'mu': [0, 100]})
conditions = {
'io': Condition(input_points=p, output_points=u)
'io': Condition(input_points=p_train, output_points=u_train)
}
Then, we define the model we want to use: basically we have a MLP
architecture that takes in input the parameter and return the *modal
coefficients*, so the reduced dimension representation (the coordinates
in the POD space). Such latent variable is the projected to the original
space using the POD modes, which are computed and stored in the
``PODBlock`` object.
poisson_problem = SnapshotProblem()
We can then build a ``PODRBF`` model (using a Radial Basis Function
interpolation as approximation) and a ``PODNN`` approach (using an MLP
architecture as approximation).
POD-RBF reduced order model
---------------------------
Then, we define the model we want to use, with the POD (``PODBlock``)
and the RBF (``RBFBlock``) objects.
.. code:: ipython3
class PODRBF(torch.nn.Module):
"""
Proper orthogonal decomposition with Radial Basis Function interpolation model.
"""
def __init__(self, pod_rank, rbf_kernel):
"""
"""
super().__init__()
self.pod = PODBlock(pod_rank)
self.rbf = RBFBlock(kernel=rbf_kernel)
def forward(self, x):
"""
Defines the computation performed at every call.
:param x: The tensor to apply the forward pass.
:type x: torch.Tensor
:return: the output computed by the model.
:rtype: torch.Tensor
"""
coefficents = self.rbf(x)
return self.pod.expand(coefficents)
def fit(self, p, x):
"""
Call the :meth:`pina.model.layers.PODBlock.fit` method of the
:attr:`pina.model.layers.PODBlock` attribute to perform the POD,
and the :meth:`pina.model.layers.RBFBlock.fit` method of the
:attr:`pina.model.layers.RBFBlock` attribute to fit the interpolation.
"""
self.pod.fit(x)
self.rbf.fit(p, self.pod.reduce(x))
We can then fit the model and ask it to predict the required field for
unseen values of the parameters. Note that this model does not need a
``Trainer`` since it does not include any neural network or learnable
parameters.
.. code:: ipython3
pod_rbf = PODRBF(pod_rank=20, rbf_kernel='thin_plate_spline')
pod_rbf.fit(p_train, u_train)
.. code:: ipython3
u_test_rbf = pod_rbf(p_test)
u_train_rbf = pod_rbf(p_train)
relative_error_train = torch.norm(u_train_rbf - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_rbf - u_test)/torch.norm(u_test)
print('Error summary for POD-RBF model:')
print(f' Train: {relative_error_train.item():e}')
print(f' Test: {relative_error_test.item():e}')
.. parsed-literal::
Error summary for POD-RBF model:
Train: 1.287801e-03
Test: 1.217041e-03
POD-NN reduced order model
--------------------------
.. code:: ipython3
@@ -164,29 +244,27 @@ space using the POD modes, which are computed and stored in the
We highlight that the POD modes are directly computed by means of the
singular value decomposition (computed over the input data), and not
trained using the back-propagation approach. Only the weights of the MLP
trained using the backpropagation approach. Only the weights of the MLP
are actually trained during the optimization loop.
.. code:: ipython3
poisson_problem = SnapshotProblem()
pod_nn = PODNN(pod_rank=20, layers=[10, 10, 10], func=torch.nn.Tanh)
pod_nn.fit_pod(u)
pod_nn.fit_pod(u_train)
pinn_stokes = SupervisedSolver(
pod_nn_stokes = SupervisedSolver(
problem=poisson_problem,
model=pod_nn,
optimizer=torch.optim.Adam,
optimizer_kwargs={'lr': 0.0001})
Now that we set the ``Problem`` and the ``Model``, we have just to train
the model and use it for predict the test snapshots.
Now that we have set the ``Problem`` and the ``Model``, we have just to
train the model and use it for predicting the test snapshots.
.. code:: ipython3
trainer = Trainer(
solver=pinn_stokes,
solver=pod_nn_stokes,
max_epochs=1000,
batch_size=100,
log_every_n_steps=5,
@@ -196,15 +274,41 @@ the model and use it for predict the test snapshots.
.. parsed-literal::
`Trainer.fit` stopped: `max_epochs=1000` reached.
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/u/a/aivagnes/anaconda3/lib/python3.8/site-packages/pytorch_lightning/trainer/setup.py:187: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.
| Name | Type | Params
----------------------------------------
0 | _loss | MSELoss | 0
1 | _neural_net | Network | 460
----------------------------------------
460 Trainable params
0 Non-trainable params
460 Total params
0.002 Total estimated model params size (MB)
/u/a/aivagnes/anaconda3/lib/python3.8/site-packages/torch/cuda/__init__.py:152: UserWarning:
Found GPU0 Quadro K600 which is of cuda capability 3.0.
PyTorch no longer supports this GPU because it is too old.
The minimum cuda capability supported by this library is 3.7.
warnings.warn(old_gpu_warn % (d, name, major, minor, min_arch // 10, min_arch % 10))
.. parsed-literal::
Epoch 999: 100%|██████████| 5/5 [00:00<00:00, 248.36it/s, v_num=20, mean_loss=0.902]
Training: | | 0/? [00:00<?, ?it/s]
Done! Now the computational expensive part is over, we can load in
.. parsed-literal::
`Trainer.fit` stopped: `max_epochs=1000` reached.
Done! Now that the computational expensive part is over, we can load in
future the model to infer new parameters (simply loading the checkpoint
file automatically created by ``Lightning``) or test its performances.
We measure the relative error for the training and test datasets,
@@ -212,55 +316,73 @@ printing the mean one.
.. code:: ipython3
u_test_pred = pinn_stokes(p_test)
u_train_pred = pinn_stokes(p_train)
u_test_nn = pod_nn_stokes(p_test)
u_train_nn = pod_nn_stokes(p_train)
relative_error_train = torch.norm(u_train_pred - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_pred - u_test)/torch.norm(u_test)
relative_error_train = torch.norm(u_train_nn - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_nn - u_test)/torch.norm(u_test)
print('Error summary:')
print('Error summary for POD-NN model:')
print(f' Train: {relative_error_train.item():e}')
print(f' Test: {relative_error_test.item():e}')
.. parsed-literal::
Error summary:
Train: 3.865598e-02
Test: 3.593161e-02
Error summary for POD-NN model:
Train: 3.767902e-02
Test: 3.488588e-02
We can of course also plot the solutions predicted by the ``PODNN``
model, comparing them to the original ones. We can note here some
differences, especially for low velocities, but improvements can be
accomplished thanks to longer training.
POD-RBF vs POD-NN
-----------------
We can of course also plot the solutions predicted by the ``PODRBF`` and
by the ``PODNN`` model, comparing them to the original ones. We can note
here, in the ``PODNN`` model and for low velocities, some differences,
but improvements can be accomplished thanks to longer training.
.. code:: ipython3
idx = torch.randint(0, len(u_test_pred), (4,))
u_idx = pinn_stokes(p_test[idx])
idx = torch.randint(0, len(u_test), (4,))
u_idx_rbf = pod_rbf(p_test[idx])
u_idx_nn = pod_nn_stokes(p_test[idx])
import numpy as np
import matplotlib
fig, axs = plt.subplots(3, 4, figsize=(14, 9))
import matplotlib.pyplot as plt
relative_error = np.abs(u_test[idx] - u_idx.detach())
relative_error = np.where(u_test[idx] < 1e-7, 1e-7, relative_error/u_test[idx])
fig, axs = plt.subplots(5, 4, figsize=(14, 9))
for i, (idx_, u_, err_) in enumerate(zip(idx, u_idx, relative_error)):
cm = axs[0, i].tricontourf(dataset.triang, u_.detach())
relative_error_rbf = np.abs(u_test[idx] - u_idx_rbf.detach())
relative_error_rbf = np.where(u_test[idx] < 1e-7, 1e-7, relative_error_rbf/u_test[idx])
relative_error_nn = np.abs(u_test[idx] - u_idx_nn.detach())
relative_error_nn = np.where(u_test[idx] < 1e-7, 1e-7, relative_error_nn/u_test[idx])
for i, (idx_, rbf_, nn_, rbf_err_, nn_err_) in enumerate(
zip(idx, u_idx_rbf, u_idx_nn, relative_error_rbf, relative_error_nn)):
axs[0, i].set_title(f'$\mu$ = {p_test[idx_].item():.2f}')
plt.colorbar(cm)
cm = axs[1, i].tricontourf(dataset.triang, u_test[idx_].flatten())
plt.colorbar(cm)
cm = axs[0, i].tricontourf(dataset.triang, rbf_.detach()) # POD-RBF prediction
plt.colorbar(cm, ax=axs[0, i])
cm = axs[2, i].tripcolor(dataset.triang, err_, norm=matplotlib.colors.LogNorm())
plt.colorbar(cm)
cm = axs[1, i].tricontourf(dataset.triang, nn_.detach()) # POD-NN prediction
plt.colorbar(cm, ax=axs[1, i])
cm = axs[2, i].tricontourf(dataset.triang, u_test[idx_].flatten()) # Truth
plt.colorbar(cm, ax=axs[2, i])
cm = axs[3, i].tripcolor(dataset.triang, rbf_err_, norm=matplotlib.colors.LogNorm()) # Error for POD-RBF
plt.colorbar(cm, ax=axs[3, i])
cm = axs[4, i].tripcolor(dataset.triang, nn_err_, norm=matplotlib.colors.LogNorm()) # Error for POD-NN
plt.colorbar(cm, ax=axs[4, i])
plt.show()
.. image:: tutorial_files/tutorial_19_0.png
.. image:: tutorial_files/tutorial_27_0.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

View File

@@ -13,6 +13,7 @@ __all__ = [
"FourierFeatureEmbedding",
"AVNOBlock",
"LowRankBlock",
"RBFBlock"
]
from .convolution_2d import ContinuousConvBlock
@@ -27,3 +28,4 @@ from .pod import PODBlock
from .embedding import PeriodicBoundaryEmbedding, FourierFeatureEmbedding
from .avno_layer import AVNOBlock
from .lowrank_layer import LowRankBlock
from .rbf_layer import RBFBlock

View File

@@ -0,0 +1,433 @@
"""Module for Radial Basis Function Interpolation layer."""
import math
import warnings
from itertools import combinations_with_replacement
import torch
from ...utils import check_consistency
def linear(r):
'''
Linear radial basis function.
'''
return -r
def thin_plate_spline(r, eps=1e-7):
'''
Thin plate spline radial basis function.
'''
r = torch.clamp(r, min=eps)
return r**2 * torch.log(r)
def cubic(r):
'''
Cubic radial basis function.
'''
return r**3
def quintic(r):
'''
Quintic radial basis function.
'''
return -r**5
def multiquadric(r):
'''
Multiquadric radial basis function.
'''
return -torch.sqrt(r**2 + 1)
def inverse_multiquadric(r):
'''
Inverse multiquadric radial basis function.
'''
return 1/torch.sqrt(r**2 + 1)
def inverse_quadratic(r):
'''
Inverse quadratic radial basis function.
'''
return 1/(r**2 + 1)
def gaussian(r):
'''
Gaussian radial basis function.
'''
return torch.exp(-r**2)
radial_functions = {
"linear": linear,
"thin_plate_spline": thin_plate_spline,
"cubic": cubic,
"quintic": quintic,
"multiquadric": multiquadric,
"inverse_multiquadric": inverse_multiquadric,
"inverse_quadratic": inverse_quadratic,
"gaussian": gaussian
}
scale_invariant = {"linear", "thin_plate_spline", "cubic", "quintic"}
min_degree_funcs = {
"multiquadric": 0,
"linear": 0,
"thin_plate_spline": 1,
"cubic": 1,
"quintic": 2
}
class RBFBlock(torch.nn.Module):
"""
Radial Basis Function (RBF) interpolation layer. It need to be fitted with
the data with the method :meth:`fit`, before it can be used to interpolate
new points. The layer is not trainable.
.. note::
It reproduces the implementation of ``scipy.interpolate.RBFBlock`` and
it is inspired from the implementation in `torchrbf.
<https://github.com/ArmanMaesumi/torchrbf>`_
"""
def __init__(
self,
neighbors=None,
smoothing=0.0,
kernel="thin_plate_spline",
epsilon=None,
degree=None,
):
"""
:param int neighbors: Number of neighbors to use for the
interpolation.
If ``None``, use all data points.
:param float smoothing: Smoothing parameter for the interpolation.
if 0.0, the interpolation is exact and no smoothing is applied.
:param str kernel: Radial basis function to use. Must be one of
``linear``, ``thin_plate_spline``, ``cubic``, ``quintic``,
``multiquadric``, ``inverse_multiquadric``, ``inverse_quadratic``,
or ``gaussian``.
:param float epsilon: Shape parameter that scaled the input to
the RBF. This defaults to 1 for kernels in ``scale_invariant``
dictionary, and must be specified for other kernels.
:param int degree: Degree of the added polynomial.
For some kernels, there exists a minimum degree of the polynomial
such that the RBF is well-posed. Those minimum degrees are specified
in the `min_degree_funcs` dictionary above. If `degree` is less than
the minimum degree, a warning is raised and the degree is set to the
minimum value.
"""
super().__init__()
check_consistency(neighbors, (int, type(None)))
check_consistency(smoothing, (int, float, torch.Tensor))
check_consistency(kernel, str)
check_consistency(epsilon, (float, type(None)))
check_consistency(degree, (int, type(None)))
self.neighbors = neighbors
self.smoothing = smoothing
self.kernel = kernel
self.epsilon = epsilon
self.degree = degree
self.powers = None
# initialize data points and values
self.y = None
self.d = None
# initialize attributes for the fitted model
self._shift = None
self._scale = None
self._coeffs = None
@property
def smoothing(self):
"""
Smoothing parameter for the interpolation.
:rtype: float
"""
return self._smoothing
@smoothing.setter
def smoothing(self, value):
self._smoothing = value
@property
def kernel(self):
"""
Radial basis function to use.
:rtype: str
"""
return self._kernel
@kernel.setter
def kernel(self, value):
if value not in radial_functions:
raise ValueError(f"Unknown kernel: {value}")
self._kernel = value.lower()
@property
def epsilon(self):
"""
Shape parameter that scaled the input to the RBF.
:rtype: float
"""
return self._epsilon
@epsilon.setter
def epsilon(self, value):
if value is None:
if self.kernel in scale_invariant:
value = 1.0
else:
raise ValueError("Must specify `epsilon` for this kernel.")
else:
value = float(value)
self._epsilon = value
@property
def degree(self):
"""
Degree of the added polynomial.
:rtype: int
"""
return self._degree
@degree.setter
def degree(self, value):
min_degree = min_degree_funcs.get(self.kernel, -1)
if value is None:
value = max(min_degree, 0)
else:
value = int(value)
if value < -1:
raise ValueError("`degree` must be at least -1.")
if value < min_degree:
warnings.warn(
"`degree` is too small for this kernel. Setting to "
f"{min_degree}.", UserWarning,
)
self._degree = value
def _check_data(self, y, d):
if y.ndim != 2:
raise ValueError("y must be a 2-dimensional tensor.")
if d.shape[0] != y.shape[0]:
raise ValueError(
"The first dim of d must have the same length as "
"the first dim of y."
)
if isinstance(self.smoothing, (int, float)):
self.smoothing = torch.full((y.shape[0],), self.smoothing
).float().to(y.device)
def fit(self, y, d):
"""
Fit the RBF interpolator to the data.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
"""
self._check_data(y, d)
self.y = y
self.d = d
if self.neighbors is None:
nobs = self.y.shape[0]
else:
raise NotImplementedError("neighbors currently not supported")
powers = RBFBlock.monomial_powers(self.y.shape[1], self.degree).to(
y.device)
if powers.shape[0] > nobs:
raise ValueError("The data is not compatible with the "
"requested degree.")
if self.neighbors is None:
self._shift, self._scale, self._coeffs = RBFBlock.solve(self.y,
self.d.reshape((self.y.shape[0], -1)),
self.smoothing, self.kernel, self.epsilon, powers)
self.powers = powers
def forward(self, x):
"""
Returns the interpolated data at the given points `x`.
:param torch.Tensor x: `(n, d)` tensor of points at which
to query the interpolator
:rtype: `(n, m)` torch.Tensor of interpolated data.
"""
if x.ndim != 2:
raise ValueError("`x` must be a 2-dimensional tensor.")
nx, ndim = x.shape
if ndim != self.y.shape[1]:
raise ValueError(
"Expected the second dim of `x` to have length "
f"{self.y.shape[1]}."
)
kernel_func = radial_functions[self.kernel]
yeps = self.y * self.epsilon
xeps = x * self.epsilon
xhat = (x - self._shift) / self._scale
kv = RBFBlock.kernel_vector(xeps, yeps, kernel_func)
p = RBFBlock.polynomial_matrix(xhat, self.powers)
vec = torch.cat([kv, p], dim=1)
out = torch.matmul(vec, self._coeffs)
out = out.reshape((nx,) + self.d.shape[1:])
return out
@staticmethod
def kernel_vector(x, y, kernel_func):
"""
Evaluate radial functions with centers `y` for all points in `x`.
:param torch.Tensor x: `(n, d)` tensor of points.
:param torch.Tensor y: `(m, d)` tensor of centers.
:param str kernel_func: Radial basis function to use.
:rtype: `(n, m)` torch.Tensor of radial function values.
"""
return kernel_func(torch.cdist(x, y))
@staticmethod
def polynomial_matrix(x, powers):
"""
Evaluate monomials at `x` with given `powers`.
:param torch.Tensor x: `(n, d)` tensor of points.
:param torch.Tensor powers: `(r, d)` tensor of powers for each monomial.
:rtype: `(n, r)` torch.Tensor of monomial values.
"""
x_ = torch.repeat_interleave(x, repeats=powers.shape[0], dim=0)
powers_ = powers.repeat(x.shape[0], 1)
return torch.prod(x_**powers_, dim=1).view(x.shape[0], powers.shape[0])
@staticmethod
def kernel_matrix(x, kernel_func):
"""
Returns radial function values for all pairs of points in `x`.
:param torch.Tensor x: `(n, d`) tensor of points.
:param str kernel_func: Radial basis function to use.
:rtype: `(n, n`) torch.Tensor of radial function values.
"""
return kernel_func(torch.cdist(x, x))
@staticmethod
def monomial_powers(ndim, degree):
"""
Return the powers for each monomial in a polynomial.
:param int ndim: Number of variables in the polynomial.
:param int degree: Degree of the polynomial.
:rtype: `(nmonos, ndim)` torch.Tensor where each row contains the powers
for each variable in a monomial.
"""
nmonos = math.comb(degree + ndim, ndim)
out = torch.zeros((nmonos, ndim), dtype=torch.int32)
count = 0
for deg in range(degree + 1):
for mono in combinations_with_replacement(range(ndim), deg):
for var in mono:
out[count, var] += 1
count += 1
return out
@staticmethod
def build(y, d, smoothing, kernel, epsilon, powers):
"""
Build the RBF linear system.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
:param torch.Tensor smoothing: (n,) tensor of smoothing parameters.
:param str kernel: Radial basis function to use.
:param float epsilon: Shape parameter that scaled the input to the RBF.
:param torch.Tensor powers: (r, d) tensor of powers for each monomial.
:rtype: (lhs, rhs, shift, scale) where `lhs` and `rhs` are the
left-hand side and right-hand side of the linear system, and
`shift` and `scale` are the shift and scale parameters.
"""
p = d.shape[0]
s = d.shape[1]
r = powers.shape[0]
kernel_func = radial_functions[kernel]
mins = torch.min(y, dim=0).values
maxs = torch.max(y, dim=0).values
shift = (maxs + mins) / 2
scale = (maxs - mins) / 2
scale[scale == 0.0] = 1.0
yeps = y * epsilon
yhat = (y - shift) / scale
lhs = torch.empty((p + r, p + r), device=d.device).float()
lhs[:p, :p] = RBFBlock.kernel_matrix(yeps, kernel_func)
lhs[:p, p:] = RBFBlock.polynomial_matrix(yhat, powers)
lhs[p:, :p] = lhs[:p, p:].T
lhs[p:, p:] = 0.0
lhs[:p, :p] += torch.diag(smoothing)
rhs = torch.empty((r + p, s), device=d.device).float()
rhs[:p] = d
rhs[p:] = 0.0
return lhs, rhs, shift, scale
@staticmethod
def solve(y, d, smoothing, kernel, epsilon, powers):
"""
Build then solve the RBF linear system.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
:param torch.Tensor smoothing: (n,) tensor of smoothing parameters.
:param str kernel: Radial basis function to use.
:param float epsilon: Shape parameter that scaled the input to the RBF.
:param torch.Tensor powers: (r, d) tensor of powers for each monomial.
:raises ValueError: If the linear system is singular.
:rtype: (shift, scale, coeffs) where `shift` and `scale` are the
shift and scale parameters, and `coeffs` are the coefficients
of the interpolator
"""
lhs, rhs, shift, scale = RBFBlock.build(y, d, smoothing, kernel,
epsilon, powers)
try:
coeffs = torch.linalg.solve(lhs, rhs)
except RuntimeError as e:
msg = "Singular matrix."
nmonos = powers.shape[0]
if nmonos > 0:
pmat = RBFBlock.polynomial_matrix((y - shift) / scale, powers)
rank = torch.linalg.matrix_rank(pmat)
if rank < nmonos:
msg = (
"Singular matrix. The matrix of monomials evaluated at "
"the data point coordinates does not have full column "
f"rank ({rank}/{nmonos})."
)
raise ValueError(msg) from e
return shift, scale, coeffs

View File

@@ -0,0 +1,85 @@
import torch
import pytest
import math
from pina.model.layers.rbf_layer import RBFBlock
x = torch.linspace(-1, 1, 100)
toy_params = torch.linspace(0, 1, 10).unsqueeze(1)
toy_snapshots = torch.vstack([torch.exp(-x**2)*c for c in toy_params])
toy_params_test = torch.linspace(0, 1, 3).unsqueeze(1)
toy_snapshots_test = torch.vstack([torch.exp(-x**2)*c for c in toy_params_test])
kernels = ["linear", "thin_plate_spline", "cubic", "quintic",
"multiquadric", "inverse_multiquadric", "inverse_quadratic", "gaussian"]
noscale_invariant_kernels = ["multiquadric", "inverse_multiquadric",
"inverse_quadratic", "gaussian"]
scale_invariant_kernels = ["linear", "thin_plate_spline", "cubic", "quintic"]
def test_constructor_default():
rbf = RBFBlock()
assert rbf.kernel == "thin_plate_spline"
assert rbf.epsilon == 1
assert rbf.smoothing == 0.
@pytest.mark.parametrize("kernel", kernels)
@pytest.mark.parametrize("epsilon", [0.1, 1., 10.])
def test_constructor_epsilon(kernel, epsilon):
if kernel in scale_invariant_kernels:
rbf = RBFBlock(kernel=kernel)
assert rbf.kernel == kernel
assert rbf.epsilon == 1
elif kernel in noscale_invariant_kernels:
with pytest.raises(ValueError):
rbf = RBFBlock(kernel=kernel)
rbf = RBFBlock(kernel=kernel, epsilon=epsilon)
assert rbf.kernel == kernel
assert rbf.epsilon == epsilon
assert rbf.smoothing == 0.
@pytest.mark.parametrize("kernel", kernels)
@pytest.mark.parametrize("epsilon", [0.1, 1., 10.])
@pytest.mark.parametrize("degree", [2, 3, 4])
@pytest.mark.parametrize("smoothing", [1e-5, 1e-3, 1e-1])
def test_constructor_all(kernel, epsilon, degree, smoothing):
rbf = RBFBlock(kernel=kernel, epsilon=epsilon, degree=degree,
smoothing=smoothing)
assert rbf.kernel == kernel
assert rbf.epsilon == epsilon
assert rbf.degree == degree
assert rbf.smoothing == smoothing
assert rbf.y == None
assert rbf.d == None
assert rbf.powers == None
assert rbf._shift == None
assert rbf._scale == None
assert rbf._coeffs == None
def test_fit():
rbf = RBFBlock()
rbf.fit(toy_params, toy_snapshots)
ndim = toy_params.shape[1]
torch.testing.assert_close(rbf.y, toy_params)
torch.testing.assert_close(rbf.d, toy_snapshots)
assert rbf.powers.shape == (math.comb(rbf.degree+ndim, ndim), ndim)
assert rbf._shift.shape == (ndim,)
assert rbf._scale.shape == (ndim,)
assert rbf._coeffs.shape == (rbf.powers.shape[0]+toy_snapshots.shape[0], toy_snapshots.shape[1])
def test_forward():
rbf = RBFBlock()
rbf.fit(toy_params, toy_snapshots)
c = rbf(toy_params)
assert c.shape == toy_snapshots.shape
torch.testing.assert_close(c, toy_snapshots)
def test_forward_unseen_parameters():
rbf = RBFBlock()
rbf.fit(toy_params, toy_snapshots)
c = rbf(toy_params_test)
assert c.shape == toy_snapshots_test.shape
torch.testing.assert_close(c, toy_snapshots_test)

2
tutorials/README.md vendored
View File

@@ -32,4 +32,4 @@ Time dependent Kuramoto Sivashinsky equation using the Averaging Neural Operator
| Description | Tutorial |
|---------------|-----------|
Unstructured convolutional autoencoder via continuous convolution |[[.ipynb](tutorial4/tutorial.ipynb),&#160;[.py](tutorial4/tutorial.py),&#160;[.html](http://mathlab.github.io/PINA/_rst/tutorials/tutorial4/tutorial.html)]|
POD-NN for reduced order modeling| [[.ipynb](tutorial8/tutorial.ipynb),&#160;[.py](tutorial8/tutorial.py),&#160;[.html](http://mathlab.github.io/PINA/_rst/tutorials/tutorial8/tutorial.html)]|
POD-RBF and POD-NN for reduced order modeling| [[.ipynb](tutorial8/tutorial.ipynb),&#160;[.py](tutorial8/tutorial.py),&#160;[.html](http://mathlab.github.io/PINA/_rst/tutorials/tutorial8/tutorial.html)]|

File diff suppressed because one or more lines are too long

View File

@@ -1,11 +1,11 @@
#!/usr/bin/env python
# coding: utf-8
# # Tutorial: Reduced order model (PODNN) for parametric problems
# # Tutorial: Reduced order model (POD-RBF and POD-NN) for parametric problems
# The tutorial aims to show how to employ the **PINA** library in order to apply a reduced order modeling technique [1]. Such methodologies have several similarities with machine learning approaches, since the main goal consists of predicting the solution of differential equations (typically parametric PDEs) in a real-time fashion.
# The tutorial aims to show how to employ the **PINA** library in order to apply a reduced order modeling technique [1]. Such methodologies have several similarities with machine learning approaches, since the main goal consists in predicting the solution of differential equations (typically parametric PDEs) in a real-time fashion.
#
# In particular we are going to use the Proper Orthogonal Decomposition with Neural Network (PODNN) [2], which basically perform a dimensional reduction using the POD approach, approximating the parametric solution manifold (at the reduced space) using a NN. In this example, we use a simple multilayer perceptron, but the plenty of different archiutectures can be plugged as well.
# In particular we are going to use the Proper Orthogonal Decomposition with either Radial Basis Function Interpolation(POD-RBF) or Neural Network (POD-NN) [2]. Here we basically perform a dimensional reduction using the POD approach, and approximating the parametric solution manifold (at the reduced space) using an interpolation (RBF) or a regression technique (NN). In this example, we use a simple multilayer perceptron, but the plenty of different architectures can be plugged as well.
#
# #### References
# 1. Rozza G., Stabile G., Ballarin F. (2022). Advanced Reduced Order Methods and Applications in Computational Fluid Dynamics, Society for Industrial and Applied Mathematics.
@@ -14,7 +14,7 @@
# Let's start with the necessary imports.
# It's important to note the minimum PINA version to run this tutorial is the `0.1`.
# In[29]:
# In[1]:
get_ipython().run_line_magic('matplotlib', 'inline')
@@ -26,7 +26,7 @@ import pina
from pina.geometry import CartesianDomain
from pina.problem import ParametricProblem
from pina.model.layers import PODBlock
from pina.model.layers import PODBlock, RBFBlock
from pina import Condition, LabelTensor, Trainer
from pina.model import FeedForward
from pina.solvers import SupervisedSolver
@@ -37,9 +37,9 @@ print(f'We are using PINA version {pina.__version__}')
# We exploit the [Smithers](www.github.com/mathLab/Smithers) library to collect the parametric snapshots. In particular, we use the `NavierStokesDataset` class that contains a set of parametric solutions of the Navier-Stokes equations in a 2D L-shape domain. The parameter is the inflow velocity.
# The dataset is composed by 500 snapshots of the velocity (along $x$, $y$, and the magnitude) and pressure fields, and the corresponding parameter values.
#
# To visually check the snapshots, let's plot also the data points and the reference solution: this is the expected output of the neural network.
# To visually check the snapshots, let's plot also the data points and the reference solution: this is the expected output of our model.
# In[30]:
# In[2]:
from smithers.dataset import NavierStokesDataset
@@ -51,10 +51,10 @@ for ax, p, u in zip(axs, dataset.params[:4], dataset.snapshots['mag(v)'][:4]):
ax.set_title(f'$\mu$ = {p[0]:.2f}')
# The *snapshots* - aka the numerical solutions computed for several parameters - and the corresponding parameters are the only data we need to train the model, in order to predict for any new test parameter the solution.
# The *snapshots* - aka the numerical solutions computed for several parameters - and the corresponding parameters are the only data we need to train the model, in order to predict the solution for any new test parameter.
# To properly validate the accuracy, we initially split the 500 snapshots into the training dataset (90% of the original data) and the testing one (the reamining 10%). It must be said that, to plug the snapshots into **PINA**, we have to cast them to `LabelTensor` objects.
# In[31]:
# In[3]:
u = torch.tensor(dataset.snapshots['mag(v)']).float()
@@ -73,7 +73,7 @@ p_train, p_test = p[:n_train], p[n_train:]
# It is now time to define the problem! We inherit from `ParametricProblem` (since the space invariant typically of this methodology), just defining a simple *input-output* condition.
# In[32]:
# In[4]:
class SnapshotProblem(ParametricProblem):
@@ -81,13 +81,85 @@ class SnapshotProblem(ParametricProblem):
parameter_domain = CartesianDomain({'mu': [0, 100]})
conditions = {
'io': Condition(input_points=p, output_points=u)
'io': Condition(input_points=p_train, output_points=u_train)
}
poisson_problem = SnapshotProblem()
# Then, we define the model we want to use: basically we have a MLP architecture that takes in input the parameter and return the *modal coefficients*, so the reduced dimension representation (the coordinates in the POD space). Such latent variable is the projected to the original space using the POD modes, which are computed and stored in the `PODBlock` object.
# In[33]:
# We can then build a `PODRBF` model (using a Radial Basis Function interpolation as approximation) and a `PODNN` approach (using an MLP architecture as approximation).
# ## POD-RBF reduced order model
# Then, we define the model we want to use, with the POD (`PODBlock`) and the RBF (`RBFBlock`) objects.
# In[5]:
class PODRBF(torch.nn.Module):
"""
Proper orthogonal decomposition with Radial Basis Function interpolation model.
"""
def __init__(self, pod_rank, rbf_kernel):
"""
"""
super().__init__()
self.pod = PODBlock(pod_rank)
self.rbf = RBFBlock(kernel=rbf_kernel)
def forward(self, x):
"""
Defines the computation performed at every call.
:param x: The tensor to apply the forward pass.
:type x: torch.Tensor
:return: the output computed by the model.
:rtype: torch.Tensor
"""
coefficents = self.rbf(x)
return self.pod.expand(coefficents)
def fit(self, p, x):
"""
Call the :meth:`pina.model.layers.PODBlock.fit` method of the
:attr:`pina.model.layers.PODBlock` attribute to perform the POD,
and the :meth:`pina.model.layers.RBFBlock.fit` method of the
:attr:`pina.model.layers.RBFBlock` attribute to fit the interpolation.
"""
self.pod.fit(x)
self.rbf.fit(p, self.pod.reduce(x))
# We can then fit the model and ask it to predict the required field for unseen values of the parameters. Note that this model does not need a `Trainer` since it does not include any neural network or learnable parameters.
# In[6]:
pod_rbf = PODRBF(pod_rank=20, rbf_kernel='thin_plate_spline')
pod_rbf.fit(p_train, u_train)
# In[7]:
u_test_rbf = pod_rbf(p_test)
u_train_rbf = pod_rbf(p_train)
relative_error_train = torch.norm(u_train_rbf - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_rbf - u_test)/torch.norm(u_test)
print('Error summary for POD-RBF model:')
print(f' Train: {relative_error_train.item():e}')
print(f' Test: {relative_error_test.item():e}')
# ## POD-NN reduced order model
# In[8]:
class PODNN(torch.nn.Module):
@@ -130,30 +202,28 @@ class PODNN(torch.nn.Module):
self.pod.fit(x)
# We highlight that the POD modes are directly computed by means of the singular value decomposition (computed over the input data), and not trained using the back-propagation approach. Only the weights of the MLP are actually trained during the optimization loop.
# We highlight that the POD modes are directly computed by means of the singular value decomposition (computed over the input data), and not trained using the backpropagation approach. Only the weights of the MLP are actually trained during the optimization loop.
# In[34]:
# In[9]:
poisson_problem = SnapshotProblem()
pod_nn = PODNN(pod_rank=20, layers=[10, 10, 10], func=torch.nn.Tanh)
pod_nn.fit_pod(u)
pod_nn.fit_pod(u_train)
pinn_stokes = SupervisedSolver(
pod_nn_stokes = SupervisedSolver(
problem=poisson_problem,
model=pod_nn,
optimizer=torch.optim.Adam,
optimizer_kwargs={'lr': 0.0001})
# Now that we set the `Problem` and the `Model`, we have just to train the model and use it for predict the test snapshots.
# Now that we have set the `Problem` and the `Model`, we have just to train the model and use it for predicting the test snapshots.
# In[35]:
# In[10]:
trainer = Trainer(
solver=pinn_stokes,
solver=pod_nn_stokes,
max_epochs=1000,
batch_size=100,
log_every_n_steps=5,
@@ -161,47 +231,69 @@ trainer = Trainer(
trainer.train()
# Done! Now the computational expensive part is over, we can load in future the model to infer new parameters (simply loading the checkpoint file automatically created by `Lightning`) or test its performances. We measure the relative error for the training and test datasets, printing the mean one.
# Done! Now that the computational expensive part is over, we can load in future the model to infer new parameters (simply loading the checkpoint file automatically created by `Lightning`) or test its performances. We measure the relative error for the training and test datasets, printing the mean one.
# In[36]:
# In[11]:
u_test_pred = pinn_stokes(p_test)
u_train_pred = pinn_stokes(p_train)
u_test_nn = pod_nn_stokes(p_test)
u_train_nn = pod_nn_stokes(p_train)
relative_error_train = torch.norm(u_train_pred - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_pred - u_test)/torch.norm(u_test)
relative_error_train = torch.norm(u_train_nn - u_train)/torch.norm(u_train)
relative_error_test = torch.norm(u_test_nn - u_test)/torch.norm(u_test)
print('Error summary:')
print('Error summary for POD-NN model:')
print(f' Train: {relative_error_train.item():e}')
print(f' Test: {relative_error_test.item():e}')
# We can of course also plot the solutions predicted by the `PODNN` model, comparing them to the original ones. We can note here some differences, especially for low velocities, but improvements can be accomplished thanks to longer training.
# ## POD-RBF vs POD-NN
# In[37]:
# We can of course also plot the solutions predicted by the `PODRBF` and by the `PODNN` model, comparing them to the original ones. We can note here, in the `PODNN` model and for low velocities, some differences, but improvements can be accomplished thanks to longer training.
# In[12]:
idx = torch.randint(0, len(u_test_pred), (4,))
u_idx = pinn_stokes(p_test[idx])
idx = torch.randint(0, len(u_test), (4,))
u_idx_rbf = pod_rbf(p_test[idx])
u_idx_nn = pod_nn_stokes(p_test[idx])
import numpy as np
import matplotlib
fig, axs = plt.subplots(3, 4, figsize=(14, 9))
import matplotlib.pyplot as plt
relative_error = np.abs(u_test[idx] - u_idx.detach())
relative_error = np.where(u_test[idx] < 1e-7, 1e-7, relative_error/u_test[idx])
fig, axs = plt.subplots(5, 4, figsize=(14, 9))
for i, (idx_, u_, err_) in enumerate(zip(idx, u_idx, relative_error)):
cm = axs[0, i].tricontourf(dataset.triang, u_.detach())
relative_error_rbf = np.abs(u_test[idx] - u_idx_rbf.detach())
relative_error_rbf = np.where(u_test[idx] < 1e-7, 1e-7, relative_error_rbf/u_test[idx])
relative_error_nn = np.abs(u_test[idx] - u_idx_nn.detach())
relative_error_nn = np.where(u_test[idx] < 1e-7, 1e-7, relative_error_nn/u_test[idx])
for i, (idx_, rbf_, nn_, rbf_err_, nn_err_) in enumerate(
zip(idx, u_idx_rbf, u_idx_nn, relative_error_rbf, relative_error_nn)):
axs[0, i].set_title(f'$\mu$ = {p_test[idx_].item():.2f}')
plt.colorbar(cm)
cm = axs[1, i].tricontourf(dataset.triang, u_test[idx_].flatten())
plt.colorbar(cm)
cm = axs[0, i].tricontourf(dataset.triang, rbf_.detach()) # POD-RBF prediction
plt.colorbar(cm, ax=axs[0, i])
cm = axs[2, i].tripcolor(dataset.triang, err_, norm=matplotlib.colors.LogNorm())
plt.colorbar(cm)
cm = axs[1, i].tricontourf(dataset.triang, nn_.detach()) # POD-NN prediction
plt.colorbar(cm, ax=axs[1, i])
cm = axs[2, i].tricontourf(dataset.triang, u_test[idx_].flatten()) # Truth
plt.colorbar(cm, ax=axs[2, i])
cm = axs[3, i].tripcolor(dataset.triang, rbf_err_, norm=matplotlib.colors.LogNorm()) # Error for POD-RBF
plt.colorbar(cm, ax=axs[3, i])
cm = axs[4, i].tripcolor(dataset.triang, nn_err_, norm=matplotlib.colors.LogNorm()) # Error for POD-NN
plt.colorbar(cm, ax=axs[4, i])
plt.show()
# In[ ]: