157 lines
4.7 KiB
Python
157 lines
4.7 KiB
Python
import pytest
|
|
import torch
|
|
from torch.nn import Linear
|
|
|
|
from pina import LabelTensor
|
|
from pina.model import DeepONet
|
|
from pina.model import FeedForward
|
|
|
|
data = torch.rand((20, 3))
|
|
input_vars = ["a", "b", "c"]
|
|
input_ = LabelTensor(data, input_vars)
|
|
symbol_funcs_red = DeepONet._symbol_functions(dim=-1)
|
|
output_dims = [1, 5, 10, 20]
|
|
|
|
|
|
def test_constructor():
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
|
|
|
|
def test_constructor_fails_when_invalid_inner_layer_size():
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=8)
|
|
with pytest.raises(ValueError):
|
|
DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
|
|
|
|
def test_forward_extract_str():
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
model(input_)
|
|
assert model(input_).shape[-1] == 1
|
|
|
|
|
|
def test_forward_extract_int():
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=[0],
|
|
input_indeces_trunk_net=[1, 2],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
model(data)
|
|
|
|
|
|
def test_backward_extract_int():
|
|
data = torch.rand((20, 3))
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=[0],
|
|
input_indeces_trunk_net=[1, 2],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
data.requires_grad = True
|
|
model(data)
|
|
l = torch.mean(model(data))
|
|
l.backward()
|
|
assert data._grad.shape == torch.Size([20, 3])
|
|
|
|
|
|
def test_forward_extract_str_wrong():
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
with pytest.raises(RuntimeError):
|
|
model(data)
|
|
|
|
|
|
def test_backward_extract_str_wrong():
|
|
data = torch.rand((20, 3))
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction="+",
|
|
aggregator="*",
|
|
)
|
|
data.requires_grad = True
|
|
with pytest.raises(RuntimeError):
|
|
model(data)
|
|
l = torch.mean(model(data))
|
|
l.backward()
|
|
assert data._grad.shape == torch.Size([20, 3])
|
|
|
|
|
|
@pytest.mark.parametrize("red", symbol_funcs_red)
|
|
def test_forward_symbol_funcs(red):
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction=red,
|
|
aggregator="*",
|
|
)
|
|
model(input_)
|
|
assert model(input_).shape[-1] == 1
|
|
|
|
|
|
@pytest.mark.parametrize("out_dim", output_dims)
|
|
def test_forward_callable_reduction(out_dim):
|
|
branch_net = FeedForward(input_dimensions=1, output_dimensions=10)
|
|
trunk_net = FeedForward(input_dimensions=2, output_dimensions=10)
|
|
reduction_layer = Linear(10, out_dim)
|
|
model = DeepONet(
|
|
branch_net=branch_net,
|
|
trunk_net=trunk_net,
|
|
input_indeces_branch_net=["a"],
|
|
input_indeces_trunk_net=["b", "c"],
|
|
reduction=reduction_layer,
|
|
aggregator="*",
|
|
)
|
|
model(input_)
|
|
assert model(input_).shape[-1] == out_dim
|