import torch import pytest from pina.problem import TimeDependentProblem, InverseProblem, SpatialProblem from pina.operators import grad from pina.domain import CartesianDomain from pina import Condition, LabelTensor from pina.solvers import CausalPINN from pina.trainer import Trainer from pina.model import FeedForward from pina.equation.equation import Equation from pina.equation.equation_factory import FixedValue from pina.loss.loss_interface import LpLoss class FooProblem(SpatialProblem): ''' Foo problem formulation. ''' output_variables = ['u'] conditions = {} spatial_domain = None class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem): def diffusionreaction(input_, output_, params_): x = input_.extract('x') t = input_.extract('t') u_t = grad(output_, input_, d='t') u_x = grad(output_, input_, d='x') u_xx = grad(u_x, input_, d='x') r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) + (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x)) return u_t - params_['mu']*u_xx - r def _solution(self, pts): t = pts.extract('t') x = pts.extract('x') return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) + (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) + (1/8)*torch.sin(8*x)) # assign output/ spatial and temporal variables output_variables = ['u'] spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]}) temporal_domain = CartesianDomain({'t': [0, 1]}) unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]}) # problem condition statement conditions = { 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi], 't': [0, 1]}), equation=Equation(diffusionreaction)), 'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']), output_points=LabelTensor(torch.tensor([[0.]]), ['u'])), } class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem): def diffusionreaction(input_, output_): x = input_.extract('x') t = input_.extract('t') u_t = grad(output_, input_, d='t') u_x = grad(output_, input_, d='x') u_xx = grad(u_x, input_, d='x') r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) + (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x)) return u_t - u_xx - r def _solution(self, pts): t = pts.extract('t') x = pts.extract('x') return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) + (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) + (1/8)*torch.sin(8*x)) # assign output/ spatial and temporal variables output_variables = ['u'] spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]}) temporal_domain = CartesianDomain({'t': [0, 1]}) # problem condition statement conditions = { 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi], 't': [0, 1]}), equation=Equation(diffusionreaction)), } class myFeature(torch.nn.Module): """ Feature: sin(x) """ def __init__(self): super(myFeature, self).__init__() def forward(self, x): t = (torch.sin(x.extract(['x']) * torch.pi)) return LabelTensor(t, ['sin(x)']) # make the problem problem = DiffusionReactionSystem() model = FeedForward(len(problem.input_variables), len(problem.output_variables)) model_extra_feats = FeedForward( len(problem.input_variables) + 1, len(problem.output_variables)) extra_feats = [myFeature()] def test_constructor(): CausalPINN(problem=problem, model=model, extra_features=None) with pytest.raises(ValueError): CausalPINN(FooProblem(), model=model, extra_features=None) def test_constructor_extra_feats(): model_extra_feats = FeedForward( len(problem.input_variables) + 1, len(problem.output_variables)) CausalPINN(problem=problem, model=model_extra_feats, extra_features=extra_feats) def test_train_cpu(): problem = DiffusionReactionSystem() boundaries = ['D'] n = 10 problem.discretise_domain(n, 'grid', locations=boundaries) pinn = CausalPINN(problem = problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver=pinn, max_epochs=1, accelerator='cpu', batch_size=20) trainer.train() def test_log(): problem.discretise_domain(100) solver = CausalPINN(problem = problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver, max_epochs=2, accelerator='cpu') trainer.train() # assert the logged metrics are correct logged_metrics = sorted(list(trainer.logged_metrics.keys())) total_metrics = sorted( list([key + '_loss' for key in problem.conditions.keys()]) + ['mean_loss']) assert logged_metrics == total_metrics def test_train_restore(): tmpdir = "tests/tmp_restore" problem = DiffusionReactionSystem() boundaries = ['D'] n = 10 problem.discretise_domain(n, 'grid', locations=boundaries) pinn = CausalPINN(problem=problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu', default_root_dir=tmpdir) trainer.train() ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu') t = ntrainer.train( ckpt_path=f'{tmpdir}/lightning_logs/version_0/' 'checkpoints/epoch=4-step=5.ckpt') import shutil shutil.rmtree(tmpdir) def test_train_load(): tmpdir = "tests/tmp_load" problem = DiffusionReactionSystem() boundaries = ['D'] n = 10 problem.discretise_domain(n, 'grid', locations=boundaries) pinn = CausalPINN(problem=problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu', default_root_dir=tmpdir) trainer.train() new_pinn = CausalPINN.load_from_checkpoint( f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt', problem = problem, model=model) test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10) assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1) assert new_pinn.forward(test_pts).extract( ['u']).shape == pinn.forward(test_pts).extract(['u']).shape torch.testing.assert_close( new_pinn.forward(test_pts).extract(['u']), pinn.forward(test_pts).extract(['u'])) import shutil shutil.rmtree(tmpdir) def test_train_inverse_problem_cpu(): problem = InverseDiffusionReactionSystem() boundaries = ['D'] n = 100 problem.discretise_domain(n, 'random', locations=boundaries) pinn = CausalPINN(problem = problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver=pinn, max_epochs=1, accelerator='cpu', batch_size=20) trainer.train() # # TODO does not currently work # def test_train_inverse_problem_restore(): # tmpdir = "tests/tmp_restore_inv" # problem = InverseDiffusionReactionSystem() # boundaries = ['D'] # n = 100 # problem.discretise_domain(n, 'random', locations=boundaries) # pinn = CausalPINN(problem=problem, # model=model, # extra_features=None, # loss=LpLoss()) # trainer = Trainer(solver=pinn, # max_epochs=5, # accelerator='cpu', # default_root_dir=tmpdir) # trainer.train() # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu') # t = ntrainer.train( # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt') # import shutil # shutil.rmtree(tmpdir) def test_train_inverse_problem_load(): tmpdir = "tests/tmp_load_inv" problem = InverseDiffusionReactionSystem() boundaries = ['D'] n = 100 problem.discretise_domain(n, 'random', locations=boundaries) pinn = CausalPINN(problem=problem, model=model, extra_features=None, loss=LpLoss()) trainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu', default_root_dir=tmpdir) trainer.train() new_pinn = CausalPINN.load_from_checkpoint( f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt', problem = problem, model=model) test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10) assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1) assert new_pinn.forward(test_pts).extract( ['u']).shape == pinn.forward(test_pts).extract(['u']).shape torch.testing.assert_close( new_pinn.forward(test_pts).extract(['u']), pinn.forward(test_pts).extract(['u'])) import shutil shutil.rmtree(tmpdir) def test_train_extra_feats_cpu(): problem = DiffusionReactionSystem() boundaries = ['D'] n = 10 problem.discretise_domain(n, 'grid', locations=boundaries) pinn = CausalPINN(problem=problem, model=model_extra_feats, extra_features=extra_feats) trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu') trainer.train()