* solvers -> solver
* adaptive_functions -> adaptive_function
* callbacks -> callback
* operators -> operator
* pinns -> physics_informed_solver
* layers -> block
Dario Coscia
2025-02-19 11:35:43 +01:00
committed by Nicola Demo
parent 810d215ca0
commit df673cad4e
90 changed files with 155 additions and 151 deletions

View File

@@ -1,35 +0,0 @@
__all__ = [
"ContinuousConvBlock",
"ResidualBlock",
"EnhancedLinear",
"SpectralConvBlock1D",
"SpectralConvBlock2D",
"SpectralConvBlock3D",
"FourierBlock1D",
"FourierBlock2D",
"FourierBlock3D",
"PODBlock",
"OrthogonalBlock",
"PeriodicBoundaryEmbedding",
"FourierFeatureEmbedding",
"AVNOBlock",
"LowRankBlock",
"RBFBlock",
"GNOBlock"
]
from .convolution_2d import ContinuousConvBlock
from .residual import ResidualBlock, EnhancedLinear
from .spectral import (
SpectralConvBlock1D,
SpectralConvBlock2D,
SpectralConvBlock3D,
)
from .fourier import FourierBlock1D, FourierBlock2D, FourierBlock3D
from .pod import PODBlock
from .orthogonal import OrthogonalBlock
from .embedding import PeriodicBoundaryEmbedding, FourierFeatureEmbedding
from .avno_layer import AVNOBlock
from .lowrank_layer import LowRankBlock
from .rbf_layer import RBFBlock
from .gno_block import GNOBlock

View File

@@ -1,67 +0,0 @@
""" Module for Averaging Neural Operator Layer class. """
from torch import nn, mean
from pina.utils import check_consistency
class AVNOBlock(nn.Module):
r"""
The PINA implementation of the inner layer of the Averaging Neural Operator.
The operator layer performs an affine transformation where the convolution
is approximated with a local average. Given the input function
:math:`v(x)\in\mathbb{R}^{\rm{emb}}` the layer computes
the operator update :math:`K(v)` as:
.. math::
K(v) = \sigma\left(Wv(x) + b + \frac{1}{|\mathcal{A}|}\int_{\mathcal{A}} v(y)dy\right)
where:
* :math:`\mathbb{R}^{\rm{emb}}` is the embedding (hidden) size
corresponding to the ``hidden_size`` object
* :math:`\sigma` is a non-linear activation, corresponding to the
``func`` object
* :math:`W\in\mathbb{R}^{\rm{emb}\times\rm{emb}}` is a tunable matrix.
* :math:`b\in\mathbb{R}^{\rm{emb}}` is a tunable bias.
.. seealso::
**Original reference**: Lanthaler, S., Li, Z., Stuart, A. (2023).
*The Nonlocal Neural Operator: Universal
Approximation*.
DOI: `arXiv preprint arXiv:2304.13221.
<https://arxiv.org/abs/2304.13221>`_
"""
def __init__(self, hidden_size=100, func=nn.GELU):
"""
:param int hidden_size: Size of the hidden layer, defaults to 100.
:param func: The activation function, defaults to ``torch.nn.GELU``.
"""
super().__init__()
# Check type consistency
check_consistency(hidden_size, int)
check_consistency(func, nn.Module, subclass=True)
# Assignment
self._nn = nn.Linear(hidden_size, hidden_size)
self._func = func()
def forward(self, x):
r"""
Forward pass of the layer, it performs a sum of local average
and an affine transformation of the field.
:param torch.Tensor x: The input tensor for performing the
computation. It expects a tensor :math:`B \times N \times D`,
where :math:`B` is the batch_size, :math:`N` the number of points
in the mesh, :math:`D` the dimension of the problem. In particular
:math:`D` is the codomain of the function :math:`v`. For example,
a scalar function has :math:`D=1`, while a 4-dimensional vector function
has :math:`D=4`.
:return: The output tensor obtained from Average Neural Operator Block.
:rtype: torch.Tensor
"""
return self._func(self._nn(x) + mean(x, dim=1, keepdim=True))
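A minimal usage sketch (hypothetical sizes, with ``AVNOBlock`` in scope): the
block maps a batched field embedding to an updated embedding of the same shape.

import torch

block = AVNOBlock(hidden_size=64)
v = torch.rand(8, 100, 64)  # [batch, points, hidden_size]
out = block(v)              # same shape: [8, 100, 64]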

View File

@@ -1,181 +0,0 @@
"""Module for Base Continuous Convolution class."""
from abc import ABCMeta, abstractmethod
import torch
from .stride import Stride
from .utils_convolution import optimizing
class BaseContinuousConv(torch.nn.Module, metaclass=ABCMeta):
"""
Abstract class
"""
def __init__(
self,
input_numb_field,
output_numb_field,
filter_dim,
stride,
model=None,
optimize=False,
no_overlap=False,
):
"""
Base Class for Continuous Convolution.
The algorithm expects input to be in the form:
:math:`[B \times N_{in} \times N \times D]`
where :math:`B` is the batch_size, :math:`N_{in}` is the number of input
fields, :math:`N` the number of points in the mesh, :math:`D` the dimension
of the problem. In particular:
* :math:`D` is the number of spatial variables + 1. The last column must
contain the field value. For example, for 2D problems :math:`D=3` and
the tensor will be something like ``[first coordinate, second
coordinate, field value]``.
* :math:`N_{in}` represents the number of vector components of the
function. For example, a vector-valued function :math:`f = [f_1, f_2]`
will have :math:`N_{in}=2`.
.. note::
A 2-dimensional vector function (:math:`N_{in}=2`) with 3-dimensional
input (:math:`D=3+1=4`), a 100-point input mesh and a batch size of 8
is represented as a tensor of shape ``[8, 2, 100, 4]``, where the columns
``[:, 0, :, -1]`` and ``[:, 1, :, -1]`` hold the first and the
second field values, respectively.
The algorithm returns a tensor of shape:
:math:`[B \times N_{out} \times N' \times D]`
where :math:`B` is the batch_size, :math:`N_{out}` is the number of output
fields, :math:`N'` the number of points in the mesh, :math:`D` the dimension
of the problem.
:param input_numb_field: number of fields in the input
:type input_numb_field: int
:param output_numb_field: number of fields in the output
:type output_numb_field: int
:param filter_dim: dimension of the filter
:type filter_dim: tuple/ list
:param stride: stride for the filter
:type stride: dict
:param model: neural network for inner parametrization,
defaults to None.
:type model: torch.nn.Module, optional
:param optimize: flag for performing optimization on the continuous
filter, defaults to False. The flag ``optimize=True`` should be
used only when the scattered datapoints are fixed throughout the
training. If the torch model is in ``.eval()`` mode, the flag is
always set to False.
:type optimize: bool, optional
:param no_overlap: flag for performing optimization on the transpose
continuous filter, defaults to False. The flag set to ``True`` should
be used only when the filter positions do not overlap for different
strides. A RuntimeError is raised in case of non-compatible strides.
:type no_overlap: bool, optional
"""
super().__init__()
if isinstance(input_numb_field, int):
self._input_numb_field = input_numb_field
else:
raise ValueError("input_numb_field must be int.")
if isinstance(output_numb_field, int):
self._output_numb_field = output_numb_field
else:
raise ValueError("input_numb_field must be int.")
if isinstance(filter_dim, (tuple, list)):
vect = filter_dim
else:
raise ValueError("filter_dim must be tuple or list.")
vect = torch.tensor(vect)
self.register_buffer("_dim", vect, persistent=False)
if isinstance(stride, dict):
self._stride = Stride(stride)
else:
raise ValueError("stride must be dictionary.")
self._net = model
if isinstance(optimize, bool):
self._optimize = optimize
else:
raise ValueError("optimize must be bool.")
# choosing how to initialize based on optimization
if self._optimize:
# optimizing decorator ensure the function is called
# just once
self._choose_initialization = optimizing(
self._initialize_convolution
)
else:
self._choose_initialization = self._initialize_convolution
if not isinstance(no_overlap, bool):
raise ValueError("no_overlap must be bool.")
if no_overlap:
raise NotImplementedError  # no-overlap transpose is not implemented yet
self.transpose = self.transpose_no_overlap
else:
self.transpose = self.transpose_overlap
class DefaultKernel(torch.nn.Module):
def __init__(self, input_dim, output_dim):
super().__init__()
assert isinstance(input_dim, int)
assert isinstance(output_dim, int)
self._model = torch.nn.Sequential(
torch.nn.Linear(input_dim, 20),
torch.nn.ReLU(),
torch.nn.Linear(20, 20),
torch.nn.ReLU(),
torch.nn.Linear(20, output_dim),
)
def forward(self, x):
return self._model(x)
@property
def net(self):
return self._net
@property
def stride(self):
return self._stride
@property
def filter_dim(self):
return self._dim
@property
def input_numb_field(self):
return self._input_numb_field
@property
def output_numb_field(self):
return self._output_numb_field
@abstractmethod
def forward(self, X):
pass
@abstractmethod
def transpose_overlap(self, X):
pass
@abstractmethod
def transpose_no_overlap(self, X):
pass
@abstractmethod
def _initialize_convolution(self, X, type):
pass
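The nested ``DefaultKernel`` is the fallback parametrization used when no
``model`` is passed; a short sketch of what it computes (hypothetical values):

import torch

kernel = BaseContinuousConv.DefaultKernel(input_dim=2, output_dim=1)
pts = torch.rand(50, 2)   # filter-local coordinates
vals = kernel(pts)        # learned filter values, shape [50, 1]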

View File

@@ -1,571 +0,0 @@
"""Module for Continuous Convolution class"""
from .convolution import BaseContinuousConv
from .utils_convolution import check_point, map_points_
from .integral import Integral
import torch
class ContinuousConvBlock(BaseContinuousConv):
"""
Implementation of Continuous Convolutional operator.
The algorithm expects input to be in the form:
:math:`[B, N_{in}, N, D]`
where :math:`B` is the batch_size, :math:`N_{in}` is the number of input
fields, :math:`N` the number of points in the mesh, :math:`D` the dimension
of the problem. In particular:
* :math:`D` is the number of spatial variables + 1. The last column must
contain the field value. For example for 2D problems :math:`D=3` and
the tensor will be something like ``[first coordinate, second
coordinate, field value]``.
* :math:`N_{in}` represents the number of vector components of the
function. For example, a vector-valued function :math:`f = [f_1, f_2]`
will have :math:`N_{in}=2`.
.. seealso::
**Original reference**: Coscia, D., Meneghetti, L., Demo, N. et al.
*A continuous convolutional trainable filter for modelling unstructured data*.
Comput Mech 72, 253-265 (2023). DOI: `<https://doi.org/10.1007/s00466-023-02291-1>`_
"""
def __init__(
self,
input_numb_field,
output_numb_field,
filter_dim,
stride,
model=None,
optimize=False,
no_overlap=False,
):
"""
:param input_numb_field: Number of fields :math:`N_{in}` in the input.
:type input_numb_field: int
:param output_numb_field: Number of fields :math:`N_{out}` in the output.
:type output_numb_field: int
:param filter_dim: Dimension of the filter.
:type filter_dim: tuple(int) | list(int)
:param stride: Stride for the filter.
:type stride: dict
:param model: Neural network for inner parametrization,
defaults to ``None``. If ``None``, a default multilayer perceptron
of depth three and width twenty with ReLU activation is used.
:type model: torch.nn.Module
:param optimize: Flag for performing optimization on the continuous
filter, defaults to False. The flag ``optimize=True`` should be
used only when the scattered datapoints are fixed throughout the
training. If the torch model is in ``.eval()`` mode, the flag is
always set to False.
:type optimize: bool
:param no_overlap: Flag for performing optimization on the transpose
continuous filter, defaults to False. The flag set to ``True`` should
be used only when the filter positions do not overlap for different
strides. A RuntimeError is raised in case of non-compatible strides.
:type no_overlap: bool
.. note::
Using ``optimize=True`` the filter can be used either in ``forward``
or in ``transpose`` mode, not both. If ``optimize=False`` the same
filter can be used for both ``transpose`` and ``forward`` modes.
:Example:
>>> class MLP(torch.nn.Module):
def __init__(self) -> None:
super().__init__()
self.model = torch.nn.Sequential(
torch.nn.Linear(2, 8),
torch.nn.ReLU(),
torch.nn.Linear(8, 8),
torch.nn.ReLU(),
torch.nn.Linear(8, 1))
def forward(self, x):
return self.model(x)
>>> dim = [3, 3]
>>> stride = {"domain": [10, 10],
"start": [0, 0],
"jumps": [3, 3],
"direction": [1, 1.]}
>>> conv = ContinuousConvBlock(1, 2, dim, stride, MLP)
>>> conv
ContinuousConvBlock(
(_net): ModuleList(
(0): MLP(
(model): Sequential(
(0): Linear(in_features=2, out_features=8, bias=True)
(1): ReLU()
(2): Linear(in_features=8, out_features=8, bias=True)
(3): ReLU()
(4): Linear(in_features=8, out_features=1, bias=True)
)
)
(1): MLP(
(model): Sequential(
(0): Linear(in_features=2, out_features=8, bias=True)
(1): ReLU()
(2): Linear(in_features=8, out_features=8, bias=True)
(3): ReLU()
(4): Linear(in_features=8, out_features=1, bias=True)
)
)
)
)
"""
super().__init__(
input_numb_field=input_numb_field,
output_numb_field=output_numb_field,
filter_dim=filter_dim,
stride=stride,
model=model,
optimize=optimize,
no_overlap=no_overlap,
)
# integral routine
self._integral = Integral("discrete")
# create the network
self._net = self._spawn_networks(model)
# stride for continuous convolution overridden
self._stride = self._stride._stride_discrete
def _spawn_networks(self, model):
"""
Private method to create a collection of kernels.
:param model: A :class:`torch.nn.Module` subclass (the class itself,
not an instance) used to build each kernel.
:type model: type
:return: List of :class:`torch.nn.Module` models.
:rtype: torch.nn.ModuleList
"""
nets = []
if self._net is None:
for _ in range(self._input_numb_field * self._output_numb_field):
tmp = ContinuousConvBlock.DefaultKernel(len(self._dim), 1)
nets.append(tmp)
else:
if not isinstance(model, type):
raise ValueError(
"Expected a python class inheriting from torch.nn.Module"
)
for _ in range(self._input_numb_field * self._output_numb_field):
tmp = model()
if not isinstance(tmp, torch.nn.Module):
raise ValueError(
"The python class must be inherited from"
" torch.nn.Module. See the docstring for"
" an example."
)
nets.append(tmp)
return torch.nn.ModuleList(nets)
def _extract_mapped_points(self, batch_idx, index, x):
"""
Private method to extract mapped points in the filter.
:param int batch_idx: Index of the current batch element.
:param list index: Indices of the points falling inside each stride.
:param x: Input tensor of shape ``[channel, N, dim]``.
:type x: torch.Tensor
:return: Mapped points and indices for each channel.
:rtype: tuple, tuple
"""
mapped_points = []
indeces_channels = []
for stride_idx, current_stride in enumerate(self._stride):
# indeces of points falling into filter range
indeces = index[stride_idx][batch_idx]
# how many points for each channel fall into the filter?
numb_points_inside = torch.sum(indeces, dim=-1).tolist()
# extracting points for each channel
# shape: [sum(numb_points_inside), filter_dim + 1]
point_stride = x[indeces]
# mapping points in filter domain
map_points_(point_stride[..., :-1], current_stride)
# splitting points for each channel
point_stride_channel = point_stride.split(numb_points_inside)
# appending in list for later use
mapped_points.append(point_stride_channel)
indeces_channels.append(numb_points_insiede)
# stacking input for passing to neural net
mapping = map(torch.cat, zip(*mapped_points))
stacked_input = tuple(mapping)
indeces_channels = tuple(zip(*indeces_channels))
return stacked_input, indeces_channels
def _find_index(self, X):
"""
Private method to extract indices for convolution.
:param X: Input tensor, as in ContinuousConvBlock ``__init__``.
:type X: torch.Tensor
"""
# append the index for each stride
index = []
for current_stride in self._stride:
tmp = check_point(X, current_stride, self._dim)
index.append(tmp)
# storing the index
self._index = index
def _make_grid_forward(self, X):
"""
Private method to create forward convolution grid.
:param X: Input tensor, as in ContinuousConvBlock docstring.
:type X: torch.Tensor
"""
# filter dimension + number of points in output grid
filter_dim = len(self._dim)
number_points = len(self._stride)
# initialize the grid
grid = torch.zeros(
size=(
X.shape[0],
self._output_numb_field,
number_points,
filter_dim + 1,
),
device=X.device,
dtype=X.dtype,
)
grid[..., :-1] = self._stride + self._dim * 0.5
# saving the grid
self._grid = grid.detach()
def _make_grid_transpose(self, X):
"""
Private method to create transpose convolution grid.
:param X: Input tensor, as in ContinuousConvBlock docstring.
:type X: torch.Tensor
"""
# initialize to all zeros
tmp = torch.zeros_like(X)
tmp[..., :-1] = X[..., :-1]
# save on tmp
self._grid_transpose = tmp
def _make_grid(self, X, type):
"""
Private method to create convolution grid.
:param X: Input tensor, as in ContinuousConvBlock docstring.
:type X: torch.Tensor
:param type: Type of convolution; possible values are
``'forward'`` and ``'inverse'``.
:type type: str
"""
# choose the type of convolution
if type == "forward":
self._make_grid_forward(X)
elif type == "inverse":
self._make_grid_transpose(X)
else:
raise TypeError("type must be either 'forward' or 'inverse'.")
def _initialize_convolution(self, X, type="forward"):
"""
Private method to initialize the convolution.
The convolution is initialized by setting a grid and
calculating the indices for finding the points inside the
filter.
:param X: Input tensor, as in ContinuousConvBlock docstring.
:type X: torch.Tensor
:param str type: Type of convolution; possible values are
``'forward'`` and ``'inverse'``.
"""
# variable for the convolution
self._make_grid(X, type)
# calculate the index
self._find_index(X)
def forward(self, X):
"""
Forward pass in the convolutional layer.
:param X: Input data for the convolution :math:`[B, N_{in}, N, D]`.
:type X: torch.Tensor
:return: Convolution output :math:`[B, N_{out}, N', D]`.
:rtype: torch.Tensor
"""
# initialize convolution
if self.training: # we choose what to do based on optimization
self._choose_initialization(X, type="forward")
else: # we always initialize on testing
self._initialize_convolution(X, "forward")
# create convolutional array
conv = self._grid.clone().detach()
# total number of fields
tot_dim = self._output_numb_field * self._input_numb_field
for batch_idx, x in enumerate(X):
# extract mapped points
stacked_input, indeces_channels = self._extract_mapped_points(
batch_idx, self._index, x
)
# compute the convolution
# storing intermediate results for each channel convolution
res_tmp = []
# for each field
for idx_conv in range(tot_dim):
# index for each input field
idx = idx_conv % self._input_numb_field
# extract input for each channel
single_channel_input = stacked_input[idx]
# extract filter
net = self._net[idx_conv]
# calculate filter value
stacked_output = net(single_channel_input[..., :-1])
# perform integral for all strides in one field
integral = self._integral(
stacked_output,
single_channel_input[..., -1],
indeces_channels[idx],
)
res_tmp.append(integral)
# stacking integral results
res_tmp = torch.stack(res_tmp)
# sum filters (for each input field) in groups
# for different output fields
conv[batch_idx, ..., -1] = res_tmp.reshape(
self._output_numb_field, self._input_numb_field, -1
).sum(1)
return conv
def transpose_no_overlap(self, integrals, X):
"""
Transpose pass in the layer for non-overlapping filters.
:param integrals: Weights for the transpose convolution, of shape
:math:`[B, N_{in}, N]`,
where :math:`B` is the batch_size, :math:`N_{in}` is the number of input
fields and :math:`N` the number of points in the mesh.
:type integrals: torch.Tensor
:param X: Input data. Expects a tensor of shape
:math:`[B, N_{in}, M, D]` where :math:`B` is the batch_size,
:math:`N_{in}` is the number of input fields, :math:`M` the number of points
in the mesh, :math:`D` the dimension of the problem.
:type X: torch.Tensor
:return: Feed forward transpose convolution. Tensor of shape
:math:`[B, N_{out}, M, D]` where :math:`B` is the batch_size,
:math:`N_{out}` is the number of output fields, :math:`M` the number of points
in the mesh, :math:`D` the dimension of the problem.
:rtype: torch.Tensor
.. note::
This function is automatically called when the ``.transpose()``
method is used and ``no_overlap=True``.
"""
# initialize convolution
if self.training: # we choose what to do based on optimization
self._choose_initialization(X, type="inverse")
else: # we always initialize on testing
self._initialize_convolution(X, "inverse")
# initialize grid
X = self._grid_transpose.clone().detach()
conv_transposed = self._grid_transpose.clone().detach()
# total number of dim
tot_dim = self._input_numb_field * self._output_numb_field
for batch_idx, x in enumerate(X):
# extract mapped points
stacked_input, indeces_channels = self._extract_mapped_points(
batch_idx, self._index, x
)
# compute the transpose convolution
# total number of fields
res_tmp = []
# for each field
for idx_conv in range(tot_dim):
# index for each output field
idx = idx_conv % self._output_numb_field
# index for each input field
idx_in = idx_conv % self._input_numb_field
# extract input for each field
single_channel_input = stacked_input[idx]
rep_idx = torch.tensor(indeces_channels[idx])
integral = integrals[batch_idx, idx_in, :].repeat_interleave(
rep_idx
)
# extract filter
net = self._net[idx_conv]
# perform transpose convolution for all strides in one field
stacked_output = net(single_channel_input[..., :-1]).flatten()
integral = stacked_output * integral
res_tmp.append(integral)
# stacking integral results and sum
# filters (for each input fields) in groups
# for different output fields
res_tmp = (
torch.stack(res_tmp)
.reshape(self._input_numb_field, self._output_numb_field, -1)
.sum(0)
)
conv_transposed[batch_idx, ..., -1] = res_tmp
return conv_transposed
def transpose_overlap(self, integrals, X):
"""
Transpose pass in the layer for overlapping filters.
:param integrals: Weights for the transpose convolution, of shape
:math:`[B, N_{in}, N]`,
where :math:`B` is the batch_size, :math:`N_{in}` is the number of input
fields and :math:`N` the number of points in the mesh.
:type integrals: torch.Tensor
:param X: Input data. Expects a tensor of shape
:math:`[B, N_{in}, M, D]` where :math:`B` is the batch_size,
:math:`N_{in}` is the number of input fields, :math:`M` the number of points
in the mesh, :math:`D` the dimension of the problem.
:type X: torch.Tensor
:return: Feed forward transpose convolution. Tensor of shape
:math:`[B, N_{out}, M, D]` where :math:`B` is the batch_size,
:math:`N_{out}` is the number of output fields, :math:`M` the number of points
in the mesh, :math:`D` the dimension of the problem.
:rtype: torch.Tensor
.. note:: This function is automatically called when the ``.transpose()``
method is used and ``no_overlap=False``.
"""
# initialize convolution
if self.training: # we choose what to do based on optimization
self._choose_initialization(X, type="inverse")
else: # we always initialize on testing
self._initialize_convolution(X, "inverse")
# initialize grid
X = self._grid_transpose.clone().detach()
conv_transposed = self._grid_transpose.clone().detach()
# list to iterate for calculating nn output
tmp = list(range(self._output_numb_field))
iterate_conv = [
item for item in tmp for _ in range(self._input_numb_field)
]
for batch_idx, x in enumerate(X):
# accumulator for the convolution on different batches
accumulator_batch = torch.zeros(
size=(
self._grid_transpose.shape[1],
self._grid_transpose.shape[2],
),
requires_grad=True,
device=X.device,
dtype=X.dtype,
).clone()
for stride_idx, current_stride in enumerate(self._stride):
# indeces of points falling into filter range
indeces = self._index[stride_idx][batch_idx]
# number of points for each channel
numb_pts_channel = tuple(indeces.sum(dim=-1))
# extracting points for each channel
point_stride = x[indeces]
# if no points to upsample we just skip
if point_stride.nelement() == 0:
continue
# mapping points in filter domain
map_points_(point_stride[..., :-1], current_stride)
# input points for kernels
# we split for extracting number of points for each channel
nn_input_pts = point_stride[..., :-1].split(numb_pts_channel)
# accumulate partial convolution results for each field
res_tmp = []
# for each channel field compute transpose convolution
for idx_conv, idx_channel_out in enumerate(iterate_conv):
# index for input channels
idx_channel_in = idx_conv % self._input_numb_field
# extract filter
net = self._net[idx_conv]
# calculate filter value
stacked_output = net(nn_input_pts[idx_channel_out])
# perform integral for all strides in one field
integral = (
stacked_output
* integrals[batch_idx, idx_channel_in, stride_idx]
)
# append results
res_tmp.append(integral.flatten())
# computing channel sum
channel_sum = []
start = 0
for _ in range(self._output_numb_field):
tmp = res_tmp[start : start + self._input_numb_field]
tmp = torch.vstack(tmp).sum(dim=0)
channel_sum.append(tmp)
start += self._input_numb_field
# accumulate the results
accumulator_batch[indeces] += torch.hstack(channel_sum)
# save results of accumulation for each batch
conv_transposed[batch_idx, ..., -1] = accumulator_batch
return conv_transposed
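An end-to-end sketch with the default kernel (hypothetical sizes; the
coordinates are scaled to match the ``domain`` declared in ``stride``):

import torch

stride = {"domain": [10, 10], "start": [0, 0],
          "jumps": [3, 3], "direction": [1, 1.]}
conv = ContinuousConvBlock(input_numb_field=1, output_numb_field=2,
                           filter_dim=[3, 3], stride=stride)
coords = torch.rand(8, 1, 100, 2) * 10   # mesh points in [0, 10]^2
field = torch.rand(8, 1, 100, 1)         # one scalar field per point
x = torch.cat([coords, field], dim=-1)   # [B, N_in, N, D] with D = 2 + 1
out = conv(x)                            # [8, 2, N', 3], N' = stride positions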

View File

@@ -1,261 +0,0 @@
""" Embedding modulus. """
import torch
from pina.utils import check_consistency
from typing import Union, Sequence
class PeriodicBoundaryEmbedding(torch.nn.Module):
r"""
Imposing hard constraint periodic boundary conditions by embedding the
input.
A periodic function :math:`u:\mathbb{R}^{\rm{in}}
\rightarrow\mathbb{R}^{\rm{out}}` periodic in the spatial
coordinates :math:`\mathbf{x}` with periods :math:`\mathbf{L}` is such that:
.. math::
u(\mathbf{x}) = u(\mathbf{x} + n \mathbf{L})\;\;
\forall n\in\mathbb{N}.
The :class:`PeriodicBoundaryEmbedding` augments the input such that the periodic
conditions are guaranteed. The input is augmented by the following formula:
.. math::
\mathbf{x} \rightarrow \tilde{\mathbf{x}} = \left[1,
\cos\left(\frac{2\pi}{L_1} x_1 \right),
\sin\left(\frac{2\pi}{L_1}x_1\right), \cdots,
\cos\left(\frac{2\pi}{L_{\rm{in}}}x_{\rm{in}}\right),
\sin\left(\frac{2\pi}{L_{\rm{in}}}x_{\rm{in}}\right)\right],
where :math:`\text{dim}(\tilde{\mathbf{x}}) = 3\text{dim}(\mathbf{x})`.
.. seealso::
**Original reference**:
1. Dong, Suchuan, and Naxian Ni (2021). *A method for representing
periodic functions and enforcing exactly periodic boundary
conditions with deep neural networks*. Journal of Computational
Physics 435, 110242.
DOI: `10.1016/j.jcp.2021.110242.
<https://doi.org/10.1016/j.jcp.2021.110242>`_
2. Wang, S., Sankaran, S., Wang, H., & Perdikaris, P. (2023). *An
expert's guide to training physics-informed neural networks*.
DOI: `arXiv preprint arXiv:2308.0846.
<https://arxiv.org/abs/2308.08468>`_
.. warning::
The embedding is a truncated Fourier expansion, and it only ensures
the periodicity of the function, not of its derivatives. Approximate
periodicity in the derivatives of :math:`u` can still be obtained:
extensive tests (see also the reference papers) show that this
implementation correctly imposes the PBC on derivatives up to order
:math:`\sim 2,3`, while periodicity is not guaranteed for orders
:math:`>3`. The PINA code is tested only for function PBC, not for
its derivatives.
"""
def __init__(self, input_dimension, periods, output_dimension=None):
"""
:param int input_dimension: The number of input features, i.e. the
size of the last dimension of the input tensor (``tensor.shape[-1]``).
:param float | int | dict periods: The periodicity in each dimension for
the input data. If ``float`` or ``int`` is passed,
the period is assumed constant for all the dimensions of the data.
If a ``dict`` is passed the `dict.values` represent periods,
while the ``dict.keys`` represent the dimension where the
periodicity is applied. The `dict.keys` can either be `int`
if working with ``torch.Tensor`` or ``str`` if
working with ``LabelTensor``.
:param int output_dimension: The dimension of the output after the
Fourier embedding. If not ``None``, a ``torch.nn.Linear`` layer
is applied to the Fourier embedding output to match the desired
dimensionality, defaults to ``None``.
"""
super().__init__()
# check input consistency
check_consistency(periods, (float, int, dict))
check_consistency(input_dimension, int)
if output_dimension is not None:
check_consistency(output_dimension, int)
self._layer = torch.nn.Linear(input_dimension * 3, output_dimension)
else:
self._layer = torch.nn.Identity()
# checks on the periods
if isinstance(periods, dict):
if not all(
isinstance(dim, (str, int)) and isinstance(period, (float, int))
for dim, period in periods.items()
):
raise TypeError(
"In dictionary periods, keys must be integers"
" or strings, and values must be float or int."
)
self._period = periods
else:
self._period = {k: periods for k in range(input_dimension)}
def forward(self, x):
"""
Forward pass to compute the periodic boundary conditions embedding.
:param torch.Tensor x: Input tensor.
:return: Periodic embedding of the input.
:rtype: torch.Tensor
"""
omega = torch.stack(
[
torch.pi * 2.0 / torch.tensor([val], device=x.device)
for val in self._period.values()
],
dim=-1,
)
x = self._get_vars(x, list(self._period.keys()))
return self._layer(
torch.cat(
[
torch.ones_like(x),
torch.cos(omega * x),
torch.sin(omega * x),
],
dim=-1,
)
)
def _get_vars(self, x, indeces):
"""
Get variables from the input tensor, ordered by specific indices.
:param torch.Tensor x: The input tensor to extract from.
:param list[int] | list[str] indeces: List of indices to extract.
:return: The extracted tensor given the indices.
:rtype: torch.Tensor
"""
if isinstance(indeces[0], str):
try:
return x.extract(indeces)
except AttributeError:
raise RuntimeError(
"Not possible to extract input variables from tensor."
" Ensure that the passed tensor is a LabelTensor or"
" pass list of integers to extract variables. For"
" more information refer to warning in the documentation."
)
elif isinstance(indeces[0], int):
return x[..., indeces]
else:
raise RuntimeError(
"Not able to extract right indeces for tensor."
" For more information refer to warning in the documentation."
)
@property
def period(self):
"""
The period of the periodic function to approximate.
"""
return self._period
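# Usage sketch (hypothetical values): with ``output_dimension=None`` the
# embedding triples the input size, mapping the input to [1, cos, sin]
# components per coordinate.
#
#     embedding = PeriodicBoundaryEmbedding(input_dimension=2, periods=1.0)
#     x = torch.rand(10, 2)       # ten 2D points
#     x_tilde = embedding(x)      # shape [10, 6]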
class FourierFeatureEmbedding(torch.nn.Module):
def __init__(self, input_dimension, output_dimension, sigma):
r"""
Fourier Feature Embedding class for encoding input features
using random Fourier features. This class applies a Fourier
transformation to the input features,
which can help in learning high-frequency variations in the data.
Multiscale feature embeddings are obtained by concatenating the
outputs of several embeddings, one for each scale specified by a
different ``sigma``.
The :obj:`FourierFeatureEmbedding` augments the input
by the following formula (3.10 of original paper):
.. math::
\mathbf{x} \rightarrow \tilde{\mathbf{x}} = \left[
\cos\left( \mathbf{B} \mathbf{x} \right),
\sin\left( \mathbf{B} \mathbf{x} \right)\right],
where :math:`\mathbf{B}_{ij} \sim \mathcal{N}(0, \sigma^2)`.
In case multiple scales :math:`\sigma_k` are used, the resulting embeddings
are concatenated:
.. math::
\mathbf{x} \rightarrow \tilde{\mathbf{x}} = \left[
\cos\left( \mathbf{B}^1 \mathbf{x} \right),
\sin\left( \mathbf{B}^1 \mathbf{x} \right),
\cos\left( \mathbf{B}^2 \mathbf{x} \right),
\sin\left( \mathbf{B}^2 \mathbf{x} \right),
\dots,
\cos\left( \mathbf{B}^M \mathbf{x} \right),
\sin\left( \mathbf{B}^M \mathbf{x} \right)\right],
where :math:`\mathbf{B}^k_{ij} \sim \mathcal{N}(0, \sigma_k^2) \quad
k \in (1, \dots, M)`.
.. seealso::
**Original reference**:
Wang, Sifan, Hanwen Wang, and Paris Perdikaris. *On the eigenvector
bias of Fourier feature networks: From regression to solving
multi-scale PDEs with physics-informed neural networks.*
Computer Methods in Applied Mechanics and
Engineering 384 (2021): 113938.
DOI: `10.1016/j.cma.2021.113938.
<https://doi.org/10.1016/j.cma.2021.113938>`_
:param int input_dimension: The input vector dimension of the layer.
:param int output_dimension: The output dimension of the layer. The
output is obtained as a concatenation of the cosine and sine
embedding, hence it must be a multiple of two (even number).
:param int | float sigma: The standard deviation used for the
Fourier Embedding. This value must reflect the granularity of the
scale in the differential equation solution.
"""
super().__init__()
# check consistency
check_consistency(sigma, (int, float))
check_consistency(output_dimension, int)
check_consistency(input_dimension, int)
if output_dimension % 2:
raise RuntimeError(
"Expected output_dimension to be a even number, "
f"got {output_dimension}."
)
# assign sigma
self._sigma = sigma
# create non-trainable matrix with entries B_ij ~ N(0, sigma^2),
# as in the reference formula
self._matrix = (
torch.randn(
size=(input_dimension, output_dimension // 2),
requires_grad=False,
)
* self.sigma
)
def forward(self, x):
"""
Forward pass to compute the Fourier embedding.
:param torch.Tensor x: Input tensor.
:return: Fourier embeddings of the input.
:rtype: torch.Tensor
"""
# compute random matrix multiplication
out = torch.mm(x, self._matrix.to(device=x.device, dtype=x.dtype))
# return embedding
return torch.cat(
[torch.cos(2 * torch.pi * out), torch.sin(2 * torch.pi * out)],
dim=-1,
)
@property
def sigma(self):
"""
The standard deviation used to sample the matrix for the Fourier embedding.
"""
return self._sigma
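A minimal usage sketch (hypothetical sizes); the output concatenates
``output_dimension // 2`` cosine and sine features:

import torch

emb = FourierFeatureEmbedding(input_dimension=2, output_dimension=64, sigma=1.0)
x = torch.rand(100, 2)
out = emb(x)   # shape [100, 64]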

View File

@@ -1,219 +0,0 @@
import torch
import torch.nn as nn
from ...utils import check_consistency
from pina.model.layers import (
SpectralConvBlock1D,
SpectralConvBlock2D,
SpectralConvBlock3D,
)
class FourierBlock1D(nn.Module):
"""
Fourier block implementation for one dimensional
input tensors. The combination of Fourier blocks
makes up the Fourier Neural Operator.
.. seealso::
**Original reference**: Li, Z., Kovachki, N., Azizzadenesheli, K., Liu, B.,
Bhattacharya, K., Stuart, A., & Anandkumar, A. (2020). *Fourier neural operator for
parametric partial differential equations*.
DOI: `arXiv preprint arXiv:2010.08895.
<https://arxiv.org/abs/2010.08895>`_
"""
def __init__(
self,
input_numb_fields,
output_numb_fields,
n_modes,
activation=torch.nn.Tanh,
):
"""
PINA implementation of the one-dimensional Fourier block. The module computes
the spectral convolution of the input with a linear kernel in the
Fourier space, and then it maps the input back to the physical
space. The output is then added to a linear transformation of the
input in the physical space. Finally an activation function is
applied to the output.
The block expects an input of size ``[batch, input_numb_fields, N]``
and returns an output of size ``[batch, output_numb_fields, N]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param list | tuple n_modes: Number of modes to select for each dimension.
It must be at most equal to ``floor(N/2)+1``.
:param torch.nn.Module activation: The activation function.
"""
super().__init__()
# check type consistency
check_consistency(activation(), nn.Module)
# assign variables
self._spectral_conv = SpectralConvBlock1D(
input_numb_fields=input_numb_fields,
output_numb_fields=output_numb_fields,
n_modes=n_modes,
)
self._activation = activation()
self._linear = nn.Conv1d(input_numb_fields, output_numb_fields, 1)
def forward(self, x):
"""
Forward computation for Fourier Block. It performs a spectral
convolution and a linear transformation of the input and sum the
results.
:param x: The input tensor for fourier block, expect of size
``[batch, input_numb_fields, x]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
fourier block of size ``[batch, output_numb_fields, x]``.
:rtype: torch.Tensor
"""
return self._activation(self._spectral_conv(x) + self._linear(x))
class FourierBlock2D(nn.Module):
"""
Fourier block implementation for two dimensional
input tensors. The combination of Fourier blocks
makes up the Fourier Neural Operator.
.. seealso::
**Original reference**: Li, Zongyi, et al.
*Fourier neural operator for parametric partial
differential equations*. DOI: `arXiv preprint
arXiv:2010.08895 (2020).
<https://arxiv.org/abs/2010.08895>`_
"""
def __init__(
self,
input_numb_fields,
output_numb_fields,
n_modes,
activation=torch.nn.Tanh,
):
"""
PINA implementation of the two-dimensional Fourier block. The module computes
the spectral convolution of the input with a linear kernel in the
Fourier space, and then it maps the input back to the physical
space. The output is then added to a linear transformation of the
input in the physical space. Finally an activation function is
applied to the output.
The block expects an input of size ``[batch, input_numb_fields, Nx, Ny]``
and returns an output of size ``[batch, output_numb_fields, Nx, Ny]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param list | tuple n_modes: Number of modes to select for each dimension.
It must be at most equal to the ``floor(Nx/2)+1`` and ``floor(Ny/2)+1``.
:param torch.nn.Module activation: The activation function.
"""
super().__init__()
# check type consistency
check_consistency(activation(), nn.Module)
# assign variables
self._spectral_conv = SpectralConvBlock2D(
input_numb_fields=input_numb_fields,
output_numb_fields=output_numb_fields,
n_modes=n_modes,
)
self._activation = activation()
self._linear = nn.Conv2d(input_numb_fields, output_numb_fields, 1)
def forward(self, x):
"""
Forward computation for Fourier Block. It performs a spectral
convolution and a linear transformation of the input and sum the
results.
:param x: The input tensor for fourier block, expect of size
``[batch, input_numb_fields, x, y]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
fourier block of size ``[batch, output_numb_fields, x, y]``.
:rtype: torch.Tensor
"""
return self._activation(self._spectral_conv(x) + self._linear(x))
class FourierBlock3D(nn.Module):
"""
Fourier block implementation for three dimensional
input tensors. The combination of Fourier blocks
makes up the Fourier Neural Operator.
.. seealso::
**Original reference**: Li, Zongyi, et al.
*Fourier neural operator for parametric partial
differential equations*. DOI: `arXiv preprint
arXiv:2010.08895 (2020).
<https://arxiv.org/abs/2010.08895>`_
"""
def __init__(
self,
input_numb_fields,
output_numb_fields,
n_modes,
activation=torch.nn.Tanh,
):
"""
PINA implementation of the three-dimensional Fourier block. The module computes
the spectral convolution of the input with a linear kernel in the
Fourier space, and then it maps the input back to the physical
space. The output is then added to a linear transformation of the
input in the physical space. Finally an activation function is
applied to the output.
The block expects an input of size ``[batch, input_numb_fields, Nx, Ny, Nz]``
and returns an output of size ``[batch, output_numb_fields, Nx, Ny, Nz]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param list | tuple n_modes: Number of modes to select for each dimension.
It must be at most equal to the ``floor(Nx/2)+1``, ``floor(Ny/2)+1``
and ``floor(Nz/2)+1``.
:param torch.nn.Module activation: The activation function.
"""
super().__init__()
# check type consistency
check_consistency(activation(), nn.Module)
# assign variables
self._spectral_conv = SpectralConvBlock3D(
input_numb_fields=input_numb_fields,
output_numb_fields=output_numb_fields,
n_modes=n_modes,
)
self._activation = activation()
self._linear = nn.Conv3d(input_numb_fields, output_numb_fields, 1)
def forward(self, x):
"""
Forward computation for Fourier Block. It performs a spectral
convolution and a linear transformation of the input and sum the
results.
:param x: The input tensor for fourier block, expect of size
``[batch, input_numb_fields, x, y, z]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
fourier block of size ``[batch, output_numb_fields, x, y, z]``.
:rtype: torch.Tensor
"""
return self._activation(self._spectral_conv(x) + self._linear(x))
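A short usage sketch for the two-dimensional block (hypothetical sizes;
``n_modes`` must respect the ``floor(N/2)+1`` bound stated above):

import torch

block = FourierBlock2D(input_numb_fields=3, output_numb_fields=5,
                       n_modes=[8, 8], activation=torch.nn.Tanh)
x = torch.rand(4, 3, 32, 32)   # [batch, input_numb_fields, Nx, Ny]
out = block(x)                 # [4, 5, 32, 32]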

View File

@@ -1,87 +0,0 @@
import torch
from torch_geometric.nn import MessagePassing
class GNOBlock(MessagePassing):
"""
TODO: Add documentation
"""
def __init__(
self,
width,
edges_features,
n_layers=2,
layers=None,
inner_size=None,
internal_func=None,
external_func=None
):
"""
Initialize the Graph Integral Layer, inheriting from the MessagePassing class of PyTorch Geometric.
:param width: The width of the hidden representation of the node features.
:type width: int
:param edges_features: The number of edge features.
:type edges_features: int
:param n_layers: The number of layers in the feed forward neural network used to compute the representation of the edge features.
:type n_layers: int
:param layers: Sizes of the hidden layers of the edge network; if given, it overrides ``inner_size`` and ``n_layers``.
:type layers: list | None
:param inner_size: Hidden size of the edge network; defaults to ``width`` when both ``layers`` and ``inner_size`` are ``None``.
:type inner_size: int | None
:param internal_func: Activation function class used inside the edge network.
:type internal_func: torch.nn.Module
:param external_func: Activation function class applied to the block output.
:type external_func: torch.nn.Module
"""
from pina.model import FeedForward  # local import to avoid circular imports
super(GNOBlock, self).__init__(aggr='mean')
self.width = width
if layers is None and inner_size is None:
inner_size = width
self.dense = FeedForward(input_dimensions=edges_features,
output_dimensions=width ** 2,
n_layers=n_layers,
layers=layers,
inner_size=inner_size,
func=internal_func)
self.W = torch.nn.Linear(width, width)
self.func = external_func()
def message(self, x_j, edge_attr):
"""
This function computes the message passed between the nodes of the graph. It overrides the default message function defined in the MessagePassing class.
:param x_j: The features of the neighboring nodes.
:type x_j: torch.Tensor
:param edge_attr: The edge features.
:type edge_attr: torch.Tensor
:return: The message passed between the nodes of the graph.
:rtype: torch.Tensor
"""
x = self.dense(edge_attr).view(-1, self.width, self.width)
return torch.einsum('bij,bj->bi', x, x_j)
def update(self, aggr_out, x):
"""
This function updates the node features of the graph. It overrides the default update function defined in the MessagePassing class.
:param aggr_out: The aggregated messages.
:type aggr_out: torch.Tensor
:param x: The node features.
:type x: torch.Tensor
:return: The updated node features.
:rtype: torch.Tensor
"""
aggr_out = aggr_out + self.W(x)
return aggr_out
def forward(self, x, edge_index, edge_attr):
"""
The forward pass of the Graph Integral Layer.
:param x: Node features.
:type x: torch.Tensor
:param edge_index: Edge index.
:type edge_index: torch.Tensor
:param edge_attr: Edge features.
:type edge_attr: torch.Tensor
:return: Output of a single iteration over the Graph Integral Layer.
:rtype: torch.Tensor
"""
return self.func(
self.propagate(edge_index, x=x, edge_attr=edge_attr)
)
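A minimal sketch on a random graph (hypothetical values; the activation
classes are passed explicitly, since they are instantiated inside the block):

import torch

block = GNOBlock(width=16, edges_features=4, n_layers=2,
                 internal_func=torch.nn.ReLU, external_func=torch.nn.Tanh)
x = torch.rand(10, 16)                      # node features [nodes, width]
edge_index = torch.randint(0, 10, (2, 40))  # random connectivity
edge_attr = torch.rand(40, 4)               # edge features
out = block(x, edge_index, edge_attr)       # [10, 16]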

View File

@@ -1,63 +0,0 @@
import torch
class Integral(object):
def __init__(self, param):
"""Integral class for continous convolution
:param param: type of continuous convolution
:type param: string
"""
if param == "discrete":
self.make_integral = self.integral_param_disc
elif param == "continuous":
self.make_integral = self.integral_param_cont
else:
raise TypeError("param must be either 'discrete' or 'continuous'.")
def __call__(self, *args, **kwds):
return self.make_integral(*args, **kwds)
def _prepend_zero(self, x):
"""Create bins for performing integral
:param x: input tensor
:type x: torch.tensor
:return: bins for integrals
:rtype: torch.tensor
"""
return torch.cat((torch.zeros(1, dtype=x.dtype, device=x.device), x))
def integral_param_disc(self, x, y, idx):
"""Perform discretize integral
with discrete parameters
:param x: input vector
:type x: torch.tensor
:param y: input vector
:type y: torch.tensor
:param idx: indeces for different strides
:type idx: list
:return: integral
:rtype: torch.tensor
"""
cs_idxes = self._prepend_zero(torch.cumsum(torch.tensor(idx), 0))
cs = self._prepend_zero(torch.cumsum(x.flatten() * y.flatten(), 0))
return cs[cs_idxes[1:]] - cs[cs_idxes[:-1]]
def integral_param_cont(self, x, y, idx):
"""Perform discretize integral for continuous convolution
with continuous parameters
:param x: input vector
:type x: torch.tensor
:param y: input vector
:type y: torch.tensor
:param idx: indeces for different strides
:type idx: list
:return: integral
:rtype: torch.tensor
"""
raise NotImplementedError
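A worked sketch of the discrete integral (hypothetical values): the
cumulative-sum trick returns one dot product per stride segment.

import torch

integral = Integral("discrete")
x = torch.rand(7)          # kernel values for two strides
y = torch.rand(7)          # field values at the same points
idx = [3, 4]               # 3 points in the first stride, 4 in the second
out = integral(x, y, idx)  # shape [2]: per-stride sums of x * y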

View File

@@ -1,142 +0,0 @@
""" Module for Averaging Neural Operator Layer class. """
import torch
from pina.utils import check_consistency
import pina.model as pm # avoid circular import
class LowRankBlock(torch.nn.Module):
r"""
The PINA implementation of the inner layer of the Low Rank Neural Operator.
The operator layer performs an affine transformation where the convolution
is approximated with a low rank decomposition. Given the input function
:math:`v(x)\in\mathbb{R}^{\rm{emb}}` the layer computes
the operator update :math:`K(v)` as:
.. math::
K(v) = \sigma\left(Wv(x) + b + \sum_{i=1}^r \langle
\psi^{(i)} , v(x) \rangle \phi^{(i)} \right)
where:
* :math:`\mathbb{R}^{\rm{emb}}` is the embedding (hidden) size
corresponding to the ``hidden_size`` object
* :math:`\sigma` is a non-linear activation, corresponding to the
``func`` object
* :math:`W\in\mathbb{R}^{\rm{emb}\times\rm{emb}}` is a tunable matrix.
* :math:`b\in\mathbb{R}^{\rm{emb}}` is a tunable bias.
* :math:`\psi^{(i)}\in\mathbb{R}^{\rm{emb}}` and
:math:`\phi^{(i)}\in\mathbb{R}^{\rm{emb}}` are the :math:`r` low rank
basis functions.
.. seealso::
**Original reference**: Kovachki, N., Li, Z., Liu, B.,
Azizzadenesheli, K., Bhattacharya, K., Stuart, A., & Anandkumar, A.
(2023). *Neural operator: Learning maps between function
spaces with applications to PDEs*. Journal of Machine Learning
Research, 24(89), 1-97.
"""
def __init__(
self,
input_dimensions,
embedding_dimenion,
rank,
inner_size=20,
n_layers=2,
func=torch.nn.Tanh,
bias=True,
):
"""
:param int input_dimensions: The number of input components of the
model.
Expected tensor shape of the form :math:`(*, d)`, where *
means any number of dimensions including none,
and :math:`d` the ``input_dimensions``.
:param int embedding_dimenion: Size of the embedding dimension of the
field.
:param int rank: The rank number of the basis approximation components
of the model. Expected tensor shape of the form :math:`(*, 2d)`,
where * means any number of dimensions including none,
and :math:`2d` the ``rank`` for both basis functions.
:param int inner_size: Number of neurons in the hidden layer(s) for the
basis function network. Default is 20.
:param int n_layers: Number of hidden layers. for the
basis function network. Default is 2.
:param func: The activation function to use for the
basis function network. If a single
:class:`torch.nn.Module` is passed, it is used as the
activation function after each layer, except the last one.
If a list of Modules is passed,
they are used as the activation functions of the layers, in order.
:param bool bias: If ``True`` the MLP will consider some bias for the
basis function network.
"""
super().__init__()
# Assignment (check consistency inside FeedForward)
self._basis = pm.FeedForward(
input_dimensions=input_dimensions,
output_dimensions=2 * rank * embedding_dimenion,
inner_size=inner_size,
n_layers=n_layers,
func=func,
bias=bias,
)
self._nn = torch.nn.Linear(embedding_dimenion, embedding_dimenion)
check_consistency(rank, int)
self._rank = rank
self._func = func()
def forward(self, x, coords):
r"""
Forward pass of the layer. It performs an affine transformation of
the field, and a low rank approximation by
taking the dot product of the basis
:math:`\psi^{(i)}` with the field vector :math:`v`, and using these
coefficients to expand :math:`\phi^{(i)}` evaluated at the
spatial input :math:`x`.
:param torch.Tensor x: The input tensor for performing the
computation. It expects a tensor :math:`B \times N \times D`,
where :math:`B` is the batch_size, :math:`N` the number of points
in the mesh, :math:`D` the dimension of the problem. In particular
:math:`D` is the codomain of the function :math:`v`. For example
a scalar function has :math:`D=1`, a 4-dimensional vector function
:math:`D=4`.
:param torch.Tensor coords: The coordinates in which the field is
evaluated for performing the computation. It expects a
tensor :math:`B \times N \times d`,
where :math:`B` is the batch_size, :math:`N` the number of points
in the mesh, :math:`d` the dimension of the domain.
:return: The output tensor obtained from the Low Rank Neural Operator Block.
:rtype: torch.Tensor
"""
# extract basis
coords = coords.as_subclass(torch.Tensor)
basis = self._basis(coords)
# reshape [B, N, D, 2*rank]
shape = list(basis.shape[:-1]) + [-1, 2 * self.rank]
basis = basis.reshape(shape)
# divide
psi = basis[..., : self.rank]
phi = basis[..., self.rank :]
# compute dot product
coeff = torch.einsum("...dr,...d->...r", psi, x)
# expand the basis
expansion = torch.einsum("...r,...dr->...d", coeff, phi)
# apply linear layer and return
return self._func(self._nn(x) + expansion)
@property
def rank(self):
"""
The basis rank.
"""
return self._rank
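A minimal usage sketch (hypothetical sizes; the ``embedding_dimenion``
spelling follows the signature above):

import torch

block = LowRankBlock(input_dimensions=2, embedding_dimenion=24, rank=4)
v = torch.rand(8, 100, 24)       # field embedding   [B, N, D]
coords = torch.rand(8, 100, 2)   # mesh coordinates  [B, N, d]
out = block(v, coords)           # [8, 100, 24]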

View File

@@ -1,11 +0,0 @@
""" Module for Averaging Neural Operator Layer class. """
from torch import nn, mean
from torch_geometric.nn import MessagePassing, InstanceNorm, radius_graph
from pina.utils import check_consistency
class MessagePassingBlock(nn.Module):

View File

@@ -1,121 +0,0 @@
"""Module for OrthogonalBlock."""
import torch
from ...utils import check_consistency
class OrthogonalBlock(torch.nn.Module):
"""
Module to make the input orthonormal.
The module takes a tensor of size :math:`[N, M]` and returns a tensor of
size :math:`[N, M]` where the columns are orthonormal. The block performs a
Gram-Schmidt orthogonalization process on the input, see
`here <https://en.wikipedia.org/wiki/Gram%E2%80%93Schmidt_process>`_ for
details.
"""
def __init__(self, dim=-1, requires_grad=True):
"""
Initialize the OrthogonalBlock module.
:param int dim: The dimension where to orthogonalize.
:param bool requires_grad: If autograd should record operations on
the returned tensor, defaults to True.
"""
super().__init__()
# store dim
self.dim = dim
# store requires_grad
check_consistency(requires_grad, bool)
self._requires_grad = requires_grad
def forward(self, X):
"""
Forward pass of the OrthogonalBlock module using a Gram-Schmidt
algorithm.
:raises Warning: If the size along the orthogonalization dimension
is greater than the other dimensions.
:param torch.Tensor X: The input tensor to orthogonalize. The input must
be of dimensions :math:`[N, M]`.
:return: The orthonormal tensor.
:rtype: torch.Tensor
"""
# check dim is less than all the other dimensions
if X.shape[self.dim] > min(X.shape):
raise Warning(
"The dimension where to orthogonalize is greater"
" than the other dimensions"
)
result = torch.zeros_like(X, requires_grad=self._requires_grad)
X_0 = torch.select(X, self.dim, 0).clone()
result_0 = X_0 / torch.linalg.norm(X_0)
result = self._differentiable_copy(result, 0, result_0)
# iterate over the rest of the basis with Gram-Schmidt
for i in range(1, X.shape[self.dim]):
v = torch.select(X, self.dim, i).clone()
for j in range(i):
vj = torch.select(result, self.dim, j).clone()
v = v - torch.sum(v * vj, dim=self.dim, keepdim=True) * vj
result_i = v / torch.linalg.norm(v)
result = self._differentiable_copy(result, i, result_i)
return result
def _differentiable_copy(self, result, idx, value):
"""
Perform a differentiable copy operation on a tensor.
:param torch.Tensor result: The tensor where values will be copied to.
:param int idx: The index along the specified dimension where the
value will be copied.
:param torch.Tensor value: The tensor value to copy into the
result tensor.
:return: A new tensor with the copied values.
:rtype: torch.Tensor
"""
return result.index_copy(
self.dim, torch.tensor([idx]), value.unsqueeze(self.dim)
)
@property
def dim(self):
"""
Get the dimension along which operations are performed.
:return: The current dimension value.
:rtype: int
"""
return self._dim
@dim.setter
def dim(self, value):
"""
Set the dimension along which operations are performed.
:param value: The dimension to be set, which must be 0, 1, or -1.
:type value: int
:raises IndexError: If the provided dimension is not in the
range [-1, 1].
"""
# check consistency
check_consistency(value, int)
if value not in [0, 1, -1]:
raise IndexError(
"Dimension out of range (expected to be in "
f"range of [-1, 1], but got {value})"
)
# assign value
self._dim = value
@property
def requires_grad(self):
"""
Indicates whether gradient computation is required for operations
on the tensors.
:return: True if gradients are required, False otherwise.
:rtype: bool
"""
return self._requires_grad
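A quick orthonormality check (hypothetical values):

import torch

block = OrthogonalBlock(dim=-1)
X = torch.rand(10, 3)    # three columns to orthonormalize
Q = block(X)
print(torch.allclose(Q.T @ Q, torch.eye(3), atol=1e-5))  # True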

View File

@@ -1,195 +0,0 @@
"""Module for Base Continuous Convolution class."""
from abc import ABCMeta, abstractmethod
import torch
from .stride import Stride
from .utils_convolution import optimizing
import warnings
class PODBlock(torch.nn.Module):
"""
POD layer: it projects the input field on the proper orthogonal
decomposition basis. It needs to be fitted to the data before being used
with the method :meth:`fit`, which invokes the singular value decomposition.
The layer is not trainable.
.. note::
All the POD modes are stored in memory; this avoids recomputing them
when the rank changes, at the cost of a higher memory usage.
"""
def __init__(self, rank, scale_coefficients=True):
"""
Build the POD layer with the given rank.
:param int rank: The rank of the POD layer.
:param bool scale_coefficients: If True, the coefficients are scaled
after the projection to have zero mean and unit variance.
"""
super().__init__()
self.__scale_coefficients = scale_coefficients
self._basis = None
self._scaler = None
self._rank = rank
@property
def rank(self):
"""
The rank of the POD layer.
:rtype: int
"""
return self._rank
@rank.setter
def rank(self, value):
if value < 1 or not isinstance(value, int):
raise ValueError("The rank must be positive integer")
self._rank = value
@property
def basis(self):
"""
The POD basis. It is a matrix whose rows are the first ``rank`` POD modes.
:rtype: torch.Tensor
"""
if self._basis is None:
return None
return self._basis[: self.rank]
@property
def scaler(self):
"""
The scaler. It is a dictionary with the keys `'mean'` and `'std'` that
store the mean and the standard deviation of the coefficients.
:rtype: dict
"""
if self._scaler is None:
return
return {
"mean": self._scaler["mean"][: self.rank],
"std": self._scaler["std"][: self.rank],
}
@property
def scale_coefficients(self):
"""
If True, the coefficients are scaled after the projection to have zero
mean and unit variance.
:rtype: bool
"""
return self.__scale_coefficients
def fit(self, X, randomized=True):
"""
Set the POD basis by performing the singular value decomposition of the
given tensor. If ``self.scale_coefficients`` is True, the coefficients
are scaled after the projection to have zero mean and unit variance.
:param torch.Tensor X: The tensor to be reduced.
:param bool randomized: If True, compute the basis with the randomized
SVD (``torch.svd_lowrank``), defaults to True.
"""
self._fit_pod(X, randomized)
if self.__scale_coefficients:
self._fit_scaler(torch.matmul(self._basis, X.T))
def _fit_scaler(self, coeffs):
"""
Private method that computes the mean and the standard deviation of the
given coefficients, allowing to scale them to have zero mean and unit
variance. Mean and standard deviation are stored in the private member
`_scaler`.
:param torch.Tensor coeffs: The coefficients to be scaled.
"""
self._scaler = {
"std": torch.std(coeffs, dim=1),
"mean": torch.mean(coeffs, dim=1),
}
def _fit_pod(self, X, randomized):
"""
Private method that computes the POD basis of the given tensor and
stores it in the private member ``_basis``.
:param torch.Tensor X: The tensor to be reduced.
:param bool randomized: If True, use the randomized SVD.
"""
if X.device.type == "mps": # svd_lowrank not arailable for mps
warnings.warn(
"svd_lowrank not available for mps, using svd instead."
"This may slow down computations.",
ResourceWarning,
)
self._basis = torch.svd(X.T)[0].T
else:
if randomized:
warnings.warn(
"Considering a randomized algorithm to compute the POD basis"
)
self._basis = torch.svd_lowrank(X.T, q=X.shape[0])[0].T
else:
self._basis = torch.svd(X.T)[0].T
def forward(self, X):
"""
The forward pass of the POD layer. By default it executes the
:meth:`reduce` method, reducing the input tensor to its POD
representation. The POD layer needs to be fitted before being used.
:param torch.Tensor X: The input tensor to be reduced.
:return: The reduced tensor.
:rtype: torch.Tensor
"""
return self.reduce(X)
def reduce(self, X):
"""
Reduce the input tensor to its POD representation. The POD layer needs
to be fitted before being used.
:param torch.Tensor X: The input tensor to be reduced.
:return: The reduced tensor.
:rtype: torch.Tensor
"""
if self._basis is None:
raise RuntimeError(
"The POD layer needs to be fitted before being used."
)
coeff = torch.matmul(self.basis, X.T)
if coeff.ndim == 1:
coeff = coeff.unsqueeze(1)
coeff = coeff.T
if self.__scale_coefficients:
coeff = (coeff - self.scaler["mean"]) / self.scaler["std"]
return coeff
def expand(self, coeff):
"""
Expand the given coefficients to the original space. The POD layer needs
to be fitted before being used.
:param torch.Tensor coeff: The coefficients to be expanded.
:return: The expanded tensor.
:rtype: torch.Tensor
"""
if self._basis is None:
raise RuntimeError(
"The POD layer needs to be trained before being used."
)
if self.__scale_coefficients:
coeff = coeff * self.scaler["std"] + self.scaler["mean"]
predicted = torch.matmul(self.basis.T, coeff.T).T
if predicted.ndim == 1:
predicted = predicted.unsqueeze(0)
return predicted
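A minimal fit/reduce/expand round trip (hypothetical sizes;
``randomized=False`` selects the deterministic SVD):

import torch

pod = PODBlock(rank=3)
snapshots = torch.rand(50, 200)        # 50 snapshots of a 200-dof field
pod.fit(snapshots, randomized=False)   # build the POD basis via SVD
coeff = pod(snapshots)                 # reduced coefficients, [50, 3]
recon = pod.expand(coeff)              # back to snapshot space, [50, 200]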

View File

@@ -1,453 +0,0 @@
"""Module for Radial Basis Function Interpolation layer."""
import math
import warnings
from itertools import combinations_with_replacement
import torch
from ...utils import check_consistency
def linear(r):
"""
Linear radial basis function.
"""
return -r
def thin_plate_spline(r, eps=1e-7):
"""
Thin plate spline radial basis function.
"""
r = torch.clamp(r, min=eps)
return r**2 * torch.log(r)
def cubic(r):
"""
Cubic radial basis function.
"""
return r**3
def quintic(r):
"""
Quintic radial basis function.
"""
return -(r**5)
def multiquadric(r):
"""
Multiquadric radial basis function.
"""
return -torch.sqrt(r**2 + 1)
def inverse_multiquadric(r):
"""
Inverse multiquadric radial basis function.
"""
return 1 / torch.sqrt(r**2 + 1)
def inverse_quadratic(r):
"""
Inverse quadratic radial basis function.
"""
return 1 / (r**2 + 1)
def gaussian(r):
"""
Gaussian radial basis function.
"""
return torch.exp(-(r**2))
radial_functions = {
"linear": linear,
"thin_plate_spline": thin_plate_spline,
"cubic": cubic,
"quintic": quintic,
"multiquadric": multiquadric,
"inverse_multiquadric": inverse_multiquadric,
"inverse_quadratic": inverse_quadratic,
"gaussian": gaussian,
}
scale_invariant = {"linear", "thin_plate_spline", "cubic", "quintic"}
min_degree_funcs = {
"multiquadric": 0,
"linear": 0,
"thin_plate_spline": 1,
"cubic": 1,
"quintic": 2,
}
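# Illustrative check (not part of the original module): every kernel above maps
# a tensor of pairwise distances elementwise, so each can be evaluated directly.
import torch
r = torch.linspace(0.1, 2.0, 5)
for name, func in radial_functions.items():
    print(f"{name:>20}:", func(r))
# Kernels outside `scale_invariant` additionally require an `epsilon` shape
# parameter when used through RBFBlock.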
class RBFBlock(torch.nn.Module):
"""
Radial Basis Function (RBF) interpolation layer. It needs to be fitted to
the data with the method :meth:`fit` before it can be used to interpolate
new points. The layer is not trainable.
.. note::
It reproduces the implementation of ``scipy.interpolate.RBFInterpolator``
and is inspired by the implementation in `torchrbf
<https://github.com/ArmanMaesumi/torchrbf>`_.
"""
def __init__(
self,
neighbors=None,
smoothing=0.0,
kernel="thin_plate_spline",
epsilon=None,
degree=None,
):
"""
:param int neighbors: Number of neighbors to use for the
interpolation.
If ``None``, use all data points.
:param float smoothing: Smoothing parameter for the interpolation.
If 0.0, the interpolation is exact and no smoothing is applied.
:param str kernel: Radial basis function to use. Must be one of
``linear``, ``thin_plate_spline``, ``cubic``, ``quintic``,
``multiquadric``, ``inverse_multiquadric``, ``inverse_quadratic``,
or ``gaussian``.
:param float epsilon: Shape parameter that scales the input to
the RBF. This defaults to 1 for kernels in the ``scale_invariant``
set, and must be specified for all other kernels.
:param int degree: Degree of the added polynomial.
For some kernels, there exists a minimum degree of the polynomial
such that the RBF is well-posed. Those minimum degrees are specified
in the `min_degree_funcs` dictionary above. If `degree` is less than
the minimum degree, a warning is raised and the degree is set to the
minimum value.
"""
super().__init__()
check_consistency(neighbors, (int, type(None)))
check_consistency(smoothing, (int, float, torch.Tensor))
check_consistency(kernel, str)
check_consistency(epsilon, (float, type(None)))
check_consistency(degree, (int, type(None)))
self.neighbors = neighbors
self.smoothing = smoothing
self.kernel = kernel
self.epsilon = epsilon
self.degree = degree
self.powers = None
# initialize data points and values
self.y = None
self.d = None
# initialize attributes for the fitted model
self._shift = None
self._scale = None
self._coeffs = None
@property
def smoothing(self):
"""
Smoothing parameter for the interpolation.
:rtype: float
"""
return self._smoothing
@smoothing.setter
def smoothing(self, value):
self._smoothing = value
@property
def kernel(self):
"""
Radial basis function to use.
:rtype: str
"""
return self._kernel
@kernel.setter
def kernel(self, value):
value = value.lower()
if value not in radial_functions:
raise ValueError(f"Unknown kernel: {value}")
self._kernel = value
@property
def epsilon(self):
"""
Shape parameter that scales the input to the RBF.
:rtype: float
"""
return self._epsilon
@epsilon.setter
def epsilon(self, value):
if value is None:
if self.kernel in scale_invariant:
value = 1.0
else:
raise ValueError("Must specify `epsilon` for this kernel.")
else:
value = float(value)
self._epsilon = value
@property
def degree(self):
"""
Degree of the added polynomial.
:rtype: int
"""
return self._degree
@degree.setter
def degree(self, value):
min_degree = min_degree_funcs.get(self.kernel, -1)
if value is None:
value = max(min_degree, 0)
else:
value = int(value)
if value < -1:
raise ValueError("`degree` must be at least -1.")
if value < min_degree:
warnings.warn(
"`degree` is too small for this kernel. Setting to "
f"{min_degree}.",
UserWarning,
)
value = min_degree
self._degree = value
def _check_data(self, y, d):
"""
Validate the shapes of the data points and values, and broadcast a
scalar smoothing parameter to a per-point tensor.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
"""
if y.ndim != 2:
raise ValueError("y must be a 2-dimensional tensor.")
if d.shape[0] != y.shape[0]:
raise ValueError(
"The first dim of d must have the same length as "
"the first dim of y."
)
if isinstance(self.smoothing, (int, float)):
self.smoothing = (
torch.full((y.shape[0],), self.smoothing).float().to(y.device)
)
def fit(self, y, d):
"""
Fit the RBF interpolator to the data.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
"""
self._check_data(y, d)
self.y = y
self.d = d
if self.neighbors is None:
nobs = self.y.shape[0]
else:
raise NotImplementedError("neighbors currently not supported")
powers = RBFBlock.monomial_powers(self.y.shape[1], self.degree).to(
y.device
)
if powers.shape[0] > nobs:
raise ValueError(
"The data is not compatible with the requested degree."
)
if self.neighbors is None:
self._shift, self._scale, self._coeffs = RBFBlock.solve(
self.y,
self.d.reshape((self.y.shape[0], -1)),
self.smoothing,
self.kernel,
self.epsilon,
powers,
)
self.powers = powers
def forward(self, x):
"""
Returns the interpolated data at the given points `x`.
:param torch.Tensor x: `(n, d)` tensor of points at which
to query the interpolator
:rtype: `(n, m)` torch.Tensor of interpolated data.
"""
if x.ndim != 2:
raise ValueError("`x` must be a 2-dimensional tensor.")
nx, ndim = x.shape
if ndim != self.y.shape[1]:
raise ValueError(
"Expected the second dim of `x` to have length "
f"{self.y.shape[1]}."
)
kernel_func = radial_functions[self.kernel]
yeps = self.y * self.epsilon
xeps = x * self.epsilon
xhat = (x - self._shift) / self._scale
kv = RBFBlock.kernel_vector(xeps, yeps, kernel_func)
p = RBFBlock.polynomial_matrix(xhat, self.powers)
vec = torch.cat([kv, p], dim=1)
out = torch.matmul(vec, self._coeffs)
out = out.reshape((nx,) + self.d.shape[1:])
return out
@staticmethod
def kernel_vector(x, y, kernel_func):
"""
Evaluate radial functions with centers `y` for all points in `x`.
:param torch.Tensor x: `(n, d)` tensor of points.
:param torch.Tensor y: `(m, d)` tensor of centers.
:param callable kernel_func: Radial basis function to use.
:rtype: `(n, m)` torch.Tensor of radial function values.
"""
return kernel_func(torch.cdist(x, y))
@staticmethod
def polynomial_matrix(x, powers):
"""
Evaluate monomials at `x` with given `powers`.
:param torch.Tensor x: `(n, d)` tensor of points.
:param torch.Tensor powers: `(r, d)` tensor of powers for each monomial.
:rtype: `(n, r)` torch.Tensor of monomial values.
"""
x_ = torch.repeat_interleave(x, repeats=powers.shape[0], dim=0)
powers_ = powers.repeat(x.shape[0], 1)
return torch.prod(x_**powers_, dim=1).view(x.shape[0], powers.shape[0])
@staticmethod
def kernel_matrix(x, kernel_func):
"""
Returns radial function values for all pairs of points in `x`.
:param torch.Tensor x: `(n, d)` tensor of points.
:param callable kernel_func: Radial basis function to use.
:rtype: `(n, n)` torch.Tensor of radial function values.
"""
return kernel_func(torch.cdist(x, x))
@staticmethod
def monomial_powers(ndim, degree):
"""
Return the powers for each monomial in a polynomial.
:param int ndim: Number of variables in the polynomial.
:param int degree: Degree of the polynomial.
:rtype: `(nmonos, ndim)` torch.Tensor where each row contains the powers
for each variable in a monomial.
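:Example: (illustrative, computed for ``ndim=2``, ``degree=1``)
>>> RBFBlock.monomial_powers(2, 1)
tensor([[0, 0],
[1, 0],
[0, 1]], dtype=torch.int32)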
"""
nmonos = math.comb(degree + ndim, ndim)
out = torch.zeros((nmonos, ndim), dtype=torch.int32)
count = 0
for deg in range(degree + 1):
for mono in combinations_with_replacement(range(ndim), deg):
for var in mono:
out[count, var] += 1
count += 1
return out
@staticmethod
def build(y, d, smoothing, kernel, epsilon, powers):
"""
Build the RBF linear system.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
:param torch.Tensor smoothing: (n,) tensor of smoothing parameters.
:param str kernel: Radial basis function to use.
:param float epsilon: Shape parameter that scales the input to the RBF.
:param torch.Tensor powers: (r, d) tensor of powers for each monomial.
:rtype: (lhs, rhs, shift, scale) where `lhs` and `rhs` are the
left-hand side and right-hand side of the linear system, and
`shift` and `scale` are the shift and scale parameters.
"""
p = d.shape[0]
s = d.shape[1]
r = powers.shape[0]
kernel_func = radial_functions[kernel]
mins = torch.min(y, dim=0).values
maxs = torch.max(y, dim=0).values
shift = (maxs + mins) / 2
scale = (maxs - mins) / 2
scale[scale == 0.0] = 1.0
yeps = y * epsilon
yhat = (y - shift) / scale
lhs = torch.empty((p + r, p + r), device=d.device).float()
lhs[:p, :p] = RBFBlock.kernel_matrix(yeps, kernel_func)
lhs[:p, p:] = RBFBlock.polynomial_matrix(yhat, powers)
lhs[p:, :p] = lhs[:p, p:].T
lhs[p:, p:] = 0.0
lhs[:p, :p] += torch.diag(smoothing)
rhs = torch.empty((r + p, s), device=d.device).float()
rhs[:p] = d
rhs[p:] = 0.0
return lhs, rhs, shift, scale
@staticmethod
def solve(y, d, smoothing, kernel, epsilon, powers):
"""
Build then solve the RBF linear system.
:param torch.Tensor y: (n, d) tensor of data points.
:param torch.Tensor d: (n, m) tensor of data values.
:param torch.Tensor smoothing: (n,) tensor of smoothing parameters.
:param str kernel: Radial basis function to use.
:param float epsilon: Shape parameter that scales the input to the RBF.
:param torch.Tensor powers: (r, d) tensor of powers for each monomial.
:raises ValueError: If the linear system is singular.
:rtype: (shift, scale, coeffs) where `shift` and `scale` are the
shift and scale parameters, and `coeffs` are the coefficients
of the interpolator.
"""
lhs, rhs, shift, scale = RBFBlock.build(
y, d, smoothing, kernel, epsilon, powers
)
try:
coeffs = torch.linalg.solve(lhs, rhs)
except RuntimeError as e:
msg = "Singular matrix."
nmonos = powers.shape[0]
if nmonos > 0:
pmat = RBFBlock.polynomial_matrix((y - shift) / scale, powers)
rank = torch.linalg.matrix_rank(pmat)
if rank < nmonos:
msg = (
"Singular matrix. The matrix of monomials evaluated at "
"the data point coordinates does not have full column "
f"rank ({rank}/{nmonos})."
)
raise ValueError(msg) from e
return shift, scale, coeffs
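# Usage sketch (illustrative, not part of the original module): interpolate
# scattered 2D data and query new points; shapes follow the docstrings above,
# the data values are made up.
import torch
rbf = RBFBlock(kernel="thin_plate_spline")
y = torch.rand(50, 2)                        # (n, d) data points
d = torch.sin(y.sum(dim=1, keepdim=True))    # (n, m) data values
rbf.fit(y, d)
x = torch.rand(10, 2)                        # query points
values = rbf(x)                              # (10, 1) interpolated values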

View File

@@ -1,164 +0,0 @@
import torch
import torch.nn as nn
from ...utils import check_consistency
class ResidualBlock(nn.Module):
"""Residual block base class. Implementation of a residual block.
.. seealso::
**Original reference**: He, Kaiming, et al.
*Deep residual learning for image recognition.*
Proceedings of the IEEE conference on computer vision
and pattern recognition, 2016.
DOI: `<https://arxiv.org/pdf/1512.03385.pdf>`_.
"""
def __init__(
self,
input_dim,
output_dim,
hidden_dim,
spectral_norm=False,
activation=torch.nn.ReLU(),
):
"""
Initializes the ResidualBlock module.
:param int input_dim: Dimension of the input to pass to the
feedforward linear layer.
:param int output_dim: Dimension of the output from the
residual layer.
:param int hidden_dim: Hidden dimension for mapping the input
(first block).
:param bool spectral_norm: Apply spectral normalization to feedforward
layers, defaults to False.
:param torch.nn.Module activation: Activation function applied after
the first block.
"""
super().__init__()
# check consistency
check_consistency(spectral_norm, bool)
check_consistency(input_dim, int)
check_consistency(output_dim, int)
check_consistency(hidden_dim, int)
check_consistency(activation, torch.nn.Module)
# assign variables
self._spectral_norm = spectral_norm
self._input_dim = input_dim
self._output_dim = output_dim
self._hidden_dim = hidden_dim
self._activation = activation
# create layers
self._l1 = self._spect_norm(nn.Linear(input_dim, hidden_dim))
self._l2 = self._spect_norm(nn.Linear(hidden_dim, output_dim))
self._l3 = self._spect_norm(nn.Linear(input_dim, output_dim))
def forward(self, x):
"""Forward pass for residual block layer.
:param torch.Tensor x: Input tensor for the residual layer.
:return: Output tensor for the residual layer.
:rtype: torch.Tensor
"""
y = self._activation(self._l1(x))
y = self._l2(y)
x = self._l3(x)
return y + x
def _spect_norm(self, x):
"""Apply spectral normalization to a layer, if enabled.
:param x: A torch.nn.Module linear layer.
:type x: torch.nn.Module
:return: The layer wrapped with spectral normalization if
``spectral_norm`` is True, otherwise the layer unchanged.
:rtype: torch.nn.Module
"""
return nn.utils.spectral_norm(x) if self._spectral_norm else x
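# Usage sketch (illustrative, not part of the original module): a residual
# block mapping 8 input features to 4 outputs through a 16-unit hidden layer.
import torch
block = ResidualBlock(input_dim=8, output_dim=4, hidden_dim=16)
out = block(torch.randn(32, 8))    # (32, 4): activation(l1), then l2, plus skip l3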
class EnhancedLinear(torch.nn.Module):
"""
A wrapper class for enhancing a linear layer with activation and/or dropout.
:param layer: The linear layer to be enhanced.
:type layer: torch.nn.Module
:param activation: The activation function to be applied after the linear layer.
:type activation: torch.nn.Module
:param dropout: The dropout probability to be applied after the activation (if provided).
:type dropout: float
:Example:
>>> linear_layer = torch.nn.Linear(10, 20)
>>> activation = torch.nn.ReLU()
>>> dropout_prob = 0.5
>>> enhanced_linear = EnhancedLinear(linear_layer, activation, dropout_prob)
"""
def __init__(self, layer, activation=None, dropout=None):
"""
Initializes the EnhancedLinear module.
:param layer: The linear layer to be enhanced.
:type layer: torch.nn.Module
:param activation: The activation function to be applied after the linear layer.
:type activation: torch.nn.Module
:param dropout: The dropout probability to be applied after the activation (if provided).
:type dropout: float
"""
super().__init__()
# check consistency
check_consistency(layer, nn.Module)
if activation is not None:
check_consistency(activation, nn.Module)
if dropout is not None:
check_consistency(dropout, float)
# assign forward
if (dropout is None) and (activation is None):
self._model = torch.nn.Sequential(layer)
elif (dropout is None) and (activation is not None):
self._model = torch.nn.Sequential(layer, activation)
elif (dropout is not None) and (activation is None):
self._model = torch.nn.Sequential(layer, self._drop(dropout))
elif (dropout is not None) and (activation is not None):
self._model = torch.nn.Sequential(
layer, activation, self._drop(dropout)
)
def forward(self, x):
"""
Forward pass through the enhanced linear module.
:param x: Input tensor.
:type x: torch.Tensor
:return: Output tensor after passing through the enhanced linear module.
:rtype: torch.Tensor
"""
return self._model(x)
def _drop(self, p):
"""
Applies dropout with probability p.
:param p: Dropout probability.
:type p: float
:return: Dropout layer with the specified probability.
:rtype: torch.nn.Dropout
"""
return torch.nn.Dropout(p)
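# Usage sketch (illustrative, not part of the original module): the wrapped
# layer is called like any torch module.
import torch
enhanced = EnhancedLinear(torch.nn.Linear(10, 20), torch.nn.ReLU(), 0.5)
out = enhanced(torch.randn(4, 10))    # (4, 20), after linear, ReLU and dropout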

View File

@@ -1,407 +0,0 @@
import torch
import torch.nn as nn
from ...utils import check_consistency
import warnings
######## 1D Spectral Convolution ###########
class SpectralConvBlock1D(nn.Module):
"""
PINA implementation of Spectral Convolution Block for one
dimensional tensors.
"""
def __init__(self, input_numb_fields, output_numb_fields, n_modes):
"""
The module computes the spectral convolution of the input with a linear
kernel in the Fourier space, and then maps the result back to the
physical space.
The block expects an input of size ``[batch, input_numb_fields, N]``
and returns an output of size ``[batch, output_numb_fields, N]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param int n_modes: Number of modes to select; it must be at most
``floor(N/2)+1``.
"""
super().__init__()
# check type consistency
check_consistency(input_numb_fields, int)
check_consistency(output_numb_fields, int)
# assign variables
self._modes = n_modes
self._input_channels = input_numb_fields
self._output_channels = output_numb_fields
# scaling factor
scale = 1.0 / (self._input_channels * self._output_channels)
self._weights = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes,
dtype=torch.cfloat,
)
)
def _compute_mult1d(self, input, weights):
"""
Compute the matrix multiplication of the input
with the linear kernel weights.
:param input: The input tensor, expected to be of size
``[batch, input_numb_fields, x]``.
:type input: torch.Tensor
:param weights: The kernel weights, expected to be of
size ``[input_numb_fields, output_numb_fields, x]``.
:type weights: torch.Tensor
:return: The matrix multiplication of the input
with the linear kernel weights.
:rtype: torch.Tensor
"""
return torch.einsum("bix,iox->box", input, weights)
def forward(self, x):
"""
Forward computation for Spectral Convolution.
:param x: The input tensor, expected to be of size
``[batch, input_numb_fields, x]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
spectral convolution of size ``[batch, output_numb_fields, x]``.
:rtype: torch.Tensor
"""
batch_size = x.shape[0]
# Compute Fourier transform of the input
x_ft = torch.fft.rfft(x)
# Multiply relevant Fourier modes
out_ft = torch.zeros(
batch_size,
self._output_channels,
x.size(-1) // 2 + 1,
device=x.device,
dtype=torch.cfloat,
)
out_ft[:, :, : self._modes] = self._compute_mult1d(
x_ft[:, :, : self._modes], self._weights
)
# Return to physical space
return torch.fft.irfft(out_ft, n=x.size(-1))
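# Usage sketch (illustrative, not part of the original module): spectral
# convolution on a 64-point 1D mesh; n_modes is bounded by floor(64/2)+1 = 33.
import torch
conv1d = SpectralConvBlock1D(input_numb_fields=3, output_numb_fields=5, n_modes=16)
out = conv1d(torch.randn(8, 3, 64))    # (8, 5, 64)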
######## 2D Spectral Convolution ###########
class SpectralConvBlock2D(nn.Module):
"""
PINA implementation of spectral convolution block for two
dimensional tensors.
"""
def __init__(self, input_numb_fields, output_numb_fields, n_modes):
"""
The module computes the spectral convolution of the input with a linear
kernel in the Fourier space, and then maps the result back to the
physical space.
The block expects an input of size ``[batch, input_numb_fields, Nx, Ny]``
and returns an output of size ``[batch, output_numb_fields, Nx, Ny]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param list | tuple n_modes: Number of modes to select for each dimension,
at most ``floor(Nx/2)+1`` and ``floor(Ny/2)+1`` respectively.
"""
super().__init__()
# check type consistency
check_consistency(input_numb_fields, int)
check_consistency(output_numb_fields, int)
check_consistency(n_modes, int)
if isinstance(n_modes, (tuple, list)):
if len(n_modes) != 2:
raise ValueError(
"Expected n_modes to be a list or tuple of len two, "
"with each entry corresponding to the number of modes "
"for each dimension "
)
elif isinstance(n_modes, int):
n_modes = [n_modes] * 2
else:
raise ValueError(
"Expected n_modes to be a list or tuple of len two, "
"with each entry corresponding to the number of modes "
"for each dimension; or an int value representing the "
"number of modes for all dimensions"
)
# assign variables
self._modes = n_modes
self._input_channels = input_numb_fields
self._output_channels = output_numb_fields
# scaling factor
scale = 1.0 / (self._input_channels * self._output_channels)
self._weights1 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
dtype=torch.cfloat,
)
)
self._weights2 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
dtype=torch.cfloat,
)
)
def _compute_mult2d(self, input, weights):
"""
Compute the matrix multiplication of the input
with the linear kernel weights.
:param input: The input tensor, expected to be of size
``[batch, input_numb_fields, x, y]``.
:type input: torch.Tensor
:param weights: The kernel weights, expected to be of
size ``[input_numb_fields, output_numb_fields, x, y]``.
:type weights: torch.Tensor
:return: The matrix multiplication of the input
with the linear kernel weights.
:rtype: torch.Tensor
"""
return torch.einsum("bixy,ioxy->boxy", input, weights)
def forward(self, x):
"""
Forward computation for Spectral Convolution.
:param x: The input tensor, expected to be of size
``[batch, input_numb_fields, x, y]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
spectral convolution of size ``[batch, output_numb_fields, x, y]``.
:rtype: torch.Tensor
"""
batch_size = x.shape[0]
# Compute Fourier transform of the input
x_ft = torch.fft.rfft2(x)
# Multiply relevant Fourier modes
out_ft = torch.zeros(
batch_size,
self._output_channels,
x.size(-2),
x.size(-1) // 2 + 1,
device=x.device,
dtype=torch.cfloat,
)
out_ft[:, :, : self._modes[0], : self._modes[1]] = self._compute_mult2d(
x_ft[:, :, : self._modes[0], : self._modes[1]], self._weights1
)
out_ft[:, :, -self._modes[0] :, : self._modes[1]] = (
self._compute_mult2d(
x_ft[:, :, -self._modes[0] :, : self._modes[1]], self._weights2
)
)
# Return to physical space
return torch.fft.irfft2(out_ft, s=(x.size(-2), x.size(-1)))
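# Usage sketch (illustrative, not part of the original module): per-dimension
# modes are passed as a tuple; each entry is bounded by floor(N/2)+1.
import torch
conv2d = SpectralConvBlock2D(input_numb_fields=1, output_numb_fields=4, n_modes=(12, 8))
out = conv2d(torch.randn(2, 1, 32, 32))    # (2, 4, 32, 32)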
######## 3D Spectral Convolution ###########
class SpectralConvBlock3D(nn.Module):
"""
PINA implementation of spectral convolution block for three
dimensional tensors.
"""
def __init__(self, input_numb_fields, output_numb_fields, n_modes):
"""
The module computes the spectral convolution of the input with a linear
kernel in the Fourier space, and then maps the result back to the
physical space.
The block expects an input of size ``[batch, input_numb_fields, Nx, Ny, Nz]``
and returns an output of size ``[batch, output_numb_fields, Nx, Ny, Nz]``.
:param int input_numb_fields: The number of channels for the input.
:param int output_numb_fields: The number of channels for the output.
:param list | tuple n_modes: Number of modes to select for each dimension,
at most ``floor(Nx/2)+1``, ``floor(Ny/2)+1`` and ``floor(Nz/2)+1``
respectively.
"""
super().__init__()
# check type consistency
check_consistency(input_numb_fields, int)
check_consistency(output_numb_fields, int)
check_consistency(n_modes, int)
if isinstance(n_modes, (tuple, list)):
if len(n_modes) != 3:
raise ValueError(
"Expected n_modes to be a list or tuple of len three, "
"with each entry corresponding to the number of modes "
"for each dimension "
)
elif isinstance(n_modes, int):
n_modes = [n_modes] * 3
else:
raise ValueError(
"Expected n_modes to be a list or tuple of len three, "
"with each entry corresponding to the number of modes "
"for each dimension; or an int value representing the "
"number of modes for all dimensions"
)
# assign variables
self._modes = n_modes
self._input_channels = input_numb_fields
self._output_channels = output_numb_fields
# scaling factor
scale = 1.0 / (self._input_channels * self._output_channels)
self._weights1 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
self._modes[2],
dtype=torch.cfloat,
)
)
self._weights2 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
self._modes[2],
dtype=torch.cfloat,
)
)
self._weights3 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
self._modes[2],
dtype=torch.cfloat,
)
)
self._weights4 = nn.Parameter(
scale
* torch.rand(
self._input_channels,
self._output_channels,
self._modes[0],
self._modes[1],
self._modes[2],
dtype=torch.cfloat,
)
)
def _compute_mult3d(self, input, weights):
"""
Compute the matrix multiplication of the input
with the linear kernel weights.
:param input: The input tensor, expected to be of size
``[batch, input_numb_fields, x, y, z]``.
:type input: torch.Tensor
:param weights: The kernel weights, expected to be of
size ``[input_numb_fields, output_numb_fields, x, y, z]``.
:type weights: torch.Tensor
:return: The matrix multiplication of the input
with the linear kernel weights.
:rtype: torch.Tensor
"""
return torch.einsum("bixyz,ioxyz->boxyz", input, weights)
def forward(self, x):
"""
Forward computation for Spectral Convolution.
:param x: The input tensor, expected to be of size
``[batch, input_numb_fields, x, y, z]``.
:type x: torch.Tensor
:return: The output tensor obtained from the
spectral convolution of size ``[batch, output_numb_fields, x, y, z]``.
:rtype: torch.Tensor
"""
batch_size = x.shape[0]
# Compute Fourier transform of the input
x_ft = torch.fft.rfftn(x, dim=[-3, -2, -1])
# Multiply relevant Fourier modes
out_ft = torch.zeros(
batch_size,
self._output_channels,
x.size(-3),
x.size(-2),
x.size(-1) // 2 + 1,
device=x.device,
dtype=torch.cfloat,
)
slice0 = (
slice(None),
slice(None),
slice(self._modes[0]),
slice(self._modes[1]),
slice(self._modes[2]),
)
out_ft[slice0] = self._compute_mult3d(x_ft[slice0], self._weights1)
slice1 = (
slice(None),
slice(None),
slice(self._modes[0]),
slice(-self._modes[1], None),
slice(self._modes[2]),
)
out_ft[slice1] = self._compute_mult3d(x_ft[slice1], self._weights2)
slice2 = (
slice(None),
slice(None),
slice(-self._modes[0], None),
slice(self._modes[1]),
slice(self._modes[2]),
)
out_ft[slice2] = self._compute_mult3d(x_ft[slice2], self._weights3)
slice3 = (
slice(None),
slice(None),
slice(-self._modes[0], None),
slice(-self._modes[1], None),
slice(self._modes[2]),
)
out_ft[slice3] = self._compute_mult3d(x_ft[slice3], self._weights4)
# Return to physical space
return torch.fft.irfftn(out_ft, s=(x.size(-3), x.size(-2), x.size(-1)))
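# Usage sketch (illustrative, not part of the original module): the same
# pattern in three dimensions, with a single int broadcast to all axes.
import torch
conv3d = SpectralConvBlock3D(input_numb_fields=2, output_numb_fields=2, n_modes=4)
out = conv3d(torch.randn(2, 2, 16, 16, 16))    # (2, 2, 16, 16, 16)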

View File

@@ -1,85 +0,0 @@
import torch
class Stride(object):
def __init__(self, dict):
"""Stride class for continous convolution
:param param: type of continuous convolution
:type param: string
"""
self._dict_stride = dict
self._stride_continuous = None
self._stride_discrete = self._create_stride_discrete(dict)
def _create_stride_discrete(self, my_dict):
"""Creating the list for applying the filter
:param my_dict: Dictionary with the following arguments:
domain size, starting position of the filter, jump size
for the filter and direction of the filter
:type my_dict: dict
:raises IndexError: Values in the dict must all have the same length
:raises ValueError: Domain values must be non-negative
:raises ValueError: Direction must be either equal to 1, -1 or 0
:raises IndexError: Direction and jumps must have zero in the same
index
:return: Tensor of positions for the filter
:rtype: torch.Tensor
:Example:
>>> stride = {"domain": [4, 4],
"start": [-4, 2],
"jump": [2, 2],
"direction": [1, 1],
}
>>> _create_stride_discrete(stride)
[[-4.0, 2.0], [-4.0, 4.0], [-2.0, 2.0], [-2.0, 4.0]]
"""
# we must check boundaries of the input as well
domain, start, jumps, direction = my_dict.values()
# checking
if not all(len(s) == len(domain) for s in my_dict.values()):
raise IndexError("values in the dict must all have the same length")
if not all(v >= 0 for v in domain):
raise ValueError("domain values must be non-negative")
if not all(v == 1 or v == -1 or v == 0 for v in direction):
raise ValueError("direction must be either equal to 1, -1 or 0")
seq_jumps = [i for i, e in enumerate(jumps) if e == 0]
seq_direction = [i for i, e in enumerate(direction) if e == 0]
if seq_direction != seq_jumps:
raise IndexError(
"direction and jumps must have zero in the same index"
)
if seq_jumps:
for i in seq_jumps:
jumps[i] = domain[i]
direction[i] = 1
# creating the stride grid
values_mesh = [
torch.arange(0, i, step).float() for i, step in zip(domain, jumps)
]
values_mesh = [
single * dim for single, dim in zip(values_mesh, direction)
]
mesh = torch.meshgrid(values_mesh, indexing="ij")
coordinates_mesh = [x.reshape(-1, 1) for x in mesh]
stride = torch.cat(coordinates_mesh, dim=1) + torch.tensor(start)
return stride
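# Usage sketch (illustrative, not part of the original module): reproduces the
# docstring example; the grid of filter positions ends up in `_stride_discrete`.
stride = Stride(
    {"domain": [4, 4], "start": [-4, 2], "jump": [2, 2], "direction": [1, 1]}
)
print(stride._stride_discrete)
# tensor([[-4., 2.], [-4., 4.], [-2., 2.], [-2., 4.]])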

View File

@@ -1,49 +0,0 @@
import torch
def check_point(x, current_stride, dim):
"""Return a boolean mask of the points lying inside the current stride
window; the last component of each point is excluded from the check.
:param torch.Tensor x: Input points.
:param torch.Tensor current_stride: Lower corner of the filter window.
:param torch.Tensor dim: Size of the filter window.
:return: Boolean mask of the points inside the window.
:rtype: torch.Tensor
"""
max_stride = current_stride + dim
indices = torch.logical_and(
x[..., :-1] < max_stride, x[..., :-1] >= current_stride
).all(dim=-1)
return indices
def map_points_(x, filter_position):
"""Mapping function n dimensional case
:param x: input data of two dimension
:type x: torch.tensor
:param filter_position: position of the filter
:type dim: list[numeric]
:return: data mapped inplace
:rtype: torch.tensor
"""
x.add_(-filter_position)
return x
def optimizing(f):
"""Decorator for calling a function just once
:param f: python function
:type f: function
"""
def wrapper(*args, **kwargs):
# run each direction at most once, tracked by a per-direction flag
if kwargs["type"] == "forward":
if not wrapper.has_run:
wrapper.has_run = True
return f(*args, **kwargs)
if kwargs["type"] == "inverse":
if not wrapper.has_run_inverse:
wrapper.has_run_inverse = True
return f(*args, **kwargs)
wrapper.has_run_inverse = False
wrapper.has_run = False
return wrapper
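# Usage sketch (illustrative, not part of the original module): the decorated
# function runs at most once per direction; repeated calls return None.
@optimizing
def setup(**kwargs):
    return f"ran {kwargs['type']}"
print(setup(type="forward"))   # 'ran forward'
print(setup(type="forward"))   # None: forward already ran
print(setup(type="inverse"))   # 'ran inverse'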