Files
thermal-conduction-ml/ThermalSolver/graph_datamodule_unsteady.py
FilippoOlivo 54bebf7154 fix model
2025-12-01 14:55:13 +01:00

267 lines
9.5 KiB
Python

import torch
from tqdm import tqdm
from lightning import LightningDataModule
from datasets import load_dataset
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.utils import to_undirected
from .mesh_data import MeshData
# from torch.utils.data import Dataset
from torch_geometric.utils import scatter
def compute_nodal_area(edge_index, edge_attr, num_nodes):
    """Estimate a per-node cell area as the squared shortest incoming edge.

    For every node, the shortest length among the edges pointing *to* it
    (targets are ``edge_index[1]``) is taken as the local mesh size ``h``
    and the area estimate is ``h ** 2``. Taking the minimum means longer
    connections to the same node (e.g. diagonals) do not affect the result.

    No normalisation is applied: the raw ``h ** 2`` values are returned.

    Args:
        edge_index: Integer tensor whose second row holds the target node of
            each edge; only that row is read.
        edge_attr: Edge lengths, one per edge, either 1-D or with a trailing
            singleton axis.
        num_nodes: Number of nodes, i.e. the size of the first axis of the
            result.

    Returns:
        Tensor with ``num_nodes`` rows and a trailing singleton axis holding
        the area estimate of each node.
    """
    targets = edge_index[1]
    # Drop only a trailing singleton axis. A bare ``squeeze()`` would also
    # collapse the edge axis when there is exactly one edge, leaving a 0-d
    # tensor that no longer lines up with ``targets``.
    lengths = edge_attr.squeeze(-1) if edge_attr.dim() > 1 else edge_attr
    # Shortest incoming edge per node.
    h = scatter(lengths, targets, dim=0, dim_size=num_nodes, reduce="min")
    return h.pow(2).unsqueeze(-1)
