Refactoring solvers (#541)

* Refactoring solvers

* Simplify logic compile
* Improve and update doc
* Create SupervisedSolverInterface
* Specialize SupervisedSolver and ReducedOrderModelSolver
* Create EnsembleSolverInterface + EnsembleSupervisedSolver
* Create tests ensemble solvers

* formatter

* codacy

* fix issues + speedup test
This commit is contained in:
Dario Coscia
2025-04-09 14:51:42 +02:00
parent 485c8dd789
commit 6dd7bd2825
37 changed files with 1514 additions and 510 deletions

View File

@@ -27,12 +27,12 @@ class DummySpatialProblem(SpatialProblem):
# define problems
problem = DiffusionReactionProblem()
problem.discretise_domain(50)
problem.discretise_domain(10)
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -19,9 +19,9 @@ from torch._dynamo.eval_frame import OptimizedModule
# define problems
problem = Poisson()
problem.discretise_domain(50)
problem.discretise_domain(10)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
inverse_problem.discretise_domain(10)
# reduce the number of data points to speed up testing
data_condition = inverse_problem.conditions["data"]
@@ -29,9 +29,9 @@ data_condition.input = data_condition.input[:10]
data_condition.target = data_condition.target[:10]
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -0,0 +1,149 @@
import pytest
import torch
from pina import LabelTensor, Condition
from pina.model import FeedForward
from pina.trainer import Trainer
from pina.solver import DeepEnsemblePINN
from pina.condition import (
InputTargetCondition,
InputEquationCondition,
DomainEquationCondition,
)
from pina.problem.zoo import Poisson2DSquareProblem as Poisson
from torch._dynamo.eval_frame import OptimizedModule
# define problems
problem = Poisson()
problem.discretise_domain(10)
# add input-output condition to test supervised learning
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)
# define models
models = [
FeedForward(
len(problem.input_variables), len(problem.output_variables), n_layers=1
)
for _ in range(5)
]
def test_constructor():
solver = DeepEnsemblePINN(problem=problem, models=models)
assert solver.accepted_conditions_types == (
InputTargetCondition,
InputEquationCondition,
DomainEquationCondition,
)
assert solver.num_ensemble == 5
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(batch_size, compile):
solver = DeepEnsemblePINN(models=models, problem=problem)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=1.0,
val_size=0.0,
test_size=0.0,
compile=compile,
)
trainer.train()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(batch_size, compile):
solver = DeepEnsemblePINN(models=models, problem=problem)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=0.9,
val_size=0.1,
test_size=0.0,
compile=compile,
)
trainer.train()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(batch_size, compile):
solver = DeepEnsemblePINN(models=models, problem=problem)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=0.7,
val_size=0.2,
test_size=0.1,
compile=compile,
)
trainer.test()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
def test_train_load_restore():
dir = "tests/test_solver/tmp"
solver = DeepEnsemblePINN(models=models, problem=problem)
trainer = Trainer(
solver=solver,
max_epochs=5,
accelerator="cpu",
batch_size=None,
train_size=0.7,
val_size=0.2,
test_size=0.1,
default_root_dir=dir,
)
trainer.train()
# restore
new_trainer = Trainer(solver=solver, max_epochs=5, accelerator="cpu")
new_trainer.train(
ckpt_path=f"{dir}/lightning_logs/version_0/checkpoints/"
+ "epoch=4-step=5.ckpt"
)
# loading
new_solver = DeepEnsemblePINN.load_from_checkpoint(
f"{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt",
problem=problem,
models=models,
)
test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
torch.testing.assert_close(
new_solver.forward(test_pts), solver.forward(test_pts)
)
# rm directories
import shutil
shutil.rmtree("tests/test_solver/tmp")

View File

