version 0.0.1

This commit is contained in:
Your Name
2022-02-11 16:44:37 +01:00
parent fa8ffd5042
commit 1483746b45
29 changed files with 416 additions and 559 deletions

View File

@@ -1,5 +1,15 @@
from .pinn import PINN
from .deep_feed_forward import DeepFeedForward
__all__ = [
'PINN',
'ParametricPINN',
'LabelTensor',
'Plotter',
'Condition',
'Span'
]
from .label_tensor import LabelTensor
from .pinn import PINN
#from .ppinn import ParametricPINN
from .plotter import Plotter
from .span import Span
from .condition import Condition

32
pina/condition.py Normal file
View File

@@ -0,0 +1,32 @@
""" """
import torch
from .location import Location
class Condition:
def __init__(self, *args, **kwargs):
if len(args) == 2 and not kwargs:
if (isinstance(args[0], torch.Tensor) and
isinstance(args[1], torch.Tensor)):
self.input_points = args[0]
self.output_points = args[1]
elif isinstance(args[0], Location) and callable(args[1]):
self.location = args[0]
self.function = args[1]
else:
raise ValueError
elif not args and len(kwargs) == 2:
if 'input_points' in kwargs and 'output_points' in kwargs:
self.input_points = kwargs['input_points']
self.output_points = kwargs['output_points']
elif 'location' in kwargs and 'function' in kwargs:
self.location = kwargs['location']
self.function = kwargs['function']
else:
raise ValueError
else:
raise ValueError

View File

@@ -1,44 +0,0 @@
import numpy as np
from .chebyshev import chebyshev_roots
class Cube():
def __init__(self, bound):
self.bound = np.asarray(bound)
def discretize(self, n, mode='random'):
if mode == 'random':
pts = np.random.uniform(size=(n, self.bound.shape[0]))
elif mode == 'chebyshev':
pts = np.array([
chebyshev_roots(n) * .5 + .5
for _ in range(self.bound.shape[0])])
grids = np.meshgrid(*pts)
pts = np.hstack([grid.reshape(-1, 1) for grid in grids])
elif mode == 'grid':
pts = np.array([
np.linspace(0, 1, n)
for _ in range(self.bound.shape[0])])
grids = np.meshgrid(*pts)
pts = np.hstack([grid.reshape(-1, 1) for grid in grids])
elif mode == 'lh' or mode == 'latin':
from scipy.stats import qmc
sampler = qmc.LatinHypercube(d=self.bound.shape[0])
pts = sampler.random(n)
# Scale pts
pts *= self.bound[:, 1] - self.bound[:, 0]
pts += self.bound[:, 0]
return pts
def meshgrid(self, n):
pts = np.array([
np.linspace(0, 1, n)
for _ in range(self.bound.shape[0])])
pts *= self.bound[:, 1] - self.bound[:, 0]
pts += self.bound[:, 0]
return np.meshgrid(*pts)

View File

@@ -1,8 +1,8 @@
import torch
class LabelTensor():
class LabelTensor():
def __init__(self, x, labels):
def __init__(self, x, labels):
if len(labels) != x.shape[1]:
@@ -21,7 +21,19 @@ class LabelTensor():
return self.tensor
def __str__(self):
return self.tensor, self.labels
return '{}\n {}\n'.format(self.labels, self.tensor)
@property
def shape(self):
return self.tensor.shape
@property
def dtype(self):
return self.tensor.dtype
@property
def device(self):
return self.tensor.device
@property
def labels(self):

9
pina/location.py Normal file
View File

@@ -0,0 +1,9 @@
from abc import ABCMeta, abstractmethod
class Location(metaclass=ABCMeta):
@property
@abstractmethod
def sample(self):
pass

View File

@@ -14,7 +14,7 @@ __title__ = "pina"
__author__ = "Nicola Demo, Maria Strazzullo"
__copyright__ = "Copyright 2021-2021, PINA contributors"
__license__ = "MIT"
__version__ = "0.0.0"
__mail__ = 'demo.nicola@gmail.com, ' # TODO
__version__ = "0.0.1"
__mail__ = 'demo.nicola@gmail.com, ' # TODO
__maintainer__ = __author__
__status__ = "Alpha"

