# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
Provides a set of functions to handle and alter the instrumental
resolution.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import warnings
from IPython import embed
import numpy
from scipy import interpolate
from scipy.special import erf
import astropy.constants
from .constants import DAPConstants
from .sampling import spectrum_velocity_scale
#from matplotlib import pyplot
[docs]
class VariableGaussianKernel:
r"""
Support class for variable sigma convolution. See
:func:`convolution_variable_sigma`.
Modest modification of M. Cappellari's
:func:`ppxf.ppxf_util.gaussian_filter1d` function.
Args:
sigma (numpy.ndarray):
Coordinate-dependent standard deviation of the Gaussian
kernel in pixels.
minsig (:obj:`float`, optional):
The minimum sigma value allowed.
nsig (:obj:`float`, optional):
The extent of the calculation of the kernel for the
convolution. By default, the kernel is not calculated
beyond 3*sigma.
integral (:obj:`bool`, optional):
Construct the kernel based on the integral of the Gaussian
over the pixel width using the error function, instead of
just sampling the Gaussian function directly.
Raises:
ValueError:
Raised if the input sigma has no length.
Attributes:
sigma (numpy.ndarray):
Coordinate-dependent standard deviation of the Gaussian
kernel.
p (int):
The maximum number of pixels on either side of the kernel
center needed for the convolution.
m (int):
Number of pixels in the kernel (:math:`m = 2p + 1`).
kernel (numpy.ndarray):
The calculated kernel to use for the convolution.
"""
def __init__(self, sigma, minsig=0.01, nsig=3.0, integral=False):
self.n = sigma.size # Vector length
if self.n == 0:
raise ValueError('Input sigma has zero length!')
self.sigma = sigma.clip(min=minsig) # Force sigmas to minimum
self.p = int(numpy.ceil(numpy.amax(nsig*self.sigma))) # Kernel covers up to nsig*sigma
self.m = 2*self.p + 1 # Kernel length
x = numpy.linspace(-self.p, self.p, self.m) # X^2 for kernel
x2 = numpy.square(x)
# Kernel will have size m x n
self.kernel = (erf((x[:,None]+0.5)/numpy.sqrt(2)/self.sigma)
- erf((x[:,None]-0.5)/numpy.sqrt(2)/self.sigma))/2. if integral else \
numpy.exp(-x2[:, None]/(2*numpy.square(self.sigma))) \
/ numpy.sqrt(2*numpy.pi) / self.sigma
self.kernel /= numpy.sum(self.kernel, axis=0)[None, :] # Normalize kernel
[docs]
def _check_shape(self, y, ye=None):
"""
Make sure that the shapes are appropriate for the defined kernel.
"""
if len(y.shape) != 1:
raise ValueError('y must be a 1D array!')
if y.size != self.n:
raise ValueError('y and sigma must have the same shape!')
if ye is not None and (len(ye.shape) != 1 or ye.size != self.n):
raise ValueError('ye length does not must have the correct shape!')
[docs]
def _create_a(self, y):
a = numpy.zeros(self.kernel.shape)
for i in range(self.m):
a[i,self.p:-self.p] = y[i:self.n-self.m+i+1]
return a
[docs]
def convolve(self, y, ye=None):
"""
Convolve the input vector with the variable-width Gaussian.
The input `y` must be a 1D vector with the same length as
:attr:`sigma`. If errors are provided, they must also adhere to
these shape limitations and are propagated to the convolved
vector.
Args:
y (numpy.ndarray):
Vector to convolve
ye (numpy.ndarray, optional):
Error in the vector to convolve
Raises:
ValueError:
Raised if *y* is not a 1D vector, or if the shape of *y*
and *sigma* (and *ye* if provided) are different.
Returns:
`numpy.ndarray`_: The convolved vector. If the errors are
provided, the function returns two vectors, the convolved
vector and its error.
"""
self._check_shape(y, ye=ye)
# Create m copies of the shifted input function
a = self._create_a(y)
if ye is None:
return numpy.sum(a*self.kernel, axis=0)
# Construct the error
ae = self._create_a(ye)
return numpy.sum(a*self.kernel, axis=0), \
numpy.sqrt(numpy.sum(numpy.square(ae*self.kernel), axis=0))
[docs]
def convolution_variable_sigma(y, sigma, ye=None, integral=False, large_sigma=10.):
r"""
Convolve a discretely sampled function :math:`y(x)` with a Gaussian
kernel, :math:`g`, where the standard deviation of the kernel is a
function of :math:`x`, :math:`\sigma(x)`. Nominal calculations can
be performed to propagate the error in the result; these
calculations **do not** include the covariance between the pixels,
which will mean that the calculations likely have significant error!
The convolution is defined as:
.. math::
(y\ast g)(x) &= \int_{-\infty}^{\infty} y(X)\ g(\sigma,x-X)\ dX \\
&= \int_{-\infty}^{\infty} \frac{y(X)}{\sqrt{2\pi}\
\sigma(X)}\ \exp\left(-\frac{(x-X)^2}{2\
\sigma(X)^2}\right) dX .
To minimize edge effects and account for the censoring of the data
(finite range in :math:`x`), the convolution is actually calculated
as a definite integral and normalized as follows:
.. math::
(y\ast g)(x) \sim\frac{
\int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)} y(X)\
g(\sigma,x-X)\ dX}{
\int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)}
g(\sigma,x-X)\ dX} .
The above is identical to getting the weighted mean of :math:`y` at
each position :math:`x`, where the weights are defined by a Gaussian
kernel centered at :math:`x` with a variable dispersion.
Use of this function requires:
- *y* and *sigma* must be 1D vectors
- *y* and *sigma* must be uniformly sampled on the same grid
- *sigma* must be in pixel units.
.. todo::
This should be deprecated and/or merged with
:class:`VariableGaussianKernel`.
Args:
y (`numpy.ndarray`_):
A uniformly sampled function to convolve.
sigma (`numpy.ndarray`_):
The standard deviation of the Gaussian kernel sampled at
the same positions as ``y``. The units of ``sigma``
*must* be in pixels.
ye (`numpy.ndarray`_, optional):
Errors in the function :math:`y(x)`. Must be the same
shape as ``y``.
integral (:obj:`bool`, optional):
Construct the kernel based on the integral of the
Gaussian over the pixel width using the error function,
instead of just sampling the Gaussian function directly.
See :class:`VariableGaussianKernel`.
large_sigma (:obj:`float`, optional):
A convenience parameter that causes a warning to be
issued if any of the values in ``sigma`` are larger than
this threshold.
Returns:
`numpy.ndarray`_: Arrays with the convolved function
:math:`(y\ast g)(x)` sampled at the same positions as the
input :math:`x` vector and its error. The second array will
be returned as None if the error vector is not provided.
"""
if numpy.any(sigma > large_sigma):
warnings.warn('Gaussian kernel dispersion larger than {0:.1f} pixels!'.format(large_sigma))
return VariableGaussianKernel(sigma, integral=integral).convolve(y, ye=ye)
[docs]
class SpectralResolution:
r"""
Container class for the resolution, :math:`R =
\lambda/\Delta\lambda`, of a spectrum. The primary functionality is
to determine the parameters necessary to match the resolution of one
spectrum to another. It can also be used as a function to
interpolate the spectral resolution at a given wavelenth.
Args:
wave (numpy.ndarray):
A 1D vector with the wavelength in angstroms. The sampling
may be either in linear steps of wavelength or
:math:`\log_{10}` steps.
sres (numpy.ndarray):
A 1D vector with the spectral resolution, :math:`R`, sampled
at the positions of the provided wavelength vector.
log10 (:obj:`bool`, optional):
Flag that the spectrum has been binned logarithmically (base
10) in wavelength
interp_ext (:obj:`int`, :obj:`tuple`, :obj:`str`, optional):
The value to pass as ``fill_value`` to the interpolator,
which defines its behavior when attempting to sample the
spectral resolution beyond where it is defined. See
`scipy.interpolate.interp1d`_.
bounds_error (:obj:`bool`, optional):
Force `scipy.interpolate.interp1d`_ to raise an exception
if the interpolation is outside the bounds of the defined
spectral resolution function.
Raises:
ValueError: Raised if *wave* is not a 1D vector or if *wave* and
*sres* do not have the same shape.
Attributes:
interpolator (`scipy.interpolate.interp1d`_):
An object used to interpolate the spectral resolution at any
given wavelength. The interpolation is hard-wired to be
*linear* and its extrapolation behavior is defined by
``interp_ext``. The wavelength and resolution vectors are
held by this object for later reference if needed.
log10 (:obj:`bool`):
Flag that the spectrum has been binned logarithmically (base
10) in wavelength
c (:obj:`float`):
The speed of light; provided by `astropy.constants`_.
dv (:obj:`float`):
The velocity step per pixel in km/s. Defined using
:func:`spectrum_velocity_scale` if :attr:`log10` is True;
otherwise set to None.
dw (:obj:`float`):
The wavelength step per pixel in angstroms. Defined as the
wavelength step between the first two pixels if
:attr:`log10` is False; otherwise set to None.
min_sig (:obj:`float`):
Minimum standard deviation allowed for the kernel used to
match two spectral resolutions. See
:func:`GaussianKernelDifference`.
sig_pd (`numpy.ndarray`):
The standard deviation in pixels required to match the
spectral resolution of this object to the resolution defined
by a different :class:`SpectralResolution` object. See
:func:`GaussianKernelDifference`.
sig_mask (`numpy.ndarray`):
A *uint* vector used to identify measurements of
:attr:`sig_pd` that should **not** be used to match the
spectral resolution of this object to the resolution defined
by a different :class:`SpectralResolution` object. See
:func:`GaussianKernelDifference`.
sig_vo (:obj:`float`):
A constant offset of the kernel standard deviation **in
km/s** that has been applied to :attr:`sig_pd`. See
:func:`GaussianKernelDifference`.
.. todo::
- Allow it to determine if the binning is linear or geometric,
then use the *log10* keyword to distinguish between natural
log and :math:`log_{10}` binning.
- Allow for more than one type of line-spread function?
.. warning::
The default behavior of the interpolator is to extrapolate the
input spectral resolution vector when trying to sample from
regions beyond where it is sampled. Use ``interp_ext`` change
this; see `scipy.interpolate.interp1d`_.
"""
def __init__(self, wave, sres, log10=False, interp_ext='extrapolate', bounds_error=False):
# Check the sizes
if len(wave.shape) != 1:
raise ValueError('wave must be a 1D array!')
if wave.shape != sres.shape:
raise ValueError('wave and sres must have the same shape!')
# Use linear interpolation when sampling the spectral resolution
self.interpolator = interpolate.interp1d(wave, sres, fill_value=interp_ext,
bounds_error=bounds_error)
self.log10 = log10
self.c = astropy.constants.c.to('km/s').value
self.dv = spectrum_velocity_scale(wave) if log10 else None
self.dw = None if log10 else wave[1] - wave[0]
# No resolution matching calculated yet
self.min_sig = None
self.sig_pd = None
self.sig_mask = None
self.sig_vo = None
def __call__(self, w):
"""Interpolate the spectral resolution at wavelength *w*."""
return self.interpolator(w)
[docs]
def _finalize_GaussianKernelDifference(self, sig2_pd):
r"""
Given the calculated :math:`\sigma^2_{p,d}`, calculate and save
the attributes :attr:`sig_pd` and :attr:`sig_mask`. See
:func:`GaussianKernelDifference`.
"""
# print('finalize b')
indx = numpy.isclose(sig2_pd, 0.0)
nindx = numpy.invert(indx)
self.sig_pd = sig2_pd.copy()
self.sig_pd[nindx] = sig2_pd[nindx]/numpy.sqrt(numpy.absolute(sig2_pd[nindx]))
self.sig_pd[indx] = 0.0
# self.sig_mask = numpy.array(self.sig_pd < -self.min_sig).astype(numpy.uint)
self.sig_mask = numpy.array(self.sig_pd < self.min_sig).astype(numpy.uint)
# print('finalize e')
[docs]
def _convert_vd2pd(self, sig2_vd):
r"""
Convert from :math:`\sigma^2_{v,d}` to :math:`\sigma^2_{p,d}`.
See :func:`GaussianKernelDifference`.
"""
# print('vd2pd')
return sig2_vd / numpy.square(self.dv) if self.log10 else \
sig2_vd / numpy.square(self.c*self.dw/self.wave())
[docs]
def _convert_pd2vd(self, sig2_pd):
r"""
Convert from :math:`\sigma^2_{p,d}` to :math:`\sigma^2_{v,d}`.
See :func:`GaussianKernelDifference`.
"""
# print('pd2vd')
return sig2_pd * numpy.square(self.dv) if self.log10 else \
sig2_pd * numpy.square(self.c*self.dw/self.wave())
[docs]
def wave(self, copy=False):
"""
Return the wavelength vector; held by :attr:`interpolator`.
"""
# return self.interpolator._data[0]
return self.interpolator.x.copy() if copy else self.interpolator.x
[docs]
def sres(self, copy=False):
"""
Return the resolution vector; held by :attr:`interpolator`.
"""
# return self.interpolator._data[1]
return self.interpolator.y.copy() if copy else self.interpolator.y
[docs]
def instrumental_dispersion(self, w=None):
r"""
Return the instrumental dispersion by converting from :math:`R`
to :math:`\sigma_{v,inst}` according to:
.. math::
\sigma_{v,inst} = \frac{c}{\sqrt{8\ln 2}\ R}.
If w is None, just convert the internal interpolator values.
Otherwise, return the values sampled at w.
"""
if w is None:
return numpy.ma.divide(self.c, DAPConstants.sig2fwhm*self.interpolator.y).filled(0.0)
return numpy.ma.divide(self.c, DAPConstants.sig2fwhm*self.interpolator(w)).filled(0.0)
[docs]
def match(self, new_sres, no_offset=True, min_sig_pix=0.0):
"""
Currently only an alias for :func:`GaussianKernelDifference`.
"""
self.GaussianKernelDifference(new_sres, no_offset=no_offset, min_sig_pix=min_sig_pix)
[docs]
def GaussianKernelDifference(self, new_sres, no_offset=True, min_sig_pix=0.0):
r"""
Determine the parameters of a Gaussian kernel required to
convert the resolution of this object to the resolution of a
different the :class:`SpectralResolution` object, *new_sres*.
The spectral resolution is defined as :math:`R =
\lambda/\Delta\lambda`, where :math:`\Delta\lambda` is the FWHM
of the spectral resolution element. The standard deviation of
the resolution element in angstroms is then
.. math::
\sigma_\lambda = \frac{\lambda}{f R}, \ \ {\rm where} \ \ f
= \frac{{\rm FWHM_\lambda}}{\sigma_\lambda} = \sqrt{8\ln 2}.
Assuming a Gaussian (in angstroms) line-spread function:
.. math::
\sigma^2_{\lambda,2} = \sigma^2_{\lambda,1} +
\sigma^2_{\lambda,d}
such that
.. math::
\sigma^2_{\lambda,d} = \left(\frac{\lambda}{f}\right)^2
(R^{-2}_2 - R^{-2}_1)
is the defining parameter of the Gaussian kernel needed to take
a spectrum of resolution :math:`R_1` to one with a resolution of
:math:`R_2`.
For input to :func:`convolution_variable_sigma`, the
*wavelength-dependent* parameter of the Gaussian kernel is
converted to pixels. This function allows for the input spectra
to be linearly sampled in angstroms or log10(angstroms). For
the former (*log10=False*),
.. math::
\sigma^2_{p,d} = \left(\frac{\lambda}{f\
\delta\lambda}\right)^2 (R^{-2}_2 - R^{-2}_1)
where :math:`\delta\lambda` is the size of the pixel in
angstroms. If the units are log10(angstrom) (*log10=True*), we
approximate the velocity width of each pixel to be :math:`\delta
v = c \ln(10.0) (\log\lambda[1]-\log\lambda[0])`, such that
.. math::
\sigma^2_{p,d} &= \left(\frac{c}{ \delta v \lambda}\right)^2
\sigma^2_{\lambda,d} \\ &= \left(\frac{c}{ f\ \delta
v}\right)^2 (R^{-2}_2 - R^{-2}_1)\ ;
:math:`c` is the speed of light in km/s.
The nominal use of this algorithm assumes :math:`R_1 \geq R_2`.
However, in practice, :func:`convolution_variable_sigma` only
uses a Gaussian kernel up to some minimum value of
:math:`\epsilon_\sigma`; below this, the kernel is assumed to be
a Delta function. Therefore, as long as
.. math::
\sigma_{p,d} \equiv \sigma^2_{p,d}/\sqrt{|\sigma^2_{p,d}|}
\geq -\epsilon_\sigma\ ,
the behavior of :func:`convolution_variable_sigma` should not be
affected. However, in regions with
.. math::
\sigma_{p,d} \equiv \sigma^2_{p,d}/\sqrt{|\sigma^2_{p,d}|}
\leq \epsilon_\sigma\ ,
the behavior of :func:`convolution_variable_sigma` does *not*
produce an accurate convolution!
To deal with spectral regions that do not have
:math:`\sigma_{p,d} \geq \epsilon_\sigma`, there are three
choices:
(**Option 1**) trim the spectral range to only those
spectral regions where the existing resolution is better
than the target resolution up to this limit,
(**Option 2**) match the existing resolution to the target
resolution up to some constant offset that must be accounted
for in subsequent analyses, or
(**Option 3**) allow for a wavelength dependent difference
in the spectral resolution that must be accounted for in
subsequent analyses.
The choice of either Option 1 or 2 is selected by setting
*no_offset* to, respectively, True or False; Option 1 is the
default behavior. Currently, Option 3 is not allowed.
For Option 1, pixels with :math:`\sigma_{p,d} <
\epsilon_\sigma` are masked (*sigma_mask = 1*); however, the
returned values of :math:`\sigma_{p,d}` are left unchanged.
For Option 2, we define
.. math::
\sigma^2_{v,o} = | {\rm min}(0.0, {\rm min}(\sigma^2_{v,d})
- {\rm max}(\epsilon_\sigma \delta v)^2) |
where :math:`\delta v` is constant for the logarithmically
binned spectrum and is wavelength dependent for the linearly
binned spectra; in the latter case, the velocity step is
determined for each pixel::
_wave = self.wave()
dv = self.c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1]))
If :math:`\sigma^2_{v,o} > 0.0`, it must be that :math:`{\rm
min}(\sigma^2_{v,d}) < {\rm max}(\epsilon_\sigma \delta v)^2`,
such that an offset should be applied. In that case, the
returned kernel parameters are
.. math::
\sigma^\prime_{v,d} = \sqrt{\sigma^2_{v,d} +
\sigma^2_{v,o}}\ .
with the units converted to pixels using the equations above, no
pixels are masked, and :math:`\sqrt{\sigma^2_{v,o}}` is returned
for the offset. Otherwise, the offset is set to 0.
Args:
new_sres (:class:`SpectralResolution`):
Spectral resolution to match to.
no_offset (:obj:`bool`, optional):
Force :math:`\sigma^2_{v,o} = 0` by masking regions with
:math:`\sigma_{p,d} < \epsilon_\sigma`; i.e., the value
of this arguments selects Option 1 (True) or Option 2
(False).
min_sig_pix (:obj:`float`, optional):
Minimum value of the standard deviation allowed before
assuming the kernel is a Delta function.
"""
# print('kernel difference')
# Save the minimum pixel sigma to allow
self.min_sig = min_sig_pix
# Interpolate the new spectral resolution vector at the wavelengths
# of the input spectral resolution
_wave = self.wave()
_sres = self.sres()
interp_sres = new_sres(_wave)
# Determine the variance (in angstroms) of Gaussian needed to match
# input resolution to the new values
# print('sig2_wd')
sig2_wd = numpy.square(_wave/DAPConstants.sig2fwhm) \
* (1.0/numpy.square(interp_sres) - 1.0/numpy.square(_sres))
# pyplot.plot(_wave, _sres)
# pyplot.plot(_wave, interp_sres)
# pyplot.show()
# print('sig2_vd')
# Convert to km/s
sig2_vd = numpy.square(self.c/_wave) * sig2_wd
# Option 1:
if no_offset:
# Convert the variance to pixel coordinates
sig2_pd = sig2_vd / numpy.square(self.dv) if self.log10 else \
sig2_wd / numpy.square(self.dw)
# No offset applied
self.sig_vo = 0.0
# Option 2:
else:
# 1% fudge so pixel at min_sig is not masked!
fudge = 1.01
# Calculate the velocity step of each pixel
dv = self.c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1]))
# Get the needed *velocity* offset (this is the square)
self.sig_vo = fudge*numpy.absolute(min(0.0, numpy.amin(sig2_vd)
- numpy.square(self.min_sig * numpy.amax(dv))))
# Apply it if it's larger than 0
if self.sig_vo > 0:
sig2_vd += self.sig_vo
self.sig_vo = numpy.sqrt(self.sig_vo)
# Convert the variance to pixel coordinates
sig2_pd = self._convert_vd2pd(sig2_vd)
self._finalize_GaussianKernelDifference(sig2_pd)
# print('kernel difference done')
# def ZeroGaussianKernelDifference(self, min_sig_pix=0.0):
# self.min_sig = min_sig_pix
# sig2_pd = numpy.zeros(self.wave().shape, dtype=numpy.float64)
# self._finalize_GaussianKernelDifference(sig2_pd)
[docs]
def offset_GaussianKernelDifference(self, offset):
r"""
If the properties required to match the resolution of one
spectrum to another has already been calculated (see
:func:`GaussianKernelDifference`), this allows for one to apply
an additional offset. The additional offset **must** be in km/s
(not pixels).
The offset is applied in quadrature; however, the offset can be
negative such that one can reduce :attr:`sig_pd`. Once
converted to km/s, the offset is applied by calculating:
.. math::
\sigma^{\prime\ 2}_{v,d} = \sigma^{2}_{v,d} +
\sigma_{off}|\sigma_{off}|\ .
:attr:`sig_vo` is adjusted in the same way, and the change in
:math:`\sigma^{\prime\ 2}_{v,d}` is then propagated to
:attr:`sig_pd` and :attr:`sig_mask`.
Args:
offset (float): Value of the standard deviation in km/s to
add in quadrature to a previously calculated
:attr:`sig_pd`.
Raises:
ValueError: Raised if the kernel properties have not yet
been defined.
"""
if self.min_sig is None or self.sig_pd is None or self.sig_mask is None \
or self.sig_vo is None:
# print('WARNING: No kernel difference yet defined. Assuming 0.')
# self.ZeroGaussianKernelDifference()
raise ValueError('No kernel defined yet. Run GaussianKernelDifference first.')
if numpy.isclose(offset,0.0):
return
off2 = offset*numpy.absolute(offset)
sig2_vo = self.sig_vo*numpy.absolute(self.sig_vo) + off2
self.sig_vo = 0.0 if numpy.isclose(sig2_vo, 0.0) \
else sig2_vo/numpy.sqrt(numpy.absolute(sig2_vo))
sig2_vd = self._convert_pd2vd(self.sig_pd*numpy.absolute(sig_pd)) + off2
self._finalize_GaussianKernelDifference(self._convert_vd2pd(sig2_vd))
[docs]
def adjusted_resolution(self, indx=None):
r"""
Return the resolution that should result from applying
:func:`convolution_variable_sigma` to the spectrum associated
with this spectral resolution object using :attr:`sigma_pd`.
I.e., calculate:
.. math::
R_2 = \left[ \left(\frac{f}{c}\right)^2 \sigma^2_{v,d} +
R^{-2}_1\right]^{-1/2}\ .
Args:
indx (tuple): (**Optional**) Selection tuple used to return
a subset of the full resolution vector.
Returns:
numpy.ndarray: The (full or selected) vector with the
adjusted resolution.
.. todo::
Allow to reset the resolution of this object to the adjusted
resolution and reset the kernel variables to None.
"""
if indx is None:
return 1.0/numpy.sqrt( numpy.square(DAPConstants.sig2fwhm/self.c) \
* self._convert_pd2vd(self.sig_pd*numpy.absolute(self.sig_pd)) \
+ 1.0/numpy.square(self.sres()) )
return 1.0/numpy.sqrt( numpy.square(DAPConstants.sig2fwhm/self.c) \
* (self._convert_pd2vd(self.sig_pd*numpy.absolute(self.sig_pd)))[indx] \
+ 1.0/numpy.square(self.sres()[indx]) )
[docs]
def match_spectral_resolution(wave, flux, sres, new_sres_wave, new_sres, ivar=None, mask=None,
min_sig_pix=0.0, no_offset=True, variable_offset=False, log10=False,
new_log10=False, integral=True, quiet=False):
r"""
Adjust the existing spectral resolution of a spectrum to a **lower**
resolution as best as possible. The primary functionality is in
:class:`SpectralResolution`, which determines the Gaussian kernel
parameters needed to match the resolution, and
:func:`convolve_variable_sigma`, which actually performs the
convolution to match the resolution.
In particular, see
:func:`SpectralResolution.GaussianKernelDifference` for a
description of how the kernel parameters are determined and how
regions where the target resolution is **higher** than the existing
resolution. In this case, one of the options is to adopt an offset
of the resolution (in km/s) that could be corrected for in
subsequent analysis. In this case, setting *variable_offset* to
True allows the offset to be different for all input spectra. If
one expects to combine the spectra, the default behavior should be
used, forcing all the spectra to have a constant offset.
Args:
wave (numpy.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the wavelength in angstroms for a
set of spectra. The sampling may be either in linear steps
of wavelength or :math:`\log_{10}` steps, as set using
*log10*.
flux (numpy.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the flux sampled at the provided
wavelengths.
sres (numpy.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the spectral resolution, :math:`R`,
at the provided wavelengths.
new_sres_wave (numpy.ndarray): A 1D vector with the wavelength
in angstroms at which the new resolution of the input
spectra has been sampled. The sampling may be either in
linear steps of wavelength or :math:`\log_{10}` steps, as
set using *new_log10*.
new_sres (numpy.ndarray): A 1D vector with the new resolution
for the input spectra.
ivar (numpy.ndarray): (**Optional**) A 1D or 2D (:math:`N_{\rm
spec}\times N_{\rm pix}`) array with the inverse variance of
the flux sampled at the provided wavelengths. This vector
is used to estimate the noise in the resolution-matched
spectra.
.. warning::
The accuracy of the errors still remain untested!
mask (numpy.ndarray): (**Optional**) A 1D or 2D (:math:`N_{\rm
spec}\times N_{\rm pix}`) array with a *uint* mask for the
flux sampled at the provided wavelengths.
no_offset (bool): (**Optional**) Force :math:`\sigma^2_{v,o} =
0` by masking regions with :math:`\sigma_{p,d} <
-\epsilon_\sigma`; i.e., the value of this arguments selects
Option 1 (True) or Option 2 (False). See
:func:`SpectralResolution.GaussianKernelDifference`.
min_sig_pix (float): (**Optional**) Minimum value of the
standard deviation in pixels allowed before assuming the
kernel is a Delta function.
variable_offset (bool): (**Optional**) Flag to allow the offset
applied to each spectrum (when the input contains more than
one spectraum) to be tailored to each spectrum. Otherwise
(*variable_offset=False*) the offset is forced to be the
same for all spectra.
log10 (bool): (**Optional**) Flag that the spectrum has been
binned logarithmically (base 10) in wavelength
new_log10 (bool): (**Optional**) Flag that the coordinates of
the new spectral resolution are spectrum as been binned
logarithmically (base 10) in wavelength.
Returns:
numpy.ndarray: Five objects are returned:
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with the resolution-matched flux sampled at the input
wavelengths.
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with the spectral resolution, :math:`R`, of the
resolution-matched spectra at the provided wavelengths.
- A 1D vector with any constant offset in resolution **in
km/s** between the targetted value and the result. See
:func:`SpectralResolution.GaussianKernelDifference`.
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with a *uint* mask for the resolution-matched flux sampled
at the input wavelengths. This is returned regardless of
whether an input mask was provided. Any pixel that had a
resolution that was lower than the target resolution (up
to some tolerance defined by *min_sig_pix*) is returned as
masked.
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with the inverse variance of the resolution-matched flux
sampled at the input wavelengths. If *ivar*
is not provided, a 'None' returned as the last element
Raises:
ValueError: Raised if:
- the input *wave* array is 2D and the *sres* array is not;
a 1D wavelength array is allowed for a 2D *sres* array but
not vice versa
- the number of spectral pixels in *wave*, *flux*, and
*sres* is not the same
- the shape of the *flux*, *mask* (if provided), and *ivar*
(if provided) are not the same
- the shape of the *new_sres_wave* and *new_sres* arrays
are not the same and/or not 1D
.. todo::
- Add interp_ext != 'extrapolate' option?
- Better way to use warnings?
"""
# Check the dimensionality of wave and sres
wave_matrix = len(wave.shape) == 2
sres_matrix = len(sres.shape) == 2
if wave_matrix and not sres_matrix:
raise ValueError('If input wavelength array is 2D, the spectral resolution array must' \
' also be 2D')
# Check the shapes
if (wave_matrix == sres_matrix and wave.shape != sres.shape) or \
(not wave_matrix and sres_matrix and wave.shape[0] != sres.shape[1]):
raise ValueError('Input spectral resolution and coordinate arrays must have the same' \
' number of spectral channels!')
if (wave_matrix and wave.shape != flux.shape) or \
(not wave_matrix and len(flux.shape) == 2 and wave.shape[0] != flux.shape[1]) or \
(not wave_matrix and len(flux.shape) == 1 and wave.shape != flux.shape):
raise ValueError('Input flux and coordinate arrays must have the same number of' \
' spectral channels!')
if (mask is not None and mask.shape != flux.shape):
raise ValueError('Input flux and mask arrays must have the same shape!')
if (ivar is not None and ivar.shape != flux.shape):
raise ValueError('Input flux and ivar arrays must have the same shape!')
if len(sres.shape) > len(flux.shape):
raise ValueError('Shape of the spectral resolution array must be <= to the flux array.')
if len(new_sres_wave.shape) != 1 or len(new_sres.shape) != 1:
raise ValueError('New spectral resolution and coordinate arrays must be 1D!')
if new_sres_wave.shape != new_sres.shape:
raise ValueError('New spectral resolution and coordinate arrays must have the same shape!')
# Raise a warning if the new_sres vector will have to be
# extrapolated for the input wavelengths
if not quiet and (numpy.amin(wave) < new_sres_wave[0] or numpy.amax(wave) > new_sres_wave[-1]):
warnings.warn('Mapping to the new spectral resolution will require extrapolating the' \
' provided input vectors!')
# Initialize some variables
nspec = 1 if len(flux.shape) == 1 else flux.shape[0]
nsres = 1 if len(sres.shape) == 1 else sres.shape[0]
if sres_matrix and nspec != nsres:
raise ValueError('For 2D matrices, number of spectral resolution vectors must match the ' \
'number of spectra.')
spec_dim = len(flux.shape)
sres_dim = len(sres.shape)
sigma_offset = numpy.zeros(nspec, dtype=numpy.float64)
new_res = SpectralResolution(new_sres_wave, new_sres, log10=new_log10,
interp_ext=(new_sres[0],new_sres[-1]))
# pyplot.plot(new_sres_wave, new_sres)
# pyplot.show()
# exit()
res = numpy.empty(nspec, dtype=object)
# Get the kernel parameters necessary to match all spectra to the
# new resolution
if nsres == 1 and sres_dim == 1:
res[0] = SpectralResolution(wave, sres, log10=log10, interp_ext=(sres[0], sres[-1]))
res[0].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix)
sigma_offset[0] = res[0].sig_vo
for i in range(1,nspec):
res[i] = res[0]
# pyplot.plot(wave, res[0].sig_pd)
# pyplot.show()
else:
for i in range(0,nsres):
_wave = wave[i,:].ravel() if wave_matrix else wave
_sres = sres[i,:].ravel() if sres_matrix else sres
res[i] = SpectralResolution(_wave, _sres, log10=log10,
interp_ext=(sres[i,0],sres[i,-1]))
res[i].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix)
sigma_offset[i] = res[i].sig_vo
# pyplot.plot(_wave, res[i].sig_pd)
# pyplot.plot(_wave, numpy.sqrt(res[i]._convert_pd2vd(numpy.square(res[i].sig_pd))))
# pyplot.show()
# Force all the offsets to be the same, if requested
if not no_offset and not variable_offset:
common_offset = numpy.max(sigma_offset)
offset_diff = numpy.sqrt( numpy.square(common_offset) - numpy.square(sigma_offset))
for r, o in zip(res,offset_diff):
r.offset_GaussianKernelDifference(o)
# Perform the convolutions
out_flux = flux.copy()
out_ivar = None if ivar is None else numpy.ma.MaskedArray(ivar.copy())
noise = None if ivar is None else numpy.ma.sqrt(1.0/out_ivar)
out_sres = sres.copy()
_mask = numpy.zeros(flux.shape, dtype=numpy.uint) if mask is None else mask
out_mask = _mask.copy()
# print('test div by zero')
if nspec == 1 and spec_dim == 1:
sig_kernel = res[0].sig_pd.copy()
sig_kernel[sig_kernel < min_sig_pix] = 0.0
indx = res[0].sig_pd > min_sig_pix
try:
if ivar is None:
out_flux[indx] = convolution_variable_sigma(flux, sig_kernel,
integral=integral)[indx]
else:
oflux, oivar = convolution_variable_sigma(flux, sig_kernel,
ye=None if ivar is None else noise[indx],
integral=integral)
out_flux[indx] = oflux[indx]
out_ivar[indx] = oivar[indx]
del oflux, oivar
out_sres[indx] = res[0].adjusted_resolution(indx=indx)
out_mask = numpy.array((res[0].sig_mask == 1) | (_mask == 1)).astype(numpy.uint)
except ValueError as e:
warnings.warn('Encountered ValueError: {0} ; continuing but resolution is NOT '
'changed and mask is set.'.format(e))
out_mask = numpy.ones(flux.shape, dtype=numpy.uint)
else:
for i in range(0,nspec):
if not quiet:
print('Matching resolution: {0}/{1}'.format(i+1,nspec), end='\r')
try:
# indx = numpy.where(res[i].sig_pd > min_sig_pix)
sig_kernel = res[i].sig_pd.copy()
sig_kernel[sig_kernel < min_sig_pix] = 0.0
indx = res[i].sig_pd > min_sig_pix
if ivar is None:
out_flux[i,indx] = convolution_variable_sigma(flux[i,:], sig_kernel,
integral=integral)[indx]
else:
oflux, oivar = convolution_variable_sigma(flux[i,:], sig_kernel,
ye=None if ivar is None
else noise[i,:].ravel(),
integral=integral)
out_flux[i,indx] = oflux[indx]
out_ivar[i,indx] = oivar[indx]
del oflux, oivar
out_mask[i,:] = numpy.array((res[i].sig_mask == 1) \
| (_mask[i,:] == 1)).astype(numpy.uint)
if len(out_sres.shape) == 1 and i == 0:
out_sres[indx] = res[i].adjusted_resolution(indx=indx)
else:
out_sres[i,indx] = res[i].adjusted_resolution(indx=indx)
except ValueError as e:
warnings.warn('Encountered ValueError: {0} ; continuing but resolution is NOT '
'changed and mask is set.'.format(e))
out_mask[i,:] = numpy.ones(flux.shape[1], dtype=numpy.uint)
if not quiet:
print('Matching resolution: {0}/{0}'.format(nspec))
# TODO: Add this functionality from the IDL version?
#
# Finally, the code masks a number of pixels at the beginning and
# end of the spectra to remove regions affected by errors in the
# convolution due to the censoring of the data. The number of
# pixels is the FWHM of the largest Gaussian applied in the
# convolution: ceil(sig2fwhm*max(diff_sig_w)/dw). This is currently
# hard-wired and should be tested.
if ivar is not None:
out_ivar = numpy.square(1.0/out_ivar)
# When returning out_ivar, convert it to a normal array
return out_flux, out_sres, sigma_offset, out_mask, numpy.asarray(out_ivar)
return out_flux, out_sres, sigma_offset, out_mask, None