Update solvers (#434)

* Enable DDP training with batch_size=None and add validity check for split sizes
* Refactoring SolverInterfaces (#435)
* Solver update + weighting
* Updating PINN for 0.2
* Modify GAROM + tests
* Adding more versatile loggers
* Disable compilation when running on Windows
* Fix tests

---------

Co-authored-by: giovanni <giovanni.canali98@yahoo.it>
Co-authored-by: FilippoOlivo <filippo@filippoolivo.com>
This commit is contained in:
Dario Coscia
2025-02-17 11:26:21 +01:00
committed by Nicola Demo
parent 780c4921eb
commit 9cae9a438f
50 changed files with 2848 additions and 4187 deletions

View File

@@ -2,9 +2,9 @@ import torch
from pina.model import FNO
output_channels = 5
# NOTE(review): the next three assignments are immediately overwritten by the
# smaller values below — this looks like merged diff residue (old values
# 15 / [30, 40, 50] / 128 vs new 4 / [4, 6, 8] / 24). Confirm which set of
# fixture sizes is intended before relying on them.
batch_size = 15
resolution = [30, 40, 50]
lifting_dim = 128
batch_size = 4
resolution = [4, 6, 8]
lifting_dim = 24
def test_constructor():

View File

@@ -1,49 +0,0 @@
import torch
import pytest
from pina.model.network import Network
from pina.model import FeedForward
from pina import LabelTensor
# Shared fixtures for the Network wrapper tests: a random 20x3 batch, its
# labelled counterpart, and a plain FeedForward model mapping 3 inputs to
# 4 outputs.
data = torch.rand((20, 3))
data_lt = LabelTensor(data, ['x', 'y', 'z'])
input_dim = 3
output_dim = 4
torchmodel = FeedForward(input_dim, output_dim)
# no extra input features by default
extra_feat = []
def test_constructor():
    """Building a Network with matching variable lists must not raise."""
    kwargs = dict(
        model=torchmodel,
        input_variables=['x', 'y', 'z'],
        output_variables=['a', 'b', 'c', 'd'],
        extra_features=None,
    )
    Network(**kwargs)
def test_forward():
    """Forward pass keeps shapes and values, and attaches output labels."""
    net = Network(
        model=torchmodel,
        input_variables=['x', 'y', 'z'],
        output_variables=['a', 'b', 'c', 'd'],
        extra_features=None,
    )
    plain_out = net.torchmodel(data)
    labelled_out = net(data_lt)
    # raw model returns a plain tensor; the wrapper returns a LabelTensor
    assert isinstance(plain_out, torch.Tensor)
    assert isinstance(labelled_out, LabelTensor)
    assert plain_out.shape == (20, 4)
    assert labelled_out.shape == (20, 4)
    # both paths must compute the same values
    assert torch.allclose(labelled_out, plain_out)
    assert labelled_out.labels == ['a', 'b', 'c', 'd']
    # a bare tensor without labels must be rejected by the wrapper
    with pytest.raises(AssertionError):
        net(data)
def test_backward():
    """Gradients must flow back to the input through the wrapped model.

    Fixes: the original read the private ``._grad`` slot instead of the
    public ``.grad`` accessor, shadowed the module-level ``data`` fixture,
    and used the ambiguous single-letter name ``l``.
    """
    net = Network(model=torchmodel,
                  input_variables=['x', 'y', 'z'],
                  output_variables=['a', 'b', 'c', 'd'],
                  extra_features=None)
    inputs = torch.rand((20, 3))
    inputs.requires_grad = True
    out = net.torchmodel(inputs)
    loss = torch.mean(out)
    loss.backward()
    # public accessor; populated by backward() for leaf tensors
    assert inputs.grad.shape == torch.Size([20, 3])

View File

@@ -1,111 +0,0 @@
import torch
import pytest
from pina import Condition, LabelTensor, Trainer
from pina.problem import SpatialProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina.model import FeedForward
from pina.solvers import PINNInterface
from pina.problem.zoo import Poisson2DSquareProblem as Poisson
# from pina.equation import Equation
# from pina.equation.equation_factory import FixedValue
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
# my_laplace = Equation(laplace_equation)
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
# in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
# out2_ = LabelTensor(torch.rand(60, 1), ['u'])
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
# conditions = {
# 'gamma1': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 1}),
# equation=FixedValue(0.0)),
# 'gamma2': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 0}),
# equation=FixedValue(0.0)),
# 'gamma3': Condition(
# location=CartesianDomain({'x': 1, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'gamma4': Condition(
# location=CartesianDomain({'x': 0, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'D': Condition(
# input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
# equation=my_laplace),
# 'data': Condition(
# input_points=in_,
# output_points=out_),
# 'data2': Condition(
# input_points=in2_,
# output_points=out2_)
# }
# def poisson_sol(self, pts):
# return -(torch.sin(pts.extract(['x']) * torch.pi) *
# torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
# truth_solution = poisson_sol
# from pina import TorchOptimizer
# class FOOPINN(PINNInterface):
# def __init__(self, model, problem):
# super().__init__(models=[model], problem=problem,
# optimizers=TorchOptimizer(torch.optim.Adam, lr=1e-3),
# loss=torch.nn.MSELoss())
# def forward(self, x):
# return self.models[0](x)
# def loss_phys(self, samples, equation):
# residual = self.compute_residual(samples=samples, equation=equation)
# loss_value = self.loss(
# torch.zeros_like(residual, requires_grad=True), residual
# )
# self.store_log(loss_value=float(loss_value))
# return loss_value
# # make the problem
# poisson_problem = Poisson()
# poisson_problem.discretise_domain(100)
# model = FeedForward(len(poisson_problem.input_variables),
# len(poisson_problem.output_variables))
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# def test_constructor():
# with pytest.raises(TypeError):
# PINNInterface()
# # a simple pinn built with PINNInterface
# FOOPINN(model, poisson_problem)
# def test_train_step():
# solver = FOOPINN(model, poisson_problem)
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# def test_log():
# solver = FOOPINN(model, poisson_problem)
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in poisson_problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics

View File

@@ -0,0 +1,156 @@
import torch
import pytest
from pina import LabelTensor, Condition
from pina.problem import SpatialProblem
from pina.solvers import CausalPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.problem.zoo import (
DiffusionReactionProblem,
InverseDiffusionReactionProblem
)
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from torch._dynamo.eval_frame import OptimizedModule
class DummySpatialProblem(SpatialProblem):
    '''
    A mock spatial problem for testing purposes.

    Declares a single output variable but no conditions and no spatial
    domain; used below to check that the solver rejects ill-posed problems.
    '''
    # one scalar output, no conditions, no domain
    output_variables = ['u']
    conditions = {}
    spatial_domain = None
# define problems and model
# Both a direct and an inverse diffusion-reaction formulation are built,
# each discretised with 50 collocation points; a single feed-forward
# network is shared by every test below.
problem = DiffusionReactionProblem()
problem.discretise_domain(50)
inverse_problem = InverseDiffusionReactionProblem()
inverse_problem.discretise_domain(50)
model = FeedForward(
len(problem.input_variables),
len(problem.output_variables)
)
# add input-output condition to test supervised learning
# 50 random input/output pairs wrapped in LabelTensor so the solver can
# address columns by variable name.
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions['data'] = Condition(
input_points=input_pts,
output_points=output_pts
)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("eps", [100, 100.1])
def test_constructor(problem, eps):
    """A problem with no conditions is rejected; valid args build a solver."""
    with pytest.raises(ValueError):
        CausalPINN(model=model, problem=DummySpatialProblem())
    solver = CausalPINN(model=model, problem=problem, eps=eps)
    expected = (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition,
    )
    assert solver.accepted_conditions_types == expected
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, batch_size, compile):
    """Two training epochs on the full dataset must complete."""
    solver = CausalPINN(model=model, problem=problem)
    # everything goes to the training split
    split = dict(train_size=1., val_size=0., test_size=0.)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.train()
    # when compilation is requested, the model must be wrapped by dynamo
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, batch_size, compile):
    """Training with a validation split must complete."""
    solver = CausalPINN(model=model, problem=problem)
    # hold out 10% of the points for validation
    split = dict(train_size=0.9, val_size=0.1, test_size=0.)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.train()
    # when compilation is requested, the model must be wrapped by dynamo
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, batch_size, compile):
    """The test loop must run on a 70/20/10 split."""
    solver = CausalPINN(model=model, problem=problem)
    split = dict(train_size=0.7, val_size=0.2, test_size=0.1)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.test()
    # when compilation is requested, the model must be wrapped by dynamo
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Train, resume from a checkpoint, reload, and compare forward passes.

    Fixes: ``dir`` shadowed the builtin, ``problem = problem`` was a no-op,
    the checkpoint path literal was duplicated three times, and ``shutil``
    was imported at the bottom of the function.
    """
    import shutil

    tmp_dir = "tests/test_solvers/tmp"
    solver = CausalPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      default_root_dir=tmp_dir)
    trainer.train()
    # single source of truth for the checkpoint produced above
    ckpt = f'{tmp_dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt'
    # restore: resume training from the saved checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)
    # loading: rebuild an equivalent solver from the same checkpoint
    new_solver = CausalPINN.load_from_checkpoint(
        ckpt, problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))
    # clean up the temporary checkpoint directory
    shutil.rmtree(tmp_dir)

View File

@@ -1,278 +0,0 @@
import torch
import pytest
from pina.problem import TimeDependentProblem, InverseProblem, SpatialProblem
from pina.operators import grad
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import CausalPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
# class FooProblem(SpatialProblem):
# '''
# Foo problem formulation.
# '''
# output_variables = ['u']
# conditions = {}
# spatial_domain = None
# class InverseDiffusionReactionSystem(TimeDependentProblem, SpatialProblem, InverseProblem):
# def diffusionreaction(input_, output_, params_):
# x = input_.extract('x')
# t = input_.extract('t')
# u_t = grad(output_, input_, d='t')
# u_x = grad(output_, input_, d='x')
# u_xx = grad(u_x, input_, d='x')
# r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
# (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
# return u_t - params_['mu']*u_xx - r
# def _solution(self, pts):
# t = pts.extract('t')
# x = pts.extract('x')
# return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
# (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
# (1/8)*torch.sin(8*x))
# # assign output/ spatial and temporal variables
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
# temporal_domain = CartesianDomain({'t': [0, 1]})
# unknown_parameter_domain = CartesianDomain({'mu': [-1, 1]})
# # problem condition statement
# conditions = {
# 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
# 't': [0, 1]}),
# equation=Equation(diffusionreaction)),
# 'data' : Condition(input_points=LabelTensor(torch.tensor([[0., 0.]]), ['x', 't']),
# output_points=LabelTensor(torch.tensor([[0.]]), ['u'])),
# }
# class DiffusionReactionSystem(TimeDependentProblem, SpatialProblem):
# def diffusionreaction(input_, output_):
# x = input_.extract('x')
# t = input_.extract('t')
# u_t = grad(output_, input_, d='t')
# u_x = grad(output_, input_, d='x')
# u_xx = grad(u_x, input_, d='x')
# r = torch.exp(-t) * (1.5 * torch.sin(2*x) + (8/3)*torch.sin(3*x) +
# (15/4)*torch.sin(4*x) + (63/8)*torch.sin(8*x))
# return u_t - u_xx - r
# def _solution(self, pts):
# t = pts.extract('t')
# x = pts.extract('x')
# return torch.exp(-t) * (torch.sin(x) + (1/2)*torch.sin(2*x) +
# (1/3)*torch.sin(3*x) + (1/4)*torch.sin(4*x) +
# (1/8)*torch.sin(8*x))
# # assign output/ spatial and temporal variables
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [-torch.pi, torch.pi]})
# temporal_domain = CartesianDomain({'t': [0, 1]})
# # problem condition statement
# conditions = {
# 'D': Condition(location=CartesianDomain({'x': [-torch.pi, torch.pi],
# 't': [0, 1]}),
# equation=Equation(diffusionreaction)),
# }
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi))
# return LabelTensor(t, ['sin(x)'])
# # make the problem
# problem = DiffusionReactionSystem()
# model = FeedForward(len(problem.input_variables),
# len(problem.output_variables))
# model_extra_feats = FeedForward(
# len(problem.input_variables) + 1,
# len(problem.output_variables))
# extra_feats = [myFeature()]
# def test_constructor():
# CausalPINN(problem=problem, model=model, extra_features=None)
# with pytest.raises(ValueError):
# CausalPINN(FooProblem(), model=model, extra_features=None)
# def test_constructor_extra_feats():
# model_extra_feats = FeedForward(
# len(problem.input_variables) + 1,
# len(problem.output_variables))
# CausalPINN(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# def test_train_cpu():
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# def test_log():
# problem.discretise_domain(100)
# solver = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
# 'checkpoints/epoch=4-step=5.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_load():
# tmpdir = "tests/tmp_load"
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = CausalPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
# problem = problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_inverse_problem_cpu():
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem = problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# # # TODO does not currently work
# # def test_train_inverse_problem_restore():
# # tmpdir = "tests/tmp_restore_inv"
# # problem = InverseDiffusionReactionSystem()
# # boundaries = ['D']
# # n = 100
# # problem.discretise_domain(n, 'random', locations=boundaries)
# # pinn = CausalPINN(problem=problem,
# # model=model,
# # extra_features=None,
# # loss=LpLoss())
# # trainer = Trainer(solver=pinn,
# # max_epochs=5,
# # accelerator='cpu',
# # default_root_dir=tmpdir)
# # trainer.train()
# # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# # t = ntrainer.train(
# # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# # import shutil
# # shutil.rmtree(tmpdir)
# def test_train_inverse_problem_load():
# tmpdir = "tests/tmp_load_inv"
# problem = InverseDiffusionReactionSystem()
# boundaries = ['D']
# n = 100
# problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = CausalPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 't': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_extra_feats_cpu():
# problem = DiffusionReactionSystem()
# boundaries = ['D']
# n = 10
# problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = CausalPINN(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# trainer.train()

View File

@@ -1,429 +1,145 @@
import torch
import pytest
from pina.problem import SpatialProblem, InverseProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import CompetitivePINN as PINN
from pina import LabelTensor, Condition
from pina.solvers import CompetitivePINN as CompPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
from pina.problem.zoo import (
Poisson2DSquareProblem as Poisson,
InversePoisson2DSquareProblem as InversePoisson
)
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from torch._dynamo.eval_frame import OptimizedModule
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
# define problems and model
# Both a direct and an inverse Poisson formulation are built, each
# discretised with 50 collocation points; a single feed-forward network
# is shared by every test below.
problem = Poisson()
problem.discretise_domain(50)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
model = FeedForward(
len(problem.input_variables),
len(problem.output_variables)
)
# add input-output condition to test supervised learning
# 50 random input/output pairs wrapped in LabelTensor so the solver can
# address columns by variable name.
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions['data'] = Condition(
input_points=input_pts,
output_points=output_pts
)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("discr", [None, model])
def test_constructor(problem, discr):
    """Solver builds with or without an explicit discriminator.

    Fix: the original constructed a solver without a discriminator and
    immediately overwrote it — a dead store; the ``discr=None`` parameter
    already covers the default-discriminator path.
    """
    solver = CompPINN(problem=problem, model=model, discriminator=discr)
    assert solver.accepted_conditions_types == (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition
    )
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, batch_size, compile):
    """Two training epochs on the full dataset must complete."""
    solver = CompPINN(problem=problem, model=model)
    # everything goes to the training split
    split = dict(train_size=1., val_size=0., test_size=0.)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.train()
    # both sub-models (generator and discriminator) must be compiled
    if trainer.compile:
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# my_laplace = Equation(laplace_equation)
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
# in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
# out2_ = LabelTensor(torch.rand(60, 1), ['u'])
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, batch_size, compile):
    """Training with a validation split must complete."""
    solver = CompPINN(problem=problem, model=model)
    # hold out 10% of the points for validation
    split = dict(train_size=0.9, val_size=0.1, test_size=0.)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.train()
    # both sub-models (generator and discriminator) must be compiled
    if trainer.compile:
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# class InversePoisson(SpatialProblem, InverseProblem):
# '''
# Problem definition for the Poisson equation.
# '''
# output_variables = ['u']
# x_min = -2
# x_max = 2
# y_min = -2
# y_max = 2
# data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
# data_output = LabelTensor(torch.rand(10, 1), ['u'])
# spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
# # define the ranges for the parameters
# unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, batch_size, compile):
    """The test loop must run on a 70/20/10 split."""
    solver = CompPINN(problem=problem, model=model)
    split = dict(train_size=0.7, val_size=0.2, test_size=0.1)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, compile=compile, **split)
    trainer.test()
    # both sub-models (generator and discriminator) must be compiled
    if trainer.compile:
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# def laplace_equation(input_, output_, params_):
# '''
# Laplace equation with a force term.
# '''
# force_term = torch.exp(
# - 2*(input_.extract(['x']) - params_['mu1'])**2
# - 2*(input_.extract(['y']) - params_['mu2'])**2)
# delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Train, resume from a checkpoint, reload, and compare forward passes.

    Fixes: ``dir`` shadowed the builtin, ``problem = problem`` was a no-op,
    the checkpoint path literal was duplicated, ``shutil`` was imported at
    the bottom, and commented-out diff residue was interleaved through the
    body.
    """
    import shutil

    tmp_dir = "tests/test_solvers/tmp"
    solver = CompPINN(problem=problem, model=model)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      default_root_dir=tmp_dir)
    trainer.train()
    # single source of truth for the checkpoint produced above
    ckpt = f'{tmp_dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt'
    # restore: resume training from the saved checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)
    # loading: rebuild an equivalent solver from the same checkpoint
    new_solver = CompPINN.load_from_checkpoint(
        ckpt, problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))
    # clean up the temporary checkpoint directory
    shutil.rmtree(tmp_dir)
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
# conditions = {
# 'gamma1': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 1}),
# equation=FixedValue(0.0)),
# 'gamma2': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 0}),
# equation=FixedValue(0.0)),
# 'gamma3': Condition(
# location=CartesianDomain({'x': 1, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'gamma4': Condition(
# location=CartesianDomain({'x': 0, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'D': Condition(
# input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
# equation=my_laplace),
# 'data': Condition(
# input_points=in_,
# output_points=out_),
# 'data2': Condition(
# input_points=in2_,
# output_points=out2_)
# }
# def poisson_sol(self, pts):
# return -(torch.sin(pts.extract(['x']) * torch.pi) *
# torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
# truth_solution = poisson_sol
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi) *
# torch.sin(x.extract(['y']) * torch.pi))
# return LabelTensor(t, ['sin(x)sin(y)'])
# # make the problem
# poisson_problem = Poisson()
# model = FeedForward(len(poisson_problem.input_variables),
# len(poisson_problem.output_variables))
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# extra_feats = [myFeature()]
# def test_constructor():
# PINN(problem=poisson_problem, model=model)
# PINN(problem=poisson_problem, model=model, discriminator = model)
# def test_constructor_extra_feats():
# with pytest.raises(TypeError):
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# PINN(problem=poisson_problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# def test_train_cpu():
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem = poisson_problem, model=model, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# def test_log():
# poisson_problem.discretise_domain(100)
# solver = PINN(problem = poisson_problem, model=model, loss=LpLoss())
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in poisson_problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
# 'checkpoints/epoch=4-step=10.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_load():
# tmpdir = "tests/tmp_load"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = PINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_inverse_problem_cpu():
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = PINN(problem = poisson_problem, model=model, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# # # TODO does not currently work
# # def test_train_inverse_problem_restore():
# # tmpdir = "tests/tmp_restore_inv"
# # poisson_problem = InversePoisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# # n = 100
# # poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# # pinn = PINN(problem=poisson_problem,
# # model=model,
# # loss=LpLoss())
# # trainer = Trainer(solver=pinn,
# # max_epochs=5,
# # accelerator='cpu',
# # default_root_dir=tmpdir)
# # trainer.train()
# # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# # t = ntrainer.train(
# # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
# # import shutil
# # shutil.rmtree(tmpdir)
# def test_train_inverse_problem_load():
# tmpdir = "tests/tmp_load_inv"
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = PINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# # # TODO fix asap. Basically sampling few variables
# # # works only if both variables are in a range.
# # # if one is fixed and the other not, this will
# # # not work. This test also needs to be fixed and
# # # insert in test problem not in test pinn.
# # def test_train_cpu_sampling_few_vars():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
# # trainer.train()
# # TODO, fix GitHub actions to run also on GPU
# # def test_train_gpu():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_gpu(): #TODO fix ASAP
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_2():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model)
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_extra_feats():
# # pinn = PINN(problem, model_extra_feat, [myFeature()])
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)
# # def test_train_2_extra_feats():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model_extra_feat, [myFeature()])
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_optimizer_kwargs():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_lr_scheduler():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(
# # problem,
# # model,
# # lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
# # lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
# # )
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # # def test_train_batch():
# # # pinn = PINN(problem, model, batch_size=6)
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # # def test_train_batch_2():
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # expected_keys = [[], list(range(0, 50, 3))]
# # # param = [0, 3]
# # # for i, truth_key in zip(param, expected_keys):
# # # pinn = PINN(problem, model, batch_size=6)
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(50, save_loss=i)
# # # assert list(pinn.history_loss.keys()) == truth_key
# # if torch.cuda.is_available():
# # # def test_gpu_train():
# # # pinn = PINN(problem, model, batch_size=20, device='cuda')
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 100
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # def test_gpu_train_nobatch():
# # pinn = PINN(problem, model, batch_size=None, device='cuda')
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 100
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)
# rm directories
import shutil
shutil.rmtree('tests/test_solvers/tmp')

