Source code for hiperwalk.quantum_walk._pyneblina_interface

try:
    import neblina
except ModuleNotFoundError:
    pass
import numpy as np
import scipy.sparse
from warnings import warn
from .._constants import *

############################################
# used for automatically stopping the engine
import atexit

# Set to True by _init_engine after neblina.init_engine is called and
# reset to False by exit_handler after neblina.stop_engine is called.
__engine_initiated = False
# Platform code stored by set_hpc: None (no HPC), 0 ('cpu') or 1 ('gpu').
__hpc_type = None

def set_hpc(hpc):
    r"""
    Indicate which HPC platform is going to be used.

    After executing the ``set_hpc`` command,
    all subsequent hiperwalk commands will
    use the designated HPC platform.

    Parameters
    ----------
    hpc : {None, 'cpu', 'gpu'}
        Indicates whether to utilize HPC
        for matrix multiplication using CPU or GPU.
        If ``hpc=None``, it will use standalone Python.
        Strings are compared after lower-casing and stripping
        surrounding whitespace.

    Raises
    ------
    ValueError
        If ``hpc`` is neither ``None`` nor one of the accepted strings.
        Non-string values are rejected with this same exception.
    """
    global __hpc_type

    new_hpc = None
    if hpc is not None:
        # Non-string input has no lower()/strip(); treat it as an
        # unknown value so the caller gets the documented ValueError
        # rather than an AttributeError.
        key = hpc.lower().strip() if isinstance(hpc, str) else None

        if key == 'cpu':
            new_hpc = 0
        elif key == 'gpu':
            new_hpc = 1
        else:
            raise ValueError(
                'Unexpected value of `hpc`: ' + str(hpc)
                + '. Expected a value in '
                + "[None, 'cpu', 'gpu']."
            )

    if __hpc_type != new_hpc:
        # The platform is changing: stop an engine that may be running
        # on the previous platform before recording the new one.
        exit_handler()
        __hpc_type = new_hpc

    # No-op when the engine is already running or no platform is selected.
    _init_engine()
def get_hpc():
    r"""
    Return the name of the HPC platform currently selected.

    Returns
    -------
    {None, 'cpu', 'gpu'}
        ``'cpu'`` when the stored platform code is 0,
        ``'gpu'`` when it is 1 and ``None`` otherwise.
    """
    global __hpc_type
    if __hpc_type == 0:
        return 'cpu'
    if __hpc_type == 1:
        return 'gpu'
    return None


def exit_handler():
    r"""
    Stop the neblina engine if it is running.

    Registered with :mod:`atexit` so that the engine is stopped
    when the interpreter exits.
    Does nothing when the engine was not initiated.
    """
    global __engine_initiated
    if __engine_initiated:
        neblina.stop_engine()
        __engine_initiated = False


atexit.register(exit_handler)


def _init_engine():
    r"""
    Initiates neblina-core engine.

    Initiates the engine if it was not previously initiated and
    an HPC platform has been selected.

    Raises
    ------
    ModuleNotFoundError
        If the name ``neblina`` is not available, i.e. the optional
        import at the top of the module failed.
    """
    global __engine_initiated
    global __hpc_type

    if __engine_initiated or __hpc_type is None:
        return

    # TODO: if not 'neblina' in sys.modules raise ModuleNotFoundError
    # Only the name lookup is guarded, so that a NameError raised
    # while the engine starts is not misreported as a missing module.
    try:
        init_engine = neblina.init_engine
    except NameError:
        raise ModuleNotFoundError(
            "Module neblina was not imported. "
            + "Do you have neblina-core and pyneblina installed?"
        ) from None

    # NOTE(review): the second argument is always 0 -- presumably a
    # device index; confirm against the pyneblina documentation.
    init_engine(__hpc_type, 0)
    __engine_initiated = True


def send_vector(v):
    r"""
    Transfers a vector (v) to Neblina-core, and
    moves it to the device to be used.

    Parameters
    ----------
    v
        Vector handed to ``neblina.load_numpy_array``
        (presumably a numpy array -- confirm).

    Returns
    -------
    The object returned by ``neblina.load_numpy_array``.
    It is needed to call other pyneblina functions.

    TODO: is there a way to move the vector to the device directly?
    An auxiliary vector may be created,
    in which case twice the memory is being used.
    """
    vec = neblina.load_numpy_array(v)
    neblina.move_vector_device(vec)
    return vec


def retrieve_vector(nbl_vec):
    r"""
    Retrieves vector from the device and converts it to a numpy array.

    Parameters
    ----------
    nbl_vec
        Neblina vector object, e.g. as returned by ``send_vector``.

    Returns
    -------
    The object returned by ``neblina.retrieve_numpy_array``.

    TODO: get vector dimension(vdim) automatically
    TODO: check if vector is being deleted (or not)
    """
    # if a vector is being retrieved,
    # the engine should have been already initiated
    neblina.move_vector_host(nbl_vec)
    return neblina.retrieve_numpy_array(nbl_vec)


