# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
A class hierarchy that performs the stellar-continuum fitting.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
from pathlib import Path
import warnings
import logging
from IPython import embed
import numpy
from astropy.io import fits
import astropy.constants
from ..par.parset import KeywordParSet, ParSet
from ..util.log import log_output
from ..util.fitsutil import DAPFitsUtil
from ..util.fileio import create_symlink
from ..util.sampling import spectral_coordinate_step, spectrum_velocity_scale
from ..util.resolution import SpectralResolution
from ..util.dapbitmask import DAPBitMask
from .spatiallybinnedspectra import SpatiallyBinnedSpectra
from .templatelibrary import TemplateLibrary
from .ppxffit import PPXFFitPar, PPXFFit
from .util import replace_with_data_from_nearest_coo
[docs]
class StellarContinuumModelDef(KeywordParSet):
"""
A class that holds the parameters necessary to perform the
stellar-continuum fitting.
The provided ``fitfunc`` must have the form::
model_wave, model_flux, model_mask, model_par \
= fitfunc(self, binned_spectra, par=None)
where ``binned_spectra`` has type
:class:`mangadap.proc.spatiallybinnedspetra.SpatiallyBinnedSpectra`,
and the returned objects are the wavelength vector, the fitted
model flux, a bitmask for the model, and the set of model
parameters. For example, see
:func:`mangadap.proc.ppxffit.PPXFFit.fit_SpatiallyBinnedSpectra`.
The defined parameters are:
.. include:: ../tables/stellarcontinuummodeldef.rst
"""
def __init__(self, key='MILESHC', minimum_snr=1.0, fitpar=None, fitclass=None, fitfunc=None,
overwrite=False):
# Use the signature to get the parameters and the default values
sig = inspect.signature(self.__class__)
pars = list(sig.parameters.keys())
defaults = [sig.parameters[key].default for key in pars]
# Remaining definitions done by hand
in_fl = [int, float]
par_opt = [ParSet, dict]
values = [key, minimum_snr, fitpar, fitclass, fitfunc, overwrite ]
dtypes = [str, in_fl, par_opt, None, None, bool]
can_call = [False, False, False, False, True, False]
descr = ['Keyword used to distinguish between different spatial binning schemes.',
'Minimum S/N of spectrum to fit',
'Any additional parameters, aside from the spectra themselves, required by ' \
'the fitting function. If None, uses a default pPXF fit.',
'Instance of class object to use for the model fitting. Needed in case ' \
'fitfunc is a non-static member function of a class. If None, uses a ' \
'default pPXF fit.',
'The function that models the spectra. If None, uses a default pPXF fit and ' \
'anything provided for fitpar and fitclass are ignored!',
'If the output file already exists, redo all the calculations and overwrite it.']
super().__init__(pars, values=values, defaults=defaults, dtypes=dtypes, can_call=can_call,
descr=descr)
self._validate()
[docs]
def _validate(self):
"""
Validate the modeling method.
"""
if self['fitfunc'] is None:
warnings.warn('No fitting function defined. Defaulting to use mangadap.proc.ppxffit.')
self['fitpar'] = PPXFFitPar()
self['fitclass'] = PPXFFit(StellarContinuumModelBitMask())
self['fitfunc'] = self['fitclass'].fit_SpatiallyBinnedSpectra
[docs]
@classmethod
def from_dict(cls, d):
"""
Instantiate the object from a nested dictionary.
The dictionary is expected to have keys for all the class keywords; see
the class instantiation method. It should also include the
``'fit_method'`` keyword used to select the type of fit to perform, and
a nested dictionary selected by the keyword ``'fit'`` with the
parameters required by the fitting method. If these are not provided,
the default fitter, :class:`~mangadap.proc.ppxffit.PPXFFit` and
parameters will be used.
.. warning::
Currently, the only accepted value for ``'fit_method'`` is
``'ppxf'``, and the ``'fit'`` dictionary must provide the parameters
defined by :class:`~mangadap.proc.ppxffit.PPXFFitPar`.
Args:
d (:obj:`dict`):
Dictionary used to instantiate the class.
"""
# Copy over the primary keywords
_d = {}
for key in ['key', 'minimum_snr', 'overwrite']:
if key in d.keys():
_d[key] = d[key]
# Set the spatial binning method
if 'fit_method' not in d or d['fit_method'] in ['none', None]:
# Use the default fitter and default fitting parameters!
_d['fitpar'] = None
_d['fitclass'] = None
_d['fitfunc'] = None
elif d['fit_method'] == 'ppxf':
_d['fitpar'] = PPXFFitPar.from_dict(d['fit']) if 'fit' in d else PPXFFitPar()
_d['fitclass'] = PPXFFit(StellarContinuumModelBitMask())
_d['fitfunc'] = _d['fitclass'].fit_SpatiallyBinnedSpectra
else:
raise ValueError('Unrecognized fitting method. Requires direct instantiation or '
'code alterations.')
# Return the instantiation
return super().from_dict(_d)
[docs]
class StellarContinuumModelBitMask(DAPBitMask):
r"""
Derived class that specifies the mask bits for the stellar-continuum
modeling. The maskbits defined are:
.. include:: ../tables/stellarcontinuummodelbitmask.rst
"""
cfg_root = 'stellar_continuum_model_bits'
[docs]
class StellarContinuumModel:
r"""
Class that holds the stellar-continuum model results.
.. todo::
- Update attributes
Args:
method (:class:`~mangadap.proc.stellarcontinuummodel.StellarContinuumModelDef`):
Object that defines the main parameters used for the stellar
continuum fitting.
binned_spectra (:class:`mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra`):
The binned spectra to fit. Must be able to access parent
:class:`mangadap.datacube.datacube.DataCube` attribute
(``cube``).
guess_vel (:obj:`float`, `numpy.ndarray`_):
A single or spectrum-dependent initial estimate of the
redshift in km/s (:math:`cz`).
guess_sig (:obj:`float`, `numpy.ndarray`_, optional):
A single or spectrum-dependent initial estimate of the
velocity dispersion in km/s. If None, default set to 100
km/s.
method_list (:obj:`list`, optional):
List of :class:`StellarContinuumModelDef` objects that
define one or more methods to use for the stellar
continuum fitting. The default list is provided by the
config files in the DAP source directory and compiled
into this list using
:func:`available_stellar_continuum_modeling_methods`.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the output
file will be :attr:`directory_path`/:attr:`output_file`. If None,
the default is to combine ``cube.output_root`` and the method keys.
The order of the keys is the order of operations (rdxqa, binning,
stellar continuum). See
:func:`~mangadap.proc.stellarcontinuummodel.StellarContinuumModel.default_paths`.
hardcopy (:obj:`bool`, optional):
Flag to write the HDUList attribute to disk. If False,
the core `astropy.io.fits.HDUList`_ attribute
(:attr:`hdu`) is only kept in memory and would have to be
reconstructed.
symlink_dir (:obj:`str`, optional):
Create a symbolic link to the created file in the supplied
directory. If None, no symbolic link is created.
tpl_hardcopy (:obj:`bool`, optional):
Save the processed template library used during the fit. If True,
the output path and symlink directory for the template library are
identical to the main reference file.
overwrite (:obj:`bool`, optional):
Overwrite any existing files. Default is to use any
existing file instead of redoing the analysis and
overwriting the existing output.
checksum (:obj:`bool`, optional):
Use the checksum in the fits header to confirm that the
data has not been corrupted. The checksum is **always**
written to the fits header when the file is created.
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`. Default is no
logging.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output.
Attributes:
loggers (:obj:`list`):
List of `logging.Logger`_ objects to log progress.
quiet (:obj:`bool`):
Suppress all terminal and logging output.
"""
def __init__(self, method, binned_spectra, guess_vel, guess_sig=None, method_list=None,
output_path=None, output_file=None, hardcopy=True, symlink_dir=None,
tpl_hardcopy=False, overwrite=False, checksum=False, loggers=None, quiet=False):
self.loggers = None
self.quiet = False
# Define the method properties
self.method = method
if not isinstance(self.method, StellarContinuumModelDef):
raise TypeError('Method must have type StellarContinuumModelDef.')
self.binned_spectra = None
self.guess_vel = None
self.guess_sig = None
# Define the output directory and file
self.directory_path = None # Set in default_paths
self.output_file = None
self.hardcopy = None
self.tpl_hardcopy = None
self.symlink_dir = None
# Define the bitmask
self.bitmask = StellarContinuumModelBitMask()
# Initialize the main class attributes
self.hdu = None
self.checksum = checksum
self.shape = None
self.spatial_shape = None
self.nspec = None
self.spatial_index = None
self.spectral_arrays = None
self._assign_spectral_arrays()
self.image_arrays = None
self._assign_image_arrays()
self.nwave = None
self.nmodels = None
self.missing_models = None
# TODO: Include covariance between measured properties
# Run the assessments of the DRP file
self.fit(binned_spectra, guess_vel, guess_sig=guess_sig, output_path=output_path,
output_file=output_file, hardcopy=hardcopy, symlink_dir=symlink_dir,
tpl_hardcopy=tpl_hardcopy, overwrite=overwrite, loggers=loggers, quiet=quiet)
def __getitem__(self, key):
return self.hdu[key]
[docs]
def get_template_library(self, output_path=None, output_file=None, hardcopy=None,
symlink_dir=None, velocity_offset=None, match_resolution=False,
resolution_fwhm=None):
"""
Construct the template library for the continuum fitting.
If the FWHM (in angstroms; see ``resolution_fwhm``) is not
provided, this method is a simple wrapper that returns a
:class:`mangadap.proc.templatelibrary.TemplateLibrary`.
Otherwise, the provided FWHM is used to construct the
spectral resolution for the template library.
In either case, the sampling is set to match the spectra in
:attr:`binned_spectra` (up to the velocity-scale ratio set by
:attr:`method`).
Args:
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, set to
:attr:`directory_path`. An output file is only produced if
``hardcopy`` is True.
output_file (:obj:`str`, optional):
The name of the output template library file. If None and
``resolution_fwhm`` is None, uses the default template library
file; see
:func:`~mangadap.proc.templatelibrary.TemplateLibrary.default_paths`.
If None and ``resolution_fwhm`` is provided, the default
template file is also used, but the library key is changed to
indicate the new FHWM of the output resolution. An output file
is only produced if ``hardcopy`` is True.
hardcopy (:obj:`bool`, optional):
Flag to keep a hardcopy of the processed template library. If
None, the flag is set by :attr:`tpl_hardcopy`. If no hardcopy
is to be produced, ``output_path`` and ``output_file`` are
ignored.
symlink_dir (:obj:`str`, optional):
Create a symbolic link to the created template library file in
this directory. If None, :attr:`symlink_dir` is used. A symlink
is produced only if ``hardcopy`` is True.
velocity_offset (:obj:`float`, optional):
Velocity offset to use when matching the spectral
resolution between the template library and the galaxy
spectra.
match_resolution (:obj:`bool`, optional):
Match the spectral resolution of the template library to
the resolution provided by the ``cube``; the latter must
be provided for this argument to have any use.
resolution_fwhm (:obj:`float`, optional):
The target resolution for the template library set by
a constant FWHM of the resolution element in
angstroms over all wavelengths. If None, the
resolution is either matched to the binned spectra
(see ``match_resolution``) or kept at the native
resolution of the library.
Returns:
:class:`mangadap.proc.templatelibrary.TemplateLibrary`:
The template library prepared for fitting the binned
spectra.
"""
# Allow output paths and files to be over-ridden
_output_path = self.directory_path if output_path is None else output_path
_hardcopy = self.tpl_hardcopy if hardcopy is None else hardcopy
_symlink_dir = self.symlink_dir if symlink_dir is None else symlink_dir
if resolution_fwhm is None:
return TemplateLibrary(self.method['fitpar']['template_library_key'],
velocity_offset=velocity_offset, cube=self.binned_spectra.cube,
match_resolution=match_resolution,
velscale_ratio=self.method['fitpar']['velscale_ratio'],
output_path=_output_path, output_file=output_file,
hardcopy=_hardcopy, symlink_dir=_symlink_dir,
loggers=self.loggers, quiet=self.quiet)
if output_file is None:
_key = f'{self.method["fitpar"]["template_library_key"]}-FWHM{resolution_fwhm:.2f}'
output_file = TemplateLibrary.default_paths(_key, cube=self.binned_spectra.cube,
output_path=_output_path)[1]
# Set the spectral resolution
wave = self.binned_spectra['WAVE'].data
sres = SpectralResolution(wave, wave/resolution_fwhm, log10=True)
velscale_ratio = 1 if self.method['fitpar']['velscale_ratio'] is None \
else self.method['fitpar']['velscale_ratio']
spectral_step = spectral_coordinate_step(wave, log=True) / velscale_ratio
return TemplateLibrary(self.method['fitpar']['template_library_key'], sres=sres,
velocity_offset=velocity_offset, spectral_step=spectral_step,
log=True, output_path=_output_path, output_file=output_file,
hardcopy=_hardcopy, symlink_dir=_symlink_dir,
loggers=self.loggers, quiet=self.quiet)
[docs]
def _finalize_cube_mask(self, mask):
"""
Finalize the mask after the 2D mask has been reconstructed
into a 3D cube.
This mostly handles the masks for regions outside the
datacube field of view.
Note that the input mask is both edited in-place and
returned.
.. todo::
- This needs to be abstracted for non-DRP datacubes.
- Describe MAPMASK usage
Args:
mask (`numpy.ndarray`_):
3D array with the current bitmask data.
Returns:
`numpy.ndarray`_: Edited bitmask data.
"""
# TODO: I'm not sure this is correct for non-MaNGA data
# Turn on the flag stating that the pixel wasn't used
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra.cube.mask,
flag=self.binned_spectra.do_not_fit_flags())
mask[indx] = self.bitmask.turn_on(mask[indx], 'DIDNOTUSE')
# Turn on the flag stating that the pixel has a foreground star
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra.cube.mask, flag='FORESTAR')
mask[indx] = self.bitmask.turn_on(mask[indx], 'FORESTAR')
# Propagate the MAPMASK to the full cube
for f in ['DIDNOTUSE', 'FORESTAR', 'LOW_SNR']:
indx = self.bitmask.flagged(self['MAPMASK'].data, f)
mask[indx,:] = self.bitmask.turn_on(mask[indx,:], f)
return mask
[docs]
def _assign_spectral_arrays(self):
"""
Set :attr:`spectral_arrays`, which contains the list of
extensions in :attr:`hdu` that contain spectral data.
"""
self.spectral_arrays = ['FLUX', 'MASK']
[docs]
def _assign_image_arrays(self):
"""
Set :attr:`image_arrays`, which contains the list of extensions
in :attr:`hdu` that are on-sky image data.
"""
self.image_arrays = ['BINID']
[docs]
def _get_missing_models(self, debug=False):
"""
Find the spectra with missing models, either because there is
no model or the bin is also missing.
Args:
debug (:obj:`bool`, optional):
When checking the S/N limits, run
:func:`mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra.above_snr_limit`
in debug mode.
Returns:
`numpy.ndarray`_: Sorted list of missing models.
"""
good_snr = self.binned_spectra.above_snr_limit(self.method['minimum_snr'], debug=debug)
return numpy.sort(self.binned_spectra['BINS'].data['BINID'][numpy.invert(good_snr)].tolist()
+ self.binned_spectra.missing_bins)
[docs]
def _construct_2d_hdu(self, good_snr, model_flux, model_mask, model_par):
r"""
Construct :attr:`hdu` that is held in memory for manipulation
of the object. See :func:`construct_3d_hdu` to convert the
object into a datacube.
Args:
good_snr (`numpy.ndarray`_):
Boolean array selecting the bins with good S/N.
model_flux (`numpy.ndarray`_):
Best-fitting model spectra for each bin.
model_mask (`numpy.ndarray`_):
Bitmask flags for each model pixel.
model_par (:class`~mangadap.proc.spectralfitting.StellarKinematicsFitDataTable`):
Data table with the best-fitting model parameters.
"""
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Constructing hdu ...')
# Initialize the headers
pri_hdr = self._initialize_primary_header()
pri_hdr = self._add_method_header(pri_hdr)
map_hdr = DAPFitsUtil.build_map_header(self.binned_spectra.cube.fluxhdr,
'K Westfall <westfall@ucolick.org>')
# Get the spatial map mask
map_mask = numpy.zeros(self.spatial_shape, dtype=self.bitmask.minimum_dtype())
# Add any spaxel not used because it was flagged by the binning
# step
indx = self.binned_spectra['MAPMASK'].data > 0
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'DIDNOTUSE')
# Isolate any spaxels with foreground stars
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra['MAPMASK'].data, 'FORESTAR')
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'FORESTAR')
# Get the bins that were blow the S/N limit
indx = numpy.invert(DAPFitsUtil.reconstruct_map(self.spatial_shape,
self.binned_spectra['BINID'].data.ravel(),
good_snr, dtype='bool')) & (map_mask == 0)
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'LOW_SNR')
# Get the bin ids with fitted models
bin_indx = DAPFitsUtil.downselect_bins(self.binned_spectra['BINID'].data.ravel(),
model_par['BINID']).reshape(self.spatial_shape)
# Save the data to the hdu attribute
self.hdu = fits.HDUList([fits.PrimaryHDU(header=pri_hdr),
fits.ImageHDU(data=model_flux.data, name='FLUX'),
fits.ImageHDU(data=model_mask, name='MASK'),
self.binned_spectra['WAVE'].copy(),
fits.ImageHDU(data=bin_indx, header=map_hdr, name='BINID'),
fits.ImageHDU(data=map_mask, header=map_hdr, name='MAPMASK'),
model_par.to_hdu(name='PAR')])
[docs]
@staticmethod
def default_paths(cube, method_key, rdxqa_method, binning_method, output_path=None,
output_file=None):
"""
Set the default directory and file name for the output file.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube to analyze.
method_key (:obj:`str`):
Keyword designating the method used for the reduction
assessments.
rdxqa_method (:obj:`str`):
The method key for the basic assessments of the datacube.
binning_method (:obj:`str`):
The method key for the spatial binning.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys. The order of the keys is the order of operations
(rdxqa, binning, stellar continuum).
Returns:
:obj:`tuple`: Returns a `Path`_ with the output directory and a
:obj:`str` with the output file name.
"""
directory_path = Path('.').resolve() if output_path is None \
else Path(output_path).resolve()
method = f'{rdxqa_method}-{binning_method}-{method_key}'
_output_file = f'{cube.output_root}-{method}.fits.gz' if output_file is None \
else output_file
return directory_path, _output_file
[docs]
def file_name(self):
"""Return the name of the output file."""
return self.output_file
[docs]
def file_path(self):
"""Return the full path to the output file."""
if self.directory_path is None or self.output_file is None:
return None
return self.directory_path / self.output_file
[docs]
def info(self):
return self.hdu.info()
[docs]
def all_spectrum_flags(self):
return ['DIDNOTUSE', 'FORESTAR', 'LOW_SNR', 'ARTIFACT', 'OUTSIDE_RANGE', 'EML_REGION',
'TPL_PIXELS', 'TRUNCATED', 'PPXF_REJECT', 'INVALID_ERROR', 'FIT_FAILED',
'NEAR_BOUND']
[docs]
def all_except_emission_flags(self):
return ['DIDNOTUSE', 'FORESTAR', 'LOW_SNR', 'ARTIFACT', 'OUTSIDE_RANGE', 'TPL_PIXELS',
'TRUNCATED', 'PPXF_REJECT', 'INVALID_ERROR', 'FIT_FAILED', 'NEAR_BOUND']
[docs]
def fit(self, binned_spectra, guess_vel, guess_sig=None, output_path=None, output_file=None,
hardcopy=True, symlink_dir=None, tpl_hardcopy=None, overwrite=None, loggers=None,
quiet=False):
"""
Fit the binned spectra using the specified method.
Args:
binned_spectra (:class:`mangadap.proc.spatiallybinnedspectra.SpatiallyBinnedSpectra`):
The binned spectra to fit. Must be able to access
parent :class:`mangadap.datacube.datacube.DataCube`
attribute (``cube``).
guess_vel (:obj:`float`, `numpy.ndarray`):
A single or spectrum-dependent initial estimate of the
redshift in km/s (:math:`cz`).
guess_sig (:obj:`float`, `numpy.ndarray`, optional):
A single or spectrum-dependent initial estimate of the
velocity dispersion in km/s.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the output
file will be :attr:`directory_path`/:attr:`output_file`. If None,
the default is to combine ``cube.output_root`` and the method keys.
The order of the keys is the order of operations (rdxqa, binning,
stellar continuum). See
:func:`~mangadap.proc.stellarcontinuummodel.StellarContinuumModel.default_paths`.
hardcopy (:obj:`bool`, optional):
Flag to write the HDUList attribute to disk. Default is
True; if False, the HDUList is only kept in memory and
would have to be reconstructed.
symlink_dir (:obj:`str`, optional):
Create a symbolic link to the created file in the
supplied directory. Default is to produce no symbolic
link.
tpl_hardcopy (:obj:`bool`, optional):
Save the processed template library used during the fit. If
True, the output path and symlink directory for the template
library are identical to the main reference file.
overwrite (:obj:`bool`, optional):
Overwrite any existing files. Default is to use any
existing file instead of redoing the analysis and
overwriting the existing output.
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`. Default is no
logging.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output.
"""
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
# SpatiallyBinnedSpectra object always needed
if binned_spectra is None:
raise ValueError('Must provide spectra object for fitting.')
if not isinstance(binned_spectra, SpatiallyBinnedSpectra):
raise TypeError('Must provide a valid SpatiallyBinnedSpectra object!')
if binned_spectra.hdu is None:
raise ValueError('Provided SpatiallyBinnedSpectra object is undefined!')
self.binned_spectra = binned_spectra
self.shape = self.binned_spectra.shape
self.spatial_shape =self.binned_spectra.spatial_shape
self.nspec = self.binned_spectra.nspec
self.spatial_index = self.binned_spectra.spatial_index.copy()
self.nwave = self.binned_spectra.nwave
# Get the guess kinematics
if guess_vel is not None:
self.guess_vel=guess_vel
if self.guess_vel is None:
raise ValueError('Must provide initial guess velocity.')
self.guess_sig = 100.0 if guess_sig is None else guess_sig
#---------------------------------------------------------------
# Get the good spectra
# debug = True
debug = False
good_snr = self.binned_spectra.above_snr_limit(self.method['minimum_snr'], debug=debug)
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(loggers, 1, logging.INFO, f'{"STELLAR CONTINUUM FITTING":^50}')
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(self.loggers, 1, logging.INFO,
f'Number of binned spectra: {self.binned_spectra.nbins}')
if len(self.binned_spectra.missing_bins) > 0:
log_output(self.loggers, 1, logging.INFO,
f'Missing bins: {len(self.binned_spectra.missing_bins)}')
log_output(self.loggers, 1, logging.INFO,
f'With good S/N and to fit: {numpy.sum(good_snr)}')
# TODO: Is there a better way to handle this?
if numpy.sum(good_snr) == 0:
raise ValueError('No good spectra to fit!')
#---------------------------------------------------------------
# Fill in any remaining binning parameters
self.tpl_hardcopy = tpl_hardcopy
# self._fill_method_par()
if self.method['fitpar'] is not None and hasattr(self.method['fitpar'], 'fill'):
self.method['fitpar'].fill(self.binned_spectra, guess_vel=self.guess_vel,
guess_sig=self.guess_sig, output_path=self.directory_path,
hardcopy=self.tpl_hardcopy, symlink_dir=self.symlink_dir,
loggers=self.loggers, quiet=self.quiet)
_overwrite = self.method['overwrite'] if overwrite is None else overwrite
# (Re)Set the output paths
self.directory_path, self.output_file \
= StellarContinuumModel.default_paths(self.binned_spectra.cube,
self.method['key'],
self.binned_spectra.rdxqa.method['key'],
self.binned_spectra.method['key'],
output_path=output_path,
output_file=output_file)
#---------------------------------------------------------------
# Check that the file path is defined
ofile = self.file_path()
if ofile is None:
raise ValueError('File path for output file is undefined!')
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, f'Output path: {self.directory_path}')
log_output(self.loggers, 1, logging.INFO, f'Output file: {self.output_file}')
#---------------------------------------------------------------
# If the file already exists, and not ovewriting, just read the
# file
self.symlink_dir = symlink_dir
if ofile.exists() and not overwrite:
self.hardcopy = True
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Using existing file')
self.read(checksum=self.checksum, debug=debug)
# Make sure the symlink exists
if self.symlink_dir is not None:
create_symlink(str(ofile), self.symlink_dir, overwrite=overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
return
#---------------------------------------------------------------
# Fit the spectra
# Mask should be fully defined within the fitting function
model_wave, model_flux, model_mask, model_par \
= self.method['fitfunc'](self.binned_spectra, gpm=good_snr,
par=self.method['fitpar'], loggers=self.loggers,
quiet=self.quiet, debug=debug)
# The number of models returned should match the number of
# "good" spectra
# TODO: Include failed fits in "missing" models?
# DEBUG
if model_flux.shape[0] != numpy.sum(good_snr):
raise ValueError('Unexpected returned shape of fitted continuum models.')
# Set the number of models and the missing models
self.nmodels = model_flux.shape[0]
self.missing_models = self._get_missing_models(debug=debug)
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, f'Fitted models: {self.nmodels}')
if len(self.missing_models) > 0:
log_output(self.loggers, 1, logging.INFO,
f'Missing models: {len(self.missing_models)}')
# Construct the 2d hdu list that only contains the fitted models
self.hardcopy = hardcopy
self._construct_2d_hdu(good_snr, model_flux, model_mask, model_par)
#---------------------------------------------------------------
# Write the data, if requested
if self.hardcopy:
if not self.directory_path.exists():
self.director_path.mkdir(parents=True)
self.write(overwrite=overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
[docs]
def construct_3d_hdu(self):
"""
Reformat the model spectra into a cube matching the shape of
the DRP fits file.
"""
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Constructing stellar continuum datacube ...')
bin_indx = self.hdu['BINID'].data.copy()
model_flux = self.hdu['FLUX'].data.copy()
model_mask = self.hdu['MASK'].data.copy()
flux, mask = DAPFitsUtil.reconstruct_cube(self.shape, bin_indx.ravel(),
[model_flux, model_mask])
mask = self._finalize_cube_mask(mask)
# Primary header is identical regardless of the shape of the
# extensions
hdr = self.hdu['PRIMARY'].header.copy()
cube_hdr = DAPFitsUtil.build_cube_header(self.binned_spectra.cube,
'K Westfall <westfall@ucolick.org>')
# Return the converted hdu without altering the internal hdu
return fits.HDUList([ fits.PrimaryHDU(header=hdr),
fits.ImageHDU(data=flux, header=cube_hdr, name='FLUX'),
fits.ImageHDU(data=mask, header=cube_hdr, name='MASK'),
self.hdu['WAVE'].copy(),
self.hdu['BINID'].copy(),
self.hdu['PAR'].copy()
])
# Exact same function as used by SpatiallyBinnedSpectra
[docs]
def write(self, match_datacube=False, overwrite=False):
"""
Write the hdu object to the file.
Args:
match_datacube (:obj:`bool`, optional):
Match the shape of the data arrays to the input
datacube. I.e., convert them to 3D and replicate the
binned spectrum to each spaxel in the bin.
overwrite (:obj:`bool`, optional):
Overwrite any existing file.
"""
# Convert the spectral arrays in the HDU to a 3D cube and write
# it
if match_datacube:
hdu = self.construct_3d_hdu()
DAPFitsUtil.write(hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
symlink_dir=self.symlink_dir, loggers=self.loggers, quiet=self.quiet)
return
# Just write the unique (2D) data
DAPFitsUtil.write(self.hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
symlink_dir=self.symlink_dir, loggers=self.loggers, quiet=self.quiet)
[docs]
def read(self, ifile=None, strict=True, checksum=False, debug=False):
"""
Read an existing file with a previously fit set of continuum
models.
Args:
ifile (:obj:`str`, optional):
Name of the file with the data. Default is to use the
name provided by :func:`file_path`.
strict (:obj:`bool`, optional):
Force a strict reading of the file to make sure that
it adheres to the expected format. At the moment,
this only checks to make sure the method keyword
printed in the header matches the expected value in
:attr:`method`.
checksum (:obj:`bool`, optional):
Use the checksum in the header to check for
corruption of the data.
debug (:obj:`bool`, optional):
Run check for missing models in debugging mode.
Raises:
FileNotFoundError:
Raised if the file does not exist.
ValueError:
Raised if ``strict`` is true and the header keyword
``SCKEY`` does not match the method keyword.
"""
if ifile is None:
ifile = self.file_path()
if not ifile.exists():
raise FileNotFoundError('File does not exist!: {0}'.format(ifile))
if self.hdu is not None:
self.hdu.close()
# self.hdu = fits.open(ifile, checksum=checksum)
self.hdu = DAPFitsUtil.read(str(ifile), checksum=checksum)
# Confirm that the internal method is the same as the method
# that was used in writing the file
if self.hdu['PRIMARY'].header['SCKEY'] != self.method['key']:
if strict:
raise ValueError('Keywords in header do not match specified method keywords!')
elif not self.quiet:
warnings.warn('Keywords in header do not match specified method keywords!')
# TODO: "strict" should also check other aspects of the file to
# make sure that the details of the method are also the same,
# not just the keyword
# Attempt to read the input guess velocity and sigma
try:
self.guess_vel = self.hdu['PRIMARY'].header['SCINPVEL']
self.guess_sig = self.hdu['PRIMARY'].header['SCINPSIG']
except:
if not self.quiet:
warnings.warn('Unable to read input guess kinematics from file header!')
self.guess_vel = None
self.guess_sig = None
# Attempt to read the modeling parameters
if self.method['fitpar'] is not None and callable(self.method['fitpar'].fromheader):
self.method['fitpar'].fromheader(self.hdu['PRIMARY'].header)
self.nmodels = self.hdu['FLUX'].shape[0]
self.missing_models = self._get_missing_models(debug=debug)
[docs]
def copy_to_array(self, ext='FLUX', waverange=None, include_missing=False):
r"""
Wrapper for :func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_array`
specific for :class:`StellarContinuumModel`.
Return a copy of the selected data array. The array size is
always :math:`N_{\rm models} \times N_{\rm wavelength}`; i.e.,
the data is always flattened to two dimensions and the unique
spectra are selected.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data.
Must be allowed for the current :attr:`mode`; see
:attr:`data_arrays`.
waverange (array-like, optional):
Two-element array with the first and last wavelength
to include in the computation. If None, use the full
wavelength range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the
missing models.
Returns:
`numpy.ndarray`_: A 2D array with a copy of the data from
the selected extension.
"""
return DAPFitsUtil.copy_to_array(self.hdu, ext=ext, allowed_ext=self.spectral_arrays,
waverange=waverange,
missing_bins=self.missing_models if include_missing else None,
nbins=self.nbins,
unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs]
def copy_to_masked_array(self, ext='FLUX', flag=None, waverange=None, include_missing=False):
"""
Wrapper for
:func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_masked_array`
specific for :class:`StellarContinuumModel`.
Return a copy of the selected data array as a masked array.
This is functionally identical to :func:`copy_to_array`,
except the output format is a `numpy.ma.MaskedArray`_. The
pixels that are considered to be masked can be specified
using `flag`.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data.
Must be allowed for the current :attr:`mode`; see
:attr:`data_arrays`.
flag (:obj:`str`, :obj:`list`, optional):
(List of) Flag names that are considered when
deciding if a pixel should be masked. The names
*must* be a valid bit name as defined by
:attr:`bitmask`. If not provided, *ANY* non-zero mask
bit is omitted.
waverange (array-like, optional):
Two-element array with the first and last wavelength
to include in the computation. If None, use the full
wavelength range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the
missing models.
Returns:
`numpy.ma.MaskedArray`_: A 2D array with a copy of the
data from the selected extension.
"""
return DAPFitsUtil.copy_to_masked_array(self.hdu, ext=ext, mask_ext='MASK', flag=flag,
bitmask=self.bitmask,
allowed_ext=self.spectral_arrays,
waverange=waverange,
missing_bins=self.missing_models if include_missing else None,
nbins=self.nmodels,
unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs]
def construct_models(self, replacement_templates=None, redshift_only=False,
deredshift=False, corrected_dispersion=False):
"""
Reconstruct the best fitting models.
This is largely a wrapper for a :func:`construct_models`
method provided by the :attr:`method` fitting class. As such,
this method will raise an exception if the fitting class has
no such method.
Args:
replacement_templates (:class:`~mangadap.proc.templatelibary.TemplateLibrary`, optional):
Instead of using the template library used to fit the
data, use this library instead. The number of spectra
in the replacement library **must** match the
original number of templates. This is only really
useful when replacing the original set of templates
with ones at a different spectral resolution.
redshift_only (:obj:`bool`, optional):
When constructing the models, only redshift the
spectra, effectively providing the best-fitting model
with a velocity dispersion of 0 km/s.
deredshift (:obj:`bool`, optional):
Return the best-fitting models in the rest frame.
corrected_dispersion (:obj:`bool`, optional):
Return the best-fitting models with a LOSVD that has
been corrected for any resolution difference between
the fitted templates and the binned spectra.
Returns:
`numpy.ndarray`_: The best-fitting models.
Raises:
ValueError:
Raised if there is no defined fitting class.
AttributeError:
Raised if the fitting class has no method called
:func:`construct_models`.
TypeError:
Raised if the provided ``replacement_templates`` are
not an instance of
:class:`mangadap.proc.templatelibary.TemplateLibrary`.
"""
if self.method['fitclass'] is None:
raise ValueError('No class object available for constructing the model!')
if not callable(self.method['fitclass'].construct_models):
raise AttributeError('Provided fit class object has no callable ' \
'\'construct_models\' attribute!')
if replacement_templates is not None \
and not isinstance(replacement_templates, TemplateLibrary):
raise TypeError('Provided template replacements must have type TemplateLibrary.')
# Only the selected models are constructed, others are masked
select = numpy.invert(self.bitmask.flagged(self.hdu['PAR'].data['MASK'],
flag=['NO_FIT', 'FIT_FAILED', 'INSUFFICIENT_DATA']))
templates = self.method['fitpar']['template_library'] if replacement_templates is None \
else replacement_templates
# The only reason it needs the flux data is to get the shape, so
# it doesn't matter that I'm passing the model flux
return self.method['fitclass'].construct_models(templates['WAVE'].data,
templates['FLUX'].data,
self.hdu['WAVE'].data,
self.hdu['FLUX'].data.shape,
self.hdu['PAR'].data, select=select,
redshift_only=redshift_only,
deredshift=deredshift,
corrected_dispersion=corrected_dispersion,
dvtol=1e-9)
# TODO: Get rid of dispaxis?
# TODO: Make this a general routine.
[docs]
@staticmethod
def reset_continuum_mask_window(continuum, dispaxis=1, quiet=False):
"""
Reset the mask of the stellar continuum to a continuous window
from the minimum to maximum valid wavelength.
.. todo::
- Allow continuum to be n-dimensional and use
`numpy.apply_along_axis`.
Args:
continuum (`numpy.ma.MaskedArray`_):
Stellar continuum models.
dispaxis (:obj:`int`, optional):
Wavelength axis in provided ``continuum`` object.
quiet (:obj:`bool`, optional):
Suppress terminal output.
Returns:
`numpy.ma.MaskedArray`_: Continuum object but unmasked
over the full continuous window between the minimum and
maximum valid wavelength.
"""
spatial_shape = DAPFitsUtil.get_spatial_shape(continuum.shape, dispaxis)
if len(spatial_shape) != 1:
raise ValueError('Input array should be two-dimensional!')
# No pixels are masked!
if numpy.sum(numpy.ma.getmaskarray(continuum)) == 0:
return continuum
_continuum = continuum.copy() if dispaxis == 1 else continuum.copy().T
nspec,npix = _continuum.shape
pix = numpy.ma.MaskedArray(numpy.array([ numpy.arange(npix) ]*nspec),
mask=numpy.ma.getmaskarray(_continuum).copy())
min_good_pix = numpy.ma.amin(pix, axis=dispaxis)
max_good_pix = numpy.ma.amax(pix, axis=dispaxis)
for c,s,e in zip(_continuum, min_good_pix,max_good_pix+1):
if isinstance(s, numpy.ma.core.MaskedConstant) \
or isinstance(e, numpy.ma.core.MaskedConstant):
continue
numpy.ma.getmaskarray(c)[s:e] = False
return _continuum if dispaxis == 1 else _continuum.T
[docs]
def unmasked_continuum_model(self, replacement_templates=None, redshift_only=False,
deredshift=False, corrected_dispersion=False):
"""
Return the stellar continuum unmasked over the full continuous
fitting region.
Models are reconstructed based on the model parameters if new
template fluxes are provided or if the returned models should
not include the LOSVD convolution (redshift_only=True). As
such, this is largely a wrapper for :func:`construct_models`
and :func:`reset_continuum_mask_window`.
Args:
replacement_templates (:class:`mangadap.proc.templatelibary.TemplateLibrary`, optional):
Instead of using the template library used to fit the
data, use this library instead. The number of spectra
in the replacement library **must** match the
original number of templates. This is only really
useful when replacing the original set of templates
with ones at a different spectral resolution.
redshift_only (:obj:`bool`, optional):
When constructing the models, only redshift the
spectra, effectively providing the best-fitting model
with a velocity dispersion of 0 km/s.
deredshift (:obj:`bool`, optional):
Return the best-fitting models in the rest frame.
corrected_dispersion (:obj:`bool`, optional):
Return the best-fitting models with a LOSVD that has
been corrected for any resolution difference between
the fitted templates and the binned spectra.
Returns:
`numpy.ndarray`_: The best-fitting models.
"""
# Get the models for the binned spectra
reconstruct = replacement_templates is not None or redshift_only or corrected_dispersion
continuum = self.construct_models(replacement_templates=replacement_templates,
redshift_only=redshift_only, deredshift=deredshift,
corrected_dispersion=corrected_dispersion) \
if reconstruct \
else self.copy_to_masked_array(flag=self.all_spectrum_flags())
return StellarContinuumModel.reset_continuum_mask_window(continuum, quiet=self.quiet)
[docs]
def fill_to_match(self, binid, replacement_templates=None, redshift_only=False,
deredshift=False, corrected_dispersion=False, missing=None):
"""
Get the stellar-continuum model from this objects that matches
the input bin ID matrix.
The output is a 2D matrix ordered by the bin ID; any skipped
index numbers in the maximum of the union of the unique
numbers in the ``binid`` and ``missing`` input are masked.
All this does is find the stellar continuum from spaxels in
this model and match them to the input spaxels. Depending on
how differently the data sets have been binned, this says
nothing about whether or not the returned models are a good
fit to the data!
Args:
binid (`numpy.ndarray`_):
A map of the bin IDs with shape that matches the
spatial shape of the parent datacube; see
:attr:`spatial_shape`. This provides the mapping
between the current stellar continuum models and the
output data array.
replacement_templates (:class:`mangadap.proc.templatelibary.TemplateLibrary`, optional):
Instead of using the template library used to fit the
data, use this library instead. The number of spectra
in the replacement library **must** match the
original number of templates. This is only really
useful when replacing the original set of templates
with ones at a different spectral resolution.
redshift_only (:obj:`bool`, optional):
When constructing the models, only redshift the
spectra, effectively providing the best-fitting model
with a velocity dispersion of 0 km/s.
deredshift (:obj:`bool`, optional):
Return the best-fitting models in the rest frame.
corrected_dispersion (:obj:`bool`, optional):
Return the best-fitting models with a LOSVD that has
been corrected for any resolution difference between
the fitted templates and the binned spectra.
missing (array-like, optional):
Include empty spectra for binids with these numbers.
Ignored if None or an empty array.
Returns:
`numpy.ndarray`_: The best-fitting models matched to the
input bin ID array.
Raises:
ValueError:
Raised if the shape of ``binid`` does not match
:attr:`spatial_shape`.
"""
# The input bin id array must have the same shape as self
if binid.shape != self.spatial_shape:
raise ValueError('Input bin ID matrix has incorrect shape.')
# Construct the best-fitting models
best_fit_continuum = self.unmasked_continuum_model(
replacement_templates=replacement_templates,
redshift_only=redshift_only, deredshift=deredshift,
corrected_dispersion=corrected_dispersion)
# Get the number of output continuua
nbins = numpy.amax(binid).astype(int)+1 if missing is None or len(missing) == 0 else \
numpy.amax( numpy.append(binid.ravel(), missing) ).astype(int)+1
# Map the BINID to the spectrum index, assuming bins are sorted
u, indx, reconstruct = numpy.unique(self['BINID'].data.ravel(), return_index=True,
return_inverse=True)
# u_bin_indx = numpy.arange(len(u))-1
u_bin_indx = numpy.arange(u.size)
if u[0] == -1:
u_bin_indx -= 1
_bin_indx = u_bin_indx[reconstruct].reshape(self.spatial_shape)
# Fill in bins with no models with masked zeros
continuum = numpy.ma.zeros((nbins,best_fit_continuum.shape[1]), dtype=float)
continuum[:,:] = numpy.ma.masked
for i,j in zip(binid.ravel(), _bin_indx.ravel()):
if i < 0 or j < 0:
continue
continuum[i,:] = best_fit_continuum[j,:]
return continuum
[docs]
def matched_kinematics(self, binid, redshift=None, dispersion=100.0, constant=False, cz=False,
corrected=False, min_dispersion=None, nearest=False, missing=None):
r"""
Get the stellar kinematics from this objects that matches the
input bin ID matrix.
This method has a similar intent to :func:`fill_to_match`,
but instead of matching the model spectra, its intent is to
match the stellar kinematics.
For spectra the were not fit but have a bin ID (see
``binid``), the method uses the median (unmasked) kinematic
measurement if no default values are provided (see
``redshift`` and ``dispersion``).
Args:
binid (`numpy.ndarray`_):
2D array with the bin ID associate with each spaxel
in the datacube. Shape must be the same as
:attr:`spatial_shape`.
redshift (:obj:`float`, optional):
The default redshift to use for spectra without a
stellar-continuum fit. Default is to use the median
of the unmasked measurements
dispersion (:obj:`float`, optional):
The default velocity dispersion to use for spectra
without a stellar-continuum fit. Default is 100 km/s.
constant (:obj:`bool`, optional):
Force the function to return a constant redshift and
dispersion for each spectrum, regardless of any
fitted kinematics.
cz (:obj:`bool`, optional):
Return the redshift as cz in km/s, as opposed to
unitless z.
corrected (:obj:`bool`, optional):
Return the velocity dispersions corrected for any
resolution difference between the templates and the
galaxy data. If False, return the uncorrected data.
min_dispersion (:obj:`float`, optional):
Impose a minimum dispersion.
nearest (:obj:`bool`, optional):
Instead of the median of the results for the spectra
that were not fit, use the value from the nearest
bin.
missing (array-like, optional):
Any bin ID numbers missing from the input ``binid``
image needed for constructing the output matched
data.
Returns:
:obj:`tuple`: Returns `numpy.ndarray`_ objects with a
redshift (or cz) and dispersion for each binned spectrum.
Shape is :math:`(N_{\rm bin},)`.
Raises:
TypeError:
Raised if the input redshift or dispersion values are
not single numbers.
ValueError:
Raised if the shape of ``binid`` does not match
:attr:`spatial_shape`.
"""
# Check input
if redshift is not None and not isinstance(redshift, (float,int)):
raise TypeError('Redshift must be a single number or None.')
if dispersion is not None and not isinstance(dispersion, (float,int)):
raise TypeError('Dispersion must be a single number or None.')
if binid.shape != self.spatial_shape:
raise ValueError('Input bin ID map must match the spatial shape of the DRP cube.')
# Get the number of output kinematics
nbins = numpy.amax(binid).astype(int)+1 if missing is None or len(missing) == 0 else \
numpy.amax( numpy.append(binid.ravel(), missing) ).astype(int)+1
# Mask bad values
mask = self.bitmask.flagged(self.hdu['PAR'].data['MASK'].copy(),
[ 'NO_FIT', 'FIT_FAILED', 'INSUFFICIENT_DATA', 'NEAR_BOUND' ])
if numpy.all(mask):
# Everything is masked so use input guesses
warnings.warn('All stellar continuum fits have been masked! Using input guesses.')
str_z = numpy.ma.MaskedArray(self.method['fitpar']['guess_redshift'].copy())
str_d = numpy.ma.MaskedArray(self.method['fitpar']['guess_dispersion'].copy())
# Select one per *stellar-continuum model*
str_z = str_z[self['PAR'].data['BINID_INDEX']]
str_d = str_d[self['PAR'].data['BINID_INDEX']]
else:
# Pull out the data
str_z = numpy.ma.MaskedArray(self.hdu['PAR'].data['KIN'][:,0].copy(), mask=mask) \
/ astropy.constants.c.to('km/s').value
str_d = numpy.ma.MaskedArray(self.hdu['PAR'].data['KIN'][:,1].copy(), mask=mask)
# Apply the sigma corrections if requested. Values with the
# correction larger than the measured value will be masked!
if corrected:
sigma_corr = numpy.ma.MaskedArray(self.hdu['PAR'].data['SIGMACORR_EMP'], mask=mask)
str_d = numpy.ma.sqrt( numpy.square(str_d) - numpy.square(sigma_corr) )
# Set the default values to use when necessary
_redshift = numpy.ma.median(str_z) if redshift is None else redshift
_dispersion = numpy.ma.median(str_d) if dispersion is None else dispersion
if _dispersion is numpy.ma.masked:
_dispersion = numpy.median(self.method['fitpar']['guess_dispersion'])
if min_dispersion is not None and _dispersion < min_dispersion:
_dispersion = min_dispersion
# Just return the single value
if constant:
str_z = numpy.full(nbins, _redshift * astropy.constants.c.to('km/s').value
if cz else _redshift, dtype=float)
str_d = numpy.full(nbins, _dispersion, dtype=float)
return str_z, str_d
# Fill masked values with the nearest datum
if nearest:
# Fill masked values with the nearest datum
replace = numpy.ma.getmaskarray(str_z) | numpy.ma.getmaskarray(str_d)
if numpy.all(replace):
# All are bad so replace with _redshift and _dispersion
str_z = numpy.full(str_z.shape, _redshift, dtype=float)
str_d = numpy.full(str_d.shape, _dispersion, dtype=float)
elif numpy.any(replace):
best_fit_kinematics = numpy.ma.append([str_z], [str_d], axis=0).T
valid_bins = numpy.unique(self['BINID'].data)
if valid_bins[0] == -1:
valid_bins = valid_bins[1:]
coo = self.binned_spectra['BINS'].data['SKY_COO'][valid_bins,:]
kinematics = replace_with_data_from_nearest_coo(coo, best_fit_kinematics, replace)
str_z = kinematics[:,0]
str_d = kinematics[:,1]
else:
# Fill any masked values with the single estimate
str_z = str_z.filled(_redshift)
str_d = str_d.filled(_dispersion)
# Map the BINID to the spectrum index, assuming bins are sorted
# and that the BINID map has -1 BINID values
u, indx, reconstruct = numpy.unique(self['BINID'].data.ravel(), return_index=True,
return_inverse=True)
# u_bin_indx = numpy.arange(len(u))-1
u_bin_indx = numpy.arange(u.size)
if u[0] == -1:
u_bin_indx -= 1
_bin_indx = u_bin_indx[reconstruct].reshape(self.spatial_shape)
# Match the kinematics to the output bin ID map
_str_z = numpy.ma.masked_all(nbins, dtype=float)
_str_d = numpy.ma.masked_all(nbins, dtype=float)
for i,j in zip(binid.ravel(), _bin_indx.ravel()):
if i < 0 or j < 0:
continue
_str_z[i] = str_z[j]
_str_d[i] = str_d[j]
str_z = _str_z.filled(_redshift)
str_d = _str_d.filled(_dispersion)
# Convert to cz velocities (km/s)
if cz:
str_z *= astropy.constants.c.to('km/s').value
return str_z, str_d
# TODO: Function out of date! Keep it around for now though...
# def matched_template_flags(self, binned_spectra):
# """
# Return the templates used during the fit to each spectrum,
# matched to the spectra in the binned_spectra object. For
# spectra with no models, the flags are all set to true.
# """
# if binned_spectra is self.binned_spectra:
# usetpl = self.hdu['PAR'].data['TPLWGT'] > 0
#
# # Number of models matches the numbers of bins
# if binned_spectra.nbins == self.nmodels:
## print('returning usetpl')
# return usetpl
#
# # Fill in bins with no models with masked zeros
## print('Fill in bins with no models with masked zeros')
# ntpl = self.method['fitpar']['template_library'].ntpl
# _usetpl = numpy.ones((binned_spectra.nbins,ntpl), dtype=bool)
# for i,j in enumerate(self.hdu['PAR'].data['BINID_INDEX']):
# _usetpl[j,:] = usetpl[i,:]
# return _usetpl
#
# raise NotImplementedError('Can only match to internal binned_spectra.')