# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
r"""
Defines a class with common functions for MaNGA fits files. This class
needs to be as core as possible (little to no dependencies are
higher-level classes).
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import sys
import os
import numpy
import logging
import warnings
from IPython import embed
from scipy import sparse
from astropy.wcs import WCS
from astropy.io import fits
import astropy.constants
from mangadap import __version__
from .bitmask import BitMask
from .log import log_output
from .fileio import compress_file, create_symlink
from .pixelmask import SpectralPixelMask
[docs]
class DAPFitsUtil:
r"""
An abstract class that only implements a set of static methods for
interaction with MaNGA fits files.
.. todo::
- Make this a base class for all the fits files used/created by
the DAP?
- Use HDUList as a base class for this object?
"""
[docs]
@staticmethod
def storage_mode_options():
"""
Return the allowed storage modes.
Returns:
list: List of the allowed storage mode for DAP fits files.
"""
return [ '2d', '3d' ]
[docs]
@staticmethod
def get_spatial_shape(shape, dispaxis):
"""
Return the shape of the spatial axes given the specified
dispersion axis.
Args:
shape (tuple): Shape of the full numpy.ndarray object.
dispaxis (int): Axis with the spectra.
Returns:
tuple : Shape of the remaining axes.
"""
return shape[:dispaxis] + shape[dispaxis+1:]
[docs]
@staticmethod
def add_channel_names(hdr, names, units=None):
_hdr = hdr.copy()
nchannels = len(names)
ndig = int(numpy.log10(nchannels))+1
if units is not None and len(units) != nchannels:
raise ValueError('Length of column names and units are not the same.')
for i in range(nchannels):
_hdr['C'+'{0}'.format(i+1).zfill(ndig)] = (names[i], 'Data in channel {0}'.format(i+1))
if units is not None:
_hdr['U'+'{0}'.format(i+1).zfill(ndig)] \
= (units[i], 'Units of data in channel {0}'.format(i+1))
return _hdr
# @staticmethod
# def build_cube_header(drpf, author, maskname=None):
# hdr = drpf.hdu['FLUX'].header.copy()
# hdr = DAPFitsUtil.clean_cube_header(hdr)
# hdr['AUTHOR'] = author
# if maskname is not None:
# hdr['MASKNAME'] = maskname
# return hdr
[docs]
@staticmethod
def downselect_bins(bin_indx, good_bins):
"""
Replace values in bin_indx with -1 if they are not in good_bins.
"""
unique_bins, reconstruct = numpy.unique(bin_indx, return_inverse=True)
missing_bins = list(set(unique_bins) - set(good_bins))
unique_bins = numpy.array([ -1 if x in missing_bins else x for x in unique_bins])
return unique_bins[reconstruct]
[docs]
@staticmethod
def match_image_naxis(hdu, ext=None):
"""
Match the NAXISn header keywords to the shape of the image data.
That is, reset the header keywrods such that the shape of the
data array is (NAXIS1, NAXIS2, NAXIS3, ...).
Note that any reordering of the NAXISn keywords will *not*
change the WCS information. You will have to account for if you
need the WCS information.
.. todo::
If a WCS is detected, use WCS.swapaxes() to transpose the
WCS system!
"""
_ext = [ h.name for h in hdu ] if ext is None else ext
for e in _ext:
if isinstance(hdu[e], (fits.hdu.image.PrimaryHDU, fits.hdu.image.ImageHDU)) \
and hdu[e].data is not None:
shape = hdu[e].data.shape
for i in range(len(shape)):
hdu[e].header['NAXIS{0}'.format(i+1)] = shape[i]
return hdu
[docs]
@staticmethod
def transpose_image_data(hdu, ext=None):
"""
Transpose all image data in an HDUList.
The primary use of this function is to ensure the HDUList data
arrays use C-contiguous memory but with the natural array
ordering of the axes that follow the NAXIS keywords in the
header. I.e., data arrays provided by astropy.io.fits have
shapes like (..., NAXIS3, NAXIS2, NAXIS1). This function
reorders the data arrays to have shapes like (NAXIS1, NAXIS2,
NAXIS3, ...), and ensures that the NAXISn header keywords
reflect this. Typically, one would do::
hdu = transpose_image_data(fits.open('test.fits'))
To prepare the image data for writing to a new fits file, one
would do::
transpose_image_data(hdu).writeto('new.fits')
If you just want to ensure that the NAXISn keywords refect the
python native ordering, then use :func:`match_image_naxis`.
Note that any WCS information will **not** be transposed, which
you will have to account for if you need the WCS information.
.. todo::
If a WCS is detected, use WCS.swapaxes() to transpose the
WCS system!
"""
_ext = [ h.name for h in hdu ] if ext is None else ext
for e in _ext:
if isinstance(hdu[e], (fits.hdu.image.PrimaryHDU, fits.hdu.image.ImageHDU)) \
and hdu[e].data is not None:
hdu[e].data = numpy.asarray(hdu[e].data.T, order='C')
shape = hdu[e].data.shape
for i in range(len(shape)):
hdu[e].header['NAXIS{0}'.format(i+1)] = shape[i]
return hdu
[docs]
@staticmethod
def reconstruct_map(map_shape, bin_indx, arr, dtype=None, quiet=False):
r"""
Reconstruct a set of maps with the specified shape based on a
set of input 1D arrays. The map shape is expected to be
:math:`(N_x,N_y)` and the array shapes are expected to be
:math:`(N_{\rm pix},)`, where :math:`N_{\rm pix}` does *not*
have to match the number of unique bins. The bin_indx array is
expected to be a vector with length :math:`N_x\timesN_y`.
The data type of the output arrays is set by the input type if
the provided dtype is None; otherwise, the code attempts to case
each output map according to each element of dtype.
Returns a tuple if the number of arrays is larger than one so
that they can be caught by individual variables.
"""
# Check the input types
if not isinstance(map_shape, tuple):
raise TypeError('Map shape must be a tuple.')
if not isinstance(bin_indx, numpy.ndarray):
raise TypeError('Input bin_indx must be a numpy.ndarray.')
if not isinstance(arr, (list, tuple, numpy.ndarray)):
raise TypeError('Input arr must be a list or tuple of numpy.ndarray objects or a '
'single numpy.ndarray.')
_arr = [arr] if isinstance(arr, numpy.ndarray) else list(arr)
if dtype is None:
_dtype = []
for a in _arr:
_dtype += [a.dtype.name]
else:
_dtype = [dtype] if isinstance(dtype, str) else list(dtype)
# Check the input sizes
if len(bin_indx.shape) != 1:
raise ValueError('Input bin_indx must be a 1D vector.')
if bin_indx.size != numpy.prod(map_shape):
raise ValueError('Input bin_indx length does not match cube shape.')
narr = len(_arr)
if len(_dtype) != narr:
raise ValueError('Number of dtypes must match the number of arrays.')
for i in range(1,narr):
if _arr[i].shape != _arr[i-1].shape:
raise ValueError('All arrays must have the same shape.')
# Get the unique bins and how to reconstruct the bins from the
# unique set
unique_bins, reconstruct = numpy.unique(bin_indx, return_inverse=True)
# print(unique_bins)
# Handle missing bins by changing from the bin id to the bin index.
# Detecting whether or not there is a need for handling missing bins is
# done by comparing the unique array to a full string of all indices up
# to the maximum index in the unique array. **This assumes bins can
# only either be -1, to indicate that the spaxel/bin was not analyzed,
# or a non-negative index number** (i.e., all bin IDs must be >= -1).
# The code handles cases both with and without any ignored bins.
warn = False
if numpy.any(unique_bins < 0):
# Includes ignored bins/spaxels
if unique_bins.size != unique_bins[-1]+2 \
or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
warn = True
unique_bins = numpy.arange(-1,unique_bins.size-1)
else:
# All bins/spaxels have valid bin numbers
if unique_bins.size != unique_bins[-1]+1 \
or numpy.any((unique_bins - numpy.arange(unique_bins.size)) != 0):
warn = True
unique_bins = numpy.arange(unique_bins.size)
if warn and not quiet:
warnings.warn('Bin numbers and indices do not match. Map values are expected to be '
'sorted by their bin number.')
# if unique_bins.size != unique_bins[-1]+2 \
# or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
# if not quiet:
# warnings.warn('Bin numbers and indices do not match. Map values are expected '
# 'to be sorted by their bin number.')
# unique_bins = numpy.arange(-1,unique_bins.size-1)
# Get the valid bins
indx = bin_indx > -1
# print(map_shape, bin_indx.shape, numpy.sum(indx), numpy.prod(bin_indx.shape))
# Restructure all the arrays to match the cube shape
new_arr = numpy.empty(narr, dtype=object)
# print('narr:', narr)
for i in range(narr):
new_arr[i] = numpy.zeros(map_shape, dtype=_dtype[i])
new_arr[i].ravel()[indx] = _arr[i][unique_bins[reconstruct[indx]]].copy()
s = numpy.sum(numpy.invert(numpy.isfinite(new_arr[i])))
if s > 0:
print(i+1, narr)
print('dtype: ', _dtype[i])
a = numpy.sum(numpy.invert(numpy.isfinite(_arr[i][unique_bins[reconstruct[indx]]])))
print('not finite input:', a)
print('not finite output ravel:',
numpy.sum(numpy.invert(numpy.isfinite(new_arr[i].ravel()[indx]))))
print('not finite output:', s)
nf = numpy.invert(numpy.isfinite(new_arr[i].ravel()[indx]))
print('inp: ', _arr[i][unique_bins[reconstruct[indx]]][nf])
print('out: ', new_arr[i].ravel()[indx][nf])
new_arr[i].ravel()[indx][nf] = 0.0
raise ValueError('NaNs in data!')
return tuple([ a for a in new_arr]) if narr > 1 else new_arr[0]
[docs]
@staticmethod
def reconstruct_cube(cube_shape, bin_indx, arr, dtype=None):
r"""
Reconstruct a set of cubes with the specified shape based on a
set of input 2D arrays. The cube shape is expected to be
:math:`(N_x,N_y,N_\lamba)` and the array shapes are expected to
be :math:`(N_{\rm spec},N_\lambda)`, where :math:`N_{\rm spec}`
does *not* have to match the number of unique bins. The
bin_indx array is expected to be a vector with length
:math:`N_x\timesN_y`.
The data type of the output arrays is set by the input type if
the provided dtype is None; otherwise, the code attempts to case
each output cube accordint to each element of dtype.
Returns a tuple if the number of arrays is larger than one so
that they can be caught by individual variables.
"""
# Check the input types
if not isinstance(cube_shape, tuple):
raise TypeError('Cube shape must be a tuple.')
if not isinstance(bin_indx, numpy.ndarray):
raise TypeError('Input bin_indx must be a numpy.ndarray.')
if not isinstance(arr, (list, tuple, numpy.ndarray)):
raise TypeError('Input arr must be a list or tuple of numpy.ndarray objects or a '
'single numpy.ndarray.')
_arr = [arr] if isinstance(arr, numpy.ndarray) else list(arr)
if dtype is None:
_dtype = []
for a in _arr:
_dtype += [a.dtype.name]
else:
_dtype = [dtype] if isinstance(dtype, str) else list(dtype)
# Check the input sizes
if len(bin_indx.shape) != 1:
raise ValueError('Input bin_indx must be a 1D vector.')
if bin_indx.size != numpy.prod(cube_shape[0:2]):
raise ValueError('Input bin_indx length does not match cube shape.')
narr = len(_arr)
if len(_dtype) != narr:
raise ValueError('Number of dtypes must match the number of arrays.')
nwave = _arr[0].shape[-1]
if cube_shape[-1] != nwave:
raise ValueError('Number of pixels in arrays must match last dimension of cube.')
for i in range(1,narr):
if _arr[i].shape != _arr[i-1].shape:
raise ValueError('All arrays must have the same shape.')
# Get the unique bins and how to reconstruct the bins from the
# unique set
unique_bins, reconstruct = numpy.unique(bin_indx, return_inverse=True)
# Handle missing bins by changing from the bin id to the bin index.
# Detecting whether or not there is a need for handling missing bins is
# done by comparing the unique array to a full string of all indices up
# to the maximum index in the unique array. **This assumes bins can
# only either be -1, to indicate that the spaxel/bin was not analyzed,
# or a non-negative index number** (i.e., all bin IDs must be >= -1).
# The code handles cases both with and without any ignored bins.
warn = False
if numpy.any(unique_bins < 0):
# Includes ignored bins/spaxels
if unique_bins.size != unique_bins[-1]+2 \
or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
warn = True
unique_bins = numpy.arange(-1,unique_bins.size-1)
else:
# All bins/spaxels have valid bin numbers
if unique_bins.size != unique_bins[-1]+1 \
or numpy.any((unique_bins - numpy.arange(unique_bins.size)) != 0):
warn = True
unique_bins = numpy.arange(unique_bins.size)
if warn and not quiet:
warnings.warn('Bin numbers and indices do not match. Map values are expected to be '
'sorted by their bin number.')
# if unique_bins.size != unique_bins[-1]+2 \
# or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
# warnings.warn('Bin numbers and indices do not match. Spectra are expected '
# 'to be sorted by their bin number.')
# unique_bins = numpy.arange(-1,unique_bins.size-1)
# Get the valid bins
indx = bin_indx > -1
# Restructure all the arrays to match the cube shape
new_arr = numpy.empty(narr, dtype=object)
for i in range(narr):
new_arr[i] = numpy.zeros(cube_shape, dtype=_dtype[i])
new_arr[i].reshape(-1,nwave)[indx,:] = _arr[i][unique_bins[reconstruct[indx]],:]
return tuple([ a for a in new_arr]) if narr > 1 else new_arr[0]
[docs]
@staticmethod
def read(ofile, permissions='readonly', checksum=False):
return DAPFitsUtil.transpose_image_data(fits.open(ofile, mode=permissions,
checksum=checksum))
[docs]
@staticmethod
def write(hdu, ofile, overwrite=False, checksum=False, symlink_dir=None, relative_symlink=True,
loggers=None, quiet=False):
"""
Write an HDUList to an output file. It is expected that the hdu
was either read by the :func:`read` function, or that the hdu
was passed through :func:`transpose_image_data` such that
HDUList data arrays use C-contiguous memory but with the natural
array ordering of the axes that follow the NAXIS keywords in the
header.
"""
# Get the output file and determine if it should be compressed
compress = False
if ofile.split('.')[-1] == 'gz':
_ofile = ofile[:ofile.rfind('.')]
compress = True
else:
_ofile = ofile
# Transpose the order
hdu = DAPFitsUtil.transpose_image_data(hdu)
# Write the data
if not quiet:
log_output(loggers, 1, logging.INFO, 'Writing: {0}'.format(_ofile))
hdu.writeto(_ofile, overwrite=overwrite, checksum=checksum)
# Transpose it back
hdu = DAPFitsUtil.transpose_image_data(hdu)
# Compress if desired
if compress:
if not quiet:
log_output(loggers, 1, logging.INFO, 'Compressing: {0}'.format(ofile))
# And compress it
compress_file(_ofile, overwrite=overwrite)
os.remove(_ofile)
# Create the symlink if requested
if symlink_dir is not None:
create_symlink(ofile, symlink_dir, relative_symlink=relative_symlink, loggers=loggers,
quiet=quiet)
[docs]
@staticmethod
def unique_bins(bin_indx, return_index=False):
"""
Get the unique bins and the indices of the unique bins in the
flattened spatial dimension, ignoring the bins with indices of
-1.
"""
uniq, indx = numpy.unique(bin_indx.ravel(), return_index=True)
if uniq[0] == -1:
uniq = uniq[1:]
indx = indx[1:]
return (uniq, indx) if return_index else uniq
# return tuple(map(lambda x : x[1:], numpy.unique(bin_indx.ravel(), return_index=True))) \
# if return_index else numpy.unique(bin_indx.ravel())[1:]
# TODO: Consolidate these with what's in datacube and rowstackedspectra.
[docs]
@staticmethod
def copy_to_array(hdu, ext='FLUX', allowed_ext=None, waverange=None, select_bins=None,
missing_bins=None, nbins=None, unique_bins=None):
r"""
Return a copy of the selected data array. The array size is
always :math:`N_{\rm spec} \times N_{\rm wavelength}`; i.e., the
CUBE files are flattened to two dimensions, matching the
dimensionality of the RSS files. The spatial positions within
the original DRP file for each spectrum are given by tuples in
:attr:`spatial_index`. See :func:`copy_to_masked_array` for
arguments.
Returns:
numpy.ndarray: A 2D array with a copy of the data from the
selected extension.
"""
masked_data = DAPFitsUtil.copy_to_masked_array(hdu, ext=ext, allowed_ext=allowed_ext,
waverange=waverange, select_bins=select_bins,
missing_bins=missing_bins, nbins=nbins,
unique_bins=unique_bins)
# For this approach, the wavelengths masked should be
# *identical* for all spectra
nwave = numpy.sum(numpy.invert(numpy.ma.getmaskarray(masked_data))[0,:])
if nwave == 0:
raise ValueError('Full wavelength range has been masked!')
# Compression returns a flattened array, so it needs to be
# reshaped for the new number of (unmasked) wavelength channels
return masked_data.compressed().reshape(-1,nwave)
[docs]
@staticmethod
def copy_to_masked_array(hdu, ext='FLUX', mask_ext=None, flag=None, bitmask=None,
allowed_ext=None, waverange=None, select_bins=None, missing_bins=None,
nbins=None, unique_bins=None):
r"""
Return a copy of the selected data array as a masked array.
This is functionally identical to :func:`DRPFits.copy_to_array`,
except the output format is a `numpy.ma.MaskedArray`_. The
pixels that are considered to be masked can be specified using
the `flag` option.
Args:
waverange (array-like) : (**Optional**) Two-element array
with the first and last wavelength to include in the
computation. Default is to use the full wavelength
range.
ext (str) : (**Optional**) Name of the extension from which
to draw the data. Must be allowed for the current
:attr:`mode`; see :attr:`spectral_arrays`. Default is
`'FLUX'`.
mask_ext (str) : (**Optional**) Name of the extension with
the mask bit data. Must be allowed for the current
:attr:`mode`; see :attr:`spectral_arrays`. Default is
`'MASK'`.
flag (str or list): (**Optional**) (List of) Flag names that
are considered when deciding if a pixel should be
masked. The names *must* be a valid bit name as defined
by the provided bitmask (see
:class:`mangadap.util.bitmask.BitMask`). If not
provided, *ANY* non-zero mask bit is omitted.
bitmask (:class:`mangadap.util.bitmask.BitMask`):
(**Optional**) BitMask object used to determine if pixel
is flagged as being masked by a specific maskbit.
allowed_ext (array-like): (**Optional**) List of allowed
extenstions that can be copied to an array; the mask
extension **must** be one of these.
waverange (array-like): (**Optional**) A two-element
array with the valid wavelength range. Anything outside
of this range is masked.
select_bins (array-like): (**Optional**) A list of spectra
in the flattened cube (with shape :math:`N_{\rm spec}
\times N_\lambda`) to return.
missing_bins (list): (**Optional**) A list of bin numbers
that are missing from the output selection and that will
be replaced with masked spectra.
nbins (int): (**Optional**) The total number of defined
bins. Default is to find the maximum number in the
unique_bins list.
unique_bins (array-like): (**Optional**) The indices of the
bins that have valid spectra. The length of this array
should match the selected_bins array.
Returns:
numpy.ndarray: A 2D array with a copy of the data from the
selected extension.
Raises:
KeyError : Raised if `ext` is not a valid extension.
"""
# Make sure extensions are not lists
_ext = ext
if isinstance(ext, list):
if len(ext) > 1:
raise ValueError('Must provide only one extension.')
_ext = ext[0]
_mask_ext = mask_ext
if isinstance(_mask_ext, list):
if len(_mask_ext) > 1:
raise ValueError('Must provide only one mask extension.')
_mask_ext = mask_ext[0]
# Make sure operation is allowed
if allowed_ext is not None and _ext not in allowed_ext:
raise KeyError('Cannot copy data to array for extension {0}.'.format(_ext))
if allowed_ext is not None and _mask_ext is not None and _mask_ext not in allowed_ext:
raise KeyError('Cannot use extension {0} as mask.'.format(_mask_ext))
# Make sure masks and flags make sense
if flag is not None and bitmask is None:
raise ValueError('If flags are provided, must also provided associated BitMask.')
if bitmask is not None and not isinstance(bitmask, BitMask):
raise TypeError('Provided object must be a BitMask.')
# Total number of available spectra
nspec = numpy.prod(hdu[_ext].data.shape[0:-1])
# Create the wavelength mask (will be all true if
# waverange=None)
mask = SpectralPixelMask(waverange=waverange).boolean(hdu['WAVE'].data, nspec=nspec)
# Add in any bitmasks
if _mask_ext is not None and bitmask is not None:
mask |= bitmask.flagged(hdu[_mask_ext].data.reshape(nspec,-1), flag=flag)
if _mask_ext is not None and bitmask is None:
mask |= (hdu[_mask_ext].data > 0)
# Create the output MaskedArray
a = numpy.ma.MaskedArray(hdu[_ext].data.copy().reshape(nspec,-1), mask=mask)
# Apply any bin selection
if select_bins is not None:
a = a[select_bins,:]
# No missing bins are specified, so return the array
if missing_bins is None or len(missing_bins) == 0:
return a
# Missing bins have been specified, so the unique_bins also need
# to be specified
if unique_bins is None:
raise ValueError('Must specify the unique bins if missing bins are specified.')
if len(unique_bins) != a.shape[0]:
raise ValueError('Number of unique bins does not match the number of spectra.')
# If the number of bins hasn't been provided, construct it based
# on the unique bins specified
_nbins = max(unique_bins)+1 if nbins is None else nbins
# Create the array with the right shape and mask everything
_a = numpy.ma.MaskedArray(numpy.zeros((_nbins, a.shape[-1]), dtype=hdu[_ext].data.dtype),
mask=numpy.ones((_nbins, a.shape[-1]), dtype=bool))
# Copy the data to the correct bins, which also unmasks these
# pixels
_a[unique_bins,:] = a
# Return the masked array; "missing bins" are fully masked
return _a
[docs]
@staticmethod
def marginalize_mask(mask, inp_flags=None, inp_bitmask=None, out_flag=None, out_bitmask=None,
out_mask=None, dispaxis=2):
"""
Marginalize a mask over one of its spectral dimension.
The output mask selects pixels that are masked by *all* pixels along the
spectral axis.
If no optional arguments are provided, the input mask must be a boolean
array and the result is identically ``numpy.all(mask, axis=dispaxis)``.
Args:
mask (`numpy.ndarray`_):
The mask with more than 1 dimension. The spectral axis is
defined by ``dispaxis``. The array can be either a bitmask or a
boolean mask; the latter is assumed if ``inp_bitmask`` is not
provided, and vice versa.
inp_flags (:obj:`str`, :obj:`list`, optional):
The list of input flags used to select masked values in bitmask
arrays. If None and ``inp_bitmask`` is provided, all flags are
check individually.
inp_bitmask (:class:`~mangadap.util.bitmask.BitMask`, optional):
Object used to identify bits in the input mask.
out_flag (:obj:`str`, optional):
If returning a bitmask, this sets the flag to use to identify
the masked pixels. If None and ``inp_bitmask`` is provided, the
output flags are identical to the input flags.
out_bitmask (:class:`~mangadap.util.bitmask.BitMask`, optional):
Object used to identify bits in the output mask.
out_mask (`numpy.ndarray`_, optional):
If provided, any masking is applied directly to this array;
i.e., this is both modified and returned by the function. If
provided, the array must have the correct shape (same as the
input array wihtout the spectral axis).
dispaxis (:obj:`int`, optional):
The spectral axis of the array.
Returns:
`numpy.ndarray`_: A mask array with the marginalized mask. The
shape is the same as the input array, but without the spectral axis.
Raises:
ValueError:
Raised if the input arrays have incorrect shapes.
"""
# Check the input
if not mask.ndim > 1:
raise ValueError('Input mask array must have more than one dimension.')
itype = bool if inp_bitmask is None else inp_bitmask.minimum_dtype()
# TODO: Need to use 'same_kind' here instead of 'equiv' because MaNGA
# DRP doesn't necessarily use the minimum dtype for the masks.
if not numpy.can_cast(mask.dtype, numpy.dtype(itype), casting='same_kind'):
raise TypeError(f'Cannot cast input mask with type {mask.dtype} to expected type '
f'{numpy.dtype(itype)}.')
spatial_shape = DAPFitsUtil.get_spatial_shape(mask.shape, dispaxis)
if out_mask is not None and out_mask.shape != spatial_shape:
raise ValueError('Provided output mask has incorrect shape; expected '
f'{spatial_shape}, found {out_mask.shape}.')
if inp_bitmask is None and inp_flags is not None:
raise ValueError('Cannot interpret input flags without inp_bitmask.')
if out_bitmask is None and out_flag is not None:
raise ValueError('Cannot interpret output flags without out_bitmask.')
if out_bitmask is not None and out_flag is None:
raise ValueError('If providing the output bitmask, must also provide the output flag.')
otype = bool if out_bitmask is None else out_bitmask.minimum_dtype()
# TODO: May need to use 'same_kind' instead of 'equiv'
if out_mask is not None \
and not numpy.can_cast(out_mask.dtype, numpy.dtype(otype), casting='equiv'):
raise TypeError('Provided output mask has incorrect type; expected '
f'{numpy.dtype(otype)}, found {out_mask.dtype}.')
_out_mask = numpy.zeros(spatial_shape, dtype=otype) if out_mask is None else out_mask
if inp_bitmask is None:
# Handle the case when the input mask is a boolean array
indx = numpy.all(mask, axis=dispaxis)
if out_bitmask is None:
_out_mask |= indx
return _out_mask
_out_mask[indx] = out_bitmask.turn_on(_out_mask[indx], out_flag)
return _out_mask
# Handle the case when the input mask is a bit mask
if inp_flags is None:
inp_flags = inp_bitmask.keys()
_inp_flags = inp_flags if isinstance(inp_flags, list) else [inp_flags]
for flag in _inp_flags:
indx = numpy.all(inp_bitmask.flagged(mask, flag=flag), axis=dispaxis)
if numpy.sum(indx) == 0:
continue
if out_bitmask is None:
_out_mask |= indx
continue
_out_mask[indx] = out_bitmask.turn_on(_out_mask[indx],
(flag if out_flag is None else out_flag))
return _out_mask
[docs]
@staticmethod
def _mask_data_type(bit_type):
if bit_type in [numpy.uint64, numpy.int64]:
return ('FLAG64BIT', '64-bit mask')
if bit_type in [numpy.uint32, numpy.int32]:
return ('FLAG32BIT', '32-bit mask')
if bit_type in [numpy.uint16, numpy.int16]:
return ('FLAG16BIT', '16-bit mask')
if bit_type in [numpy.uint8, numpy.int8]:
return ('FLAG8BIT', '8-bit mask')
if bit_type in [bool, numpy.bool_]:
return ('MASKZERO', 'Binary mask; zero values are good/unmasked')
raise ValueError('Invalid bit_type: {0}!'.format(str(bit_type)))
[docs]
@staticmethod
def empty_hdus(ext):
hdus = []
for e in ext:
hdus += [ fits.ImageHDU(data=None, name=e) ]
return hdus
[docs]
@staticmethod
def list_of_image_hdus(data, hdr, ext):
hdus = []
for d,h,e in zip(data,hdr,ext):
hdus += [ fits.ImageHDU(data=d, header=h, name=e) ]
return hdus
[docs]
@staticmethod
def redshift_to_Newtonian_velocity(v, redshift, ivar=False):
if ivar:
return v*numpy.square(1.0+redshift)
return (v-astropy.constants.c.to('km/s').value*redshift)/(1.0+redshift)