commit 2108c76d14 (parent b5e4d13663)
Author: avisquid
Co-authored-by: GiovanniCanali <giovanni.canali98@yahoo.it>
Date: 2025-10-03 14:37:56 -04:00
Committed by: GitHub

11 changed files with 885 additions and 39 deletions


@@ -5,19 +5,22 @@ from pina.model.block.message_passing import EnEquivariantNetworkBlock
 # Data for testing
 x = torch.rand(10, 4)
 pos = torch.rand(10, 3)
-edge_index = torch.randint(0, 10, (2, 20))
-edge_attr = torch.randn(20, 2)
+velocity = torch.rand(10, 3)
+edge_idx = torch.randint(0, 10, (2, 20))
+edge_attributes = torch.randn(20, 2)


 @pytest.mark.parametrize("node_feature_dim", [1, 3])
 @pytest.mark.parametrize("edge_feature_dim", [0, 2])
 @pytest.mark.parametrize("pos_dim", [2, 3])
-def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
+@pytest.mark.parametrize("use_velocity", [True, False])
+def test_constructor(node_feature_dim, edge_feature_dim, pos_dim, use_velocity):
     EnEquivariantNetworkBlock(
         node_feature_dim=node_feature_dim,
         edge_feature_dim=edge_feature_dim,
         pos_dim=pos_dim,
+        use_velocity=use_velocity,
         hidden_dim=64,
         n_message_layers=2,
         n_update_layers=2,
@@ -29,6 +32,7 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             node_feature_dim=-1,
             edge_feature_dim=edge_feature_dim,
             pos_dim=pos_dim,
+            use_velocity=use_velocity,
         )

     # Should fail if edge_feature_dim is negative
@@ -37,6 +41,7 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             node_feature_dim=node_feature_dim,
             edge_feature_dim=-1,
             pos_dim=pos_dim,
+            use_velocity=use_velocity,
         )

     # Should fail if pos_dim is negative
@@ -45,6 +50,7 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             node_feature_dim=node_feature_dim,
             edge_feature_dim=edge_feature_dim,
             pos_dim=-1,
+            use_velocity=use_velocity,
         )

     # Should fail if hidden_dim is negative
@@ -54,6 +60,7 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             edge_feature_dim=edge_feature_dim,
             pos_dim=pos_dim,
             hidden_dim=-1,
+            use_velocity=use_velocity,
         )

     # Should fail if n_message_layers is negative
@@ -63,6 +70,7 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             edge_feature_dim=edge_feature_dim,
             pos_dim=pos_dim,
             n_message_layers=-1,
+            use_velocity=use_velocity,
         )

     # Should fail if n_update_layers is negative
@@ -72,11 +80,22 @@ def test_constructor(node_feature_dim, edge_feature_dim, pos_dim):
             edge_feature_dim=edge_feature_dim,
             pos_dim=pos_dim,
             n_update_layers=-1,
+            use_velocity=use_velocity,
         )

+    # Should fail if use_velocity is not boolean
+    with pytest.raises(ValueError):
+        EnEquivariantNetworkBlock(
+            node_feature_dim=node_feature_dim,
+            edge_feature_dim=edge_feature_dim,
+            pos_dim=pos_dim,
+            use_velocity="False",
+        )


 @pytest.mark.parametrize("edge_feature_dim", [0, 2])
-def test_forward(edge_feature_dim):
+@pytest.mark.parametrize("use_velocity", [True, False])
+def test_forward(edge_feature_dim, use_velocity):
     model = EnEquivariantNetworkBlock(
         node_feature_dim=x.shape[1],
@@ -85,21 +104,26 @@ def test_forward(edge_feature_dim):
         hidden_dim=64,
         n_message_layers=2,
         n_update_layers=2,
+        use_velocity=use_velocity,
     )

-    if edge_feature_dim == 0:
-        output_ = model(edge_index=edge_index, x=x, pos=pos)
-    else:
-        output_ = model(
-            edge_index=edge_index, x=x, pos=pos, edge_attr=edge_attr
-        )
+    # Manage inputs
+    vel = velocity if use_velocity else None
+    edge_attr = edge_attributes if edge_feature_dim > 0 else None

+    # Checks on output shapes
+    output_ = model(
+        x=x, pos=pos, edge_index=edge_idx, edge_attr=edge_attr, vel=vel
+    )
     assert output_[0].shape == x.shape
     assert output_[1].shape == pos.shape
+    if vel is not None:
+        assert output_[2].shape == vel.shape


 @pytest.mark.parametrize("edge_feature_dim", [0, 2])
-def test_backward(edge_feature_dim):
+@pytest.mark.parametrize("use_velocity", [True, False])
+def test_backward(edge_feature_dim, use_velocity):
     model = EnEquivariantNetworkBlock(
         node_feature_dim=x.shape[1],
@@ -108,35 +132,45 @@ def test_backward(edge_feature_dim):
         hidden_dim=64,
         n_message_layers=2,
         n_update_layers=2,
+        use_velocity=use_velocity,
     )

+    # Manage inputs
+    vel = velocity.requires_grad_() if use_velocity else None
+    edge_attr = (
+        edge_attributes.requires_grad_() if edge_feature_dim > 0 else None
+    )

     if edge_feature_dim == 0:
         output_ = model(
-            edge_index=edge_index,
+            edge_index=edge_idx,
             x=x.requires_grad_(),
             pos=pos.requires_grad_(),
+            vel=vel,
         )
     else:
         output_ = model(
-            edge_index=edge_index,
+            edge_index=edge_idx,
             x=x.requires_grad_(),
             pos=pos.requires_grad_(),
-            edge_attr=edge_attr.requires_grad_(),
+            edge_attr=edge_attr,
+            vel=vel,
         )

-    loss = torch.mean(output_[0])
+    # Checks on gradients
+    loss = sum(torch.mean(output_[i]) for i in range(len(output_)))
     loss.backward()
     assert x.grad.shape == x.shape
     assert pos.grad.shape == pos.shape
+    if use_velocity:
+        assert vel.grad.shape == vel.shape


-def test_equivariance():
-    # Graph to be fully connected and undirected
-    edge_index = torch.combinations(torch.arange(x.shape[0]), r=2).T
-    edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)
+@pytest.mark.parametrize("edge_feature_dim", [0, 2])
+@pytest.mark.parametrize("use_velocity", [True, False])
+def test_equivariance(edge_feature_dim, use_velocity):

-    # Random rotation (det(rotation) should be 1)
+    # Random rotation
     rotation = torch.linalg.qr(torch.rand(pos.shape[-1], pos.shape[-1])).Q
     if torch.det(rotation) < 0:
         rotation[:, 0] *= -1
@@ -146,20 +180,37 @@ def test_equivariance():
     model = EnEquivariantNetworkBlock(
         node_feature_dim=x.shape[1],
-        edge_feature_dim=0,
+        edge_feature_dim=edge_feature_dim,
         pos_dim=pos.shape[1],
         hidden_dim=64,
         n_message_layers=2,
         n_update_layers=2,
+        use_velocity=use_velocity,
     ).eval()

-    h1, pos1 = model(edge_index=edge_index, x=x, pos=pos)
-    h2, pos2 = model(
-        edge_index=edge_index, x=x, pos=pos @ rotation.T + translation
-    )
+    # Manage inputs
+    vel = velocity if use_velocity else None
+    edge_attr = edge_attributes if edge_feature_dim > 0 else None

-    # Transform model output
-    pos1_transformed = (pos1 @ rotation.T) + translation
+    # Transform inputs (no translation for velocity)
+    pos_rot = pos @ rotation.T + translation
+    vel_rot = vel @ rotation.T if use_velocity else vel

+    # Get model outputs
+    out1 = model(
+        x=x, pos=pos, edge_index=edge_idx, edge_attr=edge_attr, vel=vel
+    )
+    out2 = model(
+        x=x, pos=pos_rot, edge_index=edge_idx, edge_attr=edge_attr, vel=vel_rot
+    )

+    # Unpack outputs
+    h1, pos1, *other1 = out1
+    h2, pos2, *other2 = out2
+    if use_velocity:
+        vel1, vel2 = other1[0], other2[0]

-    assert torch.allclose(pos2, pos1_transformed, atol=1e-5)
+    assert torch.allclose(pos2, pos1 @ rotation.T + translation, atol=1e-5)
     assert torch.allclose(h1, h2, atol=1e-5)
+    if vel is not None:
+        assert torch.allclose(vel2, vel1 @ rotation.T, atol=1e-5)
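
For orientation, a minimal usage sketch of the block these tests exercise, assuming only the constructor and forward signatures visible above; the dimensions mirror the test data, and with use_velocity=True the block returns an updated velocity alongside the features and positions.

import torch
from pina.model.block.message_passing import EnEquivariantNetworkBlock

# Sketch based on the signatures exercised in the tests above.
block = EnEquivariantNetworkBlock(
    node_feature_dim=4,  # x has shape (10 nodes, 4 features)
    edge_feature_dim=2,  # edge attributes have 2 features
    pos_dim=3,
    hidden_dim=64,
    n_message_layers=2,
    n_update_layers=2,
    use_velocity=True,
)
h, new_pos, new_vel = block(
    x=torch.rand(10, 4),
    pos=torch.rand(10, 3),
    edge_index=torch.randint(0, 10, (2, 20)),  # 20 random directed edges
    edge_attr=torch.randn(20, 2),
    vel=torch.rand(10, 3),
)
# Shapes are preserved: h is (10, 4); new_pos and new_vel are (10, 3).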


@@ -0,0 +1,132 @@
import pytest
import torch

from pina.model.block.message_passing import EquivariantGraphNeuralOperatorBlock

# Data for testing. Shapes: (time, nodes, features)
x = torch.rand(5, 10, 4)
pos = torch.rand(5, 10, 3)
vel = torch.rand(5, 10, 3)

# Edge index and attributes
edge_idx = torch.randint(0, 10, (2, 20))
edge_attributes = torch.randn(20, 2)


@pytest.mark.parametrize("node_feature_dim", [1, 3])
@pytest.mark.parametrize("edge_feature_dim", [0, 2])
@pytest.mark.parametrize("pos_dim", [2, 3])
@pytest.mark.parametrize("modes", [1, 5])
def test_constructor(node_feature_dim, edge_feature_dim, pos_dim, modes):
    EquivariantGraphNeuralOperatorBlock(
        node_feature_dim=node_feature_dim,
        edge_feature_dim=edge_feature_dim,
        pos_dim=pos_dim,
        modes=modes,
    )

    # Should fail if modes is negative
    with pytest.raises(AssertionError):
        EquivariantGraphNeuralOperatorBlock(
            node_feature_dim=node_feature_dim,
            edge_feature_dim=edge_feature_dim,
            pos_dim=pos_dim,
            modes=-1,
        )


@pytest.mark.parametrize("modes", [1, 5])
def test_forward(modes):
    model = EquivariantGraphNeuralOperatorBlock(
        node_feature_dim=x.shape[2],
        edge_feature_dim=edge_attributes.shape[1],
        pos_dim=pos.shape[2],
        modes=modes,
    )
    output_ = model(
        x=x,
        pos=pos,
        vel=vel,
        edge_index=edge_idx,
        edge_attr=edge_attributes,
    )

    # Checks on output shapes
    assert output_[0].shape == x.shape
    assert output_[1].shape == pos.shape
    assert output_[2].shape == vel.shape


@pytest.mark.parametrize("modes", [1, 5])
def test_backward(modes):
    model = EquivariantGraphNeuralOperatorBlock(
        node_feature_dim=x.shape[2],
        edge_feature_dim=edge_attributes.shape[1],
        pos_dim=pos.shape[2],
        modes=modes,
    )
    output_ = model(
        x=x.requires_grad_(),
        pos=pos.requires_grad_(),
        vel=vel.requires_grad_(),
        edge_index=edge_idx,
        edge_attr=edge_attributes.requires_grad_(),
    )

    # Checks on gradients
    loss = sum(torch.mean(output_[i]) for i in range(len(output_)))
    loss.backward()
    assert x.grad.shape == x.shape
    assert pos.grad.shape == pos.shape
    assert vel.grad.shape == vel.shape


@pytest.mark.parametrize("modes", [1, 5])
def test_equivariance(modes):
    # Random rotation
    rotation = torch.linalg.qr(torch.rand(pos.shape[2], pos.shape[2])).Q
    if torch.det(rotation) < 0:
        rotation[:, 0] *= -1

    # Random translation
    translation = torch.rand(1, pos.shape[2])

    model = EquivariantGraphNeuralOperatorBlock(
        node_feature_dim=x.shape[2],
        edge_feature_dim=edge_attributes.shape[1],
        pos_dim=pos.shape[2],
        modes=modes,
    ).eval()

    # Transform inputs (no translation for velocity)
    pos_rot = pos @ rotation.T + translation
    vel_rot = vel @ rotation.T

    # Get model outputs
    out1 = model(
        x=x,
        pos=pos,
        vel=vel,
        edge_index=edge_idx,
        edge_attr=edge_attributes,
    )
    out2 = model(
        x=x,
        pos=pos_rot,
        vel=vel_rot,
        edge_index=edge_idx,
        edge_attr=edge_attributes,
    )

    # Unpack outputs
    h1, pos1, vel1 = out1
    h2, pos2, vel2 = out2

    assert torch.allclose(pos2, pos1 @ rotation.T + translation, atol=1e-5)
    assert torch.allclose(vel2, vel1 @ rotation.T, atol=1e-5)
    assert torch.allclose(h1, h2, atol=1e-5)
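
Likewise, a minimal usage sketch of the operator block, consistent with the calls above and assuming only the signatures shown in these tests; note the leading time dimension on x, pos, and vel, while edge_index and edge_attr are shared across time steps.

import torch
from pina.model.block.message_passing import EquivariantGraphNeuralOperatorBlock

# Sketch based on the signatures exercised in the tests above.
block = EquivariantGraphNeuralOperatorBlock(
    node_feature_dim=4,  # x has shape (5 time steps, 10 nodes, 4 features)
    edge_feature_dim=2,
    pos_dim=3,
    modes=5,  # the tests exercise modes in {1, 5}
)
h, new_pos, new_vel = block(
    x=torch.rand(5, 10, 4),
    pos=torch.rand(5, 10, 3),
    vel=torch.rand(5, 10, 3),
    edge_index=torch.randint(0, 10, (2, 20)),
    edge_attr=torch.randn(20, 2),
)
# Output shapes match the inputs: (5, 10, 4), (5, 10, 3), (5, 10, 3).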