# Source code for spinguin.core.la

"""
This module provides various linear algebra tools required for spin dynamics
simulations.
"""

# Imports
import math
import numpy as np
from numpy.typing import ArrayLike
from scipy.sparse import eye_array, csc_array, block_array, issparse
from scipy.io import mmwrite, mmread
from io import BytesIO
from functools import lru_cache
from sympy.physics.quantum.cg import CG
from spinguin.core.sparse_dot import sparse_dot as _sparse_dot
from spinguin.core.intersect_indices import intersect_indices
from spinguin.core.hide_prints import HidePrints
from spinguin.core.nmr_isotopes import ISOTOPES
from multiprocessing.shared_memory import SharedMemory

[docs] def write_shared_sparse(A: csc_array) -> tuple[ dict[str, str | np.dtype | tuple[int]], tuple[SharedMemory, SharedMemory, SharedMemory]]: """ Creates a shared memory representation of a sparse CSC array. Parameters ---------- A : csc_array Sparse array to be shared. Returns ------- A_shared : dict Dictionary containing shared memory names and metadata for the sparse array's data, indices, and indptr, along with their shapes and dtypes. A_shm : tuple Tuple containing the shared memory objects for the sparse array's data, indices, and indptr. """ # Create a shared memory of the sparse array A_data_shm = SharedMemory(create=True, size=A.data.nbytes) A_indices_shm = SharedMemory(create=True, size=A.indices.nbytes) A_indptr_shm = SharedMemory(create=True, size=A.indptr.nbytes) A_data_shared = np.ndarray(A.data.shape, dtype=A.data.dtype, buffer=A_data_shm.buf) A_indices_shared = np.ndarray(A.indices.shape, dtype=A.indices.dtype, buffer=A_indices_shm.buf) A_indptr_shared = np.ndarray(A.indptr.shape, dtype=A.indptr.dtype, buffer=A_indptr_shm.buf) A_data_shared[:] = A.data[:] A_indices_shared[:] = A.indices[:] A_indptr_shared[:] = A.indptr[:] # Save the information of the memory to a dictionary A_shared = { 'A_data_shm_name' : A_data_shm.name, 'A_indices_shm_name' : A_indices_shm.name, 'A_indptr_shm_name' : A_indptr_shm.name, 'A_data_shape' : A.data.shape, 'A_indices_shape' : A.indices.shape, 'A_indptr_shape' : A.indptr.shape, 'A_data_dtype' : A.data.dtype, 'A_indices_dtype' : A.indices.dtype, 'A_indptr_dtype' : A.indptr.dtype, 'A_shape' : A.shape } # Combine the shared memories into one tuple A_shm = (A_data_shm, A_indices_shm, A_indptr_shm) return A_shared, A_shm
[docs] def read_shared_sparse(A_shared: dict[str, str | np.dtype | tuple[int]]) -> \ tuple[csc_array, tuple[SharedMemory, SharedMemory, SharedMemory]]: """ Reads a shared memory representation of a sparse CSC array and reconstructs it. Parameters ---------- A_shared : dict Dictionary containing shared memory names and metadata for the sparse array's data, indices, and indptr, along with their shapes and dtypes. Returns ------- A : csc_array Sparse array reconstructed from the shared memory. A_shm : tuple Tuple containing the shared memory objects for the sparse array's data, indices, and indptr. """ # Parse the dictionary A_data_shm_name = A_shared['A_data_shm_name'] A_indices_shm_name = A_shared['A_indices_shm_name'] A_indptr_shm_name = A_shared['A_indptr_shm_name'] A_data_shape = A_shared['A_data_shape'] A_indices_shape = A_shared['A_indices_shape'] A_indptr_shape = A_shared['A_indptr_shape'] A_data_dtype = A_shared['A_data_dtype'] A_indices_dtype = A_shared['A_indices_dtype'] A_indptr_dtype = A_shared['A_indptr_dtype'] A_shape = A_shared['A_shape'] # Obtain the shared memories A_data_shm = SharedMemory(name=A_data_shm_name) A_indices_shm = SharedMemory(name=A_indices_shm_name) A_indptr_shm = SharedMemory(name=A_indptr_shm_name) # Obtain the previously shared array A_data = np.ndarray(shape=A_data_shape, dtype=A_data_dtype, buffer=A_data_shm.buf) A_indices = np.ndarray(shape=A_indices_shape, dtype=A_indices_dtype, buffer=A_indices_shm.buf) A_indptr = np.ndarray(shape=A_indptr_shape, dtype=A_indptr_dtype, buffer=A_indptr_shm.buf) # Create the sparse array A = csc_array((A_data, A_indices, A_indptr), shape=A_shape, copy=False) # Combine the shared memories into one tuple A_shm = (A_data_shm, A_indices_shm, A_indptr_shm) return A, A_shm
[docs] def isvector(v: csc_array | np.ndarray, ord: str = "col") -> bool: """ Checks if the given array is a vector. Parameters ---------- v : csc_array or ndarray Array to be checked. Must be two-dimensional. ord : str Can be either "col" or "row". Returns ------- bool True if the array is a vector. """ # Check whether the array is two-dimensional if len(v.shape) != 2: raise ValueError("Input array must be two-dimensional.") # Determine whether to check for row or column vector if ord == "col": i = 1 elif ord == "row": i = 0 else: raise ValueError(f"Invalid value for ord: {ord}") # Check whether the array is a vector return v.shape[i] == 1
[docs] def norm_1(A: csc_array | np.ndarray, ord: str = 'row') -> float: """ Calculates the 1-norm of a matrix. Parameters ---------- A : csc_array or ndarray Array for which the norm is calculated. ord : str, default='row' Either 'row' or 'col', specifying the direction for the 1-norm calculation. Returns ------- norm_1 : float 1-norm of the given array `A`. """ # Process either row- or column-wise if ord == 'row': axis = 1 elif ord == 'col': axis = 0 else: raise ValueError(f"Invalid value for ord: {ord}") # Calculate sums along rows or columns and get the maximum of them return abs(A).sum(axis).max()
def expm(A: np.ndarray | csc_array, zero_value: float) -> np.ndarray | csc_array:
    """
    Calculates the matrix exponential using the scaling and squaring method
    with the Taylor series, shown to be the fastest method in:
    https://doi.org/10.1016/j.jmr.2010.12.004

    This function uses a custom dot product implementation, which is more
    memory-efficient and parallelized.

    Parameters
    ----------
    A : ndarray or csc_array
        Array to be exponentiated.
    zero_value : float
        Values below this threshold are considered zero. Used to increase the
        sparsity of the result and estimate the convergence of the Taylor
        series.

    Returns
    -------
    expm_A : ndarray or csc_array
        Matrix exponential of `A`.
    """
    print("Calculating the matrix exponential...")

    # The scaling decision is based on the maximum absolute column sum
    norm_A = norm_1(A, ord='col')

    if norm_A > 1:
        # Scale the matrix down by a power of two so its norm drops below 1
        scaling_count = int(math.ceil(math.log2(norm_A)))
        print(f"Scaling the matrix down {scaling_count} times.")

        # Exponentiate the scaled-down matrix with the Taylor series
        expm_A = expm_taylor(A / (2 ** scaling_count), zero_value)

        # Undo the scaling: exp(A) = exp(A / 2^k)^(2^k) via repeated squaring
        for i in range(scaling_count):
            print(f"Squaring the matrix. Step {i+1} of {scaling_count}.")
            expm_A = custom_dot(expm_A, expm_A, zero_value)
    else:
        # Small norm: the Taylor series converges directly
        expm_A = expm_taylor(A, zero_value)

    print("Matrix exponential completed.")

    return expm_A
