import torch
from torch import nn
from torch_geometric.nn import MessagePassing


def _mlp(in_ch, mid_ch, out_ch):
    """Return the ``Linear -> SiLU -> Linear`` stack used across this module."""
    return nn.Sequential(
        nn.Linear(in_ch, mid_ch),
        nn.SiLU(),
        nn.Linear(mid_ch, out_ch),
    )


# ---- FiLM that starts out as the identity map ----
class FiLM(nn.Module):
    """Feature-wise linear modulation: ``(1 + gamma) * h + beta``.

    ``gamma`` and ``beta`` are the two halves (split along the last
    dimension) of ``net(c)``. The last linear layer of ``net`` is
    zero-initialised, so directly after construction ``gamma`` and ``beta``
    are both zero and the output equals ``h``.

    Args:
        c_ch: Size of the last dimension of the conditioning input ``c``.
        h_ch: Size of the last dimension of the modulated input ``h``.
    """

    def __init__(self, c_ch, h_ch):
        super().__init__()
        width = 2 * h_ch  # one half for gamma, the other for beta
        self.net = _mlp(c_ch, width, width)
        # Zero the output layer so that gamma = beta = 0 at initialisation.
        head = self.net[-1]
        for param in (head.weight, head.bias):
            nn.init.zeros_(param)

    def forward(self, h, c):
        """Modulate ``h`` with the scale and shift predicted from ``c``."""
        gamma, beta = self.net(c).chunk(2, dim=-1)
        scale = 1 + gamma
        return scale * h + beta


class ConditionalGNOBlock(MessagePassing):
    """Message-passing layer whose per-edge messages are FiLM-modulated.

    For an edge (j -> i) the message is ``FiLM(x_j, c_ij)`` with edge context
    ``c_ij = (c_i + c_j) / 2``. When ``edge_attr`` is supplied, the message is
    additionally multiplied element-wise by ``edge_attr_net(edge_attr)``.
    The node update is ``x_net(x) + aggregated_messages``.

    Args:
        hidden_ch: Channel width of both the node features and the context.
        edge_ch: Input width of the edge-attribute MLP.
        aggr: Aggregation scheme forwarded to ``MessagePassing``.

    NOTE(review): with the default ``edge_ch=0`` the first layer of
    ``edge_attr_net`` is built with zero input features; pass the real
    edge-attribute width whenever ``edge_attr`` is going to be used.
    """

    def __init__(self, hidden_ch, edge_ch=0, aggr="add"):
        super().__init__(aggr=aggr, node_dim=0)
        # FiLM acts on the message, i.e. once per edge.
        self.film_msg = FiLM(c_ch=hidden_ch, h_ch=hidden_ch)
        self.edge_attr_net = _mlp(edge_ch, hidden_ch // 2, hidden_ch)
        self.x_net = _mlp(hidden_ch, hidden_ch * 2, hidden_ch)

    def forward(self, x, c, edge_index, edge_attr=None):
        """Run one propagation step over ``edge_index``."""
        return self.propagate(edge_index, x=x, c=c, edge_attr=edge_attr)

    def update(self, aggr_out, x):
        """Add the transformed node features to the aggregated messages."""
        return self.x_net(x) + aggr_out

    def message(self, x_j, c_i, c_j, edge_attr):
        """Build the FiLM-modulated message for every edge."""
        # Edge context is the mean of the two endpoint contexts.
        edge_ctx = 0.5 * (c_i + c_j)
        msg = self.film_msg(x_j, edge_ctx)
        if edge_attr is None:
            return msg
        # Element-wise gate computed from the edge attributes.
        return msg * self.edge_attr_net(edge_attr)


class GatingGNO(nn.Module):
    """Encoder -> stacked ``ConditionalGNOBlock`` layers -> decoder.

    In:
        x : [N, Cx]  (e.g., u or features to predict from)
        c : [N, Cf]  (conditioning field, e.g., conductivity)
    Out:
        y : [N, out_ch]

    Args:
        x_ch_node: Channel count ``Cx`` of the node input ``x``.
        f_ch_node: Channel count ``Cf`` of the conditioning input ``c``.
        hidden: Hidden width shared by both encoders and every block.
        layers: Number of ``ConditionalGNOBlock`` layers.
        edge_ch: Edge-attribute width passed on to each block.
        out_ch: Channel count of the decoded output.
    """

    def __init__(
        self, x_ch_node, f_ch_node, hidden, layers, edge_ch=0, out_ch=1
    ):
        super().__init__()
        half = hidden // 2
        self.encoder_x = _mlp(x_ch_node, half, hidden)
        self.encoder_c = _mlp(f_ch_node, half, hidden)
        self.blocks = nn.ModuleList(
            [
                ConditionalGNOBlock(hidden_ch=hidden, edge_ch=edge_ch)
                for _ in range(layers)
            ]
        )
        self.dec = _mlp(hidden, half, out_ch)

    def forward(self, x, c, edge_index, edge_attr=None):
        """Encode ``x`` and ``c``, apply every block in order, then decode."""
        h = self.encoder_x(x)  # [N,H]
        ctx = self.encoder_c(c)  # [N,H]
        for block in self.blocks:
            h = block(h, ctx, edge_index, edge_attr=edge_attr)
        return self.dec(h)