from typing import Optional

import torch
from lightning import LightningModule
from torch_geometric.data import Batch
from matplotlib.tri import Triangulation


# Disabled debugging helper, kept for reference: it renders the first channel
# of the first graph in a batch as a filled contour plot and saves it to disk.
#
# def plot_results(x, pos, step, i, batch):
#     x = x[batch == 0]
#     pos = pos[batch == 0]
#     tria = Triangulation(pos[:, 0].cpu(), pos[:, 1].cpu())
#     import matplotlib.pyplot as plt
#     plt.tricontourf(tria, x[:, 0].cpu(), levels=14)
#     plt.colorbar()
#     plt.savefig(f"{step:03d}_out_{i:03d}.png")
#     plt.axis("equal")
#     plt.close()


class GraphSolver(LightningModule):
    """Lightning module that trains and evaluates a wrapped graph model.

    The wrapped ``model`` is called with node features, a conditioning
    tensor, boundary values, a boundary mask, the graph connectivity and an
    unrolling-step count; its output is compared against ``batch.y`` with the
    configured loss.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        loss: Optional[torch.nn.Module] = None,
        unrolling_steps: int = 48,
        lr: float = 1e-3,
    ):
        """Store the model, the loss and the training hyper-parameters.

        Args:
            model: Network invoked by :meth:`forward` and :meth:`test_step`.
            loss: Loss module called as ``loss(prediction, target)``.
                ``torch.nn.MSELoss()`` is used when ``None``.
            unrolling_steps: Value forwarded to the model as
                ``unrolling_steps`` in every train/val/test step.
            lr: Learning rate handed to the Adam optimizer.
        """
        super().__init__()
        self.model = model
        self.loss = loss if loss is not None else torch.nn.MSELoss()
        self.unrolling_steps = unrolling_steps
        self.lr = lr

    def forward(
        self,
        x: torch.Tensor,
        c: torch.Tensor,
        boundary: torch.Tensor,
        boundary_mask: torch.Tensor,
        edge_index: torch.Tensor,
        edge_attr: torch.Tensor,
        unrolling_steps: Optional[int] = None,
    ):
        """Delegate to the wrapped model, passing every argument positionally.

        Args:
            x: Node features (``batch.x`` in the step methods).
            c: Conditioning tensor (``batch.c`` in the step methods).
            boundary: Boundary values (``batch.boundary_values``).
            boundary_mask: Boundary mask (``batch.boundary_mask``).
            edge_index: Graph connectivity.
            edge_attr: Edge features.
            unrolling_steps: Forwarded unchanged to the model; ``None`` is
                passed through as-is, so its meaning is defined by the model.

        Returns:
            Whatever the wrapped model returns.
        """
        return self.model(
            x,
            c,
            boundary,
            boundary_mask,
            edge_index,
            edge_attr,
            unrolling_steps,
        )

    def _compute_loss(self, x, y):
        """Return ``self.loss(x, y)``; single override point for the loss."""
        return self.loss(x, y)

    def _preprocess_batch(self, batch: Batch):
        """Unpack ``(x, y, c, edge_index, edge_attr)`` from ``batch``."""
        return batch.x, batch.y, batch.c, batch.edge_index, batch.edge_attr

    def _log_loss(self, loss, batch, stage: str):
        """Log ``loss`` under ``"{stage}_loss"`` and return it.

        The value is aggregated per epoch (not per step), shown in the
        progress bar, and weighted by the number of graphs in ``batch``.
        """
        self.log(
            f"{stage}_loss",
            loss,
            on_step=False,
            on_epoch=True,
            prog_bar=True,
            batch_size=int(batch.num_graphs),
        )
        return loss

    def _shared_step(self, batch: Batch, stage: str):
        """Run the model on ``batch``, compute the loss and log it.

        Shared by :meth:`training_step` and :meth:`validation_step`.
        """
        x, y, c, edge_index, edge_attr = self._preprocess_batch(batch)
        # NOTE: unrolling and boundary handling are not performed here; the
        # boundary tensors and the step count are handed to the model, which
        # presumably applies them internally.
        y_pred = self(
            x,
            c,
            batch.boundary_values,
            batch.boundary_mask,
            edge_index=edge_index,
            edge_attr=edge_attr,
            unrolling_steps=self.unrolling_steps,
        )
        loss = self._compute_loss(y_pred, y)
        return self._log_loss(loss, batch, stage)

    def training_step(self, batch: Batch, _):
        """Compute, log (as ``train_loss``) and return the training loss."""
        return self._shared_step(batch, "train")

    def validation_step(self, batch: Batch, _):
        """Compute, log (as ``val_loss``) and return the validation loss."""
        return self._shared_step(batch, "val")

    def test_step(self, batch: Batch, _):
        """Compute, log (as ``test_loss``) and return the test loss.

        The wrapped model is called directly rather than through
        :meth:`forward`, because it is given the extra keyword arguments
        ``plot_results=True``, ``batch`` and ``pos`` that :meth:`forward`
        does not accept. The wrapped model must therefore support them.
        """
        x, y, c, edge_index, edge_attr = self._preprocess_batch(batch)
        y_pred = self.model(
            x,
            c,
            batch.boundary_values,
            batch.boundary_mask,
            edge_index=edge_index,
            edge_attr=edge_attr,
            unrolling_steps=self.unrolling_steps,
            plot_results=True,
            batch=batch.batch,
            pos=batch.pos,
        )
        loss = self._compute_loss(y_pred, y)
        return self._log_loss(loss, batch, "test")

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters using ``self.lr``."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def _impose_bc(self, x: torch.Tensor, data: Batch):
        """Overwrite the masked entries of ``x`` with the boundary values.

        ``x`` is modified in place and also returned. No live code in this
        class calls this helper at the moment.
        """
        x[data.boundary_mask] = data.boundary_values
        return x