Source code for mangadap.util.pixelmask

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
A class hierarchy for pixel masks.

----

.. include license and copyright
.. include:: ../include/copy.rst

----

.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import numpy
import astropy.constants

from .bitmask import BitMask
from ..par.artifactdb import ArtifactDB
from ..par.emissionlinedb import EmissionLineDB

from matplotlib import pyplot


[docs] class PixelMask: """ Base class for a general 1D or 2D pixel mask. Attributes: shape (tuple): Shape of the array for the mask. """ def __init__(self): self.shape = None
[docs] def _set_shape(self, x, ny=None): """ Set :attr:`shape` of the object Args: x (numpy.ndarray): Vector with the x-coordinates ny (int): (**Optional**) Size of the second dimension. Default is that there is no second dimension. """ self.shape = (len(x),) if ny is None else (ny,len(x))
[docs] def _empty_mask(self, x, ny=None): """ Return an empty mask with the correct shape. Args: x (numpy.ndarray): Coordinate vector ny (int): (**Optional**) Size of the second dimension. Default is that there is only one dimension. Returns: numpy.ndarray : Boolean mask of the correct shape. """ self._set_shape(x, ny=ny) return numpy.full(self.shape, False, dtype=bool)
[docs] def _mask_coordinate_ranges(self, x, rng, ny=None): """ Flag any x coordinates between a set of range limits as True. The mask is repeated in the second dimension, if requested. Args: x (numpy.ndarray): Coordinate vector rng (list, numpy.ndarray): (List of) Coordinate ranges that should be masked. ny (int): (**Optional**) Size of the second dimension. Default is that there is only one dimension. Returns: numpy.ndarray : Boolean mask of the correct shape. """ self._set_shape(x, ny=ny) _rng = numpy.atleast_2d(rng) # Account for any undefined limits _rng[numpy.equal(_rng[:,0], None),0] = x[0]-1 _rng[numpy.equal(_rng[:,1], None),1] = x[-1]+1 # Generate the mask mask = numpy.any( numpy.array([ numpy.logical_and(x>l, x<u) \ for l,u in zip(_rng[:,0], _rng[:,1])]), axis=0) return mask if len(self.shape) == 1 else numpy.array([mask]*self.shape[0])
[docs] class SpectralPixelMask(PixelMask): """ Container that produces a mask for the stellar continuum based on a set of emission lines and artifacts. Args: artdb (:class:`mangadap.proc.artifactdb.ArtifactDB`): (**Optional**) Database with the list of artifacts to mask. emldb (:class:`mangadap.proc.emissionlinedb.EmissionLineDB`): (**Optional**) Database with the list of emission lines to mask. waverange (numpy.ndarray): (**Optional**) Any pixels **outside** this wavelength range are masked. Attributes: artdb (:class:`mangadap.proc.artifactdb.ArtifactDB`): Database with the list of artifacts to mask. emldb (:class:`mangadap.proc.emissionlinedb.EmissionLineDB`): Database with the list of emission lines to mask. waverange (numpy.ndarray): Any pixels **outside** this wavelength range are masked. """ def __init__(self, artdb=None, emldb=None, waverange=None, nsig=None): if artdb is not None and not isinstance(artdb, ArtifactDB): raise TypeError('Must provide an ArtifactDB for artifacts to mask.') self.artdb = artdb if emldb is not None and not isinstance(emldb, EmissionLineDB): raise TypeError('Must provide EmissionLineDB for emission-lines to mask.') self.emldb = emldb if waverange is not None and len(waverange) != 2: raise ValueError('Provided wavelength range must have two and only two elements.') self.waverange = waverange ## KHRR added nsig here and above self.nsig = 3. if nsig is None else nsig
[docs] def _waverange_mask(self, wave, nspec=None): """ Mask the pixels **not** within the selected wavelength range. Args: wave (numpy.ndarray): Wavelength coordinate vector nspec (int): (**Optional**) Number of spectra to mask. Default is just one. Returns: numpy.ndarray : Boolean mask of the correct shape. """ if self.waverange is None: return self._empty_mask(wave, ny=nspec) return numpy.invert(self._mask_coordinate_ranges(wave, self.waverange, ny=nspec))
[docs] def _artifact_mask(self, wave, nspec=None): """ Mask the pixels in the wavelength range(s) defined by the artifact database. Args: wave (numpy.ndarray): Wavelength coordinate vector nspec (int): (**Optional**) Number of spectra to mask. Default is just one. Returns: numpy.ndarray : Boolean mask of the correct shape. """ if self.artdb is None: return self._empty_mask(wave, ny=nspec) return self._mask_coordinate_ranges(wave, self.artdb['waverange'], ny=nspec)
[docs] def _check_eml_kin_argument(self, kin): """ Check and return the correct kinematics vectors. If a single number is provided, the function returns the number repeated for each emission line. Args: kin (float, list, numpy.ndarray): An input set of kinematics to used by the mask. Returns: numpy.ndarray: A 1D float array of the correct shape with the kinematics Raises: ValueError: Raised if the length of the `kin` array is not the same as the number of emission lines in :attr:`emldb`. """ if kin is None: return None if isinstance(kin, (int, float, numpy.floating, numpy.integer)): return numpy.full(self.emldb.size, kin, dtype=float) if isinstance(kin, (list, numpy.ndarray)): if len(kin) != self.emldb.size: raise ValueError('Provided vector does not have a matching length.') return numpy.atleast_1d(kin).astype(float)
[docs] def _get_emission_line_bands(self, velocity_offsets=0.0, sigma=250.0): #, nsigma=3.0): r""" Set the emission-line masks, using the emission-line parameters defined by :attr:`emldb` (see :class:`mangadap.par.emissionlinedb.EmissionLineDB`. All emission lines in the database, except for those marked with `action==i` are masked. The center of the masked region is constructed using `emldb['restwave']` and the provided velocity (`velocity_offset`). The velocity of any lines marked as sky (`action==s`) is always set to 0. The width of the mask is set to be :math:`\pm n_\sigma \sigma` about this center, where both :math:`n_\sigma` and :math:`\sigma` are parameters (`nsigma`, `sigma`). If `sigma` is None, `emldb['sig']` is used for each line. Args: velocity_offsets (float, numpy.ndarray): (**Optional**) The velocity offset to apply to each emission-line mask. Must be either a single number or one number per emission line. Assumed to be 0 if set to None. sigma (float, numpy.ndarray): (**Optional**) Line velocity dispersions used to set the mask width. Must be either a single number or one number per emission line. If None, the dispersion provided by the emission-line database is used (`emldb['sig']`). nsigma (float, numpy.ndarray): (**Optional**) The half-width of the band in units of the provided velocity dipsersions. Must be either a single number or one number per emission line. Cannot be None. Returns: numpy.ndarray : A :math:`N_l \times 2` array with the wavelength limits of the emission-line bands. Raises: ValueError: Raised if `nsigma` is None, or any `nsigma` is not greater than 0; raised if the half-width of any mask is not greater than 0. """ # Mask everything but the lines to ignore indx = numpy.invert(self.emldb['action'] == 'i') nbands = numpy.sum(indx) # No lines to mask if nbands == 0: return None # Get the number of standard deviations to cover with the mask _nsigma = self._check_eml_kin_argument(self.nsig) # used to be (nsigma) if _nsigma is None: raise ValueError('Must provide the number of sigma to cover with the mask.') if numpy.any(numpy.invert(_nsigma > 0)): raise ValueError('Must provide a non-zero number of sigma for mask hal-fwidth.') # Get the mask centers _velocity_offsets = self._check_eml_kin_argument(velocity_offsets) _velocity_offsets[ self.emldb['action'] == 's' ] = 0.0 center = self.emldb['restwave'][indx] if _velocity_offsets is None else \ self.emldb['restwave'][indx] \ * (1.0 + _velocity_offsets[indx]/astropy.constants.c.to('km/s').value) # Get the mask widths _sigma = self._check_eml_kin_argument(sigma) halfwidth = _nsigma[indx] * (self.emldb['sig'][indx] if _sigma is None else _sigma[indx]) if numpy.any(numpy.invert(halfwidth > 0)): raise ValueError('All emission-line mask half-widths must be larger than 0.') return numpy.array([ center*(1.0-halfwidth/astropy.constants.c.to('km/s').value), center*(1.0+halfwidth/astropy.constants.c.to('km/s').value) ]).T
[docs] def _emission_line_mask(self, wave, nspec=None, velocity_offsets=0.0, sigma=250.0): #nsigma=3.0): """ Mask the pixels in the wavelength range(s) defined by the emission-line database that has been adjusted by a set of velocities and dispersions. Currently, the velocity offsets are applied to all lines for each spectrum, whereas sigma and nsigma are applied to all spectra for each line. Args: wave (numpy.ndarray): Wavelength coordinate vector nspec (int): (**Optional**) Number of spectra to mask. Default is just one. velocity_offsets (float, numpy.ndarray): (**Optional**) One or more velocity offsets to apply to the emission-line bands on a spectrum-by-spectrum basis. Default is to apply no velocity offset. sigma (float, numpy.ndarray): (**Optional**) One or more velocity dispersions to use for setting the width of the emission-line band on a line-by-line basis. Default is a width based on a dispersion of 250 km/s. nsigma (float, numpy.ndarray): (**Optional**) One or more numbers that sets the width of the band in units of the provided velocity dipsersions band on a line-by-line basis. Returns: numpy.ndarray : Boolean mask of the correct shape. """ if self.emldb is None: return self._empty_mask(wave, ny=nspec) if isinstance(velocity_offsets, (list, numpy.ndarray)) and len(velocity_offsets) > 1: _velocity_offsets = numpy.asarray(velocity_offsets) if len(_velocity_offsets) != nspec: raise ValueError('Velocity offsets do not match the number of spectra.') mask = self._empty_mask(wave, ny=nspec) for i in range(len(_velocity_offsets)): waverange = self._get_emission_line_bands(velocity_offsets=_velocity_offsets[i], sigma=sigma) #nsigma=self.nsig) mask[i,:] = self._mask_coordinate_ranges(wave, waverange) return mask _velocity_offsets = velocity_offsets[0] \ if isinstance(velocity_offsets, (list, numpy.ndarray)) \ and len(velocity_offsets) == 1 else velocity_offsets waverange = self._get_emission_line_bands(velocity_offsets=_velocity_offsets, sigma=sigma) # nsigma=self.nsig) return self._mask_coordinate_ranges(wave, waverange, ny=nspec)
[docs] def boolean(self, wave, nspec=None, velocity_offsets=0.0, sigma=250.0): # nsigma=3.0): """ Construct the full boolean mask that includes the desired wavelength range, omitting artifacts, and omitting emission lines. Args: wave (numpy.ndarray): Wavelength coordinate vector nspec (int): (**Optional**) Number of spectra to mask. Default is just one. velocity_offsets (float, numpy.ndarray): (**Optional**) One or more velocity offsets to apply to the emission-line bands on a spectrum-by-spectrum basis. Default is to apply no velocity offset. sigma (float, numpy.ndarray): (**Optional**) One or more velocity dispersions to use for setting the width of the emission-line band on a line-by-line basis. Default is a width based on a dispersion of 250 km/s. nsigma (float, numpy.ndarray): (**Optional**) One or more numbers that sets the width of the band in units of the provided velocity dipsersions band on a line-by-line basis. Returns: numpy.ndarray : Boolean mask of the correct shape. """ if nspec is not None and not nspec > 0: raise ValueError('Number of spectra must be larger than 0!') return self._waverange_mask(wave, nspec=nspec) | self._artifact_mask(wave, nspec=nspec) \ | self._emission_line_mask(wave, nspec=nspec, velocity_offsets=velocity_offsets, sigma=sigma) # nsigma=self.nsig)
[docs] def bits(self, bitmask, wave, nspec=None, mask=None, velocity_offsets=0.0, sigma=250.0, # nsigma=3.0, waverange_flag='OUTSIDE_RANGE', art_flag='ARTIFACT', eml_flag='EML_REGION'): """ Construct a bit mask that signifies pixels as outside the desired wavelength range, as being affected by an artifact, and being designated as an emission-line region. To simply flag the pixels as a binary masked or unmasked, use :func:`boolean`. Args: bitmask (:class:`mangadap.util.bitmask.BitMask`): Bitmask used to flag pixels. The flags waverange_flag, art_flag, and eml_flag *must* be defined in the provided instance of BitMask. wave (numpy.ndarray): Wavelength coordinate vector nspec (int): (**Optional**) Number of spectra to mask. Default is just one. mask (numpy.array): (**Optional**) Baseline mask to add pixels masks. Should have data type that matches provided bitmask. Default is to start with all pixels unmasked. velocity_offsets (float, numpy.ndarray): (**Optional**) One or more velocity offsets to apply to the emission-line bands on a spectrum-by-spectrum basis. Default is to apply no velocity offset. sigma (float, numpy.ndarray): (**Optional**) One or more velocity dispersions to use for setting the width of the emission-line band on a line-by-line basis. Default is a width based on a dispersion of 250 km/s. nsigma (float, numpy.ndarray): (**Optional**) One or more numbers that sets the width of the band in units of the provided velocity dipsersions band on a line-by-line basis. waverange_flag (str): (**Optional**) Bitmask name used to flag a pixel as outside of the desired wavelength range. Default is 'OUTSIDE_RANGE'. art_flag (str): (**Optional**) Bitmask name used to flag a pixel being affected by an artifact. Default is 'ARTIFACT'. eml_flag (str): (**Optional**) Bitmask name used to flag a pixel being within an emission-line region. Default is 'EML_REGION'. Returns: numpy.ndarray : Bit mask of the correct shape. """ # Check the bitmask type if not isinstance(bitmask, BitMask): raise TypeError('Must provide object of type BitMask.') if nspec is not None and not nspec > 0: raise ValueError('Number of spectra must be larger than 0!') # Get the wavelength range mask wavemask = self._waverange_mask(wave, nspec=nspec) # Check that the input mask has the same size if mask is not None and wavemask.shape != mask.shape: raise ValueError('Input mask does not have the correct shape.') # Get the artifact mask artmask = self._artifact_mask(wave, nspec=nspec) # Get the emission-line mask emlmask = self._emission_line_mask(wave, nspec=nspec, velocity_offsets=velocity_offsets, sigma=sigma) # nsigma=self.nsig) # Construct and return the mask _mask = numpy.zeros(wavemask.shape, dtype=bitmask.minimum_dtype()) \ if mask is None else mask if numpy.sum(wavemask) > 0: _mask[wavemask] = bitmask.turn_on(_mask[wavemask], flag=waverange_flag) if numpy.sum(artmask) > 0: _mask[artmask] = bitmask.turn_on(_mask[artmask], flag=art_flag) if numpy.sum(emlmask) > 0: _mask[emlmask] = bitmask.turn_on(_mask[emlmask], flag=eml_flag) return _mask