Add pointnet

This commit is contained in:
2025-10-16 15:20:58 +02:00
parent 81455d789c
commit 8f23a8af66
4 changed files with 837 additions and 1 deletions

View File

@@ -24,7 +24,6 @@ class GraphDataModule(LightningDataModule):
self.hf_repo = hf_repo
self.split_name = split_name
self.dataset_dict = {}
# self.geometry = None
self.geometry_dict = {}
self.train_size = train_size
self.val_size = val_size

View File

@@ -0,0 +1,574 @@
import torch
import torch.nn as nn
class ResidualBlock(nn.Module):
"""Residual block base class. Implementation of a residual block.
Reference: https://arxiv.org/pdf/1512.03385.pdf : Equation #2
"""
def __init__(self, input_dim, output_dim, hidden_dim, spectral_norm=False):
"""Residual block constructor
:param input_dim: dimension of the input
:type input_dim: int
:param output_dim: dimension of the output
:type output_dim: int
:param hidden_dim: hidden dimension for mapping the input (first block)
:type hidden_dim: int
:param spectral_norm: apply spectral normalization, defaults to False
:type spectral_norm: bool, optional
"""
super().__init__()
self._spectral_norm = spectral_norm
self._input_dim = input_dim
self._output_dim = output_dim
self._hidden_dim = hidden_dim
self.l1 = self._spect_norm(nn.Linear(input_dim, hidden_dim))
self.l2 = self._spect_norm(nn.Linear(hidden_dim, output_dim))
self.l3 = self._spect_norm(nn.Linear(input_dim, output_dim))
def forward(self, x):
y = self.activation(self.l1(x))
y = self.l2(y)
x = self.l3(x)
return y + x
def _spect_norm(self, x):
return nn.utils.spectral_norm(x) if self._spectral_norm else x
@property
def spectral_norm(self):
return self._spectral_norm
@property
def input_dim(self):
return self._input_dim
@property
def output_dim(self):
return self._output_dim
@property
def hidden_dim(self):
return self._hidden_dim
class MLP(torch.nn.Module):
"""Multi-layer Perceptron base class"""
def __init__(
self,
input_dim,
output_dim,
inner_size=20,
n_layers=2,
func=nn.Tanh,
layers=None,
batch_norm=False,
spectral_norm=False,
):
"""Deep neural network model
:param input_dim: input channel for the network
:type input_dim: int
:param output_dim: output channel for the network
:type output_dim: int
:param inner_size: inner size of each hidden layer, defaults to 20
:type inner_size: int, optional
:param n_layers: number of layers in the network, defaults to 2
:type n_layers: int, optional
:param func: function(s) to pass to the network, defaults to nn.Tanh
:type func: (list of) torch.nn function(s), optional
:param layers: list of layers for the network, defaults to None
:type layers: list[int], optional
:param batch_norm: apply batch normalization layer
:type bool, default False
:param spectral_norm: apply spectral normalization layer
:type bool, default False
"""
super().__init__()
self._input_dim = input_dim
self._output_dim = output_dim
self._inner_size = inner_size
self._n_layers = n_layers
self._layers = layers
self._bnorm = batch_norm
self._spectnorm = spectral_norm
if layers is None:
layers = [inner_size] * n_layers
tmp_layers = layers.copy()
tmp_layers.insert(0, self._input_dim)
tmp_layers.append(self._output_dim)
self._layers = []
self._batchnorm = []
for i in range(len(tmp_layers) - 1):
self._layers.append(
self.spect_norm(nn.Linear(tmp_layers[i], tmp_layers[i + 1]))
)
self._batchnorm.append(nn.LazyBatchNorm1d())
if isinstance(func, list):
self._functions = func
else:
self._functions = [func for _ in range(len(self._layers) - 1)]
unique_list = []
for layer, func, bnorm in zip(
self._layers[:-1], self._functions, self._batchnorm
):
unique_list.append(layer)
if func is not None:
if batch_norm:
unique_list.append(bnorm)
unique_list.append(func())
unique_list.append(self._layers[-1])
self.model = nn.Sequential(*unique_list)
def spect_norm(self, x):
return nn.utils.spectral_norm(x) if self._spectnorm else x
def forward(self, x):
"""Forward method for NeuralNet class
:param x: network input data
:type x: torch.tensor
:return: network output
:rtype: torch.tensor
"""
return self.model(x)
@property
def input_dim(self):
return self._input_dim
@property
def output_dim(self):
return self._output_dim
@property
def inner_size(self):
return self._inner_size
@property
def n_layers(self):
return self._n_layers
@property
def functions(self):
return self._functions
@property
def layers(self):
return self._layers
class TNet(nn.Module):
"""T-Net base class. Implementation of T-Network.
Reference: Charles R. Qi et al. https://arxiv.org/pdf/1612.00593.pdf
"""
def __init__(self, input_dim):
"""T-Net block constructor
:param input_dim: input dimension of point cloud
:type input_dim: int
"""
super().__init__()
function = nn.Tanh
self._mlp1 = MLP(
input_dim=input_dim,
output_dim=1024,
layers=[64, 128],
func=function,
batch_norm=True,
)
self._mlp2 = MLP(
input_dim=1024,
output_dim=input_dim * input_dim,
layers=[512, 256],
func=function,
batch_norm=True,
)
self._function = function()
self._bn1 = nn.LazyBatchNorm1d()
def forward(self, X):
"""Forward pass for T-Net
:param X: input tensor, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type X: torch.tensor
:return: output affine matrix transformation, shape
[batch, $input_{dim} \times input_{dim}$] with batch
the batch size and $input_{dim}$ the input dimension
of the point cloud.
:rtype: torch.tensor
"""
batch, input_dim = X.shape[0], X.shape[2]
# encoding using first MLP
X = self._mlp1(X)
X = self._function(self._bn1(X))
# applying symmetric function to aggregate information (using max as default)
X, _ = torch.max(X, dim=1)
# decoding using third MLP
X = self._mlp2(X)
return X.reshape(batch, input_dim, input_dim)
class PointNet(nn.Module):
"""Point-Net base class. Implementation of Point Network for segmentation.
Reference: Charles R. Qi et al. https://arxiv.org/pdf/1612.00593.pdf
"""
def __init__(self, input_dim, output_dim, tnet=False):
"""Point-Net block constructor
:param input_dim: input dimension of point cloud
:type input_dim: int
:param output_dim: output dimension of point cloud
:type output_dim: int
:param tnet: apply T-Net transformation, defaults to False
:type tnet: bool, optional
"""
super().__init__()
function = nn.Tanh
self._use_tnet = tnet
self._mlp1 = MLP(
input_dim=input_dim,
output_dim=64,
inner_size=64,
n_layers=1,
func=function,
batch_norm=True,
)
self._mlp2 = MLP(
input_dim=64,
output_dim=1024,
inner_size=128,
n_layers=1,
func=function,
batch_norm=True,
)
self._mlp3 = MLP(
input_dim=1088,
output_dim=128,
layers=[512, 256],
func=function,
batch_norm=True,
)
self._mlp4 = MLP(
input_dim=128,
output_dim=output_dim,
n_layers=0,
func=function,
batch_norm=True,
)
if self._use_tnet:
self._tnet_transform = TNet(input_dim=input_dim)
self._tnet_feature = TNet(input_dim=64)
self._function = function()
self._bn1 = nn.LazyBatchNorm1d()
self._bn2 = nn.LazyBatchNorm1d()
self._bn3 = nn.LazyBatchNorm1d()
def concat(self, embedding, input_):
"""Returns concatenation of global and local features for Point-Net
:param embedding: global features of Point-Net, shape [batch, $input_{dim}$]
with batch the batch size and $input_{dim}$ the input dimension
of the point cloud.
:type embedding: torch.tensor
:param input_: local features of Point-Net, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type input_: torch.tensor
:return: concatenation vector, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
:rtype: torch.tensor
"""
n_points = input_.shape[1]
embedding = embedding.repeat(n_points, 1, 1).permute(1, 0, 2)
return torch.cat([embedding, input_], dim=2)
def forward(self, X):
"""Forward pass for Point-Net
:param X: input tensor, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type X: torch.tensor
:return: segmentation vector, shape [batch, N, $output_{dim}$]
with batch the batch size, N number of points and $output_{dim}$
the output dimension of the point cloud.
:rtype: torch.tensor
"""
# using transform tnet if needed
if self._use_tnet:
transform = self._tnet_transform(X)
X = torch.matmul(X, transform)
# encoding using first MLP
X = self._mlp1(X)
X = self._function(self._bn1(X))
# using transform tnet if needed
if self._use_tnet:
transform = self._tnet_feature(X)
X = torch.matmul(X, transform)
# saving latent representation for later concatanation
latent = X
# encoding using second MLP
X = self._mlp2(X)
X = self._function(self._bn2(X))
# applying symmetric function to aggregate information (using max as default)
X, _ = torch.max(X, dim=1)
# concatenating with latent vector
X = self.concat(X, latent)
# decoding using third MLP
X = self._mlp3(X)
X = self._function(self._bn3(X))
# decoding using fourth MLP
X = self._mlp4(X)
return X
class ConvTNet(nn.Module):
"""T-Net base class. Implementation of T-Network with convolutional layers.
Reference: Ali Kashefi et al. https://arxiv.org/abs/2208.13434
"""
def __init__(self, input_dim):
"""T-Net block constructor
:param input_dim: input dimension of point cloud
:type input_dim: int
"""
super().__init__()
function = nn.Tanh
self._function = function()
self._block1 = nn.Sequential(
nn.Conv1d(input_dim, 64, 1),
nn.BatchNorm1d(64),
self._function,
nn.Conv1d(64, 128, 1),
nn.BatchNorm1d(128),
self._function,
nn.Conv1d(128, 1024, 1),
nn.BatchNorm1d(1024),
self._function,
)
self._block2 = MLP(
input_dim=1024,
output_dim=input_dim * input_dim,
layers=[512, 256],
func=function,
batch_norm=True,
)
def forward(self, X):
"""Forward pass for T-Net
:param X: input tensor, shape [batch, $input_{dim}$, N]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type X: torch.tensor
:return: output affine matrix transformation, shape
[batch, $input_{dim} \times input_{dim}$] with batch
the batch size and $input_{dim}$ the input dimension
of the point cloud.
:rtype: torch.tensor
"""
batch, input_dim = X.shape[0], X.shape[1]
# encoding using first MLP
X = self._block1(X)
# applying symmetric function to aggregate information (using max as default)
X, _ = torch.max(X, dim=-1)
# decoding using third MLP
X = self._block2(X)
return X.reshape(batch, input_dim, input_dim)
class ConvPointNet(nn.Module):
"""Point-Net base class. Implementation of Point Network for segmentation.
Reference: Ali Kashefi et al. https://arxiv.org/abs/2208.13434
"""
def __init__(self, input_dim, output_dim, tnet=False):
"""Point-Net block constructor
:param input_dim: input dimension of point cloud
:type input_dim: int
:param output_dim: output dimension of point cloud
:type output_dim: int
:param tnet: apply T-Net transformation, defaults to False
:type tnet: bool, optional
"""
super().__init__()
self._function = nn.Tanh()
self._use_tnet = tnet
self._block1 = nn.Sequential(
nn.Conv1d(input_dim, 64, 1),
nn.BatchNorm1d(64),
self._function,
nn.Conv1d(64, 64, 1),
nn.BatchNorm1d(64),
self._function,
)
self._block2 = nn.Sequential(
nn.Conv1d(64, 64, 1),
nn.BatchNorm1d(64),
self._function,
nn.Conv1d(64, 128, 1),
nn.BatchNorm1d(128),
self._function,
nn.Conv1d(128, 1024, 1),
nn.BatchNorm1d(1024),
self._function,
)
self._block3 = nn.Sequential(
nn.Conv1d(1088, 512, 1),
nn.BatchNorm1d(512),
self._function,
nn.Conv1d(512, 256, 1),
nn.BatchNorm1d(256),
self._function,
nn.Conv1d(256, 128, 1),
nn.BatchNorm1d(128),
self._function,
)
self._block4 = nn.Conv1d(128, output_dim, 1)
if self._use_tnet:
self._tnet_transform = ConvTNet(input_dim=input_dim)
self._tnet_feature = ConvTNet(input_dim=64)
def concat(self, embedding, input_):
"""
Returns concatenation of global and local features for Point-Net
:param embedding: global features of Point-Net, shape [batch, $input_{dim}$]
with batch the batch size and $input_{dim}$ the input dimension
of the point cloud.
:type embedding: torch.tensor
:param input_: local features of Point-Net, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type input_: torch.tensor
:return: concatenation vector, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
:rtype: torch.tensor
"""
n_points = input_.shape[-1]
embedding = embedding.unsqueeze(2).repeat(1, 1, n_points)
return torch.cat([embedding, input_], dim=1)
def forward(self, X):
"""Forward pass for Point-Net
:param X: input tensor, shape [batch, N, $input_{dim}$]
with batch the batch size, N number of points and $input_{dim}$
the input dimension of the point cloud.
:type X: torch.tensor
:return: segmentation vector, shape [batch, N, $output_{dim}$]
with batch the batch size, N number of points and $output_{dim}$
the output dimension of the point cloud.
:rtype: torch.tensor
"""
# permuting indeces
X = X.permute(0, 2, 1)
# using transform tnet if needed
if self._use_tnet:
transform = self._tnet_transform(X)
X = X.transpose(2, 1)
X = torch.matmul(X, transform)
X = X.transpose(2, 1)
# encoding using first MLP
X = self._block1(X)
# using transform tnet if needed
if self._use_tnet:
transform = self._tnet_feature(X)
X = X.transpose(2, 1)
X = torch.matmul(X, transform)
X = X.transpose(2, 1)
# saving latent representation for later concatanation
latent = X
# encoding using second MLP
X = self._block2(X)
# applying symmetric function to aggregate information (using max as default)
X, _ = torch.max(X, dim=-1)
# concatenating with latent vector
X = self.concat(X, latent)
# decoding using third MLP
X = self._block3(X)
# decoding using fourth MLP
X = self._block4(X)
# permuting indeces
X = X.permute(0, 2, 1)
return X