7
pina/model/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
__all__ = [
'FeedForward',
'MultiFeedForward'
]
from .feed_forward import FeedForward
from .multi_feed_forward import MultiFeedForward

View File

@@ -1,34 +1,21 @@
import torch
import torch.nn as nn
import numpy as np
from .cube import Cube
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # which is 3.1415927410125732
from torch.nn import Tanh, ReLU
#import torch.nn.utils.prune as prune
from pina.adaptive_functions import AdaptiveLinear
from pina.label_tensor import LabelTensor
class DeepFeedForward(torch.nn.Module):
def __init__(self,
inner_size=20,
n_layers=2,
func=nn.Tanh,
input_variables=None,
output_variables=None,
layers=None,
extra_features=None):
class FeedForward(torch.nn.Module):
def __init__(self, input_variables, output_variables, inner_size=20,
n_layers=2, func=nn.Tanh, layers=None, extra_features=None):
'''
'''
super(DeepFeedForward, self).__init__()
super().__init__()
if extra_features is None:
extra_features = []
self.extra_features = nn.Sequential(*extra_features)
if input_variables is None: input_variables = ['x']
if output_variables is None: input_variables = ['y']
self.input_variables = input_variables
self.input_dimension = len(input_variables)
@@ -37,45 +24,43 @@ class DeepFeedForward(torch.nn.Module):
n_features = len(extra_features)
if layers is None: layers = [inner_size] * n_layers
if layers is None:
layers = [inner_size] * n_layers
tmp_layers = layers.copy()
tmp_layers.insert(0, self.input_dimension+n_features)#-1)
tmp_layers.insert(0, self.input_dimension+n_features)
tmp_layers.append(self.output_dimension)
self.layers = []
for i in range(len(tmp_layers)-1):
self.layers.append(nn.Linear(tmp_layers[i], tmp_layers[i+1]))
if isinstance(func, list):
self.functions = func
else:
self.functions = [func for _ in range(len(self.layers)-1)]
unique_list = []
for layer, func in zip(self.layers[:-1], self.functions):
unique_list.append(layer)
if func is not None: unique_list.append(func())
if func is not None:
unique_list.append(func())
unique_list.append(self.layers[-1])
self.model = nn.Sequential(*unique_list)
def forward(self, x):
"""
"""
nf = len(self.extra_features)
if nf == 0:
return LabelTensor(self.model(x), self.output_variables)
return LabelTensor(self.model(x.tensor), self.output_variables)
# if self.extra_features
#input_ = torch.zeros(x.shape[0], nf+self.input_dimension, dtype=x.dtype, device=x.device)
input_ = torch.zeros(x.shape[0], nf+x.shape[1], dtype=x.dtype, device=x.device)
input_[:, :x.shape[1]] = x
for i, feature in enumerate(self.extra_features, start=self.input_dimension):
input_ = torch.zeros(x.shape[0], nf+x.shape[1], dtype=x.dtype,
device=x.device)
input_[:, :x.shape[1]] = x.tensor
for i, feature in enumerate(self.extra_features,
start=self.input_dimension):
input_[:, i] = feature(x)
return LabelTensor(self.model(input_), self.output_variables)

View File

@@ -0,0 +1,17 @@
import torch
from .feed_forward import FeedForward
class MultiFeedForward(torch.nn.Module):
def __init__(self, dff_dict):
'''
'''
super().__init__()
if not isinstance(dff_dict, dict):
raise TypeError
for name, constructor_args in dff_dict.items():
setattr(self, name, FeedForward(**constructor_args))

View File

@@ -1,27 +0,0 @@
from .problem import Problem
import torch
import torch.nn as nn
import numpy as np
from .cube import Cube
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # which is 3.1415927410125732
from torch.nn import Tanh, ReLU
import torch.nn.utils.prune as prune
from pina.adaptive_functions import AdaptiveLinear
from pina.deep_feed_forward import DeepFeedForward
class MultiDeepFeedForward(torch.nn.Module):
def __init__(self, dff_dict):
'''
'''
super().__init__()
if not isinstance(dff_dict, dict):
raise TypeError
for name, constructor_args in dff_dict.items():
setattr(self, name, DeepFeedForward(**constructor_args))