@@ -0,0 +1,275 @@
import torch
import pytest
from torch._dynamo.eval_frame import OptimizedModule
from torch_geometric.nn import GCNConv
from pina import Condition, LabelTensor
from pina.condition import InputTargetCondition
from pina.problem import AbstractProblem
from pina.solver import DeepEnsembleSupervisedSolver
from pina.model import FeedForward
from pina.trainer import Trainer
from pina.graph import KNNGraph
class LabelTensorProblem(AbstractProblem):
input_variables = ["u_0", "u_1"]
output_variables = ["u"]
conditions = {
"data": Condition(
input=LabelTensor(torch.randn(20, 2), ["u_0", "u_1"]),
target=LabelTensor(torch.randn(20, 1), ["u"]),
),
}
class TensorProblem(AbstractProblem):
input_variables = ["u_0", "u_1"]
output_variables = ["u"]
conditions = {
"data": Condition(input=torch.randn(20, 2), target=torch.randn(20, 1))
}
x = torch.rand((15, 20, 5))
pos = torch.rand((15, 20, 2))
output_ = torch.rand((15, 20, 1))
input_ = [
KNNGraph(x=x_, pos=pos_, neighbours=3, edge_attr=True)
for x_, pos_ in zip(x, pos)
]
class GraphProblem(AbstractProblem):
output_variables = None
conditions = {"data": Condition(input=input_, target=output_)}
x = LabelTensor(torch.rand((15, 20, 5)), ["a", "b", "c", "d", "e"])
pos = LabelTensor(torch.rand((15, 20, 2)), ["x", "y"])
output_ = LabelTensor(torch.rand((15, 20, 1)), ["u"])
input_ = [
KNNGraph(x=x[i], pos=pos[i], neighbours=3, edge_attr=True)
for i in range(len(x))
]
class GraphProblemLT(AbstractProblem):
output_variables = ["u"]
input_variables = ["a", "b", "c", "d", "e"]
conditions = {"data": Condition(input=input_, target=output_)}
models = [FeedForward(2, 1) for i in range(10)]
class Models(torch.nn.Module):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lift = torch.nn.Linear(5, 10)
self.activation = torch.nn.Tanh()
self.output = torch.nn.Linear(10, 1)
self.conv = GCNConv(10, 10)
def forward(self, batch):
x = batch.x
edge_index = batch.edge_index
for _ in range(1):
y = self.lift(x)
y = self.activation(y)
y = self.conv(y, edge_index)
y = self.activation(y)
y = self.output(y)
return y
graph_models = [Models() for i in range(10)]
def test_constructor():
solver = DeepEnsembleSupervisedSolver(
problem=TensorProblem(), models=models
)
DeepEnsembleSupervisedSolver(problem=LabelTensorProblem(), models=models)
assert DeepEnsembleSupervisedSolver.accepted_conditions_types == (
InputTargetCondition
)
assert solver.num_ensemble == 10
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_train(use_lt, batch_size, compile):
problem = LabelTensorProblem() if use_lt else TensorProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=1.0,
test_size=0.0,
val_size=0.0,
compile=compile,
)
trainer.train()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
def test_solver_train_graph(batch_size, use_lt):
problem = GraphProblemLT() if use_lt else GraphProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=graph_models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=1.0,
test_size=0.0,
val_size=0.0,
)
trainer.train()
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_validation(use_lt, compile):
problem = LabelTensorProblem() if use_lt else TensorProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=None,
train_size=0.9,
val_size=0.1,
test_size=0.0,
compile=compile,
)
trainer.train()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
def test_solver_validation_graph(batch_size, use_lt):
problem = GraphProblemLT() if use_lt else GraphProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=graph_models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=0.9,
val_size=0.1,
test_size=0.0,
)
trainer.train()
@pytest.mark.parametrize("use_lt", [True, False])
@pytest.mark.parametrize("compile", [True, False])
def test_solver_test(use_lt, compile):
problem = LabelTensorProblem() if use_lt else TensorProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=None,
train_size=0.8,
val_size=0.1,
test_size=0.1,
compile=compile,
)
trainer.test()
if trainer.compile:
assert all(
[isinstance(model, OptimizedModule) for model in solver.models]
)
@pytest.mark.parametrize("batch_size", [None, 1, 5, 20])
@pytest.mark.parametrize("use_lt", [True, False])
def test_solver_test_graph(batch_size, use_lt):
problem = GraphProblemLT() if use_lt else GraphProblem()
solver = DeepEnsembleSupervisedSolver(
problem=problem, models=graph_models, use_lt=use_lt
)
trainer = Trainer(
solver=solver,
max_epochs=2,
accelerator="cpu",
batch_size=batch_size,
train_size=0.8,
val_size=0.1,
test_size=0.1,
)
trainer.test()
def test_train_load_restore():
dir = "tests/test_solver/tmp/"
problem = LabelTensorProblem()
solver = DeepEnsembleSupervisedSolver(problem=problem, models=models)
trainer = Trainer(
solver=solver,
max_epochs=5,
accelerator="cpu",
batch_size=None,
train_size=0.9,
test_size=0.1,
val_size=0.0,
default_root_dir=dir,
)
trainer.train()
# restore
new_trainer = Trainer(solver=solver, max_epochs=5, accelerator="cpu")
new_trainer.train(
ckpt_path=f"{dir}/lightning_logs/version_0/checkpoints/"
+ "epoch=4-step=5.ckpt"
)
# loading
new_solver = DeepEnsembleSupervisedSolver.load_from_checkpoint(
f"{dir}/lightning_logs/version_0/checkpoints/epoch=4-step=5.ckpt",
problem=problem,
models=models,
)
test_pts = LabelTensor(torch.rand(20, 2), problem.input_variables)
assert new_solver.forward(test_pts).shape == solver.forward(test_pts).shape
torch.testing.assert_close(
new_solver.forward(test_pts), solver.forward(test_pts)
)
# rm directories
import shutil
shutil.rmtree("tests/test_solver/tmp")