View File

@@ -1,167 +1,177 @@
import torch
import torch.nn as nn
from pina.problem import AbstractProblem
import pytest
from pina import Condition, LabelTensor
from pina.solvers import GAROM
from pina.condition import InputOutputPointsCondition
from pina.problem import AbstractProblem
from pina.model import FeedForward
from pina.trainer import Trainer
import torch.nn as nn
import matplotlib.tri as tri
from torch._dynamo.eval_frame import OptimizedModule
# def func(x, mu1, mu2):
# import torch
# x_m1 = (x[:, 0] - mu1).pow(2)
# x_m2 = (x[:, 1] - mu2).pow(2)
# norm = x[:, 0]**2 + x[:, 1]**2
# return torch.exp(-(x_m1 + x_m2))
class TensorProblem(AbstractProblem):
    """Minimal data-driven problem used to exercise the GAROM solver:
    a single supervised condition pairing random inputs with random targets.
    """
    # NOTE(review): two input variable names but ``input_points`` has a
    # single column (and one output name for two output columns) --
    # confirm the labels are intentional.
    input_variables = ['u_0', 'u_1']
    output_variables = ['u']
    # 50 random snapshots (2 components) conditioned on 50 random 1D params
    conditions = {
        'data': Condition(
            output_points=torch.randn(50, 2),
            input_points=torch.randn(50, 1))
    }