View File

@@ -0,0 +1,171 @@
import torch
from tqdm import tqdm
from lightning import LightningDataModule
from datasets import load_dataset
import os
from torch.utils.data import DataLoader, TensorDataset
from torch.nn.utils.rnn import pad_sequence
class PointDataModule(LightningDataModule):
def __init__(
self,
hf_repo: str,
split_name: str,
train_size: float = 0.2,
val_size: float = 0.1,
test_size: float = 0.1,
batch_size: int = 32,
remove_boundary_edges: bool = True,
):
super().__init__()
self.hf_repo = hf_repo
self.split_name = split_name
self.dataset_dict = {}
self.geometry_dict = {}
self.train_size = train_size
self.val_size = val_size
self.test_size = test_size
self.batch_size = batch_size
self.remove_boundary_edges = remove_boundary_edges
def prepare_data(self):
dataset = load_dataset(self.hf_repo, name="snapshots")[self.split_name]
geometry = load_dataset(self.hf_repo, name="geometry")[self.split_name]
total_len = len(dataset)
train_len = int(self.train_size * total_len)
valid_len = int(self.val_size * total_len)
self.dataset_dict = {
"train": dataset.select(range(0, train_len)),
"val": dataset.select(range(train_len, train_len + valid_len)),
"test": dataset.select(range(train_len + valid_len, total_len)),
}
self.geometry_dict = {
"train": geometry.select(range(0, train_len)),
"val": geometry.select(range(train_len, train_len + valid_len)),
"test": geometry.select(range(train_len + valid_len, total_len)),
}
def _compute_boundary_mask(
self, bottom_ids, right_ids, top_ids, left_ids, temperature
):
left_ids = left_ids[~torch.isin(left_ids, bottom_ids)]
right_ids = right_ids[~torch.isin(right_ids, bottom_ids)]
left_ids = left_ids[~torch.isin(left_ids, top_ids)]
right_ids = right_ids[~torch.isin(right_ids, top_ids)]
bottom_bc = temperature[bottom_ids].median()
bottom_bc_mask = torch.ones(len(bottom_ids)) * bottom_bc
left_bc = temperature[left_ids].median()
left_bc_mask = torch.ones(len(left_ids)) * left_bc
right_bc = temperature[right_ids].median()
right_bc_mask = torch.ones(len(right_ids)) * right_bc
boundary_values = torch.cat(
[bottom_bc_mask, right_bc_mask, left_bc_mask], dim=0
)
boundary_mask = torch.cat([bottom_ids, right_ids, left_ids], dim=0)
return boundary_mask, boundary_values
def _build_dataset(
self,
snapshot: dict,
geometry: dict,
) -> tuple[torch.Tensor, torch.Tensor]:
conductivity = torch.tensor(
snapshot["conductivity"], dtype=torch.float32
)
temperature = torch.tensor(snapshot["temperature"], dtype=torch.float32)
pos = torch.tensor(geometry["points"], dtype=torch.float32)[:, :2]
bottom_ids = torch.tensor(
geometry["bottom_boundary_ids"], dtype=torch.long
)
top_ids = torch.tensor(geometry["top_boundary_ids"], dtype=torch.long)
left_ids = torch.tensor(geometry["left_boundary_ids"], dtype=torch.long)
right_ids = torch.tensor(
geometry["right_boundary_ids"], dtype=torch.long
)
boundary_mask, boundary_values = self._compute_boundary_mask(
bottom_ids, right_ids, top_ids, left_ids, temperature
)
x = torch.zeros_like(temperature, dtype=torch.float32).unsqueeze(-1)
x[boundary_mask] = boundary_values.unsqueeze(-1)
x = torch.cat([x, conductivity.unsqueeze(-1), pos], dim=-1)
return x, temperature.unsqueeze(-1)
def setup(self, stage: str = None):
if stage == "fit" or stage is None:
x = []
y = []
for snap, geom in tqdm(
zip(self.dataset_dict["train"], self.geometry_dict["train"]),
desc="Building train graphs",
total=len(self.dataset_dict["train"]),
):
x_i, y_i = self._build_dataset(snap, geom)
x.append(x_i)
y.append(y_i)
self.train_dataset = TensorDataset(
pad_sequence(x, batch_first=True, padding_value=-1),
pad_sequence(y, batch_first=True, padding_value=-1),
)
for snap, geom in tqdm(
zip(self.dataset_dict["val"], self.geometry_dict["val"]),
desc="Building val graphs",
total=len(self.dataset_dict["val"]),
):
x_i, y_i = self._build_dataset(snap, geom)
x.append(x_i)
y.append(y_i)
self.val_dataset = TensorDataset(
pad_sequence(x, batch_first=True, padding_value=-1),
pad_sequence(y, batch_first=True, padding_value=-1),
)
if stage == "test" or stage is None:
x = []
y = []
for snap, geom in tqdm(
zip(self.dataset_dict["test"], self.geometry_dict["test"]),
desc="Building test graphs",
total=len(self.dataset_dict["test"]),
):
x_i, y_i = self._build_dataset(snap, geom)
x.append(x_i)
y.append(y_i)
self.test_data = TensorDataset(
pad_sequence(x, batch_first=True, padding_value=-1),
pad_sequence(y, batch_first=True, padding_value=-1),
)
def train_dataloader(self):
return DataLoader(
self.train_dataset,
batch_size=self.batch_size,
shuffle=True,
num_workers=8,
pin_memory=True,
)
def val_dataloader(self):
return DataLoader(
self.val_dataset,
batch_size=self.batch_size,
shuffle=False,
num_workers=8,
pin_memory=True,
)
def test_dataloader(self):
return DataLoader(
self.test_data,
batch_size=self.batch_size,
shuffle=False,
num_workers=8,
pin_memory=True,
)

