"""
A class hierarchy that measures moments of the observed emission lines.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
from pathlib import Path
import logging
import numpy
from astropy.io import fits
import astropy.constants
from ..util.fitsutil import DAPFitsUtil
from ..util.bitmask import BitMask
from ..util.dapbitmask import DAPBitMask
from ..util.datatable import DataTable
from ..util.pixelmask import SpectralPixelMask
from ..util.log import log_output
from ..par.parset import KeywordParSet
from ..par.artifactdb import ArtifactDB
from ..par.emissionmomentsdb import EmissionMomentsDB
from .spatiallybinnedspectra import SpatiallyBinnedSpectra
from .stellarcontinuummodel import StellarContinuumModel
from .bandpassfilter import passband_integral, passband_integrated_width
from .bandpassfilter import passband_weighted_mean, passband_weighted_sdev, pseudocontinuum
from .bandpassfilter import emission_line_equivalent_width
from .spectralfitting import EmissionLineFit
[docs]
class EmissionLineMomentsDef(KeywordParSet):
"""
A class that holds the parameters necessary to perform the
emission-line moment measurements.
The defined parameters are:
.. include:: ../tables/emissionlinemomentsdef.rst
"""
def __init__(self, key='EMOMMPL11', minimum_snr=0.0, pixelmask=None, passbands='ELBMPL9',
redo_postmodeling=True, fit_vel_name='Ha-6564', overwrite=False):
# Use the signature to get the parameters and the default values
sig = inspect.signature(self.__class__)
pars = list(sig.parameters.keys())
defaults = [sig.parameters[key].default for key in pars]
# Remaining definitions done by hand
in_fl = [int, float]
values = [key, minimum_snr, pixelmask, passbands, redo_postmodeling, fit_vel_name,
overwrite]
dtypes = [str, in_fl, SpectralPixelMask, str, bool, str, bool]
descr = ['Keyword used to distinguish between different emission-line moment databases.',
'Minimum S/N of spectrum to analyze',
'Object used to mask spectral pixels',
'Either a string identifying the emission-line bandpass filter database to ' \
'use, or the direct path to the parameter file defining the database',
'Redo the moment measurements after the emission-line modeling has been ' \
'performed',
'The name of the emission line used to set the redshift of each spaxel used ' \
'to set the observed wavelength of the bandpasses.',
'If the output file already exists, redo all the calculations and overwrite it.']
super().__init__(pars, values=values, defaults=defaults, dtypes=dtypes, descr=descr)
self.momdb = None
self._validate()
[docs]
@classmethod
def from_dict(cls, d):
"""
Instantiate from a dictionary.
"""
# Copy over the primary keywords.
# TODO: Get these from inspection of the instatiation signature!
_d = {}
for key in ['key', 'minimum_snr', 'passbands', 'redo_postmodeling', 'fit_vel_name',
'overwrite']:
if key in d.keys():
_d[key] = d[key]
artifacts = None if 'artifact_mask' not in d.keys() or d['artifact_mask'] is None \
else ArtifactDB.from_key(d['artifact_mask'])
waverange = None if 'waverange' not in d.keys() or d['waverange'] is None \
else d['waverange']
if not all([a is None for a in [artifacts, waverange]]):
_d['pixelmask'] = SpectralPixelMask(artdb=artifacts, waverange=waverange)
# Return the instantiation
return super().from_dict(_d)
[docs]
def _validate(self):
"""
Validate the object and instantiate the passband database to use for the
emission-line moments.
"""
if self['passbands'] is None:
raise ValueError('Must define a passband database.')
# Check if the provided string is a file
parfile = Path(self['passbands']).resolve()
if parfile.exists():
self.momdb = EmissionMomentsDB(str(parfile))
return
# Otherwise, assume this is a keyword selecting a database distributed
# with the DAP has been chosen.
self.momdb = EmissionMomentsDB.from_key(self['passbands'])
[docs]
class EmissionLineMomentsBitMask(DAPBitMask):
r"""
Derived class that specifies the mask bits for the emission-line
moment measurements. The maskbits defined are:
.. include:: ../tables/emissionlinemomentsbitmask.rst
"""
cfg_root = 'emission_line_moments_bits'
[docs]
class EmissionLineMomentsDataTable(DataTable):
"""
Primary data table with the results of the emission-line-moment
calculations.
Table includes:
.. include:: ../tables/emissionlinemomentsdatatable.rst
Args:
neml (:obj:`int`):
Number of emission lines being measured.
bitmask (:class:`~mangadap.util.bitmask.BitMask`, optional):
Object used to flag mask bits. If None, flags are simply
boolean.
shape (:obj:`int`, :obj:`tuple`, optional):
The shape of the initial array. If None, the data array
will not be instantiated; use :func:`init` to initialize
the data array after instantiation.
"""
def __init__(self, neml=1, bitmask=None, shape=None):
# NOTE: This should require python 3.7 to make sure that this
# is an "ordered" dictionary.
datamodel = dict(BINID=dict(typ=int, shape=None, descr='Spectrum/Bin ID number'),
BINID_INDEX=dict(typ=int, shape=None,
descr='Index of the spectrum in the list of '
'provided spectra.'),
REDSHIFT=dict(typ=float, shape=None,
descr='Redshift used for shifting the passbands'),
MASK=dict(typ=bool if bitmask is None else bitmask.minimum_dtype(),
shape=(neml,),
descr='Bad-value boolean or bit mask value for the moments'),
BCEN=dict(typ=float, shape=(neml,), descr='Center of the blue sideband.'),
BCONT=dict(typ=float, shape=(neml,),
descr='Pseudo-continuum in the blue sideband'),
BCONTERR=dict(typ=float, shape=(neml,),
descr='Error in the blue-sideband pseudo-continuum'),
RCEN=dict(typ=float, shape=(neml,), descr='Center of the red sideband.'),
RCONT=dict(typ=float, shape=(neml,),
descr='Pseudo-continuum in the red sideband'),
RCONTERR=dict(typ=float, shape=(neml,),
descr='Error in the red-sideband pseudo-continuum'),
CNTSLOPE=dict(typ=float, shape=(neml,),
descr='Continuum slope used to determine the continuum at '
'the line center.'),
FLUX=dict(typ=float, shape=(neml,), descr='Summed flux (0th moment)'),
FLUXERR=dict(typ=float, shape=(neml,),
descr='Error in the summed flux'),
MOM1=dict(typ=float, shape=(neml,),
descr='Line centroid redshift (:math:`cz`; 1st moment)'),
MOM1ERR=dict(typ=float, shape=(neml,),
descr='Error in the line centroid redshift'),
MOM2=dict(typ=float, shape=(neml,),
descr='Line standard deviation (2nd moment)'),
MOM2ERR=dict(typ=float, shape=(neml,),
descr='Error in the line standard deviation'),
SINST=dict(typ=float, shape=(neml,),
descr='Instrumental dispersion at the line centroid'),
BMED=dict(typ=float, shape=(neml,),
descr='Median flux in the blue sideband used for EW'),
RMED=dict(typ=float, shape=(neml,),
descr='Median flux in the red sideband used for EW'),
EWCONT=dict(typ=float, shape=(neml,),
descr='Continuum value used for EW calculation'),
EW=dict(typ=float, shape=(neml,),
descr='Equivalent width (FLUX/pseudo continuum)'),
EWERR=dict(typ=float, shape=(neml,),
descr='Error in the equivalent width'))
keys = list(datamodel.keys())
super(EmissionLineMomentsDataTable,
self).__init__(keys, [datamodel[k]['typ'] for k in keys],
element_shapes=[datamodel[k]['shape'] for k in keys],
descr=[datamodel[k]['descr'] for k in keys],
shape=shape)
[docs]
class EmissionLineMoments:
r"""
Class that holds the emission-line moment measurements.
Args:
loggers (list): (**Optional**) List of `logging.Logger`_ objects
to log progress; ignored if quiet=True. Logging is done
using :func:`mangadap.util.log.log_output`. Default is no
logging.
quiet (bool): (**Optional**) Suppress all terminal and logging
output. Default is False.
Attributes:
loggers (list): List of `logging.Logger`_ objects to log
progress; ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`.
quiet (bool): Suppress all terminal and logging output.
"""
# @profile
def __init__(self, database, binned_spectra, stellar_continuum=None,
emission_line_model=None, redshift=None, database_list=None, artifact_path=None,
bandpass_path=None, output_path=None, output_file=None, hardcopy=True,
overwrite=None, checksum=False, loggers=None, quiet=False):
self.loggers = None
self.quiet = False
# Define the database properties
# self.database = None
# self.pixelmask = None
# self.artdb = None
# self.momdb = None
self.database = database
if not isinstance(self.database, EmissionLineMomentsDef):
raise TypeError('Method must have type EmissionLineMomentsDef.')
# self._define_databases(database_key, database_list=database_list,
# artifact_path=artifact_path, bandpass_path=bandpass_path)
self.nmom = self.database.momdb.size
self.binned_spectra = None
self.stellar_continuum = None
self.emission_line_model = None
self.redshift = None
# Define the output directory and file
self.directory_path = None # Set in default_paths
self.output_file = None
self.hardcopy = None
# Initialize the objects used in the assessments
self.bitmask = EmissionLineMomentsBitMask()
self.hdu = None
self.checksum = checksum
self.spatial_shape = None
self.nspec = None
self.spatial_index = None
self.image_arrays = None
self._assign_image_arrays()
self.nbins = None
self.missing_bins = None
# Run the assessments of the DRP file
self.measure(binned_spectra, stellar_continuum=stellar_continuum,
emission_line_model=emission_line_model, redshift=redshift,
output_path=output_path, output_file=output_file, hardcopy=hardcopy,
overwrite=overwrite, loggers=loggers, quiet=quiet)
def __getitem__(self, key):
return self.hdu[key]
[docs]
@staticmethod
def default_paths(cube, method_key, rdxqa_method, binning_method, stelcont_method=None,
elmodel_method=None, output_path=None, output_file=None):
"""
Set the default directory and file name for the output file.
Args:
cube (:class:`mangadap.datacube.datacube.DataCube`):
Datacube to analyze.
method_key (:obj:`str`):
Keyword designating the method used for the reduction
assessments.
rdxqa_method (:obj:`str`):
The method key for the basic assessments of the datacube.
binning_method (:obj:`str`):
The method key for the spatial binning.
stelcont_method (:obj:`str`, optional):
The method key for the stellar-continuum fitting method. If
None, not included in output file name.
elmodel_method (:obj:`str`, optional):
The method key for the emission-line modeling method. If None,
not included in the output file name.
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys. The order of the keys is the order of operations
(rdxqa, binning, stellar continuum, emission-line model).
Returns:
:obj:`tuple`: Returns a `Path`_ with the output directory and a
:obj:`str` with the output file name.
"""
directory_path = Path('.').resolve() if output_path is None \
else Path(output_path).resolve()
method = f'{rdxqa_method}-{binning_method}'
if stelcont_method is not None:
method = f'{method}-{stelcont_method}'
if elmodel_method is not None:
method = f'{method}-{elmodel_method}'
method = f'{method}-{method_key}'
_output_file = f'{cube.output_root}-{method}.fits.gz' if output_file is None \
else output_file
return directory_path, _output_file
[docs]
def _assign_image_arrays(self):
"""
Set :attr:`image_arrays`, which contains the list of extensions
in :attr:`hdu` that are on-sky image data.
"""
self.image_arrays = [ 'BINID' ]
[docs]
def _get_missing_bins(self, unique_bins=None):
if unique_bins is None:
good_snr = self.binned_spectra.above_snr_limit(self.database['minimum_snr'])
return numpy.sort(self.binned_spectra.missing_bins +
self.binned_spectra['BINS'].data['BINID'][numpy.invert(good_snr)].tolist())
return SpatiallyBinnedSpectra._get_missing_bins(unique_bins)
[docs]
def _assign_redshifts(self, redshift, measure_on_unbinned_spaxels, good_snr,
default_redshift=None):
"""
Set the redshift to use for each spectrum for the emission-line
moments.
In terms of precedence, directly provided redshifts override
those in any available EmissionLineModel.
If :attr:`emission_line_model` and redshift are None, the default
redshift is used (or 0.0 if this is also None).
To get the emission-line stellar kinematics, the function calls
:func:`mangadap.proc.emissionlinemodel.EmissionLineModel.matched_kinematics`.
The method must have a fit_vel_name define for this use!
In this function, the provided redshift must be a single value
or None; therefore, the means of any vectors should be provided
instead of the full vector.
Args:
redshift (:obj:`float`, `numpy.ndarray`_):
Redshifts (:math:`z`) to use for each spectrum.
measure_on_unbinned_spaxels (:obj:`bool`):
Flag that method expects to measure moments on unbinned
spaxels.
good_snr (`numpy.ndarray`_):
Boolean array setting which spectra have sufficient S/N
for the measurements.
default_redshift (:obj:`float`, optional):
Only used if there are stellar kinematics available.
Provides the default redshift to use for spectra without
stellar measurements; see ``redshift`` in
:func:`mangadap.proc.stellarcontinuummodel.StellarContinuumModel.matched_kinematics`.
If None (default), the median of the unmasked stellar
velocities will be used.
"""
# Construct the binid matrix if measuring on unbinned spaxels
if measure_on_unbinned_spaxels:
binid = numpy.full(self.spatial_shape, -1, dtype=int)
binid.ravel()[good_snr] = numpy.arange(self.nbins)
missing = []
nspec = self.binned_spectra.cube.nspec
else:
binid = self.binned_spectra['BINID'].data
missing = self.binned_spectra.missing_bins
nspec = self.binned_spectra.nbins
#---------------------------------------------------------------
# Get the redshifts measured for the selected emission line and
# use them if no default value is provided
if self.emission_line_model is not None and self.database['fit_vel_name'] is not None \
and redshift is None:
self.redshift, _ = self.emission_line_model.matched_kinematics(
binid, self.database['fit_vel_name'],
redshift=default_redshift, nearest=True, missing=missing)
if measure_on_unbinned_spaxels:
tmp = self.redshift.copy()
self.redshift = numpy.zeros(nspec, dtype=float)
self.redshift[good_snr] = tmp
return
#---------------------------------------------------------------
# Use the default value(s)
_redshift = numpy.atleast_1d(redshift)
if len(_redshift) not in [ 1, nspec ]:
raise ValueError('Provided redshift must be either a single value or match the '
'number of binned spectra or the number of unbinned spaxels.')
self.redshift = numpy.full(nspec, redshift, dtype=float) \
if len(_redshift) == 1 else _redshift.copy()
[docs]
def _flag_good_spectra(self, measure_on_unbinned_spaxels):
if measure_on_unbinned_spaxels:
return self.binned_spectra.check_fgoodpix()
return self.binned_spectra.above_snr_limit(self.database['minimum_snr'])
[docs]
@staticmethod
def sideband_pseudocontinua(wave, spec, sidebands, spec_n=None, err=None, log=True):
"""
Get the side-band integrals.
err is a single vector that is the same for all spec!
Returns:
:obj:`float`, `numpy.ndarray`_: Return five arrays or
floats: (1) The center of each passband, (2) the mean
continuum level, (3) the propagated error in the
continuum level (will be None if no errors are provided),
(4) flag that part of the passband was masked, (5) flag
that the passband was fully masked or empty.
"""
if spec.ndim == 1:
return pseudocontinuum(wave, spec, passband=sidebands, err=err, log=log)
# Calculate the pseudo-continua in the sidebands
nspec = spec.shape[0]
nbands = sidebands.shape[0]
# More than one spectrum entered, use spec_n to decide which
# spectrum to use for the integrals. If spec_n is None and
# there is more than one spectrum (improper use of the code!),
# allow the code to continue but issue a warning
if spec_n is None:
warnings.warn('Multiple spectra entered, but no designation for which to use!')
_spec_n = numpy.zeros(nbands, dtype=int)
else:
_spec_n = spec_n
# TODO: Change this to a warning?
if numpy.amax(_spec_n) > nspec-1:
raise ValueError('Selected spectrum not provided!')
# Instantiate the arrays
continuum = numpy.zeros(nbands, dtype=float)
continuum_error = None if err is None else numpy.zeros(nbands, dtype=float)
band_center = numpy.zeros(nbands, dtype=float)
band_incomplete = numpy.zeros(nbands, dtype=bool)
band_empty = numpy.zeros(nbands, dtype=bool)
for i in range(nspec):
indx = _spec_n==i
if numpy.sum(indx) == 0:
continue
band_center[indx], continuum[indx], _continuum_error, band_incomplete[indx], \
band_empty[indx] = pseudocontinuum(wave, spec[i,:], passband=sidebands[indx,:],
err=err, log=log)
if err is not None:
continuum_error[indx] = _continuum_error
return band_center, continuum, continuum_error, band_incomplete, band_empty
[docs]
@staticmethod
def single_band_moments(wave, spec, passband, restwave, err=None):
"""
Measure the moments for a single band.
If spectrum errors are not provided, moment errors are returned
as 0.0.
Args:
wave (`numpy.ndarray`_):
Wavelengths in angstroms
spec (`numpy.ndarray`_):
Flux in flux density (per angstrom)
restwave (float):
The rest wavelength of the line.
err (`numpy.ndarray`_):
The 1-sigma errors in the flux density.
Returns:
flux
fluxerr
mom1
mom1err
mom2
mom2err
incomplete band
empty band
division by zero
undefined 1st moment
undefined 2nd moment
"""
_passband = numpy.atleast_1d(passband)
if _passband.ndim > 1:
raise ValueError('Should only pass a single band to single_band_moments.')
if len(_passband) != 2:
raise ValueError('Band should be a list of 2 numbers.')
# Get the fraction of the passband that is unmasked
interval_frac = passband_integrated_width(wave, spec, passband=passband, log=True) \
/ numpy.diff(passband)[0]
incomplete = interval_frac < 1.0
empty = numpy.invert(interval_frac > 0.0)
if empty:
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, incomplete, empty, False, True, True
# Get the redshift vector
cz = astropy.constants.c.to('km/s').value*(wave/restwave-1)
# Get the integrated flux
flux = passband_integral(wave, spec, passband=passband, log=True)
fluxerr = 0.0 if err is None else \
passband_integral(wave, err, passband=passband, log=True, quad=True)
# No flux in the passband, meaning division by zero will occur!
if not numpy.absolute(flux) > 0.0:
return flux, fluxerr, 0.0, 0.0, 0.0, 0.0, incomplete, empty, True, True, True
# Get the first and second moments:
mom1, mom1err = passband_weighted_mean(wave, spec, cz, yerr=err, passband=passband,
log=True)
mom2, mom2err = passband_weighted_sdev(wave, spec, cz, yerr=err, passband=passband,
log=True)
# If no errors provided, set errors to 0
if err is None:
mom1err = 0.0
mom2err = 0.0
# Set masked values to 0.0
undefined_mom1 = False
if isinstance(mom1, numpy.ma.core.MaskedConstant) and mom1.mask:
mom1 = 0.0
mom1err = 0.0
undefined_mom1 = True
undefined_mom2 = False
if isinstance(mom2, numpy.ma.core.MaskedConstant) and mom2.mask:
mom2 = 0.0
mom2err = 0.0
undefined_mom2 = True
if isinstance(mom1err, numpy.ma.core.MaskedConstant) and mom1err.mask:
mom1err = 0.0
if isinstance(mom2err, numpy.ma.core.MaskedConstant) and mom2err.mask:
mom2err = 0.0
return flux, fluxerr, mom1, mom1err, mom2, mom2err, incomplete, empty, False, \
undefined_mom1, undefined_mom2
[docs]
@staticmethod
def continuum_subtracted_moments(wave, spec, mainbands, restwave, bcen, bcont, rcen, rcont,
spec_n=None, err=None, sres=None):
"""
Calculate the continuum-subtracted moments
err and sres are single vectors that are the same for all spec!
Returns:
cntm
cntb
flux
fluxerr
mom1
mom1err
mom2
mom2err
sinst
incomplete
empty
divbyzero
undefined_mom2
"""
# Get the parameters for the linear continuum across the
# primary passband
nbands = mainbands.shape[0]
flux = numpy.zeros(nbands, dtype=float)
fluxerr = numpy.zeros(nbands, dtype=float)
mom1 = numpy.zeros(nbands, dtype=float)
mom1err = numpy.zeros(nbands, dtype=float)
mom2 = numpy.zeros(nbands, dtype=float)
mom2err = numpy.zeros(nbands, dtype=float)
incomplete = numpy.zeros(nbands, dtype=bool)
empty = numpy.zeros(nbands, dtype=bool)
divbyzero = numpy.zeros(nbands, dtype=bool)
undefined_mom1 = numpy.zeros(nbands, dtype=bool)
undefined_mom2 = numpy.zeros(nbands, dtype=bool)
if numpy.any(rcen-bcen == 0):
print('band centers are the same!')
cntm = (rcont - bcont) / (rcen - bcen)
cntb = bcont - bcen*cntm
# If more than one spectrum entered, use spec_n to decide which
# spectrum to use for the integrals
if len(spec.shape) > 1:
# In this case, spec_n should always be provided, but allow
# the code to continue with a warning
if spec_n is None:
warnings.warn('Multiple spectra entered, but no designation for which to use!')
_spec_n = numpy.zeros(self.nmom, dtype=int)
else:
_spec_n = spec_n
# Get the moments for all the main passbands
for i,p in enumerate(mainbands):
cont = cntb[i] + cntm[i]*wave
_spec = spec - cont if len(spec.shape) == 1 else spec[_spec_n[i],:] - cont
flux[i], fluxerr[i], mom1[i], mom1err[i], mom2[i], mom2err[i], incomplete[i], \
empty[i], divbyzero[i], undefined_mom1[i], undefined_mom2[i] \
= EmissionLineMoments.single_band_moments(wave, _spec, p, restwave[i],
err=err)
# Get the instrumental dispersion at the position of the line
# center
sinst = numpy.zeros(nbands, dtype=float) if sres is None \
else EmissionLineFit.instrumental_dispersion(wave, sres, restwave, mom1)
return cntm, cntb, flux, fluxerr, mom1, mom1err, mom2, mom2err, sinst, incomplete, empty, \
divbyzero, undefined_mom1, undefined_mom2
[docs]
@staticmethod
def measure_moments(momdb, wave, flux, ivar=None, mask=None, sres=None, continuum=None,
redshift=None, bitmask=None):
"""
Measure the emission-line moments.
If not input as masked arrays, flux is converted to one.
"""
# Check the input moment database
if not isinstance(momdb, EmissionMomentsDB):
raise TypeError('Input database must have type EmissionMomentsDB.')
# Check the bitmask if provided
if bitmask is not None and not isinstance(bitmask, BitMask):
raise TypeError('Input bitmask must have type BitMask.')
# Prepare the spectra for the measurements
_flux, _err, _sres, _continuum, _redshift, _ \
= EmissionLineFit.check_and_prep_input(wave, flux, ivar=ivar, mask=mask,
sres=sres, continuum=continuum,
redshift=redshift)
# Subtract the continuum and determine where the continuum is
# undefined. If _continuum is None, this just returns _fluxnc =
# _flux and no_continuum = None.
_fluxnc, no_continuum = EmissionLineFit.subtract_continuum(_flux, _continuum)
# Initialize the output data
nspec = _flux.shape[0]
nmom = momdb.size
measurements = EmissionLineMomentsDataTable(neml=nmom, bitmask=bitmask, shape=nspec)
# Save the redshift used
measurements['REDSHIFT'] = _redshift
# Mask dummy lines
if numpy.any(momdb.dummy):
measurements['MASK'][:,momdb.dummy] = True if bitmask is None else \
bitmask.turn_on(measurements['MASK'][:,momdb.dummy], 'UNDEFINED_BANDS')
# No valid lines
if numpy.all(momdb.dummy):
return measurements
# Common arrays used for each spectrum
blue_fraction = numpy.zeros(nmom, dtype=float)
red_fraction = numpy.zeros(nmom, dtype=float)
incomplete = numpy.zeros(nmom, dtype=bool)
empty = numpy.zeros(nmom, dtype=bool)
divbyzero = numpy.zeros(nmom, dtype=bool)
undefined_mom1 = numpy.zeros(nmom, dtype=bool)
undefined_mom2 = numpy.zeros(nmom, dtype=bool)
# Perform the measurements for each spectrum
for i in range(nspec):
print('Measuring emission-line moments in spectrum: {0}/{1}'.format(i+1,nspec),
end='\r')
incomplete[:] = False
empty[:] = False
divbyzero[:] = False
undefined_mom1[:] = False
undefined_mom2[:] = False
# Shift the sidebands to the appropriate redshift
_bluebands = momdb['blueside'][numpy.invert(momdb.dummy)]*(1.0+_redshift[i])
_redbands = momdb['redside'][numpy.invert(momdb.dummy)]*(1.0+_redshift[i])
__err = None if _err is None else _err[i,:]
__sres = None if _sres is None else _sres[i,:]
# Check if the model was subtracted over the full range of
# the sidebands, or if the model was subtracted in one
# sideband but not the other
if no_continuum is None:
spec = _flux[i,:]
spec_n = None
# Flag moment calculations without a stellar-absorption
# correction
measurements['MASK'][i,:] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,:],
'NO_ABSORPTION_CORRECTION')
else:
blue_fraction[numpy.invert(momdb.dummy)] \
= passband_integrated_width(wave, no_continuum[i,:], passband=_bluebands,
log=True) / numpy.diff(_bluebands, axis=1).ravel()
indx = numpy.logical_and(blue_fraction > 0.0, blue_fraction < 1.0)
measurements['MASK'][i,indx] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,indx], 'BLUE_JUMP')
red_fraction[numpy.invert(momdb.dummy)] \
= passband_integrated_width(wave, no_continuum[i,:], passband=_redbands,
log=True) / numpy.diff(_redbands, axis=1).ravel()
indx = numpy.logical_and(red_fraction > 0.0, red_fraction < 1.0)
measurements['MASK'][i,indx] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,indx], 'RED_JUMP')
indx = (numpy.invert(blue_fraction > 0) & (red_fraction > 0)) \
| (numpy.invert(red_fraction > 0) & (blue_fraction > 0))
measurements['MASK'][i,indx] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,indx],'JUMP_BTWN_SIDEBANDS')
# Only use model-subtracted measurements if there are no
# jumps in the bands or between the bands
jumps = measurements['MASK'][i,:] if bitmask is None \
else bitmask.flagged(measurements['MASK'][i,:],
flag=['BLUE_JUMP', 'RED_JUMP',
'JUMP_BTWN_SIDEBANDS'])
spec = numpy.ma.append(_flux[i,:].reshape(1,-1), _fluxnc[i,:].reshape(1,-1), axis=0)
spec_n = numpy.zeros(nmom, dtype=int)
spec_n[numpy.invert(jumps)] = 1
# Flag moment calculations as having not been corrected
# for stellar absorption
measurements['MASK'][i,spec_n == 0] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,spec_n == 0],
'NO_ABSORPTION_CORRECTION')
# Get the blue pseudo continuum
measurements['BCEN'][i,numpy.invert(momdb.dummy)], \
measurements['BCONT'][i,numpy.invert(momdb.dummy)], conterr, \
incomplete[numpy.invert(momdb.dummy)], empty[numpy.invert(momdb.dummy)] \
= EmissionLineMoments.sideband_pseudocontinua(wave, spec, _bluebands,
spec_n=None if spec_n is None
else spec_n[numpy.invert(momdb.dummy)],
err=__err)
if __err is not None:
measurements['BCONTERR'][i,numpy.invert(momdb.dummy)] = conterr
if numpy.sum(incomplete) > 0:
measurements['MASK'][i,incomplete] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,incomplete], 'BLUE_INCOMP')
if numpy.sum(empty) > 0:
measurements['MASK'][i,empty] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,empty], 'BLUE_EMPTY')
# Get the red pseudo continuum
measurements['RCEN'][i,numpy.invert(momdb.dummy)], \
measurements['RCONT'][i,numpy.invert(momdb.dummy)], conterr, \
incomplete[numpy.invert(momdb.dummy)], empty[numpy.invert(momdb.dummy)] \
= EmissionLineMoments.sideband_pseudocontinua(wave, spec, _redbands,
spec_n=None if spec_n is None
else spec_n[numpy.invert(momdb.dummy)],
err=__err)
if __err is not None:
measurements['RCONTERR'][i,numpy.invert(momdb.dummy)] = conterr
if numpy.sum(incomplete) > 0:
measurements['MASK'][i,incomplete] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,incomplete], 'RED_INCOMP')
if numpy.sum(empty) > 0:
measurements['MASK'][i,empty] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,empty], 'RED_EMPTY')
# Get the main passband integral after subtracting the
# continuum
indx = numpy.invert(momdb.dummy) \
& numpy.invert(measurements['MASK'][i,:] if bitmask is None
else bitmask.flagged(measurements['MASK'][i,:],
flag=['BLUE_EMPTY', 'RED_EMPTY',
'BLUE_JUMP', 'RED_JUMP']))
_mainbands = momdb['primary'][indx]*(1.0+_redshift[i])
measurements['CNTSLOPE'][i,indx], cntb, measurements['FLUX'][i,indx], \
measurements['FLUXERR'][i,indx], measurements['MOM1'][i,indx], \
measurements['MOM1ERR'][i,indx], measurements['MOM2'][i,indx], \
measurements['MOM2ERR'][i,indx], measurements['SINST'][i,indx], \
incomplete[indx], empty[indx], divbyzero[indx], undefined_mom1[indx], \
undefined_mom2[indx] = \
EmissionLineMoments.continuum_subtracted_moments(wave, spec,
_mainbands, momdb['restwave'][indx],
measurements['BCEN'][i,indx],
measurements['BCONT'][i,indx],
measurements['RCEN'][i,indx],
measurements['RCONT'][i,indx],
spec_n=None if spec_n is None
else spec_n[indx], err=__err,
sres=__sres)
# Turn on any necessary bits
measurements['MASK'][i,incomplete] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,incomplete], 'MAIN_INCOMP')
measurements['MASK'][i,empty] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,empty], 'MAIN_EMPTY')
measurements['MASK'][i,divbyzero] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,divbyzero], 'DIVBYZERO')
measurements['MASK'][i,undefined_mom1] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,undefined_mom1],
'UNDEFINED_MOM1')
measurements['MASK'][i,undefined_mom2] = True if bitmask is None \
else bitmask.turn_on(measurements['MASK'][i,undefined_mom2],
'UNDEFINED_MOM2')
print('Measuring emission-line moments in spectrum: {0}/{0}'.format(nspec))
return measurements
[docs]
def file_name(self):
"""Return the name of the output file."""
return self.output_file
[docs]
def file_path(self):
"""Return the full path to the output file."""
if self.directory_path is None or self.output_file is None:
return None
return self.directory_path / self.output_file
[docs]
def measure(self, binned_spectra, stellar_continuum=None, emission_line_model=None,
redshift=None, output_path=None, output_file=None, hardcopy=True, overwrite=None,
loggers=None, quiet=False):
"""
Measure the emission-line moments using the binned spectra.
If neither stellar-continuum nor emission-line models are
provided:
- Moments are measure on the binned spectra
- No continuum subtraction is performed
If a stellar-continuum model is provided without an
emission-line model:
- Moments are measured on the binned spectra
- The best-fitting stellar continuum is subtracted
If an emission-line model is provided without a
stellar-continuum model:
- Moments are measured on the relevant (binned or unbinned)
spectra
- If the emission-line model includes data regarding the
stellar-continuum fit (template spectra and template
weights), the continuum is subtracted before the
measurements are made; otherwise, no continuum is
subtracted
If both stellar-continuum and emission-line models are provided,
and if the stellar-continuum and emission-line fits are
performed on the same spectra:
- Moments are measured on the relevant (binned or unbinned)
spectra
- If the emission-line model includes data regarding the
stellar-continuum fit (template spectra and template
weights), the continuum is subtracted before the
measurements are made; otherwise, the stellar continuum is
used.
If both stellar-continuum and emission-line models are provided,
and if the stellar-continuum and emission-line fits are
performed on different spectra:
- The behavior is exactly as if the stellar-continuum model
was not provided.
Args:
output_path (:obj:`str`, `Path`_, optional):
The path for the output file. If None, the current working
directory is used.
output_file (:obj:`str`, optional):
The name of the output "reference" file. The full path of the
output file will be :attr:`directory_path`/:attr:`output_file`.
If None, the default is to combine ``cube.output_root`` and the
method keys. The order of the keys is the order of operations
(rdxqa, binning, stellar continuum, emission-line model).
"""
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
# SpatiallyBinnedSpectra object always needed
if binned_spectra is None:
raise ValueError('Must provide spectra object for fitting.')
if not isinstance(binned_spectra, SpatiallyBinnedSpectra):
raise TypeError('Must provide a valid SpatiallyBinnedSpectra object!')
if binned_spectra.hdu is None:
raise ValueError('Provided SpatiallyBinnedSpectra object is undefined!')
self.binned_spectra = binned_spectra
# Check stellar-continuum model object, if provided
self.stellar_continuum = None
if stellar_continuum is not None:
if not isinstance(stellar_continuum, StellarContinuumModel):
raise TypeError('Provided stellar continuum must have StellarContinuumModel type!')
if stellar_continuum.hdu is None:
raise ValueError('Provided StellarContinuumModel is undefined!')
self.stellar_continuum = stellar_continuum
# Check emission-line model object, if provided
self.emission_line_model = None
if emission_line_model is not None:
# if not isinstance(emission_line_model, EmissionLineModel):
# raise TypeError('Provided emission line models must be of type EmissionLineModel.')
if emission_line_model.hdu is None:
raise ValueError('Provided EmissionLineModel is undefined!')
self.emission_line_model = emission_line_model
# What stellar continuum is available?
# - Assume the stellar continuum can always be extracted if a
# StellarContinuumModel object is provided
# - Check if the EmissionLineModel fitter has the appropriate
# function; True for Sasuke, not currently true for Elric
eml_stellar_continuum_available = self.emission_line_model is not None \
and callable(self.emission_line_model.method['fitclass'].construct_continuum_models)
# What spectra to use?
# - Assume StellarContinuumModel always fits the binned spectra
# - The EmissionLineModel fits the binned spectra or unbinned
# spaxels as specified by its deconstruct_bins flag
measure_on_unbinned_spaxels = self.emission_line_model is not None \
and self.emission_line_model.method['fitpar']['deconstruct_bins'] != 'ignore'
self.spatial_shape =self.binned_spectra.spatial_shape
self.nspec = self.binned_spectra.cube.nspec if measure_on_unbinned_spaxels \
else self.binned_spectra.nbins
self.spatial_index = self.binned_spectra.spatial_index.copy()
#---------------------------------------------------------------
# Get the good spectra
good_snr = self._flag_good_spectra(measure_on_unbinned_spaxels)
# Set the number of bins measured and missing bins
self.nbins = numpy.sum(good_snr)
self.missing_bins = [] if measure_on_unbinned_spaxels else self._get_missing_bins()
# Get the redshifts to apply
self._assign_redshifts(redshift, measure_on_unbinned_spaxels, good_snr)
# Report
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(loggers, 1, logging.INFO, f'{"EMISSION-LINE MOMENTS":^50}')
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(self.loggers, 1, logging.INFO, '-'*50)
log_output(self.loggers, 1, logging.INFO, 'Measurements for {0}'.format(
'unbinned spaxels' if measure_on_unbinned_spaxels else 'binned spectra'))
if self.stellar_continuum is None and self.emission_line_model is None:
log_output(self.loggers, 1, logging.INFO, 'No stellar continuum available.')
elif self.emission_line_model is not None and eml_stellar_continuum_available:
log_output(self.loggers, 1, logging.INFO,
'Using stellar continuum from emission-line model fit.')
else:
log_output(self.loggers, 1, logging.INFO,
'Using stellar continuum from stellar-continuum model fit.')
log_output(self.loggers, 1, logging.INFO, f'Number of spectra: {self.nspec}')
if not measure_on_unbinned_spaxels and len(self.binned_spectra.missing_bins) > 0:
log_output(self.loggers, 1, logging.INFO,
f'Missing bins: {len(self.binned_spectra.missing_bins)}')
log_output(self.loggers, 1, logging.INFO,
f'With good S/N and to measure: {numpy.sum(good_snr)}')
# Make sure there are good spectra
if numpy.sum(good_snr) == 0:
raise ValueError('No good spectra for measurements!')
#---------------------------------------------------------------
# (Re)Set the output paths
self.directory_path, self.output_file = \
EmissionLineMoments.default_paths(self.binned_spectra.cube, self.database['key'],
self.binned_spectra.rdxqa.method['key'], self.binned_spectra.method['key'],
stelcont_method=None if self.stellar_continuum is None
else self.stellar_continuum.method['key'],
elmodel_method=None if self.emission_line_model is None
else self.emission_line_model.method['key'],
output_path=output_path, output_file=output_file)
# self._set_paths(directory_path, dapver, analysis_path, output_file)
_overwrite = self.database['overwrite'] if overwrite is None else overwrite
#---------------------------------------------------------------
# Check that the file path is defined
ofile = self.file_path()
if ofile is None:
raise ValueError('File path for output file is undefined!')
# Report
if not self.quiet:
log_output(self.loggers,1,logging.INFO, f'Output path: {self.directory_path}')
log_output(self.loggers,1,logging.INFO, f'Output file: {self.output_file}')
#---------------------------------------------------------------
# If the file already exists, and not overwriting, just read the
# file
if ofile.exists() and not _overwrite:
self.hardcopy = True
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Using existing file')
self.read(checksum=self.checksum)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
return
#---------------------------------------------------------------
# Get the spectra to use for the measurements
wave, flux, ivar, sres \
= EmissionLineFit.get_spectra_to_fit(self.binned_spectra,
pixelmask=self.database['pixelmask'],
select=good_snr,
original_spaxels=measure_on_unbinned_spaxels)
# Get the continuum if there is any
if eml_stellar_continuum_available:
if measure_on_unbinned_spaxels:
binid = numpy.full(self.binned_spectra.spatial_shape, -1, dtype=int)
binid.ravel()[good_snr] = numpy.arange(self.nbins)
continuum = self.emission_line_model.fill_continuum_to_match(binid)
else:
continuum = self.emission_line_model.fill_continuum_to_match(
self.binned_spectra['BINID'].data,
missing=self.binned_spectra.missing_bins)
elif self.stellar_continuum is not None:
continuum = self.stellar_continuum.fill_to_match(self.binned_spectra['BINID'].data,
missing=self.binned_spectra.missing_bins)
#---------------------------------------------------------------
# Perform the moment measurements
_redshift = self.redshift[good_snr]
measurements = self.measure_moments(self.database.momdb, wave, flux, ivar=ivar, sres=sres,
continuum=continuum, redshift=_redshift,
bitmask=self.bitmask)
#---------------------------------------------------------------
# Perform the equivalent width measurements
include_band = numpy.array([numpy.invert(self.database.momdb.dummy)]*self.nbins) \
& numpy.invert(self.bitmask.flagged(measurements['MASK'],
flag=['BLUE_EMPTY', 'RED_EMPTY']))
# Set the line center at the center of the primary passband
line_center = (1.0+_redshift)[:,None]*self.database.momdb['restwave'][None,:]
measurements['BMED'], measurements['RMED'], pos, measurements['EWCONT'], \
measurements['EW'], measurements['EWERR'] \
= emission_line_equivalent_width(wave, flux,
self.database.momdb['blueside'],
self.database.momdb['redside'],
line_center, measurements['FLUX'],
redshift=_redshift,
line_flux_err=measurements['FLUXERR'],
include_band=include_band)
# Flag non-positive measurements
measurements['MASK'][include_band & numpy.invert(pos)] \
= self.bitmask.turn_on(measurements['MASK'][include_band & numpy.invert(pos)],
'NON_POSITIVE_CONTINUUM')
#---------------------------------------------------------------
# Set the number of bins measured, missing bins, and bin IDs
measurements['BINID'] = numpy.arange(self.nbins) if measure_on_unbinned_spaxels \
else self.binned_spectra['BINS'].data['BINID'][good_snr]
measurements['BINID_INDEX'] = numpy.arange(self.nbins) if measure_on_unbinned_spaxels \
else numpy.arange(self.binned_spectra.nbins)[good_snr]
measurements_binid = None
if measure_on_unbinned_spaxels:
measurements_binid = numpy.full(self.binned_spectra.spatial_shape, -1, dtype=int)
measurements_binid.ravel()[good_snr] = numpy.arange(self.nbins)
#---------------------------------------------------------------
# Initialize the header
self.hardcopy = hardcopy
pri_hdr = self._initialize_primary_header(measurements_binid=measurements_binid)
map_hdr = DAPFitsUtil.build_map_header(self.binned_spectra.cube.fluxhdr,
'K Westfall <westfall@ucolick.org>')
# Get the spatial map mask
map_mask = numpy.zeros(self.spatial_shape, dtype=self.bitmask.minimum_dtype())
# Account for measurements on individual spaxels
if measurements_binid is None:
# Add any spaxel not used because it was flagged by the
# binning step
indx = self.binned_spectra['MAPMASK'].data > 0
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'DIDNOTUSE')
# Isolate any spaxels with foreground stars
indx = self.binned_spectra.bitmask.flagged(self.binned_spectra['MAPMASK'].data,
'FORESTAR')
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'FORESTAR')
# Get the bins that were below the S/N limit
indx = numpy.invert(DAPFitsUtil.reconstruct_map(self.spatial_shape,
self.binned_spectra['BINID'].data.ravel(),
good_snr, dtype='bool')) & (map_mask == 0)
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'LOW_SNR')
# Get the bin ids with measured indices
bin_indx = DAPFitsUtil.downselect_bins(self.binned_spectra['BINID'].data.ravel(),
measurements['BINID']).reshape(self.spatial_shape)
else:
# Assume any model with a binid less than zero is from a
# spaxel that was not used
indx = measurements_binid < 0
map_mask[indx] = self.bitmask.turn_on(map_mask[indx], 'DIDNOTUSE')
# The number of valid bins MUST match the number of
# measurements
nvalid = numpy.sum(numpy.invert(indx))
if nvalid != len(measurements):
raise ValueError('Provided id does not match the number of measurements.')
# Get the bin ids with fitted models
bin_indx = measurements_binid
# Save the data to the hdu attribute
self.hdu = fits.HDUList([fits.PrimaryHDU(header=pri_hdr),
fits.ImageHDU(data=bin_indx, header=map_hdr, name='BINID'),
fits.ImageHDU(data=map_mask, header=map_hdr, name='MAPMASK'),
self.database.momdb.to_datatable().to_hdu(name='ELMBAND'),
measurements.to_hdu(name='ELMMNTS')])
#---------------------------------------------------------------
# Write the data, if requested
if self.hardcopy:
if not self.directory_path.exists():
self.diretory_path.mkdir(parents=True)
self.write(overwrite=_overwrite)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, '-'*50)
[docs]
def write(self, overwrite=False):
"""
Write the hdu object to the file.
"""
DAPFitsUtil.write(self.hdu, str(self.file_path()), overwrite=overwrite, checksum=True,
loggers=self.loggers, quiet=self.quiet)
[docs]
def read(self, ifile=None, strict=True, checksum=False):
"""
Read an existing file with a previously binned set of spectra.
"""
_ifile = self.file_path() if ifile is None else Path(ifile).resolve()
if not _ifile.exists():
raise FileNotFoundError('File does not exist!: {0}'.format(ifile))
if self.hdu is not None:
self.hdu.close()
self.hdu = DAPFitsUtil.read(str(_ifile), checksum=checksum)
# Confirm that the internal method is the same as the method
# that was used in writing the file
if self.hdu['PRIMARY'].header['ELMKEY'] != self.database['key']:
if strict:
raise ValueError('Keywords in header do not match specified method keywords!')
else:
warnings.warn('Keywords in header do not match specified method keywords!')
# TODO: "strict" should also check other aspects of the file to
# make sure that the details of the method are also the same,
# not just the keyword
self.nbins = self.hdu['PRIMARY'].header['NBINS']
unique_bins = numpy.unique(self.hdu['BINID'].data.ravel()) \
if self.hdu['PRIMARY'].header['ELMREBIN'] else None
self.missing_bins = self._get_missing_bins(unique_bins=unique_bins)
# TODO: Use the EmissionMomentsDB.channel_names function!
[docs]
def channel_names(self):
return numpy.array([ '{0}-{1}'.format(n,int(w)) \
for n,w in zip(self['ELMBAND'].data['NAME'],
self['ELMBAND'].data['RESTWAVE']) ])