Solvers for multiple models (#133)

* Solvers for multiple models
- Implementing support for solvers built on multiple models (e.g., GANs)
- Implementing the GAROM solver, see https://arxiv.org/abs/2305.15881 (a minimal usage sketch follows the commit metadata below)
- Implementing tests for the GAROM solver (CPU only)
- Fixing the PINN docs
- Creating a solvers directory, for consistency in the package


---------

Co-authored-by: Dario Coscia <dariocoscia@dhcp-040.eduroam.sissa.it>
Authored by Dario Coscia on 2023-06-28 14:44:49 +02:00, committed by Nicola Demo
parent 6c8635c316
commit 701046661f
9 changed files with 612 additions and 81 deletions
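
The two new test files below exercise the new API end-to-end. As a quick orientation, here is a minimal usage sketch assembled only from calls that appear in this diff (GAROM, Trainer, solver.sample); the toy TinySnapshots problem and the two small networks are illustrative stand-ins, not code from this PR.

import torch
from pina import Condition, LabelTensor
from pina.problem import AbstractProblem
from pina.solvers import GAROM
from pina.trainer import Trainer


class TinySnapshots(AbstractProblem):
    # toy data-driven problem: 16 snapshots of dimension 10, driven by a 2D parameter
    output_variables = [f'u_{i}' for i in range(10)]
    params = LabelTensor(torch.rand(16, 2), labels=['mu1', 'mu2'])
    snapshots = LabelTensor(torch.rand(16, 10), labels=output_variables)
    conditions = {'data': Condition(input_points=params, output_points=snapshots)}


class SimpleGenerator(torch.nn.Module):
    # maps a parameter vector plus random noise to a snapshot
    def __init__(self, out_dim, param_dim, noise_dim=8):
        super().__init__()
        self.noise_dim = noise_dim
        self.net = torch.nn.Sequential(
            torch.nn.Linear(param_dim + noise_dim, 64),
            torch.nn.SiLU(),
            torch.nn.Linear(64, out_dim))

    def forward(self, param):
        z = torch.randn(param.shape[0], self.noise_dim,
                        device=param.device, dtype=param.dtype)
        return self.net(torch.cat((z, param), dim=-1))


class SimpleDiscriminator(torch.nn.Module):
    # takes a (snapshot, parameter) pair, mirroring the interface used in the tests
    def __init__(self, out_dim, param_dim):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(out_dim + param_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, out_dim))

    def forward(self, data):
        x, condition = data
        return self.net(torch.cat((x, condition), dim=-1))


problem = TinySnapshots()
solver = GAROM(problem=problem,
               generator=SimpleGenerator(out_dim=10, param_dim=2),
               discriminator=SimpleDiscriminator(out_dim=10, param_dim=2))
trainer = Trainer(solver=solver, kwargs={'max_epochs': 4, 'accelerator': 'cpu'})
trainer.train()
samples = solver.sample(problem.params)  # one generated snapshot per parameter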


@@ -0,0 +1,162 @@
import torch
from pina.problem import AbstractProblem
from pina import Condition, LabelTensor
from pina.solvers import GAROM
from pina.trainer import Trainer
import torch.nn as nn
import matplotlib.tri as tri


def func(x, mu1, mu2):
    import torch
    x_m1 = (x[:, 0] - mu1).pow(2)
    x_m2 = (x[:, 1] - mu2).pow(2)
    norm = x[:, 0]**2 + x[:, 1]**2
    return torch.exp(-(x_m1 + x_m2))


class ParametricGaussian(AbstractProblem):
    output_variables = [f'u_{i}' for i in range(900)]

    # params
    xx = torch.linspace(-1, 1, 20)
    yy = xx
    params = LabelTensor(torch.cartesian_prod(xx, yy), labels=['mu1', 'mu2'])

    # define domain
    x = torch.linspace(-1, 1, 30)
    domain = torch.cartesian_prod(x, x)
    triang = tri.Triangulation(domain[:, 0], domain[:, 1])

    sol = []
    for p in params:
        sol.append(func(domain, p[0], p[1]))
    snapshots = LabelTensor(torch.stack(sol), labels=output_variables)

    # define conditions
    conditions = {
        'data': Condition(
            input_points=params,
            output_points=snapshots)
    }


# simple Generator Network
class Generator(nn.Module):
    def __init__(self, input_dimension, parameters_dimension,
                 noise_dimension, activation=torch.nn.SiLU):
        super().__init__()

        self._noise_dimension = noise_dimension
        self._activation = activation

        self.model = torch.nn.Sequential(
            torch.nn.Linear(6 * self._noise_dimension, input_dimension // 6),
            self._activation(),
            torch.nn.Linear(input_dimension // 6, input_dimension // 3),
            self._activation(),
            torch.nn.Linear(input_dimension // 3, input_dimension)
        )

        self.condition = torch.nn.Sequential(
            torch.nn.Linear(parameters_dimension, 2 * self._noise_dimension),
            self._activation(),
            torch.nn.Linear(2 * self._noise_dimension, 5 * self._noise_dimension)
        )

    def forward(self, param):
        # uniform sampling in [-1, 1]
        z = torch.rand(size=(param.shape[0], self._noise_dimension),
                       device=param.device,
                       dtype=param.dtype,
                       requires_grad=True)
        z = 2. * z - 1.

        # conditioning by concatenation of mapped parameters
        input_ = torch.cat((z, self.condition(param)), dim=-1)
        out = self.model(input_)
        return out


# Simple Discriminator Network
class Discriminator(nn.Module):
    def __init__(self, input_dimension, parameter_dimension,
                 hidden_dimension, activation=torch.nn.ReLU):
        super().__init__()

        self._activation = activation

        self.encoding = torch.nn.Sequential(
            torch.nn.Linear(input_dimension, input_dimension // 3),
            self._activation(),
            torch.nn.Linear(input_dimension // 3, input_dimension // 6),
            self._activation(),
            torch.nn.Linear(input_dimension // 6, hidden_dimension)
        )

        self.decoding = torch.nn.Sequential(
            torch.nn.Linear(2 * hidden_dimension, input_dimension // 6),
            self._activation(),
            torch.nn.Linear(input_dimension // 6, input_dimension // 3),
            self._activation(),
            torch.nn.Linear(input_dimension // 3, input_dimension),
        )

        self.condition = torch.nn.Sequential(
            torch.nn.Linear(parameter_dimension, hidden_dimension // 2),
            self._activation(),
            torch.nn.Linear(hidden_dimension // 2, hidden_dimension)
        )

    def forward(self, data):
        x, condition = data
        encoding = self.encoding(x)
        conditioning = torch.cat((encoding, self.condition(condition)), dim=-1)
        decoding = self.decoding(conditioning)
        return decoding


problem = ParametricGaussian()


def test_constructor():
    GAROM(problem=problem,
          generator=Generator(input_dimension=900,
                              parameters_dimension=2,
                              noise_dimension=12),
          discriminator=Discriminator(input_dimension=900,
                                      parameter_dimension=2,
                                      hidden_dimension=64))


def test_train_cpu():
    solver = GAROM(problem=problem,
                   generator=Generator(input_dimension=900,
                                       parameters_dimension=2,
                                       noise_dimension=12),
                   discriminator=Discriminator(input_dimension=900,
                                               parameter_dimension=2,
                                               hidden_dimension=64))
    trainer = Trainer(solver=solver, kwargs={'max_epochs': 4, 'accelerator': 'cpu'})
    trainer.train()


def test_sample():
    solver = GAROM(problem=problem,
                   generator=Generator(input_dimension=900,
                                       parameters_dimension=2,
                                       noise_dimension=12),
                   discriminator=Discriminator(input_dimension=900,
                                               parameter_dimension=2,
                                               hidden_dimension=64))
    solver.sample(problem.params)
    assert solver.sample(problem.params).shape == problem.snapshots.shape


def test_forward():
    solver = GAROM(problem=problem,
                   generator=Generator(input_dimension=900,
                                       parameters_dimension=2,
                                       noise_dimension=12),
                   discriminator=Discriminator(input_dimension=900,
                                               parameter_dimension=2,
                                               hidden_dimension=64))
    solver(problem.params, mc_steps=100, variance=True)
    assert solver(problem.params).shape == problem.snapshots.shape


@@ -0,0 +1,215 @@
import torch
import pytest
from pina.problem import SpatialProblem
from pina.operators import nabla
from pina.geometry import CartesianDomain
from pina import Condition, LabelTensor, PINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.plotter import Plotter
from pina.loss import LpLoss


def laplace_equation(input_, output_):
    force_term = (torch.sin(input_.extract(['x'])*torch.pi) *
                  torch.sin(input_.extract(['y'])*torch.pi))
    nabla_u = nabla(output_.extract(['u']), input_)
    return nabla_u - force_term


my_laplace = Equation(laplace_equation)
in_ = LabelTensor(torch.tensor([[0., 1.]], requires_grad=True), ['x', 'y'])
out_ = LabelTensor(torch.tensor([[0.]], requires_grad=True), ['u'])


class Poisson(SpatialProblem):
    output_variables = ['u']
    spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})

    conditions = {
        'gamma1': Condition(
            location=CartesianDomain({'x': [0, 1], 'y': 1}),
            equation=FixedValue(0.0)),
        'gamma2': Condition(
            location=CartesianDomain({'x': [0, 1], 'y': 0}),
            equation=FixedValue(0.0)),
        'gamma3': Condition(
            location=CartesianDomain({'x': 1, 'y': [0, 1]}),
            equation=FixedValue(0.0)),
        'gamma4': Condition(
            location=CartesianDomain({'x': 0, 'y': [0, 1]}),
            equation=FixedValue(0.0)),
        'D': Condition(
            location=CartesianDomain({'x': [0, 1], 'y': [0, 1]}),
            equation=my_laplace),
        'data': Condition(
            input_points=in_,
            output_points=out_)
    }

    def poisson_sol(self, pts):
        return -(
            torch.sin(pts.extract(['x'])*torch.pi) *
            torch.sin(pts.extract(['y'])*torch.pi)
        )/(2*torch.pi**2)

    truth_solution = poisson_sol


class myFeature(torch.nn.Module):
    """
    Feature: sin(pi*x) * sin(pi*y)
    """

    def __init__(self):
        super(myFeature, self).__init__()

    def forward(self, x):
        t = (torch.sin(x.extract(['x'])*torch.pi) *
             torch.sin(x.extract(['y'])*torch.pi))
        return LabelTensor(t, ['sin(x)sin(y)'])


# make the problem
poisson_problem = Poisson()
model = FeedForward(len(poisson_problem.input_variables),
                    len(poisson_problem.output_variables))
model_extra_feats = FeedForward(len(poisson_problem.input_variables) + 1,
                                len(poisson_problem.output_variables))
extra_feats = [myFeature()]


def test_constructor():
    PINN(problem=poisson_problem, model=model, extra_features=None)


def test_constructor_extra_feats():
    model_extra_feats = FeedForward(len(poisson_problem.input_variables) + 1,
                                    len(poisson_problem.output_variables))
    PINN(problem=poisson_problem, model=model_extra_feats, extra_features=extra_feats)


def test_train_cpu():
    poisson_problem = Poisson()
    boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
    n = 10
    poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
    poisson_problem.discretise_domain(n, 'grid', locations=['D'])
    pinn = PINN(problem=poisson_problem, model=model, extra_features=None, loss=LpLoss())
    trainer = Trainer(solver=pinn, kwargs={'max_epochs': 5, 'accelerator': 'cpu'})
    trainer.train()


def test_train_extra_feats_cpu():
    poisson_problem = Poisson()
    boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
    n = 10
    poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
    poisson_problem.discretise_domain(n, 'grid', locations=['D'])
    pinn = PINN(problem=poisson_problem, model=model_extra_feats, extra_features=extra_feats)
    trainer = Trainer(solver=pinn, kwargs={'max_epochs': 5, 'accelerator': 'cpu'})
    trainer.train()
"""
def test_train_2():
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 10
expected_keys = [[], list(range(0, 50, 3))]
param = [0, 3]
for i, truth_key in zip(param, expected_keys):
pinn = PINN(problem, model)
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(50, save_loss=i)
assert list(pinn.history_loss.keys()) == truth_key
def test_train_extra_feats():
pinn = PINN(problem, model_extra_feat, [myFeature()])
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 10
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(5)
def test_train_2_extra_feats():
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 10
expected_keys = [[], list(range(0, 50, 3))]
param = [0, 3]
for i, truth_key in zip(param, expected_keys):
pinn = PINN(problem, model_extra_feat, [myFeature()])
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(50, save_loss=i)
assert list(pinn.history_loss.keys()) == truth_key
def test_train_with_optimizer_kwargs():
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 10
expected_keys = [[], list(range(0, 50, 3))]
param = [0, 3]
for i, truth_key in zip(param, expected_keys):
pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(50, save_loss=i)
assert list(pinn.history_loss.keys()) == truth_key
def test_train_with_lr_scheduler():
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 10
expected_keys = [[], list(range(0, 50, 3))]
param = [0, 3]
for i, truth_key in zip(param, expected_keys):
pinn = PINN(
problem,
model,
lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
)
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(50, save_loss=i)
assert list(pinn.history_loss.keys()) == truth_key
# def test_train_batch():
# pinn = PINN(problem, model, batch_size=6)
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# pinn.discretise_domain(n, 'grid', locations=boundaries)
# pinn.discretise_domain(n, 'grid', locations=['D'])
# pinn.train(5)
# def test_train_batch_2():
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# expected_keys = [[], list(range(0, 50, 3))]
# param = [0, 3]
# for i, truth_key in zip(param, expected_keys):
# pinn = PINN(problem, model, batch_size=6)
# pinn.discretise_domain(n, 'grid', locations=boundaries)
# pinn.discretise_domain(n, 'grid', locations=['D'])
# pinn.train(50, save_loss=i)
# assert list(pinn.history_loss.keys()) == truth_key
if torch.cuda.is_available():
# def test_gpu_train():
# pinn = PINN(problem, model, batch_size=20, device='cuda')
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 100
# pinn.discretise_domain(n, 'grid', locations=boundaries)
# pinn.discretise_domain(n, 'grid', locations=['D'])
# pinn.train(5)
def test_gpu_train_nobatch():
pinn = PINN(problem, model, batch_size=None, device='cuda')
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
n = 100
pinn.discretise_domain(n, 'grid', locations=boundaries)
pinn.discretise_domain(n, 'grid', locations=['D'])
pinn.train(5)
"""