# copyright ################################# #
# This file is part of the wakis Package. #
# Copyright (c) CERN, 2024. #
# ########################################### #
import time
import h5py
import numpy as np
from scipy.constants import c as c_light
from scipy.constants import epsilon_0 as eps_0
from scipy.constants import mu_0 as mu_0
from scipy.sparse import csc_matrix as sparse_mat
from scipy.sparse import diags, hstack, vstack
from .field import Field
from .logger import Logger
from .materials import material_lib
from .plotting import PlotMixin
from .routines import RoutinesMixin
try:
from cupyx.scipy.sparse import csc_matrix as gpu_sparse_mat
imported_cupyx = True
except ImportError:
imported_cupyx = False
try:
from sparse_dot_mkl import csr_matrix as mkl_sparse_mat
from sparse_dot_mkl import dot_product_mkl
imported_mkl = True
except ImportError:
imported_mkl = False
[docs]class SolverFIT3D(PlotMixin, RoutinesMixin):
def __init__(
self,
grid,
wake=None,
cfln=0.5,
dt=None,
bc_low=["Periodic", "Periodic", "Periodic"],
bc_high=["Periodic", "Periodic", "Periodic"],
use_stl=False,
use_conductors=False,
use_gpu=False,
use_mpi=False,
dtype=np.float64,
n_pml=10,
bg=[1.0, 1.0],
verbose=1,
):
"""
3D time-domain electromagnetic solver based on the Finite Integration
Technique (FIT).
Handles mesh and geometry, material assignment, boundary conditions and
time-stepping. Supports CPU, optional GPU acceleration (cupyx) and MPI
domain decomposition. Provides utilities for importing conductors and
STL solids, applying PML/ABC boundaries, and saving/restoring solver
state.
Parameters
----------
grid : GridFIT3D
Instance providing mesh, coordinate arrays and geometry flags.
wake : WakeSolver, optional
Wakefield object with beam parameters used for wake computations.
cfln : float, optional
CFL number used to compute a stable timestep when ``dt`` is None.
dt : float, optional
Explicit timestep. If provided, it overrides the CFL-based value.
bc_low, bc_high : list of str, optional
Boundary conditions for low/high faces in (x, y, z) order.
use_stl : bool, optional
If True, apply solids and materials provided in the ``grid`` object.
use_conductors : bool, optional
If True, import conductor geometry from ``conductors.py`` masks.
use_gpu : bool, optional
Enable GPU acceleration via ``cupyx`` (if available).
use_mpi : bool, optional
Enable MPI execution for a subdivided grid.
dtype : numpy dtype, optional
Numeric dtype for solver arrays (default ``np.float64``).
n_pml : int, optional
Number of PML cells for PML boundary regions.
bg : sequence or str, optional
Background material [eps_r, mu_r, sigma] or a material key from
the library. If a sigma value is provided conductivity handling is
enabled.
verbose : int or bool, optional
Verbosity flag for initialization messages.
Attributes
----------
E, H, J : wakis.Field
Electric field, magnetic field and current density containers.
Access components via labels 'x','y','z'. Example:
``solver.E[:, :, n, 'z']`` gives Ez at z-index n.
ieps, imu, sigma : wakis.Field
Material tensors (inverse permittivity, inverse permeability and
conductivity) stored per field component.
grid : GridFIT3D
Reference to the input grid object.
dt : float
Time-step used for time integration.
cfln : float
CFL number used when computing dt from grid spacing.
"""
self.verbose = verbose
t0 = time.time()
self.logger = Logger()
# Flags
self.step_0 = True
self.nstep = int(0)
self.plotter_active = False
self.use_conductors = use_conductors
self.use_stl = use_stl
self.use_gpu = use_gpu
self.use_mpi = use_mpi
self.activate_abc = False # Will turn true if abc BCs are chosen
self.activate_pml = False # Will turn true if pml BCs are chosen
self.use_conductivity = (
False # Will turn true with conductive material or pml
)
self.imported_mkl = imported_mkl # Use MKL backend when available
self.one_step = self._one_step
if use_stl:
self.use_conductors = False
self.update_logger(["use_gpu", "use_mpi"])
# Grid
self.grid = grid
self.background = bg
self.Nx = self.grid.Nx
self.Ny = self.grid.Ny
self.Nz = self.grid.Nz
self.N = self.Nx * self.Ny * self.Nz
self.dx = self.grid.dx
self.dy = self.grid.dy
self.dz = self.grid.dz
self.x = self.grid.x[:-1] + self.dx / 2
self.y = self.grid.y[:-1] + self.dy / 2
self.z = self.grid.z[:-1] + self.dz / 2
self.L = self.grid.L
self.iA = self.grid.iA
self.tL = self.grid.tL
self.itA = self.grid.itA
self.update_logger(["grid", "background"])
# Wake computation
self.wake = wake
if self.wake is not None:
self.logger.wakeSolver = self.wake.logger.wakeSolver
# Fields
self.dtype = dtype
self.E = Field(
self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype
)
self.H = Field(
self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype
)
self.J = Field(
self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype
)
# MPI init
if self.use_mpi:
if self.grid.use_mpi:
self._mpi_initialize()
self.one_step = self._mpi_one_step
else:
print(
"[!] Grid not subdivided for MPI, set `use_mpi`=True also in \
`GridFIT3D` to enable MPI"
)
# Matrices
if verbose:
print("Assembling operator matrices...")
N = self.N
self.Px = diags([-1, 1], [0, 1], shape=(N, N), dtype=np.int8)
self.Py = diags([-1, 1], [0, self.Nx], shape=(N, N), dtype=np.int8)
self.Pz = diags(
[-1, 1], [0, self.Nx * self.Ny], shape=(N, N), dtype=np.int8
)
# original grid
self.Ds = diags(
self.L.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
self.iDa = diags(
self.iA.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
# tilde grid
self.tDs = diags(
self.tL.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
self.itDa = diags(
self.itA.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
# Curl matrix
self.C = vstack(
[
hstack([sparse_mat((N, N)), -self.Pz, self.Py]),
hstack([self.Pz, sparse_mat((N, N)), -self.Px]),
hstack([-self.Py, self.Px, sparse_mat((N, N))]),
],
dtype=np.int8,
)
# Boundaries
if verbose:
print("Applying boundary conditions...")
self.bc_low = bc_low
self.bc_high = bc_high
self.update_logger(["bc_low", "bc_high"])
self._apply_bc_to_C()
# Materials
if verbose:
print("Adding material tensors...")
if type(bg) is str:
bg = material_lib[bg.lower()]
if len(bg) == 3:
self.eps_bg, self.mu_bg, self.sigma_bg = (
bg[0] * eps_0,
bg[1] * mu_0,
bg[2],
)
self.use_conductivity = True
else:
self.eps_bg, self.mu_bg, self.sigma_bg = (
bg[0] * eps_0,
bg[1] * mu_0,
0.0,
)
# fmt: off
self.ieps = (
Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype)
* (1.0 / self.eps_bg)
)
self.imu = (
Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype)
* (1.0 / self.mu_bg)
)
self.sigma = (
Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype)
* self.sigma_bg
)
# fmt: on
if self.use_stl:
self._apply_stl_materials()
# Fill PML BCs
if self.activate_pml:
if verbose:
print("Filling PML sigmas...")
self.n_pml = n_pml
self.pml_lo = 5.0e-3
self.pml_hi = 10.0
self.pml_func = np.geomspace
self.pml_eps_r = 1.0
self._fill_pml_sigmas()
self.update_logger(["n_pml"])
# Timestep calculation
if verbose:
print("Calculating maximal stable timestep...")
self.cfln = cfln
if dt is None:
self.dt = cfln / (
c_light
* np.sqrt(
1 / np.min(self.grid.dx) ** 2
+ 1 / np.min(self.grid.dy) ** 2
+ 1 / np.min(self.grid.dz) ** 2
)
)
else:
self.dt = dt
self.dt = self.dtype(self.dt)
self.update_logger(["dt"])
if self.use_conductivity: # relaxation time criterion tau
mask = np.logical_and(
self.sigma.toarray() != 0, # for non-conductive
self.ieps.toarray() != 0,
) # for PEC eps=inf
self.tau = (1 / self.ieps.toarray()[mask]) / self.sigma.toarray()[
mask
]
if self.dt > self.tau.min():
self.dt = self.tau.min()
# Pre-computing
if verbose:
print("Pre-computing...")
self.iDeps = diags(
self.ieps.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
self.iDmu = diags(
self.imu.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
self.Dsigma = diags(
self.sigma.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype
)
self.tDsiDmuiDaC = self.iDa * self.iDmu * self.C * self.Ds
self.itDaiDepsDstC = (
self.iDeps * self.itDa * self.C.transpose() * self.tDs
)
if imported_mkl and not self.use_gpu: # MKL backend for CPU
if verbose:
print("Using MKL backend for time-stepping...")
self.tDsiDmuiDaC = mkl_sparse_mat(self.tDsiDmuiDaC)
self.itDaiDepsDstC = mkl_sparse_mat(self.itDaiDepsDstC)
self.one_step = (
self._mpi_one_step_mkl if self.use_mpi else self._one_step_mkl
)
# Move to GPU
if use_gpu:
if verbose:
print("Moving to GPU...")
if imported_cupyx:
self.tDsiDmuiDaC = gpu_sparse_mat(self.tDsiDmuiDaC)
self.itDaiDepsDstC = gpu_sparse_mat(self.itDaiDepsDstC)
self.ieps.to_gpu()
self.sigma.to_gpu()
else:
raise ImportError(
"[!] cupyx could not be imported, please check CUDA installation"
)
if verbose:
print(f"Total solver initialization time: {time.time() - t0} s")
self.solverInitializationTime = time.time() - t0
self.update_logger(["solverInitializationTime"])
self.solverInitializationTime = time.time() - t0
self.update_logger(["solverInitializationTime"])
[docs] def update_tensors(self, tensor="all"):
"""
Update tensor matrices after material Field changes and precompute
combined operators used for time-stepping.
When ``ieps``, ``imu`` or ``sigma`` are modified this routine
reconstructs the corresponding sparse diagonal matrices and the
composite operator products used in the update equations. Use the
``tensor`` argument to restrict work to a single tensor for efficiency.
Parameters
----------
tensor : {'ieps','imu','sigma','all'}, optional
Which tensor to update. Default is 'all' which recomputes every
tensor and refreshes the precomputed time-stepping matrices.
"""
if self.verbose:
print(f'Re-computing tensor "{tensor}"...')
if tensor == "ieps":
self.iDeps = diags(
self.ieps.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
elif tensor == "imu":
self.iDmu = diags(
self.imu.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
elif tensor == "sigma":
self.Dsigma = diags(
self.sigma.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
elif tensor == "all":
self.iDeps = diags(
self.ieps.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
self.iDmu = diags(
self.imu.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
self.Dsigma = diags(
self.sigma.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
if self.verbose:
print("Re-Pre-computing ...")
self.tDsiDmuiDaC = self.iDa * self.iDmu * self.C * self.Ds
self.itDaiDepsDstC = (
self.iDeps * self.itDa * self.C.transpose() * self.tDs
)
self.step_0 = False
def _one_step(self):
if self.step_0:
self._set_ghosts_to_0()
self.step_0 = False
self._attrcleanup()
self.H.fromarray(
self.H.toarray() - self.dt * self.tDsiDmuiDaC * self.E.toarray()
)
self.E.fromarray(
self.E.toarray()
+ self.dt
* (
self.itDaiDepsDstC * self.H.toarray()
- self.ieps.toarray() * self.J.toarray()
)
)
# include current computation
if self.use_conductivity:
self.J.fromarray(self.sigma.toarray() * self.E.toarray())
def _one_step_mkl(self):
if self.step_0:
self._set_ghosts_to_0()
self.step_0 = False
self._attrcleanup()
self.H.fromarray(
self.H.toarray()
- self.dt * dot_product_mkl(self.tDsiDmuiDaC, self.E.toarray())
)
self.E.fromarray(
self.E.toarray()
+ self.dt
* (
dot_product_mkl(self.itDaiDepsDstC, self.H.toarray())
- self.ieps.toarray() * self.J.toarray()
)
)
# include current computation
if self.use_conductivity:
self.J.fromarray(self.sigma.toarray() * self.E.toarray())
def _mpi_initialize(self):
self.comm = self.grid.comm
self.rank = self.grid.rank
self.size = self.grid.size
self.NZ = self.grid.NZ
self.ZMIN = self.grid.ZMIN
self.ZMAX = self.grid.ZMAX
self.Z = self.grid.Z
def _mpi_one_step(self):
if self.step_0:
self._set_ghosts_to_0()
self.step_0 = False
self._attrcleanup()
self.H.fromarray(
self.H.toarray() - self.dt * self.tDsiDmuiDaC * self.E.toarray()
)
self._mpi_communicate(self.H)
self._mpi_communicate(self.J)
self.E.fromarray(
self.E.toarray()
+ self.dt
* (
self.itDaiDepsDstC * self.H.toarray()
- self.ieps.toarray() * self.J.toarray()
)
)
self._mpi_communicate(self.E)
# include current computation
if self.use_conductivity:
self.J.fromarray(self.sigma.toarray() * self.E.toarray())
def _mpi_one_step_mkl(self):
if self.step_0:
self._set_ghosts_to_0()
self.step_0 = False
self._attrcleanup()
self.H.fromarray(
self.H.toarray()
- self.dt * dot_product_mkl(self.tDsiDmuiDaC, self.E.toarray())
)
self._mpi_communicate(self.H)
self._mpi_communicate(self.J)
self.E.fromarray(
self.E.toarray()
+ self.dt
* (
dot_product_mkl(self.itDaiDepsDstC, self.H.toarray())
- self.ieps.toarray() * self.J.toarray()
)
)
self._mpi_communicate(self.E)
# include current computation
if self.use_conductivity:
self.J.fromarray(self.sigma.toarray() * self.E.toarray())
def _mpi_communicate(self, field):
if self.use_gpu:
field.from_gpu()
# ghosts lo
if self.rank > 0:
for d in ["x", "y", "z"]:
self.comm.Sendrecv(
field[:, :, 1, d],
recvbuf=field[:, :, 0, d],
dest=self.rank - 1,
sendtag=0,
source=self.rank - 1,
recvtag=1,
)
# ghosts hi
if self.rank < self.size - 1:
for d in ["x", "y", "z"]:
self.comm.Sendrecv(
field[:, :, -2, d],
recvbuf=field[:, :, -1, d],
dest=self.rank + 1,
sendtag=1,
source=self.rank + 1,
recvtag=0,
)
if self.use_gpu:
field.to_gpu()
[docs] def mpi_gather(self, field, x=None, y=None, z=None, component=None):
"""
Gather a component or slice of a distributed Field from all MPI ranks.
Assumes the field is split along the z-axis among ranks. The function
collects local buffers, removes ghost cells and concatenates rank
contributions to build a global NumPy array on the root rank (rank 0).
Parameters
----------
field : str or wakis.Field
Field identifier ('E','H','J') optionally with a component suffix
(e.g. 'Ex'), or a ``wakis.Field`` object. If no component is given
the 'z' component is used by default.
x, y, z : int or slice, optional
Index or slice for each axis to gather. Defaults to the full range.
component : {'x','y','z'} or slice, optional
Component to gather when ``field`` is a Field object.
Returns
-------
numpy.ndarray or None
Assembled global array on rank 0; returns ``None`` on non-root ranks.
"""
if x is None:
x = slice(0, self.Nx)
if y is None:
y = slice(0, self.Ny)
if z is None:
z = slice(0, self.NZ)
if type(field) is str:
if len(field) == 2: # support for e.g. field='Ex'
component = field[1]
field = field[0]
elif len(field) == 4: # support for Abs
component = field[1:]
field = field[0]
elif component is None:
component = "z"
print(
"[!] `component` not specified, using default component='z'"
)
if field == "E":
local = self.E[x, y, :, component].ravel()
elif field == "H":
local = self.H[x, y, :, component].ravel()
elif field == "J":
local = self.J[x, y, :, component].ravel()
else:
if component is None:
component = "z"
print(
"[!] `component` not specified, using default component='z'"
)
local = field[x, y, :, component].ravel()
buffer = self.comm.gather(local, root=0)
_field = None
if self.rank == 0:
if type(x) is int and type(y) is int: # 1d array at x=a, y=b
nz = self.NZ // self.size
_field = np.zeros((self.NZ))
for r in range(self.size):
zz = np.s_[r * nz : (r + 1) * nz]
if r == 0:
_field[zz] = np.reshape(
buffer[r], (nz + self.grid.n_ghosts)
)[:-1]
elif r == (self.size - 1):
_field[zz] = np.reshape(
buffer[r], (nz + self.grid.n_ghosts)
)[1:]
else:
_field[zz] = np.reshape(
buffer[r], (nz + 2 * self.grid.n_ghosts)
)[1:-1]
_field = _field[z]
elif type(x) is int: # 2d slice at x=a
ny = y.stop - y.start
nz = self.NZ // self.size
_field = np.zeros((ny, self.NZ))
for r in range(self.size):
zz = np.s_[r * nz : (r + 1) * nz]
if r == 0:
_field[:, zz] = np.reshape(
buffer[r], (ny, nz + self.grid.n_ghosts)
)[:, :-1]
elif r == (self.size - 1):
_field[:, zz] = np.reshape(
buffer[r], (ny, nz + self.grid.n_ghosts)
)[:, 1:]
else:
_field[:, zz] = np.reshape(
buffer[r], (ny, nz + 2 * self.grid.n_ghosts)
)[:, 1:-1]
_field = _field[:, z]
elif type(y) is int: # 2d slice at y=a
nx = x.stop - x.start
nz = self.NZ // self.size
_field = np.zeros((nx, self.NZ))
for r in range(self.size):
zz = np.s_[r * nz : (r + 1) * nz]
if r == 0:
_field[:, zz] = np.reshape(
buffer[r], (nx, nz + self.grid.n_ghosts)
)[:, :-1]
elif r == (self.size - 1):
_field[:, zz] = np.reshape(
buffer[r], (nx, nz + self.grid.n_ghosts)
)[:, 1:]
else:
_field[:, zz] = np.reshape(
buffer[r], (nx, nz + 2 * self.grid.n_ghosts)
)[:, 1:-1]
_field = _field[:, z]
else: # both type slice -> 3d array
nx = x.stop - x.start
ny = y.stop - y.start
nz = self.NZ // self.size
_field = np.zeros((nx, ny, self.NZ))
for r in range(self.size):
zz = np.s_[r * nz : (r + 1) * nz]
if r == 0:
_field[:, :, zz] = np.reshape(
buffer[r], (nx, ny, nz + self.grid.n_ghosts)
)[:, :, :-1]
elif r == (self.size - 1):
_field[:, :, zz] = np.reshape(
buffer[r], (nx, ny, nz + self.grid.n_ghosts)
)[:, :, 1:]
else:
_field[:, :, zz] = np.reshape(
buffer[r], (nx, ny, nz + 2 * self.grid.n_ghosts)
)[:, :, 1:-1]
_field = _field[:, :, z]
return _field
[docs] def mpi_gather_asField(self, field):
"""
Gather distributed field data from MPI ranks and return a global Field.
Collects the full 3-component field (E, H or J) from each rank and
reconstructs a single ``wakis.Field`` on the root rank. Ghost cells are
removed when reassembling the per-rank buffers.
Parameters
----------
field : str or wakis.Field
Identifier ('E','H','J') or a Field-like object to gather.
Returns
-------
wakis.Field or None
Global Field object assembled on rank 0. Returns ``None`` on other
ranks.
"""
_field = Field(self.Nx, self.Ny, self.NZ)
for d in ["x", "y", "z"]:
if type(field) is str:
if field == "E":
local = self.E[:, :, :, d].ravel()
elif field == "H":
local = self.H[:, :, :, d].ravel()
elif field == "J":
local = self.J[:, :, :, d].ravel()
else:
local = field[:, :, :, d].ravel()
buffer = self.comm.gather(local, root=0)
if self.rank == 0:
nz = self.NZ // self.size
for r in range(self.size):
zz = np.s_[r * nz : (r + 1) * nz]
if r == 0:
_field[:, :, zz, d] = np.reshape(
buffer[r],
(self.Nx, self.Ny, nz + self.grid.n_ghosts),
)[:, :, :-1]
elif r == (self.size - 1):
_field[:, :, zz, d] = np.reshape(
buffer[r],
(self.Nx, self.Ny, nz + self.grid.n_ghosts),
)[:, :, 1:]
else:
_field[:, :, zz, d] = np.reshape(
buffer[r],
(self.Nx, self.Ny, nz + 2 * self.grid.n_ghosts),
)[:, :, 1:-1]
return _field
def _apply_bc_to_C(self):
"""
Apply boundary conditions by modifying curl and metric matrices.
Adjusts rows/columns of the curl operator ``C`` and the metric-diagonal
matrices (``tDs``, ``itDa``) according to the low/high boundary
condition lists ``bc_low`` and ``bc_high``. Handles periodic, PEC/PMC,
ABC and PML options and also configures MPI-internal faces when the
grid is subdivided.
"""
xlo, ylo, zlo = 1.0, 1.0, 1.0
xhi, yhi, zhi = 1.0, 1.0, 1.0
# Check BCs for internal MPI subdomains
if self.use_mpi and self.grid.use_mpi:
if self.rank > 0:
self.bc_low = ["pec", "pec", "mpi"]
if self.rank < self.size - 1:
self.bc_high = ["pec", "pec", "mpi"]
# Perodic: out == in
if any(True for x in self.bc_low if x.lower() == "periodic"):
if (
self.bc_low[0].lower() == "periodic"
and self.bc_high[0].lower() == "periodic"
):
self.tL[-1, :, :, "x"] = self.L[0, :, :, "x"]
self.itA[-1, :, :, "y"] = self.iA[0, :, :, "y"]
self.itA[-1, :, :, "z"] = self.iA[0, :, :, "z"]
if (
self.bc_low[1].lower() == "periodic"
and self.bc_high[1].lower() == "periodic"
):
self.tL[:, -1, :, "y"] = self.L[:, 0, :, "y"]
self.itA[:, -1, :, "x"] = self.iA[:, 0, :, "x"]
self.itA[:, -1, :, "z"] = self.iA[:, 0, :, "z"]
if (
self.bc_low[2].lower() == "periodic"
and self.bc_high[2].lower() == "periodic"
):
self.tL[:, :, -1, "z"] = self.L[:, :, 0, "z"]
self.itA[:, :, -1, "x"] = self.iA[:, :, 0, "x"]
self.itA[:, :, -1, "y"] = self.iA[:, :, 0, "y"]
self.tDs = diags(
self.tL.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
self.itDa = diags(
self.itA.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
# Dirichlet PEC: tangential E field = 0 at boundary
if any(
True
for x in self.bc_low
if x.lower() in ("electric", "pec", "pml")
) or any(
True
for x in self.bc_high
if x.lower() in ("electric", "pec", "pml")
):
if self.bc_low[0].lower() in ("electric", "pec", "pml"):
xlo = 0
if self.bc_low[1].lower() in ("electric", "pec", "pml"):
ylo = 0
if self.bc_low[2].lower() in ("electric", "pec", "pml"):
zlo = 0
if self.bc_high[0].lower() in ("electric", "pec", "pml"):
xhi = 0
if self.bc_high[1].lower() in ("electric", "pec", "pml"):
yhi = 0
if self.bc_high[2].lower() in ("electric", "pec", "pml"):
zhi = 0
# Assemble matrix
self.BC = Field(
self.Nx, self.Ny, self.Nz, dtype=np.int8, use_ones=True
)
for d in ["x", "y", "z"]: # tangential to zero
if d != "x":
self.BC[0, :, :, d] = xlo
self.BC[-1, :, :, d] = xhi
if d != "y":
self.BC[:, 0, :, d] = ylo
self.BC[:, -1, :, d] = yhi
if d != "z":
self.BC[:, :, 0, d] = zlo
self.BC[:, :, -1, d] = zhi
self.Dbc = diags(
self.BC.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=np.int8,
)
# Update C (columns)
self.C = self.C * self.Dbc
# Dirichlet PMC: tangential H field = 0 at boundary
if any(
True for x in self.bc_low if x.lower() in ("magnetic", "pmc")
) or any(
True for x in self.bc_high if x.lower() in ("magnetic", "pmc")
):
if self.bc_low[0].lower() == "magnetic" or self.bc_low[0] == "pmc":
xlo = 0
if self.bc_low[1].lower() == "magnetic" or self.bc_low[1] == "pmc":
ylo = 0
if self.bc_low[2].lower() == "magnetic" or self.bc_low[2] == "pmc":
zlo = 0
if (
self.bc_high[0].lower() == "magnetic"
or self.bc_high[0] == "pmc"
):
xhi = 0
if (
self.bc_high[1].lower() == "magnetic"
or self.bc_high[1] == "pmc"
):
yhi = 0
if (
self.bc_high[2].lower() == "magnetic"
or self.bc_high[2] == "pmc"
):
zhi = 0
# Assemble matrix
self.BC = Field(
self.Nx, self.Ny, self.Nz, dtype=np.int8, use_ones=True
)
for d in ["x", "y", "z"]: # tangential to zero
if d != "x":
self.BC[0, :, :, d] = xlo
self.BC[-1, :, :, d] = xhi
if d != "y":
self.BC[:, 0, :, d] = ylo
self.BC[:, -1, :, d] = yhi
if d != "z":
self.BC[:, :, 0, d] = zlo
self.BC[:, :, -1, d] = zhi
self.Dbc = diags(
self.BC.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=np.int8,
)
# Update C (rows)
self.C = self.Dbc * self.C
# Absorbing boundary conditions ABC
if any(True for x in self.bc_low if x.lower() == "abc") or any(
True for x in self.bc_high if x.lower() == "abc"
):
if self.bc_high[0].lower() == "abc":
self.tL[-1, :, :, "x"] = self.L[0, :, :, "x"]
self.itA[-1, :, :, "y"] = self.iA[0, :, :, "y"]
self.itA[-1, :, :, "z"] = self.iA[0, :, :, "z"]
if self.bc_high[1].lower() == "abc":
self.tL[:, -1, :, "y"] = self.L[:, 0, :, "y"]
self.itA[:, -1, :, "x"] = self.iA[:, 0, :, "x"]
self.itA[:, -1, :, "z"] = self.iA[:, 0, :, "z"]
if self.bc_high[2].lower() == "abc":
self.tL[:, :, -1, "z"] = self.L[:, :, 0, "z"]
self.itA[:, :, -1, "x"] = self.iA[:, :, 0, "x"]
self.itA[:, :, -1, "y"] = self.iA[:, :, 0, "y"]
self.tDs = diags(
self.tL.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
self.itDa = diags(
self.itA.toarray(),
shape=(3 * self.N, 3 * self.N),
dtype=self.dtype,
)
self.activate_abc = True
# Perfect Matching Layers (PML)
if any(True for x in self.bc_low if x.lower() == "pml") or any(
True for x in self.bc_high if x.lower() == "pml"
):
self.activate_pml = True
self.use_conductivity = True
def _fill_pml_sigmas(self):
"""
Compute and apply PML sigma profiles to the solver conductivity tensor.
Uses configured PML settings (number of layers, profile function and
scaling) to set per-component conductivity in the PML regions. This is
used to absorb outgoing waves and reduce reflections at domain edges.
"""
# Initialize
sx, sy, sz = np.zeros(self.Nx), np.zeros(self.Ny), np.zeros(self.Nz)
# pml_exp = 2
# Fill
if self.bc_low[0].lower() == "pml":
# sx[0:self.n_pml] = eps_0/(2*self.dt)*((self.x[self.n_pml] - self.x[:self.n_pml])/(self.n_pml*self.dx))**pml_exp
sx[0 : self.n_pml] = self.pml_func(
self.pml_hi, self.pml_lo, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the yz plane
ieps_0_pml = self.ieps[
self.n_pml + 1, self.Ny // 2, self.Nz // 2, d
]
sigma_0_pml = self.sigma[
self.n_pml + 1, self.Ny // 2, self.Nz // 2, d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for i in range(self.n_pml):
self.ieps[i, :, :, d] = ieps_0_pml
self.sigma[i, :, :, d] = (
sigma_0_pml + sigma_mult_pml * sx[i]
)
# if sx[i] > 0 : self.ieps[i, :, :, d] = 1/(eps_0+sx[i]*(2*self.dt))
if self.bc_low[1].lower() == "pml":
# sy[0:self.n_pml] = 1/(2*self.dt)*((self.y[self.n_pml] - self.y[:self.n_pml])/(self.n_pml*self.dy))**pml_exp
sy[0 : self.n_pml] = self.pml_func(
self.pml_hi, self.pml_lo, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the xz plane
ieps_0_pml = self.ieps[
self.Nx // 2, self.n_pml + 1, self.Nz // 2, d
]
sigma_0_pml = self.sigma[
self.Nx // 2, self.n_pml + 1, self.Nz // 2, d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for j in range(self.n_pml):
self.ieps[:, j, :, d] = ieps_0_pml
self.sigma[:, j, :, d] = (
sigma_0_pml + sigma_mult_pml * sy[j]
)
# if sy[j] > 0 : self.ieps[:, j, :, d] = 1/(eps_0+sy[j]*(2*self.dt))
if self.bc_low[2].lower() == "pml":
# sz[0:self.n_pml] = eps_0/(2*self.dt)*((self.z[self.n_pml] - self.z[:self.n_pml])/(self.n_pml*self.dz))**pml_exp
sz[0 : self.n_pml] = self.pml_func(
self.pml_hi, self.pml_lo, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the xy plane
ieps_0_pml = self.ieps[
self.Nx // 2, self.Ny // 2, self.n_pml + 1, d
]
sigma_0_pml = self.sigma[
self.Nx // 2, self.Ny // 2, self.n_pml + 1, d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for k in range(self.n_pml):
self.ieps[:, :, k, d] = ieps_0_pml
self.sigma[:, :, k, d] = (
sigma_0_pml + sigma_mult_pml * sz[k]
)
# if sz[k] > 0. : self.ieps[:, :, k, d] = 1/(np.mean(sz[:self.n_pml])*eps_0)
if self.bc_high[0].lower() == "pml":
# sx[-self.n_pml:] = 1/(2*self.dt)*((self.x[-self.n_pml:] - self.x[-self.n_pml])/(self.n_pml*self.dx))**pml_exp
sx[-self.n_pml :] = self.pml_func(
self.pml_lo, self.pml_hi, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the yz plane
ieps_0_pml = self.ieps[
-(self.n_pml + 1), self.Ny // 2, self.Nz // 2, d
]
sigma_0_pml = self.sigma[
-(self.n_pml + 1), self.Ny // 2, self.Nz // 2, d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for i in range(self.n_pml):
i += 1
self.ieps[-i, :, :, d] = ieps_0_pml
self.sigma[-i, :, :, d] = (
sigma_0_pml + sigma_mult_pml * sx[-i]
)
# if sx[-i] > 0 : self.ieps[-i, :, :, d] = 1/(eps_0+sx[-i]*(2*self.dt))
if self.bc_high[1].lower() == "pml":
# sy[-self.n_pml:] = 1/(2*self.dt)*((self.y[-self.n_pml:] - self.y[-self.n_pml])/(self.n_pml*self.dy))**pml_exp
sy[-self.n_pml :] = self.pml_func(
self.pml_lo, self.pml_hi, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the xz plane
ieps_0_pml = self.ieps[
self.Nx // 2, -(self.n_pml + 1), self.Nz // 2, d
]
sigma_0_pml = self.sigma[
self.Nx // 2, -(self.n_pml + 1), self.Nz // 2, d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for j in range(self.n_pml):
j += 1
self.ieps[:, -j, :, d] = ieps_0_pml
self.sigma[:, -j, :, d] = (
sigma_0_pml + sigma_mult_pml * sy[-j]
)
# if sy[-j] > 0 : self.ieps[:, -j, :, d] = 1/(eps_0+sy[-j]*(2*self.dt))
if self.bc_high[2].lower() == "pml":
# sz[-self.n_pml:] = eps_0/(2*self.dt)*((self.z[-self.n_pml:] - self.z[-self.n_pml])/(self.n_pml*self.dz))**pml_exp
sz[-self.n_pml :] = self.pml_func(
self.pml_lo, self.pml_hi, self.n_pml
)
for d in ["x", "y", "z"]:
# Get the properties from the layer before the PML
# Take the values at the center of the xy plane
ieps_0_pml = self.ieps[
self.Nx // 2, self.Ny // 2, -(self.n_pml + 1), d
]
sigma_0_pml = self.sigma[
self.Nx // 2, self.Ny // 2, -(self.n_pml + 1), d
]
sigma_mult_pml = (
1 if sigma_0_pml < 1 else sigma_0_pml
) # avoid null sigma in PML for relaxation time computation
for k in range(self.n_pml):
k += 1
self.ieps[:, :, -k, d] = ieps_0_pml
self.sigma[:, :, -k, d] = (
sigma_0_pml + sigma_mult_pml * sz[-k]
)
# self.ieps[:, :, -k, d] = 1/(np.mean(sz[-self.n_pml:])*eps_0)
[docs] def get_abc(self):
"""
Save boundary field snapshots needed by the Absorbing Boundary
Condition (ABC) update.
Extracts the necessary boundary layers for electric and magnetic
fields for those faces configured with ABC and returns two
dictionaries holding the saved arrays. Those dictionaries are later
consumed by ``update_abc`` to restore boundary values.
"""
E_abc, H_abc = {}, {}
if self.bc_low[0].lower() == "abc":
E_abc[0] = {}
H_abc[0] = {}
for d in ["x", "y", "z"]:
E_abc[0][d + "lo"] = self.E[1, :, :, d]
H_abc[0][d + "lo"] = self.H[1, :, :, d]
if self.bc_low[1].lower() == "abc":
E_abc[1] = {}
H_abc[1] = {}
for d in ["x", "y", "z"]:
E_abc[1][d + "lo"] = self.E[:, 1, :, d]
H_abc[1][d + "lo"] = self.H[:, 1, :, d]
if self.bc_low[2].lower() == "abc":
E_abc[2] = {}
H_abc[2] = {}
for d in ["x", "y", "z"]:
E_abc[2][d + "lo"] = self.E[:, :, 1, d]
H_abc[2][d + "lo"] = self.H[:, :, 1, d]
if self.bc_high[0].lower() == "abc":
E_abc[0] = {}
H_abc[0] = {}
for d in ["x", "y", "z"]:
E_abc[0][d + "hi"] = self.E[-1, :, :, d]
H_abc[0][d + "hi"] = self.H[-1, :, :, d]
if self.bc_high[1].lower() == "abc":
E_abc[1] = {}
H_abc[1] = {}
for d in ["x", "y", "z"]:
E_abc[1][d + "hi"] = self.E[:, -1, :, d]
H_abc[1][d + "hi"] = self.H[:, -1, :, d]
if self.bc_high[2].lower() == "abc":
E_abc[2] = {}
H_abc[2] = {}
for d in ["x", "y", "z"]:
E_abc[2][d + "hi"] = self.E[:, :, -1, d]
H_abc[2][d + "hi"] = self.H[:, :, -1, d]
return E_abc, H_abc
[docs] def update_abc(self, E_abc, H_abc):
"""
Apply the Absorbing Boundary Condition (ABC) using previously saved
snapshots.
Parameters
----------
E_abc, H_abc : dict
Dictionaries produced by ``get_abc`` that contain boundary-layer
field arrays. Each dictionary maps face indices to arrays used to
overwrite the exterior cell values after a timestep.
"""
if self.bc_low[0].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[0, :, :, d] = E_abc[0][d + "lo"]
self.H[0, :, :, d] = H_abc[0][d + "lo"]
if self.bc_low[1].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[:, 0, :, d] = E_abc[1][d + "lo"]
self.H[:, 0, :, d] = H_abc[1][d + "lo"]
if self.bc_low[2].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[:, :, 0, d] = E_abc[2][d + "lo"]
self.H[:, :, 0, d] = H_abc[2][d + "lo"]
if self.bc_high[0].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[-1, :, :, d] = E_abc[0][d + "hi"]
self.H[-1, :, :, d] = H_abc[0][d + "hi"]
if self.bc_high[1].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[:, -1, :, d] = E_abc[1][d + "hi"]
self.H[:, -1, :, d] = H_abc[1][d + "hi"]
if self.bc_high[2].lower() == "abc":
for d in ["x", "y", "z"]:
self.E[:, :, -1, d] = E_abc[2][d + "hi"]
self.H[:, :, -1, d] = H_abc[2][d + "hi"]
def _set_ghosts_to_0(self):
"""
Zero-out ghost-cell field values used for MPI and boundary exchange.
Clears any initial condition values that were accidentally placed in
ghost cells so that subsequent MPI sends/receives and boundary updates
behave correctly.
"""
# Set H ghost quantities to 0
for d in ["x", "y", "z"]: # tangential to zero
if d != "x":
self.H[-1, :, :, d] = 0.0
if d != "y":
self.H[:, -1, :, d] = 0.0
if d != "z":
self.H[:, :, -1, d] = 0.0
# Set E ghost quantities to 0
self.E[-1, :, :, "x"] = 0.0
self.E[:, -1, :, "y"] = 0.0
self.E[:, :, -1, "z"] = 0.0
def _apply_conductors(self):
"""
Apply PEC conductor masking by zeroing inverse-permittivity inside
conductor volumes.
This enforces tangential electric field cancellation inside conductor
regions by setting the local 1/epsilon to zero.
"""
self.flag_in_conductors = (
self.grid.flag_int_cell_yz[:-1, :, :]
+ self.grid.flag_int_cell_zx[:, :-1, :]
+ self.grid.flag_int_cell_xy[:, :, :-1]
)
self.ieps *= self.flag_in_conductors
def _set_field_in_conductors_to_0(self):
"""
Zero dynamic fields inside conductor masks.
Ensures that any initial E/H fields mapped into conductor regions are
removed before time-stepping, avoiding non-physical behaviour.
"""
self.flag_cleanup = (
self.grid.flag_int_cell_yz[:-1, :, :]
+ self.grid.flag_int_cell_zx[:, :-1, :]
+ self.grid.flag_int_cell_xy[:, :, :-1]
)
self.H *= self.flag_cleanup
self.E *= self.flag_cleanup
def _apply_stl_materials(self):
"""
Mask STL solids in the grid and assign user-defined materials.
Iterates over STL solids imported in the grid and updates ``ieps``,
``imu`` and ``sigma`` according to the material provided for each
solid. Materials may be referenced by a library key (string) or given
as explicit tuples (eps_r, mu_r[, sigma]). Inverse permittivity and
inverse permeability values are stored in the corresponding Fields.
Notes
-----
- STL material values must be relative (eps_r, mu_r).
- Supply conductivity explicitly to enable conductive behaviour.
"""
grid = self.grid.grid
self.stl_solids = self.grid.stl_solids
self.stl_materials = self.grid.stl_materials
self.stl_colors = self.grid.stl_colors
for key in self.stl_solids.keys():
mask = np.reshape(grid[key], (self.Nx, self.Ny, self.Nz)).astype(
int
)
if type(self.stl_materials[key]) is str:
# Retrieve from material library
mat_key = self.stl_materials[key].lower()
eps = material_lib[mat_key][0] * eps_0
mu = material_lib[mat_key][1] * mu_0
# Setting to zero
self.ieps += self.ieps * (-1.0 * mask)
self.imu += self.imu * (-1.0 * mask)
# Adding new values
self.ieps += mask * 1.0 / eps
self.imu += mask * 1.0 / mu
# Conductivity
if len(material_lib[mat_key]) == 3:
sigma = material_lib[mat_key][2]
self.sigma += self.sigma * (-1.0 * mask)
self.sigma += mask * sigma
self.use_conductivity = True
elif self.sigma_bg > 0.0: # assumed sigma = 0
self.sigma += self.sigma * (-1.0 * mask)
else:
# From input
eps = self.stl_materials[key][0] * eps_0
mu = self.stl_materials[key][1] * mu_0
# Setting to zero
self.ieps += self.ieps * (-1.0 * mask)
self.imu += self.imu * (-1.0 * mask)
# Adding new values
self.ieps += mask * 1.0 / eps
self.imu += mask * 1.0 / mu
# Conductivity
if len(self.stl_materials[key]) == 3:
sigma = self.stl_materials[key][2]
self.sigma += self.sigma * (-1.0 * mask)
self.sigma += mask * sigma
self.use_conductivity = True
elif self.sigma_bg > 0.0: # assumed sigma = 0
self.sigma += self.sigma * (-1.0 * mask)
def _attrcleanup(self):
# Fields
del self.L, self.tL, self.iA, self.itA
if hasattr(self, "BC"):
del self.BC
del self.Dbc
# Matrices
del self.Px, self.Py, self.Pz
del self.Ds, self.iDa, self.tDs, self.itDa
del self.C
[docs] def save_state(self, filename="solver_state.h5", close=True):
"""
Save dynamic solver state (H, E, J) to an HDF5 file.
Writes the core dynamic fields to ``filename``. When running under MPI
the distributed fields are gathered to the root rank before saving.
Parameters
----------
filename : str, optional
Output HDF5 filename. Default is "solver_state.h5".
close : bool, optional
If True (default) the file is closed before returning. If False an
open ``h5py.File`` is returned for caller-managed operations.
Returns
-------
h5py.File or None
Open file object when ``close`` is False, otherwise None.
"""
if self.use_mpi: # MPI savestate
H = self.mpi_gather_asField("H")
E = self.mpi_gather_asField("E")
J = self.mpi_gather_asField("J")
if self.rank == 0:
state = h5py.File(filename, "w")
state.create_dataset("H", data=H)
state.create_dataset("E", data=E)
state.create_dataset("J", data=J)
# TODO: check for MPI-GPU
elif self.use_gpu: # GPU savestate
state = h5py.File(filename, "w")
state.create_dataset("H", data=self.H.toarray().get())
state.create_dataset("E", data=self.E.toarray().get())
state.create_dataset("J", data=self.J.toarray().get())
else: # CPU savestate
state = h5py.File(filename, "w")
state.create_dataset("H", data=self.H.toarray())
state.create_dataset("E", data=self.E.toarray())
state.create_dataset("J", data=self.J.toarray())
if close:
state.close()
else:
return state # Caller must close this manually
[docs] def load_state(self, filename="solver_state.h5"):
"""
Load dynamic solver state (H, E, J) from an HDF5 file and restore them.
Parameters
----------
filename : str, optional
Input HDF5 filename. Default is "solver_state.h5".
Notes
-----
Currently performs a simple load from a single-file state. MPI-aware
redistribution of loaded arrays to worker ranks is TODO.
"""
state = h5py.File(filename, "r")
self.E.fromarray(state["E"][:])
self.H.fromarray(state["H"][:])
self.J.fromarray(state["J"][:])
# TODO: support MPI loadstate
state.close()
[docs] def read_state(self, filename="solver_state.h5"):
"""
Open an HDF5 file for read-only access without loading its contents.
Returns an open ``h5py.File`` object that the caller must close when
finished. This is useful for inspecting saved state without restoring
it into the solver.
Parameters
----------
filename : str, optional
Input HDF5 filename. Default is "solver_state.h5".
Returns
-------
h5py.File
Open HDF5 file object in read mode.
"""
return h5py.File(filename, "r")
[docs] def reset_fields(self):
"""
Reset dynamic field arrays (E, H, J) to zero across the simulation.
Useful when reusing a ``SolverFIT3D`` instance for a new run without
reconstructing the entire object.
"""
for d in ["x", "y", "z"]:
self.E[:, :, :, d] = 0.0
self.H[:, :, :, d] = 0.0
self.J[:, :, :, d] = 0.0
[docs] def update_logger(self, attrs):
"""
Copy selected solver attributes into the internal ``Logger`` object.
Parameters
----------
attrs : iterable of str
Names of attributes to copy to ``self.logger.solver``. Special case
'grid' copies the grid logger reference instead of a value.
"""
for atr in attrs:
if atr == "grid":
self.logger.grid = self.grid.logger.grid
else:
self.logger.solver[atr] = getattr(self, atr)