# class ParametricGaussian(AbstractProblem):
# output_variables = [f'u_{i}' for i in range(900)]
# simple Generator Network
class Generator(nn.Module):
    """Conditional generator network for GAROM tests.

    Maps a parameter vector, together with freshly sampled uniform noise,
    to a generated snapshot of dimension ``input_dimension``.
    """

    def __init__(self,
                 input_dimension=2,
                 parameters_dimension=1,
                 noise_dimension=2,
                 activation=torch.nn.SiLU):
        super().__init__()
        self._noise_dimension = noise_dimension
        self._activation = activation
        # generator input: noise (d) concatenated with mapped params (5d)
        self.model = FeedForward(6 * noise_dimension, input_dimension)
        self.condition = FeedForward(parameters_dimension, 5 * noise_dimension)

    def forward(self, param):
        n_samples = param.shape[0]
        # noise drawn uniformly from [-1, 1], on param's device/dtype
        noise = 2 * torch.rand(size=(n_samples, self._noise_dimension),
                               device=param.device,
                               dtype=param.dtype,
                               requires_grad=True) - 1
        mapped_params = self.condition(param)
        return self.model(torch.cat((noise, mapped_params), dim=-1))
# Simple Discriminator Network
# # simple Generator Network
# class Generator(nn.Module):
class Discriminator(nn.Module):
    """Conditional discriminator network for GAROM tests.

    Encodes a sample, concatenates the encoding with the mapped
    conditioning parameters, and decodes back to ``input_dimension``.
    """

    def __init__(self,
                 input_dimension=2,
                 parameter_dimension=1,
                 hidden_dimension=2,
                 activation=torch.nn.ReLU):
        super().__init__()
        self._activation = activation
        # keep construction order (encoding, decoding, condition): each
        # FeedForward consumes RNG state during weight initialisation
        self.encoding = FeedForward(input_dimension, hidden_dimension)
        self.decoding = FeedForward(2 * hidden_dimension, input_dimension)
        self.condition = FeedForward(parameter_dimension, hidden_dimension)

    def forward(self, data):
        sample, parameters = data
        encoded = self.encoding(sample)
        # conditioning by concatenation of encoding and mapped parameters
        joint = torch.cat((encoded, self.condition(parameters)), dim=-1)
        return self.decoding(joint)
# # Simple Discriminator Network
# class Discriminator(nn.Module):
# def __init__(self,
# input_dimension,
# parameter_dimension,
# hidden_dimension,
# activation=torch.nn.ReLU):
# super().__init__()
# self._activation = activation
# self.encoding = torch.nn.Sequential(
# torch.nn.Linear(input_dimension, input_dimension // 3),
# self._activation(),
# torch.nn.Linear(input_dimension // 3, input_dimension // 6),
# self._activation(),
# torch.nn.Linear(input_dimension // 6, hidden_dimension))
# self.decoding = torch.nn.Sequential(
# torch.nn.Linear(2 * hidden_dimension, input_dimension // 6),
# self._activation(),
# torch.nn.Linear(input_dimension // 6, input_dimension // 3),
# self._activation(),
# torch.nn.Linear(input_dimension // 3, input_dimension),
# )
# self.condition = torch.nn.Sequential(
# torch.nn.Linear(parameter_dimension, hidden_dimension // 2),
# self._activation(),
# torch.nn.Linear(hidden_dimension // 2, hidden_dimension))
# def forward(self, data):
# x, condition = data
# encoding = self.encoding(x)
# conditioning = torch.cat((encoding, self.condition(condition)), dim=-1)
# decoding = self.decoding(conditioning)
# return decoding
def test_constructor():
    """GAROM builds from the minimal problem and exposes its accepted
    condition types."""
    GAROM(problem=TensorProblem(),
          generator=Generator(),
          discriminator=Discriminator())
    # NOTE(review): the parenthesised RHS is the bare class, not a 1-tuple;
    # confirm that is what ``accepted_conditions_types`` holds.
    expected = (
        InputOutputPointsCondition
    )
    assert GAROM.accepted_conditions_types == expected
# problem = ParametricGaussian()
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(batch_size, compile):
    """Two-epoch CPU training run for every batch size, compiled or not."""
    solver = GAROM(problem=TensorProblem(),
                   generator=Generator(),
                   discriminator=Discriminator())
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=1.,
                          test_size=0.,
                          val_size=0.,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    if trainer.compile:
        # every sub-model must have been wrapped by torch.compile
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# def test_constructor():
# GAROM(problem=problem,
# generator=Generator(input_dimension=900,
# parameters_dimension=2,
# noise_dimension=12),
# discriminator=Discriminator(input_dimension=900,
# parameter_dimension=2,
# hidden_dimension=64))
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(batch_size, compile):
    """Training with a 90/10 train/validation split runs on CPU."""
    solver = GAROM(problem=TensorProblem(),
                   generator=Generator(),
                   discriminator=Discriminator())
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=0.9,
                          val_size=0.1,
                          test_size=0.,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    if trainer.compile:
        # every sub-model must have been wrapped by torch.compile
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# def test_train_cpu():
# solver = GAROM(problem=problem,
# generator=Generator(input_dimension=900,
# parameters_dimension=2,
# noise_dimension=12),
# discriminator=Discriminator(input_dimension=900,
# parameter_dimension=2,
# hidden_dimension=64))
# trainer = Trainer(solver=solver, max_epochs=4, accelerator='cpu', batch_size=20)
# trainer.train()
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(batch_size, compile):
    """The test loop runs on an 80/10/10 split for every batch size."""
    solver = GAROM(problem=TensorProblem(),
                   generator=Generator(),
                   discriminator=Discriminator())
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=0.8,
                          val_size=0.1,
                          test_size=0.1,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.test()
    if trainer.compile:
        # every sub-model must have been wrapped by torch.compile
        assert all(isinstance(m, OptimizedModule) for m in solver.models)
# def test_sample():
# solver = GAROM(problem=problem,
# generator=Generator(input_dimension=900,
# parameters_dimension=2,
# noise_dimension=12),
# discriminator=Discriminator(input_dimension=900,
# parameter_dimension=2,
# hidden_dimension=64))
# solver.sample(problem.params)
# assert solver.sample(problem.params).shape == problem.snapshots.shape
def test_train_load_restore():
    """Train GAROM, resume from a checkpoint, reload the solver from the
    same checkpoint and check its forward pass.

    Fixes: removed an unused ``problem`` local, renamed ``dir`` (shadowed
    the builtin), and derived the cleanup path from the same variable as
    the checkpoint path instead of hard-coding it twice.
    """
    tmp_dir = "tests/test_solvers/tmp"
    solver = GAROM(problem=TensorProblem(),
                   generator=Generator(),
                   discriminator=Discriminator())
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.9,
                      test_size=0.1,
                      val_size=0.,
                      default_root_dir=tmp_dir)
    trainer.train()
    ckpt = (f'{tmp_dir}/lightning_logs/version_0/checkpoints/'
            'epoch=4-step=5.ckpt')
    # restore: resume training from the stored checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)
    # loading: rebuild a fresh solver from the checkpoint file
    new_solver = GAROM.load_from_checkpoint(
        ckpt,
        problem=TensorProblem(), generator=Generator(),
        discriminator=Discriminator())
    test_pts = torch.rand(20, 1)
    assert new_solver.forward(test_pts).shape == (20, 2)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    # rm directories
    import shutil
    shutil.rmtree(tmp_dir)

View File

