PINN variants addition and Solvers Update (#263)
* gpinn/basepinn new classes, pinn restructure * codacy fix gpinn/basepinn/pinn * inverse problem fix * Causal PINN (#267) * fix GPU training in inverse problem (#283) * Create a `compute_residual` attribute for `PINNInterface` * Modify dataloading in solvers (#286) * Modify PINNInterface by removing _loss_phys, _loss_data * Adding in PINNInterface a variable to track the current condition during training * Modify GPINN,PINN,CausalPINN to match changes in PINNInterface * Competitive Pinn Addition (#288) * fixing after rebase/ fix loss * fixing final issues --------- Co-authored-by: Dario Coscia <dariocoscia@Dario-Coscia.local> * Modify min max formulation to max min for paper consistency * Adding SAPINN solver (#291) * rom solver * fix import --------- Co-authored-by: Dario Coscia <dariocoscia@Dario-Coscia.local> Co-authored-by: Anna Ivagnes <75523024+annaivagnes@users.noreply.github.com> Co-authored-by: valc89 <103250118+valc89@users.noreply.github.com> Co-authored-by: Monthly Tag bot <mtbot@noreply.github.com> Co-authored-by: Nicola Demo <demo.nicola@gmail.com>
This commit is contained in:
266
tests/test_solvers/test_causalpinn.py
Normal file
266
tests/test_solvers/test_causalpinn.py
Normal file
@@ -0,0 +1,266 @@
|
||||
import torch
|
||||
import pytest
|
||||
|
||||
from pina.problem import TimeDependentProblem, InverseProblem, SpatialProblem
|
||||
from pina.operators import grad
|
||||
from pina.geometry import CartesianDomain
|
||||
from pina import Condition, LabelTensor
|
||||
from pina.solvers import CausalPINN
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.equation.equation import Equation
|
||||
from pina.equation.equation_factory import FixedValue
|
||||
from pina.loss import LpLoss
|
||||
|
||||
|
||||
|
||||
class FooProblem(SpatialProblem):
|
||||
'''
|
||||
Foo problem formulation.
|
||||
'''
|
||||
output_variables = ['u']
|
||||
conditions = {}
|
||||
spatial_domain = None
|
||||
|
||||
|
||||
class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
|
||||
|
||||
def diffusionreaction(input_, output_, params_):
|
||||
x = input_.extract('x')
|
||||
t = input_.extract('t')
|
||||
u_t = grad(output_, input_, d='t')
|
||||
u_x = grad(output_, input_, d='x')
|
||||
u_xx = grad(u_x, input_, d='x')
|
||||
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
|
||||
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
|
||||
return u_t - params_['mu']*u_xx - r
|
||||
|
||||
def _solution(self, pts):
|
||||
t = pts.extract('t')
|
||||
x = pts.extract('x')
|
||||
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
|
||||
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
|
||||
(1/8)*torch.sin(8*x))
|
||||
|
||||
# assign output/ spatial and temporal variables
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
|
||||
temporal_domain = CartesianDomain({'t': [0, 1]})
|
||||
unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
|
||||
|
||||
# problem condition statement
|
||||
conditions = {
|
||||
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
|
||||
't': [0, 1]}),
|
||||
equation=Equation(diffusionreaction)),
|
||||
'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
|
||||
output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
|
||||
}
|
||||
|
||||
class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
|
||||
|
||||
def diffusionreaction(input_, output_):
|
||||
x = input_.extract('x')
|
||||
t = input_.extract('t')
|
||||
u_t = grad(output_, input_, d='t')
|
||||
u_x = grad(output_, input_, d='x')
|
||||
u_xx = grad(u_x, input_, d='x')
|
||||
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
|
||||
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
|
||||
return u_t - u_xx - r
|
||||
|
||||
def _solution(self, pts):
|
||||
t = pts.extract('t')
|
||||
x = pts.extract('x')
|
||||
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
|
||||
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
|
||||
(1/8)*torch.sin(8*x))
|
||||
|
||||
# assign output/ spatial and temporal variables
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
|
||||
temporal_domain = CartesianDomain({'t': [0, 1]})
|
||||
|
||||
# problem condition statement
|
||||
conditions = {
|
||||
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
|
||||
't': [0, 1]}),
|
||||
equation=Equation(diffusionreaction)),
|
||||
}
|
||||
|
||||
class myFeature(torch.nn.Module):
|
||||
"""
|
||||
Feature: sin(x)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(myFeature, self).__init__()
|
||||
|
||||
def forward(self, x):
|
||||
t = (torch.sin(x.extract(['x']) * torch.pi))
|
||||
return LabelTensor(t, ['sin(x)'])
|
||||
|
||||
|
||||
# make the problem
|
||||
problem = DiffusionReactionSystem()
|
||||
model = FeedForward(len(problem.input_variables),
|
||||
len(problem.output_variables))
|
||||
model_extra_feats = FeedForward(
|
||||
len(problem.input_variables) + 1,
|
||||
len(problem.output_variables))
|
||||
extra_feats = [myFeature()]
|
||||
|
||||
|
||||
def test_constructor():
|
||||
CausalPINN(problem=problem, model=model, extra_features=None)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
CausalPINN(FooProblem(), model=model, extra_features=None)
|
||||
|
||||
|
||||
def test_constructor_extra_feats():
|
||||
model_extra_feats = FeedForward(
|
||||
len(problem.input_variables) + 1,
|
||||
len(problem.output_variables))
|
||||
CausalPINN(problem=problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
|
||||
|
||||
def test_train_cpu():
|
||||
problem = DiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 10
|
||||
problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = CausalPINN(problem = problem,
|
||||
model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
def test_train_restore():
|
||||
tmpdir = "tests/tmp_restore"
|
||||
problem = DiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 10
|
||||
problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = CausalPINN(problem=problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=5,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
|
||||
'checkpoints/epoch=4-step=5.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_load():
|
||||
tmpdir = "tests/tmp_load"
|
||||
problem = DiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 10
|
||||
problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = CausalPINN(problem=problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = CausalPINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
|
||||
problem = problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_train_inverse_problem_cpu():
|
||||
problem = InverseDiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 100
|
||||
problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = CausalPINN(problem = problem,
|
||||
model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
# # TODO does not currently work
|
||||
# def test_train_inverse_problem_restore():
|
||||
# tmpdir = "tests/tmp_restore_inv"
|
||||
# problem = InverseDiffusionReactionSystem()
|
||||
# boundaries = ['D']
|
||||
# n = 100
|
||||
# problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
# pinn = CausalPINN(problem=problem,
|
||||
# model=model,
|
||||
# extra_features=None,
|
||||
# loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn,
|
||||
# max_epochs=5,
|
||||
# accelerator='cpu',
|
||||
# default_root_dir=tmpdir)
|
||||
# trainer.train()
|
||||
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
# t = ntrainer.train(
|
||||
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
|
||||
# import shutil
|
||||
# shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_inverse_problem_load():
|
||||
tmpdir = "tests/tmp_load_inv"
|
||||
problem = InverseDiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 100
|
||||
problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = CausalPINN(problem=problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = CausalPINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_extra_feats_cpu():
|
||||
problem = DiffusionReactionSystem()
|
||||
boundaries = ['D']
|
||||
n = 10
|
||||
problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = CausalPINN(problem=problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
trainer.train()
|
||||
418
tests/test_solvers/test_competitive_pinn.py
Normal file
418
tests/test_solvers/test_competitive_pinn.py
Normal file
@@ -0,0 +1,418 @@
|
||||
import torch
|
||||
import pytest
|
||||
|
||||
from pina.problem import SpatialProblem, InverseProblem
|
||||
from pina.operators import laplacian
|
||||
from pina.geometry import CartesianDomain
|
||||
from pina import Condition, LabelTensor
|
||||
from pina.solvers import CompetitivePINN as PINN
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.equation.equation import Equation
|
||||
from pina.equation.equation_factory import FixedValue
|
||||
from pina.loss import LpLoss
|
||||
|
||||
|
||||
def laplace_equation(input_, output_):
|
||||
force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
|
||||
torch.sin(input_.extract(['y']) * torch.pi))
|
||||
delta_u = laplacian(output_.extract(['u']), input_)
|
||||
return delta_u - force_term
|
||||
|
||||
|
||||
my_laplace = Equation(laplace_equation)
|
||||
in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
|
||||
out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
|
||||
in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
|
||||
out2_ = LabelTensor(torch.rand(60, 1), ['u'])
|
||||
|
||||
|
||||
class InversePoisson(SpatialProblem, InverseProblem):
|
||||
'''
|
||||
Problem definition for the Poisson equation.
|
||||
'''
|
||||
output_variables = ['u']
|
||||
x_min = -2
|
||||
x_max = 2
|
||||
y_min = -2
|
||||
y_max = 2
|
||||
data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
|
||||
data_output = LabelTensor(torch.rand(10, 1), ['u'])
|
||||
spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
|
||||
# define the ranges for the parameters
|
||||
unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
|
||||
|
||||
def laplace_equation(input_, output_, params_):
|
||||
'''
|
||||
Laplace equation with a force term.
|
||||
'''
|
||||
force_term = torch.exp(
|
||||
- 2*(input_.extract(['x']) - params_['mu1'])**2
|
||||
- 2*(input_.extract(['y']) - params_['mu2'])**2)
|
||||
delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
|
||||
|
||||
return delta_u - force_term
|
||||
|
||||
# define the conditions for the loss (boundary conditions, equation, data)
|
||||
conditions = {
|
||||
'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
|
||||
'y': y_max}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma2': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': y_min
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma3': Condition(location=CartesianDomain(
|
||||
{'x': x_max, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma4': Condition(location=CartesianDomain(
|
||||
{'x': x_min, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'D': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=Equation(laplace_equation)),
|
||||
'data': Condition(input_points=data_input.extract(['x', 'y']),
|
||||
output_points=data_output)
|
||||
}
|
||||
|
||||
|
||||
class Poisson(SpatialProblem):
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
|
||||
|
||||
conditions = {
|
||||
'gamma1': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 1}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma2': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 0}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma3': Condition(
|
||||
location=CartesianDomain({'x': 1, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma4': Condition(
|
||||
location=CartesianDomain({'x': 0, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'D': Condition(
|
||||
input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
|
||||
equation=my_laplace),
|
||||
'data': Condition(
|
||||
input_points=in_,
|
||||
output_points=out_),
|
||||
'data2': Condition(
|
||||
input_points=in2_,
|
||||
output_points=out2_)
|
||||
}
|
||||
|
||||
def poisson_sol(self, pts):
|
||||
return -(torch.sin(pts.extract(['x']) * torch.pi) *
|
||||
torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
|
||||
|
||||
truth_solution = poisson_sol
|
||||
|
||||
|
||||
class myFeature(torch.nn.Module):
|
||||
"""
|
||||
Feature: sin(x)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(myFeature, self).__init__()
|
||||
|
||||
def forward(self, x):
|
||||
t = (torch.sin(x.extract(['x']) * torch.pi) *
|
||||
torch.sin(x.extract(['y']) * torch.pi))
|
||||
return LabelTensor(t, ['sin(x)sin(y)'])
|
||||
|
||||
|
||||
# make the problem
|
||||
poisson_problem = Poisson()
|
||||
model = FeedForward(len(poisson_problem.input_variables),
|
||||
len(poisson_problem.output_variables))
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
extra_feats = [myFeature()]
|
||||
|
||||
|
||||
def test_constructor():
|
||||
PINN(problem=poisson_problem, model=model)
|
||||
PINN(problem=poisson_problem, model=model, discriminator = model)
|
||||
|
||||
|
||||
def test_constructor_extra_feats():
|
||||
with pytest.raises(TypeError):
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
PINN(problem=poisson_problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
|
||||
|
||||
def test_train_cpu():
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
def test_train_restore():
|
||||
tmpdir = "tests/tmp_restore"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=5,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
|
||||
'checkpoints/epoch=4-step=10.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_load():
|
||||
tmpdir = "tests/tmp_load"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = PINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_train_inverse_problem_cpu():
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
# # TODO does not currently work
|
||||
# def test_train_inverse_problem_restore():
|
||||
# tmpdir = "tests/tmp_restore_inv"
|
||||
# poisson_problem = InversePoisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
# n = 100
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
# pinn = PINN(problem=poisson_problem,
|
||||
# model=model,
|
||||
# loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn,
|
||||
# max_epochs=5,
|
||||
# accelerator='cpu',
|
||||
# default_root_dir=tmpdir)
|
||||
# trainer.train()
|
||||
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
# t = ntrainer.train(
|
||||
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
|
||||
# import shutil
|
||||
# shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_inverse_problem_load():
|
||||
tmpdir = "tests/tmp_load_inv"
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = PINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
# # TODO fix asap. Basically sampling few variables
|
||||
# # works only if both variables are in a range.
|
||||
# # if one is fixed and the other not, this will
|
||||
# # not work. This test also needs to be fixed and
|
||||
# # insert in test problem not in test pinn.
|
||||
# def test_train_cpu_sampling_few_vars():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
|
||||
# trainer.train()
|
||||
|
||||
|
||||
# TODO, fix GitHub actions to run also on GPU
|
||||
# def test_train_gpu():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_gpu(): #TODO fix ASAP
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_2():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model)
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_extra_feats():
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
|
||||
# def test_train_2_extra_feats():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_optimizer_kwargs():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_lr_scheduler():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(
|
||||
# problem,
|
||||
# model,
|
||||
# lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
|
||||
# lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
|
||||
# )
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# # def test_train_batch():
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
|
||||
# # def test_train_batch_2():
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # expected_keys = [[], list(range(0, 50, 3))]
|
||||
# # param = [0, 3]
|
||||
# # for i, truth_key in zip(param, expected_keys):
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(50, save_loss=i)
|
||||
# # assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# if torch.cuda.is_available():
|
||||
|
||||
# # def test_gpu_train():
|
||||
# # pinn = PINN(problem, model, batch_size=20, device='cuda')
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 100
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
# def test_gpu_train_nobatch():
|
||||
# pinn = PINN(problem, model, batch_size=None, device='cuda')
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 100
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
432
tests/test_solvers/test_gpinn.py
Normal file
432
tests/test_solvers/test_gpinn.py
Normal file
@@ -0,0 +1,432 @@
|
||||
import torch
|
||||
|
||||
from pina.problem import SpatialProblem, InverseProblem
|
||||
from pina.operators import laplacian
|
||||
from pina.geometry import CartesianDomain
|
||||
from pina import Condition, LabelTensor
|
||||
from pina.solvers import GPINN
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.equation.equation import Equation
|
||||
from pina.equation.equation_factory import FixedValue
|
||||
from pina.loss import LpLoss
|
||||
|
||||
|
||||
def laplace_equation(input_, output_):
|
||||
force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
|
||||
torch.sin(input_.extract(['y']) * torch.pi))
|
||||
delta_u = laplacian(output_.extract(['u']), input_)
|
||||
return delta_u - force_term
|
||||
|
||||
|
||||
my_laplace = Equation(laplace_equation)
|
||||
in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
|
||||
out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
|
||||
in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
|
||||
out2_ = LabelTensor(torch.rand(60, 1), ['u'])
|
||||
|
||||
|
||||
class InversePoisson(SpatialProblem, InverseProblem):
|
||||
'''
|
||||
Problem definition for the Poisson equation.
|
||||
'''
|
||||
output_variables = ['u']
|
||||
x_min = -2
|
||||
x_max = 2
|
||||
y_min = -2
|
||||
y_max = 2
|
||||
data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
|
||||
data_output = LabelTensor(torch.rand(10, 1), ['u'])
|
||||
spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
|
||||
# define the ranges for the parameters
|
||||
unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
|
||||
|
||||
def laplace_equation(input_, output_, params_):
|
||||
'''
|
||||
Laplace equation with a force term.
|
||||
'''
|
||||
force_term = torch.exp(
|
||||
- 2*(input_.extract(['x']) - params_['mu1'])**2
|
||||
- 2*(input_.extract(['y']) - params_['mu2'])**2)
|
||||
delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
|
||||
|
||||
return delta_u - force_term
|
||||
|
||||
# define the conditions for the loss (boundary conditions, equation, data)
|
||||
conditions = {
|
||||
'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
|
||||
'y': y_max}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma2': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': y_min}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma3': Condition(location=CartesianDomain(
|
||||
{'x': x_max, 'y': [y_min, y_max]}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma4': Condition(location=CartesianDomain(
|
||||
{'x': x_min, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'D': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=Equation(laplace_equation)),
|
||||
'data': Condition(
|
||||
input_points=data_input.extract(['x', 'y']),
|
||||
output_points=data_output)
|
||||
}
|
||||
|
||||
|
||||
class Poisson(SpatialProblem):
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
|
||||
|
||||
conditions = {
|
||||
'gamma1': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 1}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma2': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 0}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma3': Condition(
|
||||
location=CartesianDomain({'x': 1, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma4': Condition(
|
||||
location=CartesianDomain({'x': 0, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'D': Condition(
|
||||
input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
|
||||
equation=my_laplace),
|
||||
'data': Condition(
|
||||
input_points=in_,
|
||||
output_points=out_),
|
||||
'data2': Condition(
|
||||
input_points=in2_,
|
||||
output_points=out2_)
|
||||
}
|
||||
|
||||
def poisson_sol(self, pts):
|
||||
return -(torch.sin(pts.extract(['x']) * torch.pi) *
|
||||
torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
|
||||
|
||||
truth_solution = poisson_sol
|
||||
|
||||
|
||||
class myFeature(torch.nn.Module):
|
||||
"""
|
||||
Feature: sin(x)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(myFeature, self).__init__()
|
||||
|
||||
def forward(self, x):
|
||||
t = (torch.sin(x.extract(['x']) * torch.pi) *
|
||||
torch.sin(x.extract(['y']) * torch.pi))
|
||||
return LabelTensor(t, ['sin(x)sin(y)'])
|
||||
|
||||
|
||||
# make the problem
|
||||
poisson_problem = Poisson()
|
||||
model = FeedForward(len(poisson_problem.input_variables),
|
||||
len(poisson_problem.output_variables))
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
extra_feats = [myFeature()]
|
||||
|
||||
|
||||
def test_constructor():
|
||||
GPINN(problem=poisson_problem, model=model, extra_features=None)
|
||||
|
||||
|
||||
def test_constructor_extra_feats():
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
GPINN(problem=poisson_problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
|
||||
|
||||
def test_train_cpu():
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = GPINN(problem = poisson_problem,
|
||||
model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
def test_train_restore():
|
||||
tmpdir = "tests/tmp_restore"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = GPINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=5,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
|
||||
'checkpoints/epoch=4-step=10.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_load():
|
||||
tmpdir = "tests/tmp_load"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = GPINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = GPINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_train_inverse_problem_cpu():
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = GPINN(problem = poisson_problem,
|
||||
model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
# # TODO does not currently work
|
||||
# def test_train_inverse_problem_restore():
|
||||
# tmpdir = "tests/tmp_restore_inv"
|
||||
# poisson_problem = InversePoisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
# n = 100
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
# pinn = GPINN(problem=poisson_problem,
|
||||
# model=model,
|
||||
# extra_features=None,
|
||||
# loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn,
|
||||
# max_epochs=5,
|
||||
# accelerator='cpu',
|
||||
# default_root_dir=tmpdir)
|
||||
# trainer.train()
|
||||
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
# t = ntrainer.train(
|
||||
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
|
||||
# import shutil
|
||||
# shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_inverse_problem_load():
|
||||
tmpdir = "tests/tmp_load_inv"
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = GPINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = GPINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
# # TODO fix asap. Basically sampling few variables
|
||||
# # works only if both variables are in a range.
|
||||
# # if one is fixed and the other not, this will
|
||||
# # not work. This test also needs to be fixed and
|
||||
# # insert in test problem not in test pinn.
|
||||
# def test_train_cpu_sampling_few_vars():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
|
||||
# pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
|
||||
# trainer.train()
|
||||
|
||||
|
||||
def test_train_extra_feats_cpu():
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = GPINN(problem=poisson_problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
trainer.train()
|
||||
|
||||
|
||||
# TODO, fix GitHub actions to run also on GPU
|
||||
# def test_train_gpu():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_gpu(): #TODO fix ASAP
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
|
||||
# pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_2():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = GPINN(problem, model)
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_extra_feats():
|
||||
# pinn = GPINN(problem, model_extra_feat, [myFeature()])
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
|
||||
# def test_train_2_extra_feats():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = GPINN(problem, model_extra_feat, [myFeature()])
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_optimizer_kwargs():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = GPINN(problem, model, optimizer_kwargs={'lr' : 0.3})
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_lr_scheduler():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = GPINN(
|
||||
# problem,
|
||||
# model,
|
||||
# lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
|
||||
# lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
|
||||
# )
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# # def test_train_batch():
|
||||
# # pinn = GPINN(problem, model, batch_size=6)
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
|
||||
# # def test_train_batch_2():
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # expected_keys = [[], list(range(0, 50, 3))]
|
||||
# # param = [0, 3]
|
||||
# # for i, truth_key in zip(param, expected_keys):
|
||||
# # pinn = GPINN(problem, model, batch_size=6)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(50, save_loss=i)
|
||||
# # assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# if torch.cuda.is_available():
|
||||
|
||||
# # def test_gpu_train():
|
||||
# # pinn = GPINN(problem, model, batch_size=20, device='cuda')
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 100
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
# def test_gpu_train_nobatch():
|
||||
# pinn = GPINN(problem, model, batch_size=None, device='cuda')
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 100
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import torch
|
||||
|
||||
from pina.problem import SpatialProblem
|
||||
from pina.problem import SpatialProblem, InverseProblem
|
||||
from pina.operators import laplacian
|
||||
from pina.geometry import CartesianDomain
|
||||
from pina import Condition, LabelTensor
|
||||
@@ -26,6 +26,58 @@ in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
|
||||
out2_ = LabelTensor(torch.rand(60, 1), ['u'])
|
||||
|
||||
|
||||
class InversePoisson(SpatialProblem, InverseProblem):
|
||||
'''
|
||||
Problem definition for the Poisson equation.
|
||||
'''
|
||||
output_variables = ['u']
|
||||
x_min = -2
|
||||
x_max = 2
|
||||
y_min = -2
|
||||
y_max = 2
|
||||
data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
|
||||
data_output = LabelTensor(torch.rand(10, 1), ['u'])
|
||||
spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
|
||||
# define the ranges for the parameters
|
||||
unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
|
||||
|
||||
def laplace_equation(input_, output_, params_):
|
||||
'''
|
||||
Laplace equation with a force term.
|
||||
'''
|
||||
force_term = torch.exp(
|
||||
- 2*(input_.extract(['x']) - params_['mu1'])**2
|
||||
- 2*(input_.extract(['y']) - params_['mu2'])**2)
|
||||
delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
|
||||
|
||||
return delta_u - force_term
|
||||
|
||||
# define the conditions for the loss (boundary conditions, equation, data)
|
||||
conditions = {
|
||||
'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
|
||||
'y': y_max}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma2': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': y_min
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma3': Condition(location=CartesianDomain(
|
||||
{'x': x_max, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma4': Condition(location=CartesianDomain(
|
||||
{'x': x_min, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'D': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=Equation(laplace_equation)),
|
||||
'data': Condition(input_points=data_input.extract(['x', 'y']),
|
||||
output_points=data_output)
|
||||
}
|
||||
|
||||
|
||||
class Poisson(SpatialProblem):
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
|
||||
@@ -103,8 +155,10 @@ def test_train_cpu():
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1, accelerator='cpu', batch_size=20)
|
||||
pinn = PINN(problem = poisson_problem, model=model,
|
||||
extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
@@ -125,7 +179,8 @@ def test_train_restore():
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
|
||||
'checkpoints/epoch=4-step=10.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
@@ -158,6 +213,68 @@ def test_train_load():
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_train_inverse_problem_cpu():
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model,
|
||||
extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
# # TODO does not currently work
|
||||
# def test_train_inverse_problem_restore():
|
||||
# tmpdir = "tests/tmp_restore_inv"
|
||||
# poisson_problem = InversePoisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
# n = 100
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
# pinn = PINN(problem=poisson_problem,
|
||||
# model=model,
|
||||
# extra_features=None,
|
||||
# loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn,
|
||||
# max_epochs=5,
|
||||
# accelerator='cpu',
|
||||
# default_root_dir=tmpdir)
|
||||
# trainer.train()
|
||||
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
# t = ntrainer.train(
|
||||
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
|
||||
# import shutil
|
||||
# shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_inverse_problem_load():
|
||||
tmpdir = "tests/tmp_load_inv"
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = PINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
# # TODO fix asap. Basically sampling few variables
|
||||
# # works only if both variables are in a range.
|
||||
@@ -197,85 +314,32 @@ def test_train_extra_feats_cpu():
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
"""
|
||||
def test_train_gpu(): #TODO fix ASAP
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
|
||||
pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
trainer.train()
|
||||
|
||||
def test_train_2():
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
expected_keys = [[], list(range(0, 50, 3))]
|
||||
param = [0, 3]
|
||||
for i, truth_key in zip(param, expected_keys):
|
||||
pinn = PINN(problem, model)
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(50, save_loss=i)
|
||||
assert list(pinn.history_loss.keys()) == truth_key
|
||||
# def test_train_gpu(): #TODO fix ASAP
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_2():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model)
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
def test_train_extra_feats():
|
||||
pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(5)
|
||||
|
||||
|
||||
def test_train_2_extra_feats():
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
expected_keys = [[], list(range(0, 50, 3))]
|
||||
param = [0, 3]
|
||||
for i, truth_key in zip(param, expected_keys):
|
||||
pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(50, save_loss=i)
|
||||
assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
def test_train_with_optimizer_kwargs():
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
expected_keys = [[], list(range(0, 50, 3))]
|
||||
param = [0, 3]
|
||||
for i, truth_key in zip(param, expected_keys):
|
||||
pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(50, save_loss=i)
|
||||
assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
def test_train_with_lr_scheduler():
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
expected_keys = [[], list(range(0, 50, 3))]
|
||||
param = [0, 3]
|
||||
for i, truth_key in zip(param, expected_keys):
|
||||
pinn = PINN(
|
||||
problem,
|
||||
model,
|
||||
lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
|
||||
lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
|
||||
)
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(50, save_loss=i)
|
||||
assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_batch():
|
||||
# pinn = PINN(problem, model, batch_size=6)
|
||||
# def test_train_extra_feats():
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
@@ -283,34 +347,87 @@ def test_train_with_lr_scheduler():
|
||||
# pinn.train(5)
|
||||
|
||||
|
||||
# def test_train_batch_2():
|
||||
# def test_train_2_extra_feats():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model, batch_size=6)
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
if torch.cuda.is_available():
|
||||
# def test_train_with_optimizer_kwargs():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
# def test_gpu_train():
|
||||
# pinn = PINN(problem, model, batch_size=20, device='cuda')
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 100
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
def test_gpu_train_nobatch():
|
||||
pinn = PINN(problem, model, batch_size=None, device='cuda')
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 100
|
||||
pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
pinn.train(5)
|
||||
"""
|
||||
# def test_train_with_lr_scheduler():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(
|
||||
# problem,
|
||||
# model,
|
||||
# lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
|
||||
# lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
|
||||
# )
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# # def test_train_batch():
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
|
||||
# # def test_train_batch_2():
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # expected_keys = [[], list(range(0, 50, 3))]
|
||||
# # param = [0, 3]
|
||||
# # for i, truth_key in zip(param, expected_keys):
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(50, save_loss=i)
|
||||
# # assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# if torch.cuda.is_available():
|
||||
|
||||
# # def test_gpu_train():
|
||||
# # pinn = PINN(problem, model, batch_size=20, device='cuda')
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 100
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
# def test_gpu_train_nobatch():
|
||||
# pinn = PINN(problem, model, batch_size=None, device='cuda')
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 100
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
|
||||
105
tests/test_solvers/test_rom_solver.py
Normal file
105
tests/test_solvers/test_rom_solver.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import torch
|
||||
import pytest
|
||||
|
||||
from pina.problem import AbstractProblem
|
||||
from pina import Condition, LabelTensor
|
||||
from pina.solvers import ReducedOrderModelSolver
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.loss import LpLoss
|
||||
|
||||
|
||||
class NeuralOperatorProblem(AbstractProblem):
|
||||
input_variables = ['u_0', 'u_1']
|
||||
output_variables = [f'u_{i}' for i in range(100)]
|
||||
conditions = {'data' : Condition(input_points=
|
||||
LabelTensor(torch.rand(10, 2),
|
||||
input_variables),
|
||||
output_points=
|
||||
LabelTensor(torch.rand(10, 100),
|
||||
output_variables))}
|
||||
|
||||
|
||||
# make the problem + extra feats
|
||||
class AE(torch.nn.Module):
|
||||
def __init__(self, input_dimensions, rank):
|
||||
super().__init__()
|
||||
self.encode = FeedForward(input_dimensions, rank, layers=[input_dimensions//4])
|
||||
self.decode = FeedForward(rank, input_dimensions, layers=[input_dimensions//4])
|
||||
class AE_missing_encode(torch.nn.Module):
|
||||
def __init__(self, input_dimensions, rank):
|
||||
super().__init__()
|
||||
self.encode = FeedForward(input_dimensions, rank, layers=[input_dimensions//4])
|
||||
class AE_missing_decode(torch.nn.Module):
|
||||
def __init__(self, input_dimensions, rank):
|
||||
super().__init__()
|
||||
self.decode = FeedForward(rank, input_dimensions, layers=[input_dimensions//4])
|
||||
|
||||
rank = 10
|
||||
problem = NeuralOperatorProblem()
|
||||
interpolation_net = FeedForward(len(problem.input_variables),
|
||||
rank)
|
||||
reduction_net = AE(len(problem.output_variables), rank)
|
||||
|
||||
def test_constructor():
|
||||
ReducedOrderModelSolver(problem=problem,reduction_network=reduction_net,
|
||||
interpolation_network=interpolation_net)
|
||||
with pytest.raises(SyntaxError):
|
||||
ReducedOrderModelSolver(problem=problem,
|
||||
reduction_network=AE_missing_encode(
|
||||
len(problem.output_variables), rank),
|
||||
interpolation_network=interpolation_net)
|
||||
ReducedOrderModelSolver(problem=problem,
|
||||
reduction_network=AE_missing_decode(
|
||||
len(problem.output_variables), rank),
|
||||
interpolation_network=interpolation_net)
|
||||
|
||||
|
||||
def test_train_cpu():
|
||||
solver = ReducedOrderModelSolver(problem = problem,reduction_network=reduction_net,
|
||||
interpolation_network=interpolation_net, loss=LpLoss())
|
||||
trainer = Trainer(solver=solver, max_epochs=3, accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
def test_train_restore():
|
||||
tmpdir = "tests/tmp_restore"
|
||||
solver = ReducedOrderModelSolver(problem=problem,
|
||||
reduction_network=reduction_net,
|
||||
interpolation_network=interpolation_net,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=solver,
|
||||
max_epochs=5,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=solver, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_load():
|
||||
tmpdir = "tests/tmp_load"
|
||||
solver = ReducedOrderModelSolver(problem=problem,
|
||||
reduction_network=reduction_net,
|
||||
interpolation_network=interpolation_net,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=solver,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_solver = ReducedOrderModelSolver.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
|
||||
problem = problem,reduction_network=reduction_net,
|
||||
interpolation_network=interpolation_net)
|
||||
test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
|
||||
assert new_solver.forward(test_pts).shape == (20, 100)
|
||||
assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
|
||||
torch.testing.assert_close(
|
||||
new_solver.forward(test_pts),
|
||||
solver.forward(test_pts))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
437
tests/test_solvers/test_sapinn.py
Normal file
437
tests/test_solvers/test_sapinn.py
Normal file
@@ -0,0 +1,437 @@
|
||||
import torch
|
||||
import pytest
|
||||
|
||||
from pina.problem import SpatialProblem, InverseProblem
|
||||
from pina.operators import laplacian
|
||||
from pina.geometry import CartesianDomain
|
||||
from pina import Condition, LabelTensor
|
||||
from pina.solvers import SAPINN as PINN
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.equation.equation import Equation
|
||||
from pina.equation.equation_factory import FixedValue
|
||||
from pina.loss import LpLoss
|
||||
|
||||
|
||||
def laplace_equation(input_, output_):
|
||||
force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
|
||||
torch.sin(input_.extract(['y']) * torch.pi))
|
||||
delta_u = laplacian(output_.extract(['u']), input_)
|
||||
return delta_u - force_term
|
||||
|
||||
|
||||
my_laplace = Equation(laplace_equation)
|
||||
in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
|
||||
out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
|
||||
in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
|
||||
out2_ = LabelTensor(torch.rand(60, 1), ['u'])
|
||||
|
||||
|
||||
class InversePoisson(SpatialProblem, InverseProblem):
|
||||
'''
|
||||
Problem definition for the Poisson equation.
|
||||
'''
|
||||
output_variables = ['u']
|
||||
x_min = -2
|
||||
x_max = 2
|
||||
y_min = -2
|
||||
y_max = 2
|
||||
data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
|
||||
data_output = LabelTensor(torch.rand(10, 1), ['u'])
|
||||
spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
|
||||
# define the ranges for the parameters
|
||||
unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
|
||||
|
||||
def laplace_equation(input_, output_, params_):
|
||||
'''
|
||||
Laplace equation with a force term.
|
||||
'''
|
||||
force_term = torch.exp(
|
||||
- 2*(input_.extract(['x']) - params_['mu1'])**2
|
||||
- 2*(input_.extract(['y']) - params_['mu2'])**2)
|
||||
delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
|
||||
|
||||
return delta_u - force_term
|
||||
|
||||
# define the conditions for the loss (boundary conditions, equation, data)
|
||||
conditions = {
|
||||
'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
|
||||
'y': y_max}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma2': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': y_min
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma3': Condition(location=CartesianDomain(
|
||||
{'x': x_max, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'gamma4': Condition(location=CartesianDomain(
|
||||
{'x': x_min, 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=FixedValue(0.0, components=['u'])),
|
||||
'D': Condition(location=CartesianDomain(
|
||||
{'x': [x_min, x_max], 'y': [y_min, y_max]
|
||||
}),
|
||||
equation=Equation(laplace_equation)),
|
||||
'data': Condition(input_points=data_input.extract(['x', 'y']),
|
||||
output_points=data_output)
|
||||
}
|
||||
|
||||
|
||||
class Poisson(SpatialProblem):
|
||||
output_variables = ['u']
|
||||
spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
|
||||
|
||||
conditions = {
|
||||
'gamma1': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 1}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma2': Condition(
|
||||
location=CartesianDomain({'x': [0, 1], 'y': 0}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma3': Condition(
|
||||
location=CartesianDomain({'x': 1, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'gamma4': Condition(
|
||||
location=CartesianDomain({'x': 0, 'y': [0, 1]}),
|
||||
equation=FixedValue(0.0)),
|
||||
'D': Condition(
|
||||
input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
|
||||
equation=my_laplace),
|
||||
'data': Condition(
|
||||
input_points=in_,
|
||||
output_points=out_),
|
||||
'data2': Condition(
|
||||
input_points=in2_,
|
||||
output_points=out2_)
|
||||
}
|
||||
|
||||
def poisson_sol(self, pts):
|
||||
return -(torch.sin(pts.extract(['x']) * torch.pi) *
|
||||
torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
|
||||
|
||||
truth_solution = poisson_sol
|
||||
|
||||
|
||||
class myFeature(torch.nn.Module):
|
||||
"""
|
||||
Feature: sin(x)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(myFeature, self).__init__()
|
||||
|
||||
def forward(self, x):
|
||||
t = (torch.sin(x.extract(['x']) * torch.pi) *
|
||||
torch.sin(x.extract(['y']) * torch.pi))
|
||||
return LabelTensor(t, ['sin(x)sin(y)'])
|
||||
|
||||
|
||||
# make the problem
|
||||
poisson_problem = Poisson()
|
||||
model = FeedForward(len(poisson_problem.input_variables),
|
||||
len(poisson_problem.output_variables))
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
extra_feats = [myFeature()]
|
||||
|
||||
|
||||
def test_constructor():
|
||||
PINN(problem=poisson_problem, model=model, extra_features=None)
|
||||
with pytest.raises(ValueError):
|
||||
PINN(problem=poisson_problem, model=model, extra_features=None,
|
||||
weights_function=1)
|
||||
|
||||
|
||||
def test_constructor_extra_feats():
|
||||
model_extra_feats = FeedForward(
|
||||
len(poisson_problem.input_variables) + 1,
|
||||
len(poisson_problem.output_variables))
|
||||
PINN(problem=poisson_problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
|
||||
|
||||
def test_train_cpu():
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model,
|
||||
extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
def test_train_restore():
|
||||
tmpdir = "tests/tmp_restore"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=5,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
|
||||
t = ntrainer.train(
|
||||
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
|
||||
'checkpoints/epoch=4-step=10.ckpt')
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_load():
|
||||
tmpdir = "tests/tmp_load"
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = PINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def test_train_inverse_problem_cpu():
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem = poisson_problem, model=model,
|
||||
extra_features=None, loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn, max_epochs=1,
|
||||
accelerator='cpu', batch_size=20)
|
||||
trainer.train()
|
||||
|
||||
|
||||
# # TODO does not currently work
|
||||
# def test_train_inverse_problem_restore():
|
||||
# tmpdir = "tests/tmp_restore_inv"
|
||||
# poisson_problem = InversePoisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
# n = 100
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
# pinn = PINN(problem=poisson_problem,
|
||||
# model=model,
|
||||
# extra_features=None,
|
||||
# loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn,
|
||||
# max_epochs=5,
|
||||
# accelerator='cpu',
|
||||
# default_root_dir=tmpdir)
|
||||
# trainer.train()
|
||||
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
# t = ntrainer.train(
|
||||
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
|
||||
# import shutil
|
||||
# shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_train_inverse_problem_load():
|
||||
tmpdir = "tests/tmp_load_inv"
|
||||
poisson_problem = InversePoisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
|
||||
n = 100
|
||||
poisson_problem.discretise_domain(n, 'random', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model,
|
||||
extra_features=None,
|
||||
loss=LpLoss())
|
||||
trainer = Trainer(solver=pinn,
|
||||
max_epochs=15,
|
||||
accelerator='cpu',
|
||||
default_root_dir=tmpdir)
|
||||
trainer.train()
|
||||
new_pinn = PINN.load_from_checkpoint(
|
||||
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
|
||||
problem = poisson_problem, model=model)
|
||||
test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
|
||||
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
|
||||
assert new_pinn.forward(test_pts).extract(
|
||||
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
|
||||
torch.testing.assert_close(
|
||||
new_pinn.forward(test_pts).extract(['u']),
|
||||
pinn.forward(test_pts).extract(['u']))
|
||||
import shutil
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
# # TODO fix asap. Basically sampling few variables
|
||||
# # works only if both variables are in a range.
|
||||
# # if one is fixed and the other not, this will
|
||||
# # not work. This test also needs to be fixed and
|
||||
# # insert in test problem not in test pinn.
|
||||
# def test_train_cpu_sampling_few_vars():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
|
||||
# poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
|
||||
# trainer.train()
|
||||
|
||||
|
||||
def test_train_extra_feats_cpu():
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
pinn = PINN(problem=poisson_problem,
|
||||
model=model_extra_feats,
|
||||
extra_features=extra_feats)
|
||||
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
|
||||
trainer.train()
|
||||
|
||||
|
||||
# TODO, fix GitHub actions to run also on GPU
|
||||
# def test_train_gpu():
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_gpu(): #TODO fix ASAP
|
||||
# poisson_problem = Poisson()
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
|
||||
# pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
|
||||
# trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
|
||||
# trainer.train()
|
||||
|
||||
# def test_train_2():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model)
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_extra_feats():
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
|
||||
# def test_train_2_extra_feats():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model_extra_feat, [myFeature()])
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_optimizer_kwargs():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# def test_train_with_lr_scheduler():
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 10
|
||||
# expected_keys = [[], list(range(0, 50, 3))]
|
||||
# param = [0, 3]
|
||||
# for i, truth_key in zip(param, expected_keys):
|
||||
# pinn = PINN(
|
||||
# problem,
|
||||
# model,
|
||||
# lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
|
||||
# lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
|
||||
# )
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(50, save_loss=i)
|
||||
# assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# # def test_train_batch():
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
|
||||
# # def test_train_batch_2():
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 10
|
||||
# # expected_keys = [[], list(range(0, 50, 3))]
|
||||
# # param = [0, 3]
|
||||
# # for i, truth_key in zip(param, expected_keys):
|
||||
# # pinn = PINN(problem, model, batch_size=6)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(50, save_loss=i)
|
||||
# # assert list(pinn.history_loss.keys()) == truth_key
|
||||
|
||||
|
||||
# if torch.cuda.is_available():
|
||||
|
||||
# # def test_gpu_train():
|
||||
# # pinn = PINN(problem, model, batch_size=20, device='cuda')
|
||||
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# # n = 100
|
||||
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# # pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# # pinn.train(5)
|
||||
|
||||
# def test_gpu_train_nobatch():
|
||||
# pinn = PINN(problem, model, batch_size=None, device='cuda')
|
||||
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
|
||||
# n = 100
|
||||
# pinn.discretise_domain(n, 'grid', locations=boundaries)
|
||||
# pinn.discretise_domain(n, 'grid', locations=['D'])
|
||||
# pinn.train(5)
|
||||
|
||||
Reference in New Issue
Block a user