View File

@@ -1,9 +0,0 @@
from .problem2d import Problem2D
import numpy as np
class ParametricProblem2D(Problem2D):
def __init__(self, variables=None, bc=None, params_bound=None, domain_bound=None):
Problem2D.__init__(self, variables=variables, bc=bc, domain_bound=domain_bound)
self.params_domain = params_bound

View File

@@ -163,17 +163,16 @@ class PINN(object):
if locations == 'all':
locations = [condition for condition in self.problem.conditions]
for location in locations:
manifold, func = self.problem.conditions[location].values()
if torch.is_tensor(manifold):
pts = manifold
else:
pts = manifold.discretize(n, mode)
condition = self.problem.conditions[location]
pts = torch.from_numpy(pts)
try:
pts = condition.location.sample(n, mode)
except:
pts = condition.input_points
self.input_pts[location] = LabelTensor(pts, self.problem.input_variables)
self.input_pts[location] = pts
print(pts.tensor.shape)
self.input_pts[location].tensor.to(dtype=self.dtype, device=self.device)
self.input_pts[location].tensor.requires_grad_(True)
self.input_pts[location].tensor.retain_grad()
@@ -203,17 +202,15 @@ class PINN(object):
while True:
losses = []
for condition_name in self.problem.conditions:
condition = self.problem.conditions[condition_name]
pts = self.input_pts[condition_name]
predicted = self.model(pts.tensor)
if isinstance(self.problem.conditions[condition_name]['func'], list):
for func in self.problem.conditions[condition_name]['func']:
residuals = func(pts, predicted)
losses.append(self._compute_norm(residuals))
else:
residuals = self.problem.conditions[condition_name]['func'](pts, predicted)
losses.append(self._compute_norm(residuals))
#print(condition_name, losses[-1])
predicted = self.model(pts)
residuals = condition.function(pts, predicted)
losses.append(self._compute_norm(residuals))
self.optimizer.zero_grad()
sum(losses).backward()

View File

@@ -7,7 +7,7 @@ import torch
from pina import LabelTensor
from pina import PINN
from .problem import Problem2D, Problem1D, TimeDependentProblem
from .problem import SpatialProblem, TimeDependentProblem
#from pina.tdproblem1d import TimeDepProblem1D
@@ -79,13 +79,31 @@ class Plotter:
def plot(self, obj, filename=None):
def plot(self, obj, method='contourf', filename=None):
"""
"""
if isinstance(obj.problem, (TimeDependentProblem, Problem1D)): # time-dep 1D
self._plot_1D_TimeDep(obj, method='pcolor')
elif isinstance(obj.problem, Problem2D): # 2D
self._plot_2D(obj, method='pcolor')
res = 256
pts = obj.problem.domain.sample(res, 'grid')
grids_container = [
pts[:, 0].reshape(res, res),
pts[:, 1].reshape(res, res),
]
predicted_output = obj.model(pts)
if hasattr(obj.problem, 'truth_solution'):
truth_output = obj.problem.truth_solution(*pts.tensor.T).float()
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(axes[0], method)(*grids_container, predicted_output.tensor.reshape(res, res).detach())
fig.colorbar(cb, ax=axes[0])
cb = getattr(axes[1], method)(*grids_container, truth_output.reshape(res, res).detach())
fig.colorbar(cb, ax=axes[1])
cb = getattr(axes[2], method)(*grids_container, (truth_output-predicted_output.tensor.float().flatten()).detach().reshape(res, res))
fig.colorbar(cb, ax=axes[2])
else:
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(8, 6))
cb = getattr(axes, method)(*grids_container, predicted_output.tensor.reshape(res, res).detach())
fig.colorbar(cb, ax=axes)
if filename:
plt.savefig(filename)

View File

