From f2ce282a68301d3ee8fcb2e5c9f370fe7913523b Mon Sep 17 00:00:00 2001 From: Filippo Olivo Date: Tue, 9 Dec 2025 09:18:36 +0100 Subject: [PATCH] fix module and model + add curriculum callback --- ThermalSolver/autoregressive_module.py | 106 +++++++++++++------- ThermalSolver/graph_datamodule_unsteady.py | 79 +++++---------- ThermalSolver/model/diffusion_net.py | 65 ++++++++---- ThermalSolver/switch_dataloader_callback.py | 104 +++++++++++++++++++ 4 files changed, 243 insertions(+), 111 deletions(-) create mode 100644 ThermalSolver/switch_dataloader_callback.py diff --git a/ThermalSolver/autoregressive_module.py b/ThermalSolver/autoregressive_module.py index c70cbe7..34d0073 100644 --- a/ThermalSolver/autoregressive_module.py +++ b/ThermalSolver/autoregressive_module.py @@ -116,13 +116,12 @@ class GraphSolver(LightningModule): return out def _preprocess_batch(self, batch: Batch): - x, y, c, edge_index, edge_attr, nodal_area = ( + x, y, c, edge_index, edge_attr = ( batch.x, batch.y, batch.c, batch.edge_index, batch.edge_attr, - batch.nodal_area, ) edge_attr = 1 / edge_attr conductivity = self._compute_c_ij(c, edge_index) @@ -133,34 +132,7 @@ class GraphSolver(LightningModule): x, y, edge_index, edge_attr, conductivity = self._preprocess_batch( batch ) - # deg = self._compute_deg(edge_index, edge_attr, x.size(0)) losses = [] - # print(x.shape, y.shape) - # # print(torch.max(edge_index), torch.min(edge_index)) - # plt.figure() - # plt.subplot(2,3,1) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=x.squeeze().cpu()) - # plt.subplot(2,3,2) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,0,:].squeeze().cpu()) - # plt.subplot(2,3,3) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,1,:].squeeze().cpu()) - # plt.subplot(2,3,4) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,2,:].squeeze().cpu()) - # plt.subplot(2,3,5) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,3,:].squeeze().cpu()) - # plt.subplot(2,3,6) - # plt.scatter(batch.pos[:,0].cpu(), batch.pos[:,1].cpu(), c=y[:,4,:].squeeze().cpu()) - # plt.suptitle("Training Batch Visualization", fontsize=16) - # plt.savefig("training_batch_visualization.png", dpi=300) - # plt.close() - # y = z - pos = batch.pos - boundary_mask = batch.boundary_mask - boundary_values = batch.boundary_values - # plt.scatter(pos[boundary_mask,0].cpu(), pos[boundary_mask,1].cpu(), c=boundary_values.cpu(), s=1) - # plt.savefig("boundary_nodes.png", dpi=300) - # y = z - scale = 50 for i in range(self.unrolling_steps): out = self._compute_model_steps( x, @@ -172,15 +144,26 @@ class GraphSolver(LightningModule): conductivity, ) x = out - # print(out.shape, y[:, i, :].shape) losses.append(self.loss(out.flatten(), y[:, i, :].flatten())) - # print(self.model.scale_edge_attr.item()) - loss = torch.stack(losses).mean() - # for param in self.model.parameters(): - # print(f"Param: {param.shape}, Grad: {param.grad}") - # print(f"Param: {param[0]}") self._log_loss(loss, batch, "train") + for i, layer in enumerate(self.model.layers): + self.log( + f"alpha_{i}", + layer.alpha, + prog_bar=True, + on_epoch=True, + on_step=False, + batch_size=int(batch.num_graphs), + ) + self.log( + "dt", + self.model.dt, + prog_bar=True, + on_epoch=True, + on_step=False, + batch_size=int(batch.num_graphs), + ) return loss def validation_step(self, batch: Batch, batch_idx): @@ -222,8 +205,59 @@ class GraphSolver(LightningModule): self._log_loss(loss, batch, "val") return loss + def _check_convergence(self, y_pred, y_true, tol=1e-3): + l2_norm = torch.norm(y_pred - y_true, p=2) + y_true_norm = torch.norm(y_true, p=2) + rel_error = l2_norm / (y_true_norm + 1e-8) + return rel_error.item() < tol + def test_step(self, batch: Batch, batch_idx): - pass + x, y, edge_index, edge_attr, conductivity = self._preprocess_batch( + batch + ) + # deg = self._compute_deg(edge_index, edge_attr, x.size(0)) + losses = [] + all_losses = [] + norms = [] + for i in range(self.unrolling_steps): + out = self._compute_model_steps( + # torch.cat([x,pos], dim=-1), + x, + edge_index, + edge_attr, + # deg, + batch.boundary_mask, + batch.boundary_values, + conductivity, + ) + norms.append(torch.norm(out - x, p=2).item()) + x = out + loss = self.loss(out, y[:, i, :]) + all_losses.append(loss.item()) + losses.append(loss) + # if ( + # batch_idx == 0 + # and self.current_epoch % 10 == 0 + # and self.current_epoch > 0 + # ): + # _plot_mesh( + # batch.pos, + # x, + # out, + # y[:, i, :], + # batch.batch, + # i, + # self.current_epoch, + # ) + loss = torch.stack(losses).mean() + # if ( + # batch_idx == 0 + # and self.current_epoch % 10 == 0 + # and self.current_epoch > 0 + # ): + _plot_losses(norms, self.current_epoch) + self._log_loss(loss, batch, "test") + return loss def configure_optimizers(self): optimizer = torch.optim.AdamW(self.parameters(), lr=1e-3) diff --git a/ThermalSolver/graph_datamodule_unsteady.py b/ThermalSolver/graph_datamodule_unsteady.py index 6bf109f..1fc94f8 100644 --- a/ThermalSolver/graph_datamodule_unsteady.py +++ b/ThermalSolver/graph_datamodule_unsteady.py @@ -7,44 +7,13 @@ from torch_geometric.loader import DataLoader from torch_geometric.utils import to_undirected from .mesh_data import MeshData -# from torch.utils.data import Dataset -from torch_geometric.utils import scatter - - -def compute_nodal_area(edge_index, edge_attr, num_nodes): - """ - 1. Calculates Area ~ (Min Edge Length)^2 - 2. Scales by Mean so average cell has size 1.0 - """ - row, col = edge_index - dist = edge_attr.squeeze() - - # 1. Get 'h' (Closest neighbor distance) - # Using 'min' filters out diagonal connections in the quad mesh - h = scatter(dist, col, dim=0, dim_size=num_nodes, reduce="min") - - # 2. Estimate Raw Area - raw_area = h.pow(2) - - # 3. Mean Scaling (The Best Normalization) - # This keeps values near 1.0, preserving stability AND physics ratios. - # We detach to ensure no gradients flow here (it's static data). - mean_val = raw_area.mean().detach() - - # Result: - # Small cells -> approx 0.1 - # Large cells -> approx 5.0 - # Average -> 1.0 - # nodal_area = (raw_area / mean_val).unsqueeze(-1) + 1e-6 - nodal_area = raw_area - return nodal_area.unsqueeze(-1) - class GraphDataModule(LightningDataModule): def __init__( self, hf_repo: str, split_name: str, + n_elements: int = None, train_size: float = 0.2, val_size: float = 0.1, test_size: float = 0.1, @@ -52,18 +21,19 @@ class GraphDataModule(LightningDataModule): remove_boundary_edges: bool = False, build_radial_graph: bool = False, radius: float = None, - start_unrolling_steps: int = 1, + unrolling_steps: int = 1, ): super().__init__() self.hf_repo = hf_repo self.split_name = split_name + self.n_elements = n_elements self.dataset_dict = {} self.train_dataset, self.val_dataset, self.test_dataset = ( None, None, None, ) - self.unrolling_steps = start_unrolling_steps + self.unrolling_steps = unrolling_steps self.geometry_dict = {} self.train_size = train_size self.val_size = val_size @@ -76,6 +46,9 @@ class GraphDataModule(LightningDataModule): def prepare_data(self): dataset = load_dataset(self.hf_repo, name="snapshots")[self.split_name] geometry = load_dataset(self.hf_repo, name="geometry")[self.split_name] + if self.n_elements is not None: + dataset = dataset.select(range(self.n_elements)) + geometry = geometry.select(range(self.n_elements)) total_len = len(dataset) train_len = int(self.train_size * total_len) @@ -117,13 +90,18 @@ class GraphDataModule(LightningDataModule): self, snapshot: dict, geometry: dict, + test: bool = False, ) -> Data: conductivity = torch.tensor( geometry["conductivity"], dtype=torch.float32 ) - temperatures = torch.tensor( - snapshot["temperatures"], dtype=torch.float32 - )[:40] + temperatures = ( + torch.tensor(snapshot["temperatures"], dtype=torch.float32)[:40] + if not test + else torch.tensor(snapshot["temperatures"], dtype=torch.float32)[ + : self.unrolling_steps + 1 + ] + ) times = torch.tensor(snapshot["times"], dtype=torch.float32) pos = torch.tensor(geometry["points"], dtype=torch.float32)[:, :2] @@ -138,16 +116,6 @@ class GraphDataModule(LightningDataModule): ) if self.build_radial_graph: - # from pina.graph import RadiusGraph - - # if self.radius is None: - # raise ValueError("Radius must be specified for radial graph.") - # edge_index = RadiusGraph.compute_radius_graph( - # pos, radius=self.radius - # ) - # from torch_geometric.utils import remove_self_loops - - # edge_index, _ = remove_self_loops(edge_index) raise NotImplementedError( "Radial graph building not implemented yet." ) @@ -161,7 +129,6 @@ class GraphDataModule(LightningDataModule): bottom_ids, right_ids, top_ids, left_ids, temperatures[0, :] ) edge_attr = torch.norm(pos[edge_index[0]] - pos[edge_index[1]], dim=1) - nodal_area = compute_nodal_area(edge_index, edge_attr, pos.size(0)) if self.remove_boundary_edges: boundary_idx = torch.unique(boundary_mask) edge_index_mask = ~torch.isin(edge_index[1], boundary_idx) @@ -186,7 +153,6 @@ class GraphDataModule(LightningDataModule): edge_attr=edge_attr, boundary_mask=boundary_mask, boundary_values=boundary_values, - nodal_area=nodal_area, ) ) return data @@ -213,7 +179,7 @@ class GraphDataModule(LightningDataModule): ] if stage == "test" or stage is None: self.test_data = [ - self._build_dataset(snap, geom) + self._build_dataset(snap, geom, test=True) for snap, geom in tqdm( zip(self.dataset_dict["test"], self.geometry_dict["test"]), desc="Building test graphs", @@ -234,7 +200,9 @@ class GraphDataModule(LightningDataModule): # self.train_dataset = ds # print(type(self.train_data[0])) ds = [i for data in self.train_data for i in data] - # print(type(ds[0])) + print( + f"\nLoading training data, using {self.unrolling_steps} unrolling steps..." + ) return DataLoader( ds, batch_size=self.batch_size, @@ -244,6 +212,9 @@ class GraphDataModule(LightningDataModule): ) def val_dataloader(self): + print( + f"\nLoading validation data, using {self.unrolling_steps} unrolling steps..." + ) ds = [i for data in self.val_data for i in data] return DataLoader( ds, @@ -254,12 +225,10 @@ class GraphDataModule(LightningDataModule): ) def test_dataloader(self): - ds = self.create_autoregressive_datasets( - dataset="test", no_unrolling=True - ) + ds = [i for data in self.test_data for i in data] return DataLoader( ds, - batch_size=self.batch_size, + batch_size=1, shuffle=False, num_workers=8, pin_memory=True, diff --git a/ThermalSolver/model/diffusion_net.py b/ThermalSolver/model/diffusion_net.py index 08f152b..4a2bb36 100644 --- a/ThermalSolver/model/diffusion_net.py +++ b/ThermalSolver/model/diffusion_net.py @@ -1,6 +1,7 @@ import torch import torch.nn as nn from torch_geometric.nn import MessagePassing +from torch.nn.utils import spectral_norm class DiffusionLayer(MessagePassing): @@ -13,28 +14,34 @@ class DiffusionLayer(MessagePassing): channels: int, **kwargs, ): - super().__init__(aggr="add", **kwargs) - self.dt = nn.Parameter(torch.tensor(1e-4)) self.conductivity_net = nn.Sequential( - nn.Linear(channels, channels, bias=False), + spectral_norm(nn.Linear(channels, channels, bias=False)), nn.GELU(), - nn.Linear(channels, channels, bias=False), + spectral_norm(nn.Linear(channels, channels, bias=False)), ) self.phys_encoder = nn.Sequential( - nn.Linear(1, 8, bias=False), + spectral_norm(nn.Linear(1, 8, bias=True)), nn.Tanh(), - nn.Linear(8, 1, bias=False), + spectral_norm(nn.Linear(8, 1, bias=True)), nn.Softplus(), ) + self.alpha_param = nn.Parameter(torch.tensor(1e-2)) + + @property + def alpha(self): + return torch.clamp(self.alpha_param, min=1e-5, max=1.0) + def forward(self, x, edge_index, edge_weight, conductivity): edge_weight = edge_weight.unsqueeze(-1) conductance = self.phys_encoder(edge_weight) net_flux = self.propagate(edge_index, x=x, conductance=conductance) - return x + ((net_flux) * self.dt) + # return (1-self.alpha) * x + self.alpha * net_flux + # return net_flux + x + return x + self.alpha * net_flux def message(self, x_i, x_j, conductance): delta = x_j - x_i @@ -44,15 +51,21 @@ class DiffusionLayer(MessagePassing): class DiffusionNet(nn.Module): - def __init__(self, input_dim=1, output_dim=1, hidden_dim=8, n_layers=4): + def __init__( + self, + input_dim=1, + output_dim=1, + hidden_dim=8, + n_layers=4, + shared_weights=False, + ): super().__init__() # Encoder: Projects input temperature to hidden feature space self.enc = nn.Sequential( - nn.Linear(input_dim, hidden_dim, bias=True), - nn.GELU(), - nn.Linear(hidden_dim, hidden_dim, bias=True), + spectral_norm(nn.Linear(input_dim, hidden_dim, bias=True)), nn.GELU(), + spectral_norm(nn.Linear(hidden_dim, hidden_dim, bias=True)), ) self.scale_x = nn.Parameter(torch.zeros(hidden_dim)) @@ -60,27 +73,40 @@ class DiffusionNet(nn.Module): # Scale parameters for conditioning self.scale_edge_attr = nn.Parameter(torch.zeros(1)) - # Stack of Diffusion Layers - self.layers = torch.nn.ModuleList( - [DiffusionLayer(hidden_dim) for _ in range(n_layers)] - ) + # If shared_weights is True, use the same DiffusionLayer multiple times + if shared_weights: + diffusion_layer = DiffusionLayer(hidden_dim) + self.layers = torch.nn.ModuleList( + [diffusion_layer for _ in range(n_layers)] + ) + # If shared_weights is False, use separate DiffusionLayers + else: + # Stack of Diffusion Layers + self.layers = torch.nn.ModuleList( + [DiffusionLayer(hidden_dim) for _ in range(n_layers)] + ) # Decoder: Projects hidden features back to Temperature space self.dec = nn.Sequential( - nn.Linear(hidden_dim, hidden_dim, bias=True), + spectral_norm(nn.Linear(hidden_dim, hidden_dim, bias=True)), nn.GELU(), - nn.Linear(hidden_dim, output_dim, bias=True), + spectral_norm(nn.Linear(hidden_dim, output_dim, bias=True)), nn.Softplus(), # Ensure positive temperature output ) self.func = torch.nn.GELU() + self.dt_param = nn.Parameter(torch.tensor(1e-2)) + + @property + def dt(self): + return torch.clamp(self.dt_param, min=1e-5, max=0.5) + def forward(self, x, edge_index, edge_attr, conductivity): # 1. Global Residual Connection setup # We save the input to add it back at the very end. # The network learns the correction (Delta T), not the absolute T. x_input = x - # 2. Encode h = self.enc(x) * torch.exp(self.scale_x) @@ -98,5 +124,4 @@ class DiffusionNet(nn.Module): # 6. Final Update (Explicit Euler Step) # T_new = T_old + Correction - # return x_input + delta_x - return delta_x + return delta_x + x_input * self.dt diff --git a/ThermalSolver/switch_dataloader_callback.py b/ThermalSolver/switch_dataloader_callback.py new file mode 100644 index 0000000..b91e597 --- /dev/null +++ b/ThermalSolver/switch_dataloader_callback.py @@ -0,0 +1,104 @@ +import torch +from lightning.pytorch.callbacks import Callback +import os + + +class SwitchDataLoaderCallback(Callback): + def __init__( + self, + ckpt_path, + increase_unrolling_steps_by, + increase_unrolling_steps_every, + max_unrolling_steps=10, + patience=None, + last_patience=None, + metric="val/loss", + ): + super().__init__() + self.ckpt_path = ckpt_path + if os.path.exists(ckpt_path) is False: + os.makedirs(ckpt_path) + self.increase_unrolling_steps_by = increase_unrolling_steps_by + self.increase_unrolling_steps_every = increase_unrolling_steps_every + self.max_unrolling_steps = max_unrolling_steps + self.metric = metric + self.actual_loss = torch.inf + if patience is not None: + self.patience = patience + if last_patience is not None: + self.last_patience = last_patience + self.no_improvement_epochs = 0 + self.last_step_reached = False + + def on_validation_epoch_end(self, trainer, pl_module): + self._metric_tracker(trainer, pl_module) + if self.last_step_reached is False: + self._unrolling_steps_handler(pl_module, trainer) + else: + if self.no_improvement_epochs >= self.last_patience: + trainer.should_stop = True + + def _metric_tracker(self, trainer, pl_module): + if trainer.callback_metrics.get(self.metric) < self.actual_loss: + self.actual_loss = trainer.callback_metrics.get(self.metric) + self._save_model(pl_module, trainer) + self.no_improvement_epochs = 0 + print(f"\nNew best {self.metric}: {self.actual_loss:.4f}") + else: + self.no_improvement_epochs += 1 + print( + f"\nNo improvement in {self.metric} for {self.no_improvement_epochs} epochs." + ) + + def _should_reload_dataloader(self, trainer): + if self.patience is not None: + print( + f"Checking patience: {self.no_improvement_epochs} / {self.patience}" + ) + if self.no_improvement_epochs >= self.patience: + return True + elif ( + trainer.current_epoch + 1 % self.increase_unrolling_steps_every == 0 + ): + print("Reached scheduled epoch for increasing unrolling steps.") + return True + return False + + def _unrolling_steps_handler(self, pl_module, trainer): + if self._should_reload_dataloader(trainer): + self._load_model(pl_module) + if pl_module.unrolling_steps >= self.max_unrolling_steps: + return + pl_module.unrolling_steps += self.increase_unrolling_steps_by + trainer.datamodule.unrolling_steps = pl_module.unrolling_steps + print(f"Incremented unrolling steps to {pl_module.unrolling_steps}") + trainer.datamodule.setup(stage="fit") + trainer.manual_dataloader_reload() + self.actual_loss = torch.inf + if pl_module.unrolling_steps >= self.max_unrolling_steps: + print( + "Reached max unrolling steps. Stopping further increments." + ) + self.last_step_reached = True + + def _save_model(self, pl_module, trainer): + pt_path = os.path.join( + self.ckpt_path, + f"{pl_module.unrolling_steps}_unrolling_best_model.pt", + ) + torch.save(pl_module.state_dict(), pt_path) # <--- CHANGED THIS + ckpt_path = os.path.join( + self.ckpt_path, + f"{pl_module.unrolling_steps}_unrolling_best_checkpoint.ckpt", + ) + trainer.save_checkpoint(ckpt_path, weights_only=False) + + def _load_model(self, pl_module): + pt_path = os.path.join( + self.ckpt_path, + f"{pl_module.unrolling_steps}_unrolling_best_model.pt", + ) + pl_module.load_state_dict(torch.load(pt_path, weights_only=True)) + print( + f"Loaded model weights from {pt_path} for unrolling steps = {pl_module.unrolling_steps}" + )