fix module and model + add curriculum callback

This commit is contained in:
Filippo Olivo
2025-12-09 09:18:36 +01:00
parent 2935785b31
commit f2ce282a68
4 changed files with 243 additions and 111 deletions

View File

@@ -116,13 +116,12 @@ class GraphSolver(LightningModule):
return out return out
def _preprocess_batch(self, batch: Batch): def _preprocess_batch(self, batch: Batch):
x, y, c, edge_index, edge_attr, nodal_area = ( x, y, c, edge_index, edge_attr = (
batch.x, batch.x,
batch.y, batch.y,
batch.c, batch.c,
batch.edge_index, batch.edge_index,
batch.edge_attr, batch.edge_attr,
batch.nodal_area,
) )
edge_attr = 1 / edge_attr edge_attr = 1 / edge_attr
conductivity = self._compute_c_ij(c, edge_index) conductivity = self._compute_c_ij(c, edge_index)
@@ -133,34 +132,7 @@ class GraphSolver(LightningModule):
x, y, edge_index, edge_attr, conductivity = self._preprocess_batch( x, y, edge_index, edge_attr, conductivity = self._preprocess_batch(
batch batch
) )
# deg = self._compute_deg(edge_index, edge_attr, x.size(0))
losses = [] losses = []
# print(x.shape, y.shape)
# # print(torch.max(edge_index), torch.min(edge_index))
# plt.figure()
# plt.subplot(2,3,1)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=x.squeeze().cpu())
# plt.subplot(2,3,2)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,0,:].squeeze().cpu())
# plt.subplot(2,3,3)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,1,:].squeeze().cpu())
# plt.subplot(2,3,4)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,2,:].squeeze().cpu())
# plt.subplot(2,3,5)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,3,:].squeeze().cpu())
# plt.subplot(2,3,6)
# plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,4,:].squeeze().cpu())
# plt.suptitle("Training Batch Visualization", fontsize=16)
# plt.savefig("training_batch_visualization.png", dpi=300)
# plt.close()
# y = z
pos = batch.pos
boundary_mask = batch.boundary_mask
boundary_values = batch.boundary_values
# plt.scatter(pos[boundary_mask,0].cpu(), pos[boundary_mask,1].cpu(), c=boundary_values.cpu(), s=1)
# plt.savefig("boundary_nodes.png", dpi=300)
# y = z
scale = 50
for i in range(self.unrolling_steps): for i in range(self.unrolling_steps):
out = self._compute_model_steps( out = self._compute_model_steps(
x, x,
@@ -172,15 +144,26 @@ class GraphSolver(LightningModule):
conductivity, conductivity,
) )
x = out x = out
# print(out.shape, y[:, i, :].shape)
losses.append(self.loss(out.flatten(), y[:, i, :].flatten())) losses.append(self.loss(out.flatten(), y[:, i, :].flatten()))
# print(self.model.scale_edge_attr.item())
loss = torch.stack(losses).mean() loss = torch.stack(losses).mean()
# for param in self.model.parameters():
# print(f"Param: {param.shape}, Grad: {param.grad}")
# print(f"Param: {param[0]}")
self._log_loss(loss, batch, "train") self._log_loss(loss, batch, "train")
for i, layer in enumerate(self.model.layers):
self.log(
f"alpha_{i}",
layer.alpha,
prog_bar=True,
on_epoch=True,
on_step=False,
batch_size=int(batch.num_graphs),
)
self.log(
"dt",
self.model.dt,
prog_bar=True,
on_epoch=True,
on_step=False,
batch_size=int(batch.num_graphs),
)
return loss return loss
def validation_step(self, batch: Batch, batch_idx): def validation_step(self, batch: Batch, batch_idx):
@@ -222,8 +205,59 @@ class GraphSolver(LightningModule):
self._log_loss(loss, batch, "val") self._log_loss(loss, batch, "val")
return loss return loss
def _check_convergence(self, y_pred, y_true, tol=1e-3):
l2_norm = torch.norm(y_pred - y_true, p=2)
y_true_norm = torch.norm(y_true, p=2)
rel_error = l2_norm / (y_true_norm + 1e-8)
return rel_error.item() < tol
def test_step(self, batch: Batch, batch_idx): def test_step(self, batch: Batch, batch_idx):
pass x, y, edge_index, edge_attr, conductivity = self._preprocess_batch(
batch
)
# deg = self._compute_deg(edge_index, edge_attr, x.size(0))
losses = []
all_losses = []
norms = []
for i in range(self.unrolling_steps):
out = self._compute_model_steps(
# torch.cat([x,pos], dim=-1),
x,
edge_index,
edge_attr,
# deg,
batch.boundary_mask,
batch.boundary_values,
conductivity,
)
norms.append(torch.norm(out - x, p=2).item())
x = out
loss = self.loss(out, y[:, i, :])
all_losses.append(loss.item())
losses.append(loss)
# if (
# batch_idx == 0
# and self.current_epoch % 10 == 0
# and self.current_epoch > 0
# ):
# _plot_mesh(
# batch.pos,
# x,
# out,
# y[:, i, :],
# batch.batch,
# i,
# self.current_epoch,
# )
loss = torch.stack(losses).mean()
# if (
# batch_idx == 0
# and self.current_epoch % 10 == 0
# and self.current_epoch > 0
# ):
_plot_losses(norms, self.current_epoch)
self._log_loss(loss, batch, "test")
return loss
def configure_optimizers(self): def configure_optimizers(self):
optimizer = torch.optim.AdamW(self.parameters(), lr=1e-3) optimizer = torch.optim.AdamW(self.parameters(), lr=1e-3)

