"""
A class hierarchy that fits the emission lines.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
from pathlib import Path
import logging
import warnings
from IPython import embed
import numpy
from astropy.io import fits
import astropy.constants
from ..par.parset import KeywordParSet, ParSet
from ..util.fitsutil import DAPFitsUtil
from ..util.dapbitmask import DAPBitMask
from ..util.log import log_output
from .spatiallybinnedspectra import SpatiallyBinnedSpectra
from .stellarcontinuummodel import StellarContinuumModel
#from .elric import Elric, ElricPar
from .sasuke import Sasuke, SasukePar
from .util import replace_with_data_from_nearest_coo
from .spectralfitting import EmissionLineFit
[docs]
class EmissionLineModelDef(KeywordParSet):
"""
A class that holds the parameters necessary to perform the
emission-line profile fits.
The defined parameters are:
.. include:: ../tables/emissionlinemodeldef.rst
"""
def __init__(self, key='EFITMPL11SSPDB', minimum_snr=0.0, mom_vel_name='Ha-6564',
mom_disp_name=None, fitpar=None, fitclass=None, fitfunc=None, overwrite=False):
# Use the signature to get the parameters and the default values
sig = inspect.signature(self.__class__)
pars = list(sig.parameters.keys())
defaults = [sig.parameters[key].default for key in pars]
# Remaining definitions done by hand
in_fl = [int, float]
par_opt = [ParSet, dict]
values = [key, minimum_snr, mom_vel_name, mom_disp_name, fitpar, fitclass, fitfunc,
overwrite]
dtypes = [str, in_fl, str, str, par_opt, None, None, bool]
can_call = [False, False, False, False, False, False, True, False]
descr = ['Keyword used to distinguish between different emission-line moment databases.',
'Minimum S/N of spectrum to fit',
'Name of the emission-line moments band used to set the initial velocity guess ' \
'for each spaxel.',
'Name of the emission-line moments band used to set the initial velocity ' \
'dispersion guess for each spaxel.',
'Fitting function parameters',
'Class used to perform the fit.',
'Function or method that performs the fit; **must** be callable.',
'If the output file already exists, redo all the calculations and overwrite it.']
super().__init__(pars, values=values, defaults=defaults, dtypes=dtypes, can_call=can_call,
descr=descr)
self._validate()
[docs]
def _validate(self):
"""
Validate the modeling method.
"""
if self['fitfunc'] is None:
warnings.warn('No fitting function defined. Defaulting to use mangadap.proc.ppxffit.')
self['fitpar'] = SasukePar()
self['fitclass'] = Sasuke(EmissionLineModelBitMask())
self['fitfunc'] = self['fitclass'].fit_SpatiallyBinnedSpectra
[docs]
@classmethod
def from_dict(cls, d):
"""
Instantiate the object from a nested dictionary.
The dictionary is expected to have keys for all the class keywords; see
the class instantiation method. It should also include the
``'fit_method'`` keyword used to select the type of fit to perform, and
a nested dictionary selected by the keyword ``'fit'`` with the
parameters required by the fitting method. If these are not provided,
the default fitter, :class:`~mangadap.proc.sasuke.Sasuke` and parameters
will be used.
.. warning::
Currently, the only accepted value for ``'fit_method'`` is
``'sasuke'``, and the ``'fit'`` dictionary must provide the
parameters defined by :class:`~mangadap.proc.sasuke.SasukePar`.
Args:
d (:obj:`dict`):
Dictionary used to instantiate the class.
"""
# Copy over the primary keywords
_d = {}
for key in ['key', 'minimum_snr', 'mom_vel_name', 'mom_disp_name', 'overwrite']:
if key in d.keys():
_d[key] = d[key]
# Set the spatial binning method
if 'fit_method' not in d or d['fit_method'] in ['none', None]:
# Use the default fitter and default fitting parameters!
_d['fitpar'] = None
_d['fitclass'] = None
_d['fitfunc'] = None
elif d['fit_method'] == 'sasuke':
# TODO: How do I pass the emission lines to Sasuke?
_d['fitpar'] = SasukePar.from_dict(d['fit']) if 'fit' in d else SasukePar()
_d['fitclass'] = Sasuke(EmissionLineModelBitMask())
_d['fitfunc'] = _d['fitclass'].fit_SpatiallyBinnedSpectra
else:
raise ValueError('Unrecognized fitting method. Requires direct instantiation or '
'code alterations.')
# Return the instantiation
return super().from_dict(_d)
[docs]
class EmissionLineModelBitMask(DAPBitMask):
r"""
Derived class that specifies the mask bits for the emission-line
modeling. The maskbits defined are:
.. include:: ../tables/emissionlinemodelbitmask.rst
"""
cfg_root = 'emission_line_model_bits'
[docs]
class EmissionLineModel:
r"""
Class that holds the emission-line model fits.
Args:
method (:class:`~mangadap.proc.emissionlinemodel.EmissionLineModelDef`):
Object that defines the main parameters used for the emission-line
model fitting.
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress; ignored if
quiet=True. Logging is done using
:func:`~mangadap.util.log.log_output`.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output.
"""
def __init__(self, method, binned_spectra, stellar_continuum=None,
emission_line_moments=None, redshift=None, dispersion=None, minimum_error=None,
method_list=None, artifact_path=None, ism_line_path=None,
emission_line_db_path=None, output_path=None, output_file=None, hardcopy=True,
overwrite=None, checksum=False, loggers=None, quiet=False):
self.loggers = None
self.quiet = False
# Define the database properties
self.method = method
if not isinstance(self.method, EmissionLineModelDef):
raise TypeError('Method must have type EmissionLineModelDef.')
# Setup the pixel mask
# Mask ISM lines if a list is provided -- KHRR
# use nsigma=2.0 to avoid masking HeI
# self.pixelmask = SpectralPixelMask(artdb=None if self.method['artifacts'] is None else
# ArtifactDB.from_key(self.method['artifacts'],
# directory_path=artifact_path),
# emldb=None if self.method['ism_mask'] is None else
# EmissionLineDB.from_key(self.method['ism_mask'],
# directory_path=ism_line_path),
# waverange=self.method['waverange'], nsig=2.0)
# Setup the emission-line database
# self.emldb = None if self.method['emission_lines'] is None else \
# EmissionLineDB.from_key(self.method['emission_lines'],
# directory_path=emission_line_db_path)
# self.neml = None if self.emldb is None else self.emldb.size
# self.emldb = None
self.neml = None
self.binned_spectra = None
self.redshift = None
self.dispersion = None
self.stellar_continuum = None
# Define the output directory and file
self.directory_path = None # Set in default_paths
self.output_file = None
self.hardcopy = None
# Initialize the objects used in the assessments
self.bitmask = EmissionLineModelBitMask()
self.hdu = None
self.checksum = checksum
self.shape = None
self.spatial_shape = None
self.spatial_index = None
self.spectral_arrays = None
self._assign_spectral_arrays()
self.image_arrays = None
self._assign_image_arrays()
self.nwave = None
self.nmodels = None
self.missing_models = None
# Fit the binned spectra
self.fit(binned_spectra, stellar_continuum=stellar_continuum,
emission_line_moments=emission_line_moments, redshift=redshift,
dispersion=dispersion, minimum_error=minimum_error, output_path=output_path,
output_file=output_file, hardcopy=hardcopy, overwrite=overwrite, loggers=loggers,
quiet=quiet)
def __getitem__(self, key):
return self.hdu[key]
[docs]
@staticmethod
def default_paths(cube, method_key, rdxqa_method, binning_method, stelcont_method=None,
output_path=None, output_file=None):
"""
Set the default directory and file name for the output file.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube to analyze.
method_key (:obj:`str`):
Keyword designating the method used for the reduction
assessments.
rdxqa_method (:obj:`str`):
The method key for the basic assessments of the datacube.
binning_method (:obj:`str`):
The method key for the spatial binning.
stelcont_method (:obj:`str`, optional):
The method key for the stellar-continuum fitting method. If
None, not included in output file name.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys. The order of the keys is the order of operations
(rdxqa, binning, stellar continuum, emission-line model).
Returns:
:obj:`tuple`: Returns a `Path`_ with the output directory and a
:obj:`str` with the output file name.
"""
directory_path = Path('.').resolve() if output_path is None \
else Path(output_path).resolve()
method = f'{rdxqa_method}-{binning_method}'
if stelcont_method is not None:
method = f'{method}-{stelcont_method}'
method = f'{method}-{method_key}'
_output_file = f'{cube.output_root}-{method}.fits.gz' if output_file is None \
else output_file
return directory_path, _output_file
[docs]
def _finalize_cube_mask(self, mask):
"""
Finalize the mask after the 2D mask has been reconstructed
into a 3D cube.
This mostly handles the masks for regions outside the
datacube field of view.
Note that the input mask is both edited in-place and
returned.
.. todo::
- This needs to be abstracted for non-DRP datacubes.
- Describe MAPMASK usage
Args:
mask (`numpy.ndarray`_):
3D array with the current bitmask data.
Returns:
`numpy.ndarray`_: Edited bitmask data.
"""
# Turn on the flag stating that the pixel wasn't used
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra.cube.mask,
flag=self.binned_spectra.do_not_fit_flags())
mask[indx] = self.bitmask.turn_on(mask[indx], 'DIDNOTUSE')
# Turn on the flag stating that the pixel has a foreground star
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra.cube.mask, flag='FORESTAR')
mask[indx] = self.bitmask.turn_on(mask[indx], 'FORESTAR')
# Propagate the MAPMASK to the full cube
for f in ['DIDNOTUSE', 'FORESTAR', 'LOW_SNR']:
indx = self.bitmask.flagged(self['MAPMASK'].data, f)
mask[indx,:] = self.bitmask.turn_on(mask[indx,:], f)
return mask
[docs]
def _assign_spectral_arrays(self):
self.spectral_arrays = [ 'FLUX', 'BASE', 'MASK' ]
[docs]
def _assign_image_arrays(self):
"""
Set :attr:`image_arrays`, which contains the list of extensions
in :attr:`hdu` that are on-sky image data.
"""
self.image_arrays = [ 'BINID' ]
[docs]
def _get_missing_models(self, unique_bins=None, debug=False):
if unique_bins is None:
good_snr = self.binned_spectra.above_snr_limit(self.method['minimum_snr'], debug=debug)
return numpy.sort(self.binned_spectra.missing_bins +
self.binned_spectra['BINS'].data['BINID'][numpy.invert(good_snr)].tolist())
return SpatiallyBinnedSpectra._get_missing_bins(unique_bins)
[docs]
def _get_line_fit_metrics(self, model_flux, model_base, model_mask, model_eml_par,
model_binid=None, window=15, debug=False):
"""
Compute fit-quality metrics near each fitted line.
Primarily a wrapper of :func:`EmissionLineFit.line_metrics`.
Args:
model_flux (`numpy.ndarray`_):
The best-fitting emission-line model spectra.
model_base (`numpy.ndarray`_):
The "baseline" of the emission-line model defined as the
difference between the best-fitting stellar-continuum
model and the best-fitting continuum from the
emission-line modeling procedure.
model_mask (`numpy.ndarray`_):
A boolean or integer (bitmask) array with the mask for
the model data.
model_eml_par (:class:`~mangadap.proc.spectralfitting.EmissionLineFitDataTable`):
A record array with the best-fitting results and
parameters for each emission line. The format is
expected to be given by
:func:`EmissionLineFit._per_emission_line_dtype`. *This
object is edited in place and returned.*
model_binid (`numpy.ndarray`_, optional):
A 2D array with the bin ID assigned to each spaxel with
a fitted emission-line model. The number of
non-negative bin IDs must match the number of fitted
models. If None, this is generated from the BINID
column in the model_eml_par object.
window (:obj:`int`, optional):
The window size in pixels around each line to use for
calculation of the figures of merit.
Returns:
`numpy.recarray`_: Return the input ``model_eml_par``
after filling the LINE_PIXC, AMP, ANR, LINE_NSTAT,
LINE_CHI2, LINE_RMS, and LINE_FRMS columns.
"""
# Get the baseline continuum
bin_indx = DAPFitsUtil.downselect_bins(self.binned_spectra['BINID'].data.ravel(),
model_eml_par['BINID']).reshape(self.spatial_shape) \
if model_binid is None else model_binid
# continuum = numpy.ma.MaskedArray(numpy.zeros(model_base.shape, dtype=float)) \
# if self.stellar_continuum is None else \
# self.stellar_continuum.fill_to_match(bin_indx, missing=self.missing_models)
# If stellar continuum available, masked array returned by
# fill_to_match needs to be filled with zeros to correctly
# reconstruct the emission-line continuum.
if self.stellar_continuum is None:
continuum = model_base
else:
continuum = self.stellar_continuum.fill_to_match(bin_indx,
missing=self.missing_models).filled(0.)
indx = numpy.unique(bin_indx.ravel())
if indx[0] == -1:
indx = indx[1:]
continuum = continuum[indx,:] + model_base
# Interpret the input mask as either a boolean or bitmask array
if model_mask.dtype is numpy.dtype('bool'):
_model_mask = numpy.zeros(model_flux.shape, dtype=self.bitmask.minimum_dtype())
_model_mask[model_mask] = self.bitmask.turn_on(_model_mask[model_mask], 'DIDNOTUSE')
else:
_model_mask = model_mask
# Compute the full (continuum + emission lines) model spectra
model = numpy.ma.MaskedArray(model_flux+continuum, mask=_model_mask>0)
pixelmask = None if 'pixelmask' not in self.method['fitpar'].keys() \
else self.method['fitpar']['pixelmask']
# Get the fitted spectra
if self.method['fitpar']['deconstruct_bins'] != 'ignore':
# The models should be for the individual spaxels
bins_to_fit = EmissionLineFit.select_binned_spectra_to_fit(self.binned_spectra,
minimum_snr=self.method['minimum_snr'],
stellar_continuum=self.stellar_continuum,
debug=debug)
# NOTE: bins_to_fit only used in this spaxel selection if debug=True
spaxel_to_fit = EmissionLineFit.select_spaxels_to_fit(self.binned_spectra,
minimum_snr=self.method['minimum_snr'],
bins_to_fit=bins_to_fit, debug=debug)
wave, flux, ferr, _ = EmissionLineFit.get_spectra_to_fit(self.binned_spectra,
pixelmask=pixelmask,
select=spaxel_to_fit,
error=True,
original_spaxels=True)
else:
# The models should be for the binned spectra
bins_to_fit = EmissionLineFit.select_binned_spectra_to_fit(self.binned_spectra,
minimum_snr=self.method['minimum_snr'],
stellar_continuum=self.stellar_continuum,
debug=debug)
wave, flux, ferr, _ = EmissionLineFit.get_spectra_to_fit(self.binned_spectra,
pixelmask=pixelmask,
select=bins_to_fit, error=True)
# Get the line metrics
return EmissionLineFit.line_metrics(self.method['fitpar'].emldb, wave, flux, ferr, model,
model_eml_par, bitmask=self.bitmask, window=window)
[docs]
def _construct_2d_hdu(self, good_snr, model_flux, model_base, model_mask, model_eml_par,
model_fit_par=None, model_binid=None):
"""
Construct :attr:`hdu` that is held in memory for manipulation of
the object. See :func:`construct_3d_hdu` if you want to convert
the object into a DRP-like datacube.
"""
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Constructing hdu ...')
# Initialize the headers
pri_hdr = self._initialize_primary_header()
pri_hdr = self._add_method_header(pri_hdr, model_binid=model_binid)
map_hdr = DAPFitsUtil.build_map_header(self.binned_spectra.cube.fluxhdr,
'K Westfall <westfall@ucolick.org>')
# Get the spatial map mask
map_mask = numpy.zeros(self.spatial_shape, dtype=self.bitmask.minimum_dtype())
# Always isolate any spaxels with foreground stars
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra['MAPMASK'].data, 'FORESTAR')
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'FORESTAR')
# Allow the fitting function to redefine the bin IDs associated
# with each model
if model_binid is None:
# Add any spaxel not used because it was flagged by the
# binning step
indx = self.binned_spectra['MAPMASK'].data > 0
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'DIDNOTUSE')
# Get the bins that were below the S/N limit
indx = numpy.invert(DAPFitsUtil.reconstruct_map(self.spatial_shape,
self.binned_spectra['BINID'].data.ravel(),
good_snr, dtype='bool')) & (map_mask == 0)
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'LOW_SNR')
# Get the bin ids with fitted models
bin_indx = DAPFitsUtil.downselect_bins(self.binned_spectra['BINID'].data.ravel(),
model_eml_par['BINID']).reshape(self.spatial_shape)
else:
# Assume any model with a binid less than zero is from a
# spaxel that was not used
indx = model_binid < 0
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'DIDNOTUSE')
# The number of valid bins MUST match the number of
# measurements
nvalid = numpy.sum(numpy.invert(indx))
if nvalid != len(model_eml_par):
raise ValueError('Provided model id does not match the number of measurements.')
# Get the bin ids with fitted models
bin_indx = model_binid
# Allow the fitting function to return a boolean model mask
if model_mask.dtype is numpy.dtype('bool'):
_model_mask = numpy.zeros(model_flux.shape, dtype=self.bitmask.minimum_dtype())
_model_mask[model_mask] = self.bitmask.turn_on(_model_mask[model_mask], 'DIDNOTUSE')
else:
_model_mask = model_mask
# Save the data to the hdu attribute
par_hdu = fits.BinTableHDU(data=None, name='PAR') if self.method['fitpar'].emldb is None \
else self.method['fitpar'].emldb.to_datatable().to_hdu(name='PAR')
fit_hdu = fits.BinTableHDU(data=None, name='FIT') if model_fit_par is None else \
model_fit_par.to_hdu(name='FIT', add_dim=True)
self.hdu = fits.HDUList([fits.PrimaryHDU(header=pri_hdr),
fits.ImageHDU(data=model_flux.data, name='FLUX'),
fits.ImageHDU(data=model_base.data, name='BASE'),
fits.ImageHDU(data=_model_mask, name='MASK'),
self.binned_spectra['WAVE'].copy(),
fits.ImageHDU(data=bin_indx, header=map_hdr, name='BINID'),
fits.ImageHDU(data=map_mask, header=map_hdr, name='MAPMASK'),
par_hdu, fit_hdu,
model_eml_par.to_hdu(name='EMLDATA', add_dim=True)
])
[docs]
def file_name(self):
"""Return the name of the output file."""
return self.output_file
[docs]
def file_path(self):
"""Return the full path to the output file."""
if self.directory_path is None or self.output_file is None:
return None
return self.directory_path / self.output_file
[docs]
def fit(self, binned_spectra, stellar_continuum=None, emission_line_moments=None,
redshift=None, dispersion=None, minimum_error=None, output_path=None, output_file=None,
hardcopy=True, overwrite=None, loggers=None, quiet=False):
"""
Fit the emission lines.
"""
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
# SpatiallyBinnedSpectra object always needed
if binned_spectra is None:
raise ValueError('Must provide spectra object for fitting.')
if not isinstance(binned_spectra, SpatiallyBinnedSpectra):
raise TypeError('Must provide a valid SpatiallyBinnedSpectra object!')
if binned_spectra.hdu is None:
raise ValueError('Provided SpatiallyBinnedSpectra object is undefined!')
self.binned_spectra = binned_spectra
# Continuum accounts for underlying absorption
if stellar_continuum is not None \
and not isinstance(stellar_continuum, StellarContinuumModel):
raise TypeError('Must provide a valid StellarContinuumModel object.')
self.stellar_continuum = stellar_continuum
# Emission-line moments can be used to set the initial guess
# kinematics
if emission_line_moments is not None:
# No longer checks for this; errors will be raised
# just by trying to use the object...
# if not isinstance(emission_line_moments, EmissionLineMoments):
# raise TypeError('Must provide a valid EmissionLineMoments object.')
names = emission_line_moments.channel_names()
if self.method['mom_vel_name'] is not None \
and self.method['mom_vel_name'] not in names:
raise ValueError('{0}'.format(self.method['mom_vel_name']) +
' is not a valid name in the EmissionLineMoments object.')
if self.method['mom_disp_name'] is not None \
and self.method['mom_disp_name'] not in names:
raise ValueError('{0}'.format(self.method['mom_disp_name']) +
' is not a valid name in the EmissionLineMoments object.')
self.shape = self.binned_spectra.shape
self.spatial_shape =self.binned_spectra.spatial_shape
self.spatial_index = self.binned_spectra.spatial_index.copy()
self.nwave = self.binned_spectra.nwave
# Get the guess kinematics
self._assign_input_kinematics(emission_line_moments, redshift, dispersion)
#---------------------------------------------------------------
# Get the good spectra
# debug=True
debug=False
# good_snr = EmissionLineFit.select_binned_spectra_to_fit(self.binned_spectra,
# minimum_snr=self.method['minimum_snr'],
# stellar_continuum=self.stellar_continuum,
# debug=debug)
good_bins = EmissionLineFit.select_binned_spectra_to_fit(self.binned_spectra,
minimum_snr=self.method['minimum_snr'],
stellar_continuum=self.stellar_continuum,
debug=debug)
# NOTE: bins_to_fit only used in this spaxel selection if debug=True
good_spax = EmissionLineFit.select_spaxels_to_fit(self.binned_spectra,
minimum_snr=self.method['minimum_snr'],
bins_to_fit=good_bins, debug=debug)
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(loggers, 1, logging.INFO, '{0:^50}'.format('EMISSION-LINE PROFILE FITTING'))
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(self.loggers, 1, logging.INFO, 'Number of binned spectra: {0}'.format(
self.binned_spectra.nbins))
if len(self.binned_spectra.missing_bins) > 0:
log_output(self.loggers, 1, logging.INFO, 'Missing bins: {0}'.format(
len(self.binned_spectra.missing_bins)))
log_output(self.loggers, 1, logging.INFO, 'Bins with good S/N: {0}'.format(
numpy.sum(good_bins)))
log_output(self.loggers, 1, logging.INFO, 'Spaxels with good S/N: {0}'.format(
numpy.sum(good_spax)))
# log_output(self.loggers, 1, logging.INFO, 'Bin deconstruction method: {0}'.format(
# self.method['deconstruct_bins']))
if numpy.sum(good_bins) == 0:
raise ValueError('No good spectra to fit!')
#---------------------------------------------------------------
# Fill in any remaining binning parameters
# self._fill_method_par()
if self.method['fitpar'] is not None and hasattr(self.method['fitpar'], 'fill'):
self.method['fitpar'].fill(self.binned_spectra,
stellar_continuum=self.stellar_continuum,
guess_vel=self.redshift*astropy.constants.c.to('km/s').value,
guess_sig=self.dispersion, output_path=self.directory_path,
#hardcopy=self.tpl_hardcopy, symlink_dir=self.symlink_dir,
loggers=self.loggers, quiet=self.quiet)
_overwrite = self.method['overwrite'] if overwrite is None else overwrite
# (Re)Set the output paths
self.directory_path, self.output_file \
= EmissionLineModel.default_paths(self.binned_spectra.cube, self.method['key'],
self.binned_spectra.rdxqa.method['key'], self.binned_spectra.method['key'],
stelcont_method=None if self.stellar_continuum is None
else self.stellar_continuum.method['key'],
output_path=output_path, output_file=output_file)
#---------------------------------------------------------------
# Check that the file path is defined
ofile = self.file_path()
if ofile is None:
raise ValueError('File path for output file is undefined!')
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, f'Output path: {self.directory_path}')
log_output(self.loggers, 1, logging.INFO, f'Output file: {self.output_file}')
#---------------------------------------------------------------
# If the file already exists, and not overwriting, just read the
# file
if ofile.exists() and not _overwrite:
self.hardcopy = True
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Using existing file')
self.read(checksum=self.checksum, debug=debug)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
return
#---------------------------------------------------------------
# Fit the spectra
# Mask should be fully defined within the fitting function This
# should also add the equivalent widths. TODO: Shouldn't the
# equivalent widths be done here?
model_flux, model_base, model_mask, model_fit_par, model_eml_par, model_binid = \
self.method['fitfunc'](self.binned_spectra,
stellar_continuum=self.stellar_continuum,
good_bins=good_bins, good_spax=good_spax,
par=self.method['fitpar'], loggers=self.loggers,
quiet=self.quiet, debug=debug)
# Impose a minimum error because of the conversion to inverse
# variance when constructing a MAPS file; applied to the error
# quantities from spectralfitting.EmissionLineFit dtype
# parameters: FLUXERR, KINERR, EWERR
# TODO: Add a maskbit as well? Yes, but in MAPS construction
indx = model_eml_par['FLUXERR'] < (numpy.finfo(model_eml_par['FLUX'].dtype).eps \
if minimum_error is None else minimum_error)
model_eml_par['FLUXERR'][indx] = 0.0
indx = model_eml_par['KINERR'] < (numpy.finfo(model_eml_par['KIN'].dtype).eps \
if minimum_error is None else minimum_error)
model_eml_par['KINERR'][indx] = 0.0
indx = model_eml_par['EWERR'] < (numpy.finfo(model_eml_par['EW'].dtype).eps \
if minimum_error is None else minimum_error)
model_eml_par['EWERR'][indx] = 0.0
# TODO: Test if, when deconstruct_bins is true, the model_binid
# spaxels (with fits) all have unique bin IDs.
# The emission-line fitting can proceed without knowing which
# lines are fit; this sets the number of lines
if self.neml is None:
self.neml = model_eml_par['FLUX'].shape[1]
# The number of models returned should be the number of "good" binned
# spectra if the fitter does *not* deconstruct the bins.
# TODO: This may now cause problems with Elric...
if self.method['fitpar']['deconstruct_bins'] == 'ignore' \
and model_flux.shape[0] != numpy.sum(good_bins):
print(model_flux.shape[0])
print(numpy.sum(good_bins))
raise ValueError('Unexpected returned shape of fitted emission-line models.')
#---------------------------------------------------------------
# Set the number of models and the missing models
self.nmodels = model_flux.shape[0]
unique_bins = None if model_binid is None else numpy.unique(model_binid.ravel())
self.missing_models = self._get_missing_models(unique_bins=unique_bins, debug=debug)
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, f'Fitted models: {self.nmodels}')
if len(self.missing_models) > 0:
log_output(self.loggers, 1, logging.INFO,
f'Missing models: {len(self.missing_models)}')
log_output(self.loggers, 1, logging.INFO, 'Calculating fit metrics')
# Compute the line-fit metrics
model_eml_par = self._get_line_fit_metrics(model_flux, model_base, model_mask,
model_eml_par, model_binid=model_binid,
debug=debug)
# Construct the 2d hdu list that only contains the fitted models
self.hardcopy = hardcopy
self._construct_2d_hdu(good_bins, model_flux, model_base, model_mask, model_eml_par,
model_fit_par=model_fit_par, model_binid=model_binid)
#---------------------------------------------------------------
# Write the data, if requested
if self.hardcopy:
if not self.directory_path.exists():
self.directory_path.mkdir(parents=True)
self.write(overwrite=_overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
[docs]
def construct_3d_hdu(self):
"""
Reformat the model spectra into a cube matching the shape of
the DRP fits file.
"""
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Constructing emission-line model datacube ...')
bin_indx = self.hdu['BINID'].data.copy()
model_flux = self.hdu['FLUX'].data.copy()
model_base = self.hdu['BASE'].data.copy()
model_mask = self.hdu['MASK'].data.copy()
flux, base, mask = DAPFitsUtil.reconstruct_cube(self.shape, bin_indx.ravel(),
[model_flux, model_base, model_mask])
mask = self._finalize_cube_mask(mask)
# Primary header is identical regardless of the shape of the
# extensions
hdr = self.hdu['PRIMARY'].header.copy()
cube_hdr = DAPFitsUtil.build_cube_header(self.binned_spectra.cube,
'K Westfall <westfall@ucolick.org>')
par_hdu = self.hdu['PAR'].copy() if 'data' in self.hdu['PAR'].__dict__ \
else fits.BinTableHDU(data=None, name='PAR')
fit_hdu = self.hdu['FIT'].copy() if 'data' in self.hdu['FIT'].__dict__ \
else fits.BinTableHDU(data=None, name='FIT')
# Return the converted hdu without altering the internal hdu
return fits.HDUList([ fits.PrimaryHDU(header=hdr),
fits.ImageHDU(data=flux, header=cube_hdr, name='FLUX'),
fits.ImageHDU(data=base, header=cube_hdr, name='BASE'),
fits.ImageHDU(data=mask, header=cube_hdr, name='MASK'),
self.hdu['WAVE'].copy(),
self.hdu['BINID'].copy(),
par_hdu,
fit_hdu,
self.hdu['EMLDATA'].copy()
])
# Exact same function as used by SpatiallyBinnedSpectra
[docs]
def write(self, match_DRP=False, overwrite=False):
"""
Write the hdu object to the file.
"""
# Convert the spectral arrays in the HDU to a 3D cube and write
# it
if match_DRP:
hdu = self.construct_3d_hdu()
DAPFitsUtil.write(hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
loggers=self.loggers, quiet=self.quiet)
return
# Just write the unique (2D) data
DAPFitsUtil.write(self.hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
loggers=self.loggers, quiet=self.quiet)
[docs]
def read(self, ifile=None, strict=True, checksum=False, debug=False):
"""
Read an existing file with an existing set of emission-line
models.
"""
if ifile is None:
ifile = self.file_path()
if not ifile.exists():
raise FileNotFoundError('File does not exist!: {0}'.format(ifile))
if self.hdu is not None:
self.hdu.close()
self.hdu = DAPFitsUtil.read(str(ifile), checksum=checksum)
# Force load the PAR and FIT extensions because it may be
# checked by self.construct_3d_hdu() or construct_maps_file in
# dapfits.py
try:
if self.hdu['PAR'].data is None:
pass
except:
warnings.warn('Could not load emission-line parameter data.')
try:
if self.hdu['FIT'].data is None:
pass
except:
warnings.warn('Could not load emission-line fit data.')
# Make sure that the number of emission-lines is set
_neml = self.hdu['EMLDATA'].data['FLUX'].shape[1]
if self.neml is not None and _neml != self.neml:
raise ValueError('Num. of emission lines read does not match emission-line database.')
if self.neml is None:
self.neml = _neml
# Confirm that the internal method is the same as the method
# that was used in writing the file
if self.hdu['PRIMARY'].header['ELFKEY'] != self.method['key']:
if strict:
raise ValueError('Keywords in header do not match specified method keywords!')
else:
warnings.warn('Keywords in header do not match specified method keywords!')
# TODO: "strict" should also check other aspects of the file to
# make sure that the details of the method are also the same,
# not just the keyword
# Attempt to read the modeling parameters
if self.method['fitpar'] is not None:
try:
self.method['fitpar'].fromheader(self.hdu['PRIMARY'].header)
except:
if not self.quiet:
warnings.warn('Fit parameter object does not have a fromheader ' \
'attribute. No parameters head from header.')
self.nmodels = self.hdu['FLUX'].shape[0]
unique_bins = numpy.unique(self.hdu['BINID'].data.ravel()) \
if self.hdu['PRIMARY'].header['ELREBIN'] else None
self.missing_models = self._get_missing_models(unique_bins=unique_bins, debug=debug)
[docs]
def copy_to_array(self, ext='FLUX', waverange=None, include_missing=False):
r"""
Wrapper for :func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_array`
specific for :class:`EmissionLineModel`.
Return a copy of the selected data array. The array size is
always :math:`N_{\rm models} \times N_{\rm wavelength}`; i.e., the
data is always flattened to two dimensions and the unique
spectra are selected.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data. Must be
allowed for the current :attr:`mode`; see :attr:`data_arrays`.
Default is ``'FLUX'``.
waverange (array-like, optional):
Two-element array with the first and last wavelength to include
in the computation. Default is to use the full wavelength
range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the missing
models.
Returns:
`numpy.ndarray`_: A 2D array with a copy of the data from
the selected extension.
"""
return DAPFitsUtil.copy_to_array(self.hdu, ext=ext, allowed_ext=self.spectral_arrays,
waverange=waverange, missing_bins=self.missing_models if include_missing else None,
nbins=self.nmodels, unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs]
def copy_to_masked_array(self, ext='FLUX', flag=None, waverange=None, include_missing=False):
r"""
Wrapper for
:func:`mangadap.util.fitsutil.DAPFitsUtil.copy_to_masked_array` specific
for :class:`EmissionLineModel`.
Return a copy of the selected data array as a masked array. This is
functionally identical to :func:`copy_to_array`, except the output
format is a `numpy.ma.MaskedArray`_. The pixels that are considered to
be masked can be specified using `flag`.
Args:
ext (:obj:`str`, optional):
Name of the extension from which to draw the data. Must be
allowed for the current :attr:`mode`; see :attr:`data_arrays`.
Default is `'FLUX'`.
flag (:obj:`str`, :obj:`list`, optional):
(List of) Flag names that are considered when deciding if a
pixel should be masked. The names *must* be a valid bit name as
defined by :attr:`bitmask`. If not provided, *ANY* non-zero
mask bit is omitted.
waverange (array-like, optional):
Two-element array with the first and last wavelength to include
in the computation. Default is to use the full wavelength
range.
include_missing (:obj:`bool`, optional):
Create an array with a size that accommodates the missing
models.
Returns:
`numpy.ndarray`_: A 2D array with a copy of the data from
the selected extension.
"""
return DAPFitsUtil.copy_to_masked_array(self.hdu, ext=ext, mask_ext='MASK', flag=flag,
bitmask=self.bitmask, allowed_ext=self.spectral_arrays, waverange=waverange,
missing_bins=self.missing_models if include_missing else None,
nbins=self.nmodels, unique_bins=DAPFitsUtil.unique_bins(self.hdu['BINID'].data))
[docs]
def fill_to_match(self, binid, include_base=False, missing=None):
"""
Get the emission-line model that matches the input bin ID matrix. The
output is a 2D matrix ordered by the bin ID; any skipped index numbers
in the maximum of the union of the unique numbers in the `binid` and
`missing` input are masked.
Use `include_base` to include the baseline in the output emission-line
models.
"""
# The input bin id array must have the same shape as self
if binid.shape != self.spatial_shape:
raise ValueError('Input bin ID matrix has incorrect shape.')
# Construct the best-fitting models
best_fit_model = self.copy_to_array()
if include_base:
best_fit_model += self.copy_to_array(ext='BASE')
# Get the number of output models
nbins = numpy.amax(binid).astype(int)+1 if missing is None or len(missing) == 0 else \
numpy.amax( numpy.append(binid.ravel(), missing) ).astype(int)+1
# Map the BINID to the spectrum index, assuming bins are sorted
u, indx, reconstruct = numpy.unique(self['BINID'].data.ravel(), return_index=True,
return_inverse=True)
u_bin_indx = numpy.arange(u.size)
if u[0] == -1:
u_bin_indx -= 1
_bin_indx = u_bin_indx[reconstruct].reshape(self.spatial_shape)
# Fill in bins with no models with masked zeros
emission_lines = numpy.ma.zeros((nbins,best_fit_model.shape[1]), dtype=float)
emission_lines[:,:] = numpy.ma.masked
for i,j in zip(binid.ravel(), _bin_indx.ravel()):
if i < 0 or j < 0:
continue
emission_lines[i,:] = best_fit_model[j,:]
return emission_lines
[docs]
def matched_kinematics(self, binid, line_name, redshift=None, dispersion=100.0, constant=False,
cz=False, corrected=False, min_dispersion=None, nearest=False,
missing=None):
"""
Return the redshift and velocity dispersion for all the
spectra based on the emission-line model fit to a single line.
For spectra the were not fit, use the median (unmasked)
kinematic measurement if no default value is provided as an
argument.
Args:
binid (`numpy.ndarray`_):
2D array with the bin ID associate with each spaxel
in the datacube.
line (str): Line to use for the kinematics. Must match one
of the lines created by :func:`channel_names`. Function
will raise an exception if the line_name is None or if
the channel names cannot be constructed.
redshift (float): (**Optional**) The default redshift to use
for spectra without an emission-line fit. Default is to
use the median of the unmasked measurements.
dispersion (float): (**Optional**) The default velocity
dispersion to use for spectra without an emission-line
fit. Default is 100 km/s.
constant (bool): (**Optional**) Force the function to return
a constant redshift and dispersion for each spectrum,
regardless of any fitted kinematics.
cz (bool): (**Optional**) Return the redshift as cz in km/s,
as opposed to unitless z.
corrected (bool): (**Optional**) By default, the returned
velocity dispersions are the measured values, which will
include the instrumental resolution. Setting corrected
to True will return the corrected dispersions.
min_dispersion (float): (**Optional**) Impose a minimum
dispersion.
nearest (bool): (**Optional**) Instead of the median of the
results for the spectra that were not fit, use the value
from the nearest bin.
missing (list): (**Optional**) Any bin ID numbers missing
from the input `binid` image needed for constructing the
output matched data.
Returns:
`numpy.ndarray`_: Returns arrays with a redshift (or cz)
and dispersion for each spectrum.
Raises:
TypeError: Raised if the input redshift or dispersion values
are not single numbers.
"""
# Check input
if redshift is not None and not isinstance(redshift, (float,int)):
raise TypeError('Redshift must be a single number or None.')
if dispersion is not None and not isinstance(dispersion, (float,int)):
raise TypeError('Dispersion must be a single number or None.')
if binid.shape != self.spatial_shape:
raise ValueError('Input bin ID map must match the spatial shape of the DRP cube.')
# Get the channel names
names = self.channel_names()
if line_name not in names:
raise ValueError('Line {0} not present in channel list.'.format(line_name))
vel_channel = numpy.arange(len(names))[names == line_name][0]
# Get the number of output kinematics
nbins = numpy.amax(binid).astype(int)+1 if missing is None or len(missing) == 0 else \
numpy.amax( numpy.append(binid.ravel(), missing) ).astype(int)+1
# Mask bad values
mask = self.bitmask.flagged(self.hdu['EMLDATA'].data['MASK'][:,vel_channel].copy(),
[ 'NO_FIT', 'FIT_FAILED', 'INSUFFICIENT_DATA', 'NEAR_BOUND' ])
if numpy.all(mask):
# Unlike StellarContinuumModel.matched_kinematics, the input
# guesses here always are based on the number of binned
# spectra. So if the class is meant to deconstruct these
# bins, we can't just use the guess kinematics when all the
# output is masked. So we'll just use the defaults.
warnings.warn('All emission line models have been masked! Using defaults.')
eml_z = numpy.ma.MaskedArray(numpy.full(self.nmodels,
(0.0 if redshift is None else redshift),
dtype=float))
eml_d = numpy.ma.MaskedArray(numpy.full(self.nmodels,
(100.0 if dispersion is None else dispersion), dtype=float))
else:
# Pull out the data
eml_z = numpy.ma.MaskedArray(self.hdu['EMLDATA'].data['KIN'][:,vel_channel,0].copy(),
mask=mask) / astropy.constants.c.to('km/s').value
eml_d = numpy.ma.MaskedArray(self.hdu['EMLDATA'].data['KIN'][:,vel_channel,1].copy(),
mask=mask)
# Apply the sigma corrections if requested. Values with the
# correction larger than the measured value will be masked!
if corrected:
sigma_corr = numpy.ma.MaskedArray(
self.hdu['EMLDATA'].data['SIGMACORR'][:,vel_channel], mask=mask)
eml_d = numpy.ma.sqrt(numpy.square(eml_d) - numpy.square(sigma_corr))
# Set the default values to use when necessary
_redshift = numpy.ma.median(eml_z) if redshift is None else redshift
_dispersion = numpy.ma.median(eml_d) if dispersion is None else dispersion
if _dispersion is numpy.ma.masked:
_dispersion = 100.0
if min_dispersion is not None and _dispersion < min_dispersion:
_dispersion = min_dispersion
# Just return the single value
if constant:
eml_z = numpy.full(nbins, _redshift * astropy.constants.c.to('km/s').value
if cz else _redshift, dtype=float)
eml_d = numpy.full(nbins, _dispersion, dtype=float)
return eml_z, eml_d
if nearest:
replace = numpy.ma.getmaskarray(eml_z) | numpy.ma.getmaskarray(eml_d)
if numpy.all(replace):
# All are bad so replace with _redshift and _dispersion
eml_z = numpy.full(eml_z.shape, _redshift, dtype=float)
eml_d = numpy.full(eml_d.shape, _dispersion, dtype=float)
elif numpy.any(replace):
# Fill masked values with the nearest datum
best_fit_kinematics = numpy.ma.append([eml_z], [eml_d], axis=0).T
if self.method['fitpar']['deconstruct_bins'] != 'ignore':
indx = self['BINID'].data.ravel() > -1
coo = self.binned_spectra.rdxqa['SPECTRUM'].data['SKY_COO'][indx,:]
else:
valid_bins = numpy.unique(self['BINID'].data)
if valid_bins[0] == -1:
valid_bins = valid_bins[1:]
coo = self.binned_spectra['BINS'].data['SKY_COO'][valid_bins,:]
replace = eml_z.mask | eml_d.mask
kinematics = replace_with_data_from_nearest_coo(coo, best_fit_kinematics, replace)
eml_z = kinematics[:,0]
eml_d = kinematics[:,1]
else:
# Fill any masked values with the single estimate
eml_z = eml_z.filled(_redshift)
eml_d = eml_d.filled(_dispersion)
# Map the BINID to the spectrum index, assuming bins are sorted
# and that the BINID map has -1 BINID values
u, indx, reconstruct = numpy.unique(self['BINID'].data.ravel(), return_index=True,
return_inverse=True)
# u_bin_indx = numpy.arange(len(u))-1
u_bin_indx = numpy.arange(u.size)
if u[0] == -1:
u_bin_indx -= 1
_bin_indx = u_bin_indx[reconstruct].reshape(self.spatial_shape)
# Match the kinematics to the output bin ID map
_eml_z = numpy.ma.masked_all(nbins, dtype=float)
_eml_d = numpy.ma.masked_all(nbins, dtype=float)
for i,j in zip(binid.ravel(), _bin_indx.ravel()):
if i < 0 or j < 0:
continue
_eml_z[i] = eml_z[j]
_eml_d[i] = eml_d[j]
eml_z = _eml_z.filled(_redshift)
eml_d = _eml_d.filled(_dispersion)
# Convert to cz velocities (km/s)
if cz:
eml_z *= astropy.constants.c.to('km/s').value
return eml_z, eml_d
[docs]
def fill_continuum_to_match(self, binid, replacement_templates=None, redshift_only=False,
deredshift=False, corrected_dispersion=False, missing=None):
"""
Get the emission-line continuum model, if possible, that matches
the input bin ID matrix. The output is a 2D matrix ordered by
the bin ID; any skipped index numbers in the maximum of the
union of the unique numbers in the `binid` and `missing` input
are masked.
Use `replacement_templates` only if the number of templates is
identical to the number used during the fitting procedure. Use
`redshift_only` to produce the best-fitting model without and
velocity dispersion. Use `corrected_dispersion` to produce the
model using the corrected velocity dispersion.
"""
if self.stellar_continuum is None:
raise ValueError('EmissionLineModel object has no associated StellarContinuumModel.')
if self.method['fitclass'] is None:
raise ValueError('No class object available for constructing the model!')
if not callable(self.method['fitclass'].construct_continuum_models):
raise AttributeError('Provided fit class object has no callable ' \
'\'construct_continuum_models\' attribute!')
if corrected_dispersion and 'NEAREST_BIN' not in self.hdu['FIT'].columns.names:
raise KeyError('Nearest stellar continuum bin not set in fit parameters. Cannot '
'apply dispersion corrections.')
# The input bin id array must have the same shape as self
if binid.shape != self.spatial_shape:
raise ValueError('Input bin ID matrix has incorrect shape.')
# Pull out the dispersion corrections, if requested
dispersion_corrections = None
if corrected_dispersion:
nsc = self.stellar_continuum['PAR'].data['BINID'].size
indx = [ numpy.arange(nsc)[self.stellar_continuum['PAR'].data['BINID'] == nb][0]
for nb in self.hdu['FIT'].data['NEAREST_BIN'] ]
dispersion_corrections = self.stellar_continuum['PAR'].data['SIGMACORR_SRES'][indx]
# Get the continuum models
best_fit_continuum = self.construct_continuum_models(
replacement_templates=replacement_templates,
deredshift=deredshift, redshift_only=redshift_only,
dispersion_corrections=dispersion_corrections)
# Get the number of output continuua
nbins = numpy.amax(binid).astype(int)+1 if missing is None or len(missing) == 0 else \
numpy.amax( numpy.append(binid.ravel(), missing) ).astype(int)+1
# Map the BINID to the spectrum index, assuming bins are sorted
# and that the BINID map has -1 BINID values
u, indx, reconstruct = numpy.unique(self['BINID'].data.ravel(), return_index=True,
return_inverse=True)
# u_bin_indx = numpy.arange(len(u))-1
u_bin_indx = numpy.arange(u.size)
if u[0] == -1:
u_bin_indx -= 1
_bin_indx = u_bin_indx[reconstruct].reshape(self.spatial_shape)
# Fill in bins with no models with masked zeros
continuum = numpy.ma.zeros((nbins,best_fit_continuum.shape[1]), dtype=float)
continuum[:,:] = numpy.ma.masked
for i,j in zip(binid.ravel(), _bin_indx.ravel()):
if i < 0 or j < 0:
continue
continuum[i,:] = best_fit_continuum[j,:]
return continuum
[docs]
def construct_continuum_models(self, replacement_templates=None, redshift_only=False,
deredshift=False, dispersion_corrections=None):
if self.stellar_continuum is None:
raise ValueError('EmissionLineModel object has no associated StellarContinuumModel.')
if self.method['fitclass'] is None:
raise ValueError('No class object available for constructing the model!')
if not callable(self.method['fitclass'].construct_continuum_models):
raise AttributeError('Provided fit class object has no callable ' \
'\'construct_continuum_models\' attribute!')
if replacement_templates is not None \
and not isinstance(replacement_templates, TemplateLibrary):
raise TypeError('Provided template replacements must have type TemplateLibrary.')
# Only the selected models are constructed, others are masked
select = numpy.invert(self.bitmask.flagged(self.hdu['FIT'].data['MASK'],
flag=['NO_FIT', 'FIT_FAILED', 'INSUFFICIENT_DATA']))
# Get the template library
templates = replacement_templates
if templates is None or not isinstance(templates, TemplateLibrary):
try:
templates = self.method['fitpar'].templates
#
# bins_to_fit = EmissionLineFit.select_binned_spectra_to_fit(self.binned_spectra,
# minimum_snr=self.method['fitpar']['minimum_snr'],
# stellar_continuum=self.stellar_continuum)
# z = numpy.mean(self.method['fitpar']['guess_redshift'][bins_to_fit])
# templates = self.method['fitclass'].get_stellar_templates(self.method['fitpar'],
# self.binned_spectra.cube,
# z=z,
# loggers=self.loggers,
# quiet=self.quiet)[0]
except:
templates = self.stellar_continuum.method['fitpar'].templates
return self.method['fitclass'].construct_continuum_models(
self.method['fitpar'].emldb, templates['WAVE'].data,
templates['FLUX'].data, self.hdu['WAVE'].data,
self.hdu['FLUX'].data.shape, self.hdu['FIT'].data,
select=select, redshift_only=redshift_only,
deredshift=deredshift,
dispersion_corrections=dispersion_corrections, dvtol=1e-9)
# TODO: Use the EmissionMomentsDB.channel_names function!
[docs]
def channel_names(self):
if self.method['fitpar'].emldb is None:
raise ValueError('Channel names undefined because line database is not available.')
return numpy.array([ '{0}-{1}'.format(n,int(w)) \
for n,w in zip(self['PAR'].data['NAME'],
self['PAR'].data['RESTWAVE']) ])