@@ -1,444 +0,0 @@
import torch
from pina.problem import SpatialProblem, InverseProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import GPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
# my_laplace = Equation(laplace_equation)
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
# in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
# out2_ = LabelTensor(torch.rand(60, 1), ['u'])
# class InversePoisson(SpatialProblem, InverseProblem):
# '''
# Problem definition for the Poisson equation.
# '''
# output_variables = ['u']
# x_min = -2
# x_max = 2
# y_min = -2
# y_max = 2
# data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
# data_output = LabelTensor(torch.rand(10, 1), ['u'])
# spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
# # define the ranges for the parameters
# unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
# def laplace_equation(input_, output_, params_):
# '''
# Laplace equation with a force term.
# '''
# force_term = torch.exp(
# - 2*(input_.extract(['x']) - params_['mu1'])**2
# - 2*(input_.extract(['y']) - params_['mu2'])**2)
# delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
# return delta_u - force_term
# # define the conditions for the loss (boundary conditions, equation, data)
# conditions = {
# 'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
# 'y': y_max}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma2': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': y_min}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma3': Condition(location=CartesianDomain(
# {'x': x_max, 'y': [y_min, y_max]}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma4': Condition(location=CartesianDomain(
# {'x': x_min, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'D': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': [y_min, y_max]
# }),
# equation=Equation(laplace_equation)),
# 'data': Condition(
# input_points=data_input.extract(['x', 'y']),
# output_points=data_output)
# }
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
# conditions = {
# 'gamma1': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 1}),
# equation=FixedValue(0.0)),
# 'gamma2': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 0}),
# equation=FixedValue(0.0)),
# 'gamma3': Condition(
# location=CartesianDomain({'x': 1, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'gamma4': Condition(
# location=CartesianDomain({'x': 0, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'D': Condition(
# input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
# equation=my_laplace),
# 'data': Condition(
# input_points=in_,
# output_points=out_),
# 'data2': Condition(
# input_points=in2_,
# output_points=out2_)
# }
# def poisson_sol(self, pts):
# return -(torch.sin(pts.extract(['x']) * torch.pi) *
# torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
# truth_solution = poisson_sol
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi) *
# torch.sin(x.extract(['y']) * torch.pi))
# return LabelTensor(t, ['sin(x)sin(y)'])
# # make the problem
# poisson_problem = Poisson()
# model = FeedForward(len(poisson_problem.input_variables),
# len(poisson_problem.output_variables))
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# extra_feats = [myFeature()]
# def test_constructor():
# GPINN(problem=poisson_problem, model=model, extra_features=None)
# def test_constructor_extra_feats():
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# GPINN(problem=poisson_problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# def test_train_cpu():
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = GPINN(problem = poisson_problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# def test_log():
# poisson_problem.discretise_domain(100)
# solver = GPINN(problem = poisson_problem, model=model,
# extra_features=None, loss=LpLoss())
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in poisson_problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = GPINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
# 'checkpoints/epoch=4-step=10.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_load():
# tmpdir = "tests/tmp_load"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = GPINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = GPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_inverse_problem_cpu():
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = GPINN(problem = poisson_problem,
# model=model, extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# # # TODO does not currently work
# # def test_train_inverse_problem_restore():
# # tmpdir = "tests/tmp_restore_inv"
# # poisson_problem = InversePoisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# # n = 100
# # poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# # pinn = GPINN(problem=poisson_problem,
# # model=model,
# # extra_features=None,
# # loss=LpLoss())
# # trainer = Trainer(solver=pinn,
# # max_epochs=5,
# # accelerator='cpu',
# # default_root_dir=tmpdir)
# # trainer.train()
# # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# # t = ntrainer.train(
# # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
# # import shutil
# # shutil.rmtree(tmpdir)
# def test_train_inverse_problem_load():
# tmpdir = "tests/tmp_load_inv"
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = GPINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = GPINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# # # TODO fix asap. Basically sampling few variables
# # # works only if both variables are in a range.
# # # if one is fixed and the other not, this will
# # # not work. This test also needs to be fixed and
# # # insert in test problem not in test pinn.
# # def test_train_cpu_sampling_few_vars():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
# # pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
# # trainer.train()
# def test_train_extra_feats_cpu():
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = GPINN(problem=poisson_problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# trainer.train()
# # TODO, fix GitHub actions to run also on GPU
# # def test_train_gpu():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_gpu(): #TODO fix ASAP
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
# # pinn = GPINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_2():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = GPINN(problem, model)
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_extra_feats():
# # pinn = GPINN(problem, model_extra_feat, [myFeature()])
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)
# # def test_train_2_extra_feats():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = GPINN(problem, model_extra_feat, [myFeature()])
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_optimizer_kwargs():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = GPINN(problem, model, optimizer_kwargs={'lr' : 0.3})
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_lr_scheduler():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = GPINN(
# # problem,
# # model,
# # lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
# # lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
# # )
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # # def test_train_batch():
# # # pinn = GPINN(problem, model, batch_size=6)
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # # def test_train_batch_2():
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # expected_keys = [[], list(range(0, 50, 3))]
# # # param = [0, 3]
# # # for i, truth_key in zip(param, expected_keys):
# # # pinn = GPINN(problem, model, batch_size=6)
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(50, save_loss=i)
# # # assert list(pinn.history_loss.keys()) == truth_key
# # if torch.cuda.is_available():
# # # def test_gpu_train():
# # # pinn = GPINN(problem, model, batch_size=20, device='cuda')
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 100
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # def test_gpu_train_nobatch():
# # pinn = GPINN(problem, model, batch_size=None, device='cuda')
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 100
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)

View File

@@ -0,0 +1,155 @@
import pytest
import torch
from pina import LabelTensor, Condition
from pina.problem import TimeDependentProblem
from pina.solvers import GradientPINN
from pina.model import FeedForward
from pina.trainer import Trainer
from pina.problem.zoo import (
Poisson2DSquareProblem as Poisson,
InversePoisson2DSquareProblem as InversePoisson
)
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from torch._dynamo.eval_frame import OptimizedModule
class DummyTimeProblem(TimeDependentProblem):
    """
    A mock time-dependent problem for testing purposes.

    Deliberately empty/None fields: the class only needs to *be*
    time-dependent so that GradientPINN's constructor rejects it
    (see test_constructor below).
    """
    output_variables = ['u']
    temporal_domain = None
    conditions = {}
# define problems and model
problem = Poisson()
problem.discretise_domain(50)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
# one network shared by all tests, sized from the direct problem's variables
model = FeedForward(
    len(problem.input_variables),
    len(problem.output_variables)
)
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
# NOTE(review): targets are random, so this condition only exercises the
# supervised-learning plumbing, not solution accuracy
problem.conditions['data'] = Condition(
    input_points=input_pts,
    output_points=output_pts
)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_constructor(problem):
    """GradientPINN rejects time-dependent problems and exposes the
    expected accepted condition types."""
    # a time-dependent problem must be refused at construction
    with pytest.raises(ValueError):
        GradientPINN(model=model, problem=DummyTimeProblem())
    solver = GradientPINN(model=model, problem=problem)
    expected = (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition
    )
    assert solver.accepted_conditions_types == expected
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, batch_size, compile):
    """Two-epoch CPU training run for every batch size, compiled or not."""
    solver = GradientPINN(model=model, problem=problem)
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=1.,
                          val_size=0.,
                          test_size=0.,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    if trainer.compile:
        # the model must have been wrapped by torch.compile
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, batch_size, compile):
    """Training with a 90/10 train/validation split runs on CPU."""
    solver = GradientPINN(model=model, problem=problem)
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=0.9,
                          val_size=0.1,
                          test_size=0.,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    if trainer.compile:
        # the model must have been wrapped by torch.compile
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, batch_size, compile):
    """The test loop runs on a 70/20/10 split for every batch size."""
    solver = GradientPINN(model=model, problem=problem)
    trainer_kwargs = dict(max_epochs=2,
                          accelerator='cpu',
                          batch_size=batch_size,
                          train_size=0.7,
                          val_size=0.2,
                          test_size=0.1,
                          compile=compile)
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.test()
    if trainer.compile:
        # the model must have been wrapped by torch.compile
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Train to a checkpoint, restore training from it, reload a fresh
    solver from it, and check the reloaded model reproduces the original
    forward pass."""
    tmp_dir = "tests/test_solvers/tmp"
    solver = GradientPINN(model=model, problem=problem)
    trainer = Trainer(
        solver=solver,
        max_epochs=5,
        accelerator='cpu',
        batch_size=None,
        train_size=0.7,
        val_size=0.2,
        test_size=0.1,
        default_root_dir=tmp_dir,
    )
    trainer.train()

    ckpt = f'{tmp_dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt'

    # restore training from the checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)

    # load a brand-new solver from the same checkpoint
    new_solver = GradientPINN.load_from_checkpoint(
        ckpt, problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    torch.testing.assert_close(
        new_solver.forward(test_pts), solver.forward(test_pts))

    # remove the temporary directories created by the run
    import shutil
    shutil.rmtree(tmp_dir)

View File

@@ -1,185 +1,134 @@
import pytest
import torch
from pina.problem import SpatialProblem, InverseProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import PINN
from pina.trainer import Trainer
from pina import LabelTensor, Condition
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
from pina.problem.zoo import Poisson2DSquareProblem
# class InversePoisson(SpatialProblem, InverseProblem):
# '''
# Problem definition for the Poisson equation.
# '''
# output_variables = ['u']
# x_min = -2
# x_max = 2
# y_min = -2
# y_max = 2
# data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
# data_output = LabelTensor(torch.rand(10, 1), ['u'])
# spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
# # define the ranges for the parameters
# unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
# def laplace_equation(input_, output_, params_):
# '''
# Laplace equation with a force term.
# '''
# force_term = torch.exp(
# - 2*(input_.extract(['x']) - params_['mu1'])**2
# - 2*(input_.extract(['y']) - params_['mu2'])**2)
# delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
# return delta_u - force_term
# # define the conditions for the loss (boundary conditions, equation, data)
# conditions = {
# 'gamma1': Condition(domain=CartesianDomain({'x': [x_min, x_max],
# 'y': y_max}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma2': Condition(domain=CartesianDomain(
# {'x': [x_min, x_max], 'y': y_min
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma3': Condition(domain=CartesianDomain(
# {'x': x_max, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma4': Condition(domain=CartesianDomain(
# {'x': x_min, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'D': Condition(domain=CartesianDomain(
# {'x': [x_min, x_max], 'y': [y_min, y_max]
# }),
# equation=Equation(laplace_equation)),
# 'data': Condition(input_points=data_input.extract(['x', 'y']),
# output_points=data_output)
# }
from pina.trainer import Trainer
from pina.solvers import PINN
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from pina.problem.zoo import (
Poisson2DSquareProblem as Poisson,
InversePoisson2DSquareProblem as InversePoisson
)
from torch._dynamo.eval_frame import OptimizedModule
# Problems and model shared by the tests below.
problem = Poisson()
problem.discretise_domain(50)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
model = FeedForward(
    len(problem.input_variables),
    len(problem.output_variables)
)

# Attach an input-output condition so supervised learning is exercised too.
input_pts = LabelTensor(
    torch.rand(50, len(problem.input_variables)), problem.input_variables
)
output_pts = LabelTensor(
    torch.rand(50, len(problem.output_variables)), problem.output_variables
)
problem.conditions['data'] = Condition(
    input_points=input_pts,
    output_points=output_pts
)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_constructor(problem):
    """The solver builds and exposes the expected accepted condition types."""
    solver = PINN(problem=problem, model=model)
    expected = (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition,
    )
    assert solver.accepted_conditions_types == expected
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, batch_size, compile):
    """Training runs for two epochs on CPU for every batch size, with and
    without compilation."""
    solver = PINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=2,
                      accelerator='cpu',
                      batch_size=batch_size,
                      train_size=1.,
                      val_size=0.,
                      test_size=0.,
                      compile=compile)
    trainer.train()
    # Consistency with the sibling solver tests: when compilation is
    # enabled the model must have been wrapped by torch dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, batch_size, compile):
    """Training with a validation split runs for every batch size, with and
    without compilation."""
    solver = PINN(model=model, problem=problem)
    trainer_kwargs = dict(
        max_epochs=2,
        accelerator='cpu',
        batch_size=batch_size,
        train_size=0.9,
        val_size=0.1,
        test_size=0.0,
        compile=compile,
    )
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    # When compilation is enabled the model must be wrapped by dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, batch_size, compile):
    """The test loop runs on a held-out split for every batch size, with
    and without compilation."""
    solver = PINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=2,
                      accelerator='cpu',
                      batch_size=batch_size,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      compile=compile)
    trainer.test()
    # Consistency with the sibling solver tests: when compilation is
    # enabled the model must have been wrapped by torch dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Train to a checkpoint, restore training from it, reload a fresh
    solver from it, and check the reloaded model reproduces the original
    forward pass."""
    tmp_dir = "tests/test_solvers/tmp"
    solver = PINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      default_root_dir=tmp_dir)
    trainer.train()

    ckpt = f'{tmp_dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt'

    # restore training from the checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)

    # load a brand-new solver from the same checkpoint
    new_solver = PINN.load_from_checkpoint(
        ckpt, problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))

    # remove the temporary directories created by the run
    import shutil
    shutil.rmtree(tmp_dir)

