Source code for redrock.targets

"""
redrock.targets
===============

Classes and functions for targets and their spectra.
"""

from __future__ import absolute_import, division, print_function


import numpy as np
import scipy.sparse

from .utils import mp_array, distribute_work, reduced_wavelength

from . import constants

[docs]class Spectrum(object): """Simple container class for an individual spectrum. Args: wave (array): the wavelength grid. flux (array): the flux values. ivar (array): the inverse variance. R (scipy.sparse.dia_matrix): the resolution matrix in band diagonal format. Rcsr (scipy.sparse.csr_matrix): the resolution matrix in CSR format. band (str): camera band name """ # @profile def __init__(self, wave, flux, ivar, R, Rcsr=None, band=None): if R is not None: w = np.asarray(R.sum(axis=1))[:,0]<constants.min_resolution_integral ivar[w] = 0. self.nwave = wave.size self.wave = wave self.flux = flux self.ivar = ivar self.R = R self._Rcsr = Rcsr self._mpshared = False # "wavehash" tracks the different wavelength grids for different spectra # if "band" is provided, use that, otherwise hash elements of the wave array self.band = band if band is not None: self.wavehash = band else: if hasattr(R,'data'): self.wavehash = hash((len(wave), wave[0], wave[1], wave[-2], wave[-1], R.data.shape[0])) else: self.wavehash = hash((len(wave), wave[0], wave[1], wave[-2], wave[-1])) #It is much more efficient to calculate edges once if rebinning #and copy to GPU and store here rather than doing this every time self._gpuwave = None self._gpuedges = None #Min and max edges are stored as CPU scalars to avoid excessive #GPU to CPU copying self.minedge = None self.maxedge = None @property def gpuwave(self): if (self._gpuwave is None): #Copy to GPU once import cupy as cp self._gpuwave = cp.asarray(self.wave) return self._gpuwave @property def gpuedges(self): if (self._gpuedges is None): #Compute edges, minedge, and maxedge, and copy to GPU once import cupy as cp from .rebin import centers2edges edges = centers2edges(self.wave) self.minedge = edges[0] self.maxedge = edges[-1] self._gpuedges = cp.asarray(edges) return self._gpuedges @property def Rcsr(self): self.sharedmem_unpack() if self._Rcsr is None: self._Rcsr = self.R.tocsr() return self._Rcsr
[docs] def sharedmem_pack(self): """Pack spectral data into multiprocessing shared memory. """ if not self._mpshared: # Store data in multiprocessing shared memory self.wave = mp_array(self.wave) self.flux = mp_array(self.flux) self.ivar = mp_array(self.ivar) self._ndiag = self.R.data.shape[0] self._splen = self.R.data.shape[1] self.R_offsets = mp_array(self.R.offsets) self.R_data = mp_array(self.R.data) del self.R if self._Rcsr is not None: self._csrshape = self._Rcsr.shape self.Rcsr_indices = mp_array(self._Rcsr.indices) self.Rcsr_indptr = mp_array(self._Rcsr.indptr) self.Rcsr_data = mp_array(self._Rcsr.data) del self._Rcsr else: self.Rcsr_data = None self._mpshared = True return
[docs] def sharedmem_unpack(self): """Unpack spectral data from multiprocessing shared memory. """ if self._mpshared: self.wave = np.array(self.wave) self.flux = np.array(self.flux) self.ivar = np.array(self.ivar) self.R = scipy.sparse.dia_matrix((np.array(self.R_data), np.array(self.R_offsets)), shape=(self._splen, self._splen)) del self.R_data del self.R_offsets if self.Rcsr_data is not None: self._Rcsr = scipy.sparse.csr_matrix((np.array(self.Rcsr_data), np.array(self.Rcsr_indices), np.array(self.Rcsr_indptr)), shape=self._csrshape) del self.Rcsr_data del self.Rcsr_indices del self.Rcsr_indptr else: self._Rcsr = None self._mpshared = False return
[docs]class Target(object): """A single target. This represents the data for a single target, including a unique identifier and the individual spectra observed for this object (or a coadd). Args: targetid (int or str): unique targetid spectra (list): list of Spectrum objects coadd (bool): compute and store the coadd at construction time. The coadd can always be recomputed with the compute_coadd() method. cosmics_nsig (float): cosmic rejection threshold in compute_coadd meta (dict): optional metadata dictionary for this Target. """ def __init__(self, targetid, spectra, coadd=False, cosmics_nsig=0., meta=None): self.id = targetid self.spectra = spectra if meta is None: self.meta = dict() else: self.meta = meta if coadd: self.compute_coadd(cache_Rcsr=False, cosmics_nsig=cosmics_nsig) #It is much more efficient to copy weights, flux, wflux to GPU once #and store here rather than doing this every time self.gpuweights = None self.gpuflux = None self.gpuwflux = None self._legendre = None self._gpulegendre = None self.nleg = 0 self.bands = [] for s in spectra: self.bands.append(s.band)
[docs] def eval_model(self, bestfit, templates, archetypes=None, nleg=None): """ Evaluate the best fit model for the spectra in this Target. Args: bestfit: dict-like with SPECTYPE, SUBTYPE, COEFF, and Z templates: dict[(SPECTYPE,SUBTYPE)] of Template objects Options: archetypes: dict[(SPECTYPE,SUBTYPE)] of Archetype objects Returns ``model`` dict of rendered best fit models keyed by wavehash Note: modifies self by setting self.model to be returned value. Example after calling target.eval_model:: spec = target.spectra[i] wavelengths = spec.wave fluxdata = spec.flux wavehash = spec.wavehash fluxmodel = target.model[wavehash] """ spectype = bestfit['SPECTYPE'] subtype = bestfit['SUBTYPE'] coeff = bestfit['COEFF'] z = bestfit['Z'] fitmethod = bestfit['FITMETHOD'] self.model = dict() if np.all(coeff == 0.0): # entirely masked data has coeffs=0 for sp in self.spectra: self.model[sp.wavehash] = np.zeros(len(sp.wave), dtype=np.float32) elif fitmethod == 'ARCH': #- Archetype fits have N nearest neighbor archetype coeffs #- followed by ncamera*nlegendre coefficients #- import here to avoid circular import from .archetypes import split_archetype_coeff archcoeff, legcoeff = split_archetype_coeff(subtype, coeff, len(self.bands)) ax = archetypes[spectype] for i, sp in enumerate(self.spectra): self.model[sp.wavehash] = ax.eval(subtype, archcoeff, sp.wave, z, R=sp.Rcsr, legcoeff=legcoeff[i]) else: tx = templates[(spectype, subtype)] for sp in self.spectra: self.model[sp.wavehash] = tx.eval(coeff, sp.wave, z, sp.Rcsr) return self.model
def legendre(self, nleg, use_gpu=False): if (use_gpu and self.nleg == nleg): if (self._gpulegendre is not None): return self._gpulegendre elif (self._legendre is not None): import cupy as cp self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre } return self._gpulegendre if (self._legendre is not None and self.nleg == nleg): return self._legendre dwave = { s.wavehash:s.wave for s in self.spectra } self._legendre = { hs:np.array([scipy.special.legendre(i)(reduced_wavelength(w)) for i in range(nleg)]) for hs, w in dwave.items() } self.nleg = nleg if (use_gpu): import cupy as cp self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre } return self._gpulegendre return self._legendre def gpu_spectral_data(self): if self.gpuweights is None: #Assemble arrays and copy to GPU once import cupy as cp self.gpuweights = cp.concatenate([cp.asarray(s.ivar) for s in self.spectra]) self.gpuflux = cp.concatenate([cp.asarray(s.flux) for s in self.spectra]) self.gpuwflux = self.gpuweights * self.gpuflux return (self.gpuweights, self.gpuflux, self.gpuwflux) ### @profile
[docs] def compute_coadd(self, cache_Rcsr=False, cosmics_nsig=0.): """Compute the coadd from the current spectra list. Args: cache_Rcsr: pre-calculate and cache sparse CSR format of resolution matrix R cosmics_nsig (float): number of sigma for cosmic rejection. This method REPLACES the list of individual spectra with coadds. """ # preserve order of wavehashes wavehashes = list() for sp in self.spectra: if sp.wavehash not in wavehashes: wavehashes.append(sp.wavehash) coadd = list() for key in wavehashes: wave = None unweightedflux = None weightedflux = None weights = None Rdiags = None offsets = None nspec = 0 flux=[] # references ivar=[] # references grad=[] # gradients, copy gradvar=[] for s in self.spectra: if s.wavehash != key: continue nspec += 1 current_band = s.band if wave is None : wave = s.wave Rdiags = s.R.data * s.ivar offsets = s.R.offsets else : assert len(s.wave) == len(wave) Rdiags += s.R.data * s.ivar flux.append(s.flux) ivar.append(s.ivar) if cosmics_nsig > 0 : # interpolate over bad measurements # to be able to compute gradient next # to a bad pixel and identify oulier # many cosmics residuals are on edge # of cosmic ray trace, and so can be # next to a masked flux bin good = (s.ivar>0) bad = (s.ivar==0) tflux = s.flux.copy() tivar = s.ivar.copy() tflux[bad] = np.interp(wave[bad],wave[good],s.flux[good]) tivar[bad] = np.interp(wave[bad],wave[good],s.ivar[good]) bad = (tivar<=0) tivar[bad]=np.min(tivar[tivar>0]) # compute a simple gradient tvar = 1/tivar tflux[1:] = tflux[1:]-tflux[:-1] tvar[1:] = tvar[1:]+tvar[:-1] tflux[0] = 0 grad.append(tflux) gradvar.append(tvar) flux=np.array(flux) ivar=np.array(ivar) if len(grad)>1 and cosmics_nsig > 0 : # detect outliers by comparing spectra grad=np.array(grad) gradivar=1/np.array(gradvar) nspec=grad.shape[0] meangrad=np.sum(gradivar*grad,axis=0)/np.sum(gradivar) deltagrad=grad-meangrad chi2=np.sum(gradivar*deltagrad**2,axis=0)/(nspec-1) jj=np.where(chi2>cosmics_nsig**2)[0] for j in jj : k=np.argmax(gradivar[:,j]*deltagrad[:,j]**2) #k=np.argmax(flux[:,j]) #print("masking spec",k,"wave=",wave[j],"flux=",flux[k,j]) ivar[k][j]=0. unweightedflux = np.sum(flux,axis=0) weights = np.sum(ivar,axis=0) weightedflux = np.sum(ivar*flux,axis=0) isbad = (weights == 0) flux = weightedflux / (weights + isbad) flux[isbad] = unweightedflux[isbad] / nspec Rdiags /= (weights + isbad) nwave = Rdiags.shape[1] R = scipy.sparse.dia_matrix((Rdiags, offsets), shape=(nwave, nwave)) if cache_Rcsr: Rcsr = R.tocsr() else: Rcsr = None spc = Spectrum(wave, flux, weights, R, Rcsr, band=current_band) coadd.append(spc) # swap the coadds into place. self.spectra = coadd return
[docs] def sharedmem_pack(self): """Pack all spectra into multiprocessing shared memory. """ for s in self.spectra: s.sharedmem_pack() return
[docs] def sharedmem_unpack(self): """Unpack all spectra from multiprocessing shared memory. """ for s in self.spectra: s.sharedmem_unpack() return
[docs]class DistTargets(object): """Base class for distributed targets. Target objects are distributed across the processes in an MPI communicator, but the details of how this data is loaded from disk is specific to a given project. Each project should inherit from this base class and create an appropriate class for the data files being used. This class defines some general methods and the API that should be followed by these derived classes. Args: targetids (list): the global set of target IDs. comm (mpi4py.MPI.Comm): (optional) the MPI communicator. """ def __init__(self, targetids, comm=None): self._comm = comm self._targetids = targetids self._dwave = None @property def comm(self): return self._comm @property def all_target_ids(self): return self._targetids def _local_target_ids(self): raise NotImplementedError("You should not instantiate a DistTargets " "object directly")
[docs] def local_target_ids(self): """Return the local list of target IDs. """ return self._local_target_ids()
def _local_data(self): raise NotImplementedError("You should not instantiate a DistTargets " "object directly")
[docs] def local(self): """Return the local list of Target objects. """ return self._local_data()
[docs] def eval_models(self, bestfit, templates, archetypes=None): """ Calls target.eval_model(bestfit, templates, archetypes) for each local target Args: bestfit: dict-like with SPECTYPE, SUBTYPE, COEFF, and Z templates: dict[(SPECTYPE,SUBTYPE)] of Template objects Options: archetypes: dict[(SPECTYPE,SUBTYPE)] of Archetype objects Returns ``models`` list of dictionaries, with one item per local target, and dictionaries keyed by the wavehashes of the spectra for that target. See Target.eval_model for more details. Note: sets target.model for each local target """ models = list() for tgt in self.local(): i = np.where(bestfit['TARGETID'] == tgt.id)[0][0] model = tgt.eval_model(bestfit[i], templates, archetypes) models.append(model) return models
[docs] def wavegrids(self): """Return the global dictionary of wavelength grids for each wave hash. """ if self._dwave is None: my_dwave = dict() for t in self.local(): for s in t.spectra: if s.wavehash not in my_dwave: my_dwave[s.wavehash] = s.wave.copy() if self._comm is None: self._dwave = my_dwave.copy() else: temp = self._comm.allgather(my_dwave) self._dwave = dict() for pdata in temp: for k, v in pdata.items(): if k not in self._dwave: self._dwave[k] = v.copy() del temp del my_dwave return self._dwave
[docs]def distribute_targets(targets, nproc): """Distribute a list of targets among processes. Given a list of Target objects, compute the load balanced distribution of those targets among a set of processes. This function is used when one already has a list of Target objects that need to be distributed. This happens, for example, when creating a DistTargetsCopy object from pre-existing Targets, or when using multiprocessing to do operations on the MPI-local list of targets. Args: targets (list): list of Target objects. nproc (int): number of processes. Returns: list: A list (one element for each process) with each element being a list of the target IDs assigned to that process. """ # We weight each target by the number of spectra. ids = list() tweights = dict() for tg in targets: ids.append(tg.id) tweights[tg.id] = len(tg.spectra) return distribute_work(nproc, ids, weights=tweights)
[docs]class DistTargetsCopy(DistTargets): """Distributed targets built from a copy. This class is a simple wrapper that distributes targets located on one process to the processes in a communicator. Args: targets (list): list of Target objects on one process. comm (mpi4py.MPI.Comm): (optional) the MPI communicator. root (int): the process which has the input targets locally. """ def __init__(self, targets, comm=None, root=0): comm_size = 1 comm_rank = 0 if comm is not None: comm_size = comm.size comm_rank = comm.rank self._alltargetids = list() if comm_rank == root: for tg in targets: self._alltargetids.append(tg.id) if comm is not None: self._alltargetids = comm.bcast(self._alltargetids, root=root) # Distribute the targets among process weighted by the amount of work # to do for each target. self._proc_targets = distribute_targets(targets, comm_size) self._my_targets = self._proc_targets[comm_rank] # Distribute targets from the root process to the others self._my_data = None if comm is None: self._my_data = targets else: tbuf = dict() for tg in targets: recv = comm.bcast(tg, root=root) if recv.id in self._my_targets: tbuf[recv.id] = recv self._my_data = [ tbuf[x] for x in self._my_targets ] super(DistTargetsCopy, self).__init__(self._alltargetids, comm=comm) def _local_target_ids(self): return self._my_targets def _local_data(self): return self._my_data