"""
redrock.utils
=============
Redrock utility functions.
"""
from __future__ import absolute_import, division, print_function
import sys
import os
import time
import numpy as np
from . import constants
#- From https://github.com/desihub/desispec io.util.native_endian
[docs]def native_endian(data):
"""Convert numpy array data to native endianness if needed.
Returns new array if endianness is swapped, otherwise returns input data
By default, FITS data from astropy.io.fits.getdata() are not Intel
native endianness and scipy 0.14 sparse matrices have a bug with
non-native endian data.
Args:
data (array): input array
Returns:
array: original array if input in native endianness, otherwise a copy
with the bytes swapped.
"""
if data.dtype.isnative:
return data
else:
return data.byteswap().newbyteorder()
[docs]def encode_column(c):
"""Returns a bytes column encoded into a string column.
Args:
c (Table column): a column of a Table.
Returns:
array: an array of strings.
"""
return c.astype((str, c.dtype.itemsize))
#- Adapted from http://stackoverflow.com/a/21659588; unix only
[docs]def getch():
"""Return a single character from stdin.
"""
import tty, termios
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
[docs]def elapsed(timer, prefix, comm=None):
"""Get and print the elapsed time.
If timer is None, compute the start time and return. Otherwise, find the
elapsed time and print a message before returning the new start time.
Args:
timer (float): time in seconds for some arbitrary epoch. If "None",
get the current time and return.
prefix (str): string to print before the elapsed time.
comm (mpi4py.MPI.Comm): optional communicator.
Returns:
float: the new start time in seconds.
"""
if comm is not None:
comm.barrier()
cur = time.time()
if timer is not None:
elapsed = cur - timer
if ((comm is None) or (comm.rank == 0)):
print("{}: {:0.1f} seconds".format(prefix, elapsed))
sys.stdout.flush()
return cur
[docs]def nersc_login_node():
"""Returns True if we are on a NERSC login node, else False.
"""
if ("NERSC_HOST" in os.environ) and ("SLURM_JOB_NAME" not in os.environ):
return True
else:
return False
[docs]def get_mp(requested):
"""Returns a reasonable number of multiprocessing processes.
This checks whether the requested value makes sense, and also whether we
are running on a NERSC login node (and hence would get in trouble trying
to use all cores).
Args:
requested (int): the requested number of processes.
Returns:
int: the number of processes to use.
"""
import multiprocessing as mp
mpmax = mp.cpu_count()
mpbest = mpmax // 2
if mpbest == 0:
mpbest = 1
procs = None
if requested == 0:
procs = mpbest
elif requested > mpmax:
print("Requested number of processes ({}) is too large, "
"reducing to {}".format(requested, mpmax))
sys.stdout.flush()
procs = mpmax
else:
procs = requested
# On NERSC login nodes, avoid hogging the whole node (and getting an
# unhappy phone call).
login_max = 4
if nersc_login_node():
if procs > login_max:
print("Running on a NERSC login node- reducing number of processes"
" to {}".format(login_max))
sys.stdout.flush()
procs = login_max
return procs
[docs]def mp_array(original):
"""Allocate a raw shared memory buffer and wrap it in an ndarray.
This allocates a multiprocessing.RawArray and wraps the buffer
with an ndarray.
Args:
typcode (str): the type code of the array.
size_or_init: passed to the RawArray constructor.
Returns;
ndarray: the wrapped data.
If the input array has size=0, the original is returned without
wrapping it in a size=0 multiprocessing.RawArray (which generates
warnings)
"""
import multiprocessing as mp
# A zero-length array generates the warning documented in
# https://github.com/desihub/redrock/issues/250; capture it.
if original.size == 0:
return original
typecode = original.dtype.char
shape = original.shape
raw = mp.RawArray(typecode, original.ravel())
nd = np.array(raw, dtype=typecode, copy=False).view()
nd.shape = shape
return nd
[docs]def distribute_work(nproc, ids, weights=None, capacities=None):
"""Helper function to distribute work among processes with varying capacities.
Args:
nproc (int): the number of processes.
ids (list): list of work unit IDs
weights (dict): dictionary of weights for each ID. If None,
use equal weighting.
capacities (list): list of process capacities. If None,
use equal capacity per process. A process with higher capacity
can handle more work.
Returns:
list: A list (one element for each process) with each element
being a list of the IDs assigned to that process.
"""
# Sort ids by weights (descending)
if weights is None:
weights = { x : 1 for x in ids }
sids = list(sorted(ids, key=lambda x: weights[x], reverse=True))
# If capacities are not provided, assume they are equal
if capacities is None:
capacities = [1] * nproc
# Initialize distributed list of ids
dist = [list() for _ in range(nproc)]
# Initialize process list. Processes are modeled using dictionary
# with fields for a unique id, capacity, and load (total weight of work).
processes = [dict(id=i, capacity=c, load=0) for i, c in enumerate(capacities)]
for id in sids:
w = weights[id]
# Identify process to receive task. Smallest normalized load, break ties with capacity, followed by id.
minload = min(processes, key=lambda p: ((p['load'] + w)/p['capacity'], 1/p['capacity'], p['id']))
i = processes.index(minload)
# Assign work unit to process
minload['load'] += weights[id]
dist[i].append(id)
return dist
[docs]def getGPUCountMPI(comm):
"""Determine the number of GPUs available when running in MPI mode
In MPI mode, cupy.cuda.runtime.getDeviceCount() always returns 1 so
this method determines the actual number of GPUs.
Args:
comm: the MPI communicator
Returns:
int: the number of GPUs
"""
#First check /proc because this is fast
if os.access('/proc/driver/nvidia/gpus/', os.F_OK):
return len(os.listdir('/proc/driver/nvidia/gpus/'))
#Now gather device IDs which is slow
if comm.rank == 0:
print("WARNING: --max-gpuprocs option was not specified")
print("WARNING: Cannot find /proc/driver/nvidia/gpus/")
print("WARNING: Finding number of devices by gathering PCI Bus ids")
import cupy
#Get this device's PCI id
n = cupy.cuda.runtime.getDevice()
pci_id = cupy.cuda.runtime.deviceGetPCIBusId(n)
#Gather and remove duplicates
pci_id_list = comm.allgather(pci_id)
pci_id_list = list(set(pci_id_list))
if comm.rank == 0:
print("WARNING: Found {:d} GPUs".format(len(pci_id_list)))
return len(pci_id_list)
[docs]def reduced_wavelength(wave):
"""function to calculate reduced wavelength arrya for
legendre polynomial in archetype mode,
legendre polynomials are orthogonal in [-1,1]
Args:
wave (array): wavelength for which reduced wavelengths to be estimated
Return:
reduced wavelength in [-1,1] range
"""
wavemax = wave.max()
wavemin = wave.min()
return 2*(wave - wavemin) / (wavemax - wavemin) - 1