class GraphDataModule(LightningDataModule):
    """Lightning data module producing one-step/unrolled graph samples.

    Snapshots and geometries are read from the ``"snapshots"`` and
    ``"geometry"`` configurations of a Hugging Face dataset repository.
    Each (snapshot, geometry) pair is one simulation; every simulation is
    expanded into a list of ``MeshData`` samples, one per starting time step.

    Args:
        hf_repo: Repository identifier passed to ``datasets.load_dataset``.
        split_name: Key of the split selected from both loaded datasets.
        train_size: Fraction of simulations assigned to the train split.
        val_size: Fraction of simulations assigned to the validation split.
        test_size: Stored on the instance but not used when splitting; the
            test split is everything after train and validation.
            NOTE(review): confirm whether the test split should honour it.
        batch_size: Batch size of the train and test loaders (the validation
            loader uses a fixed batch size of 128).
        remove_boundary_edges: If true, edges whose target node carries a
            boundary value are removed from every graph.
        build_radial_graph: Not implemented; building any sample raises
            ``NotImplementedError`` when true.
        radius: Stored for the (unimplemented) radial graph construction.
        start_unrolling_steps: Number of future time steps stored as target
            of each sample.
        max_time_steps: Only the first ``max_time_steps`` snapshots of every
            simulation are used. ``None`` keeps all of them.
    """

    def __init__(
        self,
        hf_repo: str,
        split_name: str,
        train_size: float = 0.2,
        val_size: float = 0.1,
        test_size: float = 0.1,
        batch_size: int = 32,
        remove_boundary_edges: bool = False,
        build_radial_graph: bool = False,
        radius: float = None,
        start_unrolling_steps: int = 1,
        max_time_steps: int = 40,
    ):
        super().__init__()
        self.hf_repo = hf_repo
        self.split_name = split_name
        self.train_size = train_size
        self.val_size = val_size
        self.test_size = test_size
        self.batch_size = batch_size
        self.remove_boundary_edges = remove_boundary_edges
        self.build_radial_graph = build_radial_graph
        self.radius = radius
        self.unrolling_steps = start_unrolling_steps
        self.max_time_steps = max_time_steps

        # Raw Hugging Face subsets keyed by "train" / "val" / "test".
        self.dataset_dict = {}
        self.geometry_dict = {}

        # Kept for backward compatibility; not used by this class.
        self.train_dataset, self.val_dataset, self.test_dataset = (
            None,
            None,
            None,
        )

        # Per-simulation lists of samples, filled in by ``setup``.
        self.train_data = None
        self.val_data = None
        self.test_data = None

    def _load_splits(self):
        """Load both dataset configurations and carve the three splits."""
        dataset = load_dataset(self.hf_repo, name="snapshots")[self.split_name]
        geometry = load_dataset(self.hf_repo, name="geometry")[self.split_name]

        total_len = len(dataset)
        train_len = int(self.train_size * total_len)
        valid_len = int(self.val_size * total_len)

        # The same index ranges are applied to snapshots and geometries so
        # that entry ``k`` of one split lines up in both.
        index_ranges = {
            "train": range(0, train_len),
            "val": range(train_len, train_len + valid_len),
            "test": range(train_len + valid_len, total_len),
        }
        self.dataset_dict = {
            name: dataset.select(idx) for name, idx in index_ranges.items()
        }
        self.geometry_dict = {
            name: geometry.select(idx) for name, idx in index_ranges.items()
        }

    def prepare_data(self):
        """Load the datasets and populate the split dictionaries."""
        self._load_splits()

    def _compute_boundary_mask(
        self, bottom_ids, right_ids, top_ids, left_ids, temperature
    ):
        """Return boundary node indices and the value imposed on each.

        Nodes of the left and right sides that also belong to the bottom or
        top side are dropped from left/right. Each of the bottom, right and
        left sides is then assigned the median of ``temperature`` over its
        nodes. The top side does not appear in the output.

        Args:
            bottom_ids, right_ids, top_ids, left_ids: Index tensors of the
                nodes on each side.
            temperature: Per-node temperature used to read the side values.

        Returns:
            ``(boundary_mask, boundary_values)``: concatenated node indices
            (bottom, right, left) and the matching constant values.
        """
        corner_candidates = torch.cat([bottom_ids, top_ids], dim=0)
        left_ids = left_ids[~torch.isin(left_ids, corner_candidates)]
        right_ids = right_ids[~torch.isin(right_ids, corner_candidates)]

        sides = (bottom_ids, right_ids, left_ids)
        boundary_values = torch.cat(
            [
                torch.ones(len(ids)) * temperature[ids].median()
                for ids in sides
            ],
            dim=0,
        )
        boundary_mask = torch.cat(sides, dim=0)
        return boundary_mask, boundary_values

    def _build_dataset(
        self,
        snapshot: dict,
        geometry: dict,
    ) -> list:
        """Expand one simulation into a list of ``MeshData`` samples.

        Args:
            snapshot: Record providing ``"temperatures"``.
            geometry: Record providing ``"conductivity"``, ``"points"``,
                ``"edge_index"`` and the four ``"*_boundary_ids"`` entries.

        Returns:
            One ``MeshData`` per starting time step ``i``; ``x`` is the
            temperature at step ``i`` and ``y`` the following
            ``unrolling_steps`` steps arranged as (node, step, 1).

        Raises:
            NotImplementedError: If ``build_radial_graph`` is true.
        """
        if self.build_radial_graph:
            # TODO: build the connectivity from ``self.radius`` instead of
            # the mesh edges.
            raise NotImplementedError(
                "Radial graph building not implemented yet."
            )

        conductivity = torch.tensor(
            geometry["conductivity"], dtype=torch.float32
        ).unsqueeze(-1)
        temperatures = torch.tensor(
            snapshot["temperatures"], dtype=torch.float32
        )[: self.max_time_steps]
        # Only the first two coordinates are kept.
        pos = torch.tensor(geometry["points"], dtype=torch.float32)[:, :2]

        bottom_ids = torch.tensor(
            geometry["bottom_boundary_ids"], dtype=torch.long
        )
        top_ids = torch.tensor(geometry["top_boundary_ids"], dtype=torch.long)
        left_ids = torch.tensor(geometry["left_boundary_ids"], dtype=torch.long)
        right_ids = torch.tensor(
            geometry["right_boundary_ids"], dtype=torch.long
        )

        edge_index = torch.tensor(
            geometry["edge_index"], dtype=torch.int64
        ).T
        edge_index = to_undirected(edge_index, num_nodes=pos.size(0))

        # Boundary values are read from the first snapshot.
        boundary_mask, boundary_values = self._compute_boundary_mask(
            bottom_ids, right_ids, top_ids, left_ids, temperatures[0, :]
        )

        # Edge feature: Euclidean length of each edge.
        edge_attr = torch.norm(pos[edge_index[0]] - pos[edge_index[1]], dim=1)
        # Computed on the full connectivity, before any edge removal below.
        nodal_area = compute_nodal_area(edge_index, edge_attr, pos.size(0))

        if self.remove_boundary_edges:
            boundary_idx = torch.unique(boundary_mask)
            keep = ~torch.isin(edge_index[1], boundary_idx)
            edge_index = edge_index[:, keep]
            edge_attr = edge_attr[keep]

        steps = self.unrolling_steps
        n_samples = temperatures.size(0) - steps
        return [
            MeshData(
                x=temperatures[i, :].unsqueeze(-1),
                y=temperatures[i + 1 : i + 1 + steps, :]
                .unsqueeze(-1)
                .permute(1, 0, 2),
                c=conductivity,
                edge_index=edge_index,
                pos=pos,
                edge_attr=edge_attr,
                boundary_mask=boundary_mask,
                boundary_values=boundary_values,
                nodal_area=nodal_area,
            )
            for i in range(n_samples)
        ]

    def _build_split(self, split: str) -> list:
        """Build the samples of every simulation in ``split``."""
        snapshots = self.dataset_dict[split]
        geometries = self.geometry_dict[split]
        return [
            self._build_dataset(snap, geom)
            for snap, geom in tqdm(
                zip(snapshots, geometries),
                desc=f"Building {split} graphs",
                total=len(snapshots),
            )
        ]

    def setup(self, stage: str = None):
        """Build the graph samples needed for ``stage``.

        ``"fit"`` builds train and validation data, ``"validate"`` builds
        validation data, ``"test"`` builds test data and ``None`` builds
        everything.
        """
        # ``prepare_data`` is not guaranteed to have run in this process
        # (Lightning calls it on a single process in distributed runs), so
        # load the splits here when they are missing.
        if not self.dataset_dict or not self.geometry_dict:
            self._load_splits()

        if stage in ("fit", None):
            self.train_data = self._build_split("train")
        if stage in ("fit", "validate", None):
            self.val_data = self._build_split("val")
        if stage in ("test", None):
            self.test_data = self._build_split("test")

    def _make_loader(self, simulations, batch_size: int, shuffle: bool):
        """Flatten per-simulation sample lists and wrap them in a loader."""
        if simulations is None:
            raise RuntimeError(
                "Data has not been built yet; call setup() for this stage "
                "before requesting the dataloader."
            )
        samples = [
            sample for simulation in simulations for sample in simulation
        ]
        return DataLoader(
            samples,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=8,
            pin_memory=True,
        )

    def train_dataloader(self):
        """Return a shuffled loader over all training samples."""
        return self._make_loader(
            self.train_data, batch_size=self.batch_size, shuffle=True
        )

    def val_dataloader(self):
        """Return an ordered loader over all validation samples."""
        return self._make_loader(self.val_data, batch_size=128, shuffle=False)

    def test_dataloader(self):
        """Return an ordered loader over all test samples.

        NOTE(review): this previously called a ``create_autoregressive_datasets``
        helper that no longer exists in the class, so it always failed. The
        test samples are now served the same way as train/val samples;
        confirm that is the intended evaluation format.
        """
        return self._make_loader(
            self.test_data, batch_size=self.batch_size, shuffle=False
        )