View File

@@ -1,449 +1,157 @@
import torch
import pytest
import torch
from pina.problem import SpatialProblem, InverseProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import RBAPINN as PINN
from pina.trainer import Trainer
from pina import LabelTensor, Condition
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
from pina.trainer import Trainer
from pina.solvers import RBAPINN
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from pina.problem.zoo import (
Poisson2DSquareProblem as Poisson,
InversePoisson2DSquareProblem as InversePoisson
)
from torch._dynamo.eval_frame import OptimizedModule
# Problems and model shared by the tests below.
problem = Poisson()
problem.discretise_domain(50)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
model = FeedForward(len(problem.input_variables), len(problem.output_variables))

# Attach an input-output condition so supervised learning is exercised too.
input_pts = LabelTensor(
    torch.rand(50, len(problem.input_variables)), problem.input_variables
)
output_pts = LabelTensor(
    torch.rand(50, len(problem.output_variables)), problem.output_variables
)
problem.conditions['data'] = Condition(
    input_points=input_pts, output_points=output_pts
)
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("eta", [1, 0.001])
@pytest.mark.parametrize("gamma", [0.5, 0.9])
def test_constructor(problem, eta, gamma):
    """Constructor validates ``gamma`` and exposes the expected accepted
    condition types."""
    # gamma outside the valid range must be rejected.
    with pytest.raises(AssertionError):
        solver = RBAPINN(model=model, problem=problem, gamma=1.5)

    solver = RBAPINN(model=model, problem=problem, eta=eta, gamma=gamma)
    expected = (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition,
    )
    assert solver.accepted_conditions_types == expected
# my_laplace = Equation(laplace_equation)
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
# in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
# out2_ = LabelTensor(torch.rand(60, 1), ['u'])
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_wrong_batch(problem):
    """Mini-batch training is not implemented for RBAPINN and must raise."""
    with pytest.raises(NotImplementedError):
        solver = RBAPINN(model=model, problem=problem)
        trainer = Trainer(
            solver=solver,
            max_epochs=2,
            accelerator='cpu',
            batch_size=10,
            train_size=1.0,
            val_size=0.0,
            test_size=0.0,
        )
        trainer.train()
# class InversePoisson(SpatialProblem, InverseProblem):
# '''
# Problem definition for the Poisson equation.
# '''
# output_variables = ['u']
# x_min = -2
# x_max = 2
# y_min = -2
# y_max = 2
# data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
# data_output = LabelTensor(torch.rand(10, 1), ['u'])
# spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
# # define the ranges for the parameters
# unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
# def laplace_equation(input_, output_, params_):
# '''
# Laplace equation with a force term.
# '''
# force_term = torch.exp(
# - 2*(input_.extract(['x']) - params_['mu1'])**2
# - 2*(input_.extract(['y']) - params_['mu2'])**2)
# delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
# return delta_u - force_term
# # define the conditions for the loss (boundary conditions, equation, data)
# conditions = {
# 'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
# 'y': y_max}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma2': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': y_min
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma3': Condition(location=CartesianDomain(
# {'x': x_max, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma4': Condition(location=CartesianDomain(
# {'x': x_min, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'D': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': [y_min, y_max]
# }),
# equation=Equation(laplace_equation)),
# 'data': Condition(input_points=data_input.extract(['x', 'y']),
# output_points=data_output)
# }
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, compile):
    """Full-batch training runs for two epochs on CPU, with and without
    compilation."""
    solver = RBAPINN(model=model, problem=problem)
    trainer_kwargs = dict(
        max_epochs=2,
        accelerator='cpu',
        batch_size=None,
        train_size=1.0,
        val_size=0.0,
        test_size=0.0,
        compile=compile,
    )
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    # When compilation is enabled the model must be wrapped by dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
# conditions = {
# 'gamma1': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 1}),
# equation=FixedValue(0.0)),
# 'gamma2': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 0}),
# equation=FixedValue(0.0)),
# 'gamma3': Condition(
# location=CartesianDomain({'x': 1, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'gamma4': Condition(
# location=CartesianDomain({'x': 0, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'D': Condition(
# input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
# equation=my_laplace),
# 'data': Condition(
# input_points=in_,
# output_points=out_),
# 'data2': Condition(
# input_points=in2_,
# output_points=out2_)
# }
# def poisson_sol(self, pts):
# return -(torch.sin(pts.extract(['x']) * torch.pi) *
# torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
# truth_solution = poisson_sol
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, compile):
    """Full-batch training with a validation split runs with and without
    compilation."""
    solver = RBAPINN(model=model, problem=problem)
    trainer_kwargs = dict(
        max_epochs=2,
        accelerator='cpu',
        batch_size=None,
        train_size=0.9,
        val_size=0.1,
        test_size=0.0,
        compile=compile,
    )
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.train()
    # When compilation is enabled the model must be wrapped by dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi) *
# torch.sin(x.extract(['y']) * torch.pi))
# return LabelTensor(t, ['sin(x)sin(y)'])
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, compile):
    """The test loop runs on a held-out split, with and without
    compilation."""
    solver = RBAPINN(model=model, problem=problem)
    trainer_kwargs = dict(
        max_epochs=2,
        accelerator='cpu',
        batch_size=None,
        train_size=0.7,
        val_size=0.2,
        test_size=0.1,
        compile=compile,
    )
    trainer = Trainer(solver=solver, **trainer_kwargs)
    trainer.test()
    # When compilation is enabled the model must be wrapped by dynamo.
    if trainer.compile:
        assert isinstance(solver.model, OptimizedModule)
