"""
redrock.zscan
=============
Algorithms for scanning redshifts.
"""
from __future__ import division, print_function
import time
import sys
import traceback
import numpy as np
from scipy.optimize import lsq_linear, nnls
#try:
# import cupy as cp
# import cupyx.scipy
# import cupyx
# cupy_available = cp.is_available()
#except ImportError:
# cupy_available = False
#Moved CUPY import to local within method. Set global var to None here
cp = None
#On consumer grade GPUs with lower memory this will be set to True
#and attempts will be made to minimize memory usage to under 4 GB
cp_memcheck = False
from .utils import elapsed
from .targets import distribute_targets
#Since CUPY import is moved, vars block_size and cuda_source will always be
#set but this should cost no real time.
block_size = 512 #Default block size, should work on all modern nVidia GPUs
# cuda_source contains raw CUDA kernels to be loaded as CUPY module
cuda_source = r'''
extern "C" {
__global__ void batch_dot_product_sparse(const double* Rcsr_values, const int* Rcsr_cols, const int* Rcsr_indptr, const double* tdata, double* tb, int nrows, int ncols, int nbasis, int nt) {
// This kernel performs a batch dot product of a sparse matrix Rcsr
// with a set of redshifted templates
//** Args:
// Rcsr_values, Rcsr_cols, Rcsr_indptr = individualarrays from sparse matrix
// (Rcsr = sparse array, ncols x nrows)
// tdata = redshifted templates, nt x ncols x nbasis
// tb = output array = nt x ncols x nbasis
// nrows, ncols, nbasis, nt = array dimensions
const int i = blockDim.x*blockIdx.x + threadIdx.x; //thread index i - corresponds to the output array index
if (i >= ncols*nbasis*nt) return;
double x = 0; //define a local var to accumulate
//ibatch, icol, it = index in 3d representation of output tb array
//icol also == row in Rcsr input
int ibatch = i % nbasis;
int icol = (i % (nbasis*ncols)) / nbasis;
int it = i / (nbasis*ncols);
int t_start = it*nbasis*ncols; //first index in tdata for this thread
int row = icol;
int col;
//loop over all nonzero entries in sparse matrix and compute dot product
for (int j = Rcsr_indptr[row]; j < Rcsr_indptr[row+1]; j++) {
col = Rcsr_cols[j];
x += Rcsr_values[j] * tdata[t_start+nbasis*col+ibatch];
}
tb[i] = x;
return;
}
__global__ void batch_dot_product_3d3d(const double* a, const double* b, double* M, int nrows, int nbasis, int nt, int nparallel, int transpose_a) {
// This kernel computes a batch dot product of two 3-d arrays,
// a and b such that for every i
// M[i,:,:] = a[i,:,:].dot(b[i,:,:])
// This replicates the CUPY code M = a @ b but more efficiently
// It will use nparallel threads to compute the dot product
// for each output element in M. Each thread will handle
// nrows/nparallel products and sums into an intermediate
// local variable // and then an atomicAdd will be used to add
// this intermediate sum to the output array.
// This replicates the python command:
// M = Tb.T.dot(np.multiply(weights[:,None], Tb))
// for all M where a = Tb.T and b = weights[:,None]*Tb
//
//** Args:
// a = a 3-d array
// b = a 3-d array
// weights = the weights array for this target (1d, size = nrows)
// wflux = the wflux array for this target (1d, size = nrows)
// M = the output M array (nt x nbasis x nbasis)
// y = the output y array (nt x nbasis)
// nrows, nbasis, nt = array dimensions
// nparallel = number of parallel threads to used for each output
const int i = blockDim.x*blockIdx.x + threadIdx.x; //thread index i
if (i >= nbasis*nbasis*nt*nparallel) return;
int m_idx = i / nparallel; //index in output M array
int t = m_idx / (nbasis*nbasis); //target number
int a_row = (m_idx % (nbasis*nbasis)) % nbasis; //row in a array
int b_row = (m_idx % (nbasis*nbasis)) / nbasis; //row in b array
int stride = nrows/nparallel; //stride to divide up nparallel threads
int start = (threadIdx.x % nparallel)*stride; //start index in nrows dim
int end = ((threadIdx.x % nparallel)+1)*stride; //end index in nrows dim
if (threadIdx.x % nparallel == (nparallel-1)) end = nrows;
int a_idx = t*nrows*nbasis + a_row*nrows; // 1-d index for first element to be processed by this thread
if (transpose_a) a_idx = t*nrows*nbasis + a_row;
int b_idx = t*nrows*nbasis + b_row; // 1-d index for first element to be processed by this thread
double x = 0; //define local var to accumulate
//perform intermediate sum dot product for this thread
if (transpose_a) {
for (int j = start; j < end; j++) {
//stride by nbasis
x += a[a_idx+j*nbasis] * b[b_idx+j*nbasis];
}
} else {
for (int j = start; j < end; j++) {
//stride by nbasis
x += a[a_idx+j] * b[b_idx+j*nbasis];
}
}
//use atomic add to avoid collisions between threads
atomicAdd(&M[m_idx], x);
}
__global__ void batch_dot_product_3d2d(const double* tb, const double* zc, double* model, int nrows, int nbasis, int nt) {
// This kernel computes a batch dot product of Tb (a 3-d array)
// and zc (a 2-d array), the result of the matrix solution of
// M and y, for all templates (nt) in parallel. It results in
// the 2-d model array. Each thread computes an element in the
// output model array. It replaces the python code:
// model = Tb.dot(cupy.array(zc))
//** Args:
// tb = the Tb array, the stacked output from all 3 filters from
// batch_dot_product_sparse, for all redshift templates (nt x nrows x nbasis)
// zc = the zc array, the output of
// zc = cp.linalg.solve(all_M, all_y)
// shape = (nt x nbasis)
// model = the output of the dot product, (nt x nrows)
const int i = blockDim.x*blockIdx.x + threadIdx.x; //thread index i
if (i >= nrows*nt) return;
int it = i / nrows; //target num
int row = i % nrows; //row num
int i_tb = it * nrows * nbasis + row * nbasis; //start index in Tb array
int i_zc = it * nbasis; //start index in zc array
double x = 0; //use local var to accumulate
//compute dot product
for (int j = 0; j < nbasis; j++) {
x += tb[i_tb+j] * zc[i_zc+j];
}
//copy to output
model[i] = x;
}
}
'''
#This is used by original CPU algorithm
#It is called by archeypes so keep for now
[docs]def _zchi2_one(Tb, weights, flux, wflux, zcoeff, solve_matrices_algorithm):
"""Calculate a single chi2.
For one redshift and a set of spectral data, compute the chi2 for template
data that is already on the correct grid.
"""
M = Tb.T.dot(np.multiply(weights[:,None], Tb))
y = Tb.T.dot(wflux)
try:
zcoeff[:] = solve_matrices(M, y, solve_algorithm=solve_matrices_algorithm, use_gpu=False)
except np.linalg.LinAlgError:
return 9e99
except NotImplementedError:
return 9e99
model = Tb.dot(zcoeff)
zchi2 = np.dot( (flux - model)**2, weights )
return zchi2
[docs]def spectral_data(spectra):
"""Compute concatenated spectral data products.
This helper function builds full length array quantities needed for the
chi2 fit.
Args:
spectra (list): list of Spectrum objects.
Returns:
tuple: (weights, flux, wflux) concatenated values used for single
redshift chi^2 fits.
"""
weights = np.concatenate([ s.ivar for s in spectra ])
flux = np.concatenate([ s.flux for s in spectra ])
wflux = weights * flux
return (weights, flux, wflux)
#This is used by original CPU algorithm
#It is called by archeypes so keep for now
[docs]def calc_zchi2_one(spectra, weights, flux, wflux, tdata, solve_matrices_algorithm):
"""Calculate a single chi2.
For one redshift and a set of spectra, compute the chi2 for template
data that is already on the correct grid.
Args:
spectra (list): list of Spectrum objects.
weights (array): concatenated spectral weights (ivar).
flux (array): concatenated flux values.
wflux (array): concatenated weighted flux values.
tdata (dict): dictionary of interpolated template values for each
wavehash.
Returns:
tuple: chi^2 and coefficients.
"""
Tb = list()
nbasis = None
for s in spectra:
key = s.wavehash
if nbasis is None:
nbasis = tdata[key].shape[1]
#print("using ",nbasis," basis vectors", flush=True)
Tb.append(s.Rcsr.dot(tdata[key]))
Tb = np.vstack(Tb)
zcoeff = np.zeros(nbasis, dtype=np.float64)
zchi2 = _zchi2_one(Tb, weights, flux, wflux, zcoeff, solve_matrices_algorithm)
return zchi2, zcoeff
[docs]def per_camera_coeff_with_least_square_batch(target, tdata, weights, flux, wflux, nleg, narch, method=None, n_nbh=None, prior=None, use_gpu=False, bands=None):
"""This function calculates coefficients for archetype mode in each camera using normal linear algebra matrix solver or BVLS (bounded value least square) method
BVLS described in : https://www.stat.berkeley.edu/~stark/Preprints/bvls.pdf
Scipy: https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.lsq_linear.html
Args:
target (object): target object
tdata (dict): template data for model fit for ALL archetypes
weights (array): concatenated spectral weights (ivar).
flux (array): concatenated flux values.
wflux (array): concatenated weighted flux values.
nleg (int): number of Legendre polynomials
narch (int): number of archetypes
method (string): 'PCA', 'BVLS', 'NMF', or 'NNLS' (same as NMF)
n_nbh (int): number of nearest best archetypes
prior (array): prior matrix added to the Legendre coefficients (1/sigma^2)
use_gpu (bool): use GPU or not
bands (list): list of wavelength bands
Returns:
coefficients and chi2
"""
### TODO - implement BVLS on GPU
# number of cameras in DESI: b, r, z
spectra = target.spectra
ncam = len(bands)
nbasis = n_nbh+nleg*ncam # n_nbh : for actual physical archetype(s), nleg: number of legendre polynomials, ncamera: number of cameras
ret_zcoeff = {}
ret_zcoeff['alpha'] = []
if nleg>0:
for b in bands: # bands as save in targets object
ret_zcoeff[b] = []
new_bands = sorted(bands) # saves as correct order
#Setup dict of solver args to pass bounds to solver
method = method.upper()
solver_args = dict()
if (method == 'BVLS'):
#only positive coefficients are allowed for the archetypes
bounds = np.zeros((2, nbasis))
bounds[0][n_nbh:]=-np.inf #constant and slope terms in archetype method (can be positive or negative)
bounds[1] = np.inf
solver_args['bounds'] = bounds
#Use branching options because GPU is faster in batch in 3d
#but due to timing weirdness in numpy, CPU is faster looping over
#narch and calling calc_zchi2_batch one arch at a time.
if (use_gpu):
#Use batch 3d array on GPU with CuPy to calculate new tdata array
for i, hs in enumerate(tdata):
tdata2 = np.zeros_like(tdata[hs], shape=(tdata[hs].shape[0], tdata[hs].shape[1], nbasis))
tdata2[:,:,:n_nbh] = tdata[hs][:,:,:n_nbh]
tdata2[:,:,n_nbh+i*nleg:n_nbh+(i+1)*nleg] = tdata[hs][:,:,n_nbh:]
tdata[hs] = tdata2
(zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, narch, nbasis, solve_matrices_algorithm=method, solver_args=solver_args, prior=prior, use_gpu=use_gpu)
else:
#Create zzchi2, zcoeff, and tdata2 dict here
tdata2 = dict()
zzchi2 = np.zeros(narch, dtype=np.float64)
zzcoeff = np.zeros((narch, n_nbh+ncam*(nleg)), dtype=np.float64)
#Loop over all arch
for j in range(narch):
#Create a 2d array in each color for tdata2 then add 3rd dimension of rank 1
for i, hs in enumerate(tdata):
tdata2[hs] = np.zeros((tdata[hs].shape[1], nbasis))
for k in range(n_nbh):
tdata2[hs][:,k] = tdata[hs][j,:,k] # these are nearest archetype
if nleg>0:
for k in range(n_nbh, n_nbh+nleg):
tdata2[hs][:,k+nleg*i] = tdata[hs][j,:,k] # Legendre polynomials terms
tdata2[hs] = tdata2[hs][None,:,:]
zzchi2[j], zzcoeff[j] = calc_zchi2_batch(spectra, tdata2, weights, flux, wflux, 1, nbasis, solve_matrices_algorithm=method, solver_args=solver_args, prior=prior, use_gpu=use_gpu)
# saving leading archetype coefficients in correct order
ret_zcoeff['alpha'] = [zzcoeff[:,k] for k in range(n_nbh)] # archetype coefficient(s)
# logic in case nearest neighbour is used
if n_nbh>1:
ret_zcoeff['alpha'] = np.array([ret_zcoeff['alpha'][jj][0] for jj in range(n_nbh)])[None,:]
else:
ret_zcoeff['alpha'] = ret_zcoeff['alpha'][0][:,None]
if nleg>=1:
split_coeff = np.split(zzcoeff[:,n_nbh:], ncam, axis=1) # n_camera = 3
# In target spectra redrock saves values as 'b', 'z', 'r'.
# So just re-ordering them here to 'b', 'r', 'z' for easier reading
old_coeff = {band: split_coeff[i] for i, band in enumerate(bands)}
for band in new_bands:# 3 cameras
ret_zcoeff[band] = old_coeff[band]
coeff = np.concatenate(list(ret_zcoeff.values()), axis=1)
#print(f'{time.time()-start} [sec] took for per camera BVLS method\n')
return zzchi2, coeff
[docs]def batch_dot_product_sparse(spectra, tdata, nz, use_gpu):
"""Calculate a batch dot product of the 3 sparse matrices in spectra
with every template in tdata. Sparse matrix libraries are used
to perform the dot products.
Args:
spectra (list): list of Spectrum objects.
tdata (dict): dictionary of interpolated template values for each
wavehash.
nz (int): number of templates
use_gpu (bool): use GPU or not
Returns:
Tbs (list): dot products of these 3 spectra with all templates
"""
if (use_gpu):
#Use GPU to do dot products in batch
return _batch_dot_product_sparse_gpu(spectra, tdata)
#Need to find shape of output array of batch dot product
nrows = 0
nbasis = None
for key in tdata:
nrows += tdata[key].shape[1]
if (nbasis is None):
nbasis = tdata[key].shape[2]
#Create empty array rather than stacking a list - faster
Tbs = np.empty((nz, nrows, nbasis))
#Loop over all templates
for i in range(nz):
irow = 0
for s in spectra:
key = s.wavehash
curr_tb = s.Rcsr.dot(tdata[key][i,:,:])
#Copy this dot product result into the Tbs array
Tbs[i, irow:irow+curr_tb.shape[0],:] = curr_tb
irow += curr_tb.shape[0]
return Tbs
[docs]def _batch_dot_product_sparse_gpu(spectra, tdata):
"""GPU implementation.
Calculate a batch dot product of the 3 sparse matrices in spectra
with every template in tdata. A CUDA kernel replicates the functionality
of the scipy sparse matrix dot product but done for every template
in parallel so that the kernel is only called once per spectrum.
Args:
spectra (list): list of Spectrum objects.
tdata (dict): dictionary of interpolated template values for each
wavehash.
Returns:
Tbs (cp.array): dot products of these 3 spectra with all templates
"""
# Load CUDA kernel
cp_module = cp.RawModule(code=cuda_source)
batch_dot_product_sparse_kernel = cp_module.get_function('batch_dot_product_sparse')
Tbs = list()
for s in spectra:
key = s.wavehash
#Array dimensions needed by CUDA kernel
nrows = cp.int32(s.Rcsr.shape[1])
ncols = cp.int32(s.Rcsr.shape[0])
nbasis = cp.int32(tdata[key].shape[2])
nt = cp.int32(tdata[key].shape[0])
#Use actual numpy arrays that represent sparse array - .data, .indices, and .indptr
#Use batch_dot_product_sparse kernel to perform dot product in parallel for all templates
#for this (target, spectrum) combination.
#Allocate CUPY arrays and calculate number of blocks to use.
n = tdata[key].size
blocks = (n+block_size-1)//block_size
Rcsr_values = cp.asarray(s.Rcsr.data, cp.float64)
Rcsr_cols = cp.asarray(s.Rcsr.indices, cp.int32)
Rcsr_indptr = cp.asarray(s.Rcsr.indptr, cp.int32)
curr_tb = cp.empty((nt, ncols, nbasis))
#Launch kernel and syncrhronize
batch_dot_product_sparse_kernel((blocks,), (block_size,), (Rcsr_values, Rcsr_cols, Rcsr_indptr, tdata[key], curr_tb, nrows, ncols, nbasis, nt))
#Commented out synchronize - needed for timing kernels but we still
#get execution of this kernel before data is needed for next kernel
#so output is the same and slightly faster without synchronize
#cp.cuda.Stream.null.synchronize()
#Append to list
Tbs.append(curr_tb)
#Use CUPY.hstack to combine into one nt x ncols x nbasis array
Tbs = cp.hstack(Tbs)
#cp.cuda.Stream.null.synchronize()
return Tbs
[docs]def dot_product_sparse_one(spectra, tdata, i):
"""Calculate a dot product of the 3 sparse matrices in spectra
with ONE template in tdata. Sparse matrix libraries are used
to perform the dot products.
Args:
spectra (list): list of Spectrum objects.
tdata (dict): dictionary of interpolated template values for each
wavehash - arrays are 3d (nz x nlambda x nbasis)
i (int): index of this redshift
Returns:
Tb (array): dot products of these 3 spectra with ONE templates
"""
Tb = list()
for s in spectra:
key = s.wavehash
Tb.append(s.Rcsr.dot(tdata[key][i,:,:]))
Tb = np.vstack(Tb)
return Tb
[docs]def calc_batch_dot_product_3d2d(Tbs, zc, use_gpu):
"""Calculate a batch dot product of the 3d array Tbs with the 2d
array zc. The 3-d array shape is A x B x C and the 2-d array
shape is A x C. The resulting output 2-d array shape is A x B.
E.g., for all A a dot product of a 2d array of shape B x C
is performed with a 1-d array of shape C.
These are non-sparse numpy arrays.
Args:
Tbs (array): the stacked output from all 3 filters from
batch_dot_product_sparse, for all redshift templates
(nz x nrows x nbasis)
zc (array): zcoeffs, the 2-d array output of
zc = linalg.solve(all_M, all_y)
(nz x nbasis)
use_gpu (bool): use GPU or not
Returns:
model (array): the output of the dot product, (nz x nrows)
"""
if (use_gpu):
return _calc_batch_dot_product_3d2d_gpu(Tbs, zc)
#Get array dims to reshape model array to 2d
nz = zc.shape[0]
nrows = Tbs[0].shape[0]
model = (Tbs@zc[:, :, None]).reshape((nz, nrows))
return model
[docs]def _calc_batch_dot_product_3d2d_gpu(Tbs, zc):
"""GPU implementation.
Calculate a batch dot product of the 3d array Tbs with the 2d
array zc. The 3-d array shape is A x B x C and the 2-d array
shape is A x C. The resulting output 2-d array shape is A x B.
E.g., for all A a dot product of a 2d array of shape B x C
is performed with a 1-d array of shape C.
These are non-sparse numpy arrays.
Args:
Tbs (array): the stacked output from all 3 filters from
batch_dot_product_sparse, for all redshift templates
(nz x nrows x nbasis)
zc (array): zcoeffs, the 2-d array output of
zc = linalg.solve(all_M, all_y)
(nz x nbasis)
Returns:
model (array): the output of the dot product, (nz x nrows)
"""
#Use batch_dot_product_3d2d kernel to compute model array
# Load CUDA kernel
cp_module = cp.RawModule(code=cuda_source)
batch_dot_product_3d2d_kernel = cp_module.get_function('batch_dot_product_3d2d')
#Array dims needed by CUDA:
nz = zc.shape[0]
nrows = Tbs[0].shape[0]
n = nrows * nz
nbasis = zc.shape[1]
#Allocate CUPY array and calc blocks to be used
blocks = (n+block_size-1)//block_size
model = cp.empty((nz, nrows), cp.float64)
#Launch kernel and synchronize
batch_dot_product_3d2d_kernel((blocks,), (block_size,), (Tbs, zc, model, nrows, nbasis, nz))
#cp.cuda.Stream.null.synchronize()
return model
###!!! NOTE - used in v2 and v3 algorithms as an alternative to straight CuPy
### computation of M and y or the calc_M_y_batch method as a middle ground
### between maximum speed and maximum maintainability.
### This only offloads the computationally expensive dot product itself
### (and optionally the transpose) because the CuPy @ matrix multiplication
### seems to have a bug on Volta architecure GPUs.
### This is the equivalent of M = a @ b
### (Or if transpose_a is true, M = a.swapaxes(-2, -1) @ b)
[docs]def calc_batch_dot_product_3d3d_gpu(a, b, transpose_a=False, fullprecision=True):
"""GPU implementation.
Calculate a batch dot product of the 3d array a with the 3d
array b. The 3-d array shape is A x B x C and the 2-d array
shape is A x C. The resulting output 2-d array shape is A x B.
E.g., for all A a dot product of a 2d array of shape B x C
is performed with a 1-d array of shape C.
These are non-sparse numpy arrays.
Args:
a (array): a 3-d array (nz x ncols x nrows)
In practice, the Tb array, the stacked output from all 3 filters
from batch_dot_product_sparse, for all redshift templates
(nz x nrows x nbasis) which should have its transpose
performed yielding shape (nz x nbasis x nrows).
b (array): another 3-d array (nz x nrows x ncols)
In practice, the Tb array multiplied y weights, shape
(nz x nrows x nbasis)
transpose_a (bool): Whether or not to transpose the a array
before performing the dot product
fullprecision (bool): Whether or not to ensure reproducibly identical
results. The context is that it can be faster to use many
parallel threads to compute a single output array element of the
dot product result, but due to floating point rounding issues,
the random order of the summation can change the result by
order 1e-16 (e.g. a+b+c != c+a+b). When set to true, the
number of parallel threads is decreased to ensure reproducibility
at a trade-off of speed.
Returns:
M (array): the output of the dot product, (nz x ncols x ncols)
such that M[i,:,:] = a[i,:,:].dot(b[i,:,:])
"""
#Use batch_dot_product_3d3d kernel to compute model array
# Load CUDA kernel
import cupy as cp
cp_module = cp.RawModule(code=cuda_source)
batch_dot_product_3d3d_kernel = cp_module.get_function('batch_dot_product_3d3d')
#Array dims needed by CUDA:
nz = cp.int32(a.shape[0])
if (transpose_a):
nrows = cp.int32(a.shape[1])
ncols = cp.int32(a.shape[2])
else:
nrows = cp.int32(a.shape[2])
ncols = cp.int32(a.shape[1])
transpose_a = cp.int32(transpose_a)
nparallel = cp.int32(4)
if (fullprecision):
nparallel = cp.int32(4)
elif (nz <= 512):
#With smaller arrays, use more parallel threads in order to maximize
#parallelism and leverage the full resources of the GPU
nparallel = cp.int32(64)
#Create CUPY arrays and calculate number of blocks
n = nz*ncols*ncols*nparallel
blocks = (n+block_size-1)//block_size
all_M = cp.zeros((nz, ncols, ncols))
#Launch kernel and synchronize
batch_dot_product_3d3d_kernel((blocks,), (block_size,), (a, b, all_M, nrows, ncols, nz, nparallel, transpose_a))
#cp.cuda.Stream.null.synchronize()
return all_M
###!!! NOTE - this is called in the v3 algorithm
### In this version, everything is done in batch on the GPU but the
### templates are looped over on the CPU. The operations performed
### are very obviously analagous though and should be highly
### maintainable. The main difference is the extra loop on the CPU version
[docs]def calc_zchi2_batch(spectra, tdata, weights, flux, wflux, nz, nbasis, solve_matrices_algorithm=None,
solver_args=None, use_gpu=False, fullprecision=True, prior=None):
"""Calculate a batch of chi2.
For many redshifts and a set of spectral data, compute the chi2 for
template data that is already on the correct grid.
Args:
spectra (list): list of Spectrum objects.
tdata (dict): dictionary of interpolated template values for each
wavehash - arrays are 3d (nz x nlambda x nbasis)
weights (array): concatenated spectral weights (ivar).
flux (array): concatenated flux values.
wflux (array): concatenated weighted flux values.
nz (int): number of templates
nbasis (int): nbasis
solve_matrices_algorithm (string): PCA, BLVS, or NMF - the algorithm
used to solve matrix equation
solver_args (dict): Optional args to pass to solver, such as bounds
array for BVLS.
use_gpu (bool): use GPU or not
fullprecision (bool): Whether or not to ensure reproducibly identical
results. See calc_batch_dot_product_3d3d_gpu.
prior (2d array): prior matrix on coefficients (1/sig**2)
Returns:
zchi2 (array): array with one element per redshift for this target
zcoeff (array): array of best fit template coefficients
"""
zchi2 = np.zeros(nz)
if (weights.sum() == 0):
zchi2[:] = 9e99
zcoeff = np.zeros((nz, nbasis))
return (zchi2, zcoeff)
if (use_gpu):
global cp
import cupy as cp
#On the GPU, all operations are batch operations for all templates
#in parallel.
#1) batch_dot_product_sparse will compute dot products of all
#spectra with all templates in batch and return a 3D array of
#size (nz x ncols x nbasis).
Tbs = batch_dot_product_sparse(spectra, tdata, nz, use_gpu)
if (cp_memcheck):
#Free memory on consumer grade GPUs with low resources
mpool = cp.get_default_memory_pool()
mpool.free_all_blocks()
#2) On the GPU, M and y are computed for all templates at once
#CUPY swapaxes is the equivalent of the transpose in CPU mode
#and the @ matrix multiplication operator performs a dot
#product for each template.
###!!! NOTE - there are 3 different options for calculating the
### M and y arrays -
### A) Straight CUPY, which works well on perlmutter with a
### runtime of 6.2s on 1 GPU and 2.0s on 4 GPUs, but is
### unusably slow on Volta generation GPUs (16.8s for only
### 10 targets on a 1660 Super).
### B) calc_M_y_batch, the custom CUDA kernel, which is the
### fastest at 2.9s on 1 GPU and 0.7s on 4 GPUs (and 0.7s
### for 10 targets on a 1660 Super) but is the most difficult
### from a maintenance perspective
### C) Use the calc_batch_dot_product_3d3d_gpu kernel to offload
### only the matrix multiplication for M (and transpose of
### Tbs) but use CUPY for everything else. This strikes a
### middle ground that is very maintainable but removes the
### bottleneck of the CUPY Volta issue. 5.7s on 1 GPU and
### 1.8s on 4 GPUs on Perlmutter; 1.6s for 10 targets on
### 1660 Super.
###!!! NOTE - uncomment the 2 lines below to run (A)
#all_M = Tbs.swapaxes(-2, -1) @ (weights[None, :, None] * Tbs)
#all_y = (Tbs.swapaxes(-2, -1) @ wflux)
###!!! NOTE - uncomment the below line to run (B)
#(all_M, all_y) = calc_M_y_batch(Tbs, weights, wflux, nz, nbasis)
###!!! NOTE - uncomment the 2 lines below to run (C)
all_M = calc_batch_dot_product_3d3d_gpu(Tbs, (weights[None, :, None] * Tbs), transpose_a=True, fullprecision=fullprecision)
all_y = (Tbs.swapaxes(-2, -1) @ wflux)
###!!! NOTE - uncomment the 2 lines below to run an alternative
### version of (C) that does the transpose on the CPU - this seems
### to needlessly waste time though
#all_M = calc_batch_dot_product_3d3d_gpu(cp.ascontiguousarray(Tbs.swapaxes(-2, -1)), (weights[None, :, None] * Tbs))
#all_y = (Tbs.swapaxes(-2, -1) @ wflux)
if prior is not None:
all_M += cp.asarray(prior)
#3) Use new helper method solve_matrices to use appropriate method
#for this template to solve for zcoeff in batch for all_M and all_y.
#There is no Error thrown by cupy's version of linalg.solve so just
#need to catch NotImplementedError.
try:
zcoeff = solve_matrices(all_M, all_y, solve_algorithm=solve_matrices_algorithm,
solver_args=solver_args, use_gpu=True)
except NotImplementedError:
zchi2[:] = 9e99
zcoeff = np.zeros((nz, nbasis))
return (zchi2, zcoeff)
#4) calc_batch_dot_product_3d2d will compute the dot product
#of Tbs and zcoeff for all templates in parallel.
#It is the same as model[i,:,:] = Tbs[i,:,:].dot(zcoeff[i,:])
model = calc_batch_dot_product_3d2d(Tbs, zcoeff, use_gpu)
#5) On the GPU, (flux-model)*(flux-model) is faster than
#(flux-model)**2. The @ matrix multiplication operator performs
#a dot product for each template. get() copies the data back
#from the GPU to the numpy array allocated for zchi2.
zchi2[:] = (((flux - model)*(flux-model)) @ weights).get()
#Copy data from GPU to numpy arrays
zcoeff = zcoeff.get()
if (cp_memcheck):
#Free memory on consumer grade GPUs with low resources
del Tbs
del all_M
del all_y
del model
mpool = cp.get_default_memory_pool()
mpool.free_all_blocks()
else:
zcoeff = np.zeros((nz, nbasis))
#On the CPU, the templates are looped over and all operations
#are performed on one template at a time.
for i in range(nz):
#1) dot_product_sparse_one will compute dot products of all
#spectra with ONE template and return a 2D array of size
#(ncols x nbasis)
Tb = dot_product_sparse_one(spectra, tdata, i)
#2) On the CPU, M and y are computed for each template
M = Tb.T.dot(np.multiply(weights[:,None], Tb))
y = Tb.T.dot(wflux)
if prior is not None:
M += prior
#3) Use new helper method solve_matrices to use appropriate method
#for this template to solve for zcoeff for each M, y.
#Catch LinAlgError and NotImplementedError
try:
zcoeff[i,:] = solve_matrices(M, y, solve_algorithm=solve_matrices_algorithm, solver_args=solver_args, use_gpu=False)
except np.linalg.LinAlgError:
zchi2[i] = 9e99
continue
except NotImplementedError:
zchi2[i] = 9e99
continue
#4) Calculate dot products individually for each template
model = Tb.dot(zcoeff[i,:])
#5) Calculate this zchi2 element individually for each template
zchi2[i] = np.dot( (flux - model)**2, weights )
return (zchi2, zcoeff)
[docs]def calc_negOII_penalty(OIItemplate, coeff):
"""return penalty term for model having negative [OII] flux
Args:
OIItemplate[nwave, nbasis]: portion of template around [OII] doublet
coeff[nz, nbasis]: coefficients from full template fit to data
Returns:
penalty[nz]: penalty per redshift bin
"""
#- evaluate template and sum over [OII] doublet
OIIflux = np.sum(coeff @ OIItemplate, axis=1)
#- Linear penalty with negative [OII]; no mathematical basis but
#- pragmatically works ok
penalty = -OIIflux * (OIIflux < 0)
return penalty
###!!! NOTE - this is the main method for the v3 algorithm
### In this version, everything is done in batch on the GPU but the
### templates are looped over on the CPU. The operations performed
### are very obviously analagous though and should be highly
### maintainable. The main difference is the extra loop on the CPU version
[docs]def calc_zchi2(target_ids, target_data, dtemplate, progress=None, use_gpu=False):
"""Calculate chi2 vs. redshift for a given PCA template.
New CPU/GPU algorithms June 2022
Args:
target_ids (list): targets IDs.
target_data (list): list of Target objects.
dtemplate (DistTemplate): distributed template data
progress (multiprocessing.Queue): optional queue for tracking
progress, only used if MPI is disabled.
Returns:
tuple: (zchi2, zcoeff, zchi2penalty) with:
- zchi2[ntargets, nz]: array with one element per target per
redshift
- zcoeff[ntargets, nz, ncoeff]: array of best fit template
coefficients for each target at each redshift
- zchi2penalty[ntargets, nz]: array of penalty priors per target
and redshift, e.g. to penalize unphysical fits
"""
if (use_gpu):
#Use global to import cupy here - it will not be needed to be imported
#in any other method
global cp
import cupy as cp
#Get CUDA device and check available memory.
#If < 3 GB free set cp_memcheck
d = cp.cuda.Device()
if (d.mem_info[0] < 3*1024*1024*1024):
global cp_memcheck
cp_memcheck = True
nz = len(dtemplate.local.redshifts)
ntargets = len(target_ids)
nbasis = dtemplate.template.nbasis
zchi2 = np.zeros( (ntargets, nz) )
zchi2penalty = np.zeros( (ntargets, nz) )
zcoeff = np.zeros( (ntargets, nz, nbasis) )
## Redshifted templates are now already in format needed - dict of 3d
# arrays (CUPY or numpy).
tdata = dtemplate.local.data
for j in range(ntargets):
if (use_gpu):
#Use new helper method gpu_spectral_data() that will copy to GPU
#on first access and recall data in subsequent calls instead of
#copying every time
(weights, flux, wflux) = target_data[j].gpu_spectral_data()
else:
#Use spectral_data() to get numpy arrays
(weights, flux, wflux) = spectral_data(target_data[j].spectra)
if np.sum(weights) == 0:
zchi2[j,:] = 9e99
#Update progress for multiprocessing!!
if dtemplate.comm is None and progress is not None:
progress.put(1)
continue
# Solving for template fit coefficients for all redshifts.
# We use the pre-interpolated templates for each
# unique wavelength range.
# Use helper method calc_zchi2_batch to calculate zchi2 and zcoeff
# for all templates for all three spectra for this target
# For coarse z scan, use fullprecision = False to maximize speed
(zchi2[j,:], zcoeff[j,:,:]) = calc_zchi2_batch(
target_data[j].spectra, tdata, weights, flux, wflux, nz, nbasis,
dtemplate.template.solve_matrices_algorithm, use_gpu=use_gpu,
fullprecision=False)
#- Galaxy templates penalize chi2 for negative [OII] flux; ad-hoc
if hasattr(dtemplate.template, 'OIItemplate'):
zchi2penalty[j] = calc_negOII_penalty(dtemplate.template.OIItemplate, zcoeff[j])
if dtemplate.comm is None and progress is not None:
progress.put(1)
return zchi2, zcoeff, zchi2penalty
[docs]def _mp_calc_zchi2(indx, target_ids, target_data, t, use_gpu, qout, qprog):
"""Wrapper for multiprocessing version of calc_zchi2.
"""
try:
# Unpack targets from shared memory
for tg in target_data:
tg.sharedmem_unpack()
tzchi2, tzcoeff, tpenalty = calc_zchi2(target_ids, target_data, t,
use_gpu=use_gpu, progress=qprog)
qout.put( (indx, tzchi2, tzcoeff, tpenalty) )
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
lines = [ "MP calc_zchi2: {}".format(x) for x in lines ]
print("".join(lines))
sys.stdout.flush()
[docs]def calc_zchi2_targets(targets, templates, mp_procs=1, use_gpu=False):
"""Compute all chi2 fits for the local set of targets and collect.
Given targets and templates distributed across a set of MPI processes,
compute the coarse-binned chi^2 fit for all redshifts and our local set
of targets. Each process computes the fits for a slice of redshift range
and then cycles through redshift slices by passing the interpolated
templates along to the next process in order.
Args:
targets (DistTargets): distributed targets.
templates (list): list of DistTemplate objects.
mp_procs (int): if not using MPI, this is the number of multiprocessing
processes to use.
gpu (bool): (optional) use gpu for calc_zchi2
Returns:
dict: dictionary of results for each local target ID.
"""
# Find most likely candidate redshifts by scanning over the
# pre-interpolated templates on a coarse redshift spacing.
# See if we are the process that should be printing stuff...
am_root = False
if targets.comm is None:
am_root = True
elif targets.comm.rank == 0:
am_root = True
# If we are not using MPI, our DistTargets object will have all the targets
# on the main process. In that case, we would like to distribute our
# targets across multiprocesses. Here we compute that distribution, if
# needed.
mpdist = None
if targets.comm is None:
mpdist = distribute_targets(targets.local(), mp_procs)
results = dict()
for tid in targets.local_target_ids():
results[tid] = dict()
if am_root:
print("Computing redshifts")
sys.stdout.flush()
for t in templates:
ft = t.template.full_type
if am_root:
print(" Scanning redshifts for template {}"\
.format(t.template.full_type))
sys.stdout.flush()
start = elapsed(None, "", comm=targets.comm)
# There are 2 parallelization techniques supported here (MPI and
# multiprocessing).
zchi2 = None
zcoeff = None
penalty = None
if targets.comm is not None or mp_procs == 1:
# MPI case.
# The following while-loop will cycle through the redshift slices
# (one per MPI process) until all processes have computed the chi2
# for all redshifts for their local targets.
if am_root:
sys.stdout.write(" Progress: {:3d} %\n".format(0))
sys.stdout.flush()
zchi2 = dict()
zcoeff = dict()
penalty = dict()
mpi_prog_frac = 1.0
prog_chunk = 10
if t.comm is not None:
mpi_prog_frac = 1.0 / t.comm.size
if t.comm.size < prog_chunk:
prog_chunk = 100 // t.comm.size
proglast = 0
prog = 1
done = False
#CW 04/25/22 - when running in GPU mode, all non-GPU procs should
#have 0 targets. Set done to true in these cases so it skips the
#while loop - this saves ~2s on 500 targets on 64 CPU / 4 GPU
#and no need to call calc_zchi2 on empty target list
if (use_gpu and len(targets.local_target_ids()) == 0):
done = True
while not done:
# Compute the fit for our current redshift slice.
tzchi2, tzcoeff, tpenalty = \
calc_zchi2(targets.local_target_ids(), targets.local(), t, use_gpu=use_gpu)
# Save the results into a dict keyed on targetid
tids = targets.local_target_ids()
for i, tid in enumerate(tids):
if tid not in zchi2:
zchi2[tid] = {}
zcoeff[tid] = {}
penalty[tid] = {}
zchi2[tid][t.local.index] = tzchi2[i]
zcoeff[tid][t.local.index] = tzcoeff[i]
penalty[tid][t.local.index] = tpenalty[i]
prg = int(100.0 * prog * mpi_prog_frac)
if prg >= proglast + prog_chunk:
proglast += prog_chunk
if am_root and (t.comm is not None):
sys.stdout.write(" Progress: {:3d} %\n"\
.format(proglast))
sys.stdout.flush()
prog += 1
# Cycle through the redshift slices
done = t.cycle()
for tid in zchi2.keys():
zchi2[tid] = np.concatenate([ zchi2[tid][p] for p in sorted(zchi2[tid].keys()) ])
zcoeff[tid] = np.concatenate([ zcoeff[tid][p] for p in sorted(zcoeff[tid].keys()) ])
penalty[tid] = np.concatenate([ penalty[tid][p] for p in sorted(penalty[tid].keys()) ])
else:
# Multiprocessing case.
import multiprocessing as mp
# Ensure that all targets are packed into shared memory
for tg in targets.local():
tg.sharedmem_pack()
# We explicitly spawn processes here (rather than using a pool.map)
# so that we can communicate the read-only objects once and send
# a whole list of redshifts to each process.
qout = mp.Queue()
qprog = mp.Queue()
procs = list()
for i in range(mp_procs):
if len(mpdist[i]) == 0:
continue
target_ids = mpdist[i]
target_data = [ x for x in targets.local() if x.id in mpdist[i] ]
p = mp.Process(target=_mp_calc_zchi2,
args=(i, target_ids, target_data, t, use_gpu, qout, qprog))
procs.append(p)
p.start()
# Track progress
sys.stdout.write(" Progress: {:3d} %\n".format(0))
sys.stdout.flush()
ntarget = len(targets.local_target_ids())
progincr = 10
if mp_procs > ntarget:
progincr = int(100.0 / ntarget)
tot = 0
proglast = 0
while (tot < ntarget):
cnt = qprog.get()
tot += cnt
prg = int(100.0 * tot / ntarget)
if prg >= proglast + progincr:
proglast += progincr
sys.stdout.write(" Progress: {:3d} %\n".format(proglast))
sys.stdout.flush()
# Extract the output
zchi2 = dict()
zcoeff = dict()
penalty = dict()
for _ in range(len(procs)):
res = qout.get()
tids = mpdist[res[0]]
for j,tid in enumerate(tids):
zchi2[tid] = res[1][j]
zcoeff[tid] = res[2][j]
penalty[tid] = res[3][j]
elapsed(start, " Finished in", comm=targets.comm)
for tid in sorted(zchi2.keys()):
results[tid][ft] = dict()
results[tid][ft]['redshifts'] = t.template.redshifts
results[tid][ft]['zchi2'] = zchi2[tid]
results[tid][ft]['penalty'] = penalty[tid]
results[tid][ft]['zcoeff'] = zcoeff[tid]
return results
[docs]def solve_matrices(M, y, solve_algorithm="PCA", solver_args=None, use_gpu=False):
"""Solve the matrix equation y = M*x for the unknown x using the
specified algorithm. The default is to use PCA via numpy or cupy
linalg.solve. But non-negative matrix factorization (NMF) and other
algorithms may be integrated here and selected based on the template.
Args:
M (array): 2d array on CPU; 3d array on GPU for all redshifts
y (array): 1d array on CPU; 2d array on GPU for all redshifts
solve_algorithm (string): which algorithm to use
solver_args (dict): Optional args to pass to solver, such as bounds
array for BVLS.
use_gpu (bool): use GPU or not
Returns:
x (array): solution to y = M*x
(1-d array on CPU, 2-d array on GPU)
Raises:
LinAlgError is passed up if raised by np.linalg.solve
NotImplementedError if algorithm is not implemented or undefined
"""
if solve_algorithm.upper() == "PCA":
#Use PCA via linalg.solve in either numpy or cupy
if (use_gpu):
#Use cupy linalg.solve to solve for zcoeff in batch for all_M and
#all_y where all_M and all_y are 3d and 2d arrays representing
#M and y at every redshift bin for the given template.
#There is no Error thrown by cupy's version.
return cp.linalg.solve(M, y)
else:
#Use numpy linalg.solve which throws exception
try:
return np.linalg.solve(M, y)
except np.linalg.LinAlgError:
raise
elif solve_algorithm in ("NMF", "NNLS"):
if (use_gpu):
nz = y.shape[0]
Mcpu = M.get()
ycpu = y.get()
zcoeff = np.zeros(y.shape)
#Copy to CPU, run scipy.optimize.nnls, copy back to GPU
for j in range(nz):
try:
res = nnls(Mcpu[j,:,:], ycpu[j,:])
zcoeff[j,:] = res[0]
except Exception:
zcoeff[j,:] = 9e99
return cp.asarray(zcoeff)
else:
try:
res = nnls(M, y)
zcoeff = res[0]
except Exception:
raise np.linalg.LinAlgError
return zcoeff
elif solve_algorithm == "BVLS":
if (solver_args is not None and 'bounds' in solver_args):
bounds = solver_args['bounds']
else:
nbasis = y.shape[-1]
bounds = np.zeros((2, nbasis))
bounds[0]=-np.inf
bounds[1]=np.inf
if (use_gpu):
nz = y.shape[0]
Mcpu = M.get()
ycpu = y.get()
zcoeff = np.zeros(y.shape)
#Copy to CPU, run scipy.optimize.lsq_linear, copy back to GPU
for j in range(nz):
try:
res = lsq_linear(Mcpu[j,:,:], ycpu[j,:], bounds=bounds, method='bvls')
zcoeff[j,:] = res.x
except np.linalg.LinAlgError:
zcoeff[j,:] = 9e99
return cp.asarray(zcoeff)
else:
try:
res = lsq_linear(M, y, bounds=bounds, method='bvls')
zcoeff = res.x
except np.linalg.LinAlgError:
raise
return zcoeff
else:
raise NotImplementedError("The solve_algorithm "+solve_algorithm+" is not implemented.")