"""
A class hierarchy that perform that spatially bins a set of
two-dimensional data.
The base class allows for user-defined definitions of binning
procedures.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
from pathlib import Path
import warnings
import logging
from IPython import embed
import numpy
from scipy import spatial
from astropy.io import fits
#from ..datacube import DataCube
from ..par.parset import KeywordParSet, ParSet
from ..util.datatable import DataTable
from ..util.fitsutil import DAPFitsUtil
from ..util.fileio import create_symlink
from ..util.dapbitmask import DAPBitMask
from ..util.pixelmask import SpectralPixelMask
from ..util.sampling import angstroms_per_pixel
from ..util.covariance import Covariance
from ..util.extinction import GalacticExtinction
from ..util.log import log_output
from . import spatialbinning
# TODO: Maybe don't need this? I.e., only used for type checking
from .reductionassessments import ReductionAssessment
from .spectralstack import SpectralStackPar, SpectralStack
from .util import replace_with_data_from_nearest_coo
[docs]class SpatiallyBinnedSpectraDef(KeywordParSet):
"""
A class that holds the two parameter sets and the key designator
for the binning scheme.
The provided ``binfunc`` method must have a call with the
following form::
id = bin(x, y, par=par)
where ``id`` is the index of the bin to place each spectrum and
``x`` and ``y`` are the on-sky Cartesian coordinates;
e.g.::
x = ReductionAssessment.hdu['SPECTRUM'].data['SKY_COO'][:,0]
y = ReductionAssessment.hdu['SPECTRUM'].data['SKY_COO'][:,1]
The provided ``stackfunc`` method must have a call with the
following form::
stack_wave, stack_flux, stack_sdev, stack_npix, stack_ivar, \
stack_sres, stack_covar = stack(cube, id, par=par)
where ``cube`` is a :class:`mangadap.datacube.datacube.DataCube`
object. Note that the wavelengths are not returned because the
input and output wavelength ranges are expected to be the same!
As long as they are mutable, the values in ``par`` can change,
meaning that some products of the bin algorithm can be passed to
the stack algorithm. For example, if you want to weight the
inclusion of the spectrum in the bin, you'd have to provide both
the binning and stacking routines. Actually, if that's the case,
you're better off providing a class object that will both bin and
stack the spectra!
The defined parameters are:
.. include:: ../tables/spatiallybinnedspectradef.rst
"""
def __init__(self, key='SPX', galactic_reddening='ODonnell', galactic_rv=3.1,
minimum_snr=1.0, minimum_frac=0.8, binpar=None, binclass=None, binfunc=None,
stackpar=None, stackclass=None, stackfunc=None, overwrite=False):
# Use the signature to get the parameters and the default values
sig = inspect.signature(self.__class__)
pars = list(sig.parameters.keys())
defaults = [sig.parameters[key].default for key in pars]
# Remaining definitions done by hand
in_fl = [ int, float ]
par_opt = [ ParSet, dict ]
values = [key, galactic_reddening, galactic_rv, minimum_snr, minimum_frac,
binpar, binclass, binfunc, stackpar, stackclass, stackfunc, overwrite]
dtypes = [str, str, in_fl, in_fl, in_fl, par_opt, None, None, par_opt, None, None, bool]
can_call = [False, False, False, False, False, False, False, True, False, False, True,
False]
descr = ['Keyword used to distinguish between different spatial binning schemes.',
'The string identifier for the Galactic extinction curve to use. See ' \
':func:`~mangadap.util.extinction.GalacticExtinction.valid_forms` for the ' \
'available curves.',
'Ratio of V-band extinction to the B-V reddening.',
'Minimum S/N of spectra to include in any bin.',
'Minimum fraction of unmasked pixels in each spectrum included in any bin.',
'The spatial-binning parameters.',
'Instance of the spatial-binning class. Needed in case binfunc ' \
'is a non-static member function of the class.',
'The spatial-binning function that determines which spectra go into each bin.',
'The spectral-stacking parameter set.',
'Instance of spectral-stacking class to use. Needed in case ' \
'stackfunc is a non-static member function of the class.',
'The spectral-stacking function that stacks the spectra in a given bin.',
'If the output file already exists, redo all the calculations and overwrite it.']
super().__init__(pars, values=values, defaults=defaults, dtypes=dtypes, can_call=can_call,
descr=descr)
[docs] @classmethod
def from_dict(cls, d):
"""
Instantiate the object from a nested dictionary.
The dictionary can have any of the keywords that are identical to the
instantiation keyword arguments, except for the spatial binning and
spectral stacking parameters. These are assumed to be construced from a
set of nested dictionaries. The keywords/nested dictionaries used to
define these parameters are:
- ``'spatial_method'``: selects a string that identifies the
spatial binning method. If not present, no spatial binning is
performed.
- ``'spatial'``: selects a nested dictionary with the parameters
needed to instantiate the spatial binning method. May not be
necessary for some binning methods. If not present, will use
defaults for binning method selected.
- ``'spectral'``: selects a nested dictionary with spectral stacking
parameters. If not present, will use defaults.
Args:
d (:obj:`dict`):
Dictionary used to instantiate the class.
"""
# Copy over the primary keywords.
# TODO: Get these from inspection of the instatiation signature!
_d = {}
for key in ['key', 'galactic_reddening', 'galactic_rv', 'minimum_snr', 'minimum_frac',
'overwrite']:
if key in d.keys():
_d[key] = d[key]
# Set the spatial binning method
if 'spatial_method' not in d or d['spatial_method'] in ['none', None]:
# Do not bin!
_d['binpar'] = None
_d['binclass'] = None
_d['binfunc'] = None
elif d['spatial_method'] == 'global':
_d['binpar'] = None
_d['binclass'] = spatialbinning.GlobalBinning()
_d['binfunc'] = _d['binclass'].bin_index
elif d['spatial_method'] == 'radial':
_d['binpar'] = spatialbinning.RadialBinningPar.from_dict(d['spatial']) \
if 'spatial' in d else spatialbinning.RadialBinningPar()
_d['binclass'] = spatialbinning.RadialBinning()
_d['binfunc'] = _d['binclass'].bin_index
elif d['spatial_method'] == 'voronoi':
_d['binpar'] = spatialbinning.VoronoiBinningPar.from_dict(d['spatial']) \
if 'spatial' in d else spatialbinning.VoronoiBinningPar()
_d['binclass'] = spatialbinning.VoronoiBinning()
_d['binfunc'] = _d['binclass'].bin_index
elif d['spatial_method'] == 'square':
_d['binpar'] = spatialbinning.SquareBinningPar.from_dict(d['spatial']) \
if 'spatial' in d else spatialbinning.SquareBinningPar()
_d['binclass'] = spatialbinning.SquareBinning()
_d['binfunc'] = _d['binclass'].bin_index
else:
raise ValueError('Unrecognized binning method. Requires direct instantiation or '
'code alterations.')
# Set the spectral binning method
_d['stackpar'] = SpectralStackPar.from_dict(d['spectral']) if 'spectral' in d \
else SpectralStackPar()
_d['stackclass'] = SpectralStack()
_d['stackfunc'] = _d['stackclass'].stack_datacube
# Return the instantiation
return super().from_dict(_d)
[docs]class SpatiallyBinnedSpectraBitMask(DAPBitMask):
r"""
Derived class that specifies the mask bits for the spatially binned
spectra. The maskbits defined are:
.. include:: ../tables/spatiallybinnedspectrabitmask.rst
"""
cfg_root = 'spatially_binned_spectra_bits'
[docs]class SpatiallyBinnedSpectraDataTable(DataTable):
"""
Primary data table with the results of the spatial binning.
Table includes:
.. include:: ../tables/spatiallybinnedspectradatatable.rst
.. todo::
There currently is no mask; however, could add masks for:
- Insufficient good wavelength channels in the spectrum.
- Variance in flux in bin is too large.
Args:
shape (:obj:`int`, :obj:`tuple`, optional):
The shape of the initial array. If None, the data array
will not be instantiated; use :func:`init` to initialize
the data array after instantiation.
"""
def __init__(self, shape=None):
# NOTE: This should require python 3.7 to make sure that this
# is an "ordered" dictionary.
datamodel = dict(BINID=dict(typ=int, shape=None, descr='Bin ID number'),
NBIN=dict(typ=int, shape=None, descr='Number of spaxels in the bin'),
SKY_COO=dict(typ=float, shape=(2,),
descr='The mean on-sky coordinates of the binned spaxels.'),
LW_SKY_COO=dict(typ=float, shape=(2,),
descr='The luminosity-weighted mean on-sky coordinates '
'of the binned spaxels.'),
ELL_COO=dict(typ=float, shape=(2,),
descr='The mean elliptical coordinates of the binned '
'spaxels.'),
LW_ELL_COO=dict(typ=float, shape=(2,),
descr='The luminosity-weighted mean elliptical '
'coordinates of the binned spaxels.'),
AREA=dict(typ=float, shape=None, descr='Total on-sky area of the bin'),
AREA_FRAC=dict(typ=float, shape=None,
descr='Fraction of the expected area covered by good '
'spaxels (not relevant to all binning schemes)'),
SIGNAL=dict(typ=float, shape=None,
descr='Mean flux per pixel in the binned spectrum.'),
VARIANCE=dict(typ=float, shape=None,
descr='Mean variance per pixel in the binned spectrum.'),
SNR=dict(typ=float, shape=None,
descr='Mean S/N per pixel in the binned spectrum.'))
keys = list(datamodel.keys())
super(SpatiallyBinnedSpectraDataTable,
self).__init__(keys, [datamodel[k]['typ'] for k in keys],
element_shapes=[datamodel[k]['shape'] for k in keys],
descr=[datamodel[k]['descr'] for k in keys],
shape=shape)
[docs]class SpatiallyBinnedSpectra:
r"""
Class that holds spatially binned spectra.
Args:
method (:class:`~mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra`):
Object that defines the main parameters used for the spatial
binning.
cube (:class:`mangadap.datacube.datacube.DataCube`):
The datacube with the spectra to bin.
rdxqa (:class:`mangadap.proc.reductionassessments.ReductionAssessments`):
The basic assessments of the datacube that are used for
the binning procedures.
reff (:obj:`float`, optional):
The effective radius of the galaxy in arcsec.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the output
file will be :attr:`directory_path`/:attr:`output_file`. If None,
the default is to combine ``cube.output_root`` and the method keys
of both ``rdxqa`` and the one for this class. The order of the keys
is the order of operations (rdxqa key, then the binning key). See
:func:`~mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra.default_paths`.
hardcopy (:obj:`bool`, optional):
Flag to write the `astropy.io.fits.HDUList`_
(:attr:`hdu`) to disk. If False, the object data is only
kept in memory.
symlink_dir (:obj:`str`, optional):
Create a symbolic link to the created file in the
supplied directory. If None, no symbolic link is created.
overwrite (:obj:`bool`, optional):
Overwrite any existing files. If False, any existing
files will be used. If True, the analysis is redone and
any existing output is overwritten.
checksum (:obj:`bool`, optional):
Use the checksum in the fits header to confirm that the
data has not been corrupted. The checksum is **always**
written to the fits header when the file is created.
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`. If None, no logging
is performed and output is just written to ``stdout``.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output.
Attributes:
loggers (:obj:`list`):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`.
quiet (:obj:`bool`):
Suppress all terminal and logging output.
.. todo::
- Allow velocity offsets for registration.
- Fill in attributes.
"""
def __init__(self, method, cube, rdxqa, reff=None, ebv=None, output_path=None,
output_file=None, hardcopy=True, symlink_dir=None, overwrite=False,
checksum=False, loggers=None, quiet=False):
self.loggers = None
self.quiet = False
# Define the method properties
self.method = method
if not isinstance(self.method, SpatiallyBinnedSpectraDef):
raise TypeError('Method must have type SpatiallyBinnedSpectraDef.')
self.cube = None
self.rdxqa = None
self.reff = None
self.ebv = None
self.galext = None
# Define the output directory and file
self.directory_path = None # Set in default__paths
self.output_file = None
self.hardcopy = None
self.symlink_dir = None
# Define the bitmask
self.bitmask = SpatiallyBinnedSpectraBitMask()
# Initialize the main class attributes
self.hdu = None
self.checksum = checksum
self.shape = None
self.spatial_shape = None
self.nspec = None
self.spatial_index = None
self.spectral_arrays = None
self._assign_spectral_arrays()
self.image_arrays = None
self._assign_image_arrays()
self.nwave = None
self.nbins = None
self.missing_bins = None
self.covariance = None
# Bin the spectra
self.bin_spectra(cube, rdxqa, reff=reff, ebv=ebv, output_path=output_path,
output_file=output_file, hardcopy=hardcopy, symlink_dir=symlink_dir,
overwrite=overwrite, loggers=loggers, quiet=quiet)
def __getitem__(self, key):
return self.hdu[key]
[docs] @staticmethod
def default_paths(cube, method_key, rdxqa_method, output_path=None, output_file=None):
"""
Set the default directory and file name for the output file.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube to analyze.
method_key (:obj:`str`):
Keyword designating the method used for the reduction
assessments.
rdxqa_method (:obj:`str`):
The method key for the basic assessments of the datacube.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys of both ``rdxqa`` and the one for this class. The
order of the keys is the order of operations (rdxqa key, then
the binning key).
Returns:
:obj:`tuple`: Returns a `Path`_ with the output directory and a
:obj:`str` with the output file name.
"""
directory_path = Path('.').resolve() if output_path is None \
else Path(output_path).resolve()
method = f'{rdxqa_method}-{method_key}'
_output_file = f'{cube.output_root}-{method}.fits.gz' if output_file is None \
else output_file
return directory_path, _output_file
[docs] def _finalize_cube_mask(self, mask):
"""
Finalize the mask after the 2D mask has been reconstructed
into a 3D cube.
This mostly handles the masks for regions outside the
datacube field of view.
This propagates the bits flagging pixels that should not be
included in the stacked spectra, and sets the the LOW_SPECCOV
and LOW_SNR bits based on the results from
:func:`check_fgoodpix` and :func:`_check_snr`.
Note that the input mask is both edited in-place and
returned.
.. todo::
This needs to be abstracted for non-DRP datacubes.
Args:
mask (`numpy.ndarray`_):
3D array with the current bitmask data.
Returns:
`numpy.ndarray`_: Edited bitmask data.
"""
# Get the spaxels with good spectral coverage and S/N
good_fgoodpix = self.check_fgoodpix()
good_snr = self._check_snr()
if self.cube.bitmask is not None:
# Turn on the flag stating that the pixel wasn't used
indx = self.cube.bitmask.flagged(self.cube.mask, flag=self.cube.do_not_stack_flags())
mask[indx] = self.bitmask.turn_on(mask[indx], 'DIDNOTUSE')
# Turn on the flag stating that the pixel has a foreground star
if self.cube.propagate_flags() is not None:
flags = self.cube.propagate_flags()
# TODO: This will barf if flags is a numpy array
flags = flags if isinstance(flags, list) else [flags]
for flag in flags:
indx = self.cube.bitmask.flagged(self.cube.mask, flag=flag)
if numpy.any(indx):
if flag in self.bitmask.keys():
mask[indx] = self.bitmask.turn_on(mask[indx], flag) #'FORESTAR')
warnings.warn(f'{flag} is not a flag in {self.bitmask.__class__.__name__} '
'and cannot be propagated!')
# Turn on the flag stating that the number of valid channels in
# the spectrum was below the input fraction.
indx = numpy.array([numpy.invert(good_fgoodpix).reshape(self.spatial_shape).T]*self.nwave).T
mask[indx] = self.bitmask.turn_on(mask[indx], flag='LOW_SPECCOV')
# Turn on the flag stating that the S/N in the spectrum was
# below the requested limit
indx = numpy.array([numpy.invert(good_snr).reshape(self.spatial_shape).T]*self.nwave).T
mask[indx] = self.bitmask.turn_on(mask[indx], flag='LOW_SNR')
return mask
[docs] def _check_snr(self):
"""
Determine which spectra in :attr:`rdxqa` have a S/N greater than
the minimum set by :attr:`method`. Only these spectra will be
included in the binning.
Returns:
`numpy.ndarray`_: Boolean array for the spectra that
satisfy the criterion.
"""
return self.rdxqa['SPECTRUM'].data['SNR'] > self.method['minimum_snr']
[docs] def _assign_spectral_arrays(self):
"""
Set :attr:`spectral_arrays`, which contains the list of
extensions in :attr:`hdu` that contain spectral data.
"""
self.spectral_arrays = ['FLUX', 'IVAR', 'MASK', 'SPECRES', 'FLUXD', 'NPIX']
[docs] def _assign_image_arrays(self):
"""
Set :attr:`image_arrays`, which contains the list of extensions
in :attr:`hdu` that are on-sky image data.
"""
self.image_arrays = ['BINID']
[docs] @staticmethod
def _get_missing_bins(unique_bins):
"""Return bin IDs omitted from a sequential list."""
return list(set(numpy.arange(numpy.amax(unique_bins)+1)) - set(unique_bins))
[docs] def _unbinned_data_table(self, bin_indx):
"""
Construct the output data table for the unbinned spectra.
Args:
bin_indx (`numpy.ndarray`_):
The integer vector with the bin associated with each
spectrum in the DRP cube. This is the flattened
``BINID`` array.
Returns:
`numpy.recarray`_: The record array that is put in the
BINS extension of :attr:`hdu`.
"""
# Find the spectra that have a bin ID
good_spec = bin_indx > -1
self.nbins = numpy.amax(bin_indx)+1
self.missing_bins = [] # No missing bins
# Copy the data from the ReductionAssessments object
bin_data = SpatiallyBinnedSpectraDataTable(shape=self.nbins)
bin_data['BINID'] = numpy.arange(self.nbins)
bin_data['NBIN'] = numpy.ones(self.nbins, dtype=int)
bin_data['SKY_COO'][:,0] = self.rdxqa['SPECTRUM'].data['SKY_COO'][good_spec,0]
bin_data['SKY_COO'][:,1] = self.rdxqa['SPECTRUM'].data['SKY_COO'][good_spec,1]
bin_data['LW_SKY_COO'] = bin_data['SKY_COO'].copy()
bin_data['ELL_COO'][:,0] = self.rdxqa['SPECTRUM'].data['ELL_COO'][good_spec,0]
bin_data['ELL_COO'][:,1] = self.rdxqa['SPECTRUM'].data['ELL_COO'][good_spec,1]
bin_data['LW_ELL_COO'] = bin_data['ELL_COO'].copy()
if self.cube.pixelscale is None:
if self.cube.wcs is not None:
self.cube._get_pixelscale()
else:
raise ValueError('Must provide datacube spatial pixelscale, or a WCS so that it '
'can be calculated.')
bin_data['AREA'] = numpy.full(self.nbins, numpy.square(self.cube.pixelscale), dtype=float)
bin_data['AREA_FRAC'] = numpy.ones(self.nbins, dtype=int)
bin_data['SIGNAL'] = self.rdxqa['SPECTRUM'].data['SIGNAL'][good_spec]
bin_data['VARIANCE'] = self.rdxqa['SPECTRUM'].data['VARIANCE'][good_spec]
bin_data['SNR'] = self.rdxqa['SPECTRUM'].data['SNR'][good_spec]
return bin_data
[docs] def _binned_data_table(self, bin_indx, stack_flux, stack_ivar, per_pixel=True):
r"""
Construct the output data table for the binned spectra.
Args:
bin_indx (`numpy.ndarray`_):
The integer vector with the bin associated with each
spectrum in the DRP cube. This is the flattened
``BINID`` array.
stack_flux (`numpy.ndarray`_):
The stacked spectra with shape :math:`N_{\rm
spec}\times\N_\lambda}`.
stack_ivar (`numpy.ndarray`_):
The stacked inverse variance with shape :math:`N_{\rm
spec}\times\N_\lambda}`.
per_pixel (:obj:`bool`, optional):
Base the flux statistics on per-pixel measurements.
Set to False for a per-angstrom calculation.
Returns:
`numpy.recarray`_: The record array that is put in the
``BINS`` extension of :attr:`hdu`.
"""
# Get the unique bins and the number of spectra in each bin
unique_bins, bin_count = numpy.unique(bin_indx, return_counts=True)
if unique_bins[0] == -1:
unique_bins = unique_bins[1:]
bin_count = bin_count[1:]
# Get the number of returned bins:
# - The number of returned bins MAY NOT BE THE SAME as the
# number of requested bins. E.g., if radial bins were
# requested out to 2.5 Reff, when coverage only goes to 1.5
# Reff.
# - Bins with no spectra are not included in the data table!
# - Missing bins are only identified as those without indices in
# the range of provided bin numbers.
self.nbins = len(unique_bins)
self.missing_bins = SpatiallyBinnedSpectra._get_missing_bins(unique_bins)
# Intialize the data for the binned spectra
bin_data = SpatiallyBinnedSpectraDataTable(shape=self.nbins)
bin_data['BINID'] = unique_bins.astype(int)
bin_data['NBIN'] = bin_count.astype(int)
# Recalculate the mean signal, mean noise, and mean S/N of the
# binned spectra
if stack_ivar is None:
warnings.warn('No inverse variance for stack. Errors set to unity.')
_wavelength_mask = SpectralPixelMask(waverange=self.rdxqa.method['waverange']
).boolean(self.cube.wave)
_mask = numpy.ma.getmaskarray(stack_flux) | _wavelength_mask[None,:]
if stack_ivar is not None:
_mask |= numpy.ma.getmaskarray(stack_ivar)
_stack_flux = numpy.ma.MaskedArray(stack_flux.data, mask=_mask)
# TODO: Somehow consolidate with
# mangadap.datacube.datacube.DataCube.flux_stats?
# Set the response function
dw = numpy.ones(self.cube.nwave, dtype=float) if per_pixel \
else angstroms_per_pixel(self.cube.wave, log=self.cube.log)
_response_func = self.cube.interpolate_to_match(self.rdxqa.method.response)
# Get the signal
response_integral = numpy.sum(numpy.invert(numpy.ma.getmaskarray(_stack_flux))
* (_response_func*dw)[None,:], axis=1)
bin_data['SIGNAL'] = numpy.ma.divide(numpy.ma.sum(_stack_flux*(_response_func*dw)[None,:],
axis=1), response_integral).filled(0.0)
# And the variance and SNR, if the inverse variance is available
if stack_ivar is not None:
_stack_ivar = numpy.ma.MaskedArray(stack_ivar.data, mask=_mask)
bin_data['VARIANCE'] = numpy.ma.divide(numpy.ma.sum(numpy.ma.power(_stack_ivar, -1.) \
* (_response_func*dw)[None,:], axis=1),
response_integral).filled(0.0)
bin_data['SNR'] = numpy.ma.divide(numpy.ma.sum(_stack_flux * numpy.ma.sqrt(_stack_ivar)
* (_response_func*dw)[None,:], axis=1),
response_integral).filled(0.0)
del _stack_ivar
del _mask, _stack_flux
# Sort the list of bin ids and determine if any spectra jump
# between bins
srt = numpy.argsort(bin_indx)
bin_change = numpy.where(numpy.diff(bin_indx[srt]) > 0)[0] + 1
if bin_change.size != self.nbins:
bin_change = numpy.append([0], bin_change)
# Some convenience reference variables
x = self.rdxqa['SPECTRUM'].data['SKY_COO'][:,0]
y = self.rdxqa['SPECTRUM'].data['SKY_COO'][:,1]
r = self.rdxqa['SPECTRUM'].data['ELL_COO'][:,0]
t = self.rdxqa['SPECTRUM'].data['ELL_COO'][:,1]
s = self.rdxqa['SPECTRUM'].data['SIGNAL']
# Get the signal-weighted on-sky coordinates
wsum = numpy.add.reduceat(s[srt], bin_change)
bin_data['LW_SKY_COO'][:,0] = numpy.add.reduceat((x*s)[srt], bin_change)/wsum
bin_data['LW_SKY_COO'][:,1] = numpy.add.reduceat((y*s)[srt], bin_change)/wsum
bin_data['LW_ELL_COO'][:,0] = numpy.add.reduceat((r*s)[srt], bin_change)/wsum
bin_data['LW_ELL_COO'][:,1] = numpy.add.reduceat((t*s)[srt], bin_change)/wsum
# ... and the unweighted on-sky coordinates
bin_data['SKY_COO'][:,0] = numpy.add.reduceat(x[srt], bin_change)/bin_data['NBIN']
bin_data['SKY_COO'][:,1] = numpy.add.reduceat(y[srt], bin_change)/bin_data['NBIN']
bin_data['ELL_COO'][:,0] = numpy.add.reduceat(r[srt], bin_change)/bin_data['NBIN']
bin_data['ELL_COO'][:,1] = numpy.add.reduceat(t[srt], bin_change)/bin_data['NBIN']
# The mean azimuth can go wrong when bins cross the major axis
_wt = numpy.add.reduceat(((t+180)*s)[srt], bin_change)/wsum
indx = numpy.absolute(_wt-180-bin_data['LW_ELL_COO'][:,1]) > 1.0 # HARDWIRED
bin_data['LW_ELL_COO'][indx,1] = _wt[indx]-180
_bt = numpy.add.reduceat(t[srt]+180, bin_change) / bin_data['NBIN']
indx = numpy.absolute(_bt-180-bin_data['ELL_COO'][:,1]) > 1.0 # HARDWIRED
bin_data['ELL_COO'][indx,1] = _bt[indx]-180
# Compute the area covered by each bin
bin_data['AREA'] = self.cube.binned_on_sky_area(bin_indx)[1]
# Calculate the fractional area of the bin covered by the
# spectra, if possible; if not, the fractional area is unity
try:
bin_total_area = self.method['binclass'].bin_area()[unique_bins]
except AttributeError as e:
if not self.quiet:
warnings.warn('Could not calculate nominal bin area:: '
'AttributeError: {0}'.format(e))
bin_total_area = bin_data['AREA']
bin_data['AREA_FRAC'] = bin_data['AREA']/bin_total_area
return bin_data
[docs] def _apply_reddening(self, flux, ivar, sdev, covar, deredden=True):
"""
Correct the spectra for Galactic reddening.
Largely a wrapper for executing
:func:`mangadap.util.extinction.GalacticExtinction.apply` on
the provided arrays.
If the reddening law is undefined (:attr:`galext.form` is
None), the method simply returns the input.
Otherwise, the input arrays are both modified directly and
returned.
Args:
flux (`numpy.ndarray`_):
Flux array
ivar (`numpy.ndarray`_):
Inverse variance array. Can be None.
sdev (`numpy.ndarray`_):
The standard deviation of the stacked spectra, if
relevant to ``flux``. Can be None.
covar (:class:`mangadap.util.covariance.Covariance`):
Spatial covariance in the binned spectra. Assumed to
be a 3D covariance cube. Can be None.
Returns:
:obj:`tuple`: Returns four `numpy.ndarray`_ objects with
the reddening corrected flux, inverse variance, standard
deviation in the stack, and spatial covariance. Any of
the latter three can be None if the corresponding input
is None.
"""
if self.galext.form is None:
return flux, ivar, sdev, covar
# NOTE: I'm doing some gymnastics here to make sure that the
# masked values are altered by the reddening, but that they're
# still masked on output.
_flux = flux.data if isinstance(flux, numpy.ma.MaskedArray) else flux.copy()
_ivar = None if ivar is None \
else (ivar.data if isinstance(ivar, numpy.ma.MaskedArray) else ivar.copy())
_sdev = None if sdev is None \
else (sdev.data if isinstance(sdev, numpy.ma.MaskedArray) else sdev.copy())
_covar = None if covar is None else covar.copy()
# Apply the reddening correction
_flux, _ivar = self.galext.apply(_flux, ivar=_ivar, deredden=deredden)
if sdev is not None:
_sdev = self.galext.apply(_sdev, deredden=deredden)
if covar is not None:
for i,j in enumerate(_covar.input_indx):
_covar.cov[i] = _covar.cov[i] * numpy.square(self.galext.redcorr[j]) if deredden \
else _covar.cov[i] / numpy.square(self.galext.redcorr[j])
if isinstance(flux, numpy.ma.MaskedArray):
_flux = numpy.ma.MaskedArray(_flux, mask=numpy.ma.getmaskarray(flux).copy())
if _ivar is not None and isinstance(ivar, numpy.ma.MaskedArray):
_ivar = numpy.ma.MaskedArray(_ivar, mask=numpy.ma.getmaskarray(ivar).copy())
if _sdev is not None and isinstance(_sdev, numpy.ma.MaskedArray):
_sdev = numpy.ma.MaskedArray(_sdev, mask=numpy.ma.getmaskarray(sdev).copy())
return _flux, _ivar, _sdev, _covar
# TODO: Allow both stack_sres and self.cube.sres to be None.
[docs] def _construct_2d_hdu(self, bin_indx, good_fgoodpix, good_snr, bin_data, stack_flux,
stack_sdev, stack_ivar, stack_npix, stack_mask, stack_sres, stack_covar):
r"""
Construct :attr:`hdu` that is held in memory for manipulation
of the object. See :func:`construct_3d_hdu` to convert the
object into a datacube.
Args:
bin_indx (`numpy.ndarray`_):
2D array with the bin associated with each spaxel.
good_fgoodpix (`numpy.ndarray`_):
Boolean array selecting spaxels with good spectral
coverage.
good_snr (`numpy.ndarray`_):
Boolean array selecting spaxels with good S/N.
bin_data (:class:`SpatiallyBinnedSpectraDataTable`):
Data table with relevant metadata for each binned
spectrum.
stack_flux (`numpy.ndarray`_):
Array with the stacked spectral flux. Shape is
:math:`(N_{\rm bin}, N_{\rm wave})`.
stack_sdev (`numpy.ndarray`_):
Array with the standard deviation in the stacked
spectral flux. Shape is :math:`(N_{\rm bin}, N_{\rm
wave})`.
stack_ivar (`numpy.ndarray`_):
Array with the inverse variance in the stacked
spectral flux. Shape is :math:`(N_{\rm bin}, N_{\rm
wave})`.
stack_npix (`numpy.ndarray`_):
Integer array with the number of spectral channels
that were stacked to construct the binned spectrum.
Shape is :math:`(N_{\rm bin}, N_{\rm wave})`.
stack_mask (`numpy.ndarray`_):
Integer array with the bitmasks associated with pixel
in the stacked spectra. Shape is :math:`(N_{\rm bin},
N_{\rm wave})`.
stack_sres (`numpy.ndarray`_):
Array with the spectral resolution of the stacked
spectra. Shape is :math:`(N_{\rm bin}, N_{\rm
wave})`. If None, the spectral resolution is set to
be the median of the resolution in the datacube. If
the datacube also has no spectral resolution data,
the method faults.
stack_covar (:class:`mangadap.util.covariance.Covariance`):
Spatial covariance in the stacked spectra. Can be
None.
"""
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Constructing hdu ...')
if stack_sres is None:
if self.cube.sres is None:
raise ValueError('Must be able to define spectral resolution.')
stack_sres = numpy.ma.MaskedArray([numpy.median(self.cube.sres.reshape(-1,
self.cube.nwave), axis=0)]*self.nbins)
self.covariance = None if stack_covar is None or not isinstance(stack_covar, Covariance) \
else stack_covar.copy()
# Initialize the headers
pri_hdr = self._initialize_primary_header()
pri_hdr = self._add_method_header(pri_hdr)
pri_hdr = self._add_reddening_header(pri_hdr)
red_hdr = fits.Header()
red_hdr = self._add_reddening_header(red_hdr)
map_hdr = DAPFitsUtil.build_map_header(self.cube.fluxhdr,
'K Westfall <westfall@ucolick.org>')
# Get the spatial map mask
# Marginalize the spectral masks over wavelength
map_mask = DAPFitsUtil.marginalize_mask(self.cube.mask,
inp_flags=self.cube.do_not_use_flags(),
inp_bitmask=self.cube.bitmask,
out_flag='DIDNOTUSE', out_bitmask=self.bitmask)
if self.cube.propagate_flags() is not None:
for flag in self.cube.propagate_flags():
map_mask = DAPFitsUtil.marginalize_mask(self.cube.mask, inp_flags=flag,
inp_bitmask=self.cube.bitmask,
out_flag=flag, out_bitmask=self.bitmask,
out_mask=map_mask)
drp_bad = map_mask > 0
# Add the spectra with low spectral coverage
indx = numpy.invert(good_fgoodpix.reshape(self.spatial_shape)) & numpy.invert(drp_bad)
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'LOW_SPECCOV')
# Add the spectra with low S/N
indx = numpy.invert(good_snr.reshape(self.spatial_shape)) & numpy.invert(drp_bad)
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'LOW_SNR')
# Fill the covariance HDUs
if self.covariance is not None:
pri_hdr, ivar_hdu, covar_hdu = self.covariance.output_hdus(hdr=pri_hdr)
# Save the data to the hdu attribute
self.hdu = fits.HDUList([fits.PrimaryHDU(header=pri_hdr),
fits.ImageHDU(data=stack_flux.data, name='FLUX'),
fits.ImageHDU(data=stack_ivar.data, name='IVAR'),
fits.ImageHDU(data=stack_mask, name='MASK'),
fits.ImageHDU(data=self.cube.wave, name='WAVE'),
fits.ImageHDU(data=stack_sres.data, name='SPECRES'),
fits.ImageHDU(data=self.galext.redcorr.data, header=red_hdr,
name='REDCORR'),
fits.ImageHDU(data=stack_sdev.data, name='FLUXD'),
fits.ImageHDU(data=stack_npix.data, name='NPIX'),
fits.ImageHDU(data=bin_indx, header=map_hdr, name='BINID'),
fits.ImageHDU(data=map_mask, header=map_hdr, name='MAPMASK'),
bin_data.to_hdu(name='BINS')])
# Fill the covariance matrix
if self.covariance is not None:
self.hdu += [ covar_hdu ]
[docs] def file_name(self):
"""Return the name of the output file."""
return self.output_file
[docs] def file_path(self):
"""Return the full path to the output file."""
if self.directory_path is None or self.output_file is None:
return None
return self.directory_path / self.output_file
[docs] def info(self):
return self.hdu.info()
@property
def is_unbinned(self):
"""Determine if the spectra are unbinned."""
return self.method['binpar'] is None and self.method['binclass'] is None \
and self.method['binfunc'] is None
[docs] @staticmethod
def do_not_fit_flags():
return ['DIDNOTUSE', 'FORESTAR', 'LOW_SPECCOV', 'LOW_SNR', 'NONE_IN_STACK', 'IVARINVALID']
[docs] def check_fgoodpix(self, minimum_fraction=None):
r"""
Determine which spaxels in :attr:`rdxqa` have a fractional
spectral coverage of greater than the provided minimum fraction.
Only these spectra will be included in the binning.
Args:
minimum_fraction (:obj:`float`, optional):
The minimum fraction of the spectrum that must be
valid for the spectrum to be included in any bin. If
None, ``self.method['minimum_frac']`` is used.
Returns:
`numpy.ndarray`_: Boolean array for the spectra that
satisfy the criterion. Shape is :math:`(N_{\rm
spaxel},)`.
"""
if minimum_fraction is None:
minimum_fraction = self.method['minimum_frac']
return self.rdxqa['SPECTRUM'].data['FGOODPIX'] > minimum_fraction
[docs] def above_snr_limit(self, sn_limit, original_spaxels=False, debug=False):
"""
Flag bins above a provided S/N limit.
Args:
sn_limit (:obj:`float`):
S/N threshold.
original_spaxels (:obj:`bool`, optional):
Instead of returning flags for the binned spectra S/N, return
flags for the original spaxels. Note the difference in the
returned shape!!
debug (:obj:`bool`, optional):
Run in debug mode. This just selects the first 2
spectra that meet the S/N criterion so that the rest
of the analysis is just done using those two bins.
Returns:
`numpy.ndarray`_: Boolean array selecting those bins that
meet the S/N threshold.
"""
if debug:
if original_spaxels:
raise ValueError('original_spaxels can only be true if debug is false.')
warnings.warn('You are setting all but two spectra as bad!')
test = self.hdu['BINS'].data['SNR'] > sn_limit
indx = numpy.arange(len(test))[test]
test[indx[2:]] = False
return test
return (self.rdxqa['SPECTRUM'].data['SNR'] > sn_limit) if original_spaxels \
else (self.hdu['BINS'].data['SNR'] > sn_limit)
# TODO: Need to abstract further for non-DRP cubes, and for cubes
# without a mask.
[docs] def get_mask_array_from_cube(self, select=None):
r"""
Convert the datacube mask for individual spaxels to the binned
spectra mask.
Any pixel with bit flags in the list returned by
:func:`mangadap.datacube.datacube.DataCube.do_not_stack_flags`
for :attr:`cube` are consolidated into the ``DIDNOTUSE``
flag. Any ``FORESTAR`` flags are propagated.
Args:
select (`numpy.ndarray`_, optional):
Flattened, boolean array with the spaxels in the
datacube to select. Shape is :math:`(N_{\rm
spaxel},)`. If None, all spaxels are included.
Returns:
`numpy.ndarray`_: Integer array with the consolidated
mask bits.
"""
drp_mask = self.cube.copy_to_array(attr='mask')
if select is not None:
drp_mask = drp_mask[select,:]
# Initialize
mask = numpy.zeros(drp_mask.shape, dtype=self.bitmask.minimum_dtype())
# Consolidate pixels flagged to be excluded from any
# stacking into DIDNOTUSE
if self.cube.bitmask is None:
indx = drp_mask
else:
flags = self.cube.do_not_stack_flags()
indx = self.cube.bitmask.flagged(drp_mask, flag=flags)
mask[indx] = self.bitmask.turn_on(mask[indx], 'DIDNOTUSE')
# Propagate the FORESTAR flags
if self.cube.bitmask is not None and self.cube.propagate_flags() is not None:
flags = self.cube.propagate_flags()
flags = flags if isinstance(flags, list) else [flags]
for flag in flags:
indx = self.cube.bitmask.flagged(self.cube.mask, flag=flag)
if numpy.any(indx):
if flag in self.bitmask.keys():
mask[indx] = self.bitmask.turn_on(mask[indx], flag) #'FORESTAR')
warnings.warn(f'{flag} is not a flag in {self.bitmask.__class__.__name__} '
'and cannot be propagated!')
return mask
[docs] def bin_spectra(self, cube, rdxqa, reff=None, ebv=None, output_path=None, output_file=None,
hardcopy=True, symlink_dir=None, overwrite=False, loggers=None, quiet=False):
"""
Bin and stack the spectra.
This is the core funtion of this class, constructing its main
data container, :attr:`hdu`.
.. todo::
Describe algorithm.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
The datacube with the spectra to bin.
rdxqa (:class:`mangadap.proc.reductionassessments.ReductionAssessments`):
The basic assessments of the datacube that are used
for the binning procedures.
reff (:obj:`float`, optional):
The effective radius of the galaxy in arcsec.
ebv (:obj:`float`, optional):
The E(B-V) Galactic reddening to be removed
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys of both ``rdxqa`` and the one for this class. The
order of the keys is the order of operations (rdxqa key, then
the binning key). See
:func:`~mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra.default_paths`.
hardcopy (:obj:`bool`, optional):
Flag to write the `astropy.io.fits.HDUList`_
(:attr:`hdu`) to disk. If False, the object data is
only kept in memory.
symlink_dir (:obj:`str`, optional):
Create a symbolic link to the created file in the
supplied directory. If None, no symbolic link is
created.
overwrite (:obj:`bool`, optional):
Overwrite any existing files. If False, any existing
files will be used. If True, the analysis is redone
and any existing output is overwritten.
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`. If None, no
logging is performed and output is just written to
``stdout``.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output.
Raises:
ValueError:
Raised if ``cube`` or ``rdxqa`` is None, or if no
spectra in the datacube meet the S/N or spectral
coverage criteria. If the spectra are actually being
binned (i.e., the binning type is not 'none'), this
error is also raised if the output file cannot be
defined, if no spectra are assigned to any bin, or if
the stacking function results in a correlation matrix
instead of a full covariance matrix.
TypeError:
Raised if the input ``cube`` is not derived from
:class:`mangadap.datacube.datacube.DataCube` or if
``rdxqa`` is not a
:class:`mangadap.proc.reductionassessments.ReductionAssessment`.
"""
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
# DataCube always needed
if cube is None:
raise ValueError('Must provide datacube object!')
# if not isinstance(cube, DataCube):
# raise TypeError('Must provide a valid DataCube object!')
# Test if the RSS file exists; cannot compute covariance if not
# TODO: this will break for a different stacking class
if self.method['stackpar']['covar_mode'] != 'none' and not cube.can_compute_covariance:
warnings.warn('Cannot determine covariance matrix! Continuing without!')
self.method['stackpar']['covar_mode'] = 'none'
# TODO: How many of these attributes should I keep vs. just use
# the cube attributes?
self.cube = cube
self.shape = self.cube.shape
self.spatial_shape = self.cube.spatial_shape
self.nspec = self.cube.nspec
self.spatial_index = self.cube.spatial_index.copy()
self.nwave = self.cube.nwave
# ReductionAssessment object always needed
if rdxqa is None:
raise ValueError('Must provide a ReductionAssessment object to bin spectra.')
if not isinstance(rdxqa, ReductionAssessment):
raise TypeError('Must provide a valid ReductionAssessment object!')
self.rdxqa = rdxqa
# Save the effective radius if provided. Only used if/when
# scaling the radii by the effective radius in the radial
# binning approach
if reff is not None:
self.reff = reff
if ebv is not None:
self.ebv = ebv
# Set the Galactic extinction correction defined by the method
self.galext = GalacticExtinction(form=self.method['galactic_reddening'],
wave=self.cube.wave,
ebv=0.0 if self.ebv is None else self.ebv,
rv=self.method['galactic_rv'])
#---------------------------------------------------------------
# Get the good spectra
# - Must have valid pixels over more than 80% of the spectral
# range
good_fgoodpix = self.check_fgoodpix()
# - Must have sufficienct S/N, as defined by the input par
good_snr = self._check_snr()
# Good spectra have both these criteria
good_spec = good_fgoodpix & good_snr
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(loggers, 1, logging.INFO, '{0:^50}'.format('SPATIALLY BINNING SPECTRA'))
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(self.loggers, 1, logging.INFO,
'Total spectra: {0}'.format(len(good_fgoodpix)))
log_output(self.loggers, 1, logging.INFO,
'With 80% spectral coverage: {0}'.format(numpy.sum(good_fgoodpix)))
log_output(self.loggers, 1, logging.INFO,
'With good S/N: {0}'.format(numpy.sum(good_snr)))
log_output(self.loggers, 1, logging.INFO,
'Number of good spectra: {0}'.format(numpy.sum(good_spec)))
if self.galext.form is None:
log_output(self.loggers, 1, logging.INFO, 'Galactic dereddening not applied.')
else:
log_output(self.loggers, 1, logging.INFO,
'Galactic extinction law: {0}'.format(self.galext.form))
log_output(self.loggers, 1, logging.INFO,
'Galactic E(B-V) = {0}'.format(self.galext.ebv))
log_output(self.loggers, 1, logging.INFO,
'Galactic R(V): {0}'.format(self.galext.rv))
if numpy.sum(good_spec) == 0:
raise ValueError('No good spectra!')
#---------------------------------------------------------------
# Fill in any remaining binning parameters
# self._fill_method_par(good_spec)
if self.method['binpar'] is not None and hasattr(self.method['binpar'], 'fill'):
self.method['binpar'].fill(self.rdxqa)
_overwrite = self.method['overwrite'] if overwrite is None else overwrite
# (Re)Set the output paths
# Reset the output paths if necessary
self.directory_path, self.output_file \
= SpatiallyBinnedSpectra.default_paths(self.cube, self.method['key'],
self.rdxqa.method['key'],
output_path=output_path,
output_file=output_file)
#---------------------------------------------------------------
# No binning so just fill the hdu with the appropriate data from
# the DRP file
if self.is_unbinned:
# TODO: This is just a short cut. Would be better if I
# didn't use this. I.e., I should treat the function calls
# exactly the same, but just create binning and stacking
# classes for when the data is unbinned...
# Report data is unbinned
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'No binning requested; analyzing DRP spaxel data directly.')
# Generate pseudo bin index
bin_indx = numpy.full(self.cube.spatial_shape, -1, dtype=int)
i = numpy.asarray(tuple(self.cube.spatial_index[good_spec]))
bin_indx[i[:,0],i[:,1]] = numpy.arange(numpy.sum(good_spec))
# Build the data arrays directly from the DRP file
flags = self.cube.do_not_stack_flags()
flux = self.cube.copy_to_masked_array(flag=flags)[good_spec,:]
sdev = numpy.ma.zeros(flux.shape, dtype=float)
ivar = self.cube.copy_to_masked_array(attr='ivar', flag=flags)[good_spec,:]
sres = self.cube.copy_to_array(attr='sres')[good_spec,:]
npix = numpy.ones(flux.shape, dtype=int)
npix[numpy.ma.getmaskarray(flux)] = 0
# Build the mask, converting from the DRP bits to the
# BinnedSpectra bits
mask = self.get_mask_array_from_cube(select=good_spec)
# Mask anything with an invalid ivar
# TODO: Not sure this is necessary
indx = numpy.logical_not(ivar > 0)
if numpy.any(indx):
flux.mask |= indx
ivar.mask |= indx
mask[indx] = self.bitmask.turn_on(mask[indx], ['IVARINVALID', 'DIDNOTUSE'])
# TODO: Does this work with any covariance mode? Don't like
# this back and forth between what is supposed to be a stack
# only function
# (SpectralStack.build_covariance_data_DRPFits) and
# something that is supposed to be independent of the details
# of the stacking implementation (SpatiallyBinnedSpectra)...
# Try to add the covariance data, if requested
covar=None
if self.method['stackpar']['covar_mode'] not in ['none', None]:
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Attempting to compute covariance.')
try:
covar = self.method['stackclass'].build_covariance_data(self.cube,
self.method['stackpar']['covar_mode'],
self.method['stackpar']['covar_par'])
except AttributeError as e:
if not self.quiet:
warnings.warn('Could not build covariance data:: '
'AttributeError: {0}'.format(str(e)))
covar = None
if isinstance(covar, Covariance):
covar = covar.spaxel_to_bin_covariance(bin_indx.ravel())
# Fill the table with the per-spectrum data
bin_data = self._unbinned_data_table(bin_indx.ravel())
# Deredden the spectra if the method requests it
flux, ivar, _, covar = self._apply_reddening(flux, ivar, None, covar)
# Build the internal HDUList object and covariance attribute
if (hardcopy or symlink_dir is not None) and not self.quiet:
warnings.warn('Hardcopies and symlinks of the SpatiallyBinnedSpectra object are '
'never kept when analyzing unbinned data.')
self.hardcopy = False
self.symlink_dir = None
self._construct_2d_hdu(bin_indx, good_fgoodpix, good_snr, bin_data, flux, sdev, ivar,
npix, mask, sres, covar)
# Finish
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
return
#---------------------------------------------------------------
# Check that the file path is defined
ofile = self.file_path()
if ofile is None:
raise ValueError('File path for output file is undefined!')
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, f'Output path: {self.directory_path}')
log_output(self.loggers, 1, logging.INFO, f'Output file: {self.output_file}')
#---------------------------------------------------------------
# If the file already exists, and not overwriting, just read the
# file
self.symlink_dir = symlink_dir
if ofile.exists() and not _overwrite:
self.hardcopy = True
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Reading existing file')
self.read(checksum=self.checksum)
if not self.quiet and reff is not None and self.reff != reff:
warnings.warn('Provided effective radius different from available file; set ' \
'overwrite=True to overwrite.')
# Make sure the symlink exists
if self.symlink_dir is not None:
create_symlink(str(ofile), self.symlink_dir, overwrite=_overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
return
#---------------------------------------------------------------
# Determine how to bin the spectra. To be included in any bin,
# the spectra must be selected as 'good' according to the
# selections made above.
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Binning spectra ...')
# bin_indx = numpy.full(self.nspec, -1, dtype=int)
# bin_indx[good_spec] \
bin_indx = self.method['binfunc'](self.rdxqa['SPECTRUM'].data['SKY_COO'][:,0],
self.rdxqa['SPECTRUM'].data['SKY_COO'][:,1],
gpm=good_spec, par=self.method['binpar'])
if numpy.sum(bin_indx > -1) == 0:
raise ValueError('No spectra in ANY bin!')
# # Done for testing missing bins
# warnings.warn('You\'re forcing bins 2 and 3 to be empty!')
# time.sleep(3)
# warnings.warn('Proceeding...')
# bin_indx[ (bin_indx == 2) | (bin_indx == 3) ] = 1
#---------------------------------------------------------------
# Stack the spectra in each bin
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Stacking spectra ...')
stack_wave, stack_flux, stack_sdev, stack_npix, stack_ivar, stack_sres, stack_covar = \
self.method['stackfunc'](self.cube, bin_indx, par=self.method['stackpar'])
if stack_covar is not None and stack_covar.is_correlation:
raise ValueError('Unexpected to have stack covariance be a correlation matrix.')
# Construct the mask
stack_mask = numpy.zeros(stack_flux.shape, dtype=self.bitmask.minimum_dtype())
indx = numpy.invert(stack_npix>0)
stack_mask[indx] = self.bitmask.turn_on(stack_mask[indx], ['NONE_IN_STACK', 'NO_STDDEV'])
indx = numpy.invert(stack_npix>1)
stack_mask[indx] = self.bitmask.turn_on(stack_mask[indx], 'NO_STDDEV')
indx = numpy.invert(stack_ivar>0)
stack_mask[indx] = self.bitmask.turn_on(stack_mask[indx], ['IVARINVALID', 'DIDNOTUSE'])
#---------------------------------------------------------------
# Fill the table with the per-bin data (BEFORE applying the
# reddening)
bin_data = self._binned_data_table(bin_indx, stack_flux, stack_ivar)
#---------------------------------------------------------------
# Deredden the spectra if the method requests it
stack_flux, stack_ivar, stack_sdev, stack_covar \
= self._apply_reddening(stack_flux, stack_ivar, stack_sdev, stack_covar)
#---------------------------------------------------------------
# Build the internal HDUList object and covariance attribute
self.hardcopy = hardcopy
self._construct_2d_hdu(bin_indx.reshape(self.spatial_shape), good_fgoodpix, good_snr,
bin_data, stack_flux, stack_sdev, stack_ivar, stack_npix,
stack_mask, stack_sres, stack_covar)
#---------------------------------------------------------------
# Write the data, if requested
if self.hardcopy:
if not self.directory_path.exists():
self.directory_path.mkdir(parents=True)
self.write(overwrite=_overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
[docs] def construct_3d_hdu(self):
"""
Reformat the binned spectra into a cube matching the shape of
the datacube from which it was derived.
"""
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Constructing binned spectra datacube ...')
# Get/Copy the necessary data arrays
bin_indx = self.hdu['BINID'].data.copy()
stack_flux = self.hdu['FLUX'].data.copy()
stack_sdev = self.hdu['FLUXD'].data.copy()
stack_ivar = self.hdu['IVAR'].data.copy()
stack_mask = self.hdu['MASK'].data.copy()
stack_npix = self.hdu['NPIX'].data.copy()
stack_sres = self.hdu['SPECRES'].data.copy()
# Reconstruct the stacked spectra into a datacube
flux, mask, sdev, ivar, npix, sres \
= DAPFitsUtil.reconstruct_cube(self.shape, bin_indx.ravel(),
[stack_flux, stack_mask, stack_sdev, stack_ivar,
stack_npix, stack_sres ])
# TODO: Move this to the Covariance class?
if self.covariance is not None:
covariance = self.covariance.bin_to_spaxel_covariance(bin_indx.ravel())
# Finalize the mask
mask = self._finalize_cube_mask(mask)
# Primary header is identical regardless of the shape of the
# extensions
pri_hdr = self.hdu['PRIMARY'].header.copy()
cube_hdr = DAPFitsUtil.build_cube_header(self.cube, 'K Westfall <westfall@ucolick.org>')
# Fill the covariance HDUs
if self.covariance is not None:
pri_hdr, ivar_hdu, covar_hdu = covariance.output_hdus(reshape=True, hdr=pri_hdr)
# Save the data to the hdu attribute
# TODO: Strip headers of sub extensions of everything but the
# WCS and HDUCLAS information. Keep WCS information in BINID.
hdu = fits.HDUList([fits.PrimaryHDU(header=pri_hdr),
fits.ImageHDU(data=flux, header=cube_hdr, name='FLUX'),
fits.ImageHDU(data=ivar, header=cube_hdr, name='IVAR'),
fits.ImageHDU(data=mask, header=cube_hdr, name='MASK'),
self.hdu['WAVE'].copy(),
fits.ImageHDU(data=sres, header=cube_hdr, name='SPECRES'),
self.hdu['REDCORR'].copy(),
fits.ImageHDU(data=sdev, header=cube_hdr, name='FLUXD'),
fits.ImageHDU(data=npix, header=cube_hdr, name='NPIX'),
self.hdu['BINID'].copy(),
self.hdu['BINS'].copy()
])
# Fill the covariance matrix
if self.covariance is not None:
self.hdu += [ covar_hdu ]
return hdu
[docs] def write(self, match_datacube=False, overwrite=False):
"""
Write the hdu object to the file.
Args:
match_datacube (:obj:`bool`, optional):
Match the shape of the data arrays to the input
datacube. I.e., convert them to 3D and replicate the
binned spectrum to each spaxel in the bin.
overwrite (:obj:`bool`, optional):
Overwrite any existing file.
"""
# Convert the spectral arrays in the HDU to a 3D cube and write
# it
if match_datacube:
hdu = self.construct_3d_hdu()
DAPFitsUtil.write(hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
symlink_dir=self.symlink_dir, loggers=self.loggers, quiet=self.quiet)
return
# Just write the unique (2D) data
DAPFitsUtil.write(self.hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
symlink_dir=self.symlink_dir, loggers=self.loggers, quiet=self.quiet)
[docs] def read(self, ifile=None, strict=True, checksum=False):
"""
Read an existing file with a previously binned set of spectra.
Args:
ifile (:obj:`str`, optional):
Name of the file with the data. Default is to use the
name provided by :func:`file_path`.
strict (:obj:`bool`, optional):
Force a strict reading of the file to make sure that
it adheres to the expected format. At the moment,
this only checks to make sure the method keyword
printed in the header matches the expected value in
:attr:`method`.
checksum (:obj:`bool`, optional):
Use the checksum in the header to check for
corruption of the data.
Raises:
FileNotFoundError:
Raised if the file does not exist.
ValueError:
Raised if ``strict`` is true and the header keyword
``BINKEY`` does not match the method keyword.
"""
if ifile is None:
ifile = self.file_path()
if not ifile.exists():
raise FileNotFoundError('File does not exist!: {0}'.format(ifile))
if self.hdu is not None:
self.hdu.close()
self.hdu = DAPFitsUtil.read(str(ifile), checksum=checksum)
# Confirm that the internal method is the same as the method
# that was used in writing the file
if self.hdu['PRIMARY'].header['BINKEY'] != self.method['key']:
if strict:
raise ValueError('Keywords in header do not match specified method keywords!')
elif not self.quiet:
warnings.warn('Keywords in header do not match specified method keywords!')
# TODO: "strict" should also check other aspects of the file to
# make sure that the details of the method are also the same,
# not just the keyword
# Attempt to read and construct the covariance object
# TODO: Covariance matrices need to have the input_index
# written to the file
try:
self.covariance = Covariance.from_fits(self.hdu, ivar_ext=None, correlation=True)
except:
if not self.quiet:
warnings.warn('Unable to find/read covariance data.')
self.covariance = None
else:
var = numpy.ma.power(self.hdu['IVAR'].data[...,self.covariance.input_indx],
-1).filled(0.0).reshape(-1, self.covariance.shape[-1])
self.covariance = self.covariance.apply_new_variance(var)
# Attempt to read the effective radius
try:
self.reff = self.hdu['PRIMARY'].header['REFF']
except:
if not self.quiet:
warnings.warn('Unable to read effective radius from file header!')
self.reff = None
# Attempt to read binning parameters
if self.method['binpar'] is not None and callable(self.method['binpar'].fromheader):
self.method['binpar'].fromheader(self.hdu['PRIMARY'].header)
# Attempt to stacking parameters
if self.method['stackpar'] is not None and callable(self.method['stackpar'].fromheader):
self.method['stackpar'].fromheader(self.hdu['PRIMARY'].header)
self.nbins = self.hdu['PRIMARY'].header['NBINS']
self.missing_bins = SpatiallyBinnedSpectra._get_missing_bins(self.hdu['BINS'].data['BINID'])
[docs] def copy_to_array(self, ext='FLUX', waverange=None, include_missing=False):
r"""
Wrapper for :func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_array`
specific for :class:`SpatiallyBinnedSpectra`.
Return a copy of the selected data array. The array size is
always :math:`N_{\rm bins} \times N_{\rm wavelength}`; i.e.,
the data is always flattened to two dimensions and the unique
spectra are selected.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data.
Must be allowed for the current :attr:`mode`; see
:attr:`data_arrays`.
waverange (array-like, optional):
Two-element array with the first and last wavelength
to include in the computation. If None, use the full
wavelength range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the
missing models.
Returns:
`numpy.ndarray`_: A 2D array with a copy of the data from
the selected extension.
"""
return DAPFitsUtil.copy_to_array(self.hdu, ext=ext, allowed_ext=self.spectral_arrays,
waverange=waverange, missing_bins=self.missing_bins
if include_missing else None,
nbins=self.nbins,
unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs] def copy_to_masked_array(self, ext='FLUX', flag=None, waverange=None, include_missing=False):
"""
Wrapper for
:func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_masked_array`
specific for :class:`SpatiallyBinnedSpectra`.
Return a copy of the selected data array as a masked array.
This is functionally identical to :func:`copy_to_array`,
except the output format is a `numpy.ma.MaskedArray`_. The
pixels that are considered to be masked can be specified
using `flag`.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data.
Must be allowed for the current :attr:`mode`; see
:attr:`data_arrays`.
flag (:obj:`str`, :obj:`list`, optional):
(List of) Flag names that are considered when
deciding if a pixel should be masked. The names
*must* be a valid bit name as defined by
:attr:`bitmask`. If not provided, *ANY* non-zero mask
bit is omitted.
waverange (array-like, optional):
Two-element array with the first and last wavelength
to include in the computation. If None, use the full
wavelength range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the
missing models.
Returns:
`numpy.ma.MaskedArray`_: A 2D array with a copy of the
data from the selected extension.
"""
return DAPFitsUtil.copy_to_masked_array(self.hdu, ext=ext, mask_ext='MASK', flag=flag,
bitmask=self.bitmask, allowed_ext=self.spectral_arrays,
waverange=waverange, missing_bins=self.missing_bins
if include_missing else None,
nbins=self.nbins,
unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs] def get_bin_indices(self, bins):
"""
Return the indices of the bins in the BIN table.
Args:
bins (array-like):
The bin ID numbers to find
Returns:
`numpy.ndarray`_: Integer array with the index of each
bin ID in the BINID columns of the BINS extension.
"""
return numpy.array([numpy.where(self.hdu['BINS'].data['BINID'] == b)[0][0] for b in bins])
[docs] def find_nearest_bin(self, input_bins, weighted=False, indices=False):
"""
Use a `scipy.spatial.KDTree`_ to find the bins nearest to and
excluding a list of input bins.
Args:
input_bins (array-like):
One or more bin ID numbers to use to locate the bin
nearest to it based on its on-sky coordinates. The
list must be a unique set.
weighted (:obj:`bool`, optional):
Use the weighted coordinates (LW_SKY_COO) instead of
the unweighted coordinates (SKY_COO) when finding the
nearest bin.
indices (:obj:`bool`, optional):
Return the indices of the nearest bins instead of
their ID number (default).
Returns:
`numpy.ndarray`_: The bin IDs, one per input bin, of the
bin closest to each input bin. Any bins that are in
:attr:`missing_bins` have a return value of -1; there are
no coordinates for these bins.
Raises:
ValueError:
Raised if the set of input bins is not unique or
contains bin IDs that are not present.
"""
# Check the input bin IDs
_input_bins = numpy.atleast_1d(input_bins)
single_value = len(_input_bins) == 1 and not isinstance(input_bins, (list, numpy.ndarray))
if len(numpy.unique(_input_bins)) != len(_input_bins):
raise ValueError('Must provide a unique set of input bin IDs.')
maxbinid = numpy.amax(self.hdu['BINS'].data['BINID'])
if numpy.amin(_input_bins) < 0 or numpy.amax(_input_bins) > maxbinid:
return ValueError('Input contains invalid bin IDs.')
# Account for any missing bins
valid_input_bin = numpy.ones(_input_bins.size, dtype=bool) if len(self.missing_bins) == 0 \
else numpy.invert([ ib in self.missing_bins for ib in _input_bins ])
if not numpy.all(valid_input_bin):
warnings.warn('Input bin IDs include missing bins. Returned as -1.')
_input_bins = _input_bins[valid_input_bin]
# Select the coordinate column
col = 'LW_SKY_COO' if weighted else 'SKY_COO'
# The reference bins are all bins EXCEPT those provided
reference_bin = numpy.in1d(self.hdu['BINS'].data['BINID'], _input_bins, assume_unique=True,
invert=True)
# Get the coordinates of the reference grid
ref_coo = numpy.array([self.hdu['BINS'].data[col][reference_bin,0],
self.hdu['BINS'].data[col][reference_bin,1] ]).T
# Construct the KDTree
kd = spatial.KDTree(ref_coo)
# Get the indices of the input bins in the internal list
input_bin_indx = self.get_bin_indices(_input_bins)
# Get the coordinates of the bins to match to the nearest
# reference bin
input_coo = numpy.array([self.hdu['BINS'].data[col][input_bin_indx,0],
self.hdu['BINS'].data[col][input_bin_indx,1] ]).T
# Get the indices of the nearest bins
dist, nearest_bin = kd.query(input_coo)
# Return either the bin indices or the bin ID numbers
output_bins = numpy.zeros(_input_bins.size, dtype=int) - 1
output_bins[valid_input_bin] = numpy.arange(self.nbins)[reference_bin][nearest_bin] \
if indices else self.hdu['BINS'].data['BINID'][reference_bin][nearest_bin]
return output_bins[0] if single_value else output_bins
[docs] def replace_with_data_from_nearest_bin(self, data, bad_bins):
"""
Replace data in the list of provided bad bins with the data from
the nearest good bin.
Args:
data (array-like):
Data for each bin. The length must be the same as
:attr:`nbins`.
bad_bins (array-like):
The list of indices (must not be a boolean array)
with bad values to be replaced.
Returns:
`numpy.ndarray`_: A new array with the bad data filled
with the data from the nearest bin.
Raises:
ValueError:
Raised if the input array doesn't have the correct
shape or if the list of bad bins has numbers outside
the viable range (0,self.nbins-1).
"""
if len(data) != self.nbins:
raise ValueError('Input data must have {0} elements.'.format(self.nbins))
# No bad bins so just return the input
if len(bad_bins) == 0:
return data
# Select the bins to replace
replace = numpy.zeros(self.nbins, dtype=bool)
replace[self.get_bin_indices(bad_bins)] = True
# Get the coordinates of the reference grid
coo = numpy.array([ self.hdu['BINS'].data['SKY_COO'][:,0],
self.hdu['BINS'].data['SKY_COO'][:,1] ]).T
# Return the replaced data
return replace_with_data_from_nearest_coo(coo, data, replace)