183 lines
5.3 KiB
Python
183 lines
5.3 KiB
Python
import torch
|
|
import pytest
|
|
|
|
from pina import LabelTensor, Condition
|
|
from pina.solver import SelfAdaptivePINN as SAPINN
|
|
from pina.trainer import Trainer
|
|
from pina.model import FeedForward
|
|
from pina.problem.zoo import (
|
|
Poisson2DSquareProblem as Poisson,
|
|
InversePoisson2DSquareProblem as InversePoisson,
|
|
)
|
|
from pina.condition import (
|
|
InputTargetCondition,
|
|
InputEquationCondition,
|
|
DomainEquationCondition,
|
|
)
|
|
from torch._dynamo.eval_frame import OptimizedModule
|
|
|
|
|
|
# define problems
|
|
problem = Poisson()
|
|
problem.discretise_domain(10)
|
|
inverse_problem = InversePoisson()
|
|
inverse_problem.discretise_domain(10)
|
|
|
|
# reduce the number of data points to speed up testing
|
|
if hasattr(inverse_problem.conditions, "data"):
|
|
data_condition = inverse_problem.conditions["data"]
|
|
data_condition.input = data_condition.input[:10]
|
|
data_condition.target = data_condition.target[:10]
|
|
|
|
# add input-output condition to test supervised learning
|
|
input_pts = torch.rand(10, len(problem.input_variables))
|
|
input_pts = LabelTensor(input_pts, problem.input_variables)
|
|
output_pts = torch.rand(10, len(problem.output_variables))
|
|
output_pts = LabelTensor(output_pts, problem.output_variables)
|
|
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)
|
|
|
|
# define model
|
|
model = FeedForward(len(problem.input_variables), len(problem.output_variables))
|
|
|
|
|
|
@pytest.mark.parametrize("problem", [problem, inverse_problem])
|
|
@pytest.mark.parametrize("weight_fn", [torch.nn.Sigmoid(), torch.nn.Tanh()])
|
|
def test_constructor(problem, weight_fn):
|
|
|
|
solver = SAPINN(problem=problem, model=model, weight_function=weight_fn)
|
|
|
|
with pytest.raises(ValueError):
|
|
SAPINN(model=model, problem=problem, weight_function=1)
|
|
|
|
assert solver.accepted_conditions_types == (
|
|
InputTargetCondition,
|
|
InputEquationCondition,
|
|
DomainEquationCondition,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("problem", [problem, inverse_problem])
|
|
@pytest.mark.parametrize("compile", [True, False])
|
|
@pytest.mark.parametrize(
|
|
"loss", [torch.nn.L1Loss(reduction="sum"), torch.nn.MSELoss()]
|
|
)
|
|
def test_solver_train(problem, compile, loss):
|
|
solver = SAPINN(model=model, problem=problem, loss=loss)
|
|
trainer = Trainer(
|
|
solver=solver,
|
|
max_epochs=2,
|
|
accelerator="cpu",
|
|
batch_size=None,
|
|
train_size=1.0,
|
|
val_size=0.0,
|
|
test_size=0.0,
|
|
compile=compile,
|
|
)
|
|
trainer.train()
|
|
if trainer.compile:
|
|
assert all(
|
|
[
|
|
isinstance(model, (OptimizedModule, torch.nn.ModuleDict))
|
|
for model in solver.models
|
|
]
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("problem", [problem, inverse_problem])
|
|
@pytest.mark.parametrize("compile", [True, False])
|
|
@pytest.mark.parametrize(
|
|
"loss", [torch.nn.L1Loss(reduction="sum"), torch.nn.MSELoss()]
|
|
)
|
|
def test_solver_validation(problem, compile, loss):
|
|
solver = SAPINN(model=model, problem=problem, loss=loss)
|
|
trainer = Trainer(
|
|
solver=solver,
|
|
max_epochs=2,
|
|
accelerator="cpu",
|
|
batch_size=None,
|
|
train_size=0.9,
|
|
val_size=0.1,
|
|
test_size=0.0,
|
|
compile=compile,
|
|
)
|
|
trainer.train()
|
|
if trainer.compile:
|
|
assert all(
|
|
[
|
|
isinstance(model, (OptimizedModule, torch.nn.ModuleDict))
|
|
for model in solver.models
|
|
]
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("problem", [problem, inverse_problem])
|
|
@pytest.mark.parametrize("compile", [True, False])
|
|
@pytest.mark.parametrize(
|
|
"loss", [torch.nn.L1Loss(reduction="sum"), torch.nn.MSELoss()]
|
|
)
|
|
def test_solver_test(problem, compile, loss):
|
|
solver = SAPINN(model=model, problem=problem, loss=loss)
|
|
trainer = Trainer(
|
|
solver=solver,
|
|
max_epochs=2,
|
|
accelerator="cpu",
|
|
batch_size=None,
|
|
train_size=0.7,
|
|
val_size=0.2,
|
|
test_size=0.1,
|
|
compile=compile,
|
|
)
|
|
trainer.test()
|
|
if trainer.compile:
|
|
assert all(
|
|
[
|
|
isinstance(model, (OptimizedModule, torch.nn.ModuleDict))
|
|
for model in solver.models
|
|
]
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("problem", [problem, inverse_problem])
|
|
def test_train_load_restore(problem):
|
|
dir = "tests/test_solver/tmp"
|
|
problem = problem
|
|
solver = SAPINN(model=model, problem=problem)
|
|
trainer = Trainer(
|
|
solver=solver,
|
|
max_epochs=5,
|
|
accelerator="cpu",
|
|
batch_size=None,
|
|
train_size=0.7,
|
|
val_size=0.2,
|
|
test_size=0.1,
|
|
default_root_dir=dir,
|
|
)
|
|
trainer.train()
|
|
# restore
|
|
new_trainer = Trainer(solver=solver, max_epochs=5, accelerator="cpu")
|
|
new_trainer.train(
|
|
ckpt_path=f"{dir}/lightning_logs/version_0/checkpoints/"
|
|
+ "epoch=4-step=5.ckpt"
|
|
)
|
|
|
|
# loading
|
|
new_solver = SAPINN.load_from_checkpoint(
|
|
f"{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt",
|
|
problem=problem,
|
|
model=model,
|
|
)
|
|
|
|
test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
|
|
assert new_solver.forward(test_pts).shape == (20, 1)
|
|
assert new_solver.forward(test_pts).shape == (
|
|
solver.forward(test_pts).shape
|
|
)
|
|
torch.testing.assert_close(
|
|
new_solver.forward(test_pts), solver.forward(test_pts)
|
|
)
|
|
|
|
# rm directories
|
|
import shutil
|
|
|
|
shutil.rmtree("tests/test_solver/tmp")
|