Implementation of DataLoader and DataModule (#383)
Refactoring for 0.2

* Data module, data loader and dataset
* Refactor LabelTensor
* Refactor solvers

Co-authored-by: dario-coscia <dariocos99@gmail.com>
Committed by Nicola Demo
Parent: dd43c8304c
Commit: a27bd35443
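
The diff below wires the new data pipeline into Lightning's LightningDataModule hooks (setup, train_dataloader, val_dataloader, transfer_batch_to_device). For orientation, here is a minimal, self-contained sketch of that generic hook lifecycle; the class and dataset names are illustrative only and are not part of PINA's API:

    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from lightning.pytorch import LightningDataModule

    class ToyDataModule(LightningDataModule):
        """Minimal sketch of the hooks PinaDataModule overrides."""

        def __init__(self, n_samples=100, batch_size=16):
            super().__init__()
            self.n_samples = n_samples
            self.batch_size = batch_size

        def setup(self, stage=None):
            # Called by the Trainer once per stage ('fit', 'test', 'predict'):
            # the per-split datasets are built here.
            x = torch.rand(self.n_samples, 2)
            y = torch.sin(x).sum(dim=1, keepdim=True)
            self.train_dataset = TensorDataset(x, y)
            self.val_dataset = TensorDataset(x[:20], y[:20])

        def train_dataloader(self):
            return DataLoader(self.train_dataset, batch_size=self.batch_size,
                              shuffle=True)

        def val_dataloader(self):
            return DataLoader(self.val_dataset, batch_size=self.batch_size)

A Lightning Trainer consumes such an object via trainer.fit(model, datamodule=dm); the PinaDataModule in this commit fills the same hooks with condition-aware datasets and collate logic.
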
@@ -1,17 +1,71 @@
 """
 This module provides basic data management functionalities
 """

 import math
 import torch
 import logging
-from pytorch_lightning import LightningDataModule
-from .sample_dataset import SamplePointDataset
-from .supervised_dataset import SupervisedDataset
-from .unsupervised_dataset import UnsupervisedDataset
-from .pina_dataloader import PinaDataLoader
-from .pina_subset import PinaSubset
+from lightning.pytorch import LightningDataModule
+from ..label_tensor import LabelTensor
+from torch.utils.data import DataLoader, BatchSampler, SequentialSampler, \
+    RandomSampler
+from torch.utils.data.distributed import DistributedSampler
+from .dataset import PinaDatasetFactory

+class Collator:
+    def __init__(self, max_conditions_lengths):
+        self.max_conditions_lengths = max_conditions_lengths
+        self.callable_function = self._collate_custom_dataloader if \
+            max_conditions_lengths is None else (
+            self._collate_standard_dataloader)
+
+    @staticmethod
+    def _collate_custom_dataloader(batch):
+        return batch[0]
+
+    def _collate_standard_dataloader(self, batch):
+        """
+        Function used to collate the batch
+        """
+        batch_dict = {}
+        if isinstance(batch, dict):
+            return batch
+        conditions_names = batch[0].keys()
+
+        # Condition names
+        for condition_name in conditions_names:
+            single_cond_dict = {}
+            condition_args = batch[0][condition_name].keys()
+            for arg in condition_args:
+                data_list = [batch[idx][condition_name][arg] for idx in range(
+                    min(len(batch),
+                        self.max_conditions_lengths[condition_name]))]
+                if isinstance(data_list[0], LabelTensor):
+                    single_cond_dict[arg] = LabelTensor.stack(data_list)
+                elif isinstance(data_list[0], torch.Tensor):
+                    single_cond_dict[arg] = torch.stack(data_list)
+                else:
+                    raise NotImplementedError(
+                        f"Data type {type(data_list[0])} not supported")
+            batch_dict[condition_name] = single_cond_dict
+        return batch_dict
+
+    def __call__(self, batch):
+        return self.callable_function(batch)
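
The Collator above rebuilds one nested dictionary per batch by stacking, for every condition and every field, the tensors of the sampled items. The following is a self-contained sketch of the same collate-function idea on plain torch tensors; the condition and field names are made up for illustration:

    import torch
    from torch.utils.data import DataLoader, Dataset

    class ConditionDataset(Dataset):
        """Each item is a nested dict: {condition: {field: tensor}}."""

        def __init__(self, n=8):
            self.x = torch.rand(n, 2)
            self.y = torch.rand(n, 1)

        def __len__(self):
            return len(self.x)

        def __getitem__(self, idx):
            return {"boundary": {"input_points": self.x[idx],
                                 "output_points": self.y[idx]}}

    def collate(batch):
        # Stack every field of every condition along a new batch dimension,
        # mirroring Collator._collate_standard_dataloader.
        out = {}
        for condition in batch[0]:
            out[condition] = {
                field: torch.stack([item[condition][field] for item in batch])
                for field in batch[0][condition]
            }
        return out

    loader = DataLoader(ConditionDataset(), batch_size=4, collate_fn=collate)
    first = next(iter(loader))
    print(first["boundary"]["input_points"].shape)  # torch.Size([4, 2])
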

+class PinaBatchSampler(BatchSampler):
+    def __init__(self, dataset, batch_size, shuffle, sampler=None):
+        if sampler is None:
+            if (torch.distributed.is_available() and
+                    torch.distributed.is_initialized()):
+                rank = torch.distributed.get_rank()
+                world_size = torch.distributed.get_world_size()
+                sampler = DistributedSampler(dataset, shuffle=shuffle,
+                                             rank=rank,
+                                             num_replicas=world_size)
+            else:
+                if shuffle:
+                    sampler = RandomSampler(dataset)
+                else:
+                    sampler = SequentialSampler(dataset)
+        super().__init__(sampler=sampler, batch_size=batch_size,
+                         drop_last=False)
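
PinaBatchSampler picks a per-process sampler (distributed, random, or sequential) and then groups indices into batches. Because the dataloaders further down pass it to DataLoader as sampler= rather than batch_sampler=, each "sample" handed to the dataset is a whole list of indices, and the trivial collate path (Collator(None)) simply unwraps the single pre-assembled batch. A minimal sketch of that mechanism with stock torch classes (the dataset is illustrative):

    import torch
    from torch.utils.data import (BatchSampler, DataLoader, Dataset,
                                  SequentialSampler)

    class ListIndexDataset(Dataset):
        """Map-style dataset whose __getitem__ also accepts a list of indices,
        so one call can return an entire mini-batch."""

        def __init__(self, n=10):
            self.data = torch.arange(n, dtype=torch.float32).unsqueeze(1)

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            # Tensor fancy indexing handles both a single integer index and a
            # list of indices.
            return self.data[idx]

    dataset = ListIndexDataset()
    batch_sampler = BatchSampler(SequentialSampler(dataset), batch_size=4,
                                 drop_last=False)

    # Passing the batch sampler as `sampler=` makes the loader treat each list
    # of indices as one sample; the collate function only has to unwrap it.
    loader = DataLoader(dataset, sampler=batch_sampler,
                        collate_fn=lambda batch: batch[0])
    for batch in loader:
        print(batch.squeeze(1))   # tensors of up to 4 elements each
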

class PinaDataModule(LightningDataModule):
    """
@@ -20,160 +74,218 @@ class PinaDataModule(LightningDataModule):
    """
-    def __init__(self,
-                 problem,
-                 device,
-                 train_size=.7,
-                 test_size=.1,
-                 val_size=.2,
-                 predict_size=0.,
-                 batch_size=None,
-                 shuffle=True,
-                 datasets=None):
-        """
-        Initialize the object, creating dataset based on input problem
-        :param AbstractProblem problem: PINA problem
-        :param device: Device used for training and testing
-        :param train_size: number/percentage of elements in train split
-        :param test_size: number/percentage of elements in test split
-        :param eval_size: number/percentage of elements in evaluation split
-        :param batch_size: batch size used for training
-        :param datasets: list of datasets objects
-        """
-        logging.debug('Start initialization of Pina DataModule')
-        super().__init__()
-        self.problem = problem
-        self.device = device
-        self.dataset_classes = [
-            SupervisedDataset, UnsupervisedDataset, SamplePointDataset
-        ]
-        if datasets is None:
-            self.datasets = None
-        else:
-            self.datasets = datasets
-
-        self.split_length = []
-        self.split_names = []
-        self.loader_functions = {}
-        self.batch_size = batch_size
-        self.condition_names = problem.collector.conditions_name
-
-        if train_size > 0:
-            self.split_names.append('train')
-            self.split_length.append(train_size)
-            self.loader_functions['train_dataloader'] = lambda: PinaDataLoader(
-                self.splits['train'], self.batch_size, self.condition_names)
-        if test_size > 0:
-            self.split_length.append(test_size)
-            self.split_names.append('test')
-            self.loader_functions['test_dataloader'] = lambda: PinaDataLoader(
-                self.splits['test'], self.batch_size, self.condition_names)
-        if val_size > 0:
-            self.split_length.append(val_size)
-            self.split_names.append('val')
-            self.loader_functions['val_dataloader'] = lambda: PinaDataLoader(
-                self.splits['val'], self.batch_size, self.condition_names)
-        if predict_size > 0:
-            self.split_length.append(predict_size)
-            self.split_names.append('predict')
-            self.loader_functions['predict_dataloader'] = lambda: PinaDataLoader(
-                self.splits['predict'], self.batch_size, self.condition_names)
-        self.splits = {k: {} for k in self.split_names}
-        self.shuffle = shuffle
-
-        for k, v in self.loader_functions.items():
-            setattr(self, k, v)
-
-    def prepare_data(self):
-        if self.datasets is None:
-            self._create_datasets()
+    def __init__(self,
+                 problem,
+                 device,
+                 collector,
+                 train_size=.7,
+                 test_size=.2,
+                 val_size=.1,
+                 predict_size=0.,
+                 batch_size=None,
+                 shuffle=True,
+                 repeat=False,
+                 automatic_batching=False
+                 ):
+        """
+        Initialize the object, creating dataset based on input problem
+        :param AbstractProblem problem: PINA problem
+        :param device: Device used for training and testing
+        :param Collector collector: PINA collector
+        :param train_size: number/percentage of elements in train split
+        :param test_size: number/percentage of elements in test split
+        :param val_size: number/percentage of elements in evaluation split
+        :param batch_size: batch size used for training
+        """
+        logging.info('Start initialization of Pina DataModule')
+        super().__init__()
+        self.problem = problem
+        self.device = device
+        self.default_batching = automatic_batching
+        self.batch_size = batch_size
+        self.shuffle = shuffle
+        self.repeat = repeat
+
+        # Begin Data splitting
+        splits_dict = {}
+        if train_size > 0:
+            splits_dict['train'] = train_size
+            self.train_dataset = None
+        else:
+            self.train_dataloader = super().train_dataloader
+        if test_size > 0:
+            splits_dict['test'] = test_size
+            self.test_dataset = None
+        else:
+            self.test_dataloader = super().test_dataloader
+        if val_size > 0:
+            splits_dict['val'] = val_size
+            self.val_dataset = None
+        else:
+            self.val_dataloader = super().val_dataloader
+        if predict_size > 0:
+            splits_dict['predict'] = predict_size
+            self.predict_dataset = None
+        else:
+            self.predict_dataloader = super().predict_dataloader
+        self.collector_splits = self._create_splits(collector, splits_dict)

-    def setup(self, stage=None):
-        """
-        Perform the splitting of the dataset
-        """
-        logging.debug('Start setup of Pina DataModule obj')
-        if self.datasets is None:
-            self._create_datasets()
-        if stage == 'fit' or stage is None:
-            for dataset in self.datasets:
-                if len(dataset) > 0:
-                    splits = self.dataset_split(dataset,
-                                                self.split_length,
-                                                shuffle=self.shuffle)
-                    for i in range(len(self.split_length)):
-                        self.splits[self.split_names[i]][
-                            dataset.data_type] = splits[i]
-        elif stage == 'test':
-            raise NotImplementedError("Testing pipeline not implemented yet")
-        else:
-            raise ValueError("stage must be either 'fit' or 'test'")
+    def setup(self, stage=None):
+        """
+        Perform the splitting of the dataset
+        """
+        logging.debug('Start setup of Pina DataModule obj')
+        if stage == 'fit' or stage is None:
+            self.train_dataset = PinaDatasetFactory(
+                self.collector_splits['train'],
+                max_conditions_lengths=self.find_max_conditions_lengths(
+                    'train'))
+            if 'val' in self.collector_splits.keys():
+                self.val_dataset = PinaDatasetFactory(
+                    self.collector_splits['val'],
+                    max_conditions_lengths=self.find_max_conditions_lengths(
+                        'val')
+                )
+        elif stage == 'test':
+            self.test_dataset = PinaDatasetFactory(
+                self.collector_splits['test'],
+                max_conditions_lengths=self.find_max_conditions_lengths(
+                    'test')
+            )
+        elif stage == 'predict':
+            self.predict_dataset = PinaDatasetFactory(
+                self.collector_splits['predict'],
+                max_conditions_lengths=self.find_max_conditions_lengths(
+                    'predict')
+            )
+        else:
+            raise ValueError(
+                "stage must be either 'fit' or 'test' or 'predict'."
+            )

-    @staticmethod
-    def dataset_split(dataset, lengths, seed=None, shuffle=True):
-        """
-        Perform the splitting of the dataset
-        :param dataset: dataset object we wanted to split
-        :param lengths: lengths of elements in dataset
-        :param seed: random seed
-        :param shuffle: shuffle dataset
-        :return: split dataset
-        :rtype: PinaSubset
-        """
-        if sum(lengths) - 1 < 1e-3:
-            len_dataset = len(dataset)
-            lengths = [
-                int(math.floor(len_dataset * length)) for length in lengths
-            ]
-            remainder = len(dataset) - sum(lengths)
-            for i in range(remainder):
-                lengths[i % len(lengths)] += 1
-        elif sum(lengths) - 1 >= 1e-3:
-            raise ValueError(f"Sum of lengths is {sum(lengths)} less than 1")
-
-        if shuffle:
-            if seed is not None:
-                generator = torch.Generator()
-                generator.manual_seed(seed)
-                indices = torch.randperm(sum(lengths), generator=generator)
-            else:
-                indices = torch.randperm(sum(lengths))
-            dataset.apply_shuffle(indices)
-
-        indices = torch.arange(0, sum(lengths), 1, dtype=torch.uint8).tolist()
-        offsets = [
-            sum(lengths[:i]) if i > 0 else 0 for i in range(len(lengths))
-        ]
-        return [
-            PinaSubset(dataset, indices[offset:offset + length])
-            for offset, length in zip(offsets, lengths)
-        ]
+    @staticmethod
+    def _split_condition(condition_dict, splits_dict):
+        len_condition = len(condition_dict['input_points'])
+        lengths = [
+            int(math.floor(len_condition * length)) for length in
+            splits_dict.values()
+        ]
+        remainder = len_condition - sum(lengths)
+        for i in range(remainder):
+            lengths[i % len(lengths)] += 1
+        splits_dict = {k: v for k, v in zip(splits_dict.keys(), lengths)}
+        to_return_dict = {}
+        offset = 0
+        for stage, stage_len in splits_dict.items():
+            to_return_dict[stage] = {k: v[offset:offset + stage_len]
+                                     for k, v in condition_dict.items()
+                                     if k != 'equation'
+                                     # Equations are NEVER dataloaded
+                                     }
+            offset += stage_len
+        return to_return_dict
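
_split_condition turns fractional split sizes into integer lengths with math.floor and then hands out the leftover samples one per split in round-robin order, so the pieces always sum to the condition length. A standalone illustration of that arithmetic (the fractions and sample count are arbitrary):

    import math

    def proportional_lengths(n, fractions):
        """Integer split lengths that always sum to n."""
        lengths = [int(math.floor(n * f)) for f in fractions]
        remainder = n - sum(lengths)
        for i in range(remainder):
            lengths[i % len(lengths)] += 1   # distribute leftovers round-robin
        return lengths

    print(proportional_lengths(103, [0.7, 0.2, 0.1]))  # [73, 20, 10], sums to 103
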

-    def _create_datasets(self):
-        """
-        Create the dataset objects putting data
-        """
-        logging.debug('Dataset creation in PinaDataModule obj')
-        collector = self.problem.collector
-        batching_dim = self.problem.batching_dimension
-        datasets_slots = [i.__slots__ for i in self.dataset_classes]
-        self.datasets = [
-            dataset(device=self.device) for dataset in self.dataset_classes
-        ]
-        logging.debug('Filling datasets in PinaDataModule obj')
-        for name, data in collector.data_collections.items():
-            keys = list(data.keys())
-            idx = [
-                key for key, val in collector.conditions_name.items()
-                if val == name
-            ]
-            for i, slot in enumerate(datasets_slots):
-                if slot == keys:
-                    self.datasets[i].add_points(data, idx[0], batching_dim)
-        datasets = []
-        for dataset in self.datasets:
-            if not dataset.empty:
-                dataset.initialize()
-                datasets.append(dataset)
-        self.datasets = datasets
+    def _create_splits(self, collector, splits_dict):
+        """
+        Create the dataset objects putting data
+        """
+        logging.debug('Dataset creation in PinaDataModule obj')
+
+        # ----------- Auxiliary function ------------
+        def _apply_shuffle(condition_dict, len_data):
+            idx = torch.randperm(len_data)
+            for k, v in condition_dict.items():
+                if k == 'equation':
+                    continue
+                if isinstance(v, list):
+                    condition_dict[k] = [v[i] for i in idx]
+                elif isinstance(v, LabelTensor):
+                    condition_dict[k] = LabelTensor(v.tensor[idx],
+                                                    v.labels)
+                elif isinstance(v, torch.Tensor):
+                    condition_dict[k] = v[idx]
+                else:
+                    raise ValueError(f"Data type {type(v)} not supported")
+        # ----------- End auxiliary function ------------
+
+        split_names = list(splits_dict.keys())
+        dataset_dict = {name: {} for name in split_names}
+        for condition_name, condition_dict in collector.data_collections.items():
+            len_data = len(condition_dict['input_points'])
+            if self.shuffle:
+                _apply_shuffle(condition_dict, len_data)
+            for key, data in self._split_condition(condition_dict,
+                                                   splits_dict).items():
+                dataset_dict[key].update({condition_name: data})
+        return dataset_dict
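
Shuffling is done once per condition by drawing a single random permutation and applying it to every field, so inputs and outputs stay aligned. In isolation the pattern looks like this:

    import torch

    x = torch.arange(6).unsqueeze(1).float()   # inputs, shape [6, 1]
    y = 2 * x                                   # matching outputs

    idx = torch.randperm(len(x))                # one permutation for all fields
    x_shuffled, y_shuffled = x[idx], y[idx]

    # Pairs are preserved: y_shuffled is still exactly 2 * x_shuffled.
    assert torch.equal(y_shuffled, 2 * x_shuffled)
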

+    def find_max_conditions_lengths(self, split):
+        max_conditions_lengths = {}
+        for k, v in self.collector_splits[split].items():
+            if self.batch_size is None:
+                max_conditions_lengths[k] = len(v['input_points'])
+            elif self.repeat:
+                max_conditions_lengths[k] = self.batch_size
+            else:
+                max_conditions_lengths[k] = min(len(v['input_points']),
+                                                self.batch_size)
+        return max_conditions_lengths

+    def val_dataloader(self):
+        """
+        Create the validation dataloader
+        """
+        batch_size = self.batch_size if self.batch_size is not None else len(
+            self.val_dataset)
+        # Use default batching in torch DataLoader (good if batch size is small)
+        if self.default_batching:
+            collate = Collator(self.find_max_conditions_lengths('val'))
+            return DataLoader(self.val_dataset, self.batch_size,
+                              collate_fn=collate)
+        collate = Collator(None)
+        # Use custom batching (good if batch size is large)
+        sampler = PinaBatchSampler(self.val_dataset, batch_size, shuffle=False)
+        return DataLoader(self.val_dataset, sampler=sampler,
+                          collate_fn=collate)

+    def train_dataloader(self):
+        """
+        Create the training dataloader
+        """
+        # Use default batching in torch DataLoader (good if batch size is small)
+        if self.default_batching:
+            collate = Collator(self.find_max_conditions_lengths('train'))
+            return DataLoader(self.train_dataset, self.batch_size,
+                              collate_fn=collate)
+        collate = Collator(None)
+        # Use custom batching (good if batch size is large)
+        batch_size = self.batch_size if self.batch_size is not None else len(
+            self.train_dataset)
+        sampler = PinaBatchSampler(self.train_dataset, batch_size,
+                                   shuffle=False)
+        return DataLoader(self.train_dataset, sampler=sampler,
+                          collate_fn=collate)

+    def test_dataloader(self):
+        """
+        Create the testing dataloader
+        """
+        raise NotImplementedError("Test dataloader not implemented")

+    def predict_dataloader(self):
+        """
+        Create the prediction dataloader
+        """
+        raise NotImplementedError("Predict dataloader not implemented")

+    def transfer_batch_to_device(self, batch, device, dataloader_idx):
+        """
+        Transfer the batch to the device. This method is called in the
+        training loop and is used to transfer the batch to the device.
+        """
+        batch = [
+            (k, super(LightningDataModule, self).transfer_batch_to_device(
+                v, device, dataloader_idx))
+            for k, v in batch.items()
+        ]
+        return batch
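
The hook above moves every condition of the dictionary-shaped batch to the target device through Lightning's built-in transfer logic. The same idea can be reproduced with plain torch for a nested batch of tensors; the helper name and batch layout below are illustrative:

    import torch

    def move_to_device(batch, device):
        """Recursively move tensors in nested dicts/lists to `device`."""
        if isinstance(batch, torch.Tensor):
            return batch.to(device)
        if isinstance(batch, dict):
            return {k: move_to_device(v, device) for k, v in batch.items()}
        if isinstance(batch, (list, tuple)):
            return type(batch)(move_to_device(v, device) for v in batch)
        return batch

    batch = {"boundary": {"input_points": torch.rand(4, 2)}}
    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch = move_to_device(batch, device)
    print(batch["boundary"]["input_points"].device)
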