def expm_taylor(A: np.ndarray | csc_array, zero_value: float) -> np.ndarray | csc_array:
    """
    Computes the matrix exponential using the Taylor series. This function is
    adapted from an older SciPy version. It uses a custom dot product
    implementation, which is more memory-efficient and parallelized.

    NOTE(review): `eliminate_small` modifies the input matrix `A` in place,
    so callers see their matrix thresholded after this returns — confirm
    this side effect is intended.

    Parameters
    ----------
    A : ndarray or csc_array
        Matrix (N, N) to be exponentiated.
    zero_value : float
        Values below this threshold are considered zero. Used to increase
        sparsity and check the convergence of the series.

    Returns
    -------
    eA : ndarray or csc_array
        Matrix exponential of A.
    """
    print("Calculating the matrix exponential using Taylor series.")

    # Threshold small entries to increase sparsity (in-place!)
    eliminate_small(A, zero_value)

    # Zeroth-order term: the identity matrix
    eA = eye_array(A.shape[0], A.shape[0], dtype=complex, format='csc')
    trm = eA.copy()

    # Accumulate terms until the latest one is thresholded to all zeros
    k = 1
    while True:
        print(f"Taylor series term: {k}")

        # Next term: trm_{k} = trm_{k-1} @ (A / k)
        trm = custom_dot(trm, A / k, zero_value)
        eA += trm
        k += 1

        # Convergence: the new term contains no nonzero entries
        remaining = trm.nnz if issparse(A) else np.count_nonzero(trm)
        if remaining == 0:
            break

    print("Taylor series converged.")

    return eA