def _send_sparse_matrix(M):
    r"""
    Transfers a sparse Matrix (M) to Neblina-core and
    moves it to the device (ready to be used).

    A neblina ``COMPLEX`` matrix is created when ``M`` has a complex
    dtype and a ``FLOAT`` matrix otherwise.
    The neblina matrix is created with ``M.shape[0]`` rows and columns.

    Parameters
    ----------
    M : scipy sparse matrix
        Any scipy sparse format is accepted.
        It is converted to canonical CSR
        (sorted indices, no duplicate entries) before being sent.
        The caller's matrix is not modified.

    Returns
    -------
    Neblina sparse matrix object.

    TODO: Add tests
        - Transfer and check real Matrix
        - Transfer and check complex Matrix
    TODO: isn't there a way for neblina-core to use the csr matrix
        directly? In order to avoid double memory usage
    TODO: needs better support from pyneblina:
        in the real case it should not be needed to
        pass the imaginary part to ``sparse_matrix_set``.
    """
    is_complex = np.issubdtype(M.dtype, np.complexfloating)
    n = M.shape[0]

    # Reading indptr/indices/data directly is only meaningful for
    # canonical CSR. Canonicalise a copy so the caller's matrix is
    # left untouched.
    M = scipy.sparse.csr_matrix(M)
    if not M.has_canonical_format:
        M = M.copy()
        M.sum_duplicates()

    smat = neblina.sparse_matrix_new(
        n, n, neblina.COMPLEX if is_complex else neblina.FLOAT
    )

    indptr, indices, data = M.indptr, M.indices, M.data
    for row in range(n):
        start = indptr[row]
        end = indptr[row + 1]

        # columns must be added in reverse order
        for index in range(end - 1, start - 1, -1):
            value = data[index]
            # the imaginary part must always be passed,
            # even for real matrices
            imag = value.imag if is_complex else 0
            neblina.sparse_matrix_set(
                smat, row, indices[index], value.real, imag
            )

    neblina.sparse_matrix_pack(smat)
    neblina.move_sparse_matrix_device(smat)

    return smat


def _send_dense_matrix(M):
    r"""
    Transfers a dense matrix to Neblina-core and
    moves it to the device.

    Returns
    -------
    The object returned by ``neblina.load_numpy_matrix``.
    """
    mat = neblina.load_numpy_matrix(M)
    neblina.move_matrix_device(mat)
    return mat


def send_matrix(M):
    r"""
    Transfers a matrix to Neblina-core and moves it to the device.

    Scipy sparse matrices are sent as neblina sparse matrices;
    anything else is sent as a dense matrix.

    Returns
    -------
    Neblina matrix object.
    """
    if scipy.sparse.issparse(M):
        return _send_sparse_matrix(M)
    return _send_dense_matrix(M)


def retrieve_matrix(nbl_mat):
    r"""
    Retrieves a dense matrix from the device.

    Parameters
    ----------
    nbl_mat
        Neblina matrix object.

    Returns
    -------
    The object returned by ``neblina.retrieve_numpy_matrix``.

    Raises
    ------
    NotImplementedError
        If neblina fails to move or convert the matrix.
        The original error is attached as ``__cause__``.
    """
    try:
        neblina.move_matrix_host(nbl_mat)
        mat = neblina.retrieve_numpy_matrix(nbl_mat)
    except Exception as err:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not turned into NotImplementedError.
        raise NotImplementedError(
            "Cannot retrieve sparse matrix."
        ) from err
    return mat


def multiply_matrix_vector(nbl_mat, nbl_vec, is_sparse):
    """
    Request matrix multiplication to neblina.

    Multiplies the matrix by the vector,
    i.e. ``nbl_mat @ nbl_vec``.

    Parameters
    ----------
    nbl_mat
        neblina matrix object
    nbl_vec
        neblina vector object
    is_sparse : bool
        Whether ``nbl_mat`` is a neblina sparse matrix.
        Selects ``neblina.sparse_matvec_mul`` over
        ``neblina.matvec_mul``.

    Returns
    -------
    Neblina vector object resulted from matrix multiplication.

    See Also
    --------
    send_matrix : returns a neblina matrix object
    send_vector : returns a neblina vector object
    """
    # if a matrix-vector operation is being requested,
    # the engine should have been already initiated
    if is_sparse:
        return neblina.sparse_matvec_mul(nbl_vec, nbl_mat)
    return neblina.matvec_mul(nbl_vec, nbl_mat)


def multiply_matrices(nbl_A, nbl_B):
    r"""
    Request the product of two neblina matrix objects.

    Returns
    -------
    The object returned by ``neblina.mat_mul(nbl_A, nbl_B)``.
    """
    return neblina.mat_mul(nbl_A, nbl_B)


def matrix_power_series(A, n):
    r"""
    Computes the following power series.

    I + A + A^2/2! + A^3/3! + A^4/4! + ... + A^n/n!

    Parameters
    ----------
    A
        Square matrix. Sparse matrices are converted to dense
        (with a warning).
    n : int
        Index of the last term of the series.

    Returns
    -------
    The result of :func:`retrieve_matrix` on the accumulated sum.
    """
    if scipy.sparse.issparse(A):
        A = A.toarray()
        warn(
            "Sparse matrix multiplication not implemented. "
            + "Converting to dense."
        )

    pA = send_matrix(A)
    # current term of the series, starting at A^0/0! = I
    pT = send_matrix(np.eye(A.shape[0], dtype=A.dtype))
    # running sum of the series, starting at I
    pM = send_matrix(np.eye(A.shape[0], dtype=A.dtype))

    for i in range(1, n + 1):
        # A^i/i! = (A^(i-1)/(i-1)!) A / i
        pT = neblina.mat_mul(pT, pA)
        pT = neblina.scalar_mat_mul(1/i, pT)
        pM = neblina.mat_add(pM, pT)

    return retrieve_matrix(pM)


def copy_vector(v):
    r"""
    Copy a neblina vector from the device and convert it.

    Returns
    -------
    The object returned by ``neblina.retrieve_numpy_array`` for
    the copy made by ``neblina.copy_vector_from_device``.
    """
    res = neblina.copy_vector_from_device(v)
    return neblina.retrieve_numpy_array(res)