🎨 Format Python code with psf/black

This commit is contained in:
ndem0
2024-02-09 11:25:00 +00:00
committed by Nicola Demo
parent 591aeeb02b
commit cbb43a5392
64 changed files with 1323 additions and 955 deletions

View File

@@ -1,10 +1,4 @@
__all__ = [
'PINN',
'GAROM',
'SupervisedSolver',
'SolverInterface'
]
__all__ = ["PINN", "GAROM", "SupervisedSolver", "SolverInterface"]
from .garom import GAROM
from .pinn import PINN

View File

@@ -2,10 +2,13 @@
import torch
import sys
try:
from torch.optim.lr_scheduler import LRScheduler # torch >= 2.0
except ImportError:
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler # torch < 2.0
from torch.optim.lr_scheduler import (
_LRScheduler as LRScheduler,
) # torch < 2.0
from torch.optim.lr_scheduler import ConstantLR
from .solver import SolverInterface
@@ -18,12 +21,12 @@ class GAROM(SolverInterface):
"""
GAROM solver class. This class implements Generative Adversarial
Reduced Order Model solver, using user specified ``models`` to solve
a specific order reduction``problem``.
a specific order reduction``problem``.
.. seealso::
**Original reference**: Coscia, D., Demo, N., & Rozza, G. (2023).
*Generative Adversarial Reduced Order Modelling*.
*Generative Adversarial Reduced Order Modelling*.
DOI: `arXiv preprint arXiv:2305.15881.
<https://doi.org/10.48550/arXiv.2305.15881>`_.
"""
@@ -35,19 +38,13 @@ class GAROM(SolverInterface):
discriminator,
loss=None,
optimizer_generator=torch.optim.Adam,
optimizer_generator_kwargs={'lr': 0.001},
optimizer_generator_kwargs={"lr": 0.001},
optimizer_discriminator=torch.optim.Adam,
optimizer_discriminator_kwargs={'lr': 0.001},
optimizer_discriminator_kwargs={"lr": 0.001},
scheduler_generator=ConstantLR,
scheduler_generator_kwargs={
"factor": 1,
"total_iters": 0
},
scheduler_generator_kwargs={"factor": 1, "total_iters": 0},
scheduler_discriminator=ConstantLR,
scheduler_discriminator_kwargs={
"factor": 1,
"total_iters": 0
},
scheduler_discriminator_kwargs={"factor": 1, "total_iters": 0},
gamma=0.3,
lambda_k=0.001,
regularizer=False,
@@ -95,8 +92,10 @@ class GAROM(SolverInterface):
problem=problem,
optimizers=[optimizer_generator, optimizer_discriminator],
optimizers_kwargs=[
optimizer_generator_kwargs, optimizer_discriminator_kwargs
])
optimizer_generator_kwargs,
optimizer_discriminator_kwargs,
],
)
# set automatic optimization for GANs
self.automatic_optimization = False
@@ -118,13 +117,14 @@ class GAROM(SolverInterface):
# assign schedulers
self._schedulers = [
scheduler_generator(
self.optimizers[0], **scheduler_generator_kwargs),
self.optimizers[0], **scheduler_generator_kwargs
),
scheduler_discriminator(
self.optimizers[1],
**scheduler_discriminator_kwargs)
self.optimizers[1], **scheduler_discriminator_kwargs
),
]
# loss and writer
# loss and writer
self._loss = loss
# began hyperparameters
@@ -141,7 +141,7 @@ class GAROM(SolverInterface):
:param x: The input tensor.
:type x: torch.Tensor
:param mc_steps: Number of montecarlo samples to approximate the
:param mc_steps: Number of montecarlo samples to approximate the
expected value, defaults to 20.
:type mc_steps: int
:param variance: Returining also the sample variance of the solution, defaults to False.
@@ -189,8 +189,12 @@ class GAROM(SolverInterface):
# generator loss
r_loss = self._loss(snapshots, generated_snapshots)
d_fake = self.discriminator.forward_map([generated_snapshots, parameters])
g_loss = self._loss(d_fake, generated_snapshots) + self.regularizer * r_loss
d_fake = self.discriminator.forward_map(
[generated_snapshots, parameters]
)
g_loss = (
self._loss(d_fake, generated_snapshots) + self.regularizer * r_loss
)
# backward step
g_loss.backward()
@@ -210,7 +214,9 @@ class GAROM(SolverInterface):
# Discriminator pass
d_real = self.discriminator.forward_map([snapshots, parameters])
d_fake = self.discriminator.forward_map([generated_snapshots, parameters])
d_fake = self.discriminator.forward_map(
[generated_snapshots, parameters]
)
# evaluate loss
d_loss_real = self._loss(d_real, snapshots)
@@ -235,7 +241,7 @@ class GAROM(SolverInterface):
self.k += self.lambda_k * diff.item()
self.k = min(max(self.k, 0), 1) # Constraint to interval [0, 1]
return diff
def training_step(self, batch, batch_idx):
"""GAROM solver training step.
@@ -248,42 +254,75 @@ class GAROM(SolverInterface):
"""
dataloader = self.trainer.train_dataloader
condition_idx = batch['condition']
condition_idx = batch["condition"]
for condition_id in range(condition_idx.min(), condition_idx.max()+1):
for condition_id in range(condition_idx.min(), condition_idx.max() + 1):
if sys.version_info >= (3, 8):
condition_name = dataloader.condition_names[condition_id]
else:
condition_name = dataloader.loaders.condition_names[condition_id]
condition_name = dataloader.loaders.condition_names[
condition_id
]
condition = self.problem.conditions[condition_name]
pts = batch['pts'].detach()
out = batch['output']
pts = batch["pts"].detach()
out = batch["output"]
if condition_name not in self.problem.conditions:
raise RuntimeError('Something wrong happened.')
raise RuntimeError("Something wrong happened.")
# for data driven mode
if not hasattr(condition, 'output_points'):
raise NotImplementedError('GAROM works only in data-driven mode.')
if not hasattr(condition, "output_points"):
raise NotImplementedError(
"GAROM works only in data-driven mode."
)
# get data
snapshots = out[condition_idx == condition_id]
parameters = pts[condition_idx == condition_id]
d_loss_real, d_loss_fake, d_loss = self._train_discriminator(
parameters, snapshots)
parameters, snapshots
)
r_loss, g_loss = self._train_generator(parameters, snapshots)
diff = self._update_weights(d_loss_real, d_loss_fake)
# logging
self.log('mean_loss', float(r_loss), prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log('d_loss', float(d_loss), prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log('g_loss', float(g_loss), prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log('stability_metric', float(d_loss_real + torch.abs(diff)), prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log(
"mean_loss",
float(r_loss),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
self.log(
"d_loss",
float(d_loss),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
self.log(
"g_loss",
float(g_loss),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
self.log(
"stability_metric",
float(d_loss_real + torch.abs(diff)),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
return

View File

@@ -1,9 +1,13 @@
""" Module for PINN """
import torch
try:
from torch.optim.lr_scheduler import LRScheduler # torch >= 2.0
except ImportError:
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler # torch < 2.0
from torch.optim.lr_scheduler import (
_LRScheduler as LRScheduler,
) # torch < 2.0
import sys
from torch.optim.lr_scheduler import ConstantLR
@@ -39,14 +43,11 @@ class PINN(SolverInterface):
extra_features=None,
loss=torch.nn.MSELoss(),
optimizer=torch.optim.Adam,
optimizer_kwargs={'lr': 0.001},
optimizer_kwargs={"lr": 0.001},
scheduler=ConstantLR,
scheduler_kwargs={
"factor": 1,
"total_iters": 0
},
scheduler_kwargs={"factor": 1, "total_iters": 0},
):
'''
"""
:param AbstractProblem problem: The formulation of the problem.
:param torch.nn.Module model: The neural network model to use.
:param torch.nn.Module loss: The loss function used as minimizer,
@@ -59,12 +60,14 @@ class PINN(SolverInterface):
:param torch.optim.LRScheduler scheduler: Learning
rate scheduler.
:param dict scheduler_kwargs: LR scheduler constructor keyword args.
'''
super().__init__(models=[model],
problem=problem,
optimizers=[optimizer],
optimizers_kwargs=[optimizer_kwargs],
extra_features=extra_features)
"""
super().__init__(
models=[model],
problem=problem,
optimizers=[optimizer],
optimizers_kwargs=[optimizer_kwargs],
extra_features=extra_features,
)
# check consistency
check_consistency(scheduler, LRScheduler, subclass=True)
@@ -105,15 +108,21 @@ class PINN(SolverInterface):
# to the parameters that the optimizer needs to optimize
if isinstance(self.problem, InverseProblem):
self.optimizers[0].add_param_group(
{'params': [self._params[var] for var in self.problem.unknown_variables]}
)
{
"params": [
self._params[var]
for var in self.problem.unknown_variables
]
}
)
return self.optimizers, [self.scheduler]
def _clamp_inverse_problem_params(self):
for v in self._params:
self._params[v].data.clamp_(
self.problem.unknown_parameter_domain.range_[v][0],
self.problem.unknown_parameter_domain.range_[v][1])
self.problem.unknown_parameter_domain.range_[v][0],
self.problem.unknown_parameter_domain.range_[v][1],
)
def _loss_data(self, input, output):
return self.loss(self.forward(input), output)
@@ -121,9 +130,15 @@ class PINN(SolverInterface):
def _loss_phys(self, samples, equation):
try:
residual = equation.residual(samples, self.forward(samples))
except TypeError: # this occurs when the function has three inputs, i.e. inverse problem
residual = equation.residual(samples, self.forward(samples), self._params)
return self.loss(torch.zeros_like(residual, requires_grad=True), residual)
except (
TypeError
): # this occurs when the function has three inputs, i.e. inverse problem
residual = equation.residual(
samples, self.forward(samples), self._params
)
return self.loss(
torch.zeros_like(residual, requires_grad=True), residual
)
def training_step(self, batch, batch_idx):
"""
@@ -140,23 +155,25 @@ class PINN(SolverInterface):
dataloader = self.trainer.train_dataloader
condition_losses = []
condition_idx = batch['condition']
condition_idx = batch["condition"]
for condition_id in range(condition_idx.min(), condition_idx.max()+1):
for condition_id in range(condition_idx.min(), condition_idx.max() + 1):
if sys.version_info >= (3, 8):
condition_name = dataloader.condition_names[condition_id]
else:
condition_name = dataloader.loaders.condition_names[condition_id]
condition_name = dataloader.loaders.condition_names[
condition_id
]
condition = self.problem.conditions[condition_name]
pts = batch['pts']
pts = batch["pts"]
if len(batch) == 2:
samples = pts[condition_idx == condition_id]
loss = self._loss_phys(samples, condition.equation)
elif len(batch) == 3:
samples = pts[condition_idx == condition_id]
ground_truth = batch['output'][condition_idx == condition_id]
ground_truth = batch["output"][condition_idx == condition_id]
loss = self._loss_data(samples, ground_truth)
else:
raise ValueError("Batch size not supported")
@@ -164,10 +181,16 @@ class PINN(SolverInterface):
# TODO for users this us hard to remember when creating a new solver, to fix in a smarter way
loss = loss.as_subclass(torch.Tensor)
# # add condition losses and accumulate logging for each epoch
# # add condition losses and accumulate logging for each epoch
condition_losses.append(loss * condition.data_weight)
self.log(condition_name + '_loss', float(loss),
prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log(
condition_name + "_loss",
float(loss),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
# clamp unknown parameters of the InverseProblem to their domain ranges (if needed)
if isinstance(self.problem, InverseProblem):
@@ -176,8 +199,14 @@ class PINN(SolverInterface):
# TODO Fix the bug, tot_loss is a label tensor without labels
# we need to pass it as a torch tensor to make everything work
total_loss = sum(condition_losses)
self.log('mean_loss', float(total_loss / len(condition_losses)),
prog_bar=True, logger=True, on_epoch=True, on_step=False)
self.log(
"mean_loss",
float(total_loss / len(condition_losses)),
prog_bar=True,
logger=True,
on_epoch=True,
on_step=False,
)
return total_loss

View File

@@ -15,12 +15,14 @@ class SolverInterface(pytorch_lightning.LightningModule, metaclass=ABCMeta):
LightningModule methods.
"""
def __init__(self,
models,
problem,
optimizers,
optimizers_kwargs,
extra_features=None):
def __init__(
self,
models,
problem,
optimizers,
optimizers_kwargs,
extra_features=None,
):
"""
:param models: A torch neural network model instance.
:type models: torch.nn.Module
@@ -30,7 +32,7 @@ class SolverInterface(pytorch_lightning.LightningModule, metaclass=ABCMeta):
use.
:param list(dict) optimizer_kwargs: A list of optimizer constructor keyword args.
:param list(torch.nn.Module) extra_features: The additional input
features to use as augmented input. If ``None`` no extra features
features to use as augmented input. If ``None`` no extra features
are passed. If it is a list of :class:`torch.nn.Module`, the extra feature
list is passed to all models. If it is a list of extra features' lists,
each single list of extra feature is passed to a model.
@@ -57,19 +59,23 @@ class SolverInterface(pytorch_lightning.LightningModule, metaclass=ABCMeta):
# check length consistency optimizers
if len_model != len_optimizer:
raise ValueError('You must define one optimizer for each model.'
f'Got {len_model} models, and {len_optimizer}'
' optimizers.')
raise ValueError(
"You must define one optimizer for each model."
f"Got {len_model} models, and {len_optimizer}"
" optimizers."
)
# check length consistency optimizers kwargs
if len_optimizer_kwargs != len_optimizer:
raise ValueError('You must define one dictionary of keyword'
' arguments for each optimizers.'
f'Got {len_optimizer} optimizers, and'
f' {len_optimizer_kwargs} dicitionaries')
raise ValueError(
"You must define one dictionary of keyword"
" arguments for each optimizers."
f"Got {len_optimizer} optimizers, and"
f" {len_optimizer_kwargs} dicitionaries"
)
# extra features handling
if (extra_features is None) or (len(extra_features)==0):
if (extra_features is None) or (len(extra_features) == 0):
extra_features = [None] * len_model
else:
# if we only have a list of extra features
@@ -78,24 +84,28 @@ class SolverInterface(pytorch_lightning.LightningModule, metaclass=ABCMeta):
else: # if we have a list of list extra features
if len(extra_features) != len_model:
raise ValueError(
'You passed a list of extrafeatures list with len'
f'different of models len. Expected {len_model} '
f'got {len(extra_features)}. If you want to use '
'the same list of extra features for all models, '
'just pass a list of extrafeatures and not a list '
'of list of extra features.')
"You passed a list of extrafeatures list with len"
f"different of models len. Expected {len_model} "
f"got {len(extra_features)}. If you want to use "
"the same list of extra features for all models, "
"just pass a list of extrafeatures and not a list "
"of list of extra features."
)
# assigning model and optimizers
self._pina_models = []
self._pina_optimizers = []
for idx in range(len_model):
model_ = Network(model=models[idx],
input_variables=problem.input_variables,
output_variables=problem.output_variables,
extra_features=extra_features[idx])
optim_ = optimizers[idx](model_.parameters(),
**optimizers_kwargs[idx])
model_ = Network(
model=models[idx],
input_variables=problem.input_variables,
output_variables=problem.output_variables,
extra_features=extra_features[idx],
)
optim_ = optimizers[idx](
model_.parameters(), **optimizers_kwargs[idx]
)
self._pina_models.append(model_)
self._pina_optimizers.append(optim_)

View File

@@ -1,10 +1,14 @@
""" Module for SupervisedSolver """
import torch
import sys
try:
from torch.optim.lr_scheduler import LRScheduler # torch >= 2.0
except ImportError:
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler # torch < 2.0
from torch.optim.lr_scheduler import (
_LRScheduler as LRScheduler,
) # torch < 2.0
from torch.optim.lr_scheduler import ConstantLR
@@ -18,7 +22,7 @@ from torch.nn.modules.loss import _Loss
class SupervisedSolver(SolverInterface):
"""
SupervisedSolver solver class. This class implements a SupervisedSolver,
using a user specified ``model`` to solve a specific ``problem``.
using a user specified ``model`` to solve a specific ``problem``.
"""
def __init__(
@@ -28,14 +32,11 @@ class SupervisedSolver(SolverInterface):
extra_features=None,
loss=torch.nn.MSELoss(),
optimizer=torch.optim.Adam,
optimizer_kwargs={'lr': 0.001},
optimizer_kwargs={"lr": 0.001},
scheduler=ConstantLR,
scheduler_kwargs={
"factor": 1,
"total_iters": 0
},
scheduler_kwargs={"factor": 1, "total_iters": 0},
):
'''
"""
:param AbstractProblem problem: The formualation of the problem.
:param torch.nn.Module model: The neural network model to use.
:param torch.nn.Module loss: The loss function used as minimizer,
@@ -49,12 +50,14 @@ class SupervisedSolver(SolverInterface):
:param torch.optim.LRScheduler scheduler: Learning
rate scheduler.
:param dict scheduler_kwargs: LR scheduler constructor keyword args.
'''
super().__init__(models=[model],
problem=problem,
optimizers=[optimizer],
optimizers_kwargs=[optimizer_kwargs],
extra_features=extra_features)
"""
super().__init__(
models=[model],
problem=problem,
optimizers=[optimizer],
optimizers_kwargs=[optimizer_kwargs],
extra_features=extra_features,
)
# check consistency
check_consistency(scheduler, LRScheduler, subclass=True)
@@ -69,7 +72,7 @@ class SupervisedSolver(SolverInterface):
def forward(self, x):
"""Forward pass implementation for the solver.
:param torch.Tensor x: Input tensor.
:param torch.Tensor x: Input tensor.
:return: Solver solution.
:rtype: torch.Tensor
"""
@@ -95,32 +98,39 @@ class SupervisedSolver(SolverInterface):
"""
dataloader = self.trainer.train_dataloader
condition_idx = batch['condition']
condition_idx = batch["condition"]
for condition_id in range(condition_idx.min(), condition_idx.max()+1):
for condition_id in range(condition_idx.min(), condition_idx.max() + 1):
if sys.version_info >= (3, 8):
condition_name = dataloader.condition_names[condition_id]
else:
condition_name = dataloader.loaders.condition_names[condition_id]
condition_name = dataloader.loaders.condition_names[
condition_id
]
condition = self.problem.conditions[condition_name]
pts = batch['pts']
out = batch['output']
pts = batch["pts"]
out = batch["output"]
if condition_name not in self.problem.conditions:
raise RuntimeError('Something wrong happened.')
raise RuntimeError("Something wrong happened.")
# for data driven mode
if not hasattr(condition, 'output_points'):
raise NotImplementedError('Supervised solver works only in data-driven mode.')
if not hasattr(condition, "output_points"):
raise NotImplementedError(
"Supervised solver works only in data-driven mode."
)
output_pts = out[condition_idx == condition_id]
input_pts = pts[condition_idx == condition_id]
loss = self.loss(self.forward(input_pts), output_pts) * condition.data_weight
loss = (
self.loss(self.forward(input_pts), output_pts)
* condition.data_weight
)
loss = loss.as_subclass(torch.Tensor)
self.log('mean_loss', float(loss), prog_bar=True, logger=True)
self.log("mean_loss", float(loss), prog_bar=True, logger=True)
return loss
@property