Source code for redrock.zfind

"""
redrock.zfind
=============

Redshift finding algorithms.
"""

from __future__ import division, print_function

import re
import sys
import traceback

import numpy as np

import astropy.table

from . import constants

from .utils import elapsed

from .targets import distribute_targets

from .templates import parse_fulltype

from .archetypes import All_archetypes

from .priors import Priors

from .results import read_zscan_redrock

from .zscan import calc_zchi2_targets

from .fitz import fitz, get_dv

from .zwarning import ZWarningMask as ZW
from .zwarning import badfit_mask

[docs]def sort_dict_by_col(d, colname): """Sort a dict of np.ndarrays by one key. Replacement for astropy.Table.sort """ if (not colname in d): raise KeyError('Key '+str(colname)+' is not in dictionary') for k in d.keys(): if (type(d[k]) is not np.ndarray): raise ValueError('Column '+str(k)+' is not an np.array') idx = d[colname].argsort(0).flatten() for k in d.keys(): d[k] = d[k][idx] return
[docs]def sort_dict_by_cols(d, colnames, sort_first_column_first = True): """Sort a dict of np.ndarrays by multiple keys Replacement for astropy.Table.sort Args: d : a dictionary colnames : a tuple or list of column names sort_first_column_first : boolean - np.lexsort((a,b)) will sort by b first and then sort by a. This is the opposite of astropy.Table.sort behavior. Setting this to true will ensure that sort_dict_by_cols(d, ('a','b')) will result in sorting by a first and then b. """ for c in colnames: if (not c in d): raise KeyError('Key '+str(c)+' is not in dictionary') for k in d.keys(): if (type(d[k]) is not np.ndarray): raise ValueError('Column '+str(k)+' is not an np.array') valsToSort = () for c in colnames: valsToSort += (d[c],) if (sort_first_column_first): #Reverse order of valsToSort valsToSort = valsToSort[::-1] idx = np.lexsort(valsToSort, axis=0).flatten() for k in d.keys(): d[k] = d[k][idx] return
[docs]def _mp_fitz(chi2, target_data, t, nminima, qout, archetype, use_gpu, deg_legendre, zminfit_npoints, per_camera, n_nearest, prior_sigma): """Wrapper for multiprocessing version of fitz. """ try: # Unpack targets from shared memory for tg in target_data: tg.sharedmem_unpack() results = list() for i, tg in enumerate(target_data): zfit = fitz(chi2[i], t.template.redshifts, tg, t.template, nminima=nminima, archetype=archetype, use_gpu=use_gpu, zminfit_npoints=zminfit_npoints, per_camera=per_camera, deg_legendre=deg_legendre, n_nearest=n_nearest, prior_sigma=prior_sigma) results.append( (tg.id, zfit) ) qout.put(results) except: exc_type, exc_value, exc_traceback = sys.exc_info() lines = traceback.format_exception(exc_type, exc_value, exc_traceback) lines = [ "MP calc_zchi2: {}".format(x) for x in lines ] print("".join(lines)) sys.stdout.flush()
[docs]def calc_deltachi2(chi2, z, zwarn, dvlimit=constants.max_velo_diff): ''' Calculate chi2 differences, excluding candidates with close z or bad fits Args: chi2 : array of chi2 values z : array of redshifts zwarn : array of zwarn values Options: dvlimit: exclude candidates that are closer than dvlimit [km/s] Returns (deltachi2, setzwarn) where `deltachi2` is array of chi2 differences to next best good fit, and `setzwarn` is boolean array of whether a SMALL_DELTACHI2 zwarn bit should be set. Note: The final target always has deltachi2=0.0 because we don't know what the next chi2 would have been. This can also occur for the last N targets if all N of them are within dvlimit of each other. ''' nz = len(chi2) deltachi2 = np.zeros(nz) okfit = (zwarn & badfit_mask) == 0 for i in range(len(chi2)-1): dv = get_dv(z[i+1:], z[i]) ii = (np.abs(dv)>dvlimit) & okfit[i+1:] if np.any(ii): dchi2 = chi2[i+1:] - chi2[i] deltachi2[i] = np.min(dchi2[ii]) #- zwarn SMALL_DELTA_CHI2 is based upon small difference to any good fit, #- including a slightly better one noti = np.ones(nz, dtype=bool) setzwarn = np.zeros(nz, dtype=bool) for i in range(nz): noti[:] = True noti[i] = False alldeltachi2 = np.absolute(chi2[noti] - chi2[i]) alldv = np.absolute(get_dv(z=z[noti], zref=z[i])) zwarn = np.any( okfit[noti] & (alldeltachi2 < constants.min_deltachi2) & (alldv >= dvlimit) ) if zwarn: setzwarn[i] = True return deltachi2, setzwarn
[docs]def _rebalance_after_scan(targets, results): """Helper for rebalancing targets and results after lopsided zscan """ # gather lopsided results and targets on rank 0 for rebalancing results = targets.comm.gather(results, root=0) lopsided_targets = targets.comm.gather(targets.local(), root=0) if targets.comm.rank == 0: # Flatten lopsided distributed targets (list of list of targets) flattened_targets = [t for sl in lopsided_targets for t in sl] # Split targets into approximately equal lengths sublists ix = np.array_split(np.arange(len(flattened_targets)), len(lopsided_targets)) #dist_targets = [flattened_targets[i[0]:i[0] + len(i)] for i in ix] #Less elegantly create a list of lists, appending empty lists where #len(targets) == 0 in the case of MPI ranks > ntargets dist_targets = [] for i in ix: if len(i) == 0: dist_targets.append([]) else: dist_targets.append(flattened_targets[i[0]:i[0] + len(i)]) # Merge list of result dictionaries results = {k: v for d in results for k, v in d.items()} # Split results using rebalanced target lists dist_results = [{t.id: results[t.id] for t in s} for s in dist_targets] else: dist_targets = None dist_results = None # distribute rebalance targets and results local_targets = targets.comm.scatter(dist_targets, root=0) results = targets.comm.scatter(dist_results, root=0) return local_targets, results
[docs]def sort_zfit(zfit): """ Sorts zfit table by goodness of fit, using 'zwarn' and 'chi2' columns Args: zfit: astropy Table with columns 'zwarn' and 'chi2' Modifies zfit in-place by sorting it """ zfit['__badfit__'] = (zfit['zwarn'] & badfit_mask) != 0 zfit.sort( ('__badfit__', 'chi2') ) zfit.remove_column('__badfit__')
[docs]def sort_zfit_dict(zfit): """ Sorts zfit dict by goodness of fit, using 'zwarn' and 'chi2' columns Args: zfit: dict of numpy arrays with columns 'zwarn' and 'chi2' Modifies zfit in-place by sorting it """ zfit['__badfit__'] = (zfit['zwarn'] & badfit_mask) != 0 sort_dict_by_cols(zfit, ('__badfit__', 'chi2'), sort_first_column_first=True) zfit.pop('__badfit__')
[docs]def zfind(targets, templates, mp_procs=1, nminima=3, archetypes=None, priors=None, chi2_scan=None, use_gpu=False, zminfit_npoints=15, per_camera=None, deg_legendre=None, n_nearest=None, prior_sigma=None, ncamera=None): """Compute all redshift fits for the local set of targets and collect. Given targets and templates distributed across a set of MPI processes, compute the redshift fits for all redshifts and our local set of targets. Each process computes the fits for a slice of redshift range and then cycles through redshift slices by passing the interpolated templates along to the next process in order. Note: If using MPI, only the rank 0 process will return results- all other processes with return a tuple of (None, None). Args: targets (DistTargets): distributed targets. templates (list): list of DistTemplate objects. mp_procs (int, optional): if not using MPI, this is the number of multiprocessing processes to use. nminima (int, optional): number of chi^2 minima to consider. Passed to fitz(). archetypes (list optional): list of Archetype objects to use for final fitz choice of best chi2 vs. z minimum. priors (str, optional): file containing redshift priors chi2_scan (str, optional): file containing already computed chi2 scan use_gpu (bool, optional): use gpu for calc_zchi2 deg_legendre (int): in archetype mode polynomials upto deg_legendre-1 will be used zminfit_npoints (int): number of finer redshift pixels to search for final redshift per_camera: (bool): True if fitting needs to be done in each camera for archetype mode n_nearest (int): number of nearest neighbours to be used in chi2 space (including best archetype) prior_sigma (float): prior to add in the final solution matrix: added as 1/(prior_sigma**2) only for per-camera mode zminfit_npoints (int): number of minimum redshift to be fit for final redshift estimation Returns: tuple: (allresults, allzfit), where "allresults" is a dictionary of the full chi^2 fit information, suitable for writing to a redrock scan file. "allzfit" is an astropy Table of only the best fit parameters for a limited set of minima. """ if archetypes: archetype_spectype = list(archetypes.keys()) # to account for the case if only one archetype is provided if not priors is None: priors = Priors(priors) # Find most likely candidate redshifts by scanning over the # pre-interpolated templates on a coarse redshift spacing. # See if we are the process that should be printing stuff... am_root = False if targets.comm is None: am_root = True elif targets.comm.rank == 0: am_root = True # If we are not using MPI, our DistTargets object will have all the targets # on the main process. In that case, we would like to distribute our # targets across multiprocesses. Here we compute that distribution, if # needed. mpdist = None if targets.comm is None: mpdist = distribute_targets(targets.local(), mp_procs) # Compute the coarse-binned chi2 for all local targets. start_zscan = elapsed(None, "", comm=targets.comm) if chi2_scan is None: results = calc_zchi2_targets(targets, templates, mp_procs=mp_procs, use_gpu=use_gpu) else: results = read_zscan_redrock(chi2_scan) elapsed(start_zscan, "Scanning redshifts", comm=targets.comm) if (use_gpu): #If using GPU, copy template flux and wave arrays to cupy objects #on the GPU once here so it is not copied every iteration of #rebinning below. Use gpuwave and gpuflux so as to not overwrite #numpy arrays. import cupy as cp for t in templates: t.template.gpuwave = cp.asarray(t.template.wave) t.template.gpuflux = cp.asarray(t.template.flux) # Note: rebalancing no longer needs to be done now that following steps # have been GPU-ized - CW 12/22 # Note: GPU zscan accommodates lopsided distribution of targets but this # is not great for the following steps that have not been GPU-ified yet. # Rebalance targets and results before proceeded. #if hasattr(targets, 'is_lopsided') and targets.is_lopsided and targets.comm is not None: # start_rebalance = elapsed(None, "", comm=targets.comm) # local_targets, results = _rebalance_after_scan(targets, results) # elapsed(start_rebalance, "Rebalancing targets", comm=targets.comm) #else: # local_targets = targets.local() local_targets = targets.local() # Apply redshift prior if not priors is None: for tg in results.keys(): for ft in results[tg].keys(): results[tg][ft]['zchi2'] += priors.eval(tg, results[tg][ft]['redshifts']) # For each of our local targets, refine the redshift fit close to the # minima in the coarse fit. start_findbest = elapsed(None, "", comm=targets.comm) sort = np.array([ t.template.full_type for t in templates]).argsort() #creating list of template spectype to make use in case archetypes would be used all_spectype = [] for spec in templates: if spec.template.template_type not in all_spectype: all_spectype.append(spec.template.template_type) for t in np.array(list(templates))[sort]: ft = t.template.full_type if archetypes: if t.template._rrtype in archetypes.keys(): archetype = archetypes[t.template._rrtype] else: archetype = None else: archetype = None if am_root: print(" Finding best fits for template {}"\ .format(t.template.full_type)) sys.stdout.flush() start = elapsed(None, "", comm=targets.comm) # Here we have another parallelization choice between MPI and # multiprocessing. if targets.comm is not None or mp_procs == 1: # MPI case. Every process just works with its local targets. for tg in local_targets: zfit = fitz(results[tg.id][ft]['zchi2'] \ + results[tg.id][ft]['penalty'], t.template.redshifts, tg, t.template, nminima=nminima,archetype=archetype, use_gpu=use_gpu, deg_legendre=deg_legendre, zminfit_npoints=zminfit_npoints, per_camera=per_camera, n_nearest=n_nearest, prior_sigma=prior_sigma) results[tg.id][ft]['zfit'] = zfit else: # Multiprocessing case. import multiprocessing as mp # Ensure that all targets are packed into shared memory for tg in targets.local(): tg.sharedmem_pack() qout = mp.Queue() procs = list() for i in range(mp_procs): if len(mpdist[i]) == 0: continue target_data = [ x for x in targets.local() if x.id in mpdist[i] ] eff_chi2 = np.zeros((len(target_data), len(t.template.redshifts)), dtype=np.float64) for i, tg in enumerate(target_data): eff_chi2[i,:] = results[tg.id][ft]['zchi2'] \ + results[tg.id][ft]['penalty'] p = mp.Process(target=_mp_fitz, args=(eff_chi2, target_data, t, nminima, qout, archetype, use_gpu, deg_legendre, zminfit_npoints, per_camera, n_nearest, prior_sigma)) procs.append(p) p.start() # Extract the output for i in range(mp_procs): if len(mpdist[i]) == 0: continue res = qout.get() for rs in res: results[rs[0]][ft]['zfit'] = rs[1] elapsed(start, " Finished in", comm=targets.comm) elapsed(start_findbest, "Finding best redshift", comm=targets.comm) # Add the target metadata to the results start_finalize = elapsed(None, "", comm=targets.comm) for tg in local_targets: results[tg.id]['meta'] = tg.meta # Gather our results to the root process and split off the zfit data. # Only process zero returns data- other ranks return None. allresults = None allzfit = None if targets.comm is not None: results = targets.comm.gather(results, root=0) else: results = [ results ] if am_root: allresults = dict() for p in results: allresults.update(p) del results # for t in templates: # ft = t.template.full_type allzfit = list() for tid in targets.all_target_ids: tzfit = list() for fulltype in allresults[tid]: if fulltype == 'meta': continue tmp = allresults[tid][fulltype]['zfit'] if archetypes is None: spectype, subtype = parse_fulltype(fulltype) else: if 'fulltype' in tmp.keys():#to take care of case when archetype is applied only for one template spectype = list() subtype = list() #l el is a list with one element (corresponding to each minimum) for el in tmp['fulltype']: this_spectype, this_subtype = parse_fulltype(el[0]) spectype.append(this_spectype) subtype.append(this_subtype) del tmp['fulltype'] #it's a dictionary else: spectype, subtype = parse_fulltype(fulltype) if spectype.upper() == 'STAR': max_velo_diff = constants.max_velo_diff_star else: max_velo_diff = constants.max_velo_diff #Have to create arrays of correct length since using dict of #np arrays instead of astropy Table nmin = len(tmp['chi2']) if np.isscalar(spectype): tmp['spectype'] = np.full((nmin, 1), spectype) tmp['subtype'] = np.full((nmin, 1), subtype) else: assert len(spectype)==nmin tmp['spectype'] = np.array([spectype]).reshape((nmin, 1)) tmp['subtype'] = np.array([subtype]).reshape((nmin, 1)) tmp['ncoeff'] = np.array([tmp['coeff'].shape[1]]*nmin).reshape((nmin, 1)) tzfit.append(tmp) del allresults[tid][fulltype]['zfit'] maxncoeff = max([tmp['coeff'].shape[1] for tmp in tzfit]) for tmp in tzfit: if tmp['coeff'].shape[1] < maxncoeff: n = maxncoeff - tmp['coeff'].shape[1] nx = tmp['coeff'].shape[0] c = np.append(tmp['coeff'], np.zeros((nx, n)), axis=1) tmp['coeff'] = c #tzfit = astropy.table.vstack(tzfit) ## Equivalent of astropy.table.vstack(tzfit) - vstack each array tzfit2 = dict() for k in tzfit[0].keys(): tzfit2[k] = list() for i in range(len(tzfit)): tzfit2[k].append(tzfit[i][k]) tzfit2[k] = np.vstack(tzfit2[k]) if (tzfit2[k].shape[1] == 1): tzfit2[k] = tzfit2[k].flatten() tzfit = tzfit2 #Have to create arrays of correct length since using dict of #np arrays instead of astropy Table l = len(tzfit['chi2']) tzfit['targetid'] = np.array([tid]*l) if (archetypes): if (len(archetype_spectype)==len(all_spectype)): if n_nearest is None: ibad = tzfit['coeff'][:,0]<=0. # means that best archetype has negative model else: ibad = np.any(tzfit['coeff'][:,:n_nearest]<0, axis=1) # any archetype has negative coeff else: ## need to check if this bitmask is only applied to objects for which archetypes are used ibad = np.isin(tzfit['spectype'], np.array(archetype_spectype)) index_check = np.where(ibad)[0] for k in index_check: if n_nearest is None: if tzfit['coeff'][:,0][k]>=0.: # don't reject physical model ibad[k]=False else: if np.all(tzfit['coeff'][:,:n_nearest][k]>=0): ibad[k]=False tzfit['zwarn'][ibad] |= ZW.NEGATIVE_MODEL tzfit['zwarn'][ tzfit['npixels']==0 ] |= ZW.NODATA tzfit['zwarn'][ (tzfit['npixels']<10*tzfit['ncoeff']) ] |= \ ZW.LITTLE_COVERAGE #- Sort by badfit zwarn bits and chi2 sort_zfit_dict(tzfit) # Trim down cases of multiple subtypes for a single type (e.g. # STARs) tzfit is already sorted by chi2, so keep first nminima of # each type. iikeep = list() for spectype in np.unique(tzfit['spectype']): ii = np.where(tzfit['spectype'] == spectype)[0] iikeep.extend(ii[0:nminima]) if (len(iikeep) < l): for k in tzfit.keys(): tzfit[k] = tzfit[k][iikeep] #- grouping by spectype could get chi2 out of order; resort sort_zfit_dict(tzfit) #Length may have changed l = len(tzfit['chi2']) tzfit['znum'] = np.arange(l) #- calc deltachi2 and set ZW.SMALL_DELTA_CHI2 flag deltachi2, setzwarn = calc_deltachi2(tzfit['chi2'], tzfit['z'], tzfit['zwarn'], dvlimit=max_velo_diff) tzfit['deltachi2'] = deltachi2 tzfit['zwarn'][setzwarn] |= ZW.SMALL_DELTA_CHI2 # Store # Here convert to astropy table allzfit.append(astropy.table.Table(tzfit)) allzfit = astropy.table.vstack(allzfit) #print(allzfit['targetid', 'z', 'zwarn', 'chi2', 'deltachi2','spectype','subtype']) # Cosmetic: move TARGETID to be first column as primary key try: allzfit.columns.move_to_end('targetid', last=False) except: # Must be using python2, don't mess with the order. pass # Now we have the final table of best fit results. We want to add any # extra columns from the target metadata. We assume that the meta keys # for the first target are the same keys for all targets... zfitids = list(allzfit['targetid']) allmetakeys = list(sorted(allresults[zfitids[0]]['meta'].keys())) # Parse any type information for the metadata. typepat = re.compile(r'(.*)_datatype') metakeys = list() metatypes = dict() for mk in allmetakeys: mat = typepat.match(mk) if mat is None: # this is a real key metakeys.append(mk) else: # get the data type metatypes[mat.group(1)] = allresults[zfitids[0]]['meta'][mk] for mk in metakeys: if mk not in metatypes: metatypes[mk] = None # Append the columns for mk in metakeys: if metatypes[mk] is not None: col = astropy.table.Column(np.array([ \ allresults[x]['meta'][mk] for x in zfitids ], dtype=metatypes[mk]), name=mk) else: col = astropy.table.Column([ allresults[x]['meta'][mk] \ for x in zfitids ], name=mk) allzfit.add_column(col) # Remove the meta data from the dictionary, so that it is not later # interpreted as a template type. for tid in targets.all_target_ids: del allresults[tid]['meta'] # Standardize column sizes if allzfit['subtype'].dtype != '<U20': allzfit.replace_column('subtype', allzfit['subtype'].astype('<U20')) #TODO: Think about standardizing the array dtype in case nearest neighbours is used if allzfit['spectype'].dtype != '<U6': allzfit.replace_column('spectype',allzfit['spectype'].astype('<U6')) if archetypes is None or not per_camera: maxcoeff = np.max([t.template.nbasis for t in templates]) else: if n_nearest is not None: maxcoeff = max(np.max([t.template.nbasis for t in templates]), ncamera*(deg_legendre)+n_nearest) else: maxcoeff = max(np.max([t.template.nbasis for t in templates]), ncamera*(deg_legendre)+1) if allzfit['coeff'].ndim == 1: ntarg = allzfit['coeff'].shape ncoeff = 1 else: ntarg, ncoeff = allzfit['coeff'].shape if ncoeff != maxcoeff: coeff = np.zeros((ntarg, maxcoeff), dtype=allzfit['coeff'].dtype) coeff[:,0:ncoeff] = allzfit['coeff'] allzfit.replace_column('coeff', coeff) elapsed(start_finalize, "Finalizing results", comm=targets.comm) return allresults, allzfit