# # make the problem
# poisson_problem = Poisson()
# model = FeedForward(len(poisson_problem.input_variables),
# len(poisson_problem.output_variables))
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# extra_feats = [myFeature()]
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Train to a checkpoint, restore training from it, reload a fresh
    solver from it, and check the reloaded model reproduces the original
    forward pass."""
    tmp_dir = "tests/test_solvers/tmp"
    solver = RBAPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      default_root_dir=tmp_dir)
    trainer.train()

    ckpt = f'{tmp_dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt'

    # restore training from the checkpoint
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt)

    # load a brand-new solver from the same checkpoint
    new_solver = RBAPINN.load_from_checkpoint(
        ckpt, problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))

    # remove the temporary directories created by the run
    import shutil
    shutil.rmtree(tmp_dir)

View File

@@ -1,105 +1,187 @@
import torch
import pytest
from pina.problem import AbstractProblem
from pina import Condition, LabelTensor
from pina.problem import AbstractProblem
from pina.condition import InputOutputPointsCondition
from pina.solvers import ReducedOrderModelSolver
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.loss import LpLoss
from pina.problem.zoo import Poisson2DSquareProblem
from torch._dynamo.eval_frame import OptimizedModule
# class NeuralOperatorProblem(AbstractProblem):
# input_variables = ['u_0', 'u_1']
# output_variables = [f'u_{i}' for i in range(100)]
# conditions = {'data' : Condition(input_points=
# LabelTensor(torch.rand(10, 2),
# input_variables),
# output_points=
# LabelTensor(torch.rand(10, 100),
# output_variables))}
class LabelTensorProblem(AbstractProblem):
    """Toy supervised problem whose data points are labelled ``LabelTensor``s."""
    input_variables = ['u_0', 'u_1']
    output_variables = ['u']
    conditions = {
        'data': Condition(
            # 20 random samples: 2 labelled input columns -> 1 labelled output.
            input_points=LabelTensor(torch.randn(20, 2), ['u_0', 'u_1']),
            output_points=LabelTensor(torch.randn(20, 1), ['u'])),
    }
# # make the problem + extra feats
# class AE(torch.nn.Module):
# def __init__(self, input_dimensions, rank):
# super().__init__()
# self.encode = FeedForward(input_dimensions, rank, layers=[input_dimensions//4])
# self.decode = FeedForward(rank, input_dimensions, layers=[input_dimensions//4])
# class AE_missing_encode(torch.nn.Module):
# def __init__(self, input_dimensions, rank):
# super().__init__()
# self.encode = FeedForward(input_dimensions, rank, layers=[input_dimensions//4])
# class AE_missing_decode(torch.nn.Module):
# def __init__(self, input_dimensions, rank):
# super().__init__()
# self.decode = FeedForward(rank, input_dimensions, layers=[input_dimensions//4])
# rank = 10
# problem = NeuralOperatorProblem()
# interpolation_net = FeedForward(len(problem.input_variables),
# rank)
# reduction_net = AE(len(problem.output_variables), rank)
# def test_constructor():
# ReducedOrderModelSolver(problem=problem,reduction_network=reduction_net,
# interpolation_network=interpolation_net)
# with pytest.raises(SyntaxError):
# ReducedOrderModelSolver(problem=problem,
# reduction_network=AE_missing_encode(
# len(problem.output_variables), rank),
# interpolation_network=interpolation_net)
# ReducedOrderModelSolver(problem=problem,
# reduction_network=AE_missing_decode(
# len(problem.output_variables), rank),
# interpolation_network=interpolation_net)
class TensorProblem(AbstractProblem):
    """Toy supervised problem with plain (unlabelled) ``torch.Tensor`` data."""
    input_variables = ['u_0', 'u_1']
    output_variables = ['u']
    conditions = {
        'data': Condition(
            # 20 random samples, no column labels attached.
            input_points=torch.randn(20, 2),
            output_points=torch.randn(20, 1))
    }
# def test_train_cpu():
# solver = ReducedOrderModelSolver(problem = problem,reduction_network=reduction_net,
# interpolation_network=interpolation_net, loss=LpLoss())
# trainer = Trainer(solver=solver, max_epochs=3, accelerator='cpu', batch_size=20)
# trainer.train()
class AE(torch.nn.Module):
    """Autoencoder exposing the ``encode``/``decode`` pair the ROM solver expects."""

    def __init__(self, input_dimensions, rank):
        super().__init__()
        # compress the input down to the reduced space of dimension ``rank`` ...
        self.encode = FeedForward(
            input_dimensions, rank, layers=[input_dimensions//4])
        # ... and reconstruct it back to the full dimension
        self.decode = FeedForward(
            rank, input_dimensions, layers=[input_dimensions//4])
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# solver = ReducedOrderModelSolver(problem=problem,
# reduction_network=reduction_net,
# interpolation_network=interpolation_net,
# loss=LpLoss())
# trainer = Trainer(solver=solver,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=solver, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
class AE_missing_encode(torch.nn.Module):
    """Invalid reduction network: has ``encode`` but lacks the required ``decode``."""

    def __init__(self, input_dimensions, rank):
        super().__init__()
        self.encode = FeedForward(
            input_dimensions, rank, layers=[input_dimensions//4])
# def test_train_load():
# tmpdir = "tests/tmp_load"
# solver = ReducedOrderModelSolver(problem=problem,
# reduction_network=reduction_net,
# interpolation_network=interpolation_net,
# loss=LpLoss())
# trainer = Trainer(solver=solver,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_solver = ReducedOrderModelSolver.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=15.ckpt',
# problem = problem,reduction_network=reduction_net,
# interpolation_network=interpolation_net)
# test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
# assert new_solver.forward(test_pts).shape == (20, 100)
# assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
# torch.testing.assert_close(
# new_solver.forward(test_pts),
# solver.forward(test_pts))
# import shutil
# shutil.rmtree(tmpdir)
class AE_missing_decode(torch.nn.Module):
    """Invalid reduction network: has ``decode`` but lacks the required ``encode``."""

    def __init__(self, input_dimensions, rank):
        super().__init__()
        self.decode = FeedForward(
            rank, input_dimensions, layers=[input_dimensions//4])
# Dimension of the reduced (latent) space shared by the networks below.
rank = 10
# NOTE(review): ``model`` looks unused by the tests in this file — confirm
# against the full file before removing.
model = AE(2, 1)
# maps the 2 problem inputs into the latent space
interpolation_net = FeedForward(2, rank)
# autoencoder over the 1-dimensional output field
reduction_net = AE(1, rank)
def test_constructor():
    """Constructor accepts complete autoencoders and rejects invalid inputs.

    Checks that both tensor flavours build, that the accepted condition type
    is advertised, and that incomplete reduction networks or unsupported
    problems raise.
    """
    problem = TensorProblem()
    ReducedOrderModelSolver(problem=problem,
                            interpolation_network=interpolation_net,
                            reduction_network=reduction_net)
    ReducedOrderModelSolver(problem=LabelTensorProblem(),
                            reduction_network=reduction_net,
                            interpolation_network=interpolation_net)
    assert ReducedOrderModelSolver.accepted_conditions_types == InputOutputPointsCondition
    # A reduction network without ``decode`` must be rejected.
    with pytest.raises(SyntaxError):
        ReducedOrderModelSolver(problem=problem,
                                reduction_network=AE_missing_encode(
                                    len(problem.output_variables), rank),
                                interpolation_network=interpolation_net)
    # BUG FIX: this call previously sat inside the same ``pytest.raises``
    # block as the one above, so it never executed (the first call already
    # raised). Each invalid network now gets its own context manager.
    with pytest.raises(SyntaxError):
        ReducedOrderModelSolver(problem=problem,
                                reduction_network=AE_missing_decode(
                                    len(problem.output_variables), rank),
                                interpolation_network=interpolation_net)
    # Problems with conditions other than input/output points are unsupported.
    with pytest.raises(ValueError):
        ReducedOrderModelSolver(problem=Poisson2DSquareProblem(),
                                reduction_network=reduction_net,
                                interpolation_network=interpolation_net)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(use_lt, batch_size, compile):
    """Training runs for every batch size, tensor flavour and compile flag."""
    if use_lt:
        problem = LabelTensorProblem()
    else:
        problem = TensorProblem()
    solver = ReducedOrderModelSolver(problem=problem,
                                     reduction_network=reduction_net,
                                     interpolation_network=interpolation_net,
                                     use_lt=use_lt)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, train_size=1., test_size=0.,
                      val_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # every sub-network (reduction + interpolation) must be compiled
        assert all(isinstance(net, OptimizedModule)
                   for net in solver.model.values())
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(use_lt, compile):
    """Training with a 90/10 train/validation split exercises the val loop."""
    if use_lt:
        problem = LabelTensorProblem()
    else:
        problem = TensorProblem()
    solver = ReducedOrderModelSolver(problem=problem,
                                     reduction_network=reduction_net,
                                     interpolation_network=interpolation_net,
                                     use_lt=use_lt)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=0.9, val_size=0.1,
                      test_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # every sub-network must be wrapped by torch.compile
        assert all(isinstance(net, OptimizedModule)
                   for net in solver.model.values())
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(use_lt, compile):
    """Run the test loop on a held-out 10% split for both tensor flavours."""
    problem = LabelTensorProblem() if use_lt else TensorProblem()
    solver = ReducedOrderModelSolver(problem=problem,
                                     reduction_network=reduction_net,
                                     interpolation_network=interpolation_net,
                                     use_lt=use_lt)
    trainer = Trainer(solver=solver,
                      max_epochs=2,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.8,
                      val_size=0.1,
                      test_size=0.1,
                      compile=compile)
    # CONSISTENCY FIX: exercise the *test* loop, as the sibling SAPINN and
    # SupervisedSolver suites do; previously this called ``trainer.train()``
    # and the test split was never evaluated.
    trainer.test()
    if trainer.compile:
        for v in solver.model.values():
            assert isinstance(v, OptimizedModule)
def test_train_load_restore():
    """Checkpoint round-trip: train, resume from checkpoint, reload weights.

    Trains for 5 epochs while writing checkpoints, resumes training from the
    saved checkpoint, rebuilds the solver via ``load_from_checkpoint`` and
    checks the reloaded model reproduces the original forward pass exactly.
    """
    dir = "tests/test_solvers/tmp/"
    problem = LabelTensorProblem()
    solver = ReducedOrderModelSolver(problem=problem,
                                     reduction_network=reduction_net,
                                     interpolation_network=interpolation_net)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.9,
                      test_size=0.1,
                      val_size=0.,
                      default_root_dir=dir)
    trainer.train()
    # restore: resume training from the checkpoint written above
    # (the path below matches 5 epochs x 1 step/epoch with batch_size=None)
    ntrainer = Trainer(solver=solver,
                       max_epochs=5,
                       accelerator='cpu',)
    ntrainer.train(
        ckpt_path=f'{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt')
    # loading: rebuild an independent solver from the same checkpoint
    new_solver = ReducedOrderModelSolver.load_from_checkpoint(
        f'{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt',
        problem=problem,
        reduction_network=reduction_net,
        interpolation_network=interpolation_net)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
    # the reloaded weights must reproduce the trained forward pass exactly
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))
    # rm directories
    import shutil
    shutil.rmtree('tests/test_solvers/tmp')

View File

@@ -1,449 +0,0 @@
import torch
import pytest
from pina.problem import SpatialProblem, InverseProblem
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina import Condition, LabelTensor
from pina.solvers import SAPINN as PINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.loss import LpLoss
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
# my_laplace = Equation(laplace_equation)
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['x', 'y'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
# in2_ = LabelTensor(torch.rand(60, 2), ['x', 'y'])
# out2_ = LabelTensor(torch.rand(60, 1), ['u'])
# class InversePoisson(SpatialProblem, InverseProblem):
# '''
# Problem definition for the Poisson equation.
# '''
# output_variables = ['u']
# x_min = -2
# x_max = 2
# y_min = -2
# y_max = 2
# data_input = LabelTensor(torch.rand(10, 2), ['x', 'y'])
# data_output = LabelTensor(torch.rand(10, 1), ['u'])
# spatial_domain = CartesianDomain({'x': [x_min, x_max], 'y': [y_min, y_max]})
# # define the ranges for the parameters
# unknown_parameter_domain = CartesianDomain({'mu1': [-1, 1], 'mu2': [-1, 1]})
# def laplace_equation(input_, output_, params_):
# '''
# Laplace equation with a force term.
# '''
# force_term = torch.exp(
# - 2*(input_.extract(['x']) - params_['mu1'])**2
# - 2*(input_.extract(['y']) - params_['mu2'])**2)
# delta_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
# return delta_u - force_term
# # define the conditions for the loss (boundary conditions, equation, data)
# conditions = {
# 'gamma1': Condition(location=CartesianDomain({'x': [x_min, x_max],
# 'y': y_max}),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma2': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': y_min
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma3': Condition(location=CartesianDomain(
# {'x': x_max, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'gamma4': Condition(location=CartesianDomain(
# {'x': x_min, 'y': [y_min, y_max]
# }),
# equation=FixedValue(0.0, components=['u'])),
# 'D': Condition(location=CartesianDomain(
# {'x': [x_min, x_max], 'y': [y_min, y_max]
# }),
# equation=Equation(laplace_equation)),
# 'data': Condition(input_points=data_input.extract(['x', 'y']),
# output_points=data_output)
# }
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
# conditions = {
# 'gamma1': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 1}),
# equation=FixedValue(0.0)),
# 'gamma2': Condition(
# location=CartesianDomain({'x': [0, 1], 'y': 0}),
# equation=FixedValue(0.0)),
# 'gamma3': Condition(
# location=CartesianDomain({'x': 1, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'gamma4': Condition(
# location=CartesianDomain({'x': 0, 'y': [0, 1]}),
# equation=FixedValue(0.0)),
# 'D': Condition(
# input_points=LabelTensor(torch.rand(size=(100, 2)), ['x', 'y']),
# equation=my_laplace),
# 'data': Condition(
# input_points=in_,
# output_points=out_),
# 'data2': Condition(
# input_points=in2_,
# output_points=out2_)
# }
# def poisson_sol(self, pts):
# return -(torch.sin(pts.extract(['x']) * torch.pi) *
# torch.sin(pts.extract(['y']) * torch.pi)) / (2 * torch.pi**2)
# truth_solution = poisson_sol
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['x']) * torch.pi) *
# torch.sin(x.extract(['y']) * torch.pi))
# return LabelTensor(t, ['sin(x)sin(y)'])
# # make the problem
# poisson_problem = Poisson()
# model = FeedForward(len(poisson_problem.input_variables),
# len(poisson_problem.output_variables))
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# extra_feats = [myFeature()]
# def test_constructor():
# PINN(problem=poisson_problem, model=model, extra_features=None)
# with pytest.raises(ValueError):
# PINN(problem=poisson_problem, model=model, extra_features=None,
# weights_function=1)
# def test_constructor_extra_feats():
# model_extra_feats = FeedForward(
# len(poisson_problem.input_variables) + 1,
# len(poisson_problem.output_variables))
# PINN(problem=poisson_problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# def test_train_cpu():
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem = poisson_problem, model=model,
# extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# def test_log():
# poisson_problem.discretise_domain(100)
# solver = PINN(problem = poisson_problem, model=model,
# extra_features=None, loss=LpLoss())
# trainer = Trainer(solver, max_epochs=2, accelerator='cpu')
# trainer.train()
# # assert the logged metrics are correct
# logged_metrics = sorted(list(trainer.logged_metrics.keys()))
# total_metrics = sorted(
# list([key + '_loss' for key in poisson_problem.conditions.keys()])
# + ['mean_loss'])
# assert logged_metrics == total_metrics
# def test_train_restore():
# tmpdir = "tests/tmp_restore"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=5,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# ntrainer = Trainer(solver=pinn, max_epochs=15, accelerator='cpu')
# t = ntrainer.train(
# ckpt_path=f'{tmpdir}/lightning_logs/version_0/'
# 'checkpoints/epoch=4-step=10.ckpt')
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_load():
# tmpdir = "tests/tmp_load"
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = PINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# def test_train_inverse_problem_cpu():
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = PINN(problem = poisson_problem, model=model,
# extra_features=None, loss=LpLoss())
# trainer = Trainer(solver=pinn, max_epochs=1,
# accelerator='cpu', batch_size=20)
# trainer.train()
# # # TODO does not currently work
# # def test_train_inverse_problem_restore():
# # tmpdir = "tests/tmp_restore_inv"
# # poisson_problem = InversePoisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# # n = 100
# # poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# # pinn = PINN(problem=poisson_problem,
# # model=model,
# # extra_features=None,
# # loss=LpLoss())
# # trainer = Trainer(solver=pinn,
# # max_epochs=5,
# # accelerator='cpu',
# # default_root_dir=tmpdir)
# # trainer.train()
# # ntrainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# # t = ntrainer.train(
# # ckpt_path=f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=4-step=10.ckpt')
# # import shutil
# # shutil.rmtree(tmpdir)
# def test_train_inverse_problem_load():
# tmpdir = "tests/tmp_load_inv"
# poisson_problem = InversePoisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4', 'D']
# n = 100
# poisson_problem.discretise_domain(n, 'random', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model,
# extra_features=None,
# loss=LpLoss())
# trainer = Trainer(solver=pinn,
# max_epochs=15,
# accelerator='cpu',
# default_root_dir=tmpdir)
# trainer.train()
# new_pinn = PINN.load_from_checkpoint(
# f'{tmpdir}/lightning_logs/version_0/checkpoints/epoch=14-step=30.ckpt',
# problem = poisson_problem, model=model)
# test_pts = CartesianDomain({'x': [0, 1], 'y': [0, 1]}).sample(10)
# assert new_pinn.forward(test_pts).extract(['u']).shape == (10, 1)
# assert new_pinn.forward(test_pts).extract(
# ['u']).shape == pinn.forward(test_pts).extract(['u']).shape
# torch.testing.assert_close(
# new_pinn.forward(test_pts).extract(['u']),
# pinn.forward(test_pts).extract(['u']))
# import shutil
# shutil.rmtree(tmpdir)
# # # TODO fix asap. Basically sampling few variables
# # # works only if both variables are in a range.
# # # if one is fixed and the other not, this will
# # # not work. This test also needs to be fixed and
# # # insert in test problem not in test pinn.
# # def test_train_cpu_sampling_few_vars():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['x'])
# # poisson_problem.discretise_domain(n, 'random', locations=['gamma4'], variables=['y'])
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'cpu'})
# # trainer.train()
# def test_train_extra_feats_cpu():
# poisson_problem = Poisson()
# boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# n = 10
# poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# pinn = PINN(problem=poisson_problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# trainer = Trainer(solver=pinn, max_epochs=5, accelerator='cpu')
# trainer.train()
# # TODO, fix GitHub actions to run also on GPU
# # def test_train_gpu():
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_gpu(): #TODO fix ASAP
# # poisson_problem = Poisson()
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # poisson_problem.discretise_domain(n, 'grid', locations=boundaries)
# # poisson_problem.conditions.pop('data') # The input/output pts are allocated on cpu
# # pinn = PINN(problem = poisson_problem, model=model, extra_features=None, loss=LpLoss())
# # trainer = Trainer(solver=pinn, kwargs={'max_epochs' : 5, 'accelerator':'gpu'})
# # trainer.train()
# # def test_train_2():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model)
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_extra_feats():
# # pinn = PINN(problem, model_extra_feat, [myFeature()])
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)
# # def test_train_2_extra_feats():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model_extra_feat, [myFeature()])
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_optimizer_kwargs():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(problem, model, optimizer_kwargs={'lr' : 0.3})
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # def test_train_with_lr_scheduler():
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 10
# # expected_keys = [[], list(range(0, 50, 3))]
# # param = [0, 3]
# # for i, truth_key in zip(param, expected_keys):
# # pinn = PINN(
# # problem,
# # model,
# # lr_scheduler_type=torch.optim.lr_scheduler.CyclicLR,
# # lr_scheduler_kwargs={'base_lr' : 0.1, 'max_lr' : 0.3, 'cycle_momentum': False}
# # )
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(50, save_loss=i)
# # assert list(pinn.history_loss.keys()) == truth_key
# # # def test_train_batch():
# # # pinn = PINN(problem, model, batch_size=6)
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # # def test_train_batch_2():
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 10
# # # expected_keys = [[], list(range(0, 50, 3))]
# # # param = [0, 3]
# # # for i, truth_key in zip(param, expected_keys):
# # # pinn = PINN(problem, model, batch_size=6)
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(50, save_loss=i)
# # # assert list(pinn.history_loss.keys()) == truth_key
# # if torch.cuda.is_available():
# # # def test_gpu_train():
# # # pinn = PINN(problem, model, batch_size=20, device='cuda')
# # # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # # n = 100
# # # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # # pinn.discretise_domain(n, 'grid', locations=['D'])
# # # pinn.train(5)
# # def test_gpu_train_nobatch():
# # pinn = PINN(problem, model, batch_size=None, device='cuda')
# # boundaries = ['gamma1', 'gamma2', 'gamma3', 'gamma4']
# # n = 100
# # pinn.discretise_domain(n, 'grid', locations=boundaries)
# # pinn.discretise_domain(n, 'grid', locations=['D'])
# # pinn.train(5)

View File

@@ -0,0 +1,159 @@
import torch
import pytest
from pina import LabelTensor, Condition
from pina.solvers import SelfAdaptivePINN as SAPINN
from pina.trainer import Trainer
from pina.model import FeedForward
from pina.problem.zoo import (
Poisson2DSquareProblem as Poisson,
InversePoisson2DSquareProblem as InversePoisson
)
from pina.condition import (
InputOutputPointsCondition,
InputPointsEquationCondition,
DomainEquationCondition
)
from torch._dynamo.eval_frame import OptimizedModule
# make the problem: direct and inverse Poisson benchmarks from the zoo,
# each discretised with 50 sample points
problem = Poisson()
problem.discretise_domain(50)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
# shared network sized on the direct problem's input/output variables
model = FeedForward(
    len(problem.input_variables),
    len(problem.output_variables)
)
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions['data'] = Condition(
    input_points=input_pts,
    output_points=output_pts
)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("weight_fn", [torch.nn.Sigmoid(), torch.nn.Tanh()])
def test_constructor(problem, weight_fn):
    """Reject non-callable weight functions; accept valid ones."""
    # an integer is not a valid weight function
    with pytest.raises(ValueError):
        SAPINN(model=model, problem=problem, weight_function=1)
    solver = SAPINN(problem=problem, model=model, weight_function=weight_fn)
    expected = (
        InputOutputPointsCondition,
        InputPointsEquationCondition,
        DomainEquationCondition,
    )
    assert solver.accepted_conditions_types == expected
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_wrong_batch(problem):
    """SAPINN does not support mini-batching: batched training must raise."""
    with pytest.raises(NotImplementedError):
        sapinn = SAPINN(model=model, problem=problem)
        batched_trainer = Trainer(solver=sapinn, max_epochs=2,
                                  accelerator='cpu', batch_size=10,
                                  train_size=1., val_size=0., test_size=0.)
        batched_trainer.train()
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(problem, compile):
    """Full-batch training runs on both direct and inverse problems."""
    solver = SAPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=1., val_size=0.,
                      test_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # each sub-model is either compiled or a (weights) ModuleDict
        assert all(isinstance(net, (OptimizedModule, torch.nn.ModuleDict))
                   for net in solver.models)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(problem, compile):
    """Training with a 90/10 train/validation split exercises the val loop."""
    solver = SAPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=0.9, val_size=0.1,
                      test_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # each sub-model is either compiled or a (weights) ModuleDict
        assert all(isinstance(net, (OptimizedModule, torch.nn.ModuleDict))
                   for net in solver.models)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(problem, compile):
    """The test loop runs on the held-out 10% split."""
    solver = SAPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=0.7, val_size=0.2,
                      test_size=0.1, compile=compile)
    trainer.test()
    if trainer.compile:
        # each sub-model is either compiled or a (weights) ModuleDict
        assert all(isinstance(net, (OptimizedModule, torch.nn.ModuleDict))
                   for net in solver.models)
@pytest.mark.parametrize("problem", [problem, inverse_problem])
def test_train_load_restore(problem):
    """Checkpoint round-trip: train, resume from checkpoint, reload weights."""
    dir = "tests/test_solvers/tmp"
    problem = problem  # NOTE(review): redundant rebinding — confirm intent
    solver = SAPINN(model=model, problem=problem)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.7,
                      val_size=0.2,
                      test_size=0.1,
                      default_root_dir=dir)
    trainer.train()
    # restore: resume training from the checkpoint written above
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(
        ckpt_path=f'{dir}/lightning_logs/version_0/checkpoints/' +
        'epoch=4-step=5.ckpt')
    # loading: rebuild an independent solver from the same checkpoint
    new_solver = SAPINN.load_from_checkpoint(
        f'{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt',
        problem=problem, model=model)
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    assert new_solver.forward(test_pts).shape == (20, 1)
    assert new_solver.forward(test_pts).shape == (
        solver.forward(test_pts).shape
    )
    # the reloaded weights must reproduce the trained forward pass exactly
    torch.testing.assert_close(
        new_solver.forward(test_pts),
        solver.forward(test_pts))
    # rm directories
    import shutil
    shutil.rmtree('tests/test_solvers/tmp')

View File

@@ -1,143 +1,133 @@
import torch
import pytest
from pina.problem import AbstractProblem, SpatialProblem
from pina import Condition, LabelTensor
from pina.condition import InputOutputPointsCondition
from pina.problem import AbstractProblem
from pina.solvers import SupervisedSolver
from pina.model import FeedForward
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina.operators import laplacian
from pina.domain import CartesianDomain
from pina.trainer import Trainer
# in_ = LabelTensor(torch.tensor([[0., 1.]]), ['u_0', 'u_1'])
# out_ = LabelTensor(torch.tensor([[0.]]), ['u'])
from torch._dynamo.eval_frame import OptimizedModule
# class NeuralOperatorProblem(AbstractProblem):
# input_variables = ['u_0', 'u_1']
# output_variables = ['u']
# conditions = {
# 'data': Condition(input_points=in_, output_points=out_),
# }
class LabelTensorProblem(AbstractProblem):
    """Toy supervised problem whose data points are labelled ``LabelTensor``s."""
    input_variables = ['u_0', 'u_1']
    output_variables = ['u']
    conditions = {
        'data': Condition(
            # 20 random samples: 2 labelled input columns -> 1 labelled output.
            input_points=LabelTensor(torch.randn(20, 2), ['u_0', 'u_1']),
            output_points=LabelTensor(torch.randn(20, 1), ['u'])),
    }
# class myFeature(torch.nn.Module):
# """
# Feature: sin(x)
# """
# def __init__(self):
# super(myFeature, self).__init__()
# def forward(self, x):
# t = (torch.sin(x.extract(['u_0']) * torch.pi) *
# torch.sin(x.extract(['u_1']) * torch.pi))
# return LabelTensor(t, ['sin(x)sin(y)'])
class TensorProblem(AbstractProblem):
    """Toy supervised problem with plain (unlabelled) ``torch.Tensor`` data."""
    input_variables = ['u_0', 'u_1']
    output_variables = ['u']
    conditions = {
        'data': Condition(
            input_points=torch.randn(20, 2),
            output_points=torch.randn(20, 1))
    }
