update tutorial3

This commit is contained in:
Dario Coscia
2025-03-07 18:26:18 +01:00
committed by Nicola Demo
parent dc71d328cf
commit 7ef39f1e3b
2 changed files with 422 additions and 305 deletions

View File

@@ -32,16 +32,19 @@
" \n", " \n",
"import torch\n", "import torch\n",
"import matplotlib.pyplot as plt\n", "import matplotlib.pyplot as plt\n",
"plt.style.use('tableau-colorblind10')\n", "import warnings\n",
"\n", "\n",
"from pina import Condition, LabelTensor\n",
"from pina.problem import SpatialProblem, TimeDependentProblem\n", "from pina.problem import SpatialProblem, TimeDependentProblem\n",
"from pina.operator import laplacian, grad\n", "from pina.operator import laplacian, grad\n",
"from pina.domain import CartesianDomain\n", "from pina.domain import CartesianDomain\n",
"from pina.solver import PINN\n", "from pina.solver import PINN\n",
"from pina.trainer import Trainer\n", "from pina.trainer import Trainer\n",
"from pina.equation import Equation\n", "from pina.equation import Equation, FixedValue\n",
"from pina.equation.equation_factory import FixedValue\n", "\n",
"from pina import Condition, LabelTensor" "from lightning.pytorch.loggers import TensorBoardLogger\n",
"\n",
"warnings.filterwarnings('ignore')"
] ]
}, },
{ {
@@ -86,37 +89,61 @@
"outputs": [], "outputs": [],
"source": [ "source": [
"class Wave(TimeDependentProblem, SpatialProblem):\n", "class Wave(TimeDependentProblem, SpatialProblem):\n",
" output_variables = ['u']\n", " output_variables = [\"u\"]\n",
" spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})\n", " spatial_domain = CartesianDomain({\"x\": [0, 1], \"y\": [0, 1]})\n",
" temporal_domain = CartesianDomain({'t': [0, 1]})\n", " temporal_domain = CartesianDomain({\"t\": [0, 1]})\n",
"\n", "\n",
" def wave_equation(input_, output_):\n", " def wave_equation(input_, output_):\n",
" u_t = grad(output_, input_, components=['u'], d=['t'])\n", " u_t = grad(output_, input_, components=[\"u\"], d=[\"t\"])\n",
" u_tt = grad(u_t, input_, components=['dudt'], d=['t'])\n", " u_tt = grad(u_t, input_, components=[\"dudt\"], d=[\"t\"])\n",
" nabla_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])\n", " nabla_u = laplacian(output_, input_, components=[\"u\"], d=[\"x\", \"y\"])\n",
" return nabla_u - u_tt\n", " return nabla_u - u_tt\n",
"\n", "\n",
" def initial_condition(input_, output_):\n", " def initial_condition(input_, output_):\n",
" u_expected = (torch.sin(torch.pi*input_.extract(['x'])) *\n", " u_expected = torch.sin(torch.pi * input_.extract([\"x\"])) * torch.sin(\n",
" torch.sin(torch.pi*input_.extract(['y'])))\n", " torch.pi * input_.extract([\"y\"])\n",
" return output_.extract(['u']) - u_expected\n", " )\n",
" return output_.extract([\"u\"]) - u_expected\n",
"\n", "\n",
" conditions = {\n", " conditions = {\n",
" 'bound_cond1': Condition(domain=CartesianDomain({'x': [0, 1], 'y': 1, 't': [0, 1]}), equation=FixedValue(0.)),\n", " \"bound_cond1\": Condition(\n",
" 'bound_cond2': Condition(domain=CartesianDomain({'x': [0, 1], 'y': 0, 't': [0, 1]}), equation=FixedValue(0.)),\n", " domain=CartesianDomain({\"x\": [0, 1], \"y\": 1, \"t\": [0, 1]}),\n",
" 'bound_cond3': Condition(domain=CartesianDomain({'x': 1, 'y': [0, 1], 't': [0, 1]}), equation=FixedValue(0.)),\n", " equation=FixedValue(0.0),\n",
" 'bound_cond4': Condition(domain=CartesianDomain({'x': 0, 'y': [0, 1], 't': [0, 1]}), equation=FixedValue(0.)),\n", " ),\n",
" 'time_cond': Condition(domain=CartesianDomain({'x': [0, 1], 'y': [0, 1], 't': 0}), equation=Equation(initial_condition)),\n", " \"bound_cond2\": Condition(\n",
" 'phys_cond': Condition(domain=CartesianDomain({'x': [0, 1], 'y': [0, 1], 't': [0, 1]}), equation=Equation(wave_equation)),\n", " domain=CartesianDomain({\"x\": [0, 1], \"y\": 0, \"t\": [0, 1]}),\n",
" equation=FixedValue(0.0),\n",
" ),\n",
" \"bound_cond3\": Condition(\n",
" domain=CartesianDomain({\"x\": 1, \"y\": [0, 1], \"t\": [0, 1]}),\n",
" equation=FixedValue(0.0),\n",
" ),\n",
" \"bound_cond4\": Condition(\n",
" domain=CartesianDomain({\"x\": 0, \"y\": [0, 1], \"t\": [0, 1]}),\n",
" equation=FixedValue(0.0),\n",
" ),\n",
" \"time_cond\": Condition(\n",
" domain=CartesianDomain({\"x\": [0, 1], \"y\": [0, 1], \"t\": 0}),\n",
" equation=Equation(initial_condition),\n",
" ),\n",
" \"phys_cond\": Condition(\n",
" domain=CartesianDomain({\"x\": [0, 1], \"y\": [0, 1], \"t\": [0, 1]}),\n",
" equation=Equation(wave_equation),\n",
" ),\n",
" }\n", " }\n",
"\n", "\n",
" def wave_sol(self, pts):\n", " def truth_solution(self, pts):\n",
" return (torch.sin(torch.pi*pts.extract(['x'])) *\n", " f = (\n",
" torch.sin(torch.pi*pts.extract(['y'])) *\n", " torch.sin(torch.pi * pts.extract([\"x\"]))\n",
" torch.cos(torch.sqrt(torch.tensor(2.))*torch.pi*pts.extract(['t'])))\n", " * torch.sin(torch.pi * pts.extract([\"y\"]))\n",
" * torch.cos(\n",
" torch.sqrt(torch.tensor(2.0)) * torch.pi * pts.extract([\"t\"])\n",
" )\n",
" )\n",
" return LabelTensor(f, self.output_variables)\n",
"\n", "\n",
" truth_solution = wave_sol\n",
"\n", "\n",
"# define problem\n",
"problem = Wave()" "problem = Wave()"
] ]
}, },
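As a quick sanity check, the analytic solution sin(πx)sin(πy)cos(√2 πt) should reduce to the initial condition at t = 0. This is a minimal sketch we add for illustration; it relies only on the sampling and extraction calls shown in this tutorial:

# sanity check (sketch): the truth solution matches the initial condition at t = 0
pts = problem.spatial_domain.sample(10, "grid")    # spatial grid points
t0 = LabelTensor(torch.tensor([[0.0]]), "t")       # fixed time t = 0
pts = pts.append(t0, mode="cross")                 # cross space with time
u0 = problem.truth_solution(pts)                   # analytic solution at t = 0
expected = torch.sin(torch.pi * pts.extract(["x"])) * torch.sin(
    torch.pi * pts.extract(["y"])
)                                                  # the initial condition
assert torch.allclose(u0, expected)                # cos(0) = 1, so they agree exactly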
@@ -152,16 +179,23 @@
" def __init__(self, input_dim, output_dim):\n", " def __init__(self, input_dim, output_dim):\n",
" super().__init__()\n", " super().__init__()\n",
"\n", "\n",
" self.layers = torch.nn.Sequential(torch.nn.Linear(input_dim, 40),\n", " self.layers = torch.nn.Sequential(\n",
" torch.nn.Linear(input_dim, 40),\n",
" torch.nn.ReLU(),\n", " torch.nn.ReLU(),\n",
" torch.nn.Linear(40, 40),\n", " torch.nn.Linear(40, 40),\n",
" torch.nn.ReLU(),\n", " torch.nn.ReLU(),\n",
" torch.nn.Linear(40, output_dim))\n", " torch.nn.Linear(40, output_dim),\n",
" \n", " )\n",
"\n",
" # here in the foward we implement the hard constraints\n", " # here in the foward we implement the hard constraints\n",
" def forward(self, x):\n", " def forward(self, x):\n",
" hard = x.extract(['x'])*(1-x.extract(['x']))*x.extract(['y'])*(1-x.extract(['y']))\n", " hard = (\n",
" return hard*self.layers(x)" " x.extract([\"x\"])\n",
" * (1 - x.extract([\"x\"]))\n",
" * x.extract([\"y\"])\n",
" * (1 - x.extract([\"y\"]))\n",
" )\n",
" return hard * self.layers(x)"
] ]
}, },
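To see the hard constraint in action, here is a small check (a sketch, assuming the `HardMLP(input_dim, output_dim)` signature above): even before training, the output vanishes on any point of the spatial boundary, because the multiplier x(1-x)y(1-y) is zero there.

# sketch: an untrained HardMLP is exactly zero on the spatial boundary
net = HardMLP(3, 1)  # inputs (x, y, t) -> output u
boundary_pts = LabelTensor(
    torch.tensor([[0.0, 0.3, 0.7],    # a point on the x = 0 edge
                  [1.0, 0.5, 0.2],    # a point on the x = 1 edge
                  [0.4, 0.0, 0.9]]),  # a point on the y = 0 edge
    ["x", "y", "t"],
)
print(net(boundary_pts))  # all zeros, whatever the random weights are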
{ {
@@ -177,7 +211,7 @@
"id": "b465bebd", "id": "b465bebd",
"metadata": {}, "metadata": {},
"source": [ "source": [
"In this tutorial, the neural network is trained for 1000 epochs with a learning rate of 0.001 (default in `PINN`). Training takes approximately 3 minutes." "In this tutorial, the neural network is trained for 1000 epochs with a learning rate of 0.001 (default in `PINN`). As always, we will log using `Tensorboard`."
] ]
}, },
{ {
@@ -188,22 +222,66 @@
"outputs": [], "outputs": [],
"source": [ "source": [
"# generate the data\n", "# generate the data\n",
"problem.discretise_domain(1000, 'random', domains=['phys_cond', 'time_cond', 'bound_cond1', 'bound_cond2', 'bound_cond3', 'bound_cond4'])\n", "problem.discretise_domain(\n",
" 1000,\n",
" \"random\",\n",
" domains=[\n",
" \"phys_cond\",\n",
" \"time_cond\",\n",
" \"bound_cond1\",\n",
" \"bound_cond2\",\n",
" \"bound_cond3\",\n",
" \"bound_cond4\",\n",
" ],\n",
")\n",
"\n", "\n",
"# create the solver\n", "# define model\n",
"pinn = PINN(problem, HardMLP(len(problem.input_variables), len(problem.output_variables)))\n", "model = HardMLP(len(problem.input_variables), len(problem.output_variables))\n",
"\n",
"# crete the solver\n",
"pinn = PINN(problem=problem, model=model)\n",
"\n", "\n",
"# create trainer and train\n", "# create trainer and train\n",
"trainer = Trainer(pinn, max_epochs=1000, accelerator='cpu', enable_model_summary=False) # we train on CPU and avoid model summary at beginning of training (optional)\n", "trainer = Trainer(\n",
" solver=pinn,\n",
" max_epochs=1000,\n",
" accelerator=\"cpu\",\n",
" enable_model_summary=False,\n",
" train_size=1.0,\n",
" val_size=0.0,\n",
" test_size=0.0,\n",
" logger=TensorBoardLogger(\"tutorial_logs\"),\n",
" enable_progress_bar=False,\n",
")\n",
"trainer.train()" "trainer.train()"
] ]
}, },
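Once training finishes, a single number is often more telling than the loss curve. The sketch below is our addition for illustration; it reuses only calls already shown in this tutorial to compute the relative L2 error against the analytic solution at t = 0.5:

# sketch: relative L2 error of the trained solver at a fixed time
pts = problem.spatial_domain.sample(30, "grid")
pts = pts.append(LabelTensor(torch.tensor([[0.5]]), "t"), mode="cross")
with torch.no_grad():
    u_pred = pinn(pts).tensor.flatten()
u_true = problem.truth_solution(pts).tensor.flatten()
rel_err = torch.linalg.norm(u_pred - u_true) / torch.linalg.norm(u_true)
print(f"relative L2 error at t = 0.5: {rel_err.item():.3e}")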
{
"cell_type": "markdown",
"id": "4c6dbfac",
"metadata": {},
"source": [
"Let's now plot the logging to see how the losses vary during training. For this, we will use `TensorBoard`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77bfcb6e",
"metadata": {},
"outputs": [],
"source": [
"# Load the TensorBoard extension\n",
"%load_ext tensorboard\n",
"%tensorboard --logdir 'tutorial_logs'\n"
]
},
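If you prefer to post-process the curves outside the TensorBoard UI, the logged scalars can also be read back programmatically. This is an optional sketch using TensorBoard's `EventAccumulator`; the exact scalar tag names depend on the PINA/Lightning versions, so list them first:

# optional sketch: read logged scalars without the TensorBoard UI
import glob
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator

event_file = sorted(glob.glob("tutorial_logs/**/events.*", recursive=True))[-1]
acc = EventAccumulator(event_file)
acc.Reload()
print(acc.Tags()["scalars"])  # inspect the available tags before plotting them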
{ {
"cell_type": "markdown", "cell_type": "markdown",
"id": "c2a5c405", "id": "c2a5c405",
"metadata": {}, "metadata": {},
"source": [ "source": [
"Notice that the loss on the boundaries of the spatial domain is exactly zero, as expected! After the training is completed one can now plot some results using the `matplotlib`. We plot the predicted output on the left side, the true solution at the center and the difference on the right side." "Notice that the loss on the boundaries of the spatial domain is exactly zero, as expected! After the training is completed one can now plot some results using the `matplotlib`. We plot the predicted output on the left side, the true solution at the center and the difference on the right side using the `plot_solution` function."
] ]
}, },
{ {
@@ -213,32 +291,35 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"def fixed_time_plot(fixed_variables, pinn):\n", "@torch.no_grad()\n",
" #sample domain points and get values corresponding to fixed variables\n", "def plot_solution(solver, time):\n",
" pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])\n", " # get the problem\n",
" grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]\n", " problem = solver.problem\n",
" fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))\n", " # get spatial points\n",
" fixed_pts *= torch.tensor(list(fixed_variables.values()))\n", " spatial_samples = problem.spatial_domain.sample(30, \"grid\")\n",
" fixed_pts = fixed_pts.as_subclass(LabelTensor)\n", " # get temporal value\n",
" fixed_pts.labels = list(fixed_variables.keys())\n", " time = LabelTensor(torch.tensor([[time]]), \"t\")\n",
" pts = pts.append(fixed_pts).to(device=pinn.device)\n", " # cross data\n",
" predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)\n", " points = spatial_samples.append(time, mode=\"cross\")\n",
" #get true solution\n", " # compute pinn solution, true solution and absolute difference\n",
" true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)\n", " data = {\n",
" pts = pts.cpu()\n", " \"PINN solution\": solver(points),\n",
" #plot prediction, true solution and difference\n", " \"True solution\": problem.truth_solution(points),\n",
" grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]\n", " \"Absolute Difference\": torch.abs(\n",
" fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))\n", " solver(points) - problem.truth_solution(points)\n",
" cb = getattr(ax[0], 'contourf')(*grids, predicted_output)\n", " )\n",
" fig.colorbar(cb, ax=ax[0])\n", " }\n",
" ax[0].title.set_text('Neural Network prediction')\n", " # plot the solution\n",
" cb = getattr(ax[1], 'contourf')(*grids, true_output)\n", " plt.suptitle(f'Solution for time {time.item()}')\n",
" fig.colorbar(cb, ax=ax[1])\n", " for idx, (title, field) in enumerate(data.items()):\n",
" ax[1].title.set_text('True solution')\n", " plt.subplot(1, 3, idx + 1)\n",
" cb = getattr(ax[2],'contourf')(*grids,(true_output - predicted_output))\n", " plt.title(title)\n",
" fig.colorbar(cb, ax=ax[2])\n", " plt.tricontourf( # convert to torch tensor + flatten\n",
" ax[2].title.set_text('Residual')\n", " points.extract(\"x\").tensor.flatten(),\n",
" plt.show(block=True)" " points.extract(\"y\").tensor.flatten(),\n",
" field.tensor.flatten(),\n",
" )\n",
" plt.colorbar(), plt.tight_layout()"
] ]
}, },
{ {
@@ -256,12 +337,14 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"print('Plotting at t=0')\n", "plt.figure(figsize=(12, 6))\n",
"fixed_time_plot(fixed_variables={'t':0.0},pinn=pinn)\n", "plot_solution(solver=pinn, time=0)\n",
"print('Plotting at t=0.5')\n", "\n",
"fixed_time_plot(fixed_variables={'t':0.5},pinn=pinn)\n", "plt.figure(figsize=(12, 6))\n",
"print('Plotting at t=1.0')\n", "plot_solution(solver=pinn, time=0.5)\n",
"fixed_time_plot(fixed_variables={'t':1.0},pinn=pinn)" "\n",
"plt.figure(figsize=(12, 6))\n",
"plot_solution(solver=pinn, time=1)"
] ]
}, },
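The three snapshots above suggest the error grows with time. A compact way to confirm this (a sketch built from the same calls used in `plot_solution`) is to print the maximum absolute error at several times:

# sketch: track how the maximum absolute error evolves in time
for t in [0.0, 0.25, 0.5, 0.75, 1.0]:
    pts = problem.spatial_domain.sample(30, "grid")
    pts = pts.append(LabelTensor(torch.tensor([[t]]), "t"), mode="cross")
    with torch.no_grad():
        err = torch.abs(pinn(pts).tensor - problem.truth_solution(pts).tensor)
    print(f"t = {t:4.2f}: max abs error = {err.max().item():.3e}")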
{ {
@@ -290,17 +373,30 @@
" def __init__(self, input_dim, output_dim):\n", " def __init__(self, input_dim, output_dim):\n",
" super().__init__()\n", " super().__init__()\n",
"\n", "\n",
" self.layers = torch.nn.Sequential(torch.nn.Linear(input_dim, 40),\n", " self.layers = torch.nn.Sequential(\n",
" torch.nn.Linear(input_dim, 40),\n",
" torch.nn.ReLU(),\n", " torch.nn.ReLU(),\n",
" torch.nn.Linear(40, 40),\n", " torch.nn.Linear(40, 40),\n",
" torch.nn.ReLU(),\n", " torch.nn.ReLU(),\n",
" torch.nn.Linear(40, output_dim))\n", " torch.nn.Linear(40, output_dim),\n",
" \n", " )\n",
"\n",
" # here in the foward we implement the hard constraints\n", " # here in the foward we implement the hard constraints\n",
" def forward(self, x):\n", " def forward(self, x):\n",
" hard_space = x.extract(['x'])*(1-x.extract(['x']))*x.extract(['y'])*(1-x.extract(['y']))\n", " hard_space = (\n",
" hard_t = torch.sin(torch.pi*x.extract(['x'])) * torch.sin(torch.pi*x.extract(['y'])) * torch.cos(torch.sqrt(torch.tensor(2.))*torch.pi*x.extract(['t']))\n", " x.extract([\"x\"])\n",
" return hard_space * self.layers(x) * x.extract(['t']) + hard_t" " * (1 - x.extract([\"x\"]))\n",
" * x.extract([\"y\"])\n",
" * (1 - x.extract([\"y\"]))\n",
" )\n",
" hard_t = (\n",
" torch.sin(torch.pi * x.extract([\"x\"]))\n",
" * torch.sin(torch.pi * x.extract([\"y\"]))\n",
" * torch.cos(\n",
" torch.sqrt(torch.tensor(2.0)) * torch.pi * x.extract([\"t\"])\n",
" )\n",
" )\n",
" return hard_space * self.layers(x) * x.extract([\"t\"]) + hard_t"
] ]
}, },
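By construction, at t = 0 the output reduces to `hard_t`, i.e. exactly the initial condition, since the learned term is multiplied by t. A small sketch (again relying only on calls shown above) makes this concrete, even with untrained weights:

# sketch: an untrained HardMLPtime already satisfies the initial condition
net = HardMLPtime(3, 1)
pts = problem.spatial_domain.sample(10, "grid")
pts = pts.append(LabelTensor(torch.tensor([[0.0]]), "t"), mode="cross")
expected = torch.sin(torch.pi * pts.extract(["x"])) * torch.sin(
    torch.pi * pts.extract(["y"])
)
assert torch.allclose(net(pts), expected)  # exact at t = 0: hard_t * cos(0)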
{ {
@@ -308,7 +404,7 @@
"id": "5d3dc67b", "id": "5d3dc67b",
"metadata": {}, "metadata": {},
"source": [ "source": [
"Now let's train with the same configuration as thre previous test" "Now let's train with the same configuration as the previous test"
] ]
}, },
{ {
@@ -318,14 +414,24 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"# generate the data\n", "# define model\n",
"problem.discretise_domain(1000, 'random', domains=['phys_cond', 'time_cond', 'bound_cond1', 'bound_cond2', 'bound_cond3', 'bound_cond4'])\n", "model = HardMLPtime(len(problem.input_variables), len(problem.output_variables))\n",
"\n", "\n",
"# crete the solver\n", "# crete the solver\n",
"pinn = PINN(problem, HardMLPtime(len(problem.input_variables), len(problem.output_variables)))\n", "pinn = PINN(problem=problem, model=model)\n",
"\n", "\n",
"# create trainer and train\n", "# create trainer and train\n",
"trainer = Trainer(pinn, max_epochs=1000, accelerator='cpu', enable_model_summary=False) # we train on CPU and avoid model summary at beginning of training (optional)\n", "trainer = Trainer(\n",
" solver=pinn,\n",
" max_epochs=1000,\n",
" accelerator=\"cpu\",\n",
" enable_model_summary=False,\n",
" train_size=1.0,\n",
" val_size=0.0,\n",
" test_size=0.0,\n",
" logger=TensorBoardLogger(\"tutorial_logs\"),\n",
" enable_progress_bar=False,\n",
")\n",
"trainer.train()" "trainer.train()"
] ]
}, },
@@ -344,12 +450,14 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"print('Plotting at t=0')\n", "plt.figure(figsize=(12, 6))\n",
"fixed_time_plot(fixed_variables={'t':0.0},pinn=pinn)\n", "plot_solution(solver=pinn, time=0)\n",
"print('Plotting at t=0.5')\n", "\n",
"fixed_time_plot(fixed_variables={'t':0.5},pinn=pinn)\n", "plt.figure(figsize=(12, 6))\n",
"print('Plotting at t=1.0')\n", "plot_solution(solver=pinn, time=0.5)\n",
"fixed_time_plot(fixed_variables={'t':1.0},pinn=pinn)\n" "\n",
"plt.figure(figsize=(12, 6))\n",
"plot_solution(solver=pinn, time=1)"
] ]
}, },
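To quantify the improvement, we can rerun the same error-in-time check from before on the new solver (a sketch); the error should now stay small across the whole time interval:

# sketch: the error no longer blows up as time progresses
for t in [0.0, 0.25, 0.5, 0.75, 1.0]:
    pts = problem.spatial_domain.sample(30, "grid")
    pts = pts.append(LabelTensor(torch.tensor([[t]]), "t"), mode="cross")
    with torch.no_grad():
        err = torch.abs(pinn(pts).tensor - problem.truth_solution(pts).tensor)
    print(f"t = {t:4.2f}: max abs error = {err.max().item():.3e}")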
{ {
@@ -357,7 +465,17 @@
"id": "b7338109", "id": "b7338109",
"metadata": {}, "metadata": {},
"source": [ "source": [
"We can see now that the results are way better! This is due to the fact that previously the network was not learning correctly the initial conditon, leading to a poor solution when time evolved. By imposing the initial condition the network is able to correctly solve the problem." "We can see now that the results are way better! This is due to the fact that previously the network was not learning correctly the initial conditon, leading to a poor solution when time evolved. By imposing the initial condition the network is able to correctly solve the problem. We can also see using Tensorboard how the two losses decreased:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7ce34dac",
"metadata": {},
"outputs": [],
"source": [
"%tensorboard --logdir 'tutorial_logs'"
] ]
}, },
{ {
@@ -381,7 +499,7 @@
], ],
"metadata": { "metadata": {
"kernelspec": { "kernelspec": {
"display_name": "Python 3", "display_name": "pina",
"language": "python", "language": "python",
"name": "python3" "name": "python3"
}, },
@@ -395,7 +513,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.12.7" "version": "3.9.21"
} }
}, },
"nbformat": 4, "nbformat": 4,

View File

@@ -9,7 +9,7 @@
# #
# First of all, some useful imports. # First of all, some useful imports.
# In[1]: # In[ ]:
## routine needed to run the notebook on Google Colab ## routine needed to run the notebook on Google Colab
@@ -22,16 +22,20 @@ if IN_COLAB:
get_ipython().system('pip install "pina-mathlab"') get_ipython().system('pip install "pina-mathlab"')
import torch
import matplotlib.pylab as plt
from pina.problem import SpatialProblem, TimeDependentProblem
from pina.operator import laplacian, grad
from pina.domain import CartesianDomain
from pina.solver import PINN
from pina.trainer import Trainer
from pina.equation import Equation
from pina.equation.equation_factory import FixedValue
from pina import Condition, LabelTensor

import torch
import matplotlib.pyplot as plt
import warnings

from pina import Condition, LabelTensor
from pina.problem import SpatialProblem, TimeDependentProblem
from pina.operator import laplacian, grad
from pina.domain import CartesianDomain
from pina.solver import PINN
from pina.trainer import Trainer
from pina.equation import Equation, FixedValue

from lightning.pytorch.loggers import TensorBoardLogger

warnings.filterwarnings('ignore')
# ## The problem definition # ## The problem definition
@@ -50,41 +54,65 @@ from pina import Condition, LabelTensor
# Now, the wave problem is written in PINA code as a class, inheriting from `SpatialProblem` and `TimeDependentProblem` since we deal with spatial, and time dependent variables. The equations are written as `conditions` that should be satisfied in the corresponding domains. `truth_solution` is the exact solution which will be compared with the predicted one. # Now, the wave problem is written in PINA code as a class, inheriting from `SpatialProblem` and `TimeDependentProblem` since we deal with spatial, and time dependent variables. The equations are written as `conditions` that should be satisfied in the corresponding domains. `truth_solution` is the exact solution which will be compared with the predicted one.
# In[2]: # In[ ]:
class Wave(TimeDependentProblem, SpatialProblem):
    output_variables = ['u']
    spatial_domain = CartesianDomain({'x': [0, 1], 'y': [0, 1]})
    temporal_domain = CartesianDomain({'t': [0, 1]})

    def wave_equation(input_, output_):
        u_t = grad(output_, input_, components=['u'], d=['t'])
        u_tt = grad(u_t, input_, components=['dudt'], d=['t'])
        nabla_u = laplacian(output_, input_, components=['u'], d=['x', 'y'])
        return nabla_u - u_tt

    def initial_condition(input_, output_):
        u_expected = (torch.sin(torch.pi*input_.extract(['x'])) *
                      torch.sin(torch.pi*input_.extract(['y'])))
        return output_.extract(['u']) - u_expected

    conditions = {
        'bound_cond1': Condition(domain=CartesianDomain({'x': [0, 1], 'y': 1, 't': [0, 1]}), equation=FixedValue(0.)),
        'bound_cond2': Condition(domain=CartesianDomain({'x': [0, 1], 'y': 0, 't': [0, 1]}), equation=FixedValue(0.)),
        'bound_cond3': Condition(domain=CartesianDomain({'x': 1, 'y': [0, 1], 't': [0, 1]}), equation=FixedValue(0.)),
        'bound_cond4': Condition(domain=CartesianDomain({'x': 0, 'y': [0, 1], 't': [0, 1]}), equation=FixedValue(0.)),
        'time_cond': Condition(domain=CartesianDomain({'x': [0, 1], 'y': [0, 1], 't': 0}), equation=Equation(initial_condition)),
        'phys_cond': Condition(domain=CartesianDomain({'x': [0, 1], 'y': [0, 1], 't': [0, 1]}), equation=Equation(wave_equation)),
    }

    def wave_sol(self, pts):
        return (torch.sin(torch.pi*pts.extract(['x'])) *
                torch.sin(torch.pi*pts.extract(['y'])) *
                torch.cos(torch.sqrt(torch.tensor(2.))*torch.pi*pts.extract(['t'])))

    truth_solution = wave_sol

class Wave(TimeDependentProblem, SpatialProblem):
    output_variables = ["u"]
    spatial_domain = CartesianDomain({"x": [0, 1], "y": [0, 1]})
    temporal_domain = CartesianDomain({"t": [0, 1]})

    def wave_equation(input_, output_):
        u_t = grad(output_, input_, components=["u"], d=["t"])
        u_tt = grad(u_t, input_, components=["dudt"], d=["t"])
        nabla_u = laplacian(output_, input_, components=["u"], d=["x", "y"])
        return nabla_u - u_tt

    def initial_condition(input_, output_):
        u_expected = torch.sin(torch.pi * input_.extract(["x"])) * torch.sin(
            torch.pi * input_.extract(["y"])
        )
        return output_.extract(["u"]) - u_expected

    conditions = {
        "bound_cond1": Condition(
            domain=CartesianDomain({"x": [0, 1], "y": 1, "t": [0, 1]}),
            equation=FixedValue(0.0),
        ),
        "bound_cond2": Condition(
            domain=CartesianDomain({"x": [0, 1], "y": 0, "t": [0, 1]}),
            equation=FixedValue(0.0),
        ),
        "bound_cond3": Condition(
            domain=CartesianDomain({"x": 1, "y": [0, 1], "t": [0, 1]}),
            equation=FixedValue(0.0),
        ),
        "bound_cond4": Condition(
            domain=CartesianDomain({"x": 0, "y": [0, 1], "t": [0, 1]}),
            equation=FixedValue(0.0),
        ),
        "time_cond": Condition(
            domain=CartesianDomain({"x": [0, 1], "y": [0, 1], "t": 0}),
            equation=Equation(initial_condition),
        ),
        "phys_cond": Condition(
            domain=CartesianDomain({"x": [0, 1], "y": [0, 1], "t": [0, 1]}),
            equation=Equation(wave_equation),
        ),
    }

    def truth_solution(self, pts):
        f = (
            torch.sin(torch.pi * pts.extract(["x"]))
            * torch.sin(torch.pi * pts.extract(["y"]))
            * torch.cos(
                torch.sqrt(torch.tensor(2.0)) * torch.pi * pts.extract(["t"])
            )
        )
        return LabelTensor(f, self.output_variables)


# define problem
problem = Wave()
@@ -96,7 +124,7 @@ problem = Wave()
# #
# where $NN$ is the neural net output. This neural network takes as input the coordinates (in this case $x$, $y$ and $t$) and provides the unknown field $u$. By construction, it is zero on the boundaries. The residuals of the equations are evaluated at several sampling points (which the user can manipulate using the method `discretise_domain`) and the loss minimized by the neural network is the sum of the residuals. # where $NN$ is the neural net output. This neural network takes as input the coordinates (in this case $x$, $y$ and $t$) and provides the unknown field $u$. By construction, it is zero on the boundaries. The residuals of the equations are evaluated at several sampling points (which the user can manipulate using the method `discretise_domain`) and the loss minimized by the neural network is the sum of the residuals.
# In[3]: # In[ ]:
class HardMLP(torch.nn.Module): class HardMLP(torch.nn.Module):
@@ -104,120 +132,129 @@ class HardMLP(torch.nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.layers = torch.nn.Sequential(torch.nn.Linear(input_dim, 40),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(40, 40),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(40, output_dim))

    # in the forward pass we implement the hard constraints
    def forward(self, x):
        hard = x.extract(['x'])*(1-x.extract(['x']))*x.extract(['y'])*(1-x.extract(['y']))
        return hard*self.layers(x)

    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.layers = torch.nn.Sequential(
            torch.nn.Linear(input_dim, 40),
            torch.nn.ReLU(),
            torch.nn.Linear(40, 40),
            torch.nn.ReLU(),
            torch.nn.Linear(40, output_dim),
        )

    # in the forward pass we implement the hard constraints
    def forward(self, x):
        hard = (
            x.extract(["x"])
            * (1 - x.extract(["x"]))
            * x.extract(["y"])
            * (1 - x.extract(["y"]))
        )
        return hard * self.layers(x)
# ## Train and Inference # ## Train and Inference
# In this tutorial, the neural network is trained for 1000 epochs with a learning rate of 0.001 (default in `PINN`). Training takes approximately 3 minutes.
# In this tutorial, the neural network is trained for 1000 epochs with a learning rate of 0.001 (default in `PINN`). As always, we will log the training using `TensorBoard`.
# In[4]: # In[ ]:
# generate the data
problem.discretise_domain(1000, 'random', domains=['phys_cond', 'time_cond', 'bound_cond1', 'bound_cond2', 'bound_cond3', 'bound_cond4'])

# create the solver
pinn = PINN(problem, HardMLP(len(problem.input_variables), len(problem.output_variables)))

# create trainer and train
trainer = Trainer(pinn, max_epochs=1000, accelerator='cpu', enable_model_summary=False)  # we train on CPU and avoid model summary at beginning of training (optional)
trainer.train()

# generate the data
problem.discretise_domain(
    1000,
    "random",
    domains=[
        "phys_cond",
        "time_cond",
        "bound_cond1",
        "bound_cond2",
        "bound_cond3",
        "bound_cond4",
    ],
)

# define model
model = HardMLP(len(problem.input_variables), len(problem.output_variables))

# create the solver
pinn = PINN(problem=problem, model=model)

# create trainer and train
trainer = Trainer(
    solver=pinn,
    max_epochs=1000,
    accelerator="cpu",
    enable_model_summary=False,
    train_size=1.0,
    val_size=0.0,
    test_size=0.0,
    logger=TensorBoardLogger("tutorial_logs"),
    enable_progress_bar=False,
)
trainer.train()
# Notice that the loss on the boundaries of the spatial domain is exactly zero, as expected! After the training is completed one can now plot some results using the `Plotter` class of **PINA**.
# Let's now inspect the logs to see how the losses varied during training. For this, we will use `TensorBoard`.

# In[ ]:

# Load the TensorBoard extension
get_ipython().run_line_magic('load_ext', 'tensorboard')
get_ipython().run_line_magic('tensorboard', "--logdir 'tutorial_logs'")

# In[5]:

method='contourf'
# plotting at fixed time t = 0.0
print('Plotting at t=0')
fixed_variables={'t': 0.0}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# plotting at fixed time t = 0.5
print('Plotting at t=0.5')
#plotter.plot(pinn, fixed_variables={'t': 0.5})
fixed_variables={'t': 0.5}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# plotting at fixed time t = 1.
print('Plotting at t=1')
#plotter.plot(pinn, fixed_variables={'t': 1.0})
fixed_variables={'t': 1.0}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# The results are not so great, and we can clearly see that as time progresses the solution gets worse... Can we do better?
# Notice that the loss on the boundaries of the spatial domain is exactly zero, as expected! After training is completed, one can plot some results using `matplotlib`. We plot the predicted output on the left, the true solution in the center, and the difference on the right using the `plot_solution` function.
# In[ ]:
@torch.no_grad()
def plot_solution(solver, time):
# get the problem
problem = solver.problem
# get spatial points
spatial_samples = problem.spatial_domain.sample(30, "grid")
# get temporal value
time = LabelTensor(torch.tensor([[time]]), "t")
# cross data
points = spatial_samples.append(time, mode="cross")
# compute pinn solution, true solution and absolute difference
data = {
"PINN solution": solver(points),
"True solution": problem.truth_solution(points),
"Absolute Difference": torch.abs(
solver(points) - problem.truth_solution(points)
)
}
# plot the solution
plt.suptitle(f'Solution for time {time.item()}')
for idx, (title, field) in enumerate(data.items()):
plt.subplot(1, 3, idx + 1)
plt.title(title)
plt.tricontourf( # convert to torch tensor + flatten
points.extract("x").tensor.flatten(),
points.extract("y").tensor.flatten(),
field.tensor.flatten(),
)
    plt.colorbar()
    plt.tight_layout()
# Let's take a look at the results at different times, for example `0.0`, `0.5` and `1.0`:
# In[ ]:
plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=0)
plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=0.5)
plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=1)
# The results are not so great, and we can clearly see that as time progresses the solution gets worse... Can we do better?
# #
# A valid option is to impose the initial condition as hard constraint as well. Specifically, our solution is written as: # A valid option is to impose the initial condition as hard constraint as well. Specifically, our solution is written as:
# #
@@ -225,7 +262,7 @@ ax[2].title.set_text('Residual')
# #
# Let us build the network first # Let us build the network first
# In[6]: # In[ ]:
class HardMLPtime(torch.nn.Module): class HardMLPtime(torch.nn.Module):
@@ -233,118 +270,80 @@ class HardMLPtime(torch.nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.layers = torch.nn.Sequential(torch.nn.Linear(input_dim, 40),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(40, 40),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(40, output_dim))

    # in the forward pass we implement the hard constraints
    def forward(self, x):
        hard_space = x.extract(['x'])*(1-x.extract(['x']))*x.extract(['y'])*(1-x.extract(['y']))
        hard_t = torch.sin(torch.pi*x.extract(['x'])) * torch.sin(torch.pi*x.extract(['y'])) * torch.cos(torch.sqrt(torch.tensor(2.))*torch.pi*x.extract(['t']))
        return hard_space * self.layers(x) * x.extract(['t']) + hard_t

    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.layers = torch.nn.Sequential(
            torch.nn.Linear(input_dim, 40),
            torch.nn.ReLU(),
            torch.nn.Linear(40, 40),
            torch.nn.ReLU(),
            torch.nn.Linear(40, output_dim),
        )

    # in the forward pass we implement the hard constraints
    def forward(self, x):
        hard_space = (
            x.extract(["x"])
            * (1 - x.extract(["x"]))
            * x.extract(["y"])
            * (1 - x.extract(["y"]))
        )
        hard_t = (
            torch.sin(torch.pi * x.extract(["x"]))
            * torch.sin(torch.pi * x.extract(["y"]))
            * torch.cos(
                torch.sqrt(torch.tensor(2.0)) * torch.pi * x.extract(["t"])
            )
        )
        return hard_space * self.layers(x) * x.extract(["t"]) + hard_t
# Now let's train with the same configuration as thre previous test # Now let's train with the same configuration as the previous test
# In[7]: # In[ ]:
# generate the data
problem.discretise_domain(1000, 'random', domains=['phys_cond', 'time_cond', 'bound_cond1', 'bound_cond2', 'bound_cond3', 'bound_cond4'])

# create the solver
pinn = PINN(problem, HardMLPtime(len(problem.input_variables), len(problem.output_variables)))

# create trainer and train
trainer = Trainer(pinn, max_epochs=1000, accelerator='cpu', enable_model_summary=False)  # we train on CPU and avoid model summary at beginning of training (optional)
trainer.train()

# define model
model = HardMLPtime(len(problem.input_variables), len(problem.output_variables))

# create the solver
pinn = PINN(problem=problem, model=model)

# create trainer and train
trainer = Trainer(
    solver=pinn,
    max_epochs=1000,
    accelerator="cpu",
    enable_model_summary=False,
    train_size=1.0,
    val_size=0.0,
    test_size=0.0,
    logger=TensorBoardLogger("tutorial_logs"),
    enable_progress_bar=False,
)
trainer.train()
# We can clearly see that the loss is way lower now. Let's plot the results # We can clearly see that the loss is way lower now. Let's plot the results
# In[8]: # In[ ]:
plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=0)

plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=0.5)

plt.figure(figsize=(12, 6))
plot_solution(solver=pinn, time=1)

# plotting at fixed time t = 0.0
print('Plotting at t=0')
fixed_variables={'t': 0.0}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# plotting at fixed time t = 0.5
print('Plotting at t=0.5')
#plotter.plot(pinn, fixed_variables={'t': 0.5})
fixed_variables={'t': 0.5}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# plotting at fixed time t = 1.
print('Plotting at t=1')
#plotter.plot(pinn, fixed_variables={'t': 1.0})
fixed_variables={'t': 1.0}
pts = pinn.problem.spatial_domain.sample(256, 'grid', variables=['x','y'])
fixed_pts = torch.ones(pts.shape[0], len(fixed_variables))
fixed_pts *= torch.tensor(list(fixed_variables.values()))
fixed_pts = fixed_pts.as_subclass(LabelTensor)
fixed_pts.labels = list(fixed_variables.keys())
pts = pts.append(fixed_pts)
pts = pts.to(device=pinn.device)
predicted_output = pinn.forward(pts).extract('u').as_subclass(torch.Tensor).cpu().detach().reshape(256,256)
true_output = pinn.problem.truth_solution(pts).cpu().detach().reshape(256,256)
pts = pts.cpu()
grids = [p_.reshape(256, 256) for p_ in pts.extract(['x','y']).T]
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
cb = getattr(ax[0], method)(*grids, predicted_output)
fig.colorbar(cb, ax=ax[0])
ax[0].title.set_text('Neural Network prediction')
cb = getattr(ax[1], method)(*grids, true_output)
fig.colorbar(cb, ax=ax[1])
ax[1].title.set_text('True solution')
cb = getattr(ax[2],method)(*grids,(true_output - predicted_output))
fig.colorbar(cb, ax=ax[2])
ax[2].title.set_text('Residual')
# The results are now much better! Previously, the network was not correctly learning the initial condition, which led to a poor solution as time evolved. By imposing the initial condition as a hard constraint, the network is able to solve the problem correctly.
# The results are now much better! Previously, the network was not correctly learning the initial condition, which led to a poor solution as time evolved. By imposing the initial condition as a hard constraint, the network is able to solve the problem correctly. We can also see in TensorBoard how the losses of the two runs decreased:
# In[ ]:
get_ipython().run_line_magic('tensorboard', "--logdir 'tutorial_logs'")
# ## What's next? # ## What's next?
# #