View File

@@ -7,44 +7,13 @@ from torch_geometric.loader import DataLoader
from torch_geometric.utils import to_undirected from torch_geometric.utils import to_undirected
from .mesh_data import MeshData from .mesh_data import MeshData
# from torch.utils.data import Dataset
from torch_geometric.utils import scatter
def compute_nodal_area(edge_index, edge_attr, num_nodes):
"""
1. Calculates Area ~ (Min Edge Length)^2
2. Scales by Mean so average cell has size 1.0
"""
row, col = edge_index
dist = edge_attr.squeeze()
# 1. Get 'h' (Closest neighbor distance)
# Using 'min' filters out diagonal connections in the quad mesh
h = scatter(dist, col, dim=0, dim_size=num_nodes, reduce="min")
# 2. Estimate Raw Area
raw_area = h.pow(2)
# 3. Mean Scaling (The Best Normalization)
# This keeps values near 1.0, preserving stability AND physics ratios.
# We detach to ensure no gradients flow here (it's static data).
mean_val = raw_area.mean().detach()
# Result:
# Small cells -> approx 0.1
# Large cells -> approx 5.0
# Average -> 1.0
# nodal_area = (raw_area / mean_val).unsqueeze(-1) + 1e-6
nodal_area = raw_area
return nodal_area.unsqueeze(-1)
class GraphDataModule(LightningDataModule): class GraphDataModule(LightningDataModule):
def __init__( def __init__(
self, self,
hf_repo: str, hf_repo: str,
split_name: str, split_name: str,
n_elements: int = None,
train_size: float = 0.2, train_size: float = 0.2,
val_size: float = 0.1, val_size: float = 0.1,
test_size: float = 0.1, test_size: float = 0.1,
@@ -52,18 +21,19 @@ class GraphDataModule(LightningDataModule):
remove_boundary_edges: bool = False, remove_boundary_edges: bool = False,
build_radial_graph: bool = False, build_radial_graph: bool = False,
radius: float = None, radius: float = None,
start_unrolling_steps: int = 1, unrolling_steps: int = 1,
): ):
super().__init__() super().__init__()
self.hf_repo = hf_repo self.hf_repo = hf_repo
self.split_name = split_name self.split_name = split_name
self.n_elements = n_elements
self.dataset_dict = {} self.dataset_dict = {}
self.train_dataset, self.val_dataset, self.test_dataset = ( self.train_dataset, self.val_dataset, self.test_dataset = (
None, None,
None, None,
None, None,
) )
self.unrolling_steps = start_unrolling_steps self.unrolling_steps = unrolling_steps
self.geometry_dict = {} self.geometry_dict = {}
self.train_size = train_size self.train_size = train_size
self.val_size = val_size self.val_size = val_size
@@ -76,6 +46,9 @@ class GraphDataModule(LightningDataModule):
def prepare_data(self): def prepare_data(self):
dataset = load_dataset(self.hf_repo, name="snapshots")[self.split_name] dataset = load_dataset(self.hf_repo, name="snapshots")[self.split_name]
geometry = load_dataset(self.hf_repo, name="geometry")[self.split_name] geometry = load_dataset(self.hf_repo, name="geometry")[self.split_name]
if self.n_elements is not None:
dataset = dataset.select(range(self.n_elements))
geometry = geometry.select(range(self.n_elements))
total_len = len(dataset) total_len = len(dataset)
train_len = int(self.train_size * total_len) train_len = int(self.train_size * total_len)
@@ -117,13 +90,18 @@ class GraphDataModule(LightningDataModule):
self, self,
snapshot: dict, snapshot: dict,
geometry: dict, geometry: dict,
test: bool = False,
) -> Data: ) -> Data:
conductivity = torch.tensor( conductivity = torch.tensor(
geometry["conductivity"], dtype=torch.float32 geometry["conductivity"], dtype=torch.float32
) )
temperatures = torch.tensor( temperatures = (
snapshot["temperatures"], dtype=torch.float32 torch.tensor(snapshot["temperatures"], dtype=torch.float32)[:40]
)[:40] if not test
else torch.tensor(snapshot["temperatures"], dtype=torch.float32)[
: self.unrolling_steps + 1
]
)
times = torch.tensor(snapshot["times"], dtype=torch.float32) times = torch.tensor(snapshot["times"], dtype=torch.float32)
pos = torch.tensor(geometry["points"], dtype=torch.float32)[:, :2] pos = torch.tensor(geometry["points"], dtype=torch.float32)[:, :2]
@@ -138,16 +116,6 @@ class GraphDataModule(LightningDataModule):
) )
if self.build_radial_graph: if self.build_radial_graph:
# from pina.graph import RadiusGraph
# if self.radius is None:
# raise ValueError("Radius must be specified for radial graph.")
# edge_index = RadiusGraph.compute_radius_graph(
# pos, radius=self.radius
# )
# from torch_geometric.utils import remove_self_loops
# edge_index, _ = remove_self_loops(edge_index)
raise NotImplementedError( raise NotImplementedError(
"Radial graph building not implemented yet." "Radial graph building not implemented yet."
) )
@@ -161,7 +129,6 @@ class GraphDataModule(LightningDataModule):
bottom_ids, right_ids, top_ids, left_ids, temperatures[0, :] bottom_ids, right_ids, top_ids, left_ids, temperatures[0, :]
) )
edge_attr = torch.norm(pos[edge_index[0]] - pos[edge_index[1]], dim=1) edge_attr = torch.norm(pos[edge_index[0]] - pos[edge_index[1]], dim=1)
nodal_area = compute_nodal_area(edge_index, edge_attr, pos.size(0))
if self.remove_boundary_edges: if self.remove_boundary_edges:
boundary_idx = torch.unique(boundary_mask) boundary_idx = torch.unique(boundary_mask)
edge_index_mask = ~torch.isin(edge_index[1], boundary_idx) edge_index_mask = ~torch.isin(edge_index[1], boundary_idx)
@@ -186,7 +153,6 @@ class GraphDataModule(LightningDataModule):
edge_attr=edge_attr, edge_attr=edge_attr,
boundary_mask=boundary_mask, boundary_mask=boundary_mask,
boundary_values=boundary_values, boundary_values=boundary_values,
nodal_area=nodal_area,
) )
) )
return data return data
@@ -213,7 +179,7 @@ class GraphDataModule(LightningDataModule):
] ]
if stage == "test" or stage is None: if stage == "test" or stage is None:
self.test_data = [ self.test_data = [
self._build_dataset(snap, geom) self._build_dataset(snap, geom, test=True)
for snap, geom in tqdm( for snap, geom in tqdm(
zip(self.dataset_dict["test"], self.geometry_dict["test"]), zip(self.dataset_dict["test"], self.geometry_dict["test"]),
desc="Building test graphs", desc="Building test graphs",
@@ -234,7 +200,9 @@ class GraphDataModule(LightningDataModule):
# self.train_dataset = ds # self.train_dataset = ds
# print(type(self.train_data[0])) # print(type(self.train_data[0]))
ds = [i for data in self.train_data for i in data] ds = [i for data in self.train_data for i in data]
# print(type(ds[0])) print(
f"\nLoading training data, using {self.unrolling_steps} unrolling steps..."
)
return DataLoader( return DataLoader(
ds, ds,
batch_size=self.batch_size, batch_size=self.batch_size,
@@ -244,6 +212,9 @@ class GraphDataModule(LightningDataModule):
) )
def val_dataloader(self): def val_dataloader(self):
print(
f"\nLoading validation data, using {self.unrolling_steps} unrolling steps..."
)
ds = [i for data in self.val_data for i in data] ds = [i for data in self.val_data for i in data]
return DataLoader( return DataLoader(
ds, ds,
@@ -254,12 +225,10 @@ class GraphDataModule(LightningDataModule):
) )
def test_dataloader(self): def test_dataloader(self):
ds = self.create_autoregressive_datasets( ds = [i for data in self.test_data for i in data]
dataset="test", no_unrolling=True
)
return DataLoader( return DataLoader(
ds, ds,
batch_size=self.batch_size, batch_size=1,
shuffle=False, shuffle=False,
num_workers=8, num_workers=8,
pin_memory=True, pin_memory=True,

View File

@@ -1,6 +1,7 @@
import torch import torch
import torch.nn as nn import torch.nn as nn
from torch_geometric.nn import MessagePassing from torch_geometric.nn import MessagePassing
from torch.nn.utils import spectral_norm
class DiffusionLayer(MessagePassing): class DiffusionLayer(MessagePassing):
@@ -13,28 +14,34 @@ class DiffusionLayer(MessagePassing):
channels: int, channels: int,
**kwargs, **kwargs,
): ):
super().__init__(aggr="add", **kwargs) super().__init__(aggr="add", **kwargs)
self.dt = nn.Parameter(torch.tensor(1e-4))
self.conductivity_net = nn.Sequential( self.conductivity_net = nn.Sequential(
nn.Linear(channels, channels, bias=False), spectral_norm(nn.Linear(channels, channels, bias=False)),
nn.GELU(), nn.GELU(),
nn.Linear(channels, channels, bias=False), spectral_norm(nn.Linear(channels, channels, bias=False)),
) )
self.phys_encoder = nn.Sequential( self.phys_encoder = nn.Sequential(
nn.Linear(1, 8, bias=False), spectral_norm(nn.Linear(1, 8, bias=True)),
nn.Tanh(), nn.Tanh(),
nn.Linear(8, 1, bias=False), spectral_norm(nn.Linear(8, 1, bias=True)),
nn.Softplus(), nn.Softplus(),
) )
self.alpha_param = nn.Parameter(torch.tensor(1e-2))
@property
def alpha(self):
return torch.clamp(self.alpha_param, min=1e-5, max=1.0)
def forward(self, x, edge_index, edge_weight, conductivity): def forward(self, x, edge_index, edge_weight, conductivity):
edge_weight = edge_weight.unsqueeze(-1) edge_weight = edge_weight.unsqueeze(-1)
conductance = self.phys_encoder(edge_weight) conductance = self.phys_encoder(edge_weight)
net_flux = self.propagate(edge_index, x=x, conductance=conductance) net_flux = self.propagate(edge_index, x=x, conductance=conductance)
return x + ((net_flux) * self.dt) # return (1-self.alpha) * x + self.alpha * net_flux
# return net_flux + x
return x + self.alpha * net_flux
def message(self, x_i, x_j, conductance): def message(self, x_i, x_j, conductance):
delta = x_j - x_i delta = x_j - x_i
@@ -44,15 +51,21 @@ class DiffusionLayer(MessagePassing):
class DiffusionNet(nn.Module): class DiffusionNet(nn.Module):
def __init__(self, input_dim=1, output_dim=1, hidden_dim=8, n_layers=4): def __init__(
self,
input_dim=1,
output_dim=1,
hidden_dim=8,
n_layers=4,
shared_weights=False,
):
super().__init__() super().__init__()
# Encoder: Projects input temperature to hidden feature space # Encoder: Projects input temperature to hidden feature space
self.enc = nn.Sequential( self.enc = nn.Sequential(
nn.Linear(input_dim, hidden_dim, bias=True), spectral_norm(nn.Linear(input_dim, hidden_dim, bias=True)),
nn.GELU(),
nn.Linear(hidden_dim, hidden_dim, bias=True),
nn.GELU(), nn.GELU(),
spectral_norm(nn.Linear(hidden_dim, hidden_dim, bias=True)),
) )
self.scale_x = nn.Parameter(torch.zeros(hidden_dim)) self.scale_x = nn.Parameter(torch.zeros(hidden_dim))
@@ -60,27 +73,40 @@ class DiffusionNet(nn.Module):
# Scale parameters for conditioning # Scale parameters for conditioning
self.scale_edge_attr = nn.Parameter(torch.zeros(1)) self.scale_edge_attr = nn.Parameter(torch.zeros(1))
# Stack of Diffusion Layers # If shared_weights is True, use the same DiffusionLayer multiple times
self.layers = torch.nn.ModuleList( if shared_weights:
[DiffusionLayer(hidden_dim) for _ in range(n_layers)] diffusion_layer = DiffusionLayer(hidden_dim)
) self.layers = torch.nn.ModuleList(
[diffusion_layer for _ in range(n_layers)]
)
# If shared_weights is False, use separate DiffusionLayers
else:
# Stack of Diffusion Layers
self.layers = torch.nn.ModuleList(
[DiffusionLayer(hidden_dim) for _ in range(n_layers)]
)
# Decoder: Projects hidden features back to Temperature space # Decoder: Projects hidden features back to Temperature space
self.dec = nn.Sequential( self.dec = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim, bias=True), spectral_norm(nn.Linear(hidden_dim, hidden_dim, bias=True)),
nn.GELU(), nn.GELU(),
nn.Linear(hidden_dim, output_dim, bias=True), spectral_norm(nn.Linear(hidden_dim, output_dim, bias=True)),
nn.Softplus(), # Ensure positive temperature output nn.Softplus(), # Ensure positive temperature output
) )
self.func = torch.nn.GELU() self.func = torch.nn.GELU()
self.dt_param = nn.Parameter(torch.tensor(1e-2))
@property
def dt(self):
return torch.clamp(self.dt_param, min=1e-5, max=0.5)
def forward(self, x, edge_index, edge_attr, conductivity): def forward(self, x, edge_index, edge_attr, conductivity):
# 1. Global Residual Connection setup # 1. Global Residual Connection setup
# We save the input to add it back at the very end. # We save the input to add it back at the very end.
# The network learns the correction (Delta T), not the absolute T. # The network learns the correction (Delta T), not the absolute T.
x_input = x x_input = x
# 2. Encode # 2. Encode
h = self.enc(x) * torch.exp(self.scale_x) h = self.enc(x) * torch.exp(self.scale_x)
@@ -98,5 +124,4 @@ class DiffusionNet(nn.Module):
# 6. Final Update (Explicit Euler Step) # 6. Final Update (Explicit Euler Step)
# T_new = T_old + Correction # T_new = T_old + Correction
# return x_input + delta_x return delta_x + x_input * self.dt
return delta_x

View File

@@ -0,0 +1,104 @@
import torch
from lightning.pytorch.callbacks import Callback
import os
class SwitchDataLoaderCallback(Callback):
def __init__(
self,
ckpt_path,
increase_unrolling_steps_by,
increase_unrolling_steps_every,
max_unrolling_steps=10,
patience=None,
last_patience=None,
metric="val/loss",
):
super().__init__()
self.ckpt_path = ckpt_path
if os.path.exists(ckpt_path) is False:
os.makedirs(ckpt_path)
self.increase_unrolling_steps_by = increase_unrolling_steps_by
self.increase_unrolling_steps_every = increase_unrolling_steps_every
self.max_unrolling_steps = max_unrolling_steps
self.metric = metric
self.actual_loss = torch.inf
if patience is not None:
self.patience = patience
if last_patience is not None:
self.last_patience = last_patience
self.no_improvement_epochs = 0
self.last_step_reached = False
def on_validation_epoch_end(self, trainer, pl_module):
self._metric_tracker(trainer, pl_module)
if self.last_step_reached is False:
self._unrolling_steps_handler(pl_module, trainer)
else:
if self.no_improvement_epochs >= self.last_patience:
trainer.should_stop = True
def _metric_tracker(self, trainer, pl_module):
if trainer.callback_metrics.get(self.metric) < self.actual_loss:
self.actual_loss = trainer.callback_metrics.get(self.metric)
self._save_model(pl_module, trainer)
self.no_improvement_epochs = 0
print(f"\nNew best {self.metric}: {self.actual_loss:.4f}")
else:
self.no_improvement_epochs += 1
print(
f"\nNo improvement in {self.metric} for {self.no_improvement_epochs} epochs."
)
def _should_reload_dataloader(self, trainer):
if self.patience is not None:
print(
f"Checking patience: {self.no_improvement_epochs} / {self.patience}"
)
if self.no_improvement_epochs >= self.patience:
return True
elif (
trainer.current_epoch + 1 % self.increase_unrolling_steps_every == 0
):
print("Reached scheduled epoch for increasing unrolling steps.")
return True
return False
def _unrolling_steps_handler(self, pl_module, trainer):
if self._should_reload_dataloader(trainer):
self._load_model(pl_module)
if pl_module.unrolling_steps >= self.max_unrolling_steps:
return
pl_module.unrolling_steps += self.increase_unrolling_steps_by
trainer.datamodule.unrolling_steps = pl_module.unrolling_steps
print(f"Incremented unrolling steps to {pl_module.unrolling_steps}")
trainer.datamodule.setup(stage="fit")
trainer.manual_dataloader_reload()
self.actual_loss = torch.inf
if pl_module.unrolling_steps >= self.max_unrolling_steps:
print(
"Reached max unrolling steps. Stopping further increments."
)
self.last_step_reached = True
def _save_model(self, pl_module, trainer):
pt_path = os.path.join(
self.ckpt_path,
f"{pl_module.unrolling_steps}_unrolling_best_model.pt",
)
torch.save(pl_module.state_dict(), pt_path) # <--- CHANGED THIS
ckpt_path = os.path.join(
self.ckpt_path,
f"{pl_module.unrolling_steps}_unrolling_best_checkpoint.ckpt",
)
trainer.save_checkpoint(ckpt_path, weights_only=False)
def _load_model(self, pl_module):
pt_path = os.path.join(
self.ckpt_path,
f"{pl_module.unrolling_steps}_unrolling_best_model.pt",
)
pl_module.load_state_dict(torch.load(pt_path, weights_only=True))
print(
f"Loaded model weights from {pt_path} for unrolling steps = {pl_module.unrolling_steps}"
)