# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
A set of functions used to filter arrays.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import warnings
from IPython import embed
import numpy
from scipy import interpolate, sparse
# Debug
from matplotlib import pyplot
from matplotlib.ticker import NullFormatter
[docs]def high_pass_filter(flux, dw=0, k=None, Dw=None):
"""
Pulled from FIREFLY's hpf() function and edited on 17 Nov 2016.
Apply a high-pass filter to the input vector.
"""
n = flux.size
_dw = int(dw)
if k is None and Dw is None:
Dw = 100
_k = n//Dw
elif k is not None and Dw is None:
_k = int(k)
Dw = n//_k
elif k is None and Dw is not None:
_k = n//Dw
else:
raise ValueError('Cannot provide both k and Dw.')
# PRINT A REPORT
# Rita's typical inputs for SDSS:
# w = 10 # 10
# windowsize = 20 # 20
# My MaNGA inputs:
# if w == 0 and windowsize == 0:
# print "HPF parameters not specified: choosing default window (stepsize=40)"
# w = 40
# windowsize = 0
h = numpy.fft.fft(flux)
h_filtered = numpy.zeros(n, dtype=complex)
window = numpy.zeros(n)
unwindow = numpy.zeros(n)
window[0] = 1 # keep F0 for normalisation
unwindow[0] = 1
for i in range(_dw):
window[_k+i] = (i+1.0)/_dw
window[n-1-(_k+_dw-i)] = (_dw-i)/_dw
window[_k+_dw:n-(_k+_dw)] = 1
unwindow = 1 - window
unwindow[0] = 1
h_filtered = h * window
un_h_filtered = h * unwindow
res = numpy.real(numpy.fft.ifft(h_filtered))
unres = numpy.real(numpy.fft.ifft(un_h_filtered))
res_out = (1.0+(res-numpy.median(res))/unres) * numpy.median(res)
return res_out, window, res, unres
#def off_diagonal_identity(size, win):
# r"""
# Construct a matrix with ones within a window along the diagonal.
#
# Args:
# size (int) : Size for the square matrix; i.e., :math:`N` for the
# :math:`N\timesN` matrix.
# win (int): Number of ones in each row along the diagonal.
#
# Raises:
# ValueError: Raised if the window is larger than 2*size-1.
#
# """
# if win > 2*size-1:
# raise ValueError('Window too large for matrix size.')
# if win == 2*size-1:
# return numpy.ones((size,size), dtype=int)
# x = numpy.zeros((size,size), dtype=int)#numpy.identity(size).astype(int)
# for i in range(1,(win+1)//2):
# x[:-i,i:] = x[:-i,i:] + numpy.identity(size-i).astype(int)
# x += x.T
# if win % 2 != 1:
# x[:-i-1,i+1:] = x[:-i-1,i+1:] + numpy.identity(size-i-1).astype(int)
# return x + numpy.identity(size).astype(int)
[docs]def off_diagonal_identity(size, win, return_sparse=False):
r"""
Construct a matrix with ones within a window along the diagonal.
Args:
size (int) : Size for the square matrix; i.e., :math:`N` for the
:math:`N\timesN` matrix.
win (int): Number of ones in each row along the diagonal.
Raises:
ValueError: Raised if the window is larger than 2*size-1.
"""
if win > 2*size-1:
raise ValueError('Window too large for matrix size.')
if win == 2*size-1:
return numpy.ones((size,size), dtype=int)
# Indices of diagonal
ii = numpy.arange(size).astype(int)
# Build the upper triangle
i = numpy.empty(0, dtype=int)
j = numpy.empty(0, dtype=int)
for k in range(1,(win+1)//2):
i = numpy.append(i, ii[:size-k])
j = numpy.append(j, ii[k:size])
# Copy to the lower triangle
_i = numpy.append(i,j)
j = numpy.append(j,i)
# Add the diagonal
i = numpy.append(_i, ii)
j = numpy.append(j, ii)
# Accommodate an even window
if win % 2 != 1:
i = numpy.append(i, ii[:size-k-1])
j = numpy.append(j, ii[k+1:size])
# Construct and return the array
if return_sparse:
return sparse.coo_matrix((numpy.ones(len(i), dtype=int),(i,j)), shape=(size,size)).tocsr()
a = numpy.zeros((size,size), dtype=int)
a[i,j] = 1
return a
[docs]def build_smoothing_mask(x, pix_buffer, default=None, mask_x=None):
r"""
Construct a mask for the provided vector that masks the first and
last pix_buffer pixels in the coordinate vector.
The mask is instantiated as fully unmasked, unless a default mask is
provided.
To mask specified ranges in x, provide mask_x with a shape
:math:`N_{\rm mask}\times 2` where each mask is defined by the
starting and ending value of x to exclude.
"""
# Smooth the ratio of the data to the model
if len(x.shape) != 1:
raise ValueError('Input must be a vector.')
npix = x.size
mask = numpy.zeros(npix, dtype=bool) if default is None else default.copy()
if mask.size != npix:
raise ValueError('Provided default has an incorrect length.')
mask[:pix_buffer] = True
mask[-pix_buffer:] = True
if mask_x is None:
return mask
for m in mask_x:
mask |= numpy.logical_and(x > m[0], x < m[1])
return mask
[docs]class BoxcarFilter():
"""
"""
def __init__(self, boxcar, lo=None, hi=None, niter=None, y=None, wgt=None, mask=None,
local_sigma=None):
if boxcar <= 1:
raise ValueError('Boxcar must be greater than 1!')
self.y = None
self.wgt = None
self.input_mask = None
self.output_mask = None
self.boxcar = boxcar
self.lo_rej = lo
self.hi_rej = hi
self.nrej = -1 if niter is None else niter
self.nvec = None
self.npix = None
self.local_sigma = False if local_sigma is None else local_sigma
self.smoothed_wgt = None
self.smoothed_n = None
self.smoothed_y = None
self.sigma_y = None
self.rdx_index = None
if y is not None:
self.smooth(y, wgt=wgt, mask=mask, boxcar=boxcar, lo=lo, hi=hi, niter=niter,
local_sigma=local_sigma)
[docs] def _assign_par(self, boxcar, lo, hi, niter, local_sigma):
if boxcar is not None:
self.boxcar = boxcar
if lo is not None:
self.lo_rej = lo
if hi is not None:
self.hi_rej = hi
if niter is not None:
self.nrej = niter
if local_sigma is not None:
self.local_sigma = local_sigma
[docs] def _check_par(self):
if self.boxcar <= 1:
raise ValueError('Boxcar must be greater than 1!')
if self.nrej is not None and self.nrej > 0 and self.lo_rej is None and self.hi_rej is None:
raise ValueError('Must provide lo or hi if niter provided')
[docs] def _reduce_indices(self):
indx = numpy.array([numpy.arange(self.npix)-self.boxcar//2,
numpy.arange(self.npix)+self.boxcar//2+1])
if self.boxcar % 2 != 1:
indx[:self.npix] += 1
return indx.T.ravel().clip(0,self.npix-1)
[docs] def _apply(self):
self.smoothed_n = numpy.add.reduceat(numpy.invert(self.output_mask), self.rdx_index, axis=1,
dtype=int)[:,::2]
self.smoothed_wgt = numpy.add.reduceat(self.input_wgt, self.rdx_index, axis=1,
dtype=float)[:,::2]
self.smoothed_y = numpy.add.reduceat(self.input_wgt*self.y.filled(0.0), self.rdx_index,
axis=1, dtype=float)[:,::2]
smoothed_y2 = numpy.add.reduceat(self.input_wgt*numpy.square(self.y.filled(0.0)),
self.rdx_index, axis=1, dtype=float)[:,::2]
self.smoothed_y = numpy.ma.divide(self.smoothed_y, self.smoothed_wgt)
self.smoothed_y[self.output_mask | (self.smoothed_n == 0)] = numpy.ma.masked
self.sigma_y = numpy.ma.sqrt((numpy.ma.divide(smoothed_y2, self.smoothed_wgt)
- numpy.square(self.smoothed_y))
* numpy.ma.divide(self.smoothed_n, self.smoothed_n-1)) \
if self.local_sigma else \
numpy.ma.MaskedArray(numpy.array([numpy.std(self.y - self.smoothed_y,
axis=1)]*self.npix).T)
self.output_mask = numpy.ma.getmaskarray(self.smoothed_y) \
| numpy.ma.getmaskarray(self.sigma_y)
self.smoothed_y[self.output_mask | (self.smoothed_n == 0)] = numpy.ma.masked
self.sigma_y[self.output_mask | (self.smoothed_n == 0)] = numpy.ma.masked
return
w,h = pyplot.figaspect(1)
fig = pyplot.figure(figsize=(1.5*w,1.5*h))
minf = numpy.amin( numpy.ma.log10(numpy.append(self.y.compressed(), self.smoothed_y.compressed())) )
maxf = numpy.amax( numpy.ma.log10(numpy.append(self.y.compressed(), self.smoothed_y.compressed())) )
mins = numpy.amin(numpy.ma.log10(self.sigma_y).compressed())
maxs = numpy.amax(numpy.ma.log10(self.sigma_y).compressed())
minr = numpy.amin(numpy.ma.log10(numpy.ma.divide(self.y, self.smoothed_y)).compressed())
maxr = numpy.amax(numpy.ma.log10(numpy.ma.divide(self.y, self.smoothed_y)).compressed())
ax = fig.add_axes([0.05, 0.50, 0.45, 0.45])
ax.xaxis.set_major_formatter(NullFormatter())
ax.imshow(numpy.ma.log10(self.y), origin='lower', interpolation='nearest', vmin=minf,
vmax=maxf, aspect='auto')
ax = fig.add_axes([0.50, 0.50, 0.45, 0.45])
ax.xaxis.set_major_formatter(NullFormatter())
ax.yaxis.set_major_formatter(NullFormatter())
ax.imshow(numpy.ma.log10(self.smoothed_y), origin='lower', interpolation='nearest',
vmin=minf, vmax=maxf, aspect='auto')
ax = fig.add_axes([0.05, 0.05, 0.45, 0.45])
ax.yaxis.set_major_formatter(NullFormatter())
ax.imshow(numpy.ma.log10(numpy.ma.divide(self.y,self.smoothed_y)), origin='lower',
interpolation='nearest', aspect='auto', vmin=minr, vmax=maxr)
ax = fig.add_axes([0.50, 0.05, 0.45, 0.45])
ax.yaxis.set_major_formatter(NullFormatter())
ax.imshow(numpy.ma.log10(self.sigma_y), origin='lower', interpolation='nearest',
aspect='auto', vmin=mins, vmax=maxs)
pyplot.show()
exit()
[docs] def _interpolate(self):
"""
Interpolate the smoothed image across the masked regions.
Leading and trailing masked regions of each row are set to the
first and last unmasked value, respectively.
interpolate both the smoothed data and the sigma
"""
# TODO: Put in a check here in case the full array is masked...
# Boolean arrays with bad pixels (should be the same for both
# smoothed_y and sigma_y)
badpix = numpy.ma.getmaskarray(self.smoothed_y)
# Deal with any vectors that are fully masked
goodvec = numpy.sum(numpy.invert(badpix), axis=1) > 0
ngood = numpy.sum(goodvec)
veci = numpy.arange(self.nvec)[goodvec]
# Find the first and last unflagged pixels in the good vectors
pixcoo = numpy.ma.MaskedArray(numpy.array([numpy.arange(self.npix)]*ngood),
mask=badpix[goodvec,:]).astype(int)
mini = numpy.ma.amin(pixcoo, axis=1)
maxi = numpy.ma.amax(pixcoo, axis=1)
# Copy those to the first and last pixels for each row
self.smoothed_y[veci,0] = numpy.array([self.smoothed_y[i,m] for i,m in zip(veci, mini)])
self.smoothed_y[veci,-1] = numpy.array([self.smoothed_y[i,m] for i,m in zip(veci, maxi)])
self.sigma_y[veci,0] = numpy.array([self.sigma_y[i,m] for i,m in zip(veci, mini)])
self.sigma_y[veci,-1] = numpy.array([self.sigma_y[i,m] for i,m in zip(veci, maxi)])
# Detach badpix from self.smoothed_y
badpix = numpy.ma.getmaskarray(self.smoothed_y).copy()
# Interpolate the smoothed array
x = numpy.ma.MaskedArray(numpy.arange(self.nvec*self.npix), mask=badpix)
interpolator = interpolate.interp1d(x.compressed(), self.smoothed_y.compressed(),
assume_sorted=True, fill_value='extrapolate')
self.smoothed_y.ravel()[badpix.ravel()] = interpolator(x.data[badpix.ravel()])
self.smoothed_y[numpy.invert(goodvec),:] = 0.0
# Interpolate the sigma array
interpolator = interpolate.interp1d(x.compressed(), self.sigma_y.compressed(),
assume_sorted=True, fill_value='extrapolate')
self.sigma_y.ravel()[badpix.ravel()] = interpolator(x.data[badpix.ravel()])
self.sigma_y[numpy.invert(goodvec),:] = 0.0
[docs] def smooth(self, y, wgt=None, mask=None, boxcar=None, lo=None, hi=None, niter=None,
local_sigma=None):
"""
Smooth a vector or array of vectors by a boxcar with specified
rejection.
"""
# Assign and check the binning parameters
self._assign_par(boxcar, lo, hi, niter, local_sigma)
self._check_par()
# Check the input vector/array
if len(y.shape) > 2:
raise ValueError('Can only deal with vectors or matrices.')
# Set the weighting
self.input_wgt = numpy.ones(y.shape, dtype=bool) if wgt is None else wgt
if self.input_wgt.shape != y.shape:
raise ValueError('Weights shape does not match data.')
if isinstance(y, numpy.ma.MaskedArray):
self.input_wgt[numpy.ma.getmaskarray(y)] = 0.0
# Set the mask
self.input_mask = numpy.zeros(y.shape, dtype=bool) if mask is None else mask
if self.input_mask.shape != y.shape:
raise ValueError('Mask shape does not match data.')
if isinstance(y, numpy.ma.MaskedArray):
self.input_mask |= numpy.ma.getmaskarray(y)
# Save the input
provided_vector = len(y.shape) == 1
self.y = numpy.ma.atleast_2d(numpy.ma.MaskedArray(y.copy(), mask=self.input_mask))
self.input_wgt = numpy.atleast_2d(self.input_wgt)
self.input_mask = numpy.atleast_2d(self.input_mask)
self.output_mask = self.input_mask.copy()
self.nvec, self.npix = self.y.shape
# Create the list of reduceat indices
self.rdx_index = self._reduce_indices()
# Get the first iteration of the smoothed vectors
self._apply()
# No rejection iterations requested so return the result
if self.lo_rej is None and self.hi_rej is None:
self._interpolate()
return self.smoothed_y[0,:] if provided_vector else self.smoothed_y
# Iteratively reject outliers
nmasked = numpy.sum(numpy.ma.getmaskarray(self.y))
i=0
while i > -1:
nbad = numpy.sum(self.output_mask)
if self.lo_rej is not None:
self.output_mask = numpy.logical_or(self.output_mask, self.y.data - self.smoothed_y
< (-self.lo_rej*self.sigma_y))
if self.hi_rej is not None:
self.output_mask = numpy.logical_or(self.output_mask, self.y.data - self.smoothed_y
> (self.hi_rej*self.sigma_y))
self.y[self.output_mask] = numpy.ma.masked
self._apply()
nmasked = numpy.sum(self.output_mask) - nbad
# print('Niter: {0}; Nmasked: {1}'.format(i+1, nmasked))
i += 1
if i == self.nrej or nmasked == 0:
break
self._interpolate()
return self.smoothed_y[0,:] if provided_vector else self.smoothed_y
@property
def mask(self):
return self.output_mask.copy()
[docs]def smooth_masked_vector(x, nsmooth):
"""
Smooth a masked vector by a box of size nsmooth.
"""
n = off_diagonal_identity(x.size, nsmooth)*numpy.invert(numpy.ma.getmaskarray(x))[None,:]
nn = numpy.sum(n,axis=1)
return numpy.ma.MaskedArray(numpy.dot(n,x.data), mask=numpy.invert(nn>0) | x.mask)/nn
[docs]def interpolate_masked_vector(y, quiet=True, extrap_with_median=False):
"""
Interpolate over the masked pixels in an input vector using linear
interpolation.
Args:
y (numpy.ma.MaskedArray):
Vector to interpolate.
quiet (:obj:`bool`, optional):
Suppress standard output (default is True).
extrap_with_median (:obj:`bool`, optional):
Instead of linear extrapolation, use the median of the
interpolated vector. Defaults to linear extrapolation.
Returns:
numpy.ndarray: Returns the interpolated, unmasked vector.
"""
x = numpy.arange(y.size)
indx = numpy.ma.getmaskarray(y)
if numpy.sum(numpy.invert(indx)) < 2:
if not quiet:
warnings.warn('Input vector has fewer than 2 unmasked values! Returning zero vector.')
return numpy.zeros(y.size, dtype=y.dtype.name)
interpolator = interpolate.interp1d(x[numpy.invert(indx)], y[numpy.invert(indx)],
fill_value='extrapolate')
_y = y.data.copy()
_y[indx] = interpolator(x[indx])
if extrap_with_median:
med = numpy.median(interpolator.y)
m_indx = (x[indx] < interpolator.x[0]) | (x[indx] > interpolator.x[-1])
_y[indx][m_indx] = med
return _y
[docs]def boxcar_smooth_vector(x, boxcar, mask=None, lo=None, hi=None, niter=None, return_mask=False):
"""
Boxcar smooth an input vector.
- Ignores masked pixels.
- Allows for iterative positive and negative outliers.
- Can return the mask separately from the smoothed vector.
"""
if niter is not None and lo is None and hi is None:
raise ValueError('Must provide lo or hi if niter provided')
_mask = numpy.zeros(x.size, dtype=bool) if mask is None else mask
_x = x if isinstance(x, numpy.ma.MaskedArray) else numpy.ma.MaskedArray(x)
_x[_mask] = numpy.ma.masked
sx = interpolate_masked_vector( smooth_masked_vector(_x, boxcar) )
# pyplot.step(numpy.arange(_x.size), _x.data, where='mid', color='0.3', lw=0.5)
# pyplot.step(numpy.arange(_x.size), _x, where='mid', color='k', lw=0.5)
# pyplot.plot(numpy.arange(_x.size), sx, color='r', lw=1)
# pyplot.show()
if numpy.all([ x is None for x in [ lo, hi, niter ]]):
if return_mask:
return sx, numpy.ma.getmaskarray(_x).copy()
return sx
_niter = -1 if niter is None else niter
nrej = numpy.sum(numpy.ma.getmaskarray(_x))
i=0
while i > -1:
sig = numpy.std((_x - sx).compressed())
mask = numpy.ma.getmaskarray(_x).copy()
if lo is not None:
mask = numpy.logical_or(mask, _x.data - sx < -lo*sig)
if hi is not None:
mask = numpy.logical_or(mask, _x.data - sx > hi*sig)
nrej = numpy.sum(mask) - numpy.sum(_x.mask)
# print('Niter: {0}; Nrej: {1}'.format(i+1, nrej))
_x[mask] = numpy.ma.masked
sx = interpolate_masked_vector( smooth_masked_vector(_x, boxcar) )
# pyplot.step(numpy.arange(_x.size), _x.data, where='mid', color='cyan', lw=0.5)
# pyplot.step(numpy.arange(_x.size), _x, where='mid', color='k', lw=0.5)
# pyplot.plot(numpy.arange(_x.size), sx, color='r', lw=1)
# pyplot.show()
i += 1
if i == _niter or nrej == 0:
break
pyplot.step(numpy.arange(_x.size), _x.data, where='mid', color='cyan', lw=0.5)
pyplot.step(numpy.arange(_x.size), _x, where='mid', color='k', lw=0.5)
pyplot.plot(numpy.arange(_x.size), sx, color='r', lw=1)
pyplot.show()
if return_mask:
return sx, numpy.ma.getmaskarray(_x).copy()
return sx