[docs] def eliminate_small(A: np.ndarray | csc_array, zero_value: float): """ Eliminates small values from the input matrix `A` by replacing values smaller than `zero_value` with zeros. Modification happens inplace. Parameters ---------- A : ndarray or csc_array Array to be modified. zero_value : float Values smaller than this threshold are set to zero. """ # Identify values smaller than the threshold and set them to zero if issparse(A): nonzero_mask = np.abs(A.data) < zero_value A.data[nonzero_mask] = 0 A.eliminate_zeros() else: nonzero_mask = np.abs(A) < zero_value A[nonzero_mask] = 0
def sparse_to_bytes(A: csc_array) -> bytes:
    """
    Converts the given SciPy sparse array into a byte representation.

    Parameters
    ----------
    A : csc_array
        Sparse matrix to be converted into bytes.

    Returns
    -------
    A_bytes : bytes
        Byte representation of the input matrix.
    """
    # Serialize in Matrix Market format into an in-memory buffer
    buffer = BytesIO()
    mmwrite(buffer, A)

    # Return the accumulated bytes
    return buffer.getvalue()
def bytes_to_sparse(A_bytes: bytes) -> csc_array:
    """
    Converts a byte representation back to a SciPy sparse array.

    Parameters
    ----------
    A_bytes : bytes
        Byte representation of a SciPy sparse array.

    Returns
    -------
    A : csc_array
        Sparse array reconstructed from the byte representation.
    """
    # Initialize a BytesIO object
    bytes_io = BytesIO(A_bytes)

    # Read the sparse array from bytes. mmread returns a COO-format object,
    # so convert it to CSC to match the documented return type.
    A = csc_array(mmread(bytes_io))

    return A
