# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
Stack some spectra!
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from IPython import embed
import numpy
from scipy import sparse, interpolate
from matplotlib import pyplot, rc
#from astropy.io import fits
import astropy.constants
from ..par.parset import KeywordParSet
from ..util.covariance import Covariance
from ..util.filter import interpolate_masked_vector
from ..util.sampling import Resample, spectral_coordinate_step
[docs]
class SpectralStackPar(KeywordParSet):
r"""
Class with parameters used to set how to stack a set of spectra.
See :class:`mangadap.par.parset.ParSet` for attributes.
.. todo::
Allow for sigma rejection.
The defined parameters are:
.. include:: ../tables/spectralstackpar.rst
.. warning::
Velocity registration is currently *not* implemented.
"""
def __init__(self, operation='mean', register=False, cz=None, covar_mode='none',
covar_par=None):
in_fl = [ int, float ]
ar_like = [ numpy.ndarray, list ]
op_options = SpectralStack.operation_options()
covar_options = SpectralStack.covariance_mode_options()
pars = ['operation', 'register', 'cz', 'covar_mode', 'covar_par']
values = [operation, register, cz, covar_mode, covar_par]
defaults = ['mean', False, None, 'none', None]
options = [op_options, None, None, covar_options, None]
dtypes = [str, bool, ar_like, str, in_fl+ar_like]
descr = ['Operation to perform for the stacked spectrum. See ' \
':func:`SpectralStack.operation_options` for the available operation options.',
'Flag to register the spectra by deshifting them based on their observed *cz* ' \
'velocities. This is done before adding them based on a provided ' \
'prior measurement of the velocities.',
'List of measured *cz* velocities used to register the spectra.',
'Describes how to incorporate covariance into the spectral stacking. ' \
'See :func:`SpectralStack.covariance_mode_options` for the available options.',
'The parameter(s) needed to perform a given method of handling the ' \
'covariance. See :func:`SpectralStack.covariance_mode_options` for the ' \
'available options.']
super().__init__(pars, values=values, defaults=defaults, options=options, dtypes=dtypes,
descr=descr)
[docs]
class SpectralStack:
r"""
Class whose primary function is to stack a set of spectra while
treating covariance between spectra.
See :func:`covariance_mode_options` for available methods of
accounting for covariance.
Class also approximates the resulting spectral resolution of the
stacked data.
.. todo::
List attributes
"""
def __init__(self):
# Keep the matrix used to bin the spectra. This will be a
# scipy.sparse.csr_matrix.
self.rebin_T = None
# Internal arrays for callback
self.wave = None
self.flux = None
self.fluxsqr = None
self.fluxmean = None
self.fluxsdev = None
self.npix = None
self.ivar = None
self.sres = None
self.covar = None
[docs]
@staticmethod
def _check_covariance_type(covariance_mode, covar, ivar):
"""
Check that the covariance variable has the correct type for
the given mode.
Args:
covariance_mode (:obj:`str`):
Covariance handling mode; see
:func:`covariance_mode_options`.
covar (None, :obj:`float`, :class:`mangadap.util.covariance.Covariance`):
The object to check the type against the covariance
handling mode.
ivar (None, object):
Inverse variance. Only check performed is whether or
not this is None.
Returns:
:obj:`bool`: Flag that type is correct.
"""
if covariance_mode == 'none':
return True
if covariance_mode in ['calibrate', 'channels', 'wavelengths'] and ivar is None:
return False
if covariance_mode == 'calibrate' and not isinstance(covar, float):
return False
if covariance_mode == 'calibrate':
return True
if not isinstance(covar, Covariance):
return False
return covar.dim == 3
[docs]
@staticmethod
def _check_covariance_shape(covariance_mode, covar, nwave, nspec):
"""
Check that the input covariance object has the correct shape
for the given mode.
Args:
covariance_mode (:obj:`str`):
Covariance handling mode; see
:func:`covariance_mode_options`.
covar (None, :obj:`float`, :class:`mangadap.util.covariance.Covariance`):
The object to check the type against the covariance
handling mode.
nwave (:obj:`int`):
Number of wavelength channels.
nspec (:obj:`int`):
Number of spectra.
Returns:
:obj:`bool`: Flag that the covariance data has the
correct shape.
"""
if covariance_mode in [ 'none', 'calibrate']:
return True
if covariance_mode in [ 'full', 'approx_correlation' ] \
and (covar.dim != 3 or covar.shape[-1] != nwave):
return False
if covar.shape[0] != nspec:
return False
return True
[docs]
def _set_rebin_transfer_matrix(self, binid, binwgt=None):
r"""
Construct the transfer matrix that rebins the spectra.
The shape of the transfer matrix is :math:`(N_{\rm bin}
\times N_{\rm spec})` with the expectation that the spectrum
flux array has shape :math:`(N_{\rm spec} \times N_{\rm
wave})`.
The binned spectra are calculated by matrix multiplication,
:math:`\mathbf{B} = \mathbf{T} \times \mathbf{F}` such that
the covariance matrix can be calculated as :math:`\mathbf{C}
= \mathbf{T} \times \mathbf{\Sigma} \times \mathbf{T}^{\rm
T}`, where :math:`\mathbf{\Sigma}` is the covariance matrix
in the flux array, :math:`\mathbf{F}`.
If weighting, the sum of the weights is normalized to the
number of points included in the bin.
Args:
binid (`numpy.ndarray`_):
Index, one per spectrum in the flux array, for the
binned spectrum. Indices of less than one are
ignored.
binwgt (`numpy.ndarray`_, optional):
List of weights for the spectra. If not provided, the
weights are uniform.
"""
nspec = binid.size
valid = binid > -1
unique_bins = numpy.unique(binid[valid])
nbin = len(unique_bins)
self.rebin_T = numpy.zeros((nbin,nspec), dtype=float)
for j in range(nbin):
indx = binid == unique_bins[j]
self.rebin_T[j,indx] = 1.0 if binwgt is None else \
binwgt[indx]*numpy.sum(indx)/numpy.sum(binwgt[indx])
self.rebin_T = sparse.csr_matrix(self.rebin_T)
[docs]
def _stack_without_covariance(self, flux, ivar=None, sres=None, linear_sres=False):
"""
Stack the spectra, ignoring any covariance that may or may not
exist.
This method calculates the sum of the flux, flux^2, and
determines the number of pixels in the sum. Sets
:attr:`flux`, :attr:`fluxsqr`, :attr:`npix`, :attr:`ivar`,
:attr:`sres`. The stored data is always based on the **sum**
of the spectra in the stack.
Args:
flux (`numpy.ma.MaskedArray`_):
Flux array.
ivar (:obj:`numpy.ma.MaskedArray`, optional):
The inverse variance array.
sres (`numpy.ma.MaskedArray`_, optional):
1D or 2D spectral resolution as a function of
wavelength for all or each input spectrum. Default is
to ignore any spectral resolution data.
linear_sres (:obj:`bool`, optional):
Construct the combined resolution as the mean of the
input instrumental FWHM (1/sres). If False, the
quadratic mean is used (i.e., rms =
sqrt(mean(square))); this mode is formally correct
for the second moment of the combined LSF (however,
note that this not the same as the accuracy of a
Gaussian fit to the combined line profile; cf. Law et
al. 2020).
"""
# Convert the stack transform matrix to a dense array
rt = self.rebin_T.toarray()
# Relevant shapes
nstack, nspec = rt.shape
nwave = flux.shape[-1]
# Get the number of pixels included in the stack of each spectrum.
gpm = numpy.logical_not(numpy.ma.getmaskarray(flux)).astype(int)
self.npix = numpy.dot(rt, gpm)
# The gymnastics below are done primarily to ensure that stacks
# that only include a single spectrum do *not* lose their
# masked values in the array with the stacked spectra. I.e.,
# these spectra are simply copied to the stacked array,
# whereas numpy.dot is used for the stack calculations. NOTE:
# numpy.ma.dot was abandoned because it is *much* slower than
# the simple multiplication by the gpm done below.
# Calculate the number of spectra in each stack
nspec_per_stack = numpy.sum((rt > 0).astype(int), axis=1)
# Construct the output vectors
self.flux = numpy.ma.zeros((nstack, nwave), dtype=float)
self.fluxsqr = numpy.ma.zeros((nstack, nwave), dtype=float)
self.ivar = None
if ivar is not None:
self.ivar = numpy.ma.zeros((nstack, nwave), dtype=float)
self.sres = None
if sres is not None:
Tc = numpy.sum(rt, axis=1)
Tc[numpy.logical_not(Tc>0)] = 1.0
self.sres = numpy.ma.zeros((nstack, nwave), dtype=float)
# Find the stacked spectra that include more than one input
# spectrum
stacki, speci = numpy.where((nspec_per_stack[:,None] > 1) & (rt > 0))
# Do the stacking
self.flux[stacki] = numpy.dot(rt[stacki,:][:,speci], gpm[speci]*flux.data[speci])
self.fluxsqr[stacki] = numpy.dot(rt[stacki,:][:,speci],
numpy.square(gpm[speci]*flux.data[speci]))
if self.ivar is not None:
self.ivar[stacki] = numpy.ma.power(numpy.dot(numpy.square(rt[stacki,:][:,speci]),
numpy.ma.divide(gpm[speci], ivar[speci]).filled(0.0)), -1.)
if self.sres is not None:
self.sres[stacki] = numpy.ma.power(numpy.dot(rt[stacki,:][:,speci],
numpy.ma.power(sres[speci], -1).filled(0.0))
/ Tc[stacki,None], -1) if linear_sres \
else numpy.ma.power(numpy.dot(rt[stacki,:][:,speci],
numpy.ma.power(sres[speci], -2).filled(0.0))
/ Tc[stacki,None], -0.5)
# Copy over the data from the "stacks" that only include single
# spectra
stacki, speci = numpy.where((nspec_per_stack[:,None] == 1) & (rt > 0))
self.flux[stacki] = flux[speci]
self.fluxsqr[stacki] = numpy.square(flux[speci])
if self.ivar is not None:
self.ivar[stacki] = ivar[speci]
if self.sres is not None:
self.sres[stacki] = sres[speci]
# Ensure that pixels with zero contributions are masked;
# spectral resolution vectors are *not* masked.
self.flux[self.npix == 0] = numpy.ma.masked
self.fluxsqr[self.npix == 0] = numpy.ma.masked
if self.ivar is not None:
self.ivar[self.npix == 0] = numpy.ma.masked
[docs]
def _stack_with_covariance(self, flux, covariance_mode, covar, ivar=None, sres=None):
"""
Stack the spectra and incorporate covariance.
Args:
flux (`numpy.ma.MaskedArray`_):
Flux array.
covariance_mode (:obj:`str`):
Covariance handling mode; see
:func:`covariance_mode_options`.
covar (None, :obj:`float`, :class:`mangadap.util.covariance.Covariance`):
The relevant covariance object that must match the
needs of the covariance handling mode.
ivar (:obj:`numpy.ma.MaskedArray`, optional):
The inverse variance array. Must not be None if
``covariance_mode`` is 'channels' or 'wavelengths'.
sres (`numpy.ma.MaskedArray`_, optional):
1D or 2D spectral resolution as a function of
wavelength for all or each input spectrum. Default is
to ignore any spectral resolution data.
"""
# First stack without covariance. This sets self.flux,
# self.fluxsqr, self.npix, self.ivar, self.sres.
self._stack_without_covariance(flux, ivar=ivar, sres=sres)
# If calibrating the noise based on the equation,
# Noise_corrected = Noise_nocovar * (1 + f * log10(N))
# apply it and return
if covariance_mode == 'calibrate':
if self.ivar is not None:
self.ivar /= numpy.square(1.0 + covar*numpy.ma.log10(self.npix))
return
# Check that the code knows what to do otherwise
# TODO: Isn't this check done elsewhere?
if covariance_mode not in ['channels', 'wavelengths', 'approx_correlation', 'full']:
raise ValueError('Unknown covariance mode: {0}'.format(covariance_mode))
# Recalibrate the error based on a selection of covariance
# channels
recalibrate_ivar = covariance_mode in ['channels', 'wavelengths']
if recalibrate_ivar and self.ivar is None:
raise ValueError('Must provide ivar to recalibrate based on covar.')
# Setup for output
nbin = self.flux.shape[0]
nchan = covar.shape[-1]
self.covar = numpy.empty(nchan, dtype=sparse.csr_matrix)
variance_ratio = numpy.ma.zeros( (nbin,nchan), dtype=float) \
if recalibrate_ivar else None
# Calculate the covariance in the stack
for i in range(nchan):
j = covar.input_indx[i]
cov_array = covar.full(channel=j)
self.covar[i] = sparse.triu(self.rebin_T.dot(cov_array.dot(self.rebin_T.T))).tocsr()
# Get the variance ratio
if recalibrate_ivar:
variance_ratio[:,i] = self.covar[i].diagonal() * self.ivar[:,j]
variance_ratio[numpy.ma.getmaskarray(self.ivar)[:,j],i] = numpy.ma.masked
self.covar = Covariance(self.covar, input_indx=covar.input_indx)
# Set ivar by recalibrating the existing data
if recalibrate_ivar:
ratio = numpy.ma.median(variance_ratio, axis=1 )
# TODO: Add a debug mode for this?
# self._recalibrate_ivar_figure(ratio, ofile='ivar_calibration.pdf')
self.ivar = numpy.ma.power(ratio, -1.0)[:,None] * self.ivar
return
# Get the inverse variance from the full covariance matrix
self.ivar = numpy.ma.power(self.covar.variance(), -1.) #.filled(0.0)
[docs]
def _covar_in_mean(self):
"""
Compute the covariance in the mean spectrum by propagating the
division by the number of pixels through the covariance matrix.
Returns:
:class:`~mangadap.util.covariance.Covariance`: Covariance
in the mean spectrum. Returns None if :attr:`covar` is
None.
"""
if self.covar is None:
return None
nchan = self.covar.shape[-1]
nbin = self.flux.shape[0]
inpix = numpy.ma.power(self.npix, -1.)
covar = numpy.empty(nchan, dtype=sparse.csr_matrix)
for i in range(nchan):
j = self.covar.input_indx[i]
_inpix = inpix[:,j,None]*inpix[None,:,j]
covar[i] = sparse.triu(self.covar.toarray(channel=j) * _inpix).tocsr()
return Covariance(covar, input_indx=self.covar.input_indx)
[docs]
def _get_stack_mean(self):
"""
Convert the summed stack to the mean stacked spectra using
the internal data.
Returns:
:obj:`tuple`: See the return statement for :func:`stack`.
"""
_ivar = None if self.ivar is None else self.ivar * numpy.square(self.npix)
return self.wave, self.fluxmean, self.fluxsdev, self.npix, _ivar, self.sres, \
self._covar_in_mean()
[docs]
@staticmethod
def operation_options():
"""
Return the allowed stacking operations.
Current operations are:
- ``mean``: Construct the mean of the spectra
- ``sum``: Construct the spectrum sum.
Returns:
:obj:`list`: List of available operations.
"""
return ['mean', 'sum']
[docs]
@staticmethod
def covariance_mode_options(par_needed=False):
r"""
Return the list of allowed covariance options.
The two parameters ``covar_mode`` and ``covar_par`` (see
:class:`SpectralStackPar`) set how covariance is accounted
for in the stacking procedure. The valid options are:
- ``none``: The noise in the stacked spectrum is a
nominal propagation of the error assuming no
covariance. No parameters needed.
- ``calibrate``: Where :math:`N_{\rm bin}` is the number
of binned spaxels and :math:`\alpha` as a provided
parameter, the spectral noise is calibrated following:
.. math::
n_{\rm calib} = n_{\rm nominal} (1 + \alpha \log\
N_{\rm bin})
- ``channels``: The noise vector of each stacked spectrum
is adjusted based on the mean ratio of the nominal and
formally correct calculations of the noise measurements
over a number of spectral channels. The channels are
drawn from across the full spectral range. The number
of channels to use is a defined parameter. The
covariance matrix must be provided to :func:`stack`.
- ``wavelengths``: Functionally equivalent to
``channels``; however, the channels to use are set by a
list of provided wavelengths. The covariance matrix
must be provided to :func:`stack`.
- ``approx_correlation``: Approximate the covariance
matrix using a Gaussian description of the correlation
between pixels. See
:func:`mangadap.datacube.datacube.DataCube.approximate_correlation_matrix`.
The value of :math:`\sigma` provides for the Gaussian
desciption of :math:`\rho_{ij}` in the correlation
matrix. The covariance matrix must be provided to
:func:`stack`.
- ``full``: The full covariance cube is calculated and
the noise vectors are constructed using the formally
correct calculation. No parameters needed. The
covariance matrix must be provided to :func:`stack`.
Returns:
:obj:`list`: List of the allowed modes.
"""
modes = ['calibrate', 'approx_correlation', 'channels', 'wavelengths']
if par_needed:
return modes
return modes + ['none', 'full']
[docs]
@staticmethod
def parse_covariance_parameters(mode, par):
"""
Parse the parameters needed for the treatment of the
covariance when stacking spectra.
Args:
mode (:obj:`str`):
Mode to use. Must be an allowed mode; see
:func:`covariance_mode_options`.
par (:obj:`str`):
String representation of the parameters for the
specified mode.
Returns:
:obj:`float`, :obj:`list`: Parameters parsed from the
input string for the designated covariance mode.
Raises:
TypeError:
Raised if the input parameter could not be converted
to a float as needed by the specified mode.
ValueError:
Raised if the mode is not recognized.
"""
mode_options = SpectralStack.covariance_mode_options()
if mode not in mode_options:
raise ValueError('Mode not among valid options: {0}.\nOptions are: {1}'.format(mode,
mode_options))
if mode in ['none', 'full']:
return None
if mode in ['calibrate', 'approx_correlation']:
try:
return float(par)
except:
raise TypeError('Could not convert to float: {0}'.format(par))
if mode == 'channels':
return int(par) #[ int(e.strip()) for e in par.split(',') ]
if mode == 'wavelengths':
return [ float(e.strip()) for e in par.split(',') ]
[docs]
@staticmethod
def min_max_wave(wave, cz):
r"""
Determine the minimum and maximum of all shifted wavelength
ranges.
Args:
wave (array-like):
Original wavelengths. Should be 1D.
cz (:obj:`float`, array-like):
The redshift of one or more spectra to be removed. Each
element is applied to the wavelength vector to determine the
maximum wavelength range required to full sample all
deshifted spectra.
Returns:
:obj:`tuple`: Two floats with the minimum and maximum
wavelengths.
Raises:
ValueError:
Raised if either ``wave`` or ``cz`` have the wrong
dimensionality.
"""
_wave = numpy.atleast_1d(wave)
if _wave.ndim != 1:
raise ValueError('Wavelength vector should be 1D!')
_cz = numpy.atleast_1d(cz)
if _cz.ndim != 1:
raise ValueError('Velocity should be a float or 1D vector.')
_wave = numpy.array([numpy.amin(_wave), numpy.amax(_wave)])
_shifted_wave = _wave[None,:]/(1.+_cz[:,None]/astropy.constants.c.to('km/s').value)
return numpy.amin(_shifted_wave), numpy.amax(_shifted_wave)
[docs]
@staticmethod
def register(wave, cz, flux, ivar=None, mask=None, sres=None, log=False, base=10.0,
keep_range=False, flim=0.5):
r"""
Register a set of spectra to the same wavelength range given a
set of measured velocities.
.. todo::
- Allow for correction for deredshifting flux.
Args:
wave (`numpy.ndarray`_):
Single wavelength vector for all input spectra. Must
be 1D with shape :math:`(N_{\rm wave},)`.
cz (:obj:`float`, array-like):
The measured :math:`cz` velocities of all spectra or each
spectrum individually. The "registration" deshifts all the
spectra such that :math:`cz = 0`.
flux (`numpy.ndarray`_, `numpy.ma.MaskedArray`_):
Flux array to register. Must be 2D with shape
:math:`(N_{\rm spec},N_{\rm wave})`.
ivar (`numpy.ndarray`_, `numpy.ma.MaskedArray`_, optional):
Inverse variance in the spectrum fluxes. Shape must
match ``flux``.
mask (`numpy.ndarray`_, optional):
Boolean array with values in ``flux`` to mask.
Default assumes nothing is masked. If ``flux`` and/or
``ivar`` are masked array, this is included in a
union with those masks. Shape must match ``flux``.
sres (`numpy.ndarray`_, `numpy.ma.MaskedArray`_, optional):
1D or 2D spectral resolution as a function of
wavelength for all or each input spectrum. Default is
to ignore any spectral resolution data. If a masked
array, the masked pixels are interpolated.
log (:obj:`bool`, optional):
Flag that the wavelength vector is sampled
geometrically in wavelength.
base (:obj:`float`, optional):
If the wavelength vector is geometrically sampled,
this is the base of the logarithmic step.
keep_range (:obj:`bool`, optional):
When registering the wavelengths of the shifted
spectra, keep the identical spectral range as input.
flim (:obj:`float`, optional):
Mask any pixels in the output flux arrays that are
covered by less than this fraction by the input
masked flux array.
Returns:
:obj:`tuple`:Returns four arrays: (1) the new wavelength
vector common to all spectra; (2) the new masked flux
array; (3) the new masked inverse variance array, which
will be None if no inverse variances are provided to the
function; and (4) the new spectral resolution vectors,
which will be None if no spectral resolution vectors are
provided.
Raises:
ValueError:
Raised if the wavelength or velocity offset vectors are
not one-dimensional, if the flux array is not
two-dimensional, if the inverse variance or mask arrays
do not have the same shape as the flux array, or if the
number of wavelengths does not match the second axis of
the flux array.
"""
# Check the input
if len(wave.shape) != 1:
raise ValueError('Input wavelength array must be one-dimensional.')
if len(flux.shape) != 2:
raise ValueError('Input flux array must be two-dimensional. To register a single ' \
'flux vector, use mangadap.util.sampling.Resample.')
if flux.shape[1] != wave.size:
raise ValueError('Flux array shape does not match the wavelength data.')
if ivar is not None and ivar.shape != flux.shape:
raise ValueError('Input inverse variance array must have the same shape as flux array.')
if mask is not None and mask.shape != flux.shape:
raise ValueError('Input mask array must have the same shape as flux array.')
if sres is not None and sres.shape != wave.shape and sres.shape != flux.shape:
raise ValueError('Input spectral resolution data has incorrect shape.')
# Get the mask
inp_mask = SpectralStack._get_input_mask(flux, ivar=ivar, mask=mask)
# Get the spectral resolution (always a masked array)
inp_sres = SpectralStack._check_input_sres(sres, flux.shape[0])
# Output spectral range. If keep_range is True, this is the same as the
# input range; otherwise, this selects the maximum range necessary to
# accommodate all velocity shifts.
outRange = [wave[0], wave[-1]] if keep_range \
else list(SpectralStack.min_max_wave(wave, cz))
# Sampling (logarithmic or linear)
dw = spectral_coordinate_step(wave, log=log, base=base)
# Calculate the 1-sigma error
ferr = None if ivar is None else numpy.ma.power(ivar, -0.5)
# Resample the flux and error
_cz = numpy.atleast_1d(cz)
if len(_cz) != 1 and len(_cz) != flux.shape[0]:
raise ValueError('Must provide one velocity for all spectra or one velocity per '
'spectrum.')
if len(_cz) == 1:
# Offsetting all spectra by a single velocity
_wave = wave/(1+cz/astropy.constants.c.to('km/s').value)
resamp = Resample(flux, e=ferr, mask=inp_mask, x=_wave, inLog=log, newRange=outRange,
newdx=dw, base=base)
_flux = resamp.outy
_flux[resamp.outf < flim] = numpy.ma.masked
if ivar is None:
_ivar = None
else:
_ivar = numpy.ma.power(resamp.oute, -2)
_ivar[numpy.ma.getmaskarray(_flux)] = numpy.ma.masked
# Interpolate the spectral resolution at the location of the new
# wavelength vector
_sres = None if inp_sres is None \
else interpolate.interp1d(_wave, inp_sres, axis=1, assume_sorted=True,
fill_value='extrapolate')(resamp.outx)
return resamp.outx, _flux, _ivar, _sres
# Need to resample each vector independently
# Get the shifted wavelengths
_wave = wave[None,:]/(1.+_cz[:,None]/astropy.constants.c.to('km/s').value)
# Resample the first spectrum. Done outside of a loop to make sure to
# get the length of the new spectra.
resamp = Resample(flux[0], e=None if ferr is None else ferr[0], mask=inp_mask[0],
x=_wave[0], inLog=log, newRange=outRange, newdx=dw, base=base)
# Get the new wavelength array (should be the same for all spectra)
newwave = resamp.outx
# Init the flux array and apply the mask
_flux = numpy.ma.MaskedArray(numpy.zeros((flux.shape[0],resamp.outy.size), dtype=float))
_flux[0] = resamp.outy
_flux[0,resamp.outf < flim] = numpy.ma.masked
# Init the error array
_ferr = numpy.zeros_like(_flux, dtype=float)
if ferr is not None:
_ferr[0] = resamp.oute
# Init the spectral resolution vector(s)
_sres = None
if inp_sres is not None:
_sres = numpy.zeros_like(_flux.data, dtype=float)
_sres[0] = interpolate.interp1d(wave, inp_sres[0], assume_sorted=True,
fill_value='extrapolate')(resamp.outx)
# Loop over the remaining spectra
for i in range(1,flux.shape[0]):
resamp = Resample(flux[i], e=None if ferr is None else ferr[i], mask=inp_mask[i],
x=_wave[i], inLog=log, newx=newwave)
_flux[i] = resamp.outy
_flux[i,resamp.outf < flim] = numpy.ma.masked
if ferr is not None:
_ferr[i] = resamp.oute
if inp_sres is not None:
_sres[i] = interpolate.interp1d(_wave[i], inp_sres[i], assume_sorted=True,
fill_value='extrapolate')(newwave)
# Compute the inverse variance
if ivar is None:
_ivar = None
else:
_ivar = numpy.ma.power(_ferr, -2)
_ivar[numpy.ma.getmaskarray(_flux)] = numpy.ma.masked
# Done
return newwave, _flux, _ivar, _sres
[docs]
@staticmethod
def build_covariance_data(cube, covariance_mode, covariance_par):
"""
Construct the covariance data needed by the specified
covariance mode.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube object to bin.
covariance_mode (:obj:`str`):
Mode to use. Must be an allowed mode; see
:func:`covariance_mode_options`.
covariance_par (:obj:`int`, :obj:`float`, :obj:`list`):
Parameters required by the specified mode.
Returns:
None, :obj:`float`,
:class:`mangadap.util.covariance.Covariance`: Covariance
data relevant to the selected mode.
Raises:
ValueError:
Raised if the covariance mode is not valid or if a
covariance matrix cannot be computed for the provided
datacube.
TypeError:
Raised if the covariance parameters do not have the
correct type.
"""
# Check that the covariance data is correct
if covariance_mode is not None \
and covariance_mode not in SpectralStack.covariance_mode_options():
raise ValueError('Unrecognized method for covariance: {0}'.format(covariance_mode))
if covariance_mode is None:
covariance_mode = 'none'
if covariance_mode == 'none':
return None
if covariance_mode == 'calibrate':
return covariance_par
# TODO: I don't think this catches every use case...
if not cube.can_compute_covariance:
raise ValueError('Cannot compute covariance for the provided datacube.')
if covariance_mode in [ 'channels', 'wavelengths' ]:
if covariance_mode == 'channels':
if not isinstance(covariance_par, int):
raise TypeError('Unable to handle \'channels\' covariance parameter with ' \
'type: {0}'.format(type(covariance_par)))
_covariance_par = numpy.linspace(0,cube.nwave-1,num=covariance_par).astype(int)
else:
_covariance_par = covariance_par
if isinstance(_covariance_par, float):
_covariance_par = numpy.array([covariance_par])
if isinstance(_covariance_par, list):
_covariance_par = numpy.array(covariance_par)
if not isinstance(_covariance_par, numpy.ndarray):
raise TypeError('Unable to handle covariance parameter of type: {0}'.format(
type(covariance_par)))
# Convert the wavelengths to channel numbers
_covariance_par = numpy.unique(numpy.argsort(numpy.absolute(
cube.wave[:,None]-_covariance_par[None,:]),
axis=1)[0,:])
return cube.covariance_cube(channels=_covariance_par)
if covariance_mode == 'approx_correlation':
if not isinstance(covariance_par, float):
raise TypeError('For approximate correlation matrix, provide sigma as a float.')
return cube.covariance_cube(sigma_rho=covariance_par)
if covariance_mode == 'full':
warnings.warn('Sit back. This is going to take a while...')
return cube.covariance_cube()
raise ValueError('Unrecognized covariance method: {0}'.format(covariance_mode))
[docs]
def stack_datacube(self, cube, binid, par=None):
r"""
Wrapper function for :func:`stack` that accepts a datacube
object.
.. todo::
- List the required elements of ``par``.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube to stack.
binid (:obj:`numpy.ndarray`):
Indices of the bin in which to place each spectrum.
Shape must be the flattened spatial shape of the
datacube; i.e., :math:`(N_{\rm spec},)`.
par (:class:`SpectralStackPar`, optional):
Set of parameters used to define how to stack the
spectra. See :func:`stack`. Does not need to be
provided on initialization, but a parameter set is
required for all the stacking routines.
Returns:
:obj:`tuple`: See the return statement for :func:`stack`.
"""
flux = cube.copy_to_masked_array(flag=cube.do_not_stack_flags())
ivar = cube.copy_to_masked_array(attr='ivar', flag=cube.do_not_stack_flags())
sres = cube.copy_to_array(attr='sres')
covar = SpectralStack.build_covariance_data(cube, par['covar_mode'], par['covar_par'])
# Make sure all the inverse variance values are valid
indx = numpy.invert(ivar > 0)
if numpy.any(indx):
flux.mask |= indx
ivar.mask |= indx
return self.stack(cube.wave, flux, binid=binid, ivar=ivar, log=True, keep_range=True) \
if par is None else \
self.stack(cube.wave, flux, operation=par['operation'], binid=binid, ivar=ivar,
sres=sres, cz=par['cz'], log=True,
covariance_mode=par['covar_mode'], covar=covar, keep_range=True)
[docs]
def stack(self, wave, flux, operation='mean', binid=None, binwgt=None, ivar=None, mask=None,
sres=None, cz=None, log=False, base=10.0, covariance_mode=None, covar=None,
keep_range=False):
r"""
Stack a set of spectra.
The method also sets all the returned variables to the
internal attributes; however, they are always kept as the
sum, not the mean, of the spectra, regardless of the value of
``operation``.
.. todo:
- Allow for renormalization of spectra before stacking.
Where and how, TBD.
- If only one spectrum returned, return flux array as a
single vector?
Args:
wave (`numpy.ndarray`_):
Single wavelength vector for all input spectra.
flux (`numpy.ndarray`_, `numpy.ma.MaskedArray`_):
Spectrum flux values. Shape must be :math:`(N_{\rm
spec}, N_{\rm wave})`.
operation (:obj:`str`, optional):
Stacking operation to perform. See
:func:`operation_options`.
binid (`numpy.ndarray`_, optional):
Indices of the bin in which to place each spectrum.
Shape is :math:`(N_{\rm spec},)`. If not provided,
all the spectra will be combined.
binwgt (`numpy.ndarray`_, optional):
Weights for each of the spectra. All weights must be
positive. A binwgt of 0 is considered identical to
masking the spectrum. Shape is :math:`(N_{\rm
spec},)`. If None, weights are uniform.
ivar (`numpy.ndarray`_, `numpy.ma.MaskedArray`_, optional):
Inverse variance in the spectrum fluxes. Shape must
match ``flux``.
mask (`numpy.ndarray`_, optional):
Binary mask values for the spectrum fluxes
(False=unmasked; True=masked). Shape must match
``flux``. Default assumes no pixel mask.
sres (`numpy.ndarray`_, `numpy.ma.MaskedArray`_, optional):
1D or 2D spectral resolution as a function of
wavelength for all or each input spectrum. Shape must
be :math:`(N_{\rm wave},)` or :math:`(N_{\rm
spec},N_{\rm wave})`. If provided as a masked array,
masked pixels are replaced with the interpolated
resolution at that location.
cz (`numpy.ndarray`_, optional):
Vector with measured :math:`cz` velocities used to deshift
the spectra before stacking. See :func:`register`.
log (:obj:`bool`, optional):
Flag that the wavelength vector is geometrically stepped
in wavelength.
base (:obj:`float`, optional):
If the wavelength vector is geometrically stepped, this
is the base of the logarithmic step.
covariance_mode (:obj:`str`, optional):
Keyword for method to use for dealing with covariance;
see :func:`covariance_mode_options`.
covar (:obj:`float`, :class:`mangadap.util.covariance.Covariance`, optional):
Covariance object to use, which must match the
expectation from the covariance mode. See
:func:`covariance_mode_options` and
:func:`_check_covariance_type`.
keep_range (:obj:`float`, optional):
When registering the wavelengths of the shifted spectra,
keep the identical spectral range as input.
Returns:
:obj:`tuple`: Returns seven objects: the wavelength
vector, the stacked flux, the standard deviation in the
stacked flux, the number of spectra in each stacked
pixel, the stacked inverse variance, the stacked spectral
resolution, and the stacked covariance.
Raises:
ValueError:
Raised if the wavelength vector is not
one-dimensional, if the flux array is not
two-dimensional, if the ancillary arrays (inverse
variance, weights, mask) do not have the same shape
as the flux array, if any of the weights are
negative, or if the number of wavelengths does not
match the second axis of the flux array. Also raised
if the covariance mode is not recognized; see
:func:`covariance_mode_options`.
TypeError:
Raised if the object type of ``covar`` does not match
what is expected given ``covariance_mode``.
NotImplementedError:
Raised if the requesting to both velocity register
the spectra and correct the error vectors for spatial
covariance.
"""
# Check the input shapes
if len(wave.shape) != 1:
raise ValueError('Input wavelength vector must be one-dimensional!')
nwave = wave.size
if len(flux.shape) != 2:
raise ValueError('Can only stack two-dimensional matrices.')
if flux.shape[1] != nwave:
raise ValueError('Flux array shape does not match the wavelength data.')
if ivar is not None and flux.shape != ivar.shape:
raise ValueError('Shape of the inverse-variance array must match the flux array.')
if mask is not None and flux.shape != mask.shape:
raise ValueError('Shape of the mask array must match the flux array.')
if sres is not None and sres.shape != wave.shape and sres.shape != flux.shape:
raise ValueError('Input spectral resolution data has incorrect shape.')
nspec = flux.shape[0]
if binid is not None and binid.size != nspec:
raise ValueError('Length of binid must match the number of input spectra.')
if binwgt is not None:
if binwgt.size != nspec:
raise ValueError('Length of binwgt must match the number of input spectra.')
if numpy.any(binwgt < 0):
raise ValueError('Weights cannot be negative.')
# Check that the covariance data is correct
if covariance_mode is not None \
and covariance_mode not in SpectralStack.covariance_mode_options():
raise ValueError('Unrecognized method for covariance: {0}'.format(covariance_mode))
if covariance_mode is None:
covariance_mode = 'none'
if not SpectralStack._check_covariance_type(covariance_mode, covar, ivar):
raise TypeError('Incorrect covariance and/or inverse variance object type for input ' \
'mode:\n mode: {0}\n input covar type: {1}\n input ivar' \
' type: {2}'.format(covariance_mode, type(covar), type(ivar)))
if not SpectralStack._check_covariance_shape(covariance_mode, covar, nwave, nspec):
raise ValueError('Covariance object has incorrect shape for use with specified mode.')
if isinstance(covar, Covariance) and cz is not None:
raise NotImplementedError('Currently cannot both velocity register and apply ' \
'covariance matrix calculation!')
# Get the masked, velocity registered flux and inverse variance
# arrays
if cz is None:
_mask = SpectralStack._get_input_mask(flux, ivar=ivar, mask=mask)
_flux = numpy.ma.MaskedArray(flux, mask=_mask)
_ivar = None if ivar is None else numpy.ma.MaskedArray(ivar, mask=_mask)
_sres = SpectralStack._check_input_sres(sres, flux.shape[0])
self.wave = wave
else:
self.wave, _flux, _ivar, _sres = self.register(wave, cz, flux, ivar=ivar, mask=mask,
sres=sres, log=log, base=base,
keep_range=keep_range)
# Calculate the transfer matrix
self._set_rebin_transfer_matrix(numpy.zeros(nspec, dtype=int)
if binid is None else binid, binwgt=binwgt)
# Stack the spectra with or without covariance
if covariance_mode == 'none':
self._stack_without_covariance(_flux, ivar=_ivar, sres=_sres)
else:
self._stack_with_covariance(_flux, covariance_mode, covar, ivar=_ivar, sres=_sres)
# Calculate the standard deviation in the flux, even if the flux
# operation is to sum the data
self.fluxmean = self.flux/self.npix
self.fluxsdev = numpy.ma.sqrt((self.fluxsqr/self.npix - numpy.square(self.fluxmean))
* self.npix*numpy.ma.power((self.npix-1), -1.))
# Interpolate across any masked pixels in the spectral
# resolution array, if provided and requested.
if self.sres is not None:
if numpy.sum(self.sres.mask) == 0:
self.sres = self.sres.data
else:
outshape = self.sres.shape
self.sres = numpy.apply_along_axis(interpolate_masked_vector, 1,
self.sres.reshape(1,-1) if self.sres.ndim == 1
else self.sres.reshape(outshape[0], -1)
).reshape(outshape)
# Return the stacked data
if operation == 'sum':
return self.wave, self.flux, self.fluxsdev, self.npix, self.ivar, self.sres, self.covar
return self._get_stack_mean()