130 lines
3.7 KiB
Python
130 lines
3.7 KiB
Python
import torch
|
|
from lightning import LightningModule
|
|
from torch_geometric.data import Batch
|
|
from matplotlib.tri import Triangulation
|
|
|
|
|
|
# def plot_results(x, pos, step, i, batch):
#     x = x[batch == 0]
#     pos = pos[batch == 0]
#     tria = Triangulation(pos[:, 0].cpu(), pos[:, 1].cpu())
#     import matplotlib.pyplot as plt
#
#     plt.tricontourf(tria, x[:, 0].cpu(), levels=14)
#     plt.colorbar()
#     plt.savefig(f"{step:03d}_out_{i:03d}.png")
#     plt.axis("equal")
#     plt.close()
|
|
|
|
|
|
class GraphSolver(LightningModule):
    """Lightning module that trains and evaluates a wrapped graph model.

    The wrapped ``model`` is called with node features, a conditioning
    tensor, boundary values, a boundary mask, the graph connectivity and the
    number of unrolling steps; its output is compared against ``batch.y``
    with ``loss``.

    Args:
        model: Network invoked as ``model(x, c, boundary, boundary_mask,
            edge_index, edge_attr, unrolling_steps)``. During testing it is
            additionally given ``plot_results``, ``batch`` and ``pos``
            keyword arguments, so it must accept those as well.
        loss: Callable ``loss(prediction, target)``. ``torch.nn.MSELoss()``
            is used when ``None``.
        unrolling_steps: Number of unrolling steps forwarded to ``model``.
        learning_rate: Learning rate of the Adam optimizer built in
            :meth:`configure_optimizers`.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        loss: torch.nn.Module = None,
        unrolling_steps: int = 48,
        learning_rate: float = 1e-3,
    ):
        super().__init__()
        self.model = model
        self.loss = loss if loss is not None else torch.nn.MSELoss()
        self.unrolling_steps = unrolling_steps
        self.learning_rate = learning_rate

    def forward(
        self,
        x: torch.Tensor,
        c: torch.Tensor,
        boundary: torch.Tensor,
        boundary_mask: torch.Tensor,
        edge_index: torch.Tensor,
        edge_attr: torch.Tensor,
        unrolling_steps: int = None,
    ):
        """Run the wrapped model.

        Args:
            x: Node features (``batch.x`` in the step methods).
            c: Conditioning tensor (``batch.c`` in the step methods).
            boundary: Boundary values handed to the model.
            boundary_mask: Mask handed to the model alongside ``boundary``
                (presumably selects the boundary nodes; confirm against the
                model implementation).
            edge_index: Graph connectivity.
            edge_attr: Edge features.
            unrolling_steps: Number of unrolling steps. Falls back to the
                value configured on this module when ``None``.

        Returns:
            Whatever the wrapped model returns.
        """
        # Use the configured step count instead of forwarding ``None``.
        if unrolling_steps is None:
            unrolling_steps = self.unrolling_steps
        return self.model(
            x,
            c,
            boundary,
            boundary_mask,
            edge_index,
            edge_attr,
            unrolling_steps,
        )

    def _compute_loss(self, x, y):
        """Return ``self.loss(x, y)``; single hook used by every stage."""
        return self.loss(x, y)

    def _preprocess_batch(self, batch: Batch):
        """Unpack ``(x, y, c, edge_index, edge_attr)`` from ``batch``."""
        return batch.x, batch.y, batch.c, batch.edge_index, batch.edge_attr

    def _log_loss(self, loss, batch, stage: str):
        """Log ``loss`` under ``f"{stage}_loss"`` and return it.

        The value is logged per epoch only (not per step), shown in the
        progress bar, and weighted by the number of graphs in ``batch``.
        """
        self.log(
            f"{stage}_loss",
            loss,
            on_step=False,
            on_epoch=True,
            prog_bar=True,
            batch_size=int(batch.num_graphs),
        )
        return loss

    def _shared_step(self, batch: Batch, stage: str):
        """Predict, compute the loss and log it for ``stage``."""
        x, y, c, edge_index, edge_attr = self._preprocess_batch(batch)
        y_pred = self(
            x,
            c,
            batch.boundary_values,
            batch.boundary_mask,
            edge_index=edge_index,
            edge_attr=edge_attr,
            unrolling_steps=self.unrolling_steps,
        )
        loss = self._compute_loss(y_pred, y)
        self._log_loss(loss, batch, stage)
        return loss

    def training_step(self, batch: Batch, _):
        """Return the training loss for ``batch`` (logged as ``train_loss``)."""
        return self._shared_step(batch, "train")

    def validation_step(self, batch: Batch, _):
        """Return the validation loss for ``batch`` (logged as ``val_loss``)."""
        return self._shared_step(batch, "val")

    def test_step(self, batch: Batch, _):
        """Return the test loss for ``batch`` (logged as ``test_loss``).

        The model is called directly rather than through :meth:`forward`
        because it receives the extra ``plot_results``, ``batch`` and ``pos``
        keyword arguments, which :meth:`forward` does not accept.
        """
        x, y, c, edge_index, edge_attr = self._preprocess_batch(batch)
        y_pred = self.model(
            x,
            c,
            batch.boundary_values,
            batch.boundary_mask,
            edge_index=edge_index,
            edge_attr=edge_attr,
            unrolling_steps=self.unrolling_steps,
            plot_results=True,
            batch=batch.batch,
            pos=batch.pos,
        )
        loss = self._compute_loss(y_pred, y)
        self._log_loss(loss, batch, "test")
        return loss

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters of this module."""
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

    def _impose_bc(self, x: torch.Tensor, data: Batch):
        """Overwrite the masked entries of ``x`` with ``data.boundary_values``.

        NOTE: ``x`` is modified in place and the same tensor is returned.
        This helper is not called by any step method in this class.
        """
        x[data.boundary_mask] = data.boundary_values
        return x
|