Dev Update (#582)
* Fix adaptive refinement (#571) --------- Co-authored-by: Dario Coscia <93731561+dario-coscia@users.noreply.github.com> * Remove collector * Fixes * Fixes * rm unnecessary comment * fix advection (#581) * Fix tutorial .html link (#580) * fix problem data collection for v0.1 (#584) * Message Passing Module (#516) * add deep tensor network block * add interaction network block * add radial field network block * add schnet block * add equivariant network block * fix + tests + doc files * fix egnn + equivariance/invariance tests Co-authored-by: Dario Coscia <dariocos99@gmail.com> --------- Co-authored-by: giovanni <giovanni.canali98@yahoo.it> Co-authored-by: AleDinve <giuseppealessio.d@student.unisi.it> * add type checker (#527) --------- Co-authored-by: Filippo Olivo <filippo@filippoolivo.com> Co-authored-by: Giovanni Canali <115086358+GiovanniCanali@users.noreply.github.com> Co-authored-by: giovanni <giovanni.canali98@yahoo.it> Co-authored-by: AleDinve <giuseppealessio.d@student.unisi.it>
This commit is contained in:
@@ -1,45 +1,58 @@
|
||||
import pytest
|
||||
|
||||
from torch.nn import MSELoss
|
||||
|
||||
from pina.solver import PINN
|
||||
from pina.trainer import Trainer
|
||||
from pina.model import FeedForward
|
||||
from pina.problem.zoo import Poisson2DSquareProblem as Poisson
|
||||
from pina.callback import R3Refinement
|
||||
from pina.callback.refinement import R3Refinement
|
||||
|
||||
|
||||
# make the problem
|
||||
poisson_problem = Poisson()
|
||||
boundaries = ["g1", "g2", "g3", "g4"]
|
||||
n = 10
|
||||
poisson_problem.discretise_domain(n, "grid", domains=boundaries)
|
||||
poisson_problem.discretise_domain(n, "grid", domains="D")
|
||||
poisson_problem.discretise_domain(10, "grid", domains=["g1", "g2", "g3", "g4"])
|
||||
poisson_problem.discretise_domain(10, "grid", domains="D")
|
||||
model = FeedForward(
|
||||
len(poisson_problem.input_variables), len(poisson_problem.output_variables)
|
||||
)
|
||||
|
||||
# make the solver
|
||||
solver = PINN(problem=poisson_problem, model=model)
|
||||
|
||||
|
||||
# def test_r3constructor():
|
||||
# R3Refinement(sample_every=10)
|
||||
def test_constructor():
|
||||
# good constructor
|
||||
R3Refinement(sample_every=10)
|
||||
R3Refinement(sample_every=10, residual_loss=MSELoss)
|
||||
R3Refinement(sample_every=10, condition_to_update=["D"])
|
||||
# wrong constructor
|
||||
with pytest.raises(ValueError):
|
||||
R3Refinement(sample_every="str")
|
||||
with pytest.raises(ValueError):
|
||||
R3Refinement(sample_every=10, condition_to_update=3)
|
||||
|
||||
|
||||
# def test_r3refinment_routine():
|
||||
# # make the trainer
|
||||
# trainer = Trainer(solver=solver,
|
||||
# callback=[R3Refinement(sample_every=1)],
|
||||
# accelerator='cpu',
|
||||
# max_epochs=5)
|
||||
# trainer.train()
|
||||
|
||||
# def test_r3refinment_routine():
|
||||
# model = FeedForward(len(poisson_problem.input_variables),
|
||||
# len(poisson_problem.output_variables))
|
||||
# solver = PINN(problem=poisson_problem, model=model)
|
||||
# trainer = Trainer(solver=solver,
|
||||
# callback=[R3Refinement(sample_every=1)],
|
||||
# accelerator='cpu',
|
||||
# max_epochs=5)
|
||||
# before_n_points = {loc : len(pts) for loc, pts in trainer.solver.problem.input_pts.items()}
|
||||
# trainer.train()
|
||||
# after_n_points = {loc : len(pts) for loc, pts in trainer.solver.problem.input_pts.items()}
|
||||
# assert before_n_points == after_n_points
|
||||
@pytest.mark.parametrize(
|
||||
"condition_to_update", [["D", "g1"], ["D", "g1", "g2", "g3", "g4"]]
|
||||
)
|
||||
def test_sample(condition_to_update):
|
||||
trainer = Trainer(
|
||||
solver=solver,
|
||||
callbacks=[
|
||||
R3Refinement(
|
||||
sample_every=1, condition_to_update=condition_to_update
|
||||
)
|
||||
],
|
||||
accelerator="cpu",
|
||||
max_epochs=5,
|
||||
)
|
||||
before_n_points = {
|
||||
loc: len(trainer.solver.problem.input_pts[loc])
|
||||
for loc in condition_to_update
|
||||
}
|
||||
trainer.train()
|
||||
after_n_points = {
|
||||
loc: len(trainer.data_module.train_dataset.input[loc])
|
||||
for loc in condition_to_update
|
||||
}
|
||||
assert before_n_points == trainer.callbacks[0].initial_population_size
|
||||
assert before_n_points == after_n_points
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
import torch
|
||||
import pytest
|
||||
from pina import Condition, LabelTensor, Graph
|
||||
from pina.condition import InputTargetCondition, DomainEquationCondition
|
||||
from pina.graph import RadiusGraph
|
||||
from pina.problem import AbstractProblem, SpatialProblem
|
||||
from pina.domain import CartesianDomain
|
||||
from pina.equation.equation import Equation
|
||||
from pina.equation.equation_factory import FixedValue
|
||||
from pina.operator import laplacian
|
||||
from pina.collector import Collector
|
||||
|
||||
|
||||
def test_supervised_tensor_collector():
|
||||
class SupervisedProblem(AbstractProblem):
|
||||
output_variables = None
|
||||
conditions = {
|
||||
"data1": Condition(
|
||||
input=torch.rand((10, 2)),
|
||||
target=torch.rand((10, 2)),
|
||||
),
|
||||
"data2": Condition(
|
||||
input=torch.rand((20, 2)),
|
||||
target=torch.rand((20, 2)),
|
||||
),
|
||||
"data3": Condition(
|
||||
input=torch.rand((30, 2)),
|
||||
target=torch.rand((30, 2)),
|
||||
),
|
||||
}
|
||||
|
||||
problem = SupervisedProblem()
|
||||
collector = Collector(problem)
|
||||
for v in collector.conditions_name.values():
|
||||
assert v in problem.conditions.keys()
|
||||
|
||||
|
||||
def test_pinn_collector():
|
||||
def laplace_equation(input_, output_):
|
||||
force_term = torch.sin(input_.extract(["x"]) * torch.pi) * torch.sin(
|
||||
input_.extract(["y"]) * torch.pi
|
||||
)
|
||||
delta_u = laplacian(output_.extract(["u"]), input_)
|
||||
return delta_u - force_term
|
||||
|
||||
my_laplace = Equation(laplace_equation)
|
||||
in_ = LabelTensor(
|
||||
torch.tensor([[0.0, 1.0]], requires_grad=True), ["x", "y"]
|
||||
)
|
||||
out_ = LabelTensor(torch.tensor([[0.0]], requires_grad=True), ["u"])
|
||||
|
||||
class Poisson(SpatialProblem):
|
||||
output_variables = ["u"]
|
||||
spatial_domain = CartesianDomain({"x": [0, 1], "y": [0, 1]})
|
||||
|
||||
conditions = {
|
||||
"gamma1": Condition(
|
||||
domain=CartesianDomain({"x": [0, 1], "y": 1}),
|
||||
equation=FixedValue(0.0),
|
||||
),
|
||||
"gamma2": Condition(
|
||||
domain=CartesianDomain({"x": [0, 1], "y": 0}),
|
||||
equation=FixedValue(0.0),
|
||||
),
|
||||
"gamma3": Condition(
|
||||
domain=CartesianDomain({"x": 1, "y": [0, 1]}),
|
||||
equation=FixedValue(0.0),
|
||||
),
|
||||
"gamma4": Condition(
|
||||
domain=CartesianDomain({"x": 0, "y": [0, 1]}),
|
||||
equation=FixedValue(0.0),
|
||||
),
|
||||
"D": Condition(
|
||||
domain=CartesianDomain({"x": [0, 1], "y": [0, 1]}),
|
||||
equation=my_laplace,
|
||||
),
|
||||
"data": Condition(input=in_, target=out_),
|
||||
}
|
||||
|
||||
def poisson_sol(self, pts):
|
||||
return -(
|
||||
torch.sin(pts.extract(["x"]) * torch.pi)
|
||||
* torch.sin(pts.extract(["y"]) * torch.pi)
|
||||
) / (2 * torch.pi**2)
|
||||
|
||||
truth_solution = poisson_sol
|
||||
|
||||
problem = Poisson()
|
||||
boundaries = ["gamma1", "gamma2", "gamma3", "gamma4"]
|
||||
problem.discretise_domain(10, "grid", domains=boundaries)
|
||||
problem.discretise_domain(10, "grid", domains="D")
|
||||
|
||||
collector = Collector(problem)
|
||||
collector.store_fixed_data()
|
||||
collector.store_sample_domains()
|
||||
|
||||
for k, v in problem.conditions.items():
|
||||
if isinstance(v, InputTargetCondition):
|
||||
assert list(collector.data_collections[k].keys()) == [
|
||||
"input",
|
||||
"target",
|
||||
]
|
||||
|
||||
for k, v in problem.conditions.items():
|
||||
if isinstance(v, DomainEquationCondition):
|
||||
assert list(collector.data_collections[k].keys()) == [
|
||||
"input",
|
||||
"equation",
|
||||
]
|
||||
|
||||
|
||||
def test_supervised_graph_collector():
|
||||
pos = torch.rand((100, 3))
|
||||
x = [torch.rand((100, 3)) for _ in range(10)]
|
||||
graph_list_1 = [RadiusGraph(pos=pos, radius=0.4, x=x_) for x_ in x]
|
||||
out_1 = torch.rand((10, 100, 3))
|
||||
|
||||
pos = torch.rand((50, 3))
|
||||
x = [torch.rand((50, 3)) for _ in range(10)]
|
||||
graph_list_2 = [RadiusGraph(pos=pos, radius=0.4, x=x_) for x_ in x]
|
||||
out_2 = torch.rand((10, 50, 3))
|
||||
|
||||
class SupervisedProblem(AbstractProblem):
|
||||
output_variables = None
|
||||
conditions = {
|
||||
"data1": Condition(input=graph_list_1, target=out_1),
|
||||
"data2": Condition(input=graph_list_2, target=out_2),
|
||||
}
|
||||
|
||||
problem = SupervisedProblem()
|
||||
collector = Collector(problem)
|
||||
collector.store_fixed_data()
|
||||
# assert all(collector._is_conditions_ready.values())
|
||||
for v in collector.conditions_name.values():
|
||||
assert v in problem.conditions.keys()
|
||||
59
tests/test_messagepassing/test_deep_tensor_network_block.py
Normal file
59
tests/test_messagepassing/test_deep_tensor_network_block.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import pytest
|
||||
import torch
|
||||
from pina.model.block.message_passing import DeepTensorNetworkBlock
|
||||
|
||||
# Data for testing
|
||||
x = torch.rand(10, 3)
|
||||
edge_index = torch.randint(0, 10, (2, 20))
|
||||
edge_attr = torch.randn(20, 2)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_feature_dim", [1, 3])
|
||||
@pytest.mark.parametrize("edge_feature_dim", [3, 5])
|
||||
def test_constructor(node_feature_dim, edge_feature_dim):
|
||||
|
||||
DeepTensorNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
)
|
||||
|
||||
# Should fail if node_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
DeepTensorNetworkBlock(
|
||||
node_feature_dim=-1, edge_feature_dim=edge_feature_dim
|
||||
)
|
||||
|
||||
# Should fail if edge_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
DeepTensorNetworkBlock(
|
||||
node_feature_dim=node_feature_dim, edge_feature_dim=-1
|
||||
)
|
||||
|
||||
|
||||
def test_forward():
|
||||
|
||||
model = DeepTensorNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_attr.shape[1],
|
||||
)
|
||||
|
||||
output_ = model(edge_index=edge_index, x=x, edge_attr=edge_attr)
|
||||
assert output_.shape == x.shape
|
||||
|
||||
|
||||
def test_backward():
|
||||
|
||||
model = DeepTensorNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_attr.shape[1],
|
||||
)
|
||||
|
||||
output_ = model(
|
||||
edge_index=edge_index,
|
||||
x=x.requires_grad_(),
|
||||
edge_attr=edge_attr.requires_grad_(),
|
||||
)
|
||||
|
||||
loss = torch.mean(output_)
|
||||
loss.backward()
|
||||
assert x.grad.shape == x.shape
|
||||
165
tests/test_messagepassing/test_equivariant_network_block.py
Normal file
165
tests/test_messagepassing/test_equivariant_network_block.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import pytest
|
||||
import torch
|
||||
from pina.model.block.message_passing import EnEquivariantNetworkBlock
|
||||
|
||||
# Data for testing
|
||||
x = torch.rand(10, 4)
|
||||
pos = torch.rand(10, 3)
|
||||
edge_index = torch.randint(0, 10, (2, 20))
|
||||
edge_attr = torch.randn(20, 2)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_feature_dim", [1, 3])
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
@pytest.mark.parametrize("pos_dim", [2, 3])
|
||||
def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
|
||||
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos_dim,
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
# Should fail if node_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=-1,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos_dim,
|
||||
)
|
||||
|
||||
# Should fail if edge_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=-1,
|
||||
pos_dim=pos_dim,
|
||||
)
|
||||
|
||||
# Should fail if pos_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=-1,
|
||||
)
|
||||
|
||||
# Should fail if hidden_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos_dim,
|
||||
hidden_dim=-1,
|
||||
)
|
||||
|
||||
# Should fail if n_message_layers is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos_dim,
|
||||
n_message_layers=-1,
|
||||
)
|
||||
|
||||
# Should fail if n_update_layers is negative
|
||||
with pytest.raises(AssertionError):
|
||||
EnEquivariantNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos_dim,
|
||||
n_update_layers=-1,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
def test_forward(edge_feature_dim):
|
||||
|
||||
model = EnEquivariantNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos.shape[1],
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
if edge_feature_dim == 0:
|
||||
output_ = model(edge_index=edge_index, x=x, pos=pos)
|
||||
else:
|
||||
output_ = model(
|
||||
edge_index=edge_index, x=x, pos=pos, edge_attr=edge_attr
|
||||
)
|
||||
|
||||
assert output_[0].shape == x.shape
|
||||
assert output_[1].shape == pos.shape
|
||||
|
||||
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
def test_backward(edge_feature_dim):
|
||||
|
||||
model = EnEquivariantNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
pos_dim=pos.shape[1],
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
if edge_feature_dim == 0:
|
||||
output_ = model(
|
||||
edge_index=edge_index,
|
||||
x=x.requires_grad_(),
|
||||
pos=pos.requires_grad_(),
|
||||
)
|
||||
else:
|
||||
output_ = model(
|
||||
edge_index=edge_index,
|
||||
x=x.requires_grad_(),
|
||||
pos=pos.requires_grad_(),
|
||||
edge_attr=edge_attr.requires_grad_(),
|
||||
)
|
||||
|
||||
loss = torch.mean(output_[0])
|
||||
loss.backward()
|
||||
assert x.grad.shape == x.shape
|
||||
assert pos.grad.shape == pos.shape
|
||||
|
||||
|
||||
def test_equivariance():
|
||||
|
||||
# Graph to be fully connected and undirected
|
||||
edge_index = torch.combinations(torch.arange(x.shape[0]), r=2).T
|
||||
edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)
|
||||
|
||||
# Random rotation (det(rotation) should be 1)
|
||||
rotation = torch.linalg.qr(torch.rand(pos.shape[-1], pos.shape[-1])).Q
|
||||
if torch.det(rotation) < 0:
|
||||
rotation[:, 0] *= -1
|
||||
|
||||
# Random translation
|
||||
translation = torch.rand(1, pos.shape[-1])
|
||||
|
||||
model = EnEquivariantNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=0,
|
||||
pos_dim=pos.shape[1],
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
).eval()
|
||||
|
||||
h1, pos1 = model(edge_index=edge_index, x=x, pos=pos)
|
||||
h2, pos2 = model(
|
||||
edge_index=edge_index, x=x, pos=pos @ rotation.T + translation
|
||||
)
|
||||
|
||||
# Transform model output
|
||||
pos1_transformed = (pos1 @ rotation.T) + translation
|
||||
|
||||
assert torch.allclose(pos2, pos1_transformed, atol=1e-5)
|
||||
assert torch.allclose(h1, h2, atol=1e-5)
|
||||
84
tests/test_messagepassing/test_interaction_network_block.py
Normal file
84
tests/test_messagepassing/test_interaction_network_block.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import pytest
|
||||
import torch
|
||||
from pina.model.block.message_passing import InteractionNetworkBlock
|
||||
|
||||
# Data for testing
|
||||
x = torch.rand(10, 3)
|
||||
edge_index = torch.randint(0, 10, (2, 20))
|
||||
edge_attr = torch.randn(20, 2)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_feature_dim", [1, 3])
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
def test_constructor(node_feature_dim, edge_feature_dim):
|
||||
|
||||
InteractionNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
# Should fail if node_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
InteractionNetworkBlock(node_feature_dim=-1)
|
||||
|
||||
# Should fail if edge_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
InteractionNetworkBlock(node_feature_dim=3, edge_feature_dim=-1)
|
||||
|
||||
# Should fail if hidden_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
InteractionNetworkBlock(node_feature_dim=3, hidden_dim=-1)
|
||||
|
||||
# Should fail if n_message_layers is negative
|
||||
with pytest.raises(AssertionError):
|
||||
InteractionNetworkBlock(node_feature_dim=3, n_message_layers=-1)
|
||||
|
||||
# Should fail if n_update_layers is negative
|
||||
with pytest.raises(AssertionError):
|
||||
InteractionNetworkBlock(node_feature_dim=3, n_update_layers=-1)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
def test_forward(edge_feature_dim):
|
||||
|
||||
model = InteractionNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
if edge_feature_dim == 0:
|
||||
output_ = model(edge_index=edge_index, x=x)
|
||||
else:
|
||||
output_ = model(edge_index=edge_index, x=x, edge_attr=edge_attr)
|
||||
assert output_.shape == x.shape
|
||||
|
||||
|
||||
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
|
||||
def test_backward(edge_feature_dim):
|
||||
|
||||
model = InteractionNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
edge_feature_dim=edge_feature_dim,
|
||||
hidden_dim=64,
|
||||
n_message_layers=2,
|
||||
n_update_layers=2,
|
||||
)
|
||||
|
||||
if edge_feature_dim == 0:
|
||||
output_ = model(edge_index=edge_index, x=x.requires_grad_())
|
||||
else:
|
||||
output_ = model(
|
||||
edge_index=edge_index,
|
||||
x=x.requires_grad_(),
|
||||
edge_attr=edge_attr.requires_grad_(),
|
||||
)
|
||||
|
||||
loss = torch.mean(output_)
|
||||
loss.backward()
|
||||
assert x.grad.shape == x.shape
|
||||
92
tests/test_messagepassing/test_radial_field_network_block.py
Normal file
92
tests/test_messagepassing/test_radial_field_network_block.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import pytest
|
||||
import torch
|
||||
from pina.model.block.message_passing import RadialFieldNetworkBlock
|
||||
|
||||
# Data for testing
|
||||
x = torch.rand(10, 3)
|
||||
edge_index = torch.randint(0, 10, (2, 20))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_feature_dim", [1, 3])
|
||||
def test_constructor(node_feature_dim):
|
||||
|
||||
RadialFieldNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
hidden_dim=64,
|
||||
n_layers=2,
|
||||
)
|
||||
|
||||
# Should fail if node_feature_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
RadialFieldNetworkBlock(
|
||||
node_feature_dim=-1,
|
||||
hidden_dim=64,
|
||||
n_layers=2,
|
||||
)
|
||||
|
||||
# Should fail if hidden_dim is negative
|
||||
with pytest.raises(AssertionError):
|
||||
RadialFieldNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
hidden_dim=-1,
|
||||
n_layers=2,
|
||||
)
|
||||
|
||||
# Should fail if n_layers is negative
|
||||
with pytest.raises(AssertionError):
|
||||
RadialFieldNetworkBlock(
|
||||
node_feature_dim=node_feature_dim,
|
||||
hidden_dim=64,
|
||||
n_layers=-1,
|
||||
)
|
||||
|
||||
|
||||
def test_forward():
|
||||
|
||||
model = RadialFieldNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
hidden_dim=64,
|
||||
n_layers=2,
|
||||
)
|
||||
|
||||
output_ = model(edge_index=edge_index, x=x)
|
||||
assert output_.shape == x.shape
|
||||
|
||||
|
||||
def test_backward():
|
||||
|
||||
model = RadialFieldNetworkBlock(
|
||||
node_feature_dim=x.shape[1],
|
||||
hidden_dim=64,
|
||||
n_layers=2,
|
||||
)
|
||||
|
||||
output_ = model(edge_index=edge_index, x=x.requires_grad_())
|
||||
loss = torch.mean(output_)
|
||||
loss.backward()
|
||||
assert x.grad.shape == x.shape
|
||||
|
||||
|
||||
def test_equivariance():
|
||||
|
||||
# Graph to be fully connected and undirected
|
||||
edge_index = torch.combinations(torch.arange(x.shape[0]), r=2).T
|
||||
edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)
|
||||
|
||||
# Random rotation (det(rotation) should be 1)
|
||||
rotation = torch.linalg.qr(torch.rand(x.shape[-1], x.shape[-1])).Q
|
||||
if torch.det(rotation) < 0:
|
||||
rotation[:, 0] *= -1
|
||||
|
||||
# Random translation
|
||||
translation = torch.rand(1, x.shape[-1])
|
||||
|
||||
model = RadialFieldNetworkBlock(node_feature_dim=x.shape[1]).eval()
|
||||
|
||||
pos1 = model(edge_index=edge_index, x=x)
|
||||
pos2 = model(edge_index=edge_index, x=x @ rotation.T + translation)
|
||||
|
||||
# Transform model output
|
||||
pos1_transformed = (pos1 @ rotation.T) + translation
|
||||
|
||||
assert torch.allclose(pos2, pos1_transformed, atol=1e-5)
|
||||
@@ -296,22 +296,183 @@ def test_laplacian(f):
|
||||
laplacian(output_=output_, input_=input_, components=["a", "b", "c"])
|
||||
|
||||
|
||||
def test_advection():
|
||||
def test_advection_scalar():
|
||||
|
||||
# Define input and output
|
||||
# Define 3-dimensional input
|
||||
input_ = torch.rand((20, 3), requires_grad=True)
|
||||
input_ = LabelTensor(input_, ["x", "y", "z"])
|
||||
output_ = LabelTensor(input_**2, ["u", "v", "c"])
|
||||
|
||||
# Define the velocity field
|
||||
velocity = output_.extract(["c"])
|
||||
# Define 3-dimensional velocity field and quantity to be advected
|
||||
velocity = torch.rand((20, 3), requires_grad=True)
|
||||
field = torch.sum(input_**2, dim=-1, keepdim=True)
|
||||
|
||||
# Compute the true advection and the pina advection
|
||||
pina_advection = advection(
|
||||
output_=output_, input_=input_, velocity_field="c"
|
||||
# Combine velocity and field into a LabelTensor
|
||||
labels = ["ux", "uy", "uz", "c"]
|
||||
output_ = LabelTensor(torch.cat((velocity, field), dim=1), labels)
|
||||
|
||||
# Compute the pina advection
|
||||
components = ["c"]
|
||||
pina_adv = advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
components=components,
|
||||
d=["x", "y", "z"],
|
||||
)
|
||||
true_advection = velocity * 2 * input_.extract(["x", "y"])
|
||||
|
||||
# Check the shape of the advection
|
||||
assert pina_advection.shape == (*output_.shape[:-1], output_.shape[-1] - 1)
|
||||
assert torch.allclose(pina_advection, true_advection)
|
||||
# Compute the true advection
|
||||
grads = 2 * input_
|
||||
true_adv = torch.sum(grads * velocity, dim=grads.ndim - 1, keepdim=True)
|
||||
|
||||
# Check the shape, labels, and value of the advection
|
||||
assert pina_adv.shape == (*output_.shape[:-1], len(components))
|
||||
assert pina_adv.labels == ["adv_c"]
|
||||
assert torch.allclose(pina_adv, true_adv)
|
||||
|
||||
# Should fail if input not a LabelTensor
|
||||
with pytest.raises(TypeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_.tensor,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail if output not a LabelTensor
|
||||
with pytest.raises(TypeError):
|
||||
advection(
|
||||
output_=output_.tensor,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail for non-existent input labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
d=["x", "a"],
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail for non-existent output labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
components=["a", "b", "c"],
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail if velocity_field labels are not present in the output labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "nonexistent"],
|
||||
components=["c"],
|
||||
)
|
||||
|
||||
# Should fail if velocity_field dimensionality does not match input tensor
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy"],
|
||||
components=["c"],
|
||||
)
|
||||
|
||||
|
||||
def test_advection_vector():
|
||||
|
||||
# Define 3-dimensional input
|
||||
input_ = torch.rand((20, 3), requires_grad=True)
|
||||
input_ = LabelTensor(input_, ["x", "y", "z"])
|
||||
|
||||
# Define 3-dimensional velocity field
|
||||
velocity = torch.rand((20, 3), requires_grad=True)
|
||||
|
||||
# Define 2-dimensional field to be advected
|
||||
field_1 = torch.sum(input_**2, dim=-1, keepdim=True)
|
||||
field_2 = torch.sum(input_**3, dim=-1, keepdim=True)
|
||||
|
||||
# Combine velocity and field into a LabelTensor
|
||||
labels = ["ux", "uy", "uz", "c1", "c2"]
|
||||
output_ = LabelTensor(
|
||||
torch.cat((velocity, field_1, field_2), dim=1), labels
|
||||
)
|
||||
|
||||
# Compute the pina advection
|
||||
components = ["c1", "c2"]
|
||||
pina_adv = advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
components=components,
|
||||
d=["x", "y", "z"],
|
||||
)
|
||||
|
||||
# Compute the true gradients of the fields "c1", "c2"
|
||||
grads1 = 2 * input_
|
||||
grads2 = 3 * input_**2
|
||||
|
||||
# Compute the true advection for each field
|
||||
true_adv1 = torch.sum(grads1 * velocity, dim=grads1.ndim - 1, keepdim=True)
|
||||
true_adv2 = torch.sum(grads2 * velocity, dim=grads2.ndim - 1, keepdim=True)
|
||||
true_adv = torch.cat((true_adv1, true_adv2), dim=-1)
|
||||
|
||||
# Check the shape, labels, and value of the advection
|
||||
assert pina_adv.shape == (*output_.shape[:-1], len(components))
|
||||
assert pina_adv.labels == ["adv_c1", "adv_c2"]
|
||||
assert torch.allclose(pina_adv, true_adv)
|
||||
|
||||
# Should fail if input not a LabelTensor
|
||||
with pytest.raises(TypeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_.tensor,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail if output not a LabelTensor
|
||||
with pytest.raises(TypeError):
|
||||
advection(
|
||||
output_=output_.tensor,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail for non-existent input labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
d=["x", "a"],
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail for non-existent output labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
components=["a", "b", "c"],
|
||||
velocity_field=["ux", "uy", "uz"],
|
||||
)
|
||||
|
||||
# Should fail if velocity_field labels are not present in the output labels
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy", "nonexistent"],
|
||||
components=["c"],
|
||||
)
|
||||
|
||||
# Should fail if velocity_field dimensionality does not match input tensor
|
||||
with pytest.raises(RuntimeError):
|
||||
advection(
|
||||
output_=output_,
|
||||
input_=input_,
|
||||
velocity_field=["ux", "uy"],
|
||||
components=["c"],
|
||||
)
|
||||
|
||||
@@ -4,6 +4,11 @@ from pina.problem.zoo import Poisson2DSquareProblem as Poisson
|
||||
from pina import LabelTensor
|
||||
from pina.domain import Union
|
||||
from pina.domain import CartesianDomain
|
||||
from pina.condition import (
|
||||
Condition,
|
||||
InputTargetCondition,
|
||||
DomainEquationCondition,
|
||||
)
|
||||
|
||||
|
||||
def test_discretise_domain():
|
||||
@@ -45,6 +50,24 @@ def test_variables_correct_order_sampling():
|
||||
)
|
||||
|
||||
|
||||
def test_input_pts():
|
||||
n = 10
|
||||
poisson_problem = Poisson()
|
||||
poisson_problem.discretise_domain(n, "grid")
|
||||
assert sorted(list(poisson_problem.input_pts.keys())) == sorted(
|
||||
list(poisson_problem.conditions.keys())
|
||||
)
|
||||
|
||||
|
||||
def test_collected_data():
|
||||
n = 10
|
||||
poisson_problem = Poisson()
|
||||
poisson_problem.discretise_domain(n, "grid")
|
||||
assert sorted(list(poisson_problem.collected_data.keys())) == sorted(
|
||||
list(poisson_problem.conditions.keys())
|
||||
)
|
||||
|
||||
|
||||
def test_add_points():
|
||||
poisson_problem = Poisson()
|
||||
poisson_problem.discretise_domain(0, "random", domains=["D"])
|
||||
@@ -84,3 +107,23 @@ def test_wrong_custom_sampling_logic(mode):
|
||||
}
|
||||
with pytest.raises(RuntimeError):
|
||||
poisson_problem.discretise_domain(sample_rules=sampling_rules)
|
||||
|
||||
|
||||
def test_aggregate_data():
|
||||
poisson_problem = Poisson()
|
||||
poisson_problem.conditions["data"] = Condition(
|
||||
input=LabelTensor(torch.tensor([[0.0, 1.0]]), labels=["x", "y"]),
|
||||
target=LabelTensor(torch.tensor([[0.0]]), labels=["u"]),
|
||||
)
|
||||
poisson_problem.discretise_domain(0, "random", domains="all")
|
||||
poisson_problem.collect_data()
|
||||
assert isinstance(poisson_problem.collected_data, dict)
|
||||
for name, conditions in poisson_problem.conditions.items():
|
||||
assert name in poisson_problem.collected_data.keys()
|
||||
if isinstance(conditions, InputTargetCondition):
|
||||
assert "input" in poisson_problem.collected_data[name].keys()
|
||||
assert "target" in poisson_problem.collected_data[name].keys()
|
||||
elif isinstance(conditions, DomainEquationCondition):
|
||||
assert "input" in poisson_problem.collected_data[name].keys()
|
||||
assert "target" not in poisson_problem.collected_data[name].keys()
|
||||
assert "equation" in poisson_problem.collected_data[name].keys()
|
||||
|
||||
55
tests/test_type_checker.py
Normal file
55
tests/test_type_checker.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import pytest
|
||||
import logging
|
||||
import math
|
||||
from pina.type_checker import enforce_types
|
||||
|
||||
|
||||
# Definition of a test function for arguments
|
||||
@enforce_types
|
||||
def foo_function1(a: int, b: float) -> float:
|
||||
return a + b
|
||||
|
||||
|
||||
# Definition of a test function for return values
|
||||
@enforce_types
|
||||
def foo_function2(a: int, right: bool) -> float:
|
||||
if right:
|
||||
return float(a)
|
||||
else:
|
||||
return "Hello, world!"
|
||||
|
||||
|
||||
def test_argument_type_checking():
|
||||
|
||||
# Setting logging level to INFO, which should not trigger type checking
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
# Both should work, even if the arguments are not of the expected type
|
||||
assert math.isclose(foo_function1(a=1, b=2.0), 3.0)
|
||||
assert math.isclose(foo_function1(a=1, b=2), 3.0)
|
||||
|
||||
# Setting logging level to DEBUG, which should trigger type checking
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
# The second should fail, as the second argument is an int
|
||||
assert math.isclose(foo_function1(a=1, b=2.0), 3.0)
|
||||
with pytest.raises(TypeError):
|
||||
foo_function1(a=1, b=2)
|
||||
|
||||
|
||||
def test_return_type_checking():
|
||||
|
||||
# Setting logging level to INFO, which should not trigger type checking
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
# Both should work, even if the return value is not of the expected type
|
||||
assert math.isclose(foo_function2(a=1, right=True), 1.0)
|
||||
assert foo_function2(a=1, right=False) == "Hello, world!"
|
||||
|
||||
# Setting logging level to DEBUG, which should trigger type checking
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
# The second should fail, as the return value is a string
|
||||
assert math.isclose(foo_function2(a=1, right=True), 1.0)
|
||||
with pytest.raises(TypeError):
|
||||
foo_function2(a=1, right=False)
|
||||
@@ -1,12 +1,9 @@
|
||||
import torch
|
||||
|
||||
from pina.utils import merge_tensors
|
||||
from pina.label_tensor import LabelTensor
|
||||
from pina import LabelTensor
|
||||
from pina.domain import EllipsoidDomain, CartesianDomain
|
||||
from pina.utils import check_consistency
|
||||
import pytest
|
||||
from pina.domain import DomainInterface
|
||||
|
||||
from pina import LabelTensor
|
||||
from pina.utils import merge_tensors, check_consistency, check_positive_integer
|
||||
from pina.domain import EllipsoidDomain, CartesianDomain, DomainInterface
|
||||
|
||||
|
||||
def test_merge_tensors():
|
||||
@@ -50,3 +47,24 @@ def test_check_consistency_incorrect():
|
||||
check_consistency(torch.Tensor, DomainInterface, subclass=True)
|
||||
with pytest.raises(ValueError):
|
||||
check_consistency(ellipsoid1, torch.Tensor)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", [0, 1, 2, 3, 10])
|
||||
@pytest.mark.parametrize("strict", [True, False])
|
||||
def test_check_positive_integer(value, strict):
|
||||
if value != 0:
|
||||
check_positive_integer(value, strict=strict)
|
||||
else:
|
||||
check_positive_integer(value, strict=False)
|
||||
|
||||
# Should fail if value is negative
|
||||
with pytest.raises(AssertionError):
|
||||
check_positive_integer(-1, strict=strict)
|
||||
|
||||
# Should fail if value is not an integer
|
||||
with pytest.raises(AssertionError):
|
||||
check_positive_integer(1.5, strict=strict)
|
||||
|
||||
# Should fail if value is not a number
|
||||
with pytest.raises(AssertionError):
|
||||
check_positive_integer("string", strict=strict)
|
||||
|
||||
Reference in New Issue
Block a user