Files
PINA/tests/test_solvers/test_causalpinn.py
Nicola Demo f0d68b34c7 refact
2025-03-19 17:46:33 +01:00

278 lines
9.7 KiB
Python

import torch
import pytest
from pina.problem import TimeDependentProblem, InverseProblem, SpatialProblem
from pina.operators import grad
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import CausalPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss.loss_interface import LpLoss
class FooProblem(SpatialProblem):
'''
Foo problem formulation.
'''
output_variables = ['u']
conditions = {}
spatial_domain = None
class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
def diffusionreaction(input_, output_, params_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - params_['mu']*u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
}
class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
def diffusionreaction(input_, output_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
}
class myFeature(torch.nn.Module):
"""
Feature: sin(x)
"""
def __init__(self):
super(myFeature, self).__init__()
def forward(self, x):
t = (torch.sin(x.extract(['x']) * torch.pi))
return LabelTensor(t, ['sin(x)'])
# make the problem
problem = DiffusionReactionSystem()
model = FeedForward(len(problem.input_variables),
len(problem.output_variables))
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
extra_feats = [myFeature()]
def test_constructor():
CausalPINN(problem=problem, model=model, extra_features=None)
with pytest.raises(ValueError):
CausalPINN(FooProblem(), model=model, extra_features=None)
def test_constructor_extra_feats():
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
def test_train_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
def test_log():
problem.discretise_domain(100)
solver = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
trainer.train()
# assert the logged metrics are correct
logged_metrics = sorted(list(trainer.logged_metrics.keys()))
total_metrics = sorted(
list([key + '_loss' for key in problem.conditions.keys()])
+ ['mean_loss'])
assert logged_metrics == total_metrics
def test_train_restore():
tmpdir = "tests/tmp_restore"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=5,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
t = ntrainer.train(
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
'checkpoints/epoch=4-step=5.ckpt')
import shutil
shutil.rmtree(tmpdir)
def test_train_load():
tmpdir = "tests/tmp_load"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
def test_train_inverse_problem_cpu():
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
# # TODO does not currently work
# def test_train_inverse_problem_restore():
# tmpdir = "tests/tmp_restore_inv"
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
def test_train_inverse_problem_load():
tmpdir = "tests/tmp_load_inv"
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
def test_train_extra_feats_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
trainer.train()