View File

@@ -2,7 +2,7 @@ import torch
import torch.nn as nn
import pytest
from pina import Condition, LabelTensor
from pina import Condition
from pina.solver import GAROM
from pina.condition import InputTargetCondition
from pina.problem import AbstractProblem
@@ -15,7 +15,7 @@ class TensorProblem(AbstractProblem):
input_variables = ["u_0", "u_1"]
output_variables = ["u"]
conditions = {
"data": Condition(target=torch.randn(50, 2), input=torch.randn(50, 1))
"data": Condition(target=torch.randn(10, 2), input=torch.randn(10, 1))
}

View File

@@ -30,9 +30,9 @@ class DummyTimeProblem(TimeDependentProblem):
# define problems
problem = Poisson()
problem.discretise_domain(50)
problem.discretise_domain(10)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
inverse_problem.discretise_domain(10)
# reduce the number of data points to speed up testing
data_condition = inverse_problem.conditions["data"]
@@ -40,9 +40,9 @@ data_condition.input = data_condition.input[:10]
data_condition.target = data_condition.target[:10]
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -19,9 +19,9 @@ from torch._dynamo.eval_frame import OptimizedModule
# define problems
problem = Poisson()
problem.discretise_domain(50)
problem.discretise_domain(10)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
inverse_problem.discretise_domain(10)
# reduce the number of data points to speed up testing
data_condition = inverse_problem.conditions["data"]
@@ -29,9 +29,9 @@ data_condition.input = data_condition.input[:10]
data_condition.target = data_condition.target[:10]
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -18,9 +18,9 @@ from torch._dynamo.eval_frame import OptimizedModule
# define problems
problem = Poisson()
problem.discretise_domain(50)
problem.discretise_domain(10)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
inverse_problem.discretise_domain(10)
# reduce the number of data points to speed up testing
data_condition = inverse_problem.conditions["data"]
@@ -28,9 +28,9 @@ data_condition.input = data_condition.input[:10]
data_condition.target = data_condition.target[:10]
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -19,9 +19,9 @@ from torch._dynamo.eval_frame import OptimizedModule
# define problems
problem = Poisson()
problem.discretise_domain(50)
problem.discretise_domain(10)
inverse_problem = InversePoisson()
inverse_problem.discretise_domain(50)
inverse_problem.discretise_domain(10)
# reduce the number of data points to speed up testing
data_condition = inverse_problem.conditions["data"]
@@ -29,9 +29,9 @@ data_condition.input = data_condition.input[:10]
data_condition.target = data_condition.target[:10]
# add input-output condition to test supervised learning
input_pts = torch.rand(50, len(problem.input_variables))
input_pts = torch.rand(10, len(problem.input_variables))
input_pts = LabelTensor(input_pts, problem.input_variables)
output_pts = torch.rand(50, len(problem.output_variables))
output_pts = torch.rand(10, len(problem.output_variables))
output_pts = LabelTensor(output_pts, problem.output_variables)
problem.conditions["data"] = Condition(input=input_pts, target=output_pts)

View File

@@ -30,9 +30,9 @@ class TensorProblem(AbstractProblem):
}
x = torch.rand((100, 20, 5))
pos = torch.rand((100, 20, 2))
output_ = torch.rand((100, 20, 1))
x = torch.rand((15, 20, 5))
pos = torch.rand((15, 20, 2))
output_ = torch.rand((15, 20, 1))
input_ = [
KNNGraph(x=x_, pos=pos_, neighbours=3, edge_attr=True)
for x_, pos_ in zip(x, pos)
@@ -44,9 +44,9 @@ class GraphProblem(AbstractProblem):
conditions = {"data": Condition(input=input_, target=output_)}
x = LabelTensor(torch.rand((100, 20, 5)), ["a", "b", "c", "d", "e"])
pos = LabelTensor(torch.rand((100, 20, 2)), ["x", "y"])
output_ = LabelTensor(torch.rand((100, 20, 1)), ["u"])
x = LabelTensor(torch.rand((15, 20, 5)), ["a", "b", "c", "d", "e"])
pos = LabelTensor(torch.rand((15, 20, 2)), ["x", "y"])
output_ = LabelTensor(torch.rand((15, 20, 1)), ["u"])
input_ = [
KNNGraph(x=x[i], pos=pos[i], neighbours=3, edge_attr=True)
for i in range(len(x))