[docs] def comm(A: csc_array | np.ndarray, B: csc_array | np.ndarray) -> csc_array | np.ndarray: """ Calculates the commutator [A, B] of two operators. Parameters ---------- A : csc_array or ndarray First operator. B : csc_array or ndarray Second operator. Returns ------- C : csc_array or ndarray Commutator [A, B]. """ # Compute the commutator C = A @ B - B @ A return C
def find_common_rows(A: np.ndarray, B: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Identifies the indices of common rows between two arrays, `A` and `B`.
    Each row must appear only once in the arrays and they must be sorted in
    lexicographical order.

    Parameters
    ----------
    A : ndarray
        First array to compare.
    B : ndarray
        Second array to compare.

    Returns
    -------
    A_ind : ndarray
        Indices of the common rows in array `A`.
    B_ind : ndarray
        Indices of the common rows in array `B`.
    """
    # Special case: zero-length rows are treated as one shared (empty) row
    if A.shape[1] == 0 and B.shape[1] == 0:
        return np.array([0]), np.array([0])

    # Row length must be passed as a longlong to the compiled routine
    row_length = np.longlong(A.shape[1])

    # Flatten and cast so the intersection routine receives longlong data
    A_flat = A.ravel().astype(np.longlong)
    B_flat = B.ravel().astype(np.longlong)

    # Delegate the search to the compiled helper
    return intersect_indices(A_flat, B_flat, row_length)
def auxiliary_matrix_expm(A: np.ndarray | csc_array, B: np.ndarray | csc_array, C: np.ndarray | csc_array, t: float, zero_value: float) -> csc_array:
    """
    Computes the matrix exponential of an auxiliary matrix. This is used to
    calculate the Redfield integral. Based on Goodwin and Kuprov (Eq. 3):
    https://doi.org/10.1063/1.4928978

    Parameters
    ----------
    A : ndarray or csc_array
        Top-left block of the auxiliary matrix.
    B : ndarray or csc_array
        Top-right block of the auxiliary matrix.
    C : ndarray or csc_array
        Bottom-right block of the auxiliary matrix.
    t : float
        Integration time.
    zero_value : float
        Threshold below which values are considered zero when exponentiating
        the auxiliary matrix using the Taylor series. This significantly
        impacts performance. Use the largest value that still provides
        correct results.

    Returns
    -------
    expm_aux : ndarray or csc_array
        Matrix exponential of the auxiliary matrix. The output is sparse or
        dense matching the sparsity of the input.
    """
    # The three blocks must be uniformly sparse or uniformly dense
    sparse_flags = (issparse(A), issparse(B), issparse(C))
    if len(set(sparse_flags)) != 1:
        raise ValueError(f"All arrays A, B and C must be of same type.")

    # Assemble the block upper-triangular auxiliary matrix [[A, B], [0, C]]
    if sparse_flags[0]:
        zero_block = csc_array(A.shape)
        aux = block_array([[A, B], [zero_block, C]], format='csc')
    else:
        zero_block = np.zeros(A.shape)
        aux = np.block([[A, B], [zero_block, C]])

    # Exponentiate, silencing the progress prints from expm
    with HidePrints():
        expm_aux = expm(aux * t, zero_value)

    return expm_aux
def angle_between_vectors(v1: np.ndarray, v2: np.ndarray) -> float:
    """
    Computes the angle between two vectors in radians.

    Parameters
    ----------
    v1 : ndarray
        First vector.
    v2 : ndarray
        Second vector.

    Returns
    -------
    theta : float
        Angle between the vectors in radians.
    """
    # Handle the case where the vectors are identical
    if np.array_equal(v1, v2):
        theta = 0
    else:
        # Clip the cosine into [-1, 1]: floating-point rounding can push the
        # ratio slightly outside the domain of arccos, which would yield NaN
        # for (anti)parallel vectors.
        cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        theta = np.arccos(np.clip(cos_theta, -1.0, 1.0))

    return theta
[docs] def decompose_matrix(matrix: np.ndarray) \ -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Decomposes a matrix into three components: - Isotropic part. - Antisymmetric part. - Symmetric traceless part. Parameters ---------- matrix : ndarray Matrix to decompose. Returns ------- isotropic : ndarray Isotropic part of the input matrix. antisymmetric : ndarray Antisymmetric part of the input matrix. symmetric_traceless : ndarray Symmetric traceless part of the input matrix. """ # Compute the isotropic, antisymmetric, and symmetric traceless parts isotropic = np.trace(matrix) * np.eye(matrix.shape[0]) / matrix.shape[0] antisymmetric = (matrix - matrix.T) / 2 symmetric_traceless = (matrix + matrix.T) / 2 - isotropic return isotropic, antisymmetric, symmetric_traceless
def principal_axis_system(tensor: np.ndarray) \
    -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Determines the principal axis system (PAS) of a Cartesian tensor and
    transforms the tensor into the PAS. The PAS is defined as the coordinate
    system that diagonalizes the symmetric traceless part of the tensor. The
    eigenvalues are ordered as `(|largest|, |middle|, |smallest|)`.

    Parameters
    ----------
    tensor : np.ndarray
        Cartesian tensor to transform.

    Returns
    -------
    eigenvalues : ndarray
        Eigenvalues of the tensor in the PAS.
    eigenvectors : ndarray
        Two-dimensional array where rows contain the eigenvectors of the PAS.
    tensor_PAS : ndarray
        Tensor transformed into the PAS.
    """
    n = tensor.shape[0]

    # Symmetric traceless part of the tensor: (M + M^T)/2 - (tr M / n) * I
    isotropic = np.trace(tensor) * np.eye(n) / n
    symmetric_traceless = (tensor + tensor.T) / 2 - isotropic

    # Diagonalize the symmetric traceless part
    eigenvalues, eigenvectors = np.linalg.eig(symmetric_traceless)

    # Reorder by descending absolute eigenvalue; store eigenvectors as rows
    order = np.argsort(np.abs(eigenvalues))[::-1]
    eigenvalues = eigenvalues[order]
    eigenvectors = eigenvectors[:, order].T

    # Apply the similarity transform to move the tensor into the PAS
    tensor_PAS = eigenvectors @ tensor @ np.linalg.inv(eigenvectors)

    return eigenvalues, eigenvectors, tensor_PAS
def cartesian_tensor_to_spherical_tensor(C: np.ndarray) -> dict:
    """
    Converts a rank-2 Cartesian tensor to a spherical tensor. Uses the double
    outer product (DOP) convention from:

    Eqs. 293-298 in
    Man: Cartesian and Spherical Tensors in NMR Hamiltonians
    https://doi.org/10.1002/cmr.a.21289

    Parameters
    ----------
    C : ndarray
        Rank-2 tensor in Cartesian coordinates.

    Returns
    -------
    spherical_tensor : dict
        Keys specify the rank and the projection (l, q), and the values are
        the components.
    """
    # Unpack the Cartesian components row by row
    (C_xx, C_xy, C_xz), (C_yx, C_yy, C_yz), (C_zx, C_zy, C_zz) = C

    # Rank-0 (isotropic) component
    T_00 = -1 / math.sqrt(3) * (C_xx + C_yy + C_zz)

    # Rank-1 (antisymmetric) components
    T_10 = -1j / math.sqrt(2) * (C_xy - C_yx)
    T_1p1 = -1 / 2 * (C_zx - C_xz + 1j * (C_zy - C_yz))
    T_1m1 = -1 / 2 * (C_zx - C_xz - 1j * (C_zy - C_yz))

    # Rank-2 (symmetric traceless) components
    T_20 = 1 / math.sqrt(6) * (-C_xx + 2 * C_zz - C_yy)
    T_2p1 = -1 / 2 * (C_xz + C_zx + 1j * (C_yz + C_zy))
    T_2m1 = 1 / 2 * (C_xz + C_zx - 1j * (C_yz + C_zy))
    T_2p2 = 1 / 2 * (C_xx - C_yy + 1j * (C_xy + C_yx))
    T_2m2 = 1 / 2 * (C_xx - C_yy - 1j * (C_xy + C_yx))

    # Assemble the (l, q) -> component mapping
    spherical_tensor = {
        (0, 0): T_00,
        (1, 0): T_10,
        (1, 1): T_1p1,
        (1, -1): T_1m1,
        (2, 0): T_20,
        (2, 1): T_2p1,
        (2, -1): T_2m1,
        (2, 2): T_2p2,
        (2, -2): T_2m2
    }

    return spherical_tensor
def vector_to_spherical_tensor(vector: np.ndarray) -> dict:
    """
    Converts a Cartesian vector to a spherical tensor of rank 1. Uses the
    covariant components.

    Eq. 230 in
    Man: Cartesian and Spherical Tensors in NMR Hamiltonians
    https://doi.org/10.1002/cmr.a.21289

    Parameters
    ----------
    vector : ndarray
        Vector in the format [x, y, z].

    Returns
    -------
    spherical_tensor : dict
        Keys specify the rank and the projection (l, q), and the values are
        the components.
    """
    # Unpack the Cartesian components
    x, y, z = vector[0], vector[1], vector[2]

    # Covariant rank-1 spherical components keyed by (l, q)
    spherical_tensor = {
        (1, 1): -1 / math.sqrt(2) * (x + 1j * y),
        (1, 0): z,
        (1, -1): 1 / math.sqrt(2) * (x - 1j * y)
    }

    return spherical_tensor
@lru_cache(maxsize=32784)
def CG_coeff(j1: float, m1: float, j2: float, m2: float, j3: float, m3: float) -> float:
    """
    Computes the Clebsch-Gordan coefficients. Results are memoized, as the
    same coefficients are requested repeatedly.

    Parameters
    ----------
    j1 : float
        Angular momentum of state 1.
    m1 : float
        Magnetic quantum number of state 1.
    j2 : float
        Angular momentum of state 2.
    m2 : float
        Magnetic quantum number of state 2.
    j3 : float
        Total angular momentum of the coupled system.
    m3 : float
        Magnetic quantum number of the coupled system.

    Returns
    -------
    coeff : float
        Clebsch-Gordan coefficient.
    """
    # Evaluate the symbolic SymPy coefficient and convert it to a float
    return float(CG(j1, m1, j2, m2, j3, m3).doit())
def custom_dot(
    A: np.ndarray | csc_array,
    B: np.ndarray | csc_array,
    zero_value: float
) -> csc_array:
    """
    User-friendly wrapper for the custom sparse matrix multiplication, which
    saves memory usage by dropping values smaller than `zero_value` during
    the calculation. The sparse multiplication is implemented with
    C++ / Cython and is parallelized with OpenMP.

    NOTE: If either of the input arrays is NumPy array, this function falls
    back to the regular `@` multiplication.

    Parameters
    ----------
    A : ndarray or csc_array
        First matrix in the multiplication.
    B : ndarray or csc_array
        Second matrix in the multiplication.
    zero_value : float
        Threshold under which the resulting matrix elements are considered
        as zero.

    Returns
    -------
    C : ndarray or csc_array
        Result of matrix multiplication.
    """
    # Dense fallback: regular product followed by in-place thresholding
    if isinstance(A, np.ndarray) or isinstance(B, np.ndarray):
        C = A @ B
        eliminate_small(C, zero_value)
        return C

    # Sparse path: convert to CSC and call the compiled kernel
    if issparse(A) and issparse(B):
        return _sparse_dot(A.tocsc(), B.tocsc(), zero_value)

    raise ValueError("Invalid input type for custom dot.")
def arraylike_to_tuple(A: ArrayLike) -> tuple:
    """
    Converts a 1-dimensional `ArrayLike` object into a Python tuple.

    Parameters
    ----------
    A : ArrayLike
        An object that can be converted into NumPy array.

    Returns
    -------
    A : tuple
        The original object represented as Python tuple.
    """
    arr = np.asarray(A)

    # Scalars become one-element tuples
    if arr.ndim == 0:
        return (arr.item(),)

    # 1-D arrays convert directly
    if arr.ndim == 1:
        return tuple(arr)

    # Higher-dimensional input has no unambiguous tuple representation
    raise ValueError(f"Cannot convert {arr.ndim}-dimensional array into "
                     "tuple.")
def arraylike_to_array(A: ArrayLike) -> np.ndarray:
    """
    Converts an `ArrayLike` object into a NumPy array while ensuring that at
    least one dimension is created.

    Parameters
    ----------
    A : ArrayLike
        An object that can be converted into NumPy array.

    Returns
    -------
    A : ndarray
        The original object converted into a NumPy array.
    """
    # Convert, then promote scalars to 1-D arrays
    return np.atleast_1d(np.asarray(A))