fix tests

This commit is contained in:
Nicola Demo
2025-01-23 09:52:23 +01:00
parent 9aed1a30b3
commit a899327de1
32 changed files with 2331 additions and 2428 deletions

View File

@@ -8,219 +8,155 @@ from pina import Condition, LabelTensor
from pina.solvers import CausalPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation.equation import Equation
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss.loss_interface import LpLoss
from pina.loss import LpLoss
class FooProblem(SpatialProblem):
'''
Foo problem formulation.
'''
output_variables = ['u']
conditions = {}
spatial_domain = None
# class FooProblem(SpatialProblem):
# '''
# Foo problem formulation.
# '''
# output_variables = ['u']
# conditions = {}
# spatial_domain = None
class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
# class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
def diffusionreaction(input_, output_, params_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - params_['mu']*u_xx - r
# def diffusionreaction(input_, output_, params_):
# x = input_.extract('x')
# t = input_.extract('t')
# u_t = grad(output_, input_, d='t')
# u_x = grad(output_, input_, d='x')
# u_xx = grad(u_x, input_, d='x')
# r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
# (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
# return u_t - params_['mu']*u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# def _solution(self, pts):
# t = pts.extract('t')
# x = pts.extract('x')
# return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
# (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
# (1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
# # assign output/ spatial and temporal variables
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
# temporal_domain = CartesianDomain({'t': [0, 1]})
# unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
}
# # problem condition statement
# conditions = {
# 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
# 't': [0, 1]}),
# equation=Equation(diffusionreaction)),
# 'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
# output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
# }
class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
# class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
def diffusionreaction(input_, output_):
x = input_.extract('x')
t = input_.extract('t')
u_t = grad(output_, input_, d='t')
u_x = grad(output_, input_, d='x')
u_xx = grad(u_x, input_, d='x')
r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
(15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
return u_t - u_xx - r
# def diffusionreaction(input_, output_):
# x = input_.extract('x')
# t = input_.extract('t')
# u_t = grad(output_, input_, d='t')
# u_x = grad(output_, input_, d='x')
# u_xx = grad(u_x, input_, d='x')
# r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
# (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
# return u_t - u_xx - r
def _solution(self, pts):
t = pts.extract('t')
x = pts.extract('x')
return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
(1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
(1/8)*torch.sin(8*x))
# def _solution(self, pts):
# t = pts.extract('t')
# x = pts.extract('x')
# return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
# (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
# (1/8)*torch.sin(8*x))
# assign output/ spatial and temporal variables
output_variables = ['u']
spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
temporal_domain = CartesianDomain({'t': [0, 1]})
# # assign output/ spatial and temporal variables
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
# temporal_domain = CartesianDomain({'t': [0, 1]})
# problem condition statement
conditions = {
'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
't': [0, 1]}),
equation=Equation(diffusionreaction)),
}
# # problem condition statement
# conditions = {
# 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
# 't': [0, 1]}),
# equation=Equation(diffusionreaction)),
# }
class myFeature(torch.nn.Module):
"""
Feature: sin(x)
"""
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
def __init__(self):
super(myFeature, self).__init__()
# def __init__(self):
# super(myFeature, self).__init__()
def forward(self, x):
t = (torch.sin(x.extract(['x']) * torch.pi))
return LabelTensor(t, ['sin(x)'])
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi))
# return LabelTensor(t, ['sin(x)'])
# make the problem
problem = DiffusionReactionSystem()
model = FeedForward(len(problem.input_variables),
len(problem.output_variables))
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
extra_feats = [myFeature()]
# # make the problem
# problem = DiffusionReactionSystem()
# model = FeedForward(len(problem.input_variables),
# len(problem.output_variables))
# model_extra_feats = FeedForward(
# len(problem.input_variables) + 1,
# len(problem.output_variables))
# extra_feats = [myFeature()]
def test_constructor():
CausalPINN(problem=problem, model=model, extra_features=None)
# def test_constructor():
# CausalPINN(problem=problem, model=model, extra_features=None)
with pytest.raises(ValueError):
CausalPINN(FooProblem(), model=model, extra_features=None)
# with pytest.raises(ValueError):
# CausalPINN(FooProblem(), model=model, extra_features=None)
def test_constructor_extra_feats():
model_extra_feats = FeedForward(
len(problem.input_variables) + 1,
len(problem.output_variables))
CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
# def test_constructor_extra_feats():
# model_extra_feats = FeedForward(
# len(problem.input_variables) + 1,
# len(problem.output_variables))
# CausalPINN(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
def test_train_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
def test_log():
problem.discretise_domain(100)
solver = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
trainer.train()
# assert the logged metrics are correct
logged_metrics = sorted(list(trainer.logged_metrics.keys()))
total_metrics = sorted(
list([key + '_loss' for key in problem.conditions.keys()])
+ ['mean_loss'])
assert logged_metrics == total_metrics
def test_train_restore():
tmpdir = "tests/tmp_restore"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=5,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
t = ntrainer.train(
ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
'checkpoints/epoch=4-step=5.ckpt')
import shutil
shutil.rmtree(tmpdir)
def test_train_load():
tmpdir = "tests/tmp_load"
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
def test_train_inverse_problem_cpu():
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem = problem,
model=model, extra_features=None, loss=LpLoss())
trainer = Trainer(solver=pinn, max_epochs=1,
accelerator='cpu', batch_size=20)
trainer.train()
# # TODO does not currently work
# def test_train_inverse_problem_restore():
# tmpdir = "tests/tmp_restore_inv"
# problem = InverseDiffusionReactionSystem()
# def test_train_cpu():
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# def test_log():
# problem.discretise_domain(100)
# solver = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
@@ -230,49 +166,113 @@ def test_train_inverse_problem_cpu():
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
# 'checkpoints/epoch=4-step=5.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
def test_train_inverse_problem_load():
tmpdir = "tests/tmp_load_inv"
problem = InverseDiffusionReactionSystem()
boundaries = ['D']
n = 100
problem.discretise_domain(n, 'random', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model,
extra_features=None,
loss=LpLoss())
trainer = Trainer(solver=pinn,
max_epochs=15,
accelerator='cpu',
default_root_dir=tmpdir)
trainer.train()
new_pinn = CausalPINN.load_from_checkpoint(
f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
problem = problem, model=model)
test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
assert new_pinn.forward(test_pts).extract(
['u']).shape == pinn.forward(test_pts).extract(['u']).shape
torch.testing.assert_close(
new_pinn.forward(test_pts).extract(['u']),
pinn.forward(test_pts).extract(['u']))
import shutil
shutil.rmtree(tmpdir)
# def test_train_load():
# tmpdir = "tests/tmp_load"
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = CausalPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
# problem = problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_inverse_problem_cpu():
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
def test_train_extra_feats_cpu():
problem = DiffusionReactionSystem()
boundaries = ['D']
n = 10
problem.discretise_domain(n, 'grid', locations=boundaries)
pinn = CausalPINN(problem=problem,
model=model_extra_feats,
extra_features=extra_feats)
trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
trainer.train()
# # # TODO does not currently work
# # def test_train_inverse_problem_restore():
# # tmpdir = "tests/tmp_restore_inv"
# # problem = InverseDiffusionReactionSystem()
# # boundaries = ['D']
# # n = 100
# # problem.discretise_domain(n, 'random', locations=boundaries)
# # pinn = CausalPINN(problem=problem,
# # model=model,
# # extra_features=None,
# # loss=LpLoss())
# # trainer = Trainer(solver=pinn,
# # max_epochs=5,
# # accelerator='cpu',
# # default_root_dir=tmpdir)
# # trainer.train()
# # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# # t = ntrainer.train(
# # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# # import shutil
# # shutil.rmtree(tmpdir)
# def test_train_inverse_problem_load():
# tmpdir = "tests/tmp_load_inv"
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = CausalPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_extra_feats_cpu():
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# trainer.train()