Files
PINA/tests/test_solvers/test_causalpinn.py
Dario Coscia e0429bb445 PINN variants addition and Solvers Update (#263)
* gpinn/basepinn new classes, pinn restructure
* codacy fix gpinn/basepinn/pinn
* inverse problem fix
* Causal PINN (#267)
* fix GPU training in inverse problem (#283)
* Create a `compute_residual` attribute for `PINNInterface`
* Modify dataloading in solvers (#286)
* Modify PINNInterface by removing _loss_phys, _loss_data
* Adding in PINNInterface a variable to track the current condition during training
* Modify GPINN,PINN,CausalPINN to match changes in PINNInterface
* Competitive Pinn Addition (#288)
* fixing after rebase/ fix loss
* fixing final issues

---------

Co-authored-by: Dario Coscia <dariocoscia@Dario-Coscia.local>

* Modify min max formulation to max min for paper consistency
* Adding SAPINN solver (#291)
* rom solver
* fix import

---------

Co-authored-by: Dario Coscia <dariocoscia@Dario-Coscia.local>
Co-authored-by: Anna Ivagnes <75523024+annaivagnes@users.noreply.github.com>
Co-authored-by: valc89 <103250118+valc89@users.noreply.github.com>
Co-authored-by: Monthly Tag bot <mtbot@noreply.github.com>
Co-authored-by: Nicola Demo <demo.nicola@gmail.com>
2024-05-10 14:07:01 +02:00

266 lines
9.2 KiB
Python

import torch
import pytest
from pina.problem import TimeDependentProblem, InverseProblem, SpatialProblem
from pina.operators import grad
from pina.geometry import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import CausalPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
class FooProblem(SpatialProblem):
'''
Foo problem formulation.
'''
output_variables = ['u']
conditions = {}
spatial_domain = None
class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
def diffusionreaction(input_, output_, params_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - params_['mu']*u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
}
class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
def diffusionreaction(input_, output_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
}
class myFeature(torch.nn.Module):
"""
Feature: sin(x)
"""
def __init__(self):
super(myFeature, self).__init__()
def forward(self, x):
t = (torch.sin(x.extract(['x']) * torch.pi))
return LabelTensor(t, ['sin(x)'])
# make the problem
problem = DiffusionReactionSystem()
model = FeedForward(len(problem.input_variables),
len(problem.output_variables))
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
extra_feats = [myFeature()]
def test_constructor():
CausalPINN(problem=problem, model=model, extra_features=None)
with pytest.raises(ValueError):
CausalPINN(FooProblem(), model=model, extra_features=None)
def test_constructor_extra_feats():
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
def test_train_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
def test_train_restore():
tmpdir = "tests/tmp_restore"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=5,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
t = ntrainer.train(
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
'checkpoints/epoch=4-step=5.ckpt')
import shutil
shutil.rmtree(tmpdir)
def test_train_load():
tmpdir = "tests/tmp_load"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
def test_train_inverse_problem_cpu():
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
# # TODO does not currently work
# def test_train_inverse_problem_restore():
# tmpdir = "tests/tmp_restore_inv"
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
def test_train_inverse_problem_load():
tmpdir = "tests/tmp_load_inv"
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
def test_train_extra_feats_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
trainer.train()