Source code for wakis.solverFIT3D

# copyright ################################# #
# This file is part of the wakis Package.     #
# Copyright (c) CERN, 2024.                   #
# ########################################### #

import time

import h5py
import numpy as np
from scipy.constants import c as c_light
from scipy.constants import epsilon_0 as eps_0
from scipy.constants import mu_0 as mu_0
from scipy.sparse import csc_matrix as sparse_mat
from scipy.sparse import diags, hstack, vstack

from .boundaries import BCsMixin
from .field import Field
from .logger import Logger
from .materials import material_lib
from .plotting import PlotMixin
from .routines import RoutinesMixin

try:
    from cupyx.scipy.sparse import csc_matrix as gpu_sparse_mat

    imported_cupyx = True
except ImportError:
    imported_cupyx = False

try:
    from sparse_dot_mkl import csr_matrix as mkl_sparse_mat
    from sparse_dot_mkl import dot_product_mkl

    imported_mkl = True
except ImportError:
    imported_mkl = False


[docs]class SolverFIT3D(PlotMixin, RoutinesMixin, BCsMixin): def __init__( self, grid, wake=None, cfln=0.5, dt=None, bc_low=["Periodic", "Periodic", "Periodic"], bc_high=["Periodic", "Periodic", "Periodic"], use_stl=False, use_conductors=False, use_gpu=False, use_mpi=False, dtype=np.float64, n_pml=10, bg=[1.0, 1.0], verbose=1, ): """ 3D time-domain electromagnetic solver based on the Finite Integration Technique (FIT). Handles mesh and geometry, material assignment, boundary conditions and time-stepping. Supports CPU, optional GPU acceleration (cupyx) and MPI domain decomposition. Provides utilities for importing conductors and STL solids, applying PML/ABC boundaries, and saving/restoring solver state. Parameters ---------- grid : GridFIT3D Instance providing mesh, coordinate arrays and geometry flags. wake : WakeSolver, optional Wakefield object with beam parameters used for wake computations. cfln : float, optional CFL number used to compute a stable timestep when ``dt`` is None. dt : float, optional Explicit timestep. If provided, it overrides the CFL-based value. bc_low, bc_high : list of str, optional Boundary conditions for low/high faces in (x, y, z) order. use_stl : bool, optional If True, apply solids and materials provided in the ``grid`` object. use_conductors : bool, optional If True, import conductor geometry from ``conductors.py`` masks. use_gpu : bool, optional Enable GPU acceleration via ``cupyx`` (if available). use_mpi : bool, optional Enable MPI execution for a subdivided grid. dtype : numpy dtype, optional Numeric dtype for solver arrays (default ``np.float64``). n_pml : int, optional Number of PML cells for PML boundary regions. bg : sequence or str, optional Background material [eps_r, mu_r, sigma] or a material key from the library. If a sigma value is provided conductivity handling is enabled. verbose : int or bool, optional Verbosity flag for initialization messages. Attributes ---------- E, H, J : wakis.Field Electric field, magnetic field and current density containers. Access components via labels 'x','y','z'. Example: ``solver.E[:, :, n, 'z']`` gives Ez at z-index n. ieps, imu, sigma : wakis.Field Material tensors (inverse permittivity, inverse permeability and conductivity) stored per field component. grid : GridFIT3D Reference to the input grid object. dt : float Time-step used for time integration. cfln : float CFL number used when computing dt from grid spacing. """ self.verbose = verbose t0 = time.time() self.logger = Logger() # Flags self.step_0 = True self.nstep = int(0) self.plotter_active = False self.use_conductors = use_conductors self.use_stl = use_stl self.use_gpu = use_gpu self.use_mpi = use_mpi self.activate_abc = False # Will turn true if abc BCs are chosen self.activate_pml = False # Will turn true if pml BCs are chosen self.use_conductivity = False # Will turn true with conductive material or pml self.imported_mkl = imported_mkl # Use MKL backend when available self.one_step = self._one_step if use_stl: self.use_conductors = False self.update_logger(["use_gpu", "use_mpi"]) # Grid self.grid = grid self.background = bg self.Nx = self.grid.Nx self.Ny = self.grid.Ny self.Nz = self.grid.Nz self.N = self.Nx * self.Ny * self.Nz self.dx = self.grid.dx self.dy = self.grid.dy self.dz = self.grid.dz self.x = self.grid.x[:-1] + self.dx / 2 self.y = self.grid.y[:-1] + self.dy / 2 self.z = self.grid.z[:-1] + self.dz / 2 self.L = self.grid.L self.iA = self.grid.iA self.tL = self.grid.tL self.itA = self.grid.itA self.update_logger(["grid", "background"]) # Wake computation self.wake = wake if self.wake is not None: self.logger.wakeSolver = self.wake.logger.wakeSolver # Fields self.dtype = dtype self.E = Field( self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype ) self.H = Field( self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype ) self.J = Field( self.Nx, self.Ny, self.Nz, use_gpu=self.use_gpu, dtype=self.dtype ) # MPI init if self.use_mpi: if self.grid.use_mpi: self._mpi_initialize() self.one_step = self._mpi_one_step else: print( "[!] Grid not subdivided for MPI, set `use_mpi`=True also in \ `GridFIT3D` to enable MPI" ) # Matrices if verbose: print("Assembling operator matrices...") N = self.N self.Px = diags([-1, 1], [0, 1], shape=(N, N), dtype=np.int8) self.Py = diags([-1, 1], [0, self.Nx], shape=(N, N), dtype=np.int8) self.Pz = diags([-1, 1], [0, self.Nx * self.Ny], shape=(N, N), dtype=np.int8) # original grid self.Ds = diags(self.L.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) self.iDa = diags(self.iA.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) # tilde grid self.tDs = diags(self.tL.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) self.itDa = diags(self.itA.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) # Curl matrix self.C = vstack( [ hstack([sparse_mat((N, N)), -self.Pz, self.Py]), hstack([self.Pz, sparse_mat((N, N)), -self.Px]), hstack([-self.Py, self.Px, sparse_mat((N, N))]), ], dtype=np.int8, ) # Boundaries if verbose: print("Applying boundary conditions...") self.bc_low = bc_low self.bc_high = bc_high self.update_logger(["bc_low", "bc_high"]) self._apply_bc_to_C() # Materials if verbose: print("Adding material tensors...") if type(bg) is str: bg = material_lib[bg.lower()] if len(bg) == 3: self.eps_bg, self.mu_bg, self.sigma_bg = ( bg[0] * eps_0, bg[1] * mu_0, bg[2], ) self.use_conductivity = True else: self.eps_bg, self.mu_bg, self.sigma_bg = ( bg[0] * eps_0, bg[1] * mu_0, 0.0, ) # fmt: off self.ieps = ( Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype) * (1.0 / self.eps_bg) ) self.imu = ( Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype) * (1.0 / self.mu_bg) ) self.sigma = ( Field(self.Nx, self.Ny, self.Nz, use_ones=True, dtype=self.dtype) * self.sigma_bg ) # fmt: on if self.use_stl: self._apply_stl_materials() # Fill PML BCs if self.activate_pml: if verbose: print("Filling PML sigmas...") self.n_pml = n_pml self._initialize_PML() self.update_logger(["n_pml"]) # Timestep calculation if verbose: print("Calculating maximal stable timestep...") self.cfln = cfln if dt is None: self.dt = cfln / ( c_light * np.sqrt( 1 / np.min(self.grid.dx) ** 2 + 1 / np.min(self.grid.dy) ** 2 + 1 / np.min(self.grid.dz) ** 2 ) ) else: self.dt = dt self.dt = self.dtype(self.dt) self.update_logger(["dt"]) if self.use_conductivity: # relaxation time criterion tau mask = np.logical_and( self.sigma.toarray() != 0, # for non-conductive self.ieps.toarray() != 0, ) # for PEC eps=inf self.tau = (1 / self.ieps.toarray()[mask]) / self.sigma.toarray()[mask] if self.dt > self.tau.min(): self.dt = self.tau.min() # Pre-computing if verbose: print("Pre-computing...") self.iDeps = diags(self.ieps.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) self.iDmu = diags(self.imu.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype) self.Dsigma = diags( self.sigma.toarray(), shape=(3 * N, 3 * N), dtype=self.dtype ) self.tDsiDmuiDaC = self.iDa * self.iDmu * self.C * self.Ds self.itDaiDepsDstC = self.iDeps * self.itDa * self.C.transpose() * self.tDs if imported_mkl and not self.use_gpu: # MKL backend for CPU if verbose: print("Using MKL backend for time-stepping...") self.tDsiDmuiDaC = mkl_sparse_mat(self.tDsiDmuiDaC) self.itDaiDepsDstC = mkl_sparse_mat(self.itDaiDepsDstC) self.one_step = ( self._mpi_one_step_mkl if self.use_mpi else self._one_step_mkl ) # Move to GPU if use_gpu: if verbose: print("Moving to GPU...") if imported_cupyx: self.tDsiDmuiDaC = gpu_sparse_mat(self.tDsiDmuiDaC) self.itDaiDepsDstC = gpu_sparse_mat(self.itDaiDepsDstC) self.ieps.to_gpu() self.sigma.to_gpu() else: raise ImportError( "[!] cupyx could not be imported, please check CUDA installation" ) if verbose: print(f"Total solver initialization time: {time.time() - t0} s") self.solverInitializationTime = time.time() - t0 self.update_logger(["solverInitializationTime"]) self.solverInitializationTime = time.time() - t0 self.update_logger(["solverInitializationTime"])
[docs] def update_tensors(self, tensor="all"): """ Update tensor matrices after material Field changes and precompute combined operators used for time-stepping. When ``ieps``, ``imu`` or ``sigma`` are modified this routine reconstructs the corresponding sparse diagonal matrices and the composite operator products used in the update equations. Use the ``tensor`` argument to restrict work to a single tensor for efficiency. Parameters ---------- tensor : {'ieps','imu','sigma','all'}, optional Which tensor to update. Default is 'all' which recomputes every tensor and refreshes the precomputed time-stepping matrices. """ if self.verbose: print(f'Re-computing tensor "{tensor}"...') if tensor == "ieps": self.iDeps = diags( self.ieps.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) elif tensor == "imu": self.iDmu = diags( self.imu.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) elif tensor == "sigma": self.Dsigma = diags( self.sigma.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) elif tensor == "all": self.iDeps = diags( self.ieps.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) self.iDmu = diags( self.imu.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) self.Dsigma = diags( self.sigma.toarray(), shape=(3 * self.N, 3 * self.N), dtype=self.dtype, ) if self.verbose: print("Re-Pre-computing ...") self.tDsiDmuiDaC = self.iDa * self.iDmu * self.C * self.Ds self.itDaiDepsDstC = self.iDeps * self.itDa * self.C.transpose() * self.tDs self.step_0 = False
def _one_step(self): if self.step_0: self._set_ghosts_to_0() self.step_0 = False self._attrcleanup() self.H.fromarray( self.H.toarray() - self.dt * self.tDsiDmuiDaC * self.E.toarray() ) self.E.fromarray( self.E.toarray() + self.dt * ( self.itDaiDepsDstC * self.H.toarray() - self.ieps.toarray() * self.J.toarray() ) ) # include current computation if self.use_conductivity: self.J.fromarray(self.sigma.toarray() * self.E.toarray()) def _one_step_mkl(self): if self.step_0: self._set_ghosts_to_0() self.step_0 = False self._attrcleanup() self.H.fromarray( self.H.toarray() - self.dt * dot_product_mkl(self.tDsiDmuiDaC, self.E.toarray()) ) self.E.fromarray( self.E.toarray() + self.dt * ( dot_product_mkl(self.itDaiDepsDstC, self.H.toarray()) - self.ieps.toarray() * self.J.toarray() ) ) # include current computation if self.use_conductivity: self.J.fromarray(self.sigma.toarray() * self.E.toarray()) def _mpi_initialize(self): self.comm = self.grid.comm self.rank = self.grid.rank self.size = self.grid.size self.NZ = self.grid.NZ self.ZMIN = self.grid.ZMIN self.ZMAX = self.grid.ZMAX self.Z = self.grid.Z def _mpi_one_step(self): if self.step_0: self._set_ghosts_to_0() self.step_0 = False self._attrcleanup() self.H.fromarray( self.H.toarray() - self.dt * self.tDsiDmuiDaC * self.E.toarray() ) self._mpi_communicate(self.H) self._mpi_communicate(self.J) self.E.fromarray( self.E.toarray() + self.dt * ( self.itDaiDepsDstC * self.H.toarray() - self.ieps.toarray() * self.J.toarray() ) ) self._mpi_communicate(self.E) # include current computation if self.use_conductivity: self.J.fromarray(self.sigma.toarray() * self.E.toarray()) def _mpi_one_step_mkl(self): if self.step_0: self._set_ghosts_to_0() self.step_0 = False self._attrcleanup() self.H.fromarray( self.H.toarray() - self.dt * dot_product_mkl(self.tDsiDmuiDaC, self.E.toarray()) ) self._mpi_communicate(self.H) self._mpi_communicate(self.J) self.E.fromarray( self.E.toarray() + self.dt * ( dot_product_mkl(self.itDaiDepsDstC, self.H.toarray()) - self.ieps.toarray() * self.J.toarray() ) ) self._mpi_communicate(self.E) # include current computation if self.use_conductivity: self.J.fromarray(self.sigma.toarray() * self.E.toarray()) def _mpi_communicate(self, field): if self.use_gpu: field.from_gpu() # ghosts lo if self.rank > 0: for d in ["x", "y", "z"]: self.comm.Sendrecv( field[:, :, 1, d], recvbuf=field[:, :, 0, d], dest=self.rank - 1, sendtag=0, source=self.rank - 1, recvtag=1, ) # ghosts hi if self.rank < self.size - 1: for d in ["x", "y", "z"]: self.comm.Sendrecv( field[:, :, -2, d], recvbuf=field[:, :, -1, d], dest=self.rank + 1, sendtag=1, source=self.rank + 1, recvtag=0, ) if self.use_gpu: field.to_gpu()
[docs] def mpi_gather(self, field, x=None, y=None, z=None, component=None): """ Gather a component or slice of a distributed Field from all MPI ranks. Assumes the field is split along the z-axis among ranks. The function collects local buffers, removes ghost cells and concatenates rank contributions to build a global NumPy array on the root rank (rank 0). Parameters ---------- field : str or wakis.Field Field identifier ('E','H','J') optionally with a component suffix (e.g. 'Ex'), or a ``wakis.Field`` object. If no component is given the 'z' component is used by default. x, y, z : int or slice, optional Index or slice for each axis to gather. Defaults to the full range. component : {'x','y','z'} or slice, optional Component to gather when ``field`` is a Field object. Returns ------- numpy.ndarray or None Assembled global array on rank 0; returns ``None`` on non-root ranks. """ if x is None: x = slice(0, self.Nx) if y is None: y = slice(0, self.Ny) if z is None: z = slice(0, self.NZ) if type(field) is str: if len(field) == 2: # support for e.g. field='Ex' component = field[1] field = field[0] elif len(field) == 4: # support for Abs component = field[1:] field = field[0] elif component is None: component = "z" print("[!] `component` not specified, using default component='z'") if field == "E": local = self.E[x, y, :, component].ravel() elif field == "H": local = self.H[x, y, :, component].ravel() elif field == "J": local = self.J[x, y, :, component].ravel() else: if component is None: component = "z" print("[!] `component` not specified, using default component='z'") local = field[x, y, :, component].ravel() buffer = self.comm.gather(local, root=0) _field = None if self.rank == 0: if type(x) is int and type(y) is int: # 1d array at x=a, y=b nz = self.NZ // self.size _field = np.zeros((self.NZ)) for r in range(self.size): zz = np.s_[r * nz : (r + 1) * nz] if r == 0: _field[zz] = np.reshape(buffer[r], (nz + self.grid.n_ghosts))[ :-1 ] elif r == (self.size - 1): _field[zz] = np.reshape(buffer[r], (nz + self.grid.n_ghosts))[ 1: ] else: _field[zz] = np.reshape( buffer[r], (nz + 2 * self.grid.n_ghosts) )[1:-1] _field = _field[z] elif type(x) is int: # 2d slice at x=a ny = y.stop - y.start nz = self.NZ // self.size _field = np.zeros((ny, self.NZ)) for r in range(self.size): zz = np.s_[r * nz : (r + 1) * nz] if r == 0: _field[:, zz] = np.reshape( buffer[r], (ny, nz + self.grid.n_ghosts) )[:, :-1] elif r == (self.size - 1): _field[:, zz] = np.reshape( buffer[r], (ny, nz + self.grid.n_ghosts) )[:, 1:] else: _field[:, zz] = np.reshape( buffer[r], (ny, nz + 2 * self.grid.n_ghosts) )[:, 1:-1] _field = _field[:, z] elif type(y) is int: # 2d slice at y=a nx = x.stop - x.start nz = self.NZ // self.size _field = np.zeros((nx, self.NZ)) for r in range(self.size): zz = np.s_[r * nz : (r + 1) * nz] if r == 0: _field[:, zz] = np.reshape( buffer[r], (nx, nz + self.grid.n_ghosts) )[:, :-1] elif r == (self.size - 1): _field[:, zz] = np.reshape( buffer[r], (nx, nz + self.grid.n_ghosts) )[:, 1:] else: _field[:, zz] = np.reshape( buffer[r], (nx, nz + 2 * self.grid.n_ghosts) )[:, 1:-1] _field = _field[:, z] else: # both type slice -> 3d array nx = x.stop - x.start ny = y.stop - y.start nz = self.NZ // self.size _field = np.zeros((nx, ny, self.NZ)) for r in range(self.size): zz = np.s_[r * nz : (r + 1) * nz] if r == 0: _field[:, :, zz] = np.reshape( buffer[r], (nx, ny, nz + self.grid.n_ghosts) )[:, :, :-1] elif r == (self.size - 1): _field[:, :, zz] = np.reshape( buffer[r], (nx, ny, nz + self.grid.n_ghosts) )[:, :, 1:] else: _field[:, :, zz] = np.reshape( buffer[r], (nx, ny, nz + 2 * self.grid.n_ghosts) )[:, :, 1:-1] _field = _field[:, :, z] return _field
[docs] def mpi_gather_asField(self, field): """ Gather distributed field data from MPI ranks and return a global Field. Collects the full 3-component field (E, H or J) from each rank and reconstructs a single ``wakis.Field`` on the root rank. Ghost cells are removed when reassembling the per-rank buffers. Parameters ---------- field : str or wakis.Field Identifier ('E','H','J') or a Field-like object to gather. Returns ------- wakis.Field or None Global Field object assembled on rank 0. Returns ``None`` on other ranks. """ _field = Field(self.Nx, self.Ny, self.NZ) for d in ["x", "y", "z"]: if type(field) is str: if field == "E": local = self.E[:, :, :, d].ravel() elif field == "H": local = self.H[:, :, :, d].ravel() elif field == "J": local = self.J[:, :, :, d].ravel() else: local = field[:, :, :, d].ravel() buffer = self.comm.gather(local, root=0) if self.rank == 0: nz = self.NZ // self.size for r in range(self.size): zz = np.s_[r * nz : (r + 1) * nz] if r == 0: _field[:, :, zz, d] = np.reshape( buffer[r], (self.Nx, self.Ny, nz + self.grid.n_ghosts), )[:, :, :-1] elif r == (self.size - 1): _field[:, :, zz, d] = np.reshape( buffer[r], (self.Nx, self.Ny, nz + self.grid.n_ghosts), )[:, :, 1:] else: _field[:, :, zz, d] = np.reshape( buffer[r], (self.Nx, self.Ny, nz + 2 * self.grid.n_ghosts), )[:, :, 1:-1] return _field
def _set_ghosts_to_0(self): """ Zero-out ghost-cell field values used for MPI and boundary exchange. Clears any initial condition values that were accidentally placed in ghost cells so that subsequent MPI sends/receives and boundary updates behave correctly. """ # Set H ghost quantities to 0 for d in ["x", "y", "z"]: # tangential to zero if d != "x": self.H[-1, :, :, d] = 0.0 if d != "y": self.H[:, -1, :, d] = 0.0 if d != "z": self.H[:, :, -1, d] = 0.0 # Set E ghost quantities to 0 self.E[-1, :, :, "x"] = 0.0 self.E[:, -1, :, "y"] = 0.0 self.E[:, :, -1, "z"] = 0.0 def _apply_conductors(self): """ Apply PEC conductor masking by zeroing inverse-permittivity inside conductor volumes. This enforces tangential electric field cancellation inside conductor regions by setting the local 1/epsilon to zero. """ self.flag_in_conductors = ( self.grid.flag_int_cell_yz[:-1, :, :] + self.grid.flag_int_cell_zx[:, :-1, :] + self.grid.flag_int_cell_xy[:, :, :-1] ) self.ieps *= self.flag_in_conductors def _set_field_in_conductors_to_0(self): """ Zero dynamic fields inside conductor masks. Ensures that any initial E/H fields mapped into conductor regions are removed before time-stepping, avoiding non-physical behaviour. """ self.flag_cleanup = ( self.grid.flag_int_cell_yz[:-1, :, :] + self.grid.flag_int_cell_zx[:, :-1, :] + self.grid.flag_int_cell_xy[:, :, :-1] ) self.H *= self.flag_cleanup self.E *= self.flag_cleanup def _apply_stl_materials(self): """ Mask STL solids in the grid and assign user-defined materials. Iterates over STL solids imported in the grid and updates ``ieps``, ``imu`` and ``sigma`` according to the material provided for each solid. Materials may be referenced by a library key (string) or given as explicit tuples (eps_r, mu_r[, sigma]). Inverse permittivity and inverse permeability values are stored in the corresponding Fields. Notes ----- - STL material values must be relative (eps_r, mu_r). - Supply conductivity explicitly to enable conductive behaviour. """ grid = self.grid.grid self.stl_solids = self.grid.stl_solids self.stl_materials = self.grid.stl_materials self.stl_colors = self.grid.stl_colors for key in self.stl_solids.keys(): mask = np.reshape(grid[key], (self.Nx, self.Ny, self.Nz)).astype(int) if type(self.stl_materials[key]) is str: # Retrieve from material library mat_key = self.stl_materials[key].lower() eps = material_lib[mat_key][0] * eps_0 mu = material_lib[mat_key][1] * mu_0 # Setting to zero self.ieps += self.ieps * (-1.0 * mask) self.imu += self.imu * (-1.0 * mask) # Adding new values self.ieps += mask * 1.0 / eps self.imu += mask * 1.0 / mu # Conductivity if len(material_lib[mat_key]) == 3: sigma = material_lib[mat_key][2] self.sigma += self.sigma * (-1.0 * mask) self.sigma += mask * sigma self.use_conductivity = True elif self.sigma_bg > 0.0: # assumed sigma = 0 self.sigma += self.sigma * (-1.0 * mask) else: # From input eps = self.stl_materials[key][0] * eps_0 mu = self.stl_materials[key][1] * mu_0 # Setting to zero self.ieps += self.ieps * (-1.0 * mask) self.imu += self.imu * (-1.0 * mask) # Adding new values self.ieps += mask * 1.0 / eps self.imu += mask * 1.0 / mu # Conductivity if len(self.stl_materials[key]) == 3: sigma = self.stl_materials[key][2] self.sigma += self.sigma * (-1.0 * mask) self.sigma += mask * sigma self.use_conductivity = True elif self.sigma_bg > 0.0: # assumed sigma = 0 self.sigma += self.sigma * (-1.0 * mask) def _attrcleanup(self): # Fields del self.L, self.tL, self.iA, self.itA if hasattr(self, "BC"): del self.BC del self.Dbc # Matrices del self.Px, self.Py, self.Pz del self.Ds, self.iDa, self.tDs, self.itDa del self.C
[docs] def save_state(self, filename="solver_state.h5", close=True): """ Save dynamic solver state (H, E, J) to an HDF5 file. Writes the core dynamic fields to ``filename``. When running under MPI the distributed fields are gathered to the root rank before saving. Parameters ---------- filename : str, optional Output HDF5 filename. Default is "solver_state.h5". close : bool, optional If True (default) the file is closed before returning. If False an open ``h5py.File`` is returned for caller-managed operations. Returns ------- h5py.File or None Open file object when ``close`` is False, otherwise None. """ if self.use_mpi: # MPI savestate H = self.mpi_gather_asField("H") E = self.mpi_gather_asField("E") J = self.mpi_gather_asField("J") if self.rank == 0: state = h5py.File(filename, "w") state.create_dataset("H", data=H) state.create_dataset("E", data=E) state.create_dataset("J", data=J) # TODO: check for MPI-GPU elif self.use_gpu: # GPU savestate state = h5py.File(filename, "w") state.create_dataset("H", data=self.H.toarray().get()) state.create_dataset("E", data=self.E.toarray().get()) state.create_dataset("J", data=self.J.toarray().get()) else: # CPU savestate state = h5py.File(filename, "w") state.create_dataset("H", data=self.H.toarray()) state.create_dataset("E", data=self.E.toarray()) state.create_dataset("J", data=self.J.toarray()) if close: state.close() else: return state # Caller must close this manually
[docs] def load_state(self, filename="solver_state.h5"): """ Load dynamic solver state (H, E, J) from an HDF5 file and restore them. Parameters ---------- filename : str, optional Input HDF5 filename. Default is "solver_state.h5". Notes ----- Currently performs a simple load from a single-file state. MPI-aware redistribution of loaded arrays to worker ranks is TODO. """ state = h5py.File(filename, "r") self.E.fromarray(state["E"][:]) self.H.fromarray(state["H"][:]) self.J.fromarray(state["J"][:]) # TODO: support MPI loadstate state.close()
[docs] def read_state(self, filename="solver_state.h5"): """ Open an HDF5 file for read-only access without loading its contents. Returns an open ``h5py.File`` object that the caller must close when finished. This is useful for inspecting saved state without restoring it into the solver. Parameters ---------- filename : str, optional Input HDF5 filename. Default is "solver_state.h5". Returns ------- h5py.File Open HDF5 file object in read mode. """ return h5py.File(filename, "r")
[docs] def reset_fields(self): """ Reset dynamic field arrays (E, H, J) to zero across the simulation. Useful when reusing a ``SolverFIT3D`` instance for a new run without reconstructing the entire object. """ for d in ["x", "y", "z"]: self.E[:, :, :, d] = 0.0 self.H[:, :, :, d] = 0.0 self.J[:, :, :, d] = 0.0
[docs] def update_logger(self, attrs): """ Copy selected solver attributes into the internal ``Logger`` object. Parameters ---------- attrs : iterable of str Names of attributes to copy to ``self.logger.solver``. Special case 'grid' copies the grid logger reference instead of a value. """ for atr in attrs: if atr == "grid": self.logger.grid = self.grid.logger.grid else: self.logger.solver[atr] = getattr(self, atr)