# problem = NeuralOperatorProblem()
# extra_feats = [myFeature()]
# model = FeedForward(len(problem.input_variables), len(problem.output_variables))
# model_extra_feats = FeedForward(
# len(problem.input_variables) + 1, len(problem.output_variables))
# Shared 2-input / 1-output network used by every test in this file.
model = FeedForward(2, 1)
# def test_constructor():
# SupervisedSolver(problem=problem, model=model)
def test_constructor():
    """Both tensor flavours build; the accepted condition type is advertised."""
    SupervisedSolver(problem=TensorProblem(), model=model)
    SupervisedSolver(problem=LabelTensorProblem(), model=model)
    expected = InputOutputPointsCondition
    assert SupervisedSolver.accepted_conditions_types == expected
# test_constructor()
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(use_lt, batch_size, compile):
    """Training runs for every batch size, tensor flavour and compile flag."""
    if use_lt:
        problem = LabelTensorProblem()
    else:
        problem = TensorProblem()
    solver = SupervisedSolver(problem=problem, model=model, use_lt=use_lt)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=batch_size, train_size=1., test_size=0.,
                      val_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # the wrapped model must be a compiled module
        assert isinstance(solver.model, OptimizedModule)
# def laplace_equation(input_, output_):
# force_term = (torch.sin(input_.extract(['x']) * torch.pi) *
# torch.sin(input_.extract(['y']) * torch.pi))
# delta_u = laplacian(output_.extract(['u']), input_)
# return delta_u - force_term
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(use_lt, compile):
    """Training with a 90/10 train/validation split exercises the val loop."""
    if use_lt:
        problem = LabelTensorProblem()
    else:
        problem = TensorProblem()
    solver = SupervisedSolver(problem=problem, model=model, use_lt=use_lt)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=0.9, val_size=0.1,
                      test_size=0., compile=compile)
    trainer.train()
    if trainer.compile:
        # the wrapped model must be a compiled module
        assert isinstance(solver.model, OptimizedModule)
# my_laplace = Equation(laplace_equation)
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(use_lt, compile):
    """The test loop runs on the held-out 10% split."""
    if use_lt:
        problem = LabelTensorProblem()
    else:
        problem = TensorProblem()
    solver = SupervisedSolver(problem=problem, model=model, use_lt=use_lt)
    trainer = Trainer(solver=solver, max_epochs=2, accelerator='cpu',
                      batch_size=None, train_size=0.8, val_size=0.1,
                      test_size=0.1, compile=compile)
    trainer.test()
    if trainer.compile:
        # the wrapped model must be a compiled module
        assert isinstance(solver.model, OptimizedModule)
# class Poisson(SpatialProblem):
# output_variables = ['u']
# spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
def test_train_load_restore():
    """Train, checkpoint, resume, and reload a SupervisedSolver.

    After training for 5 epochs, the checkpoint is used both to resume a
    fresh Trainer and to reload a brand-new solver, which must produce
    predictions identical to the in-memory one.
    """
    # Named tmp_dir (not ``dir``) to avoid shadowing the builtin.
    tmp_dir = "tests/test_solvers/tmp/"
    problem = LabelTensorProblem()
    solver = SupervisedSolver(problem=problem, model=model)
    trainer = Trainer(solver=solver,
                      max_epochs=5,
                      accelerator='cpu',
                      batch_size=None,
                      train_size=0.9,
                      test_size=0.1,
                      val_size=0.,
                      default_root_dir=tmp_dir)
    trainer.train()

    # Checkpoint written by Lightning after 5 epochs (1 step per epoch with
    # batch_size=None).
    ckpt_path = (f'{tmp_dir}/lightning_logs/version_0/checkpoints/'
                 'epoch=4-step=5.ckpt')

    # Resume training from the checkpoint with a fresh Trainer.
    new_trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    new_trainer.train(ckpt_path=ckpt_path)

    # Reload an entirely new solver from the same checkpoint.
    new_solver = SupervisedSolver.load_from_checkpoint(
        ckpt_path, problem=problem, model=model)

    # The restored solver must reproduce the original predictions exactly.
    test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
    restored_out = new_solver.forward(test_pts)
    original_out = solver.forward(test_pts)
    assert restored_out.shape == (20, 1)
    assert restored_out.shape == original_out.shape
    torch.testing.assert_close(restored_out, original_out)
# def test_wrong_constructor():
# poisson_problem = Poisson()
# with pytest.raises(ValueError):
# SupervisedSolver(problem=poisson_problem, model=model)
# def test_train_cpu():
# solver = SupervisedSolver(problem=problem, model=model)
# trainer = Trainer(solver=solver,
# max_epochs=200,
# accelerator='gpu',
# batch_size=5,
# train_size=1,
# test_size=0.,
# val_size=0.)
# trainer.train()
# test_train_cpu()
# def test_extra_features_constructor():
# SupervisedSolver(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# def test_extra_features_train_cpu():
# solver = SupervisedSolver(problem=problem,
# model=model_extra_feats,
# extra_features=extra_feats)
# trainer = Trainer(solver=solver,
# max_epochs=200,
# accelerator='gpu',
# batch_size=5)
# trainer.train()
# Remove the checkpoint directory created by test_train_load_restore.
# ignore_errors avoids a FileNotFoundError when the directory was never
# created (e.g. the test was skipped or deselected).
import shutil
shutil.rmtree('tests/test_solvers/tmp', ignore_errors=True)

View File

@@ -0,0 +1,42 @@
import pytest
import torch
from pina import Trainer
from pina.solvers import PINN
from pina.model import FeedForward
from pina.problem.zoo import Poisson2DSquareProblem
from pina.loss import ScalarWeighting
# Shared fixtures for the weighting tests: a Poisson problem on the unit
# square and a feed-forward model mapping its inputs to its outputs.
# (Removed a leftover debug print of the condition names.)
problem = Poisson2DSquareProblem()
model = FeedForward(len(problem.input_variables), len(problem.output_variables))
condition_names = problem.conditions.keys()
@pytest.mark.parametrize(
    "weights",
    [1, 1., dict.fromkeys(condition_names, 1)])
def test_constructor(weights):
    """ScalarWeighting accepts int, float, and per-condition dict weights."""
    ScalarWeighting(weights=weights)
@pytest.mark.parametrize("weights", ['a', [1, 2, 3]])
def test_wrong_constructor(weights):
    """Strings and lists are not valid weights and must raise ValueError."""
    with pytest.raises(ValueError):
        ScalarWeighting(weights=weights)
@pytest.mark.parametrize(
    "weights",
    [1, 1., dict.fromkeys(condition_names, 1)])
def test_aggregate(weights):
    """aggregate() accepts a loss dictionary keyed by condition name."""
    losses = {name: torch.randn(1) for name in condition_names}
    weighting = ScalarWeighting(weights=weights)
    weighting.aggregate(losses=losses)
@pytest.mark.parametrize(
    "weights",
    [1, 1., dict.fromkeys(condition_names, 1)])
def test_train_aggregation(weights):
    """A PINN configured with scalar weighting trains end to end on CPU."""
    problem.discretise_domain(50)
    solver = PINN(
        problem=problem,
        model=model,
        weighting=ScalarWeighting(weights=weights),
    )
    trainer = Trainer(solver=solver, max_epochs=5, accelerator='cpu')
    trainer.train()