View File

@@ -0,0 +1,92 @@
import torch
from lightning import LightningModule
import importlib
from matplotlib import pyplot as plt
from matplotlib.tri import Triangulation
def _plot_mesh(x, y, y_pred):
x = x[0, ...].detach().cpu()
pos = x[0, ...].detach().cpu()
pos = x[x[:, 0] != -1]
y = y[0, ...].detach().cpu()
y = y[x[:, 0] != -1]
y_pred = y_pred[0, ...].detach().cpu()
y_pred = y_pred[x[:, 0] != -1]
tria = Triangulation(pos[:, 2], pos[:, 3])
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.tricontourf(tria, y.squeeze().numpy(), levels=14)
plt.colorbar()
plt.title("True temperature")
plt.subplot(1, 2, 2)
plt.tricontourf(tria, y_pred.squeeze().numpy(), levels=14)
plt.colorbar()
plt.title("Predicted temperature")
plt.savefig("point_net.png", dpi=300)
def import_class(class_path: str):
module_path, class_name = class_path.rsplit(".", 1) # split last dot
module = importlib.import_module(module_path) # import the module
cls = getattr(module, class_name) # get the class
return cls
class PointSolver(LightningModule):
def __init__(
self,
model_class_path: str,
model_init_args: dict,
loss: torch.nn.Module = None,
):
super().__init__()
self.model = import_class(model_class_path)(**model_init_args)
self.loss = loss if loss is not None else torch.nn.MSELoss()
def forward(
self,
x: torch.Tensor,
):
return self.model(x)
def _compute_loss(self, x, y):
return self.loss(x, y)
def _log_loss(self, loss, batch, stage: str):
self.log(
f"{stage}/loss",
loss,
on_step=False,
on_epoch=True,
prog_bar=True,
batch_size=len(batch),
)
return loss
def training_step(self, batch, _):
x, y = batch
y_pred = self(x)
loss = self.loss(y_pred, y)
self._log_loss(loss, batch, "train")
return loss
def validation_step(self, batch, _):
x, y = batch
y_pred = self(x)
loss = self.loss(y_pred, y)
self._log_loss(loss, batch, "val")
return loss
def test_step(self, batch, _):
x, y = batch
y_pred = self.model(x)
loss = self._compute_loss(y_pred, y)
self._log_loss(loss, batch, "test")
_plot_mesh(x, y, y_pred)
return loss
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
return optimizer