implement first GNO

This commit is contained in:
FilippoOlivo
2025-09-25 14:44:39 +02:00
parent d53b076ecc
commit f3be9e99f8
5 changed files with 89 additions and 306 deletions

View File

@@ -0,0 +1,25 @@
from pina.model import GraphNeuralOperator
import torch
from torch_geometric.data import Data
class GNO(torch.nn.Module):
def __init__(
self, x_ch_node, f_ch_node, hidden, layers, edge_ch=0, out_ch=1
):
super().__init__()
lifting_operator = torch.nn.Linear(x_ch_node + f_ch_node, hidden)
self.gno = GraphNeuralOperator(
lifting_operator=lifting_operator,
projection_operator=torch.nn.Linear(hidden, out_ch),
edge_features=edge_ch,
n_layers=layers,
internal_n_layers=2,
shared_weights=False,
)
def forward(self, x, c, edge_index, edge_attr):
x = torch.cat([x, c], dim=-1)
x = Data(x=x, edge_index=edge_index, edge_attr=edge_attr)
return self.gno(x)

View File

@@ -8,19 +8,16 @@ class FiLM(nn.Module):
def __init__(self, c_ch, h_ch):
super().__init__()
self.net = nn.Sequential(
nn.Linear(c_ch, 2*h_ch),
nn.SiLU(),
nn.Linear(2*h_ch, 2*h_ch)
nn.Linear(c_ch, 2 * h_ch), nn.SiLU(), nn.Linear(2 * h_ch, 2 * h_ch)
)
# init to identity: gamma≈0 (so 1+gamma=1), beta=0
nn.init.zeros_(self.net[-1].weight)
nn.init.zeros_(self.net[-1].bias)
self.norm = nn.LayerNorm(h_ch)
def forward(self, h, c):
gb = self.net(c)
gamma, beta = gb.chunk(2, dim=-1)
return (1 + gamma) * self.norm(h) + beta
return (1 + gamma) * h + beta
class ConditionalGNOBlock(MessagePassing):
@@ -28,46 +25,35 @@ class ConditionalGNOBlock(MessagePassing):
Message passing with FiLM applied to the MESSAGE m_ij,
using edge context c_ij = (c_i + c_j)/2.
"""
def __init__(self, hidden_ch, edge_ch=0, aggr="mean"):
def __init__(self, hidden_ch, edge_ch=0, aggr="add"):
super().__init__(aggr=aggr, node_dim=0)
self.pre_norm = nn.LayerNorm(hidden_ch)
# raw message builder
self.msg = nn.Sequential(
nn.Linear(2*hidden_ch + edge_ch, 2*hidden_ch),
nn.SiLU(),
nn.Linear(2*hidden_ch, hidden_ch)
)
# FiLM over the message (per-edge)
self.film_msg = FiLM(c_ch=hidden_ch, h_ch=hidden_ch)
# node update with residual
self.update_mlp = nn.Sequential(
nn.Linear(2*hidden_ch, hidden_ch),
self.edge_attr_net = nn.Sequential(
nn.Linear(edge_ch, hidden_ch // 2),
nn.SiLU(),
nn.Linear(hidden_ch, hidden_ch)
nn.Linear(hidden_ch // 2, hidden_ch),
)
self.x_net = nn.Sequential(
nn.Linear(hidden_ch, hidden_ch * 2),
nn.SiLU(),
nn.Linear(hidden_ch * 2, hidden_ch),
)
def forward(self, x, c, edge_index, edge_attr=None):
# pre-norm helps stability
x_in = x
x = self.pre_norm(x)
m = self.propagate(edge_index, x=x, c=c, edge_attr=edge_attr)
out = self.update_mlp(torch.cat([x_in, m], dim=-1))
return x_in + out # residual
return self.propagate(edge_index, x=x, c=c, edge_attr=edge_attr)
def message(self, x_i, x_j, c_i, c_j, edge_attr):
def update(self, aggr_out, x):
return self.x_net(x) + aggr_out
def message(self, x_j, c_i, c_j, edge_attr):
# c_ij = (c_i + c_j)/2
c_ij = 0.5 * (c_i + c_j)
m = self.film_msg(x_j, c_ij)
if edge_attr is not None:
m_in = torch.cat([x_i, x_j, edge_attr], dim=-1)
else:
m_in = torch.cat([x_i, x_j], dim=-1)
m_raw = self.msg(m_in)
# edge conditioning: simple mean
c_ctx = 0.5 * (c_i + c_j)
m = self.film_msg(m_raw, c_ctx)
a_ij = self.edge_attr_net(edge_attr)
m = m * a_ij
return m
@@ -79,7 +65,10 @@ class GatingGNO(nn.Module):
Out:
y : [N, out_ch]
"""
def __init__(self, x_ch_node, f_ch_node, hidden, layers, edge_ch=0, out_ch=1):
def __init__(
self, x_ch_node, f_ch_node, hidden, layers, edge_ch=0, out_ch=1
):
super().__init__()
self.encoder_x = nn.Sequential(
nn.Linear(x_ch_node, hidden // 2),
@@ -92,12 +81,15 @@ class GatingGNO(nn.Module):
nn.Linear(hidden // 2, hidden),
)
self.blocks = nn.ModuleList(
[ConditionalGNOBlock(hidden_ch=hidden, edge_ch=edge_ch) for _ in range(layers)]
[
ConditionalGNOBlock(hidden_ch=hidden, edge_ch=edge_ch)
for _ in range(layers)
]
)
self.dec = nn.Sequential(
nn.LayerNorm(hidden),
nn.Linear(hidden, hidden // 2),
nn.SiLU(),
nn.Linear(hidden, out_ch)
nn.Linear(hidden // 2, out_ch),
)
def forward(self, x, c, edge_index, edge_attr=None):
@@ -105,4 +97,4 @@ class GatingGNO(nn.Module):
c = self.encoder_c(c) # [N,H]
for blk in self.blocks:
x = blk(x, c, edge_index, edge_attr=edge_attr)
return self.dec(x)
return self.dec(x)