@@ -1,28 +1,16 @@
from mpmath import chebyt, chop, taylor
import torch
import numpy as np
from .problem import AbstractProblem
import torch
import torch.nn as nn
import numpy as np
from .cube import Cube
from .deep_feed_forward import DeepFeedForward
from pina.label_tensor import LabelTensor
from pina.pinn import PINN
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # which is 3.1415927410125732
from . import PINN, LabelTensor
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # 3.1415927410125732
class ParametricPINN(PINN):
def __init__(self,
problem,
model,
optimizer=torch.optim.Adam,
lr=0.001,
regularizer=0.00001,
data_weight=1.,
dtype=torch.float64,
device='cpu',
lr_accelerate=None,
error_norm='mse'):
def __init__(self, problem, model, optimizer=torch.optim.Adam, lr=0.001,
regularizer=0.00001, data_weight=1., dtype=torch.float64,
device='cpu', lr_accelerate=None, error_norm='mse'):
'''
:param Problem problem: the formualation of the problem.
:param dict architecture: a dictionary containing the information to
@@ -40,7 +28,7 @@ class ParametricPINN(PINN):
default is `torch.float64`.
:param float lr_accelete: the coefficient that controls the learning
rate increase, such that, for all the epoches in which the loss is
decreasing, the learning_rate is update using
decreasing, the learning_rate is update using
$learning_rate = learning_rate * lr_accelerate$.
When the loss stops to decrease, the learning rate is set to the
initial value [TODO test parameters]
@@ -49,7 +37,7 @@ class ParametricPINN(PINN):
self.problem = problem
# self._architecture = architecture if architecture else dict()
# self._architecture['input_dimension'] = self.problem.domain_bound.shape[0]
# self._architecture['output_dimension'] = len(self.problem.variables)
@@ -77,7 +65,7 @@ class ParametricPINN(PINN):
self.trained_epoch = 0
self.optimizer = optimizer(
self.model.parameters(), lr=lr, weight_decay=regularizer)
self.data_weight = data_weight
@property
@@ -86,7 +74,7 @@ class ParametricPINN(PINN):
@problem.setter
def problem(self, problem):
if not isinstance(problem, Problem):
if not isinstance(problem, AbstractProblem):
raise TypeError
self._problem = problem
@@ -103,124 +91,31 @@ class ParametricPINN(PINN):
def get_phys_residuals(self):
"""
"""
residuals = []
for equation in self.problem.equation:
residuals.append(equation(self.phys_pts, self.model(self.phys_pts)))
return residuals
def _compute_norm(self, vec):
"""
Compute the norm of the `vec` one-dimensional tensor based on the
`self.error_norm` attribute.
.. todo: complete
:param vec torch.tensor: the tensor
"""
if isinstance(self.error_norm, int):
return torch.sum(torch.abs(vec**self.error_norm))**(1./self.error_norm)
elif self.error_norm == 'mse':
return torch.mean(vec**2)
elif self.error_norm == 'me':
return torch.mean(torch.abs(vec))
else:
raise RuntimeError
def save_state(self, filename):
checkpoint = {
'epoch': self.trained_epoch,
'model_state': self.model.state_dict(),
'optimizer_state' : self.optimizer.state_dict(),
'optimizer_class' : self.optimizer.__class__,
}
# TODO save also architecture param?
#if isinstance(self.model, DeepFeedForward):
# checkpoint['model_class'] = self.model.__class__
# checkpoint['model_structure'] = {
# }
torch.save(checkpoint, filename)
def load_state(self, filename):
checkpoint = torch.load(filename)
self.model.load_state_dict(checkpoint['model_state'])
self.optimizer = checkpoint['optimizer_class'](self.model.parameters())
self.optimizer.load_state_dict(checkpoint['optimizer_state'])
self.trained_epoch = checkpoint['epoch']
return self
def span_pts(self, n, mode='grid', locations='all'):
'''
'''
if locations == 'all':
locations = [condition for condition in self.problem.conditions]
for location in locations:
manifold, func = self.problem.conditions[location].values()
if torch.is_tensor(manifold):
pts = manifold
else:
pts = manifold.discretize(n, mode)
pts = torch.from_numpy(pts)
self.input_pts[location] = LabelTensor(pts, self.problem.input_variables)
self.input_pts[location].tensor.to(dtype=self.dtype, device=self.device)
self.input_pts[location].tensor.requires_grad_(True)
self.input_pts[location].tensor.retain_grad()
def train(self, stop=100, frequency_print=2, trial=None):
epoch = 0
## TODO for elliptic
# parameters = torch.cat(torch.linspace(
# self.problem.parameter_domain[0, 0],
# self.problem.parameter_domain[0, 1],
# 5)
## for param laplacian
#parameters = torch.rand(50, 2)*2-1
parameters = torch.from_numpy(Cube([[-1, 1], [-1, 1]]).discretize(5, 'grid'))
# alpha_p = torch.logspace(start=-2, end=0, steps=10)
# mu_p = torch.linspace(0.5, 3, 5)
# g1_, g2_ = torch.meshgrid(alpha_p, mu_p)
# parameters = torch.cat([g2_.reshape(-1, 1), g1_.reshape(-1, 1)], axis=1)
print(parameters)
while True:
losses = []
for condition_name in self.problem.conditions:
condition = self.problem.conditions[condition_name]
pts = self.input_pts[condition_name]
pts = torch.cat([
pts.tensor.repeat_interleave(parameters.shape[0], dim=0),
torch.tile(parameters, (pts.tensor.shape[0], 1))
], axis=1)
pts = LabelTensor(pts, self.problem.input_variables + self.problem.parameters)
predicted = self.model(pts.tensor)
#predicted = self.model(pts)
if isinstance(self.problem.conditions[condition_name]['func'], list):
for func in self.problem.conditions[condition_name]['func']:
residuals = func(pts, None, predicted)
losses.append(self._compute_norm(residuals))
else:
residuals = self.problem.conditions[condition_name]['func'](pts, None, predicted)
losses.append(self._compute_norm(residuals))
residuals = condition.function(pts, predicted)
losses.append(self._compute_norm(residuals))
self.optimizer.zero_grad()
sum(losses).backward()
self.optimizer.step()
@@ -268,15 +163,15 @@ class ParametricPINN(PINN):
if trial:
import optuna
rial.report(loss[0].item()+loss[1].item(), epoch)
trial.report(loss[0].item()+loss[1].item(), epoch)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
if isinstance(stop, int):
if epoch == stop:
if epoch == stop:
break
elif isinstance(stop, float):
if loss[0].item() + loss[1].item() < stop:
if loss[0].item() + loss[1].item() < stop:
break
if epoch % frequency_print == 0:
@@ -289,7 +184,7 @@ class ParametricPINN(PINN):
def error(self, dtype='l2', res=100):
import numpy as np
if hasattr(self.problem, 'truth_solution') and self.problem.truth_solution is not None:
pts_container = []
@@ -305,8 +200,8 @@ class ParametricPINN(PINN):
unrolled_pts = torch.tensor([t.flatten() for t in grids_container]).T.to(dtype=self.dtype, device=self.device)
Z_pred = self.model(unrolled_pts)
Z_pred = Z_pred.detach().numpy().reshape(grids_container[0].shape)
if dtype == 'l2':
if dtype == 'l2':
return np.linalg.norm(Z_pred - Z_true)/np.linalg.norm(Z_true)
else:
# TODO H1
@@ -339,7 +234,7 @@ class ParametricPINN(PINN):
for i, output in enumerate(Z_pred.tensor.T, start=1):
output = output.detach().numpy().reshape(grids_container[0].shape)
plt.subplot(1, n, i)
plt.subplot(1, n, i)
plt.contourf(*grids_container, output)
plt.colorbar()
@@ -354,14 +249,14 @@ class ParametricPINN(PINN):
import matplotlib
matplotlib.use('GTK3Agg')
import matplotlib.pyplot as plt
if hasattr(self.problem, 'truth_solution') and self.problem.truth_solution is not None:
n_plot = 2
elif hasattr(self.problem, 'data_solution') and self.problem.data_solution is not None:
n_plot = 2
else:
n_plot = 1
fig, axs = plt.subplots(nrows=1, ncols=n_plot, figsize=(n_plot*6,4))
if not isinstance(axs, np.ndarray): axs = [axs]
@@ -369,14 +264,14 @@ class ParametricPINN(PINN):
grids_container = self.problem.data_solution['grid']
Z_true = self.problem.data_solution['grid_solution']
elif hasattr(self.problem, 'truth_solution') and self.problem.truth_solution is not None:
pts_container = []
for mn, mx in self.problem.domain_bound:
pts_container.append(np.linspace(mn, mx, res))
grids_container = np.meshgrid(*pts_container)
Z_true = self.problem.truth_solution(*grids_container)
pts_container = []
for mn, mx in self.problem.domain_bound:
pts_container.append(np.linspace(mn, mx, res))
@@ -396,38 +291,38 @@ class ParametricPINN(PINN):
set_pred = axs[0].contourf(*grids_container, Z_pred)
axs[0].set_title('PINN [trained epoch = {}]'.format(self.trained_epoch) + " " + variable) #TODO add info about parameter in the title
fig.colorbar(set_pred, ax=axs[0])
if n_plot == 2:
set_true = axs[1].contourf(*grids_container, Z_true)
axs[1].set_title('Truth solution')
fig.colorbar(set_true, ax=axs[1])
if filename is None:
plt.show()
else:
fig.savefig(filename + " " + variable)
def plot_error(self, res, filename=None):
import matplotlib
matplotlib.use('GTK3Agg')
import matplotlib.pyplot as plt
fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(6,4))
if not isinstance(axs, np.ndarray): axs = [axs]
if hasattr(self.problem, 'data_solution') and self.problem.data_solution is not None:
grids_container = self.problem.data_solution['grid']
Z_true = self.problem.data_solution['grid_solution']
elif hasattr(self.problem, 'truth_solution') and self.problem.truth_solution is not None:
elif hasattr(self.problem, 'truth_solution') and self.problem.truth_solution is not None:
pts_container = []
for mn, mx in self.problem.domain_bound:
pts_container.append(np.linspace(mn, mx, res))
grids_container = np.meshgrid(*pts_container)
Z_true = self.problem.truth_solution(*grids_container)
Z_true = self.problem.truth_solution(*grids_container)
try:
unrolled_pts = torch.tensor([t.flatten() for t in grids_container]).T.to(dtype=self.type)
@@ -449,7 +344,7 @@ class ParametricPINN(PINN):
'''
print(self.pred_loss.item(),loss.item(), self.old_loss.item())
if self.accelerate is not None:
if self.pred_loss > loss and loss >= self.old_loss:
if self.pred_loss > loss and loss >= self.old_loss:
self.current_lr = self.original_lr
#print('restart')
elif (loss-self.pred_loss).item() < 0.1:
@@ -459,7 +354,7 @@ if self.accelerate is not None:
self.current_lr -= .5*self.current_lr
#print(self.current_lr)
#self.current_lr = min(loss.item()*3, 0.02)
for g in self.optimizer.param_groups:
g['lr'] = self.current_lr
g['lr'] = self.current_lr
'''

View File

@@ -1,4 +1,11 @@
__all__ = [
'AbstractProblem',
'SpatialProblem',
'TimeDependentProblem',
'ParametricProblem',
]
from .abstract_problem import AbstractProblem
from .problem2d import Problem2D
from .problem1d import Problem1D
from .spatial_problem import SpatialProblem
from .timedep_problem import TimeDependentProblem
from .parametric_problem import ParametricProblem

View File

@@ -4,9 +4,23 @@ from abc import ABCMeta, abstractmethod
class AbstractProblem(metaclass=ABCMeta):
@property
@abstractmethod
def input_variables(self):
pass
variables = []
if hasattr(self, 'spatial_variables'):
variables += self.spatial_variables
if hasattr(self, 'temporal_variable'):
variables += self.temporal_variable
if hasattr(self, 'parameters'):
variables += self.parameters
if hasattr(self, 'custom_variables'):
variables += self.custom_variables
return variables
@input_variables.setter
def input_variables(self, variables):
raise NotImplementedError
@property
@abstractmethod

View File

@@ -0,0 +1,11 @@
from abc import abstractmethod
from .abstract_problem import AbstractProblem
class ParametricProblem(AbstractProblem):
@property
@abstractmethod
def parameters(self):
pass

View File

@@ -1,6 +0,0 @@
from .abstract_problem import AbstractProblem
class Problem1D(AbstractProblem):
spatial_dimensions = 1

View File

@@ -1,6 +0,0 @@
from .abstract_problem import AbstractProblem
class Problem2D(AbstractProblem):
spatial_dimensions = 2

View File

@@ -0,0 +1,10 @@
from abc import abstractmethod
from .abstract_problem import AbstractProblem
class SpatialProblem(AbstractProblem):
@abstractmethod
def spatial_variables(self):
pass

View File

@@ -1,6 +1,11 @@
from abc import abstractmethod
from .abstract_problem import AbstractProblem
class TimeDependentProblem(AbstractProblem):
pass
@property
@abstractmethod
def temporal_variable(self):
pass

View File

@@ -1,27 +0,0 @@
import torch
import numpy as np
from .chebyshev import chebyshev_roots
class Segment():
def __init__(self, p1, p2):
self.p1 = p1
self.p2 = p2
def discretize(self, n, mode='random'):
pts = []
if mode == 'random':
iterator = np.random.uniform(0, 1, n)
elif mode == 'grid':
iterator = np.linspace(0, 1, n)
elif mode == 'chebyshev':
iterator = chebyshev_roots(n) * .5 + .5
for k in iterator:
x = self.p1[0] + k*(self.p2[0]-self.p1[0])
y = self.p1[1] + k*(self.p2[1]-self.p1[1])
pts.append((x, y))
pts = np.array(pts)
return pts

68
pina/span.py Normal file
View File

@@ -0,0 +1,68 @@
import numpy as np
from .chebyshev import chebyshev_roots
import torch
from .location import Location
from .label_tensor import LabelTensor
class Span(Location):
def __init__(self, span_dict):
self.fixed_ = {}
self.range_ = {}
for k, v in span_dict.items():
if isinstance(v, (int, float)):
self.fixed_[k] = v
elif isinstance(v, (list, tuple)) and len(v) == 2:
self.range_[k] = v
else:
raise TypeError
def sample(self, n, mode='random'):
bounds = np.array(list(self.range_.values()))
if mode == 'random':
pts = np.random.uniform(size=(n, bounds.shape[0]))
elif mode == 'chebyshev':
pts = np.array([
chebyshev_roots(n) * .5 + .5
for _ in range(bounds.shape[0])])
grids = np.meshgrid(*pts)
pts = np.hstack([grid.reshape(-1, 1) for grid in grids])
elif mode == 'grid':
pts = np.array([
np.linspace(0, 1, n)
for _ in range(bounds.shape[0])])
grids = np.meshgrid(*pts)
pts = np.hstack([grid.reshape(-1, 1) for grid in grids])
elif mode == 'lh' or mode == 'latin':
from scipy.stats import qmc
sampler = qmc.LatinHypercube(d=bounds.shape[0])
pts = sampler.random(n)
# Scale pts
pts *= bounds[:, 1] - bounds[:, 0]
pts += bounds[:, 0]
pts = torch.from_numpy(pts)
pts_range_ = LabelTensor(pts, list(self.range_.keys()))
fixed = torch.Tensor(list(self.fixed_.values()))
pts_fixed_ = torch.ones(pts_range_.tensor.shape[0], len(self.fixed_)) * fixed
pts_fixed_ = LabelTensor(pts_fixed_, list(self.fixed_.keys()))
if self.fixed_:
return LabelTensor.hstack([pts_range_, pts_fixed_])
else:
return pts_range_
def meshgrid(self, n):
pts = np.array([
np.linspace(0, 1, n)
for _ in range(self.bound.shape[0])])
pts *= self.bound[:, 1] - self.bound[:, 0]
pts += self.bound[:, 0]
return np.meshgrid(*pts)