# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
r"""
Defines a class used to store and interface with covariance matrices.
.. todo::
- Allow for calculation of the inverse of the covariance matrix.
- Instead of 3D covariance cubes being an array of sparse objects,
make the whole thing a sparse array.
- The usage examples need to be updated!
Usage examples
--------------
You can calculate the covariance matrix for a given wavelength
channel in a :class:`mangadap.drpfits.DRPFits` object::
# Access the DRP RSS file
from mangadap.drpfits import DRPFits
drpf = DRPFits(7495, 12703, 'RSS', read=True)
# Calculate a single covariance matrix
C = drpf.covariance_matrix(2281)
# Show the result in an image
C.show()
# Access specific elements
print(C[0,0])
# Convert to a 'dense' array
dense_C = C.toarray()
# Get the triplets of the non-zero elements
i, j, v = C.find()
# Write it to disk (overwrite existing file)
C.write('test_covariance.fits', overwrite=True)
The covariance matrix is stored in "coordinate" format in a fits
binary table. Since the covariance matrix is symmetric by definition,
only those non-zero elements in the upper triangle (:math:`C_{ij}`
where :math:`i\leq j`) are saved (in memory or on disk). You can read
an existing covariance matrix fits file::
from mangadap.util.covariance import Covariance
C = Covariance(ifile='test_covariance.fits')
You can calculate a set of covariance matrices or the full covariance
cube::
# Access the DRP RSS file
from mangadap.drpfits import DRPFits
drpf = DRPFits(7495, 12703, 'RSS', read=True)
# Calculate the full cube
CC = drpf.covariance_cube() # BEWARE: This may require a LOT of memory
# Calculate fewer but many covariance matrices
channels = [ 0, 1000, 2000, 3000, 4000 ]
C = drpf.covariance_cube(channels=channels)
# Access individual elements
print(C[0,0,0])
print(C[0,0,1000])
# Write the full cube, or set of channels
CC.write('full_covariance_cube.fits') # BEWARE: This will be a BIG file
C.write('covariance_channel_set.fits')
Although you can access the data in the covariance matrix as
explained above, this is generally inefficient because the
:func:`Covariance.__getitem__` function currently cannot handle
slices. If you need to perform operations with the covariance matrix,
you're better off working with the :attr:`Covariance.cov` attribute
directly. To do this, you won't be able to use the aliasing of the
channel indices for 3D covariance matrices.
The :class:`Covariance` class also allows you to toggle between
accessing the matrix as a true covariance matrix, or by splitting the
matrix into its variance and correlation components. For example,::
# Get the covariance matrix for a single wavelength channel
from mangadap.drpfits import DRPFits
drpf = DRPFits(7495, 12703, 'RSS', read=True)
C = drpf.covariance_matrix(2281)
# Show the covariance matrix before and after changing it to a
# correlation matrix
C.show()
C.to_correlation()
C.show()
print(C.is_correlation)
C.revert_correlation()
Covariance matrices that have been converted to correlation matrices
can be written and read in without issue. See
:func:`Covariance.write` and :func:`Covariance.read`. For example::
# Get the covariance matrix for a single wavelength channel
import numpy
from mangadap.util.covariance import Covariance
from mangadap.drpfits import DRPFits
drpf = DRPFits(7495, 12703, 'RSS', read=True)
channels = [ 0, 1000, 2000, 3000, 4000 ]
Cov = drpf.covariance_cube(channels=channels)
Cov.to_correlation()
Cov.show(channel=2000)
Cov.write('correlation_matrix.fits', overwrite=True)
Cov.revert_correlation()
Corr = Covariance(ifile='correlation_matrix.fits')
Corr.revert_correlation()
assert not (numpy.abs(numpy.sum(Cov.toarray(channel=2000)
- Corr.toarray(channel=2000))) > 0.0)
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
import numpy
import warnings
from IPython import embed
from scipy import sparse
from astropy.io import fits
from matplotlib import pyplot
# TODO: Can I avoid using DAPFitsUtil here?
from .fitsutil import DAPFitsUtil
[docs]class Covariance:
r"""
A general utility for storing, manipulating, and file I/O of
sparse covariance matrices.
.. todo::
- Can create empty object. Does this make sense?
.. warning::
Although possible to abstract further, use of ``raw_shape``
can only be 2D in the current implementation.
Args:
inp (`scipy.sparse.csr_matrix`_, `numpy.ndarray`_):
Covariance matrix to store. Input **must** be covariance
data, not correlation data. Data type can be either a
single `scipy.sparse.csr_matrix`_ object or a 1-D array
of them. Assumes all sparse matrices in the input ndarray
have the same size. If None, the covariance object is
instantiated empty.
input_indx (`numpy.ndarray`_, optional):
If *inp* is an array of `scipy.sparse.csr_matrix`_
objects, this is an integer array specifying a pseudo-set
of indices to use instead of the direct index in the
array. I.e., if *inp* is an array with 5 elements, one
can provide a 5-element array for :attr:`input_indx` that
are used instead as the designated index for the
covariance matrices. See: :func:`__getitem__`,
:func:`_grab_true_index`.
impose_triu (:obj:`bool`, optional):
Flag to force the `scipy.sparse.csr_matrix`_ object to
only be the upper triangle of the covariance matrix. The
covariance matrix is symmetric such that C_ij = C_ji, so
it's not necessary to keep both values. This flag will
force a call to `scipy.sparse.triu`_ when setting the
covariance matrix. Otherwise, the input matrix is
**assumed** to only have the upper triangle of numbers.
correlation (:obj:`bool`, optional):
Convert the input to a correlation matrix. The input
**must** be the covariance matrix.
raw_shape (:obj:`tuple`, optional):
The covariance data is for a higher dimensional array
with this shape. For example, if the covariance data is
for a 2D image with shape ``(nx,ny)`` -- the shape of the
covariance array is be ``(nx*ny, nx*ny)`` -- provide
``raw_shape=(nx,ny)``. This is primarily used for reading
and writing; see also :func:`transpose_raw_shape`. If
None, any higher dimensionality is ignored. When the
:class:`Covariance` object contains multiple covariance
arrays, this raw shape should be applicable to *all* of
them.
Raises:
TypeError:
Raised if the input array is not one-dimensional or the
input covariance matrix are not
`scipy.sparse.csr_matrix`_ objects.
ValueError:
Raised if the :attr:`input_indx` either does not have the
correct size or is not required due to the covariance
object being a single matrix.
Attributes:
cov (`scipy.sparse.csr_matrix`_, `numpy.ndarray`_):
The covariance matrix stored in sparse format.
shape (:obj:`tuple`):
Shape of the full array.
raw_shape (:obj:`tuple`):
The covariance data is for a higher dimensional array
with this shape. For example, if the covariance data is
for a 2D image, this would be ``(nx,ny)`` and the shape
of the covariance array would be ``(nx*ny, nx*ny)``.
dim (:obj:`int`):
The number of dimensions in the covariance matrix. This
can either be 2 (a single covariance matrix) or 3
(multiple covariance matrices).
nnz (:obj:`int`):
The number of non-zero covariance matrix elements.
input_indx (`numpy.ndarray`_):
If :attr:`cov` is an array of `scipy.sparse.csr_matrix`_
objects, this is an integer array specifying a pseudo-set
of indices to use instead of the direct index in the
array. I.e., if :attr:`cov` is an array with 5 elements,
one can provide a 5-element array for :attr:`input_indx`
that are used instead as the designated index for the
covariance matrices. See: :func:`__getitem__`,
:func:`_grab_true_index`.
inv (`scipy.sparse.csr_matrix`_):
The inverse of the covariance matrix. **This is not
currently calculated!**
var (`numpy.ndarray`):
Array with the variance provided by the diagonal of
the/each covariance matrix. This is only populated if
necessary, either by being requested (:func:`variance`)
or if needed to convert between covariance and
correlation matrices.
is_correlation (:obj:`bool`):
Flag that the covariance matrix has been saved as a
variance vector and a correlation matrix.
"""
def __init__(self, inp, input_indx=None, impose_triu=False, correlation=False, raw_shape=None):
self.cov = inp
self.shape = None
self.raw_shape = raw_shape
if self.raw_shape is not None and len(self.raw_shape) != 2:
raise NotImplementedError('The source raw_shape can only be 2D.')
self.dim = None
self.nnz = None
self.input_indx = None
self.inv = None
self.var = None
self.is_correlation = False
# Return empty object
if self.cov is None:
return
# Set the dimensionality, check that each element of the array
# has the correct type, count the number of non-zero covariance
# values, and (if necessary) initialize the indices for each
# covariance matrix
if isinstance(self.cov, numpy.ndarray):
if len(self.cov.shape) > 1:
raise TypeError('Input ndarray can only be one-dimensional')
self.dim = 3
self.nnz = 0
for cov in self.cov:
if not sparse.isspmatrix_csr(cov):
raise TypeError('Input covariance matrix (or elements) must be csr_matrices.')
self.nnz += cov.nnz
if input_indx is not None:
if input_indx.shape != self.cov.shape:
raise ValueError('Input array and input index array must be the same size.')
if numpy.unique(input_indx).size != input_indx.size:
raise ValueError('Input list of indices must all be unique!')
self.input_indx = input_indx
else:
self.input_indx = numpy.arange(len(self.cov))
else:
self.dim = 2
if not sparse.isspmatrix_csr(self.cov):
raise TypeError('Input covariance matrix (or elements) must be csr matrices.')
self.nnz = self.cov.nnz
if input_indx is not None:
raise ValueError('Input indices only allowed when allocating multiple matrices.')
# Set the shape of the full matrix/cube
self._set_shape()
if self.raw_shape is not None and numpy.prod(self.raw_shape) != self.shape[0]:
raise ValueError('Product of raw shape must match the covariance axis length.')
# If requested, impose that the input matrix only have values in
# its upper triangle.
if impose_triu:
self._impose_upper_triangle()
# Set the inverse of the covariance matrix
# **NOT IMPLEMENTED YET**
# Set the variance array and the correlation matrix flag
if correlation:
self.to_correlation()
# def __getitem__(self, *args):
# """
# Return the covariance value at a provided 2D or 3D position.
#
# .. todo::
#
# This method is dangerous because it only returns non-zero
# values for the data in the upper triangle of the matrix.
# Fix this!
#
# Args:
# *args (tuple):
# 2 or 3 integers designating the covariance value to
# return. Number of values must match the
# dimensionality of the object.
#
# Returns:
# :obj:`float`: The value of the covariance matrix at the
# designated index.
#
# Raises:
# ValueError:
# Raised if the number of arguments does not match the
# dimensionality of the object.
# """
# indx = tuple(*args)
# if len(indx) != self.dim:
# raise ValueError('Incorrect number of dimensions!')
# if self.dim == 2:
# return self.cov[tuple(sorted(indx))]
# return self.cov[self._grab_true_index(indx[2])][tuple(sorted(indx[:2]))]
[docs] @classmethod
def from_samples(cls, samples, cov_tol=None, rho_tol=None):
r"""
Define a covariance object using descrete samples.
The covariance is generated using `numpy.cov`_ for a set of
discretely sampled data for an :math:`N`-dimensional
parameter space.
Args:
samples (`numpy.ndarray`_):
Array with samples drawn from an
:math:`N`-dimensional parameter space. The shape of
the input array must be :math:`N_{\rm par}\times
N_{\rm samples}`.
cov_tol (:obj:`float`, optional):
Any covariance value less than this is assumed to be
equivalent to (and set to) 0.
rho_tol (:obj:`float`, optional):
Any correlation coefficient less than this is assumed
to be equivalent to (and set to) 0.
Returns:
:class:`Covariance`: An :math:`N_{\rm par}\times N_{\rm
par}` covariance matrix built using the provided samples.
Raises:
ValueError:
Raised if input array is not 2D or if the number of
samples (length of the second axis) is less than 2.
"""
if len(samples.shape) != 2:
raise ValueError('Input samples for covariance matrix must be a 2D array!')
if samples.shape[1] < 2:
raise ValueError('Fewer than two samples provided!')
return Covariance.from_array(numpy.cov(samples), cov_tol=cov_tol, rho_tol=rho_tol)
# TODO: Currently only works with a single covariance matrix
[docs] @classmethod
def from_array(cls, covar, cov_tol=None, rho_tol=None, raw_shape=None):
r"""
Define a covariance object using a dense array.
Args:
covar (array-like):
Array with the covariance data. The shape of the
array must be square. Input can be any object that
can be converted to a dense array using the object
method ``toarray`` or using ``numpy.atleast_2d``.
cov_tol (:obj:`float`, optional):
Any covariance value less than this is assumed to be
equivalent to (and set to) 0.
rho_tol (:obj:`float`, optional):
Any correlation coefficient less than this is assumed
to be equivalent to (and set to) 0.
raw_shape (:obj:`tuple`, optional):
The covariance data is for a higher dimensional array
with this shape. For example, if the covariance data
is for a 2D image with shape ``(nx,ny)`` -- the shape
of the covariance array is be ``(nx*ny, nx*ny)`` --
provide ``raw_shape=(nx,ny)``. This is primarily used
for reading and writing; see also
:func:`transpose_raw_shape`. If None, any higher
dimensionality is ignored. When the
:class:`Covariance` object contains multiple
covariance arrays, this raw shape should be
applicable to *all* of them.
Returns:
:class:`Covariance`: The covariance matrix built using
the provided array.
Raises:
ValueError:
Raised if ``covar`` could not be converted to a dense
array.
"""
try:
_covar = covar.toarray()
except:
_covar = numpy.atleast_2d(covar)
if not isinstance(_covar, numpy.ndarray) or _covar.ndim != 2:
raise ValueError('Could not convert input covariance data into a 2D dense array.')
n = _covar.shape[0]
if rho_tol is not None:
variance = numpy.diag(_covar)
correlation = _covar / numpy.ma.sqrt(variance[:,None]*variance[None,:])
correlation[numpy.ma.absolute(correlation) < rho_tol] = 0.0
_covar = correlation.filled(0.0) \
* numpy.ma.sqrt(variance[:,None]*variance[None,:]).filled(0.0)
if cov_tol is not None:
_covar[_covar < cov_tol] = 0.0
indx = _covar > 0.0
i, j = numpy.meshgrid(numpy.arange(n), numpy.arange(n), indexing='ij')
return cls(sparse.coo_matrix((_covar[indx].ravel(),
(i[indx].ravel(), j[indx].ravel())),
shape=(n,n)).tocsr(), impose_triu=True, raw_shape=raw_shape)
[docs] @classmethod
def from_fits(cls, source, ivar_ext='IVAR', transpose_ivar=False, covar_ext='CORREL',
impose_triu=False, correlation=False, quiet=False):
r"""
Read covariance data from a fits file.
This read operation matches the data saved to a fits file
using :func:`write`. The class can read covariance data
written by other programs *as long as they have a
commensurate format*. See the description of the
:func:`write` method.
If the extension names and column names are correct,
:class:`Covariance` can read fits files that were not
produced explicitly by this method. This is useful for MaNGA
DAP products that include the covariance data as one
extension among others. The methods :func:`output_hdus` and
:func:`coordinate_data` are provided to produce the data that
can be placed in any fits file.
The method determines if the output data were reshaped by
checking the number of columns in the binary table.
When the covariance data are for a higher dimensional array,
the memory order of the higher dimensional array
(particularly whether it's constructed row- or column-major)
is important to ensuring the loaded data is correct. This is
why we have chosen the specific format for the binary table
used to store the covariance data. Regardless of whether the
data was written by a row-major or column-major language, the
format should be such that this class can properly read and
recover the covariance matrix. However, in some cases, you
still may need to transpose the inverse variance data; see
``transpose_ivar``.
Args:
source (:obj:`str`, `astropy.io.fits.HDUList`_):
Initialize the object using an
`astropy.io.fits.HDUList`_ object or path to a fits
file.
ivar_ext (:obj:`str`, optional):
If reading the data from ``source``, this is the name
of the extension with the inverse variance data.
Default is ``'IVAR'``. If None, the variance is taken
as unity.
transpose_ivar (:obj:`bool`, optional):
Flag to transpose the inverse variance data before
rescaling the correlation matrix. Should only be
necessary in some cases when the covariance data was
written by a method other than :func:`write`.
covar_ext (:obj:`str`, optional):
If reading the data from ``source``, this is the name
of the extension with covariance data. Default is
``'CORREL'``.
impose_triu (:obj:`bool`, optional):
Flag to force the `scipy.sparse.csr_matrix`_ object
to only be the upper triangle of the covariance
matrix. The covariance matrix is symmetric such that
:math:`C_{ij} = C_{ji}`, so it's not necessary to
keep both values. This flag will force a call to
`scipy.sparse.triu`_ when setting the covariance
matrix. Otherwise, the input matrix is *assumed* to
only have the upper triangle of numbers.
correlation (:obj:`bool`, optional):
Return the matrix as a correlation matrix. Default
(False) is to use the data (always saved in
correlation format; see :func:`write`) to construct
the covariance matrix.
quiet (:obj:`bool`, optional):
Suppress terminal output.
"""
# Open the provided source, if it hasn't been yet
hdu = source if isinstance(source, fits.HDUList) else fits.open(source)
# Read a coordinate data
shape = eval(hdu[covar_ext].header['COVSHAPE'])
raw_shape = eval(hdu[covar_ext].header['COVRWSHP']) \
if 'COVRWSHP' in hdu[covar_ext].header else None
dim = len(shape)
reshape = len(hdu[covar_ext].columns.names) in [ 5, 6 ]
if reshape:
i_c1, i_c2, j_c1, j_c2, rhoij = [ hdu[covar_ext].data[ext].copy() for ext in
[ 'INDXI_C1', 'INDXI_C2', 'INDXJ_C1', 'INDXJ_C2',
'RHOIJ' ] ]
if raw_shape is None:
raw_shape = Covariance.square_shape(shape[0])
i = numpy.ravel_multi_index((i_c1, i_c2), raw_shape)
j = numpy.ravel_multi_index((j_c1, j_c2), raw_shape)
# Make sure the data are only in the upper triangle
indx = j < i
i[indx], j[indx] = j[indx], i[indx]
else:
i, j, rhoij = [hdu[covar_ext].data[ext] for ext in ['INDXI', 'INDXJ', 'RHOIJ']]
# Number of non-zero elements
nnz = len(rhoij)
ivar = None if ivar_ext is None else hdu[ivar_ext].data
# Set correlation data
if dim == 2:
if ivar is not None:
if transpose_ivar:
ivar = ivar.T
if reshape:
ivar = ivar.ravel()
var = numpy.ones(shape[1:], dtype=float) if ivar is None \
else numpy.ma.power(ivar, -1).filled(0.0)
cij = rhoij * numpy.sqrt( var[i]*var[j] )
cov = sparse.coo_matrix((cij, (i, j)), shape=shape).tocsr()
input_indx = None
else:
k = hdu[covar_ext].data['INDXK'].copy()
input_indx = numpy.unique(k)
if len(input_indx) > shape[-1]:
raise ValueError('Number of unique channel indices is greater than provided shape.')
if len(input_indx) < shape[-1]:
warnings.warn('Fewer unique channels than all available.')
if ivar is not None:
if transpose_ivar:
ivar = ivar.tranpose(1,0,2)
if reshape:
ivar = ivar.reshape(-1, shape[-1])
var = numpy.ones(shape[1:], dtype=float) if ivar is None \
else numpy.ma.power(ivar, -1).filled(0.0)
cov = numpy.empty(shape[-1], dtype=sparse.csr_matrix)
for ii, uk in enumerate(input_indx):
indx = k == uk
cij = rhoij[indx] * numpy.sqrt( var[i[indx],ii]*var[j[indx],ii] )
cov[ii] = sparse.coo_matrix((cij, (i[indx], j[indx])), shape=shape[:-1]).tocsr()
# Report
# TODO: Convert report to use logging
if not quiet:
print('Read covariance cube:')
print(' output type: {0}'.format('Correlation' \
if correlation else 'Covariance'))
print(' dimensions: {0}'.format(dim))
print(' shape: {0}'.format(shape))
if dim == 3:
print(' pseudo-indices: ', input_indx)
print(' non-zero values: {0}'.format(nnz))
return cls(cov, input_indx=input_indx, correlation=correlation, raw_shape=raw_shape)
[docs] @classmethod
def from_matrix_multiplication(cls, T, Sigma):
r"""
Construct the covariance matrix that results from a matrix
multiplication.
The matrix multiplication should be of the form:
.. math::
{\mathbf T} \times {\mathbf X} = {\mathbf Y}
where :math:`{\mathbf T}` is a transfer matrix of size :math:`N_y\times
N_x`, :math:`{\mathbf X}` is a vector of size :math:`N_x`, and
:math:`{\mathbf Y}` is the vector of length :math:`{N_y}` that results
from the multiplication.
The covariance matrix is then
.. math::
{\mathbf C} = {\mathbf T} \times {\mathbf \Sigma} \times
{\mathbf T}^{\rm T},
where :math:`{\mathbf \Sigma}` is the covariance matrix for the elements
of :math:`{\mathbf X}`. If `Sigma` is provided as a vector of length
:math:`N_x`, it is assumed that the elements of :math:`{\mathbf X}` are
independent and the provided vector gives the *variance* in each
element; i.e., the provided data represent the diagonal of
:math:`{\mathbf \Sigma}`.
Args:
T (`scipy.sparse.csr_matrix`_, `numpy.ndarray`_):
Transfer matrix. See above.
Sigma (`scipy.sparse.csr_matrix`_, `numpy.ndarray`_):
Covariance matrix. See above.
"""
if len(T.shape) != 2:
raise ValueError('Input transfer matrix must be two-dimensional.')
nx = T.shape[1]
if Sigma.shape != (nx,nx) and Sigma.shape != (nx,):
raise ValueError('Shape of input variance matrix must be either '
'({0},{0}) or ({0},).'.format(nx))
# If it isn't already, convert T to a csr_matrix
_T = T if isinstance(T, sparse.csr_matrix) else sparse.csr_matrix(T)
# Set the covariance matrix in X
_Sigma = sparse.coo_matrix( (Sigma, (numpy.arange(nx),numpy.arange(nx))),
shape=(nx,nx)).tocsr() if len(Sigma.shape) == 1 \
else (Sigma if isinstance(Sigma, sparse.csr_matrix) \
else sparse.csr_matrix(Sigma))
# Construct the covariance matrix
return cls(sparse.triu(_T.dot(_Sigma.dot(_T.transpose()))).tocsr())
[docs] @classmethod
def from_variance(cls, variance, correlation=False):
r"""
Construct a diagonal covariance matrix using the provided variance.
Args:
variance (`numpy.ndarray`_):
The variance vector.
correlation (:obj:`bool`, optional):
Upon instantiation, convert the :class:`Covariance`
object to a correlation matrix.
"""
return cls(sparse.csr_matrix(numpy.diagflat(variance)), correlation=correlation)
[docs] def _grab_true_index(self, inp):
"""
In the case of a 3D array, return the true array-element
index given the pseudo index.
.. todo::
Search operation is inefficient. Apparently a better option
is a feature request for numpy 2.0
Args:
inp (:obj:`int`):
Requested index to convert to the real index in the
stored array of covariance matrices.
Returns:
:obj:`int`: The index in the stored array of the
requested covariance matrix.
"""
if self.input_indx is not None:
i = numpy.where(self.input_indx == inp)[0]
if i.size != 1:
raise IndexError('Invalid index: {0}!'.format(inp))
return i[0]
return inp
[docs] def _set_shape(self):
"""Set the 2D or 3D shape of the covariance matrix."""
self.shape = self.cov.shape
if self.dim == 2:
return
self.shape = self.cov.ravel()[0].shape + self.shape
[docs] def _impose_upper_triangle(self):
"""
Force :attr:`cov` to only contain non-zero elements in its
upper triangle.
"""
if self.dim == 2:
self.cov = sparse.triu(self.cov).tocsr()
self.nnz = self.cov.nnz
return
self.nnz = 0
for i in range(self.shape[-1]):
self.cov[i] = sparse.triu(self.cov[i]).tocsr()
self.nnz += self.cov[i].nnz
[docs] def full(self, channel=None):
r"""
Return a `scipy.sparse.csr_matrix`_ object with both its
upper and lower triangle filled, ensuring that they are
symmetric.
This method is essentially equivalent to :func:`toarray`
except that it returns a sparse array and can only be used
for one channel at a time.
Args:
channel (:obj:`int`, optional):
The pseudo-index of the covariance matrix to return.
Returns:
`scipy.sparse.csr_matrix`_: The sparse matrix with both
the upper and lower triangles filled (with symmetric
information).
Raises:
ValueError:
Raised if the object is 3D and *channel* is not
provided.
"""
if self.dim == 2:
a = self.cov
else:
if channel is None:
raise ValueError('Must define channel! Use channel=...')
a = self.cov[self._grab_true_index(channel)]
return (sparse.triu(a) + sparse.triu(a,1).T)
[docs] @staticmethod
def square_shape(size):
"""
Determine the shape of a 2D square array resulting from
reshaping a vector.
Args:
size (:obj:`int`):
Size of the vector.
Returns:
:obj:`int`: Length of one axis of the square array.
"""
# Get the size of the square image (and make sure it's square)
n = numpy.floor(numpy.sqrt(size)).astype(int)
if n*n != size:
raise ValueError('{0} is not the square of an integer!'.format(size))
return (n,n)
[docs] def transpose_raw_shape(self):
"""
Reorder the covariance array to account for a transpose in
the ravel ordering of the source array.
For a higher dimensional source array (see
:attr:`raw_shape`), the indices in the covariance matrix
relevant to source array can be found using
`numpy.ravel_multi_index`_ (or `numpy.unravel_index`_ for
vice versa). However, if the source array is transposed,
these operations will be invalid.
This method constructs a new `Covariance` object from the
existing data but with the indices rearranged for the
transposed source array.
.. warning::
If :attr:`raw_shape` it is not defined, a warning is
issued and this method simply returns a copy of the
current covariance data.
"""
if self.raw_shape is None:
warnings.warn('Covariance array raw shape undefined. Returning a copy.')
return self.copy()
raw_shape_t = self.raw_shape[::-1]
if self.dim == 2:
# Get the reshaped coordinate data
i_c1, i_c2, j_c1, j_c2, rhoij, var = self.coordinate_data(reshape=True)
# Get the current covariance data. TODO: Allow coordinate
# data to return covariance data instead of it always being
# correlation data.
i = numpy.ravel_multi_index((i_c1, i_c2), self.raw_shape)
j = numpy.ravel_multi_index((j_c1, j_c2), self.raw_shape)
cij = rhoij * numpy.sqrt(var.flat[i] * var.flat[j])
# Get the new coordinates
i = numpy.ravel_multi_index((i_c2, i_c1), raw_shape_t)
j = numpy.ravel_multi_index((j_c2, j_c1), raw_shape_t)
indx = j < i
i[indx], j[indx] = j[indx], i[indx]
# Return the new covariance matrix
return Covariance(sparse.coo_matrix((cij, (i, j)), shape=self.shape).tocsr(),
raw_shape=raw_shape_t)
# Get the reshaped coordinate data
i_c1, i_c2, j_c1, j_c2, k, rhoij, var = self.coordinate_data(reshape=True)
# Current coordinates
i = numpy.ravel_multi_index((i_c1, i_c2), self.raw_shape)
j = numpy.ravel_multi_index((j_c1, j_c2), self.raw_shape)
# Transposed coordinates
it = numpy.ravel_multi_index((i_c2, i_c1), raw_shape_t)
jt = numpy.ravel_multi_index((j_c2, j_c1), raw_shape_t)
# TODO: Copied from `from_fits`. Put this stuff into a method.
cov = numpy.empty(self.shape[-1], dtype=sparse.csr_matrix)
for ii, uk in enumerate(self.input_indx):
indx = k == uk
cij = rhoij[indx] * numpy.sqrt(var[i[indx],ii] * var[j[indx],ii])
cov[ii] = sparse.coo_matrix((cij, (it[indx], jt[indx])), shape=self.shape[:-1]).tocsr()
return Covariance(cov, input_indx=self.input_indx, raw_shape=raw_shape_t)
[docs] def apply_new_variance(self, var):
"""
Using the same correlation coefficients, return a new
:class:`Covariance` object with the provided variance.
Args:
var (`numpy.ndarray`_):
Variance vector. Must have a length that matches the
shape of this :class:`Covariance` instance.
Returns:
:class:`Covariance`: A covariance matrix with the same
shape and correlation coefficients and this object, but
with different variance.
Raises:
ValueError:
Raised if the length of the variance vector is
incorrect.
"""
if var.shape != self.shape[1:]:
raise ValueError('Provided variance has incorrect shape.')
# Convert to a correlation matrix, if needed
is_correlation = self.is_correlation
if not is_correlation:
self.to_correlation()
if self.dim == 2:
i, j, c = sparse.find(self.cov)
new_cov = sparse.coo_matrix( (c*numpy.sqrt(var[i]*var[j]), (i,j)),
shape=self.shape).tocsr()
else:
new_cov = numpy.empty(self.shape[-1], dtype=sparse.csr_matrix)
for p in range(self.shape[-1]):
i, j, c = sparse.find(self.cov[p])
new_cov[p] = sparse.coo_matrix((c*numpy.sqrt(var[i,p]*var[j,p]), (i,j)),
shape=self.shape[:-1]).tocsr()
# Revert to covariance matrix, if needed
if not is_correlation:
self.revert_correlation()
# Return a new covariance matrix
return Covariance(new_cov, input_indx=self.input_indx, correlation=is_correlation)
[docs] def copy(self):
"""
Return a copy of this Covariance object.
"""
# If the data is saved as a correlation matrix, first revert to
# a covariance matrix
is_correlation=self.is_correlation
if self.is_correlation:
self.revert_correlation()
# Create the new Covariance instance with a copy of the data
cp = Covariance(self.cov.copy(), input_indx=self.input_indx.copy())
# If necessary, convert the data to a correlation matrix
if is_correlation:
self.to_correlation()
cp.to_correlation()
return cp
[docs] def toarray(self, channel=None):
"""
Convert the sparse covariance matrix to a dense array, filled
with zeros where appropriate.
Args:
channel (:obj:`int`, optional):
The pseudo-index of the covariance matrix to plot. If
None and the object is 3D, the full 3D matrix is
returned.
Returns:
`numpy.ndarray`_: Dense array with the full covariance
matrix.
"""
if self.dim == 2 or channel is not None:
return self.full(channel=channel).toarray()
arr = numpy.empty(self.shape, dtype=float)
for k in range(self.shape[-1]):
indx = k if self.input_indx is None else self.input_indx[k]
arr[:,:,k] = self.full(channel=indx).toarray()
return arr
[docs] def show(self, channel=None, zoom=None, ofile=None, log10=False):
"""
Show a covariance/correlation matrix data.
This converts the (selected) covariance matrix to a filled
array and plots the array using `pyplot.imshow`_. If an
output file is provided, the image is redirected to the
designated output file; otherwise, the image is plotted to
the screen.
Args:
channel (:obj:`int`, optional):
The pseudo-index of the covariance matrix to plot.
Required if the covariance object is 3D.
zoom (:obj:`float`, optional):
Factor by which to zoom in on the center of the image
by *removing the other regions of the array*. E.g.
*zoom=2* will show only the central quarter of the
covariance matrix.
ofile (:obj:`str`, optional):
If provided, the array is output to this file instead
of being plotted to the screen.
"""
# Convert the covariance matrix to an array
a = self.toarray(channel)
# Remove some fraction of the array to effectively zoom in on
# the center of the covariance matrix
if zoom is not None:
xs = int(self.shape[0]/2 - self.shape[0]/2/zoom)
xe = xs + int(self.shape[0]/zoom) + 1
ys = int(self.shape[1]/2 - self.shape[1]/2/zoom)
ye = ys + int(self.shape[1]/zoom) + 1
a = a[xs:xe,ys:ye]
# Print the plot to the screen if no output file is provided.
if ofile is None:
im = pyplot.imshow(numpy.ma.log10(a) if log10 else a,
interpolation='nearest', origin='lower')
pyplot.colorbar()
pyplot.show()
return
# Print the plot to the designated output file
fig = pyplot.figure(1)
im = pyplot.imshow(numpy.ma.log10(a) if log10 else a,
interpolation='nearest', origin='lower')
pyplot.colorbar()
fig.canvas.print_figure(ofile)
fig.clear()
# TODO: Should there be any difference between this and `coordinate_data`
[docs] def find(self, channel=None):
"""
Find the non-zero values in the **full** covariance matrix (not
just the upper triangle).
This is a simple wrapper for :func:`full` and
`scipy.sparse.find`_.
Args:
channel (:obj:`int`, optional):
The pseudo-index of the covariance matrix to plot.
Required if the covariance object is 3D.
Returns:
tuple: A tuple of arrays ``i``, ``j``, and ``c``. The
arrays ``i`` and ``j`` contain the index coordinates of
the non-zero values, and ``c`` contains the values
themselves.
"""
return sparse.find(self.full(channel=channel))
# def issingular(self):
# if self.dim > 2:
# nsingular = 0
# for cov in self.cov.ravel():
# print(numpy.linalg.cond(sparse.triu(cov).toarray()+sparse.triu(cov,1).T.toarray()))
# print(1/sys.float_info.epsilon)
# else:
# print(numpy.linalg.cond(sparse.triu(self.cov).toarray()+sparse.triu(self.cov,1).T.toarray()))
# print(1/sys.float_info.epsilon)
# def inverse(self, redo=False):
# if self.inv is not None and not redo:
# return self.inv
# if self.dim > 2:
# self.inv = numpy.empty(self.shape[:self.dim-2], dtype=sparse.csr_matrix)
# print(self.inv.shape)
# print(type(self.inv[0]))
# for cov, inv in zip(self.cov.ravel(), self.inv.ravel()):
# try:
# inv = sparse.triu(sparse.csr_matrix( \
# scipy.linalg.inv(sparse.triu(cov).toarray() \
# + sparse.triu(cov,1).T.toarray()))).tocsr()
# except numpy.linalg.linalg.LinAlgError as e:
# print('Caught error: {0}'.format(e))
#
# else:
# print(self.cov.nnz)
# try:
# self.inv = sparse.triu(sparse.csr_matrix( \
# scipy.linalg.inv(sparse.triu(self.cov).toarray() \
# + sparse.triu(self.cov,1).T.toarray()))).tocsr()
# except numpy.linalg.linalg.LinAlgError as e:
# print('Caught error: {0}'.format(e))
#
# return self.inv
[docs] def coordinate_data(self, reshape=False):
r"""
Construct data arrays with the non-zero covariance components
in coordinate format.
This procedure is primarily used when constructing the data
arrays to write to a fits file.
Regardless of whether or not the current internal data is a
covariance matrix or a correlation matrix, the data is always
returned as a correlation matrix.
Matching the class convention, the returned data only
includes the upper triangle.
If the covariance matrix is 2-dimensional, four columns are
output:
- :math:`i,j`: The row and column indices, respectively,
of the covariance matrix.
- :math:`rho_{ij}`: The correlation coefficient between
pixels :math:`i` and :math:`j`.
- :math:`V_i`: The variance in each pixel; i.e., the
value of :math:`C_{ii} \forall i`. If reshape is True,
this will be output as a two-dimenional array.
If the covariance matrix is 3-dimensional, one additional column
is output:
- :math:`k`: The indices of the channels associated with
each covariance matrix. This is either just the index
of the covariance matrix or the provided pseudo-indices
of each channel.
If using the reshape option, the :math:`i,j` indices are
converted to two columns each providing the indices in the
associated reshaped array with coordinates :math:`c_1,c_2`.
Args:
reshape (:obj:`bool`, optional):
Reshape the output in the case when :math:`i,j` are
actually pixels in a two-dimensional image. The shape
of the image is expected to be square, such that the
shape of the covariance matrix is :math:`N_x\times
N_y`, where :math:`N_x = N_y`. Each of the ``I`` and
``J`` output columns are then split into two columns
according to associate coordinates, such that there
are 8 output columns.
Returns:
:obj:`tuple`: Four to seven `numpy.ndarray`_ objects
depending on if the data has been reshaped into an image.
In detail:
- If 2D and not reshaping: The four returned objects
contain the indices along the first and second
axes, the correlation coefficient, and the variance
vector.
- If 2D and reshaping: The six returned objects are
the unraveled indices in the 2D source array (two
indices each for the two axes of the covariance
matrix, ordered as the first and second indices for
the first covariance index then for the second
covariance index), the correlation coefficient, and
the 2D variance array.
- If 3D and not reshaping: The five returned objects
contain the indices along the first and second
axes, the index of the relevant correlation matrix
(see :attr:`input_index`), the correlation
coefficient, and the variance vector.
- If 3D and reshaping: The seven returned objects are
the unraveled indices in the 2D source array (two
indices each for the two axes of the covariance
matrix, ordered as the first and second indices for
the first covariance index then for the second
covariance index), the index of the relevant
correlation matrix (see :attr:`input_index`), the
correlation coefficient, and the 2D variance array.
"""
# Only ever print correlation matrices
is_correlation = self.is_correlation
if not is_correlation:
self.to_correlation()
if reshape:
# If reshaping, get the new shape; assume the 2D array is
# square if raw_shape is None.
new_shape = Covariance.square_shape(self.shape[0]) \
if self.raw_shape is None else self.raw_shape
# Only one covariance matrix
if self.dim == 2:
# Get the data
i, j, rhoij = sparse.find(self.cov)
# If object was originally a covariance matrix, revert it back
if not is_correlation:
self.revert_correlation()
# Return the data
if not reshape:
# Returns four arrays
return i, j, rhoij, self.var.copy()
# Returns six arrays
i_c1, i_c2 = numpy.unravel_index(i, new_shape)
j_c1, j_c2 = numpy.unravel_index(j, new_shape)
return i_c1, i_c2, j_c1, j_c2, rhoij, self.var.reshape(new_shape).copy()
# More than one covariance matrix
i = numpy.empty(self.nnz, dtype=int)
j = numpy.empty(self.nnz, dtype=int)
k = numpy.zeros(self.nnz, dtype=int)
rhoij = numpy.empty(self.nnz, dtype=float)
ii = 0
for kk in range(self.shape[-1]):
nnz = self.cov[kk].nnz
k[ii:ii+nnz] = kk if self.input_indx is None else self.input_indx[kk]
i[ii:ii+nnz], j[ii:ii+nnz], rhoij[ii:ii+nnz] = sparse.find(self.cov[kk])
ii += nnz
# If object was originally a covariance matrix, revert it back
if not is_correlation:
self.revert_correlation()
# Check if any of the covariance matrices had all 0 values. If so, add
# values just to maintain the shape of the result when reconstructed
# from this output.
missing_k = numpy.setdiff1d(numpy.arange(self.shape[-1])
if self.input_indx is None else self.input_indx,
numpy.unique(k))
if len(missing_k) > 0:
for _k in missing_k:
i = numpy.append(i, [0])
j = numpy.append(j, [0])
k = numpy.append(k, _k)
rhoij = numpy.append(rhoij, [0.])
# Return the data
if not reshape:
# Returns five arrays
return i, j, k, rhoij, self.var.copy()
# Returns seven arrays
i_c1, i_c2 = numpy.unravel_index(i, new_shape)
j_c1, j_c2 = numpy.unravel_index(j, new_shape)
return i_c1, i_c2, j_c1, j_c2, k, rhoij, self.var.reshape(*new_shape, -1).copy()
[docs] def output_hdus(self, reshape=False, hdr=None):
r"""
Construct the output HDUs and header that contain the covariance
data.
Args:
reshape (:obj:`bool`, optional):
Reshape the output in the case when :math:`i,j` are
actually pixels in a two-dimensional image. The shape
of the image is expected to be square, such that the
shape of the covariance matrix is :math:`N_x\times
N_y`, where :math:`N_x = N_y`. Each of the ``I`` and
``J`` output columns are then split into two columns
according to associate coordinates, such that there
are 8 output columns.
hdr (`astropy.io.fits.Header`_, optional):
`astropy.io.fits.Header`_ instance to which to add
covariance keywords. If None, a new
`astropy.io.fits.Header`_ instance is returned.
Returns:
tuple: Returns three objects:
- The header for the primary HDU.
- An `astropy.io.fits.ImageHDU`_ object with the
variance vector.
- An `astropy.io.fits.BinTableHDU`_ object with the
correlation coefficients.
"""
# Use input header or create a minimal one
_hdr = fits.Header() if hdr is None else hdr
# Ensure the input header has the correct type
if not isinstance(_hdr, fits.Header):
raise TypeError('Input header must have type astropy.io.fits.Header.')
_hdr['COVSHAPE'] = (str(self.shape), 'Shape of the correlation matrix')
tbl_hdr = fits.Header()
tbl_hdr['COVSHAPE'] = (str(self.shape), 'Shape of the correlation matrix')
if self.raw_shape is not None:
tbl_hdr['COVRWSHP'] = (str(self.raw_shape), 'Raw shape of the source data')
# Construct the correlation binary table HDU
if self.dim == 2:
if reshape:
i_c1, i_c2, j_c1, j_c2, rhoij, var = self.coordinate_data(reshape=reshape)
covar_hdu = fits.BinTableHDU.from_columns([
fits.Column(name='INDXI_C1', format='1J', array=i_c1),
fits.Column(name='INDXI_C2', format='1J', array=i_c2),
fits.Column(name='INDXJ_C1', format='1J', array=j_c1),
fits.Column(name='INDXJ_C2', format='1J', array=j_c2),
fits.Column(name='RHOIJ', format='1D', array=rhoij)
], name='CORREL', header=tbl_hdr)
else:
i, j, rhoij, var = self.coordinate_data(reshape=reshape)
covar_hdu = fits.BinTableHDU.from_columns([
fits.Column(name='INDXI', format='1J', array=i),
fits.Column(name='INDXJ', format='1J', array=j),
fits.Column(name='RHOIJ', format='1D', array=rhoij)
], name='CORREL', header=tbl_hdr)
else:
if reshape:
i_c1, i_c2, j_c1, j_c2, k, rhoij, var = self.coordinate_data(reshape=reshape)
covar_hdu = fits.BinTableHDU.from_columns([
fits.Column(name='INDXI_C1', format='1J', array=i_c1),
fits.Column(name='INDXI_C2', format='1J', array=i_c2),
fits.Column(name='INDXJ_C1', format='1J', array=j_c1),
fits.Column(name='INDXJ_C2', format='1J', array=j_c2),
fits.Column(name='INDXK', format='1J', array=k),
fits.Column(name='RHOIJ', format='1D', array=rhoij)
], name='CORREL', header=tbl_hdr)
else:
i, j, k, rhoij, var = self.coordinate_data(reshape=reshape)
covar_hdu = fits.BinTableHDU.from_columns([
fits.Column(name='INDXI', format='1J', array=i),
fits.Column(name='INDXJ', format='1J', array=j),
fits.Column(name='INDXK', format='1J', array=k),
fits.Column(name='RHOIJ', format='1D', array=rhoij)
], name='CORREL', header=tbl_hdr)
ivar_hdu = fits.ImageHDU(data=numpy.ma.power(var,-1.).filled(0.0), name='IVAR')
return _hdr, ivar_hdu, covar_hdu
[docs] def write(self, ofile, reshape=False, hdr=None, overwrite=False):
r"""
Write the covariance object to a fits file.
Objects written using this function can be reinstantiated
using :func:`from_fits`.
The covariance matrix (matrices) are stored in "coordinate"
format using fits binary tables; see
`scipy.sparse.coo_matrix`_. The matrix is *always* stored as
a correlation matrix, even if the object is currently in the
state holding the covariance data.
Independent of the dimensionality of the covariance matrix, the
written file has a ``PRIMARY`` extension with the keyword
``COVSHAPE`` that specifies the original dimensions of the
covariance matrix; see :attr:`shape`.
The correlation data are written to the ``CORREL`` extension.
The number of columns in this extension depends on the
provided keywords; see :func:`coordinate_data`. The column
names are:
- ``INDXI``, ``INDXJ``, ``INDXK``: indices in the covariance
matrix. The last index is provided only if the object is
3D. ``INDXI`` and ``INDXJ`` are separated into two
columns if the output is reshaped; these columns are
``INDXI_C1``, ``INDXI_C2``, ``INDXJ_C1``, ``INDXJ_C2``.
- ``RHOIJ``: The non-zero correlation coefficients located
the specified coordinates
The inverse of the variance along the diagonal of the covariance
matrix is output in an ImageHDU in extension ``IVAR``.
For 3D matrices, if pseudo-indices have been provided, these are
used in the ``INDXK`` column; however, the ``COVSHAPE`` header
keyword only gives the shape of the unique indices of the
covariance channels.
Args:
ofile (:obj:`str`):
File name for the output.
reshape (:obj:`bool`, optional):
Reshape the output in the case when :math:`i,j` are
actually pixels in a two-dimensional image. The shape
of the image is expected to be square, such that the
shape of the covariance matrix is :math:`N_x\times
N_y`, where :math:`N_x = N_y`. Each of the ``I`` and
``J`` output columns are then split into two columns
according to associate coordinates.
hdr (`astropy.io.fits.Header`_, optional):
A header object to include in the PRIMARY extension.
The SHAPE keyword will be added/overwritten.
overwrite (:obj:`bool`, optional):
Overwrite any existing file.
Raises:
FileExistsError:
Raised if the output file already exists and overwrite is False.
TypeError:
Raised if the input ``hdr`` does not have the correct
type.
"""
if os.path.isfile(ofile) and not overwrite:
raise FileExistsError(f'{ofile} exists! Use \'overwrite=True\' to overwrite.')
# Construct HDUList and write the fits file
_hdr, ivar_hdu, covar_hdu = self.output_hdus(reshape=reshape, hdr=hdr)
DAPFitsUtil.write(fits.HDUList([fits.PrimaryHDU(header=_hdr), ivar_hdu, covar_hdu]),
ofile, overwrite=overwrite, checksum=True)
[docs] def variance(self, copy=True):
"""
Return the variance vector(s) of the covariance matrix.
Args:
copy (:obj:`bool`, optional):
Return a copy instead of a reference.
"""
if self.var is not None:
return self.var.copy() if copy else self.var
if self.dim == 2:
self.var = numpy.diag(self.cov.toarray()).copy()
return self.var
self.var = numpy.empty(self.shape[1:], dtype=float)
for p in range(self.shape[-1]):
self.var[:,p] = numpy.diag(self.cov[p].toarray()).copy()
return self.var.copy() if copy else self.var
[docs] def to_correlation(self):
r"""
Convert the covariance matrix into a correlation matrix by
dividing each element by the variances.
If the matrix is a correlation matrix already (see
:attr:`is_correlation`), no operations are performed.
Otherwise, the variance vectors are computed, if necessary, and
used to normalize the covariance values.
A :class:`Covariance` object can be reverted from a correlation
matrix using :func:`revert_correlation`.
"""
# Object is already a correlation matrix
if self.is_correlation:
return
# Ensure that the variance has been calculated
self.variance()
self.is_correlation = True
if self.dim == 2:
i, j, c = sparse.find(self.cov)
self.cov = sparse.coo_matrix( (c/numpy.sqrt(self.var[i]*self.var[j]), (i,j)),
shape=self.shape).tocsr()
return
for p in range(self.shape[-1]):
i, j, c = sparse.find(self.cov[p])
self.cov[p] = sparse.coo_matrix( (c/numpy.sqrt(self.var[i,p]*self.var[j,p]), (i,j)),
shape=self.shape[:-1]).tocsr()
[docs] def revert_correlation(self):
r"""
Revert the object from a correlation matrix back to a full
covariance matrix.
This function does nothing if the correlation flag has not been
flipped. The variances must have already been calculated!
"""
if not self.is_correlation:
return
if self.dim == 2:
i, j, c = sparse.find(self.cov)
self.cov = sparse.coo_matrix( (c*numpy.sqrt(self.var[i]*self.var[j]), (i,j)),
shape=self.shape).tocsr()
self.is_correlation = False
return
for p in range(self.shape[-1]):
i, j, c = sparse.find(self.cov[p])
self.cov[p] = sparse.coo_matrix( (c*numpy.sqrt(self.var[i,p]*self.var[j,p]), (i,j)),
shape=self.shape[:-1]).tocsr()
self.is_correlation = False
####################################################################
# TODO: Generalize these to fill and thin or something
[docs] def bin_to_spaxel_covariance(self, bin_indx):
r"""
Propagate the covariance matrix data for the stacked spectra
into the full cube.
This (self) should be the covariance/correlation matrix for the
binned spectra.
.. todo::
- Does this still need to be tested?
- Generalize the nomenclature.
Args:
bin_indx (`numpy.ndarray`_):
The integer vector with the bin associated with each
spectrum in the DRP cube. This is the flattened BINID
array.
Returns:
:class:`Covariance`: Covariance/Correlation matrix for
the spaxelized binned data.
"""
# Total number of spectra
nspec = len(bin_indx)
# Get the unique bins and how to reconstruct the bins from the
# unique set
unique_bins, reconstruct = numpy.unique(bin_indx, return_inverse=True)
# Handle missing bins by changing from the bin id to the bin index.
# Detecting whether or not there is a need for handling missing bins is
# done by comparing the unique array to a full string of all indices up
# to the maximum index in the unique array. **This assumes bins can
# only either be -1, to indicate that the spaxel/bin was not analyzed,
# or a non-negative index number** (i.e., all bin IDs must be >= -1).
# The code handles cases both with and without any ignored bins.
warn = False
if numpy.any(unique_bins < 0):
# Includes ignored bins/spaxels
if unique_bins.size != unique_bins[-1]+2 \
or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
warn = True
unique_bins = numpy.arange(-1,unique_bins.size-1)
else:
# All bins/spaxels have valid bin numbers
if unique_bins.size != unique_bins[-1]+1 \
or numpy.any((unique_bins - numpy.arange(unique_bins.size)) != 0):
warn = True
unique_bins = numpy.arange(unique_bins.size)
if warn and not quiet:
warnings.warn('Bin numbers and indices do not match. Map values are expected to be '
'sorted by their bin number.')
# if unique_bins.size != unique_bins[-1]+2 \
# or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
# warnings.warn('Bin numbers and indices do not match. Spectra are expected '
# 'to be sorted by their bin number.')
# unique_bins = numpy.arange(-1,unique_bins.size-1)
# Get the valid bins
indx = bin_indx > -1
# Expand the covariance matrix by repeating the full matrix
# elements for repeated bin values in different spaxels
nchan = self.shape[-1]
spaxel_covar = numpy.empty(nchan, dtype=sparse.csr_matrix)
for i in range(nchan):
j = self.input_indx[i]
# Input bin and covariance indices
ii = unique_bins[reconstruct[indx]]
ii_i, ii_j = map( lambda x: x.ravel(), numpy.meshgrid(ii, ii) )
# Output spaxel and covariance indices
oi = numpy.arange(nspec)[indx]
oi_i, oi_j = map( lambda x: x.ravel(), numpy.meshgrid(oi, oi) )
_covar = numpy.zeros((nspec, nspec), dtype=float)
_covar[oi_i, oi_j] = self.toarray(channel=j)[ii_i,ii_j]
spaxel_covar[i] = sparse.triu(_covar).tocsr()
return Covariance(spaxel_covar, input_indx=self.input_indx)
[docs] def spaxel_to_bin_covariance(self, bin_indx):
r"""
Opposite of :func:`bin_to_spaxel_covariance`: Revert the covariance
matrix to the covariance between the unique binned spectra.
This (self) should be the covariance/correlation matrix for the
binned spectra redistributed to the size of the original spaxels
map.
.. todo::
- Does this still need to be tested?
.. warning::
**This does NOT propagate the covariance between the spaxels
into the covariance in the binned data.** That operation is
done by, e.g.,
:func:`mangadap.proc.spectralstack.SpectralStack._stack_with_covariance`.
This **only** performs the inverse operation of
:func:`bin_to_spaxel_covariance`.
Args:
bin_indx (numpy.ndarray): The integer vector with the bin
associated with each spectrum in the DRP cube. This is
the flattened BINID array.
Returns:
:class:`Covariance`: Covariance/Correlation matrix for
the stacked spectra.
"""
# Get the unique bins and their first occurrence in the bin list
unique_bins, unique_indx = numpy.unique(bin_indx, return_index=True)
# Handle missing bins by changing from the bin id to the bin index.
# Detecting whether or not there is a need for handling missing bins is
# done by comparing the unique array to a full string of all indices up
# to the maximum index in the unique array. **This assumes bins can
# only either be -1, to indicate that the spaxel/bin was not analyzed,
# or a non-negative index number** (i.e., all bin IDs must be >= -1).
# The code handles cases both with and without any ignored bins.
warn = False
if numpy.any(unique_bins < 0):
# Includes ignored bins/spaxels
if unique_bins.size != unique_bins[-1]+2 \
or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
warn = True
unique_bins = numpy.arange(-1,unique_bins.size-1)
else:
# All bins/spaxels have valid bin numbers
if unique_bins.size != unique_bins[-1]+1 \
or numpy.any((unique_bins - numpy.arange(unique_bins.size)) != 0):
warn = True
unique_bins = numpy.arange(unique_bins.size)
if warn and not quiet:
warnings.warn('Bin numbers and indices do not match. Map values are expected to be '
'sorted by their bin number.')
# if unique_bins.size != unique_bins[-1]+2 \
# or numpy.any((unique_bins - numpy.arange(-1,unique_bins.size-1)) != 0):
# warnings.warn('Bin numbers and indices do not match. Spectra are expected '
# 'to be sorted by their bin number.')
# unique_bins = numpy.arange(-1,unique_bins.size-1)
# Total number of bins
nbins = len(unique_bins)-1
nchan = self.shape[-1]
bin_covar = numpy.empty(nchan, dtype=sparse.csr_matrix)
for i in range(nchan):
j = self.input_indx[i]
# Input spectrum and covariance indices
ii = unique_indx[1:]
ii_i, ii_j = map( lambda x: x.ravel(), numpy.meshgrid(ii, ii) )
# Output spectrum and covariance indices
oi = unique_bins[1:]
oi_i, oi_j = map( lambda x: x.ravel(), numpy.meshgrid(oi, oi) )
_covar = numpy.zeros((nbins, nbins), dtype=float)
_covar[oi_i, oi_j] = self.toarray(channel=j)[ii_i,ii_j]
bin_covar[i] = sparse.triu(_covar).tocsr()
return Covariance(bin_covar, input_indx=self.input_indx)