Add functionalities to DataModule and data loaders + tests for datasets and DataModule (#453)

* Add num_workers and pin_memory arguments to DataLoader and DataModule tests
Filippo Olivo
2025-02-18 09:10:23 +01:00
committed by Nicola Demo
parent 9cae9a438f
commit 571ef7f9e2
5 changed files with 455 additions and 29 deletions
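For background on the two new arguments: both are ultimately forwarded to torch.utils.data.DataLoader. Below is a minimal sketch, independent of PINA, showing what they control; the tensor shapes and sizes are illustrative only.

import torch
from torch.utils.data import DataLoader, TensorDataset

def main():
    # Illustrative dataset; shapes and sizes are arbitrary.
    dataset = TensorDataset(torch.randn(1024, 3), torch.randn(1024, 1))

    # num_workers > 0 loads batches in worker subprocesses instead of the main
    # process; pin_memory=True places batches in page-locked host memory so
    # host-to-GPU copies can run faster.
    loader = DataLoader(dataset, batch_size=64, shuffle=True,
                        num_workers=2, pin_memory=torch.cuda.is_available())

    for coords, values in loader:
        pass  # a training step would consume the batch here

if __name__ == "__main__":  # guard needed when num_workers > 0 on spawn-based platforms
    main()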

View File

@@ -1,4 +1,5 @@
import logging
import warnings
from lightning.pytorch import LightningDataModule
import torch
from ..label_tensor import LabelTensor
@@ -8,6 +9,7 @@ from torch.utils.data.distributed import DistributedSampler
from .dataset import PinaDatasetFactory
from ..collector import Collector
class DummyDataloader:
""""
Dummy dataloader used when batch size is None. It callects all the data
@@ -57,7 +59,7 @@ class Collator:
self.max_conditions_lengths = max_conditions_lengths
self.callable_function = self._collate_custom_dataloader if \
max_conditions_lengths is None else (
self._collate_standard_dataloader)
self._collate_standard_dataloader)
self.dataset = dataset
def _collate_custom_dataloader(self, batch):
@@ -95,7 +97,7 @@ class Collator:
class PinaSampler:
def __new__(self, dataset, batch_size, shuffle, automatic_batching):
def __new__(cls, dataset, shuffle):
if (torch.distributed.is_available() and
torch.distributed.is_initialized()):
@@ -123,15 +125,35 @@ class PinaDataModule(LightningDataModule):
batch_size=None,
shuffle=True,
repeat=False,
automatic_batching=False
automatic_batching=False,
num_workers=0,
pin_memory=False,
):
"""
Initialize the object, creating dataset based on input problem
:param problem: Problem where data are defined
:param train_size: number/percentage of elements in train split
:param test_size: number/percentage of elements in test split
:param val_size: number/percentage of elements in evaluation split
:param batch_size: batch size used for training
Initialize the object, creating datasets based on the input problem.
:param problem: The problem defining the dataset.
:type problem: AbstractProblem
:param train_size: Fraction or number of elements in the training split.
:type train_size: float
:param test_size: Fraction or number of elements in the test split.
:type test_size: float
:param val_size: Fraction or number of elements in the validation split.
:type val_size: float
:param predict_size: Fraction or number of elements in the prediction split.
:type predict_size: float
:param batch_size: Batch size used for training. If None, the entire dataset is used per batch.
:type batch_size: int or None
:param shuffle: Whether to shuffle the dataset before splitting.
:type shuffle: bool
:param repeat: Whether to repeat the dataset indefinitely.
:type repeat: bool
:param automatic_batching: Whether to enable automatic batching.
:type automatic_batching: bool
:param num_workers: Number of worker threads for data loading. Default is 0 (serial loading).
:type num_workers: int
:param pin_memory: Whether to use pinned memory for faster data transfer to the GPU. Default is False.
:type pin_memory: bool
"""
logging.debug('Start initialization of Pina DataModule')
logging.info('Start initialization of Pina DataModule')
@@ -170,6 +192,15 @@ class PinaDataModule(LightningDataModule):
collector = Collector(problem)
collector.store_fixed_data()
collector.store_sample_domains()
if batch_size is None and num_workers != 0:
warnings.warn(
"Setting num_workers when batch_size is None has no effect on "
"the DataLoading process.")
if batch_size is None and pin_memory:
warnings.warn("Setting pin_memory to True has no effect when "
"batch_size is None.")
self.num_workers = num_workers
self.pin_memory = pin_memory
self.collector_splits = self._create_splits(collector, splits_dict)
self.transfer_batch_to_device = self._transfer_batch_to_device
@@ -271,20 +302,27 @@ class PinaDataModule(LightningDataModule):
dataset_dict[key].update({condition_name: data})
return dataset_dict
def _create_dataloader(self, split, dataset):
shuffle = self.shuffle if split == 'train' else False
# Suppress the warning about num_workers.
# In many cases, especially for PINNs, serial data loading can outperform parallel data loading.
warnings.filterwarnings(
"ignore",
message=(
r"The '(train|val|test)_dataloader' does not have many workers which may be a bottleneck."),
module="lightning.pytorch.trainer.connectors.data_connector"
)
# Use custom batching (good if batch size is large)
if self.batch_size is not None:
sampler = PinaSampler(dataset, self.batch_size,
shuffle, self.automatic_batching)
sampler = PinaSampler(dataset, shuffle)
if self.automatic_batching:
collate = Collator(self.find_max_conditions_lengths(split))
else:
collate = Collator(None, dataset)
return DataLoader(dataset, self.batch_size,
collate_fn=collate, sampler=sampler)
collate_fn=collate, sampler=sampler,
num_workers=self.num_workers,
pin_memory=self.pin_memory)
dataloader = DummyDataloader(dataset)
dataloader.dataset = self._transfer_batch_to_device(
dataloader.dataset, self.trainer.strategy.root_device, 0)

View File

@@ -18,6 +18,8 @@ class Trainer(lightning.pytorch.Trainer):
predict_size=0.,
compile=None,
automatic_batching=None,
num_workers=None,
pin_memory=None,
**kwargs):
"""
PINA Trainer class for customizing every aspect of training via flags.
@@ -44,6 +46,10 @@ class Trainer(lightning.pytorch.Trainer):
performed. Please avoid using automatic batching when batch_size is
large, default False.
:type automatic_batching: bool
:param num_workers: Number of worker threads for data loading. Default is 0 (serial loading).
:type num_workers: int
:param pin_memory: Whether to use pinned memory for faster data transfer to the GPU. Default is False.
:type pin_memory: bool
:Keyword Arguments:
The additional keyword arguments specify the training setup
@@ -60,6 +66,14 @@ class Trainer(lightning.pytorch.Trainer):
check_consistency(automatic_batching, bool)
if compile is not None:
check_consistency(compile, bool)
if pin_memory is not None:
check_consistency(pin_memory, bool)
else:
pin_memory = False
if num_workers is not None:
check_consistency(num_workers, int)
else:
num_workers = 0
if train_size + test_size + val_size + predict_size > 1:
raise ValueError('train_size, test_size, val_size and predict_size '
'must sum up to 1.')
@@ -93,19 +107,16 @@ class Trainer(lightning.pytorch.Trainer):
compile = False
if automatic_batching is None:
automatic_batching = False
# set attributes
self.compile = compile
self.automatic_batching = automatic_batching
self.train_size = train_size
self.test_size = test_size
self.val_size = val_size
self.predict_size = predict_size
self.solver = solver
self.batch_size = batch_size
self._move_to_device()
self.data_module = None
self._create_loader()
self._create_datamodule(train_size, test_size, val_size, predict_size,
batch_size, automatic_batching, pin_memory,
num_workers)
# logging
self.logging_kwargs = {
@@ -127,7 +138,15 @@ class Trainer(lightning.pytorch.Trainer):
pb.unknown_parameters[key] = torch.nn.Parameter(
pb.unknown_parameters[key].data.to(device))
def _create_loader(self):
def _create_datamodule(self,
train_size,
test_size,
val_size,
predict_size,
batch_size,
automatic_batching,
pin_memory,
num_workers):
"""
This method is used here because, if resampling is needed
during training, there is no need to touch the
@@ -136,8 +155,8 @@ class Trainer(lightning.pytorch.Trainer):
if not self.solver.problem.are_all_domains_discretised:
error_message = '\n'.join([
f"""{" " * 13} ---> Domain {key} {
"sampled" if key in self.solver.problem.discretised_domains else
"not sampled"}""" for key in
"sampled" if key in self.solver.problem.discretised_domains else
"not sampled"}""" for key in
self.solver.problem.domains.keys()
])
raise RuntimeError('Cannot create Trainer if not all conditions '
@@ -145,12 +164,14 @@ class Trainer(lightning.pytorch.Trainer):
f'{error_message}')
self.data_module = PinaDataModule(
self.solver.problem,
train_size=self.train_size,
test_size=self.test_size,
val_size=self.val_size,
predict_size=self.predict_size,
batch_size=self.batch_size,
automatic_batching=self.automatic_batching)
train_size=train_size,
test_size=test_size,
val_size=val_size,
predict_size=predict_size,
batch_size=batch_size,
automatic_batching=automatic_batching,
num_workers=num_workers,
pin_memory=pin_memory)
def train(self, **kwargs):
"""