# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
Implements a wrapper class for pPXF.
.. todo::
Allow new iteration mode that iteratively fits until the velocity is
not up against one of the +/- 2000 km/s limits? Could be useful for
poor redshift guesses.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import inspect
import warnings
import logging
from IPython import embed
import numpy
#from scipy import interpolate, fftpack
from matplotlib import pyplot
import astropy.constants
from ppxf import ppxf, capfit
from ..par.parset import KeywordParSet
from ..par.artifactdb import ArtifactDB
from ..par.emissionlinedb import EmissionLineDB
from ..util.pixelmask import PixelMask, SpectralPixelMask
from ..util.filter import BoxcarFilter
from ..util.log import log_output
from ..util.sampling import spectrum_velocity_scale, angstroms_per_pixel, Resample
from ..util.resolution import match_spectral_resolution, SpectralResolution
from ..util.constants import DAPConstants
from .spatiallybinnedspectra import SpatiallyBinnedSpectra
from .templatelibrary import TemplateLibraryDef, TemplateLibrary
from .spectralfitting import StellarKinematicsFit
from .util import sample_growth, optimal_scale
class PPXFFitPar(KeywordParSet):
r"""
Define a parameter set used by the pPXF fitting method.
The defined parameters are:
.. include:: ../tables/ppxffitpar.rst
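
    For example, a minimal sketch (the guess values are purely illustrative)
    that fits only :math:`V` and :math:`\sigma` with an 8th-order additive
    polynomial:

    .. code-block:: python

        # Illustrative parameter set; all keywords shown are defined in the
        # signature below.
        par = PPXFFitPar(guess_redshift=0.03, guess_dispersion=150.,
                         iteration_mode='nonzero_templates',
                         template_library='MILESHC', velscale_ratio=4,
                         degree=8, mdegree=0, moments=2)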
"""
def __init__(self, guess_redshift=0., guess_dispersion=100.,
iteration_mode='nonzero_templates', template_library='MILESHC',
matched_resolution=False, pixelmask=None, reject_boxcar=100, velscale_ratio=1,
bias=None, degree=8, mdegree=0, moments=2):
# Use the signature to get the parameters and the default values
sig = inspect.signature(self.__class__)
pars = list(sig.parameters.keys())
defaults = [sig.parameters[key].default for key in pars]
# Remaining definitions done by hand
arr_in_fl = [numpy.ndarray, list, int, float]
in_fl = [int, float]
iter_opt = PPXFFit.iteration_modes()
moment_opt = [2, 4, 6]
values = [guess_redshift, guess_dispersion, iteration_mode, template_library,
matched_resolution, pixelmask, reject_boxcar, velscale_ratio, bias, degree,
mdegree, moments]
options = [None, None, iter_opt, None, None, None, None, None, None, None, None,
moment_opt]
dtypes = [arr_in_fl, arr_in_fl, str, [str, TemplateLibraryDef], bool, PixelMask, int,
int, in_fl, int, int, int ]
descr = ['Initial guess for the redshift (:math:`cz`) of each binned spectrum.',
'Initial guess for the velocity dispersion for each binned spectrum.',
'Iteration mode to use; see :func:`PPXFFit.iteration_modes`.',
'Keyword identifier or definition object used to build the spectral template ' \
'library used during the fit.',
                 'Flag that the spectral resolution of the templates is (or should be) ' \
                 'matched to the galaxy data.',
'Pixel mask to include during the fitting; see ' \
':func:`~mangadap.util.pixelmask.SpectralPixelMask`.',
'Number of pixels in the boxcar used to determine the local sigma for ' \
'rejecting outliers.',
                 'The **integer** ratio of the velocity scale of the pixels in the galaxy ' \
                 'data to that of the template data.',
'`ppxf`_ ``bias`` parameter used to penalize low S/N spectra toward a ' \
'Gaussian LOSVD.',
'`ppxf`_ ``degree`` parameter used to set the order of the additive polynomial ' \
'to include in the fit.',
'`ppxf`_ ``mdegree`` parameter used to set the order of the multiplicative ' \
'polynomial to include in the fit.',
r'`ppxf`_ ``moments`` parameter used to set the number of moments of the ' \
r'LOSVD to fit. The DAP has not been well tested for fits that include ' \
r'any more than :math:`V` and :math:`\sigma`.']
super().__init__(pars, values=values, defaults=defaults, options=options, dtypes=dtypes,
descr=descr)
self.templates = None
@classmethod
def from_dict(cls, d):
"""
Instantiate the parameters based on the provided dictionary.
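
        For example, a minimal sketch using only the primary keywords (the
        values are purely illustrative; the mask-related entries are
        optional):

        .. code-block:: python

            par = PPXFFitPar.from_dict({'guess_redshift': 0.03,
                                        'guess_dispersion': 150.,
                                        'iteration_mode': 'nonzero_templates',
                                        'moments': 2})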
"""
# Copy over the primary keywords
_d = {}
for key in ['guess_redshift', 'guess_dispersion', 'iteration_mode', 'matched_resolution',
'reject_boxcar', 'velscale_ratio', 'bias', 'degree', 'mdegree', 'moments']:
if key in d.keys():
_d[key] = d[key]
artifacts = None if 'artifact_mask' not in d.keys() or d['artifact_mask'] is None \
else ArtifactDB.from_key(d['artifact_mask'])
emission_lines = None if 'emission_line_mask' not in d.keys() \
or d['emission_line_mask'] is None \
else EmissionLineDB.from_key(d['emission_line_mask'])
waverange = None if 'waverange' not in d.keys() or d['waverange'] is None \
else d['waverange']
if not all([a is None for a in [artifacts, emission_lines, waverange]]):
_d['pixelmask'] = SpectralPixelMask(artdb=artifacts, emldb=emission_lines,
waverange=waverange)
if 'templates' in d.keys():
_d['template_library'] = TemplateLibraryDef.from_dict(d['templates'])
# Return the instantiation
return super().from_dict(_d)
def fill(self, binned_spectra, guess_vel=None, guess_sig=None, **kwargs):
"""
        Use the provided object to "fill" the parameters by constructing the
        template library spectra used during the fit and setting the initial
        guess kinematics for each spectrum.

        Any ``kwargs`` are passed directly to the template library
        construction.
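
        For example, a minimal sketch, assuming ``binned_spectra`` is a
        previously constructed :class:`SpatiallyBinnedSpectra` instance and
        the guess values (in km/s) are purely illustrative:

        .. code-block:: python

            par = PPXFFitPar()
            par.fill(binned_spectra, guess_vel=8000., guess_sig=100.)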
"""
# Fill the guess kinematics
c = astropy.constants.c.to('km/s').value
nbins = binned_spectra.nbins
if isinstance(guess_vel, (list, numpy.ndarray)):
_guess_vel = numpy.asarray(guess_vel)
if _guess_vel.size > 1 and _guess_vel.size != nbins:
raise ValueError('Incorrect number of guess velocities provided; expected '
f'{nbins}, found {_guess_vel.size}.')
self['guess_redshift'] = _guess_vel.copy()/c
elif guess_vel is not None:
self['guess_redshift'] = numpy.full(nbins, guess_vel/c, dtype=float)
else:
self['guess_redshift'] = numpy.zeros(nbins, dtype=float)
if isinstance(guess_sig, (list, numpy.ndarray)):
_guess_sig = numpy.asarray(guess_sig)
if _guess_sig.size > 1 and _guess_sig.size != nbins:
raise ValueError('Incorrect number of guess dispersions provided; expected '
f'{nbins}, found {_guess_sig.size}.')
self['guess_dispersion'] = _guess_sig.copy()
elif guess_sig is not None:
self['guess_dispersion'] = numpy.full(nbins, guess_sig, dtype=float)
else:
self['guess_dispersion'] = numpy.full(nbins, 100., dtype=float)
        # Instantiate the template library
if self['template_library'] is not None:
velocity_offset = None if self['guess_redshift'] is None \
else numpy.mean(c * self['guess_redshift'])
self.templates = TemplateLibrary(self['template_library'],
velocity_offset=velocity_offset,
cube=binned_spectra.cube,
match_resolution=self['matched_resolution'],
velscale_ratio=self['velscale_ratio'],
**kwargs)
class PPXFFitResult:
"""
A basic utility to save the critical parts of the pPXF model.
"""
def __init__(self, degree, mdegree, start, end, tpl_to_use, ppxf_fit, ntpl,
weight_errors=False, component_fits=False):
self.start = start
self.end = end
self.npixtot = end-start
self.tpl_to_use = tpl_to_use.copy()
self.gpm = None if ppxf_fit is None else ppxf_fit.goodpixels.copy()
self.bestfit = None if ppxf_fit is None else ppxf_fit.bestfit.copy()
self.tplwgt = None if ppxf_fit is None else ppxf_fit.weights[0:ntpl].copy()
self.tplwgterr = None
# Set status
if ppxf_fit is None:
self.status = None
elif numpy.sum(self.tplwgt) == 0:
warnings.warn('No non-zero templates! Setting status as failed.')
self.status = -2 # All template weights are zero!
else:
self.status = ppxf_fit.status
if ppxf_fit is not None and weight_errors:
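            # Formal weight errors: propagate the pixel errors through the
            # linear least-squares problem using the noise-weighted design
            # matrix restricted to the good pixels; only the template-weight
            # errors (i.e., excluding the additive polynomial terms) are kept.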
design_matrix = ppxf_fit.matrix[ppxf_fit.goodpixels,:] \
/ ppxf_fit.noise[ppxf_fit.goodpixels, None]
cov, _tplwgterr = capfit.cov_err(design_matrix)
self.tplwgterr = _tplwgterr[degree+1:]
# covariance_matrix = numpy.linalg.inv(numpy.matmul(design_matrix.T, design_matrix))
# self.tplwgterr = numpy.sqrt(numpy.diag(covariance_matrix))[degree+1:]
self.addcoef = None if ppxf_fit is None or degree < 0 else ppxf_fit.polyweights.copy()
self.multcoef = None if ppxf_fit is None or mdegree <= 0 else ppxf_fit.mpolyweights.copy()
self.kin = None if ppxf_fit is None else ppxf_fit.sol.copy()
self.kinerr = None if ppxf_fit is None else ppxf_fit.error.copy()
# self.robust_rchi2 = None if ppxf_fit is None else ppxf_fit.chi2
self.bestfit_comp = None
if ppxf_fit is None or not component_fits:
return
# Construct the model for each component
component_weights = ppxf_fit.weights[None,:]*numpy.array([
numpy.array(ppxf_fit.component == i).astype(int)
for i in range(ppxf_fit.ncomp)])
self.bestfit_comp = numpy.dot(component_weights, ppxf_fit.matrix.T[degree+1:,:])
# Include the multiplicative polynomial
if self.multcoef is not None:
mpoly = numpy.polynomial.legendre.legval(numpy.linspace(-1, 1, end-start),
numpy.append(1.0,self.multcoef))
self.bestfit_comp[:,start:end] *= mpoly[None,:]
def empty_fit(self):
return self.status is None
def reached_maxiter(self):
return self.status == 5
def fit_failed(self):
return self.empty_fit() or self.status <= 0
class PPXFModel:
"""
Class that reconstructs a pPXF model given a set of templates and
the model parameters.
This pulls functions from M. Cappellari's ppxf class, version 6.7.0.
    Input in common with the ppxf() call should have the same format as the
    pPXF input.
    The only reason the galaxy spectrum (or spectra) is provided is to set
    whether the data are to be fit as reflection-symmetric and how many
    spectral pixels there are.
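
    For example, a minimal sketch (array shapes and values are illustrative;
    ``templates``, ``galaxy``, ``velscale``, and ``vsyst`` follow the same
    conventions as the corresponding ppxf() arguments, and ``vel``, ``sigma``,
    ``tplwgts``, and ``addcoef`` are placeholder kinematics, template weights,
    and additive-polynomial coefficients):

    .. code-block:: python

        model = PPXFModel(templates, galaxy, velscale, velscale_ratio=4,
                          vsyst=vsyst, moments=2, degree=8, mdegree=0)
        # Evaluate the model; the kinematics must be a numpy array and
        # addcoef must have degree+1 elements.
        bestfit = model(numpy.array([vel, sigma]), tplwgts, addpoly=addcoef)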
"""
def __init__(self, templates, galaxy, velscale, velscale_ratio=None, vsyst=None, sigma_diff=0,
moments=2, degree=4, mdegree=0, component=0, gas_component=None, lam=None,
reddening=None, gas_reddening=None, reddening_func=None, sky=None,
templates_rfft=None, trig=False, quiet=False):
# Suppress terminal output
self.quiet = quiet
# Dimensions of spectrum (or spectra) to fit
self.nspec = galaxy.ndim # nspec=2 for reflection-symmetric LOSVD
self.npix = galaxy.shape[0] # total pixels in the galaxy spectrum
if not numpy.all(numpy.isfinite(galaxy)):
raise ValueError('GALAXY data must be finite')
if self.nspec == 2 and vsyst is None:
raise ValueError('VSYST must be defined for two-sided fitting')
# Set templates to use
self.templates = templates.reshape(templates.shape[0], -1)
self.npix_temp, self.ntemp = self.templates.shape
# Pixel sampling
self.velscale = velscale
self.vsyst = 0 if vsyst is None else vsyst/velscale
self.sigma_diff = sigma_diff/velscale
self.factor = 1 # default value
if velscale_ratio is not None:
if not isinstance(velscale_ratio, int):
raise ValueError('VELSCALE_RATIO must be an integer')
self.npix_temp -= self.npix_temp % velscale_ratio
# Make size multiple of velscale_ratio
self.templates = self.templates[:self.npix_temp, :]
# This is the size after rebin()
self.npix_temp //= velscale_ratio
self.factor = velscale_ratio
if self.npix_temp < galaxy.shape[0]:
raise ValueError('TEMPLATES length cannot be smaller than GALAXY')
# Set the FFT of the templates
self.npad = 2**int(numpy.ceil(numpy.log2(self.templates.shape[0])))
self.templates_rfft = numpy.fft.rfft(self.templates, self.npad, axis=0) \
if templates_rfft is None else templates_rfft
# Set sky spectrum
if sky is None:
self.sky = None
else:
if sky.shape[0] != galaxy.shape[0]:
raise ValueError('GALAXY and SKY must have the same size')
self.sky = sky.reshape(sky.shape[0], -1)
self.nsky = 0 if self.sky is None else self.sky.shape[1]
# Select polynomials to use
self.degree = max(degree, -1)
self.mdegree = max(mdegree, 0)
if trig:
if self.degree > -1 and self.degree % 2 != 0:
raise ValueError('`degree` must be even with `trig=True`')
if self.mdegree > -1 and self.mdegree % 2 != 0:
raise ValueError('`mdegree` must be even with `trig=True`')
self.polyval = ppxf.trigval
self.polyvander = ppxf.trigvander
else:
self.polyval = numpy.polynomial.legendre.legval
self.polyvander = numpy.polynomial.legendre.legvander
# Initialize the design matrix. Additive polynomial components
# are independent of the input parameters.
self.npoly = (self.degree + 1)*self.nspec # Number of additive polynomials in fit
self.ndmcols = self.npoly + self.nsky*self.nspec + self.ntemp
self.matrix = numpy.zeros((self.npix*self.nspec, self.ndmcols))
if self.degree > -1:
vand = self.polyvander(numpy.linspace(-1, 1, self.npix), self.degree)
self.matrix[: self.npix, : self.npoly//self.nspec] = vand
if self.nspec == 2: # poly for right spectrum
self.matrix[self.npix :, self.npoly//self.nspec : self.npoly] = vand
# Kinematic components
self.component = numpy.atleast_1d(component)
if self.component.dtype != int:
raise TypeError('COMPONENT must be integers')
if self.component.size == 1 and self.ntemp > 1:
# component is a scalar so all templates have the same LOSVD
self.component = numpy.zeros(self.ntemp, dtype=int)
elif self.component.size != self.ntemp:
raise ValueError('There must be one kinematic COMPONENT per template')
tmp = numpy.unique(self.component)
self.ncomp = tmp.size
if not numpy.array_equal(tmp, numpy.arange(self.ncomp)):
raise ValueError('COMPONENT must range from 0 to NCOMP-1')
# Isolate gas components
if gas_component is None:
self.gas_component = numpy.zeros(self.component.size, dtype=bool)
self.gas_any = False
else:
self.gas_component = numpy.asarray(gas_component)
if self.gas_component.dtype != bool:
raise TypeError('`gas_component` must be boolean')
if self.gas_component.size != component.size:
raise ValueError('`gas_component` and `component` must have the same size')
if not numpy.any(gas_component):
raise ValueError('At least one `gas_component` must be set to true')
self.gas_any = True
# Set reddening
if lam is None and (reddening is not None or gas_reddening is not None):
raise ValueError('LAM must be given with REDDENING or GAS_REDDENING keyword')
if self.mdegree > 0 and reddening is not None:
raise ValueError('MDEGREE cannot be used with REDDENING keyword')
if gas_reddening is not None and not self.gas_any:
raise ValueError('GAS_COMPONENT must be nonzero with GAS_REDDENING keyword')
if lam is not None and lam.shape != galaxy.shape:
raise ValueError('GALAXY and LAM must have the same size')
self.lam = lam
self.reddening = reddening
self.gas_reddening = gas_reddening
if reddening_func is None:
self.reddening_func = ppxf.reddening_cal00
else:
if not callable(reddening_func):
raise TypeError('`reddening_func` must be callable')
self.reddening_func = reddening_func
# Kinematic moments to fit
self.moments = numpy.atleast_1d(moments)
if self.moments.size == 1:
# moments is scalar: all LOSVDs have same number of G-H moments
self.moments = numpy.full(self.ncomp, self.moments, dtype=int)
self.fixall = self.moments < 0 # negative moments --> keep entire LOSVD fixed
self.moments = numpy.abs(self.moments)
if self.moments.size != self.ncomp:
raise ValueError('MOMENTS must be an array of length NCOMP')
def __call__(self, kin, tplwgts, addpoly=None, multpoly=None, reddening=None,
gas_reddening=None):
"""Same as :func:`construct`"""
return self.construct(kin, tplwgts, addpoly=addpoly, multpoly=multpoly,
reddening=reddening, gas_reddening=gas_reddening)
def construct(self, kin, tplwgts, addpoly=None, multpoly=None, reddening=None,
gas_reddening=None):
"""
Construct a pPXF model spectrum provided the input
parameters. Mostly a copy of ppxf._linear_fit().
Args:
            kin (list, numpy.ndarray): The kinematics of each component; a
                list or array of NCOMP vectors, where the length of each
                vector must match the corresponding element of MOMENTS.
tplwgts (numpy.ndarray): Weights to apply to each template.
Shape must be (NTEMP,).
addpoly (numpy.ndarray): (**Optional**) Coefficients of the
additive polynomials. Shape must be
(NSPEC*(DEGREE+1),). Exception is raised if
coefficients are expected (self.degree > -1), but no
                coefficients are provided.
multpoly (numpy.ndarray): (**Optional**) Coefficients of the
multiplicative polynomials. Shape must be
(NSPEC*MDEGREE,). Exception is raised if coefficients
                are expected (self.mdegree > 0), but no coefficients
are provided.
reddening (float): (**Optional**) E(B-V) value for the
continuum fit. Exception is raised if value is expected
(self.reddening is not None), but none is provided.
gas_reddening (float): (**Optional**) E(B-V) value for the
gas components. Exception is raised if value is
                expected (self.gas_reddening is not None), but none is
provided.
Returns:
            numpy.ndarray: The constructed model spectrum, i.e. the design
                matrix dotted with the polynomial and template weights.
"""
# Check the input kinematics
params = [kin] if self.ncomp == 1 else list(kin)
if len(params) != self.ncomp:
raise ValueError('There must be one set of kinematic moments per component')
if not numpy.all([hasattr(p, "__len__") and len(p) == m
for p,m in zip(params, self.moments)]):
raise ValueError('Input kinematics have the incorrect length')
# Convert velocity from km/s to pixels
for j, flx in enumerate(params):
params[j][:2] = flx[:2]/self.velscale
params = numpy.concatenate(params)
# Check the optional input
if self.degree > -1 and (addpoly is None or len(addpoly) != (self.degree + 1)*self.nspec):
raise ValueError('addpoly must be provided with length {0}'.format(
(self.degree+1)*self.nspec))
if self.mdegree > 0 and (multpoly is None or len(multpoly) != self.mdegree*self.nspec):
raise ValueError('multpoly must be provided with length {0}'.format(
self.mdegree*self.nspec))
if self.reddening is not None and reddening is None:
raise ValueError('reddening must be provided')
if self.gas_reddening is not None and gas_reddening is None:
raise ValueError('gas_reddening must be provided')
# Check the input template weights
weights = numpy.atleast_1d(tplwgts)
if weights.size != self.ntemp:
raise ValueError('Incorrect number of template weights provided.')
# Include the additive polynomial weights
if self.degree > -1:
weights = numpy.append(addpoly, weights)
# Get the real FFT of the LOSVDs
losvd_rfft = ppxf.losvd_rfft(params, self.nspec, self.moments,
self.templates_rfft.shape[0], self.ncomp, self.vsyst,
self.factor, self.sigma_diff)
# Get the multiplicative polynomials
x = numpy.linspace(-1, 1, self.npix)
if self.mdegree > 0:
if self.nspec == 2: # Different multiplicative poly for left/right spectra
mpoly1 = self.polyval(x, numpy.append(1.0, multpoly[::2]))
mpoly2 = self.polyval(x, numpy.append(1.0, multpoly[1::2]))
mpoly = numpy.append(mpoly1, mpoly2).clip(0.1)
else:
mpoly = self.polyval(x, numpy.append(1.0, multpoly)).clip(0.1)
else:
mpoly = None if reddening is None else self.reddening_func(self.lam, reddening)
gas_mpoly = None if gas_reddening is None else self.reddening_func(self.lam, gas_reddening)
# Perform the LOSVD convolution and finish the design matrix
tmp = numpy.empty((self.nspec, self.npix_temp))
for j, template_rfft in enumerate(self.templates_rfft.T): # columns loop
for k in range(self.nspec):
pr = template_rfft*losvd_rfft[:, self.component[j], k]
tt = numpy.fft.irfft(pr, self.npad)
tmp[k, :] = ppxf.rebin(tt[:self.npix_temp*self.factor], self.factor)
self.matrix[:, self.npoly + j] = tmp[:, :self.npix].ravel()
if self.gas_component[j]:
if gas_mpoly is not None:
self.matrix[:, self.npoly + j] *= gas_mpoly
elif mpoly is not None:
self.matrix[:, self.npoly + j] *= mpoly
if self.nsky > 0:
k = self.npoly + self.ntemp
self.matrix[: self.npix, k : k + self.nsky] = self.sky
            if self.nspec == 2: # Sky for right spectrum
self.matrix[self.npix :, k + self.nsky : k + 2*self.nsky] = self.sky
# return the model
return self.matrix.dot(weights)
class PPXFFit(StellarKinematicsFit):
"""
Use pPXF to measure the stellar kinematics. Although it can also
    fit the composition and emission lines, for now it is forced to be a
    :class:`StellarKinematicsFit` object.
Attributes:
bitmask (BitMask): Bitmask to use for masking spectral pixels
and parameter values. For
:func:`fit_SpatiallyBinnedSpectra`, must have bit for
'LOW_SNR'. For :func:`fit` must have bits for 'TPL_PIXELS',
'TRUNCATED', 'PPXF_REJECT', 'LARGE_CHI2', 'LARGE_RESID',
'INSUFFICIENT_DATA', 'FIT_FAILED', 'NEAR_BOUND'.
"""
# Define bit names as global to the class
snr_flag = 'LOW_SNR'
rng_flag = 'OUTSIDE_RANGE'
tpl_flag = 'TPL_PIXELS'
trunc_flag = 'TRUNCATED'
rej_flag = 'PPXF_REJECT'
def __init__(self, bitmask, par=None):
StellarKinematicsFit.__init__(self, 'ppxf', bitmask, par=par)
# Logging and terminal output
self.loggers = None
self.quiet = False
# Imposed fitting boundaries
self.velocity_limits = None
self.sigma_limits = None
self.gh_limits = None
# Spectral data
self.tpl_wave = None
self.tpl_sres = None
self.tpl_flux = None
self.ntpl = None
self.npix_tpl = None
self.obj_wave = None
self.obj_sres = None
self.obj_flux = None
self.obj_ferr = None
self.input_obj_mask = None
self.obj_to_fit = None
self.nobj = None
self.npix_obj = None
# Fitting parameters
self.velscale = None
self.velscale_ratio = None
self.matched_resolution = None
self.guess_kin = None
self.spectrum_start = None
self.spectrum_end = None
self.dof = None
self.base_velocity = None
# Fitting options
self.iteration_mode = None
self.reject_boxcar = None
self.bias = None
self.degree = None
self.mdegree = None
self.moments = None
self.fix_kinematics = None
@staticmethod
def iteration_modes():
r"""
Possible iteration methods:
* ``none``: Fit all bins with all templates with a single call
to pPXF.
* ``no_global_wrej``: Do not fit the global spectrum
first, but include a rejection iteration. All templates are
fit in each step.
* ``global_template``: Fit the global spectrum with all
templates and include a single rejection iteration. The
pixel mask for this fit is the base mask for all fits to the
individual bins. A single rejection iteration is done for
each bin. **Only the global template is used when fitting
each bin.**
* ``nonzero_templates``: Fit the global spectrum with
all templates and include a single rejection iteration. The
pixel mask for this fit is the base mask for all fits to the
individual bins. A single rejection iteration is done for
each bin. **Only the templates with non-zero weights are
used when fitting each bin.**
* ``all_templates``: Fit the global spectrum with all
templates and include a single rejection iteration. The
pixel mask for this fit is the base mask for all fits to the
individual bins. A single rejection iteration is done for
each bin. **All templates are used when fitting each bin.**
Returns:
list: List of allowed options.
"""
return [ 'none',
'no_global_wrej',
'global_template',
'nonzero_templates',
'all_templates']
def _mode_uses_global_spectrum(self):
return self.iteration_mode in ['global_template', 'nonzero_templates', 'all_templates']
def _mode_uses_global_template(self):
return self.iteration_mode in ['global_template']
def _mode_uses_nonzero_templates(self):
return self.iteration_mode in ['nonzero_templates']
def _mode_uses_all_templates(self):
return self.iteration_mode in ['none', 'no_global_wrej', 'all_templates']
def _mode_includes_rejection(self):
return self.iteration_mode != 'none'
def _check_mode(self, iteration_mode, reject_boxcar, mdegree):
if iteration_mode not in self.iteration_modes():
raise ValueError('Do not understand iteration mode \'{0}\''.format(iteration_mode))
self.iteration_mode = iteration_mode
self.reject_boxcar = reject_boxcar
@staticmethod
def check_template_usage_flags(nobj, ntpl, usetpl):
if usetpl is not None and usetpl.shape != (nobj, ntpl) \
and usetpl.shape != (ntpl,):
raise ValueError('Provided template selection object does not have the correct shape!')
if usetpl is None:
_usetpl = numpy.ones((nobj,ntpl), dtype=bool)
else:
_usetpl = usetpl.astype(bool)
if _usetpl.shape == (ntpl,):
_usetpl = numpy.array([_usetpl]*nobj)
return _usetpl
@staticmethod
def check_resolution_match(tpl_sres, obj_sres, matched_resolution):
# Confirm there is enough information to handle an unmatched
# resolution
if not matched_resolution and (obj_sres is None or tpl_sres is None):
raise ValueError('If the spectral resolution is not matched between the template and '
'the object data, you must provide the spectral resolution for both.')
return matched_resolution
@staticmethod
def set_wavelength_range(nobj, obj_wave, waverange=None):
_waverange = numpy.array([[obj_wave[0]-1, obj_wave[-1]+1]]) \
if waverange is None else numpy.atleast_2d(waverange)
_waverange[numpy.equal(_waverange[:,0], None),0] = obj_wave[0]-1
_waverange[numpy.equal(_waverange[:,1], None),1] = obj_wave[-1]+1
if _waverange.shape[0] == 1:
_waverange = numpy.array([_waverange[0,:]]*nobj)
if _waverange.shape != (nobj,2):
raise ValueError('Input wavelength range array does not have the correct shape.')
return _waverange
@staticmethod
def initialize_model_mask(obj_wave, obj_flux, mask=None, bitmask=None, velocity_offset=None):
model_mask = numpy.zeros(obj_flux.shape,
dtype=bool if bitmask is None else bitmask.minimum_dtype())
if mask is None:
return model_mask
# Include the mask, if provided
if isinstance(mask, numpy.ndarray):
if mask is not None and mask.shape != obj_flux.shape:
raise ValueError('Shape of object mask array must match its flux array.')
model_mask = mask if bitmask is None else mask.astype(bitmask.minimum_dtype())
if isinstance(mask, SpectralPixelMask):
model_mask = mask.boolean(obj_wave, nspec=obj_flux.shape[0],
velocity_offsets=velocity_offset) if bitmask is None \
else mask.bits(bitmask, obj_wave, nspec=obj_flux.shape[0],
velocity_offsets=velocity_offset)
return model_mask
@staticmethod
def initialize_pixels_to_fit(tpl_wave, obj_wave, obj_flux, obj_ferr, velscale,
velscale_ratio=None, waverange=None, mask=None, bitmask=None,
velocity_offset=None, max_velocity_range=400., alias_window=None,
ensemble=True, loggers=None, quiet=False):
"""
obj_flux and obj_ferr *must* be masked arrays.
.. todo::
- This function can alter obj_flux and obj_ferr !! Return
them instead?
"""
# Initialize the mask (does not include any mask in the obj_flux
# object)
nobj = obj_flux.shape[0]
err = numpy.empty(nobj, dtype=object) # Empty error messages
_waverange = PPXFFit.set_wavelength_range(nobj, obj_wave, waverange=waverange)
model_mask = PPXFFit.initialize_model_mask(obj_wave, obj_flux, mask=mask, bitmask=bitmask,
velocity_offset=velocity_offset)
# Include the masked object pixels
model_mask[obj_flux.mask] = True if bitmask is None else \
bitmask.turn_on(model_mask[obj_flux.mask], 'DIDNOTUSE')
# Assess the regions that need to be masked during fitting
if ensemble:
# Treat all spectra as part of an ensemble so mask the same
# region for all spectra
_waverange = numpy.array([numpy.amax(_waverange[:,0]), numpy.amin(_waverange[:,1])])
try:
fit_indx, waverange_mask, npix_mask, alias_mask \
= PPXFFit.fitting_mask(tpl_wave, obj_wave, velscale,
velscale_ratio=velscale_ratio,
waverange=_waverange,
velocity_offset=velocity_offset,
max_velocity_range=max_velocity_range,
alias_window=alias_window, loggers=loggers,
quiet=True)#quiet)
except ValueError as e:
return model_mask, numpy.array([e]*nobj), numpy.zeros(nobj, dtype=int), \
numpy.array([obj_flux.shape[1]]*nobj).astype(int)
fit_indx = numpy.array([ fit_indx ]*nobj)
waverange_mask = numpy.array([ waverange_mask ]*nobj)
npix_mask = numpy.array([ npix_mask ]*nobj)
alias_mask = numpy.array([ alias_mask ]*nobj)
else:
# Treat all spectra independently, such that the masks are
# independent
fit_indx = numpy.zeros(obj_flux.shape, dtype=bool)
waverange_mask = numpy.zeros(obj_flux.shape, dtype=bool)
npix_mask = numpy.zeros(obj_flux.shape, dtype=bool)
alias_mask = numpy.zeros(obj_flux.shape, dtype=bool)
_velocity_offset = numpy.asarray(velocity_offset) \
if isinstance(velocity_offset,(list,numpy.ndarray)) \
else numpy.array([velocity_offset]*nobj)
if len(_velocity_offset) != nobj:
raise ValueError('Incorrect number of velocity offsets provided.')
for i in range(nobj):
try:
fit_indx[i,:], waverange_mask[i,:], npix_mask[i,:], alias_mask[i,:] \
= PPXFFit.fitting_mask(tpl_wave, obj_wave, velscale,
velscale_ratio=velscale_ratio,
waverange=_waverange[i,:],
velocity_offset=_velocity_offset[i],
max_velocity_range=max_velocity_range,
alias_window=alias_window, loggers=loggers,
quiet=True)#quiet)
except ValueError as e:
err[i] = e
# Add these regions to the mask
if bitmask is None:
model_mask[waverange_mask] = True
model_mask[npix_mask] = True
model_mask[alias_mask] = True
else:
model_mask[waverange_mask] = bitmask.turn_on(model_mask[waverange_mask],
PPXFFit.rng_flag)
model_mask[npix_mask] = bitmask.turn_on(model_mask[npix_mask], PPXFFit.tpl_flag)
model_mask[alias_mask] = bitmask.turn_on(model_mask[alias_mask], PPXFFit.trunc_flag)
# Make sure that the errors are valid
indx = numpy.invert(obj_ferr.data > 0) | numpy.invert(numpy.isfinite(obj_ferr.data))
if numpy.sum(indx) > 0:
model_mask[indx] = True if bitmask is None else\
bitmask.turn_on(model_mask[indx], 'INVALID_ERROR')
# To avoid having pPXF throw an exception, make sure that all
# of these bad errors are set to unity
obj_ferr[indx] = 1.0
# Update the internal mask of the data
obj_flux[model_mask > 0] = numpy.ma.masked
obj_ferr[model_mask > 0] = numpy.ma.masked
# Determine the starting and ending pixels and return
# TODO: does this work if all the pixels are masked in a given
# spectrum?
pix = numpy.ma.MaskedArray(numpy.array([numpy.arange(obj_flux.shape[1])]*nobj),
mask=numpy.invert(fit_indx))
return model_mask, err, numpy.ma.amin(pix, axis=1), numpy.ma.amax(pix, axis=1)+1
def _run_fit_iteration(self, obj_flux, obj_ferr, start, end, base_velocity, tpl_flux,
tpl_rfft, guess_kin, fix_kinematics=False, obj_to_fit=None,
tpl_to_use=None, degree=None, mdegree=None, dof=None,
weight_errors=False, plot=False):
r"""
Fit all the object spectra in obj_flux.
.. todo::
- Calculate DOF in this function?
- Explicitly set the bounds to use instead of using the pPXF
defaults?
Args:
obj_flux (numpy.ma.MaskedArray): Size is :math:`N_{\rm
spec}\times N_{\rm chan}`, object spectra.
obj_ferr (numpy.ma.MaskedArray): Size is :math:`N_{\rm
spec}\times N_{\rm chan}`, object errors
start (array): Size is :math:`N_{\rm spec}`, starting pixel
for each spectrum
end (array): Size is :math:`N_{\rm spec}`, ending pixel (+1)
for each spectrum
base_velocity (array): Size is :math:`N_{\rm spec}`, base
velocity offset between each object spectrum and the
template spectra.
tpl_flux (array): Size is :math:`N_{\rm tpl}\times N_{\rm
tpl chan}`, template spectra
tpl_rfft (array): Size is :math:`N_{\rm tpl}\times N_{\rm
tpl pad}`, real FFT of the template spectra
guess_kin (array): Initial guess for kinematics. Size is
:math:`N_{\rm spec}\times N_{\rm moments}`.
fix_kinematics (bool): (**Optional**) Flag to fix the
kinematics to the input values during the fit.
obj_to_fit (array): (**Optional**) Size is :math:`N_{\rm
spec}`, boolean flag to fit object spectrum
tpl_to_use (array): (**Optional**) Size is :math:`N_{\rm
spec}\times N_{\rm tpl}`, boolean flag to use a template
for the fit to each object spectrum
plot (bool): (**Optional**) Produce the default ppxf fit
plot.
degree (int): (**Optional**) Additive polynomial order.
Default is to use the internal attribute :attr:`degree`.
mdegree (int): (**Optional**) Multiplicative polynomial
order. Default is to use the internal attribute
:attr:`mdegree`.
dof (int): (**Optional**) Number of degrees of freedom in
the fit. Default is to use the internal attribute
:attr:`dof`.
Returns:
numpy.ndarray : Array with :math:`N_{\rm spec}` instances of
:class:`PPXFFitResult`.
"""
# Get the list of templates to use
nspec = obj_flux.shape[0]
_obj_to_fit = numpy.ones(nspec, dtype=bool) if obj_to_fit is None else obj_to_fit
ntpl = tpl_flux.shape[0]
_tpl_to_use = numpy.ones((nspec,ntpl), dtype=bool) if tpl_to_use is None else tpl_to_use
# input_kin = self.guess_kin if fixed_kin is None else fixed_kin
moments = -self.moments if fix_kinematics else self.moments
degree = self.degree if degree is None else degree
mdegree = self.mdegree if mdegree is None else mdegree
dof = self.dof if dof is None else dof
# linear = fixed_kin is not None and mdegree < 1
linear = fix_kinematics and mdegree < 1
# Create the object to hold all the fits
result = numpy.empty(nspec, dtype=object)
# plot=True
# Fit each spectrum
for i in range(nspec):
print('Running pPXF fit on spectrum: {0}/{1}'.format(i+1,nspec), end='\r')
# Meant to ignore this spectrum
if not _obj_to_fit[i]:
result[i] = None
continue
# Get the pixels to fit for this spectrum
gpm = numpy.where(numpy.invert(obj_flux.mask[i,start[i]:end[i]]))[0]
# Check if there is sufficient data for the fit
ntpl_to_use = numpy.sum(_tpl_to_use[i,:])
if len(gpm) < dof+ntpl_to_use:
if not self.quiet:
warnings.warn('Insufficient data points ({0}) to fit spectrum {1}'
                                  ' (dof={2}).'.format(len(gpm), i+1, dof+ntpl_to_use))
result[i] = PPXFFitResult(degree, mdegree, start[i], end[i], _tpl_to_use[i,:],
None, ntpl)
continue
# Run ppxf
if plot:
pyplot.clf()
result[i] = PPXFFitResult(degree, mdegree, start[i], end[i], _tpl_to_use[i,:],
                                      ppxf.ppxf(tpl_flux[_tpl_to_use[i,:],:].T,
obj_flux.data[i,start[i]:end[i]],
obj_ferr.data[i,start[i]:end[i]], self.velscale,
guess_kin[i,:], velscale_ratio=self.velscale_ratio,
goodpixels=gpm, bias=self.bias, degree=degree,
mdegree=mdegree, moments=moments, vsyst=-base_velocity[i],
quiet=(not plot), plot=plot, linear=linear,
                                                templates_rfft=tpl_rfft[_tpl_to_use[i,:],:].T), ntpl,
weight_errors=weight_errors)
# linear_method='lsqlin'), ntpl,
# weight_errors=weight_errors)
if result[i].kin[1] < 0:
raise ValueError('Dispersion less than 0! {0}/{1} {2}'.format(
i+1,nspec,result[i].kin[1]))
if result[i].reached_maxiter() and not self.quiet:
warnings.warn('pPXF optimizer reached maximum number of iterations for spectrum '
'{0}.'.format(i+1))
if plot:
pyplot.show()
# model = PPXFModel(tpl_flux[tpl_to_use[i,:],:].T, obj_flux.data[i,start[i]:end[i]],
# self.velscale, velscale_ratio=self.velscale_ratio, degree=degree,
# mdegree=mdegree, moments=moments, vsyst=-base_velocity[i],
# quiet=True, templates_rfft=tpl_rfft[tpl_to_use[i,:],:].T)
#
# modelfit = model(result[i].kin, result[i].tplwgt, addpoly=result[i].addcoef,
# multpoly=result[i].multcoef)
#
# print(numpy.sum(result[i].bestfit-modelfit))
#
# pyplot.plot(self.obj_wave[start[i]:end[i]], obj_flux.data[i,start[i]:end[i]],
# color='k')
## pyplot.plot(self.obj_wave[start[i]:end[i]], modelfit, color='C0')
# pyplot.plot(self.obj_wave[start[i]:end[i]], result[i].bestfit, color='C3')
# pyplot.plot(self.obj_wave[start[i]:end[i]],
# obj_flux.data[i,start[i]:end[i]]-result[i].bestfit, color='C1')
# pyplot.show()
#
print('Running pPXF fit on spectrum: {0}/{1}'.format(nspec,nspec))
return result
def _fit_global_spectrum(self, obj_to_include=None, plot=False):
"""
Fit the global spectrum. This:
- Sets the base-level good pixel mask for the fits to the individual
bins
- Gives the template weights for the global template
.. todo::
- Only include spectra above a given S/N in global spectrum?
- Allow for a number of iterations as input.
"""
_obj_to_include = numpy.ones(self.nobj, dtype=bool) \
if obj_to_include is None else obj_to_include
if len(_obj_to_include) != self.nobj:
raise ValueError('Incorrect number of object flags.')
# Create the global spectrum and its error
# TODO: Does not include covariance!
global_spectrum = numpy.ma.sum(self.obj_flux[_obj_to_include,:], axis=0).reshape(1,-1)
global_spectrum_err = numpy.ma.sqrt(numpy.ma.sum(
numpy.square(self.obj_ferr[_obj_to_include,:]),
axis=0)).reshape(1,-1)
global_spectrum_err[numpy.ma.getmaskarray(global_spectrum)] = 1.0 # To avoid pPXF error
# TODO: Because of how it's used, setting start, end, and
# base_vel this way will mess things up later in the fit()
# function UNLESS all the spectra have the same start and end;
# ie., when fitting the global spectrum, the object spectra
# provided to fit() must be treated as an ensemble.
# Set the fitting region and base velocity offset
start = numpy.array([numpy.amax(self.spectrum_start)])
base_vel = numpy.array([self.base_velocity[numpy.argmax(self.spectrum_start)]])
end = numpy.array([numpy.amin(self.spectrum_end)])
        # Use any template that is requested for any of the individual
# spectra
usetpl = numpy.any(self.usetpl, axis=0).reshape(1,-1)
# plot=True
# Fit the spectrum
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'First fit to global spectrum.')
result = self._run_fit_iteration(global_spectrum, global_spectrum_err, start, end,
base_vel, self.tpl_flux, self.tpl_rfft, self.guess_kin,
fix_kinematics=self.fix_kinematics, tpl_to_use=usetpl,
plot=plot)
# Return if pPXF failed
if result[0].fit_failed():
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'pPXF failed to fit global spectrum!')
return None, None
# Perform a single fit rejection
global_spectrum = PPXFFit.reject_model_outliers(global_spectrum, result, rescale=True,
loggers=self.loggers, quiet=self.quiet)
# refit the spectrum
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Fit to global spectrum after rejection.')
return self._run_fit_iteration(global_spectrum, global_spectrum_err, start, end,
base_vel, self.tpl_flux, self.tpl_rfft, self.guess_kin,
fix_kinematics=self.fix_kinematics, tpl_to_use=usetpl,
plot=plot)[0]
def _fill_ppxf_par(self, kin, no_shift=True):
# Moments for each kinematic component
ncomp = 1
moments = numpy.atleast_1d(self.moments)
_moments = numpy.full(ncomp, numpy.absolute(moments), dtype=int) if moments.size == 1 \
else numpy.absolute(moments)
# Construct the LOSVD parameter vector
vj = numpy.append(0, numpy.cumsum(_moments)[:-1])
par = kin.copy()
par[0:2] /= self.velscale
if no_shift:
par[0] = 0.0
return par, _moments, vj
def _get_losvd_kernels(self, result, no_shift=True):
nspec = len(result)
losvd_kernel_rfft = numpy.empty(nspec, dtype=object)
for i in range(nspec):
if result[i] is None or result[i].fit_failed():
continue
par, _moments, vj = self._fill_ppxf_par(result[i].kin, no_shift=no_shift)
losvd_kernel_rfft[i] = ppxf.losvd_rfft(par, 1, _moments, self.tpl_rfft.shape[1], 1,
0.0 if no_shift else -self.base_velocity[i]/self.velscale,
self.velscale_ratio, 0.0)[:,0,0]
return losvd_kernel_rfft
def _fit_all_spectra(self, templates, templates_rfft, tpl_to_use, plot=False,
plot_file_root=None):
"""
Fit all spectra provided.
- Get an initial fit
- Reject
- Mask and smooth templates and objects
- Fit ratio
- Mask and smooth
- Fit ratio
- Fit unmasked with fixed kinematics to get models (with
emission lines?)
"""
#---------------------------------------------------------------
# Fit the spectra
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Number of object spectra to fit: {0}/{1}'.format(
numpy.sum(self.obj_to_fit), len(self.obj_to_fit)))
result = self._run_fit_iteration(self.obj_flux, self.obj_ferr, self.spectrum_start,
self.spectrum_end, self.base_velocity, templates,
templates_rfft, self.guess_kin,
fix_kinematics=self.fix_kinematics,
obj_to_fit=self.obj_to_fit, tpl_to_use=tpl_to_use,
weight_errors=not self._mode_includes_rejection(),
plot=plot)
if not self._mode_includes_rejection():
# Only a single fit so return
return result
#---------------------------------------------------------------
# Copy the input as to not overwrite the input masks
obj_flux = self.obj_flux.copy()
obj_ferr = self.obj_ferr.copy()
obj_to_fit = self.obj_to_fit.copy()
nspec = obj_flux.shape[0]
# Save which were not fit successfully
obj_to_fit &= numpy.invert(numpy.array([ r is None or r.fit_failed() for r in result ]))
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Number of object spectra to fit (excluding failed fits): {0}/{1}'.format(
                       numpy.sum(obj_to_fit), len(obj_to_fit)))
#---------------------------------------------------------------
# Reject model outliers
# TODO: This will cause an error if boxcar is None!
obj_flux = PPXFFit.reject_model_outliers(obj_flux, result, rescale=False,
local_sigma=True, boxcar=self.reject_boxcar,
loggers=self.loggers, quiet=self.quiet)
# Copy the new mask to the errors
obj_ferr[numpy.ma.getmaskarray(obj_flux)] = numpy.ma.masked
# Refit and return results
return self._run_fit_iteration(obj_flux, obj_ferr, self.spectrum_start,
self.spectrum_end, self.base_velocity, templates,
templates_rfft, self.guess_kin,
fix_kinematics=self.fix_kinematics,
obj_to_fit=obj_to_fit, tpl_to_use=tpl_to_use,
weight_errors=True, plot=plot)
def _fit_dispersion_correction(self, templates, templates_rfft, result,
baseline_dispersion=None):
"""
Calculate the dispersion correction:
- Construct the optimized, redshifted template *without* the
convolution with the best-fitting LOSVD.
- Convolve it with the resolution difference in the data.
- Use pPXF to fit the matched version to the unmatched version;
this should be the dispersion correction.
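
        In a minimal sketch, the correction computed below is the quadrature
        difference::

            dispersion_correction = sqrt(sigma_matched**2 - sigma_unmatched**2)

        where the two dispersions are measured by pPXF fits to the
        resolution-matched and unmatched versions of the optimized template
        (the former also corrected for any offset incurred during the
        resolution matching).
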
.. todo::
            Decide how to deal with regions below the 2-pixel resolution limit
            in the synthetic spectrum being fit. How does this alter the
correction?
"""
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Determining velocity dispersion corrections using fit to '
'resolution-matched templates.')
# Construct the model spectra with only the resolution
# difference
model_wlosvd = numpy.ma.empty(self.obj_flux.shape, dtype=float)
model_wlosvd_msres = numpy.ma.empty(self.obj_flux.shape, dtype=float)
model_template = numpy.empty((self.nobj, templates.shape[1]), dtype=float)
model_template_rfft = numpy.empty((self.nobj, templates_rfft.shape[1]), dtype=complex)
start = numpy.empty(self.nobj, dtype=int)
end = numpy.empty(self.nobj, dtype=int)
guess_kin = self.guess_kin.copy()
model_tpl_to_use = numpy.zeros((self.nobj, self.nobj), dtype=bool)
res_match_offset = numpy.zeros(self.nobj, dtype=float)
tpl_ang_per_pix = angstroms_per_pixel(self.tpl_wave, log=True, base=10.)
unity = numpy.ones(self.tpl_wave.size, dtype=float)
_obj_to_fit = self.obj_to_fit.copy()
# Get the nominal kinematics
nominal_redshift = numpy.zeros(self.nobj, dtype=float)
nominal_dispersion = numpy.zeros(self.nobj, dtype=float)
for i in range(self.nobj):
if _obj_to_fit[i] and (result[i] is None or result[i].fit_failed()):
_obj_to_fit[i] = False
continue
if not _obj_to_fit[i] or result[i] is None or result[i].fit_failed():
continue
v = PPXFFit.convert_velocity(result[i].kin[0], 0.0)[0]
nominal_redshift[i] = v/astropy.constants.c.to('km/s').value
nominal_dispersion[i] = result[i].kin[1] if baseline_dispersion is None \
else baseline_dispersion
# Create the spectra to use for the correction
niter = 1
for j in range(niter):
for i in range(self.nobj):
if not _obj_to_fit[i] or result[i] is None or result[i].fit_failed():
continue
# Get the convolution kernel with 0 velocity shift
nominal_par, _moments, vj = self._fill_ppxf_par(numpy.array([0.0,
nominal_dispersion[i]]))
nominal_losvd_kernel_rfft = ppxf.losvd_rfft(nominal_par, 1, _moments,
templates_rfft.shape[1], 1, 0.0,
self.velscale_ratio, 0.0)[:,0,0]
# Get the composite template
model_template[i,:] = numpy.dot(result[i].tplwgt,
templates[result[i].tpl_to_use,:])
model_template_rfft[i,:] = numpy.dot(result[i].tplwgt,
templates_rfft[result[i].tpl_to_use,:])
# Convolve the template to the fitted velocity dispersion
tmp_wlosvd = numpy.fft.irfft(model_template_rfft[i,:]*nominal_losvd_kernel_rfft,
self.tpl_npad)[:self.npix_tpl]
model_tpl_to_use[i,i] = True
# Match the spectral resolution
tmp_wlosvd_msres, sres, res_match_offset[i], mask, ivar = \
match_spectral_resolution(self.tpl_wave, tmp_wlosvd,
self.tpl_sres.sres(),
self.obj_wave/(1+nominal_redshift[i]),
self.obj_sres[i,:], min_sig_pix=0.0, log10=True,
new_log10=True, quiet=True, no_offset=False)
# Check 2-pixel resolution limit in resolution-matched
# spectrum
# pix_per_fwhm = numpy.ma.divide(numpy.ma.divide(self.tpl_wave, sres),
# tpl_ang_per_pix)
# pix_per_fwhm = interpolate.interp1d(self.tpl_wave, pix_per_fwhm,
# fill_value='extrapolate')(self.obj_wave)
# Resample to match the object spectra
inRange = self.tpl_wave[[0,-1]] * (1.0 + nominal_redshift[i])
model_wlosvd[i,:] = Resample(tmp_wlosvd,
x=self.tpl_wave*(1 + nominal_redshift[i]),
inLog=True, newRange=self.obj_wave[[0,-1]],
newpix=self.obj_wave.size, newLog=True).outy
model_wlosvd_msres[i,:] = Resample(tmp_wlosvd_msres,
x=self.tpl_wave*(1 + nominal_redshift[i]),
inLog=True, newRange=self.obj_wave[[0,-1]],
newpix=self.obj_wave.size, newLog=True).outy
# Check 2-pixel resolution limit in resampled data
# _, npix = resample_vector(unity, xRange=inRange, inLog=True,
# newRange=self.obj_wave[[0,-1]],
# newpix=self.obj_wave.size, newLog=True, conserve=True,
# flat=False)
# new_pix_per_fwhm = numpy.ma.divide(pix_per_fwhm, npix)
# indx = new_pix_per_fwhm < 2
# print('Number below 2 pixels: {0}'.format(numpy.sum(indx)))
# Set the pixels to fit
start[i] = result[i].start
end[i] = result[i].end
model_wlosvd[i,:start[i]] = 0.0
model_wlosvd[i,:start[i]] = numpy.ma.masked
model_wlosvd[i,end[i]:] = 0.0
model_wlosvd[i,end[i]:] = numpy.ma.masked
model_wlosvd_msres[i,:start[i]] = 0.0
model_wlosvd_msres[i,:start[i]] = numpy.ma.masked
model_wlosvd_msres[i,end[i]:] = 0.0
model_wlosvd_msres[i,end[i]:] = numpy.ma.masked
# Set the guess kinematics
guess_kin[i,0] = numpy.log(nominal_redshift[i]+1) \
* astropy.constants.c.to('km/s').value
guess_kin[i,1] = nominal_dispersion[i]
# Fit the model to the resolution-matched model
model_ferr = numpy.ma.ones(self.obj_flux.shape, dtype=float)
model_ferr[ numpy.ma.getmaskarray(model_wlosvd_msres) ] = numpy.ma.masked
result_wlosvd = self._run_fit_iteration(model_wlosvd, model_ferr, start, end,
self.base_velocity, model_template,
model_template_rfft, guess_kin,
obj_to_fit=_obj_to_fit,
tpl_to_use=model_tpl_to_use)#,
#plot=True)
result_wlosvd_msres = self._run_fit_iteration(model_wlosvd_msres, model_ferr, start,
end, self.base_velocity, model_template,
model_template_rfft, guess_kin,
obj_to_fit=_obj_to_fit,
tpl_to_use=model_tpl_to_use)#,
#plot=True)
dispersion_correction_err = numpy.array([ rcl is None or rcl.fit_failed()
or rcl.reached_maxiter()
or rcls is None or rcls.fit_failed()
or rcls.reached_maxiter()
for rcl, rcls in zip(result_wlosvd, result_wlosvd_msres) ])
dispersion_correction_err &= _obj_to_fit
indx = numpy.invert(dispersion_correction_err) & _obj_to_fit
disp_wlosvd = numpy.zeros(self.nobj, dtype=float)
disp_wlosvd[indx] = numpy.array([ rc.kin[1] for rc in result_wlosvd[indx] ])
disp_wlosvd_msres = numpy.zeros(self.nobj, dtype=float)
disp_wlosvd_msres[indx] = numpy.array([ numpy.square(rc.kin[1]) - numpy.square(sigoff)
for rc, sigoff in zip(result_wlosvd_msres[indx],
res_match_offset[indx])])
disp_wlosvd_msres = numpy.ma.sqrt(disp_wlosvd_msres).filled(0.0)
dispersion_correction = numpy.zeros(self.nobj, dtype=float)
dispersion_correction[indx] = numpy.ma.sqrt(numpy.square(disp_wlosvd_msres[indx])
-numpy.square(disp_wlosvd[indx])).filled(0.0)
# TODO: The looping and this update should probably be
# removed
if j < niter-1:
nominal_dispersion[indx] = numpy.ma.sqrt( numpy.square(nominal_dispersion[indx])
- numpy.square(dispersion_correction[indx])).filled(0.0)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Dispersion corrections larger than measurement: {0}'.format(
numpy.sum(dispersion_correction > nominal_dispersion)))
nominal_dispersion[dispersion_correction > nominal_dispersion] \
= self.sigma_limits[0]
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Mean dispersion correction: {0}'.format(
numpy.mean(dispersion_correction[indx])))
return dispersion_correction, dispersion_correction_err
def _nominal_dispersion_correction(self, obj_sres, gpm, cz):
"""
        Calculate the dispersion correction as the quadrature difference
        between the instrumental dispersion of the template and object
        spectra. Returns the correction (km/s) and a boolean flag that the
        correction could not be determined (i.e., the mean quadrature
        difference is negative).
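
        In a minimal sketch, the correction computed below is::

            fwhm_obj = c / obj_sres            # object instrumental FWHM [km/s]
            fwhm_tpl = c / tpl_sres            # template instrumental FWHM [km/s]
            sigma_corr = sqrt(mean(fwhm_obj**2 - fwhm_tpl**2)) / sig2fwhm

        where ``sig2fwhm`` is the conversion between a Gaussian sigma and its
        FWHM, and the template resolution is sampled at the de-redshifted
        object wavelengths.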
"""
c = astropy.constants.c.to('km/s').value
fwhm_inst_obj = c/obj_sres[gpm]
fwhm_inst_tpl = c/self.tpl_sres(self.obj_wave[gpm]/(1+cz/c))
mean_fwhm_sqr_diff = numpy.mean(numpy.square(fwhm_inst_obj) - numpy.square(fwhm_inst_tpl))
if mean_fwhm_sqr_diff < 0:
return 0., True
return numpy.sqrt(mean_fwhm_sqr_diff)/DAPConstants.sig2fwhm, False
def _is_near_bounds(self, result, guess_velocity, tol_frac=1e-2):
"""
Check if the fitted kinematics are near the imposed limits.
        The fitted velocity and higher moments are flagged as "near" a
        boundary if they are within the provided fraction of the full width
        of the allowed range. For the velocity dispersion, the comparison is
        done in log space.
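
        For example, the velocity criterion implemented below is::

            near = min(v - v_lo, v_hi - v) < tol_frac * (v_hi - v_lo)

        with the analogous test applied to log10(sigma) for the velocity
        dispersion and to the Gauss-Hermite limits for the higher moments.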
"""
near_bounds = numpy.zeros(self.moments, dtype=bool)
near_lower_sigma_bound = False
# Velocity
_velocity_limits = self.velocity_limits + guess_velocity
Dv = numpy.diff(_velocity_limits)[0]
tol = Dv*tol_frac
near_bounds[0] = result.kin[0]-_velocity_limits[0] < tol \
or _velocity_limits[1]-result.kin[0] < tol
# Velocity dispersion
Ds = numpy.diff(numpy.log10(self.sigma_limits))[0]
tol = Ds*tol_frac
near_lower_sigma_bound \
= numpy.log10(result.kin[1]) - numpy.log10(self.sigma_limits[0]) < tol
near_bounds[1] = near_lower_sigma_bound \
or numpy.log10(self.sigma_limits[1]) - numpy.log10(result.kin[1]) < tol
if self.moments == 2:
return near_bounds, near_lower_sigma_bound
# H3 and H4
Dh = numpy.diff(self.gh_limits)
tol = Dh*tol_frac
near_bounds[2] = result.kin[2] - self.gh_limits[0] < tol \
or self.gh_limits[1] - result.kin[2] < tol
near_bounds[3] = result.kin[3] - self.gh_limits[0] < tol \
or self.gh_limits[1] - result.kin[3] < tol
if self.moments == 4:
return near_bounds, near_lower_sigma_bound
# H5 and H6
near_bounds[4] = result.kin[4] - self.gh_limits[0] < tol \
or self.gh_limits[1] - result.kin[4] < tol
near_bounds[5] = result.kin[5] - self.gh_limits[0] < tol \
or self.gh_limits[1] - result.kin[5] < tol
return near_bounds, near_lower_sigma_bound
# def _set_and_report_failed_status(self, model_mask, model_par, message):
# if not self.quiet:
# log_output(self.loggers, 1, logging.INFO, message)
# return self.bitmask.turn_on(model_mask, 'FIT_FAILED'), \
# self.bitmask.turn_on(model_par, 'FIT_FAILED')
def _validate_kinematics(self, model_mask, model_par):
"""
Validate the returned kinematics.
Checks:
- corrected velocity dispersion must be in the range 50-400
km/s
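
        In a minimal sketch, the test applied below is::

            sigma_corr_sqr = kin_sigma**2 - sigmacorr_emp**2
            bad = (sigma_corr_sqr < 50.**2) | (sigma_corr_sqr > 400.**2)

        and spectra failing the test have their 'BAD_SIGMA' bit turned on.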
"""
sigcor = numpy.square(model_par['KIN'][:,1]) - numpy.square(model_par['SIGMACORR_EMP'])
indx = ((sigcor < 2500.) | (sigcor > 1.6e5)) & self.obj_to_fit
if numpy.sum(indx) == 0:
return
model_par['MASK'][indx] = self.bitmask.turn_on(model_par['MASK'][indx], 'BAD_SIGMA')
def _save_results(self, global_fit_result, templates, templates_rfft, result, model_mask,
model_par):
#---------------------------------------------------------------
# Get the model spectra
model_flux = PPXFFit.compile_model_flux(self.obj_flux, result)
# Calculate the model residuals. They still need to be masked
# in regions that were *not* included in the fit; see the
# iterations below
residual = self.obj_flux - model_flux
fractional_residual = numpy.ma.divide(self.obj_flux - model_flux, model_flux)
chi2 = numpy.square(numpy.ma.divide(residual, self.obj_ferr))
# Instantiate a bad pixel mask to be used
bpm = numpy.zeros_like(self.obj_wave, dtype=bool)
# TODO: This is done in initialize_mask
# # Flag the pixels that were not used
# model_mask[numpy.ma.getmaskarray(self.obj_flux)] \
# = self.bitmask.turn_on(model_mask[numpy.ma.getmaskarray(self.obj_flux)],
# flag='DIDNOTUSE')
#---------------------------------------------------------------
# Need to iterate over each spectrum
for i in range(self.nobj):
#-----------------------------------------------------------
# Set output flags
# No fit was performed
if result[i] is None:
model_mask[i,:] = self.bitmask.turn_on(model_mask[i,:], 'NO_FIT')
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i], 'NO_FIT')
continue
# No fit attempted because of insufficient data
if result[i].empty_fit():
model_mask[i,:] = self.bitmask.turn_on(model_mask[i,:], 'NO_FIT')
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i],
'INSUFFICIENT_DATA')
continue
# Fit attempted but failed
if result[i].fit_failed():
model_mask[i,:] = self.bitmask.turn_on(model_mask[i,:], 'FIT_FAILED')
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i], 'FIT_FAILED')
# Fit successful but hit maximum iterations.
if result[i].reached_maxiter():
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i], 'MAXITER')
# Test if the kinematics are near the imposed boundaries.
# The best fitting model is still provided, but masked.
near_bound, near_lower_sigma_bound = self._is_near_bounds(result[i],
self.guess_kin[i,0])
# If the velocity dispersion has hit the lower limit, ONLY
# flag the value as having a MIN_SIGMA.
if near_lower_sigma_bound:
near_bound[1] = False
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i], 'MIN_SIGMA')
# Otherwise, flag both the model and parameter set
if numpy.any(near_bound):
if not self.quiet:
warnings.warn('Returned parameters for spectrum {0} too close to '
'bounds.'.format(i+1))
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i], 'NEAR_BOUND')
# model_mask[i,:] = self.bitmask.turn_on(model_mask[i,:], 'NEAR_BOUND')
# Mask rejected pixels
s = self.spectrum_start[i]
e = self.spectrum_end[i]
original_gpm = numpy.where(numpy.invert(self.obj_flux.mask[i,s:e]))[0]
rejected_pixels = list(set(original_gpm) - set(result[i].gpm))
if len(rejected_pixels) > 0:
model_mask[i,s:e][rejected_pixels] \
= self.bitmask.turn_on(model_mask[i,s:e][rejected_pixels],
flag=PPXFFit.rej_flag)
#-----------------------------------------------------------
# Save the model parameters and figures of merit
# Number of fitted pixels
model_par['NPIXFIT'][i] = len(result[i].gpm)
# Template weights and errors
if self._mode_uses_global_template():
model_par['TPLWGT'][i] = result[i].tplwgt[0]*global_fit_result.tplwgt
if result[i].tplwgterr is not None:
model_par['TPLWGTERR'][i] = result[i].tplwgterr[0]*global_fit_result.tplwgt
else:
model_par['TPLWGT'][i][result[i].tpl_to_use] = result[i].tplwgt
if result[i].tplwgterr is not None:
model_par['TPLWGTERR'][i][result[i].tpl_to_use] = result[i].tplwgterr
# Templates used
model_par['USETPL'][i] = result[i].tpl_to_use
# Additive polynomial coefficients
# TODO: Save nominal polynomial errors as well?
if self.degree >= 0 and result[i].addcoef is not None:
model_par['ADDCOEF'][i] = result[i].addcoef
if self.mdegree > 0 and result[i].multcoef is not None:
model_par['MULTCOEF'][i] = result[i].multcoef
# Input kinematics
model_par['KININP'][i] = self.guess_kin[i,:]
# Best-fit kinematics
model_par['KIN'][i] = result[i].kin
# Kinematic errors
model_par['KINERR'][i] = result[i].kinerr
# Ensure that only pixels used in the fit are included in
# the fit metrics
chi2[i,model_mask[i,:]>0] = numpy.ma.masked
residual[i,model_mask[i,:]>0] = numpy.ma.masked
fractional_residual[i,model_mask[i,:]>0] = numpy.ma.masked
# Get growth statistics for the three figures of merit
model_par['CHIGRW'][i] \
= sample_growth(numpy.ma.sqrt(chi2[i,:]).compressed(),
[0.0, 0.68, 0.95, 0.99, 1.0], use_interpolate=False)
model_par['RMSGRW'][i] \
= sample_growth(numpy.ma.absolute(residual[i,:]).compressed(),
[0.0, 0.68, 0.95, 0.99, 1.0], use_interpolate=False)
model_par['FRMSGRW'][i] \
= sample_growth(numpy.ma.absolute(fractional_residual[i,:]).compressed(),
[0.0, 0.68, 0.95, 0.99, 1.0], use_interpolate=False)
# Calculate the dispersion correction if necessary
if not self.matched_resolution:
model_par['SIGMACORR_SRES'][i], err \
= self._nominal_dispersion_correction(self.obj_sres[i], result[i].gpm,
PPXFFit.convert_velocity(model_par['KIN'][i,0], 0)[0])
if err:
model_par['MASK'][i] = self.bitmask.turn_on(model_par['MASK'][i],
'BAD_SIGMACORR_SRES')
# Get the figures-of-merit for each spectrum
model_par['CHI2'] = numpy.ma.sum(chi2, axis=1).filled(0.0)
model_par['RMS'] = numpy.sqrt(numpy.ma.mean(numpy.square(residual), axis=1).filled(0.0))
model_par['FRMS'] = numpy.sqrt(numpy.ma.mean(numpy.square(fractional_residual),
axis=1).filled(0.0))
dof = numpy.sum(model_par['TPLWGT'], axis=1) + self.dof
model_par['RCHI2'] = model_par['CHI2'] / (model_par['NPIXFIT'] - dof)
#---------------------------------------------------------------
# Calculate the dispersion corrections, if necessary
# TODO: Get the velocity dispersion corrections here and above
# regardless of the resolution matching?
if not self.matched_resolution:
model_par['SIGMACORR_EMP'], err \
= self._fit_dispersion_correction(templates, templates_rfft, result,
baseline_dispersion=100)
if numpy.sum(err) > 0:
model_par['MASK'][err] = self.bitmask.turn_on(model_par['MASK'][err],
'BAD_SIGMACORR_EMP')
#---------------------------------------------------------------
# Test if kinematics are reliable
# TODO: Skipped for now because unreliable flag not well defined
# self._validate_kinematics(model_mask, model_par)
#---------------------------------------------------------------
# Convert the velocities from pixel units to redshift
model_par['KININP'][:,0], _ = PPXFFit.convert_velocity(model_par['KININP'][:,0],
numpy.zeros(self.nobj))
model_par['KIN'][:,0], model_par['KINERR'][:,0] \
= PPXFFit.convert_velocity(model_par['KIN'][:,0], model_par['KINERR'][:,0])
# indx = numpy.invert(self.bitmask.flagged(model_par['MASK'],
# flag=['NO_FIT', 'INSUFFICIENT_DATA', 'FIT_FAILED']))
# pyplot.scatter(model_par['SIGMACORR_SRES'][indx], model_par['SIGMACORR_EMP'][indx],
# marker='.', s=50)
# pyplot.show()
#---------------------------------------------------------------
return model_flux, model_mask, model_par
def fit_SpatiallyBinnedSpectra(self, binned_spectra, gpm=None, par=None, loggers=None,
quiet=False, debug=False):
"""
        This is a basic interface geared for the DAP that interacts with the
        rest of the more general parts of the class.

        This method should not declare anything to self!
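
        A minimal call sketch (the ``fitter`` instance and the ``ppxf_par``
        parameter object are assumed to be constructed elsewhere)::

            model_wave, model_flux, model_mask, model_par \
                    = fitter.fit_SpatiallyBinnedSpectra(binned_spectra, par=ppxf_par)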
"""
# Parameter must be provided
if par is None:
raise ValueError('Required parameters for PPXFFit have not been defined.')
# Check the parameters
# _def = PPXFFitPar._keyword_defaults()
# if par['regul'] != _def['regul'] or par['reddening'] != _def['reddening'] \
# or par['component'] != _def['component'] or par['reg_dim'] != _def['reg_dim']:
# raise NotImplementedError('Cannot use regul, reddening, component, or regul_dim yet.')
# SpatiallyBinnedSpectra object always needed
if binned_spectra is None or not isinstance(binned_spectra, SpatiallyBinnedSpectra):
raise TypeError('Must provide a SpatiallyBinnedSpectra object for fitting.')
if binned_spectra.hdu is None:
raise ValueError('Provided SpatiallyBinnedSpectra object is undefined!')
# If specified, the binned_spectra should have spectral
# resolution measurements based on a pre-pixelized Gaussian
if 'STCKPRE' in binned_spectra.hdu['PRIMARY'].header \
and not binned_spectra.hdu['PRIMARY'].header['STCKPRE']:
raise ValueError('PPXFFit expects LSF measurements based on a pre-pixelized Gaussian.')
# # TemplateLibrary object always needed
# if par['template_library'] is None \
# or not isinstance(par['template_library'], TemplateLibrary):
# raise TypeError('Must provide a TemplateLibrary object for fitting.')
# if par['template_library'].hdu is None:
# raise ValueError('Provided TemplateLibrary object is undefined!')
# TemplateLibrary object always needed
if par.templates is None or not isinstance(par.templates, TemplateLibrary):
raise TypeError('Must provide a TemplateLibrary object for fitting.')
if par.templates.hdu is None:
raise ValueError('Provided TemplateLibrary object is undefined!')
# Select the spectra that meet the selection criteria
# TODO: Link this to the StellarContinuumModel._bins_to_fit()
# function...
# good_spec = binned_spectra.above_snr_limit(par['minimum_snr'], debug=debug)
# Get the object data
obj_wave = binned_spectra['WAVE'].data.copy()
obj_flux = binned_spectra.copy_to_masked_array(flag=binned_spectra.do_not_fit_flags())
obj_ferr = numpy.ma.power(binned_spectra.copy_to_masked_array(ext='IVAR',
flag=binned_spectra.do_not_fit_flags()) , -0.5)
obj_sres = binned_spectra.copy_to_array(ext='SPECRES')
guess_redshift = par['guess_redshift']
guess_dispersion = par['guess_dispersion']
binid = binned_spectra['BINS'].data['BINID']
binid_index = numpy.arange(binned_spectra.nbins)
if gpm is not None:
obj_flux = obj_flux[gpm,:]
obj_ferr = obj_ferr[gpm,:]
obj_sres = obj_sres[gpm,:]
guess_redshift = guess_redshift[gpm]
guess_dispersion = guess_dispersion[gpm]
binid = binid[gpm]
binid_index = binid_index[gpm]
# Warn the user that only a single spectral resolution is used
# for the templates
if not self.quiet:
warnings.warn('Adopting mean spectral resolution of all templates!')
tpl_sres = numpy.mean(par.templates['SPECRES'].data, axis=0).ravel()
# Perform the fit
# TODO: Alias window is never used...
model_wave, model_flux, model_mask, model_par \
= self.fit(par.templates['WAVE'].data.copy(), par.templates['FLUX'].data.copy(),
binned_spectra['WAVE'].data.copy(), obj_flux, obj_ferr, guess_redshift,
guess_dispersion, iteration_mode=par['iteration_mode'],
reject_boxcar=par['reject_boxcar'], ensemble=True,
velscale_ratio=par['velscale_ratio'], mask=par['pixelmask'],
matched_resolution=par['matched_resolution'], tpl_sres=tpl_sres,
obj_sres=obj_sres, waverange=par['pixelmask'].waverange,
bias=par['bias'], degree=par['degree'], mdegree=par['mdegree'],
moments=par['moments'], loggers=loggers, quiet=quiet, dvtol=1e-9)
#plot=True)
# DEBUG
assert numpy.sum(model_wave - binned_spectra['WAVE'].data) == 0, \
'Incorrect wavelength range'
        # Save the bin ID numbers and indices for the spectra selected to be
        # fit
model_par['BINID'] = binid #binned_spectra['BINS'].data['BINID'][good_spec]
model_par['BINID_INDEX'] = binid_index #numpy.arange(binned_spectra.nbins)[good_spec]
# Only return model and model parameters for the *fitted*
# spectra
return model_wave, model_flux, model_mask, model_par
def fit(self, tpl_wave, tpl_flux, obj_wave, obj_flux, obj_ferr, guess_redshift,
guess_dispersion, iteration_mode='global_template', reject_boxcar=100, ensemble=True,
velscale_ratio=None, mask=None, usetpl=None, matched_resolution=True, tpl_sres=None,
obj_sres=None, waverange=None, bias=None, degree=4, mdegree=0, moments=2, loggers=None,
quiet=False, max_velocity_range=400., alias_window=None, dvtol=1e-10, plot=False,
plot_file_root=None):
r"""
Wrapper for pPXF with some additional convenience functions.
Limited implementation at the moment.
Args:
tpl_wave (numpy.ndarray):
1D vector of template wavelengths at rest in angstroms.
tpl_flux (numpy.ndarray):
N-templates x N-wavelengths array of template spectra to
fit.
obj_wave (numpy.ndarray):
                1D vector of object wavelengths in angstroms.  Does NOT need
                to be the same as the template wavelengths.
obj_flux (numpy.ndarray):
N-spec x N-wavelengths array of object spectra to fit.
Can be a numpy.ma.MaskedArray.
obj_ferr (numpy.ndarray):
N-spec x N-wavelengths array with the errors in the
object spectra.
guess_redshift (:obj:`float`, numpy.ndarray):
Single or spectrum-specific redshift used to set the
initial guess kinematics.
guess_dispersion (:obj:`float`, numpy.ndarray):
Single or spectrum-specific velocity dispersion used to
set the initial guess kinematics.
iteration_mode (:obj:`str`, optional):
Iteration sequence to perform. See
:func:`iteration_modes`.
reject_boxcar (:obj:`int`, optional):
Size of the boxcar to use during the rejection
iteration. Default is 100. If None, rejection uses the
entire residual spectrum.
ensemble (:obj:`bool`, optional):
Treat the list of input spectra as an ensemble.
Currently, this only affects how the spectra are masked.
Default is to treat them as an ensemble. When not
treated as an ensemble, each spectrum is masked
individually according to the input wavelength range and
velocity offsets. *It does not make sense to set the
iteration_mode to something that will include a fit to
the global spectrum if you're not treating the list of
object spectra as an ensemble.*
velscale_ratio (:obj:`int`, optional):
Ratio of velocity scale per pixel in the object spectra
to that for the template spectra. Default is that they
should be identical.
mask (numpy.ndarray, :class:`mangadap.util.pixelmask.SpectralPixelMask`, optional):
A baseline pixel mask to use during the fitting. Other
pixels may be masked via the convenience functions, but
these pixels will always be masked.
matched_resolution (:obj:`bool`, optional):
Flag that the object and template spectra have identical
spectral resolution. Default is True.
tpl_sres (numpy.ndarray, optional):
One-dimensional vector with the spectral resolution
(:math:`R = \lambda/\Delta\lambda`) at each wavelength
                of the template spectra.  If not provided, the resolution is
                assumed to be the same as that of the object spectra.
obj_sres (numpy.ndarray, optional):
One- or Two-dimensional array with the spectral
resolution (:math:`R = \lambda/\Delta\lambda`) at each
                wavelength for (each of) the object spectra.  If not provided,
                the resolution is assumed to be the same as that of the
                template spectra.
waverange (array-like, optional):
Lower and upper wavelength limits to *include* in the
fit. This can be a two-element vector to apply the same
limits to all spectra, or a N-spec x 2 array with
wavelength ranges for each spectrum to be fit. Default
is to use as much of the spectrum as possible.
bias (:obj:`float`, optional):
From the pPXF documentation: This parameter biases the
(h3, h4, ...) measurements towards zero (Gaussian LOSVD)
unless their inclusion significantly decreases the error
in the fit. Set this to BIAS=0.0 not to bias the fit:
the solution (including [V, sigma]) will be noisier in
that case. The default BIAS should provide acceptable
results in most cases, but it would be safe to test it
with Monte Carlo simulations. This keyword precisely
corresponds to the parameter \lambda in the Cappellari &
Emsellem (2004) paper. Note that the penalty depends on
the *relative* change of the fit residuals, so it is
insensitive to proper scaling of the NOISE vector. A
nonzero BIAS can be safely used even without a reliable
NOISE spectrum, or with equal weighting for all pixels.
degree (:obj:`int`, optional):
From the pPXF documentation: degree of the *additive*
Legendre polynomial used to correct the template
continuum shape during the fit (default: 4). Set DEGREE
= -1 not to include any additive polynomial.
mdegree (:obj:`int`, optional):
From the pPXF documentation: degree of the
*multiplicative* Legendre polynomial (with mean of 1)
used to correct the continuum shape during the fit
(default: 0). The zero degree multiplicative polynomial
is always included in the fit as it corresponds to the
weights assigned to the templates. Note that the
computation time is longer with multiplicative
polynomials than with the same number of additive
polynomials.
.. note::
**IMPORTANT**: Multiplicative polynomials cannot be
used when the REDDENING keyword is set.
moments (:obj:`int`, optional):
From the pPXF documentation: Order of the Gauss-Hermite
moments to fit. Set this keyword to 4 to fit [h3, h4]
and to 6 to fit [h3, h4, h5, h6]. Note that in all cases
the G-H moments are fitted (non-linearly) *together*
with [V, sigma].
- If MOMENTS=2 or MOMENTS is not set then only [V,
sigma] are fitted and the other parameters are
returned as zero.
- If MOMENTS is negative then the kinematics of the
given COMPONENT are kept fixed to the input
values.
                - EXAMPLE: We want to keep fixed component 0, which has an
                  LOSVD described by [V, sigma, h3, h4] and is modelled with
                  100 spectral templates; at the same time we fit [V, sigma]
                  for COMPONENT=1, which is described by 5 templates (this
                  situation may arise when fitting stellar templates with
                  pre-determined stellar kinematics, while fitting the gas
                  emission).  We should give in input to ppxf() the following
                  parameters::

                      component = [0]*100 + [1]*5
                      # --> [0, 0, ..., 0, 1, 1, 1, 1, 1]
                      moments = [-4, 2]
                      start = [[V, sigma, h3, h4], [V, sigma]]
loggers (:obj:`list`, optional):
List of `logging.Logger`_ objects to log progress;
ignored if quiet=True. Logging is done using
:func:`mangadap.util.log.log_output`. Default is no
logging.
quiet (:obj:`bool`, optional):
Suppress all terminal and logging output. Default is
False.
max_velocity_range (:obj:`float`, optional):
Maximum range (+/-) expected for the fitted velocities
in km/s.
alias_window (:obj:`float`, optional):
The window to mask to avoid aliasing near the edges of
the spectral range in km/s. Default is six times
*max_velocity_range*.
dvtol (:obj:`float`, optional):
                Tolerance (in km/s) used when checking that the velocity
                scales of the template and object spectra match; see
                :func:`check_pixel_scale`.
plot (:obj:`bool`, optional):
Show the automatically generated pPXF fit plots.
Returns:
tuple: Returns 4 numpy.ndarray objects:
1. The wavelengths of the best fitting model spectra.
Nominally the same as the wavelengths of the input
object spectra (*obj_wave*).
2. The fluxes of the best-fitting model spectra.
            3. A mask for the best-fitting model spectra, following
from the internal bitmask.
4. A record array with the fitted model parameters; see
:class:`spectralfitting.StellarKinematicsFit._per_stellar_kinematics_dtype`.
Raises:
ValueError:
                Raised if the input arrays are not of the correct shape or if
                the difference between the pixel scales of the template and
                object spectra is greater than the specified tolerance.
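
        A minimal call sketch (``ppxf_fitter`` is an instance of this class;
        the guess values and ``velscale_ratio`` are illustrative only)::

            model_wave, model_flux, model_mask, model_par \
                    = ppxf_fitter.fit(tpl_wave, tpl_flux, obj_wave, obj_flux,
                                      obj_ferr, 0.03, 100., velscale_ratio=4,
                                      degree=8, moments=2)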
"""
# plot = True
#---------------------------------------------------------------
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
#---------------------------------------------------------------
# Check the input; the following operations are necessarily
# sequential in some cases because of attributes assigned along
# the way
# - Mode
self._check_mode(iteration_mode, reject_boxcar, mdegree)
# - Objects
self.obj_wave, self.obj_flux, self.obj_ferr, self.obj_sres \
= PPXFFit.check_objects(obj_wave, obj_flux, obj_ferr=obj_ferr, obj_sres=obj_sres)
self.nobj, self.npix_obj = self.obj_flux.shape
self.input_obj_mask = numpy.ma.getmaskarray(self.obj_flux).copy()
self.obj_to_fit = numpy.any(numpy.invert(self.input_obj_mask), axis=1)
# print('Objects to fit: {0}/{1}'.format(numpy.sum(self.obj_to_fit), self.nobj))
# - Input pixel scale
self.velscale, self.velscale_ratio \
= PPXFFit.check_pixel_scale(tpl_wave, self.obj_wave,
velscale_ratio=velscale_ratio, dvtol=dvtol)
# - Templates
self.tpl_wave, self.tpl_flux, self.tpl_sres \
= PPXFFit.check_templates(tpl_wave, tpl_flux, tpl_sres=tpl_sres,
velscale_ratio=self.velscale_ratio)
self.ntpl, self.npix_tpl = self.tpl_flux.shape
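        # Pad the FFT length up to the next power of two for efficiency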
self.tpl_npad = 2**int(numpy.ceil(numpy.log2(self.npix_tpl)))
# self.tpl_npad = fftpack.next_fast_len(self.npix_tpl)
self.tpl_rfft = numpy.fft.rfft(self.tpl_flux, self.tpl_npad, axis=1)
# - Template usage
self.usetpl = PPXFFit.check_template_usage_flags(self.nobj, self.ntpl, usetpl)
# - Input kinematics
self.input_cz, self.guess_kin = PPXFFit.check_input_kinematics(self.nobj, guess_redshift,
guess_dispersion)
# - Spectral resolution
self.matched_resolution = PPXFFit.check_resolution_match(self.tpl_sres, self.obj_sres,
matched_resolution)
# - Selected wavelength range: always has shape (self.nobj,2)
# TODO: When is this used?
self.waverange = PPXFFit.set_wavelength_range(self.nobj, self.obj_wave, waverange=waverange)
#---------------------------------------------------------------
# Make sure that any mode that fits the global spectrum treats
# the individual spectra as part of an ensemble. This step is
# important for setting the spectral range and the masking that
# is assumed throughout the rest of the function!
if not ensemble and self._mode_uses_global_spectrum():
warnings.warn('When fitting the global spectrum, the spectra MUST be treated as an '
'ensemble.')
_ensemble = True
else:
_ensemble = ensemble
#---------------------------------------------------------------
# Initialize the mask and the spectral range to fit.
model_mask, init_pix_err, self.spectrum_start, self.spectrum_end \
= PPXFFit.initialize_pixels_to_fit(self.tpl_wave, self.obj_wave, self.obj_flux,
self.obj_ferr, self.velscale,
velscale_ratio=self.velscale_ratio,
waverange=waverange, mask=mask,
bitmask=self.bitmask,
velocity_offset=self.input_cz,
max_velocity_range=max_velocity_range,
alias_window=alias_window, ensemble=_ensemble,
loggers=self.loggers, quiet=self.quiet)
#---------------------------------------------------------------
# Set the basic pPXF parameters
# - Polynomials
self.degree = max(degree,-1)
self.mdegree = max(mdegree,0)
# - Kinematics
self.velocity_limits, self.sigma_limits, self.gh_limits \
= PPXFFit.losvd_limits(self.velscale)
self.bias = bias
self.moments = numpy.absolute(moments)
self.fix_kinematics = moments < 0
# - Degrees of freedom
self.dof = self.moments + max(self.mdegree, 0)
if self.degree >= 0:
self.dof += self.degree+1
if self.fix_kinematics:
self.dof -= self.moments
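        # In other words, self.dof counts the fitted kinematic moments (unless
        # they are fixed), the mdegree multiplicative coefficients, and the
        # degree+1 additive coefficients; the template weights are added when
        # computing the reduced chi-square in _save_results.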
#---------------------------------------------------------------
# Report the input checks/results
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Number of templates: {0}'.format(self.ntpl))
log_output(self.loggers, 1, logging.INFO, 'Number of object spectra: {0}'.format(
self.nobj))
log_output(self.loggers, 1, logging.INFO, 'Pixel scale: {0} km/s'.format(self.velscale))
log_output(self.loggers, 1, logging.INFO, 'Pixel scale ratio: {0}'.format(
self.velscale_ratio))
log_output(self.loggers, 1, logging.INFO, 'Dispersion limits: {0} - {1}'.format(
*self.sigma_limits))
log_output(self.loggers, 1, logging.INFO, 'Kinematics fixed: {0}'.format(
self.fix_kinematics))
log_output(self.loggers, 2, logging.INFO, 'Model degrees of freedom: {0}'.format(
self.dof+self.ntpl))
log_output(self.loggers, 1, logging.INFO, 'Iteration mode: {0}'.format(
self.iteration_mode))
log_output(self.loggers, 2, logging.INFO,
'Additive polynomial order: {0}'.format(self.degree
if self.degree > -1 else 'None'))
log_output(self.loggers, 2, logging.INFO,
'Multiplicative polynomial order: {0}'.format(self.mdegree
if self.mdegree > 0 else 'None'))
#---------------------------------------------------------------
# Initialize the output data
model_flux = numpy.zeros(self.obj_flux.shape, dtype=float)
model_par = self.init_datatable(self.ntpl, self.degree+1, max(self.mdegree,0),
self.moments, self.bitmask.minimum_dtype(),
shape=self.nobj)
# Set the bins; here the ID and index are identical
model_par['BINID'] = numpy.arange(self.nobj)
model_par['BINID_INDEX'] = numpy.arange(self.nobj)
# Save the pixel statistics
model_par['BEGPIX'] = self.spectrum_start
model_par['ENDPIX'] = self.spectrum_end
model_par['NPIXTOT'] = self.spectrum_end - self.spectrum_start
#---------------------------------------------------------------
# Flag any errors in the fitted spectral range initialization
ended_in_error = numpy.array([e is not None for e in init_pix_err])
if numpy.any(ended_in_error):
if not self.quiet:
warnings.warn('Masking failures in some/all spectra. Errors are: {0}'.format(
numpy.array([(i,e)
for i,e in enumerate(init_pix_err)])[ended_in_error]))
model_par['MASK'][ended_in_error] \
= self.bitmask.turn_on(model_par['MASK'][ended_in_error], 'NO_FIT')
if numpy.all(ended_in_error):
return self.obj_wave, model_flux, model_mask, model_par
#---------------------------------------------------------------
# Get the input pixel shift between the object and template
        # wavelength vectors; interpreted by pPXF as a base velocity
# shift between the two
self.base_velocity = numpy.array([PPXFFit.ppxf_tpl_obj_voff(self.tpl_wave,
self.obj_wave[s:e], self.velscale,
velscale_ratio=self.velscale_ratio)
for s,e in zip(self.spectrum_start,
self.spectrum_end)])
#---------------------------------------------------------------
# Fit the global spectrum if requested by the iteration mode
global_fit_result = None
if self._mode_uses_global_spectrum():
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Fitting global spectrum.')
global_fit_result = self._fit_global_spectrum(obj_to_include=self.obj_to_fit, plot=plot)
if global_fit_result.fit_failed():
# pPXF failed!
model_mask[:] = self.bitmask.turn_on(model_mask[:], 'FIT_FAILED')
model_par['MASK'][:] = self.bitmask.turn_on(model_par['MASK'][:], 'FIT_FAILED')
return self.obj_wave, model_flux, model_mask, model_par
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Number of non-zero templates in fit to global spectrum: {0}'.format(
numpy.sum(global_fit_result.tplwgt > 0)))
#---------------------------------------------------------------
# Initialize the template set according to the iteration mode
if self._mode_uses_global_template():
templates = numpy.dot(global_fit_result.tplwgt, self.tpl_flux).reshape(1,-1)
tpl_to_use = numpy.ones((self.nobj,1), dtype=bool)
templates_rfft = numpy.fft.rfft(templates, self.tpl_npad, axis=1)
elif self._mode_uses_nonzero_templates():
templates = self.tpl_flux
tpl_to_use = numpy.array([global_fit_result.tplwgt > 0]*self.nobj) & self.usetpl
templates_rfft = self.tpl_rfft
elif self._mode_uses_all_templates():
templates = self.tpl_flux
tpl_to_use = self.usetpl
templates_rfft = self.tpl_rfft
else:
raise ValueError('Unrecognized iteration mode: {0}'.format(self.iteration_mode))
# #---------------------------------------------------------------
# # Print heading of fitted kinematics for each bin
# if not self.quiet:
# log_output(self.loggers, 2, logging.INFO, '{0:>5}'.format('INDX')
# + ' {:>7} {:>7}'.format('SWAVE', 'EWAVE')
# + (' {:>9}'*self.moments).format(*(['KIN{0}'.format(i+1)
# for i in range(self.moments)]))
# + ' {0:>5} {1:>9} {2:>5} {3:>4}'.format('NZTPL', 'CHI2', 'RCHI2', 'STAT'))
# Fit all spectra
result = self._fit_all_spectra(templates, templates_rfft, tpl_to_use, plot=plot,
plot_file_root=plot_file_root)
#---------------------------------------------------------------
# Save the results
model_flux, model_mask, model_par \
= self._save_results(global_fit_result, templates, templates_rfft, result,
model_mask, model_par)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'pPXF finished')
return self.obj_wave, model_flux, model_mask, model_par
@staticmethod
def obj_tpl_pixelmatch(velscale, tpl_wave, velscale_ratio=None, dvtol=1e-10):
"""
        Confirm that the pixel scales of the template and object data are the
        same to within some tolerance, accounting for an input ratio between
        the two.
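
        For example, when the object pixels are four times coarser than the
        template pixels, a typical check is::

            ok = PPXFFit.obj_tpl_pixelmatch(velscale, tpl_wave, velscale_ratio=4)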
"""
_velscale_ratio = 1 if velscale_ratio is None else velscale_ratio
return numpy.absolute(velscale - spectrum_velocity_scale(tpl_wave)*_velscale_ratio) < dvtol
@staticmethod
def fitting_mask(tpl_wave, obj_wave, velscale, velscale_ratio=None, waverange=None,
velocity_offset=None, max_velocity_range=400., alias_window=None,
loggers=None, quiet=False):
"""
Return a list of pixels in the object spectrum to be fit using pPXF.
        Be careful to distinguish between the pPXF (pixel-based) velocity and
        the redshift (cz)!
The limits applied to the fitted pixels are:
- Apply the provided wavelength range limit (*waverange*).
- pPXF will only allow a fit when the number of template
pixels is the same as or exceeds the number of pixels in
the object spectrum. The first step toward limiting
template spectra that are too long is to truncate the blue
and red edges that likely won't be used given the provided
velocity offsets (*velocity_offset*) and the expected
velocity range (*max_velocity_range*).
- Remove leading and trailing pixels that will cause alias
problems during the convolution with the LOSVD
(*alias_window*).
Args:
            tpl_wave (array): Wavelength vector of the template library to
                fit to the object spectrum.
            obj_wave (array): Wavelength vector of the object spectrum to be
                fit.
velscale (float): Velocity scale of the pixel.
velscale_ratio (int): (**Optional**) Ratio of the object
velscale to the template velscale. Default is 1 (i.e.
the two have the same pixel scale).
waverange (array): (**Optional**) Lower and upper wavelength
limits to *include* in the analysis. The array can
either define a single wavelength range -- shape is (2,)
-- or a set of wavelength ranges -- shape is (n,2).
Default is to apply no wavelength range limitation.
velocity_offset (array): (**Optional**) Vector with the
velocity offset (expected or actual) between the
template and the object spectrum in km/s. Used to
estimate which wavelengths can be removed from the
template. This can be a single offset or a set of
offsets. If both waverange and velocity_offset are 2D
arrays, the number of wavelength ranges and offsets must
be the same. Default is that there is no velocity
offset.
max_velocity_range (float): (**Optional**) Maximum range
(+/-) expected for the fitted velocities in km/s.
Default is 400 km/s.
alias_window (float) : (**Optional**) The window to mask to
avoid aliasing near the edges of the spectral range in
km/s. Default is six times *max_velocity_range*.
loggers (list): (**Optional**) List of `logging.Logger`_
objects to log progress; ignored if quiet=True. Logging
is done using :func:`mangadap.util.log.log_output`.
Default is no logging.
quiet (bool): (**Optional**) Suppress all terminal and
logging output. Default is False.
Returns:
numpy.ndarray: Four boolean vectors are returned:
- flags for pixels to include in the fit
- flags for pixels that were excluded because they were
outside the designated wavelength range
- flags for pixels that were excluded to ensure the
proper length of the template spectrum wrt the object
spectrum.
- flags for pixels that were truncated to avoid
convolution aliasing
Raises:
ValueError: Raised if (1) no pixels are valid for the fit,
(2) the template and object do not have overlapping
spectral regions given the expected velocity offset
                between the two, or (3) the truncation to deal with
aliasing removes all remaining pixels.
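
        A minimal call sketch (the velocity offset and wavelength limits are
        illustrative only)::

            fit_indx, wave_mask, npix_mask, alias_mask \
                    = PPXFFit.fitting_mask(tpl_wave, obj_wave, velscale,
                                           velocity_offset=6000.,
                                           waverange=[3600., 10000.])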
"""
# Check the input
_velocity_offset = numpy.zeros(1, dtype=float) if velocity_offset is None \
else numpy.atleast_1d(velocity_offset)
_alias_window = 6*max_velocity_range if alias_window is None else alias_window
# 1. Apply the wavelength range limit, if provided
now=len(obj_wave) # Number of object wavelengths
if not quiet:
log_output(loggers, 1, logging.INFO,
'Original number of object pixels: {0}'.format(now))
if waverange is not None and len(waverange) == 2:
fit_indx = numpy.ones(now, dtype=bool)
if waverange[0] is not None:
fit_indx &= (obj_wave > waverange[0])
if waverange[1] is not None:
fit_indx &= (obj_wave < waverange[1])
if numpy.sum(fit_indx) == 0:
raise ValueError('Selected wavelength range for analysis contains no pixels!')
else:
fit_indx = numpy.full(now, True, dtype=bool)
waverange_mask = numpy.invert(fit_indx)
# Minimum and maximum redshift about primary offsets
c = astropy.constants.c.to('km/s').value
z_min = (numpy.amin(_velocity_offset) - max_velocity_range)/c
z_max = (numpy.amax(_velocity_offset) + max_velocity_range)/c
if not quiet:
log_output(loggers, 1, logging.INFO,
'Minimum/Maximum redshift: {0}/{1}'.format(z_min, z_max))
# 2. If the number of template pixels is not >= number of fitted galaxy pixels,
# further limit the blue and red edges of the galaxy spectra.
# **This must account for the relative pixel scale.**
now=numpy.sum(fit_indx) # Number of good object pixels
_velscale_ratio = 1 if velscale_ratio is None else int(velscale_ratio)
ntw=len(tpl_wave)//_velscale_ratio # Number of template pixels
if not quiet:
log_output(loggers, 1, logging.INFO,
'After selecting the wavelength range to analyze: {0}'.format(now))
log_output(loggers, 1, logging.INFO,
'Number of template pixels (in units of the galaxy pixel scale): {0}'.format(ntw))
if ntw < now:
# Indices of wavelengths redward of the redshifted template
# TODO: Change this to the rigorous calculation of the pPXF
# velocity: see
indx = obj_wave > tpl_wave[0]*(1. + z_min)
if numpy.sum(indx) == 0:
raise ValueError('No overlapping wavelengths between galaxy and template!')
if not quiet:
log_output(loggers, 1, logging.INFO,
'Initial wavelength of template: {0}'.format(tpl_wave[0]))
log_output(loggers, 1, logging.INFO,
'Initial wavelength of redshifted template: {0}'.format(
tpl_wave[0]*(1. + z_min)))
log_output(loggers, 1, logging.INFO,
'Initial wavelength of spectrum: {0}'.format(obj_wave[0]))
log_output(loggers, 1, logging.INFO,
'Pixels blueward of redshifted template: {0}'.format(
len(obj_wave)-numpy.sum(indx)))
# Merge with current index
fit_indx &= indx
if numpy.sum(fit_indx) == 0:
raise ValueError('No overlapping wavelengths between galaxy and template!')
now_= numpy.sum(fit_indx)
if not quiet:
log_output(loggers, 1, logging.INFO,
'After merging with the specified wavelength range: {0}'.format(now_))
else:
now_ = now
# New number of good object pixels
if ntw < now_:
fit_indx[ numpy.where(fit_indx)[0][ntw:] ] = False # Truncate the red edge as well
if not quiet:
log_output(loggers, 1, logging.INFO,
'Limit to at most the number of template pixels: {0} {1}'.format(
numpy.sum(fit_indx), ntw))
npix_mask = numpy.invert(fit_indx) & numpy.invert(waverange_mask)
# 3. Limit wavelength range to avoid aliasing problems in the template convolution
nalias = int(numpy.floor(_alias_window/velscale)) # Number of pixels to mask
# Mask to the range that should be unaffected by alias errors
waverange_tpl = [ tpl_wave[nalias*_velscale_ratio]*(1+z_max),
tpl_wave[(ntw-nalias)*_velscale_ratio-1]*(1+z_min) ]
if not quiet:
log_output(loggers, 1, logging.INFO,
'Mask to these wavelengths to avoid convolution aliasing: {0} - {1}'.format(
waverange_tpl[0], waverange_tpl[1]))
indx = numpy.logical_and(obj_wave > waverange_tpl[0], obj_wave < waverange_tpl[1])
# Merge with current index
fit_indx &= indx
if numpy.sum(fit_indx) == 0:
raise ValueError('No wavelengths available in this range!')
if not quiet:
log_output(loggers, 1, logging.INFO,
'Final wavelength range to fit: {0} {1}'.format(obj_wave[fit_indx][0],
obj_wave[fit_indx][-1]))
alias_mask = numpy.invert(fit_indx) & numpy.invert(npix_mask)
return fit_indx, waverange_mask, npix_mask, alias_mask
@staticmethod
def ppxf_tpl_obj_voff(tpl_wave, obj_wave, velscale, velscale_ratio=None):
"""
Determine the pseudo offset in velocity between the template and
object spectra, just due to the difference in the starting
wavelengths.
        This calculation is independent of the base of the logarithm used in
        the sampling of the spectra.
Assumes wavelengths are logarithmically binned.
Args:
tpl_wave (numpy.ndarray): Wavelength vector for the template
library to fit to the object spectrum.
obj_wave (numpy.ndarray): Wavelength vector for the object
spectrum to be fit.
velscale (float): Velocity step per pixel in km/s for the
**object** spectrum.
velscale_ratio (int): (**Optional**) The **integer** ratio
between the velocity scale of the pixel in the galaxy
data to that of the template data. This is used only
when constructing the template library. Default is
None, which is the same as assuming that the velocity
scales are identical.
Returns:
float: Velocity offset in km/s between the initial wavelengths
of the template and object spectra.
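
        Since the spectra are logarithmically binned, ``velscale`` is
        :math:`c\,\delta\ln\lambda` for the object spectrum, so for
        ``velscale_ratio`` = None the returned offset reduces to the
        following (a sketch, not part of the implementation)::

            c = astropy.constants.c.to('km/s').value
            voff = c * numpy.log(obj_wave[0]/tpl_wave[0])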
.. todo::
- Implement a check that calculates the velocity ratio directly?
"""
dlogl = numpy.log(obj_wave[0])-numpy.log(tpl_wave[0]) if velscale_ratio is None \
else numpy.log(obj_wave[0])-numpy.mean(numpy.log(tpl_wave[0:velscale_ratio]))
return dlogl*velscale / numpy.diff(numpy.log(obj_wave[0:2]))[0]
@staticmethod
def check_templates(tpl_wave, tpl_flux, tpl_sres=None, velscale_ratio=None):
r"""
Check that the input template data is valid for use with
pPXFFit.
"""
if len(tpl_wave.shape) != 1:
raise ValueError('Input template wavelengths must be a vector; all spectra should '
'have the same wavelength solution.')
if isinstance(tpl_flux, numpy.ma.MaskedArray):
raise TypeError('Template spectra cannot be masked arrays!')
if tpl_flux.shape[1] != len(tpl_wave):
raise ValueError('The template spectra fluxes must have the same length as the '
'wavelength array.')
if tpl_sres is not None and tpl_sres.shape != tpl_wave.shape:
raise ValueError('Provided template resolution vector does not have the correct shape.')
        # Force the number of template pixels to map to an integer number of
        # object pixels
if velscale_ratio is not None and velscale_ratio > 1:
npix_tpl = len(tpl_wave) - len(tpl_wave) % velscale_ratio
_tpl_wave = tpl_wave[:npix_tpl]
_tpl_flux = tpl_flux[:,:npix_tpl]
_tpl_sres = None if tpl_sres is None else tpl_sres[:npix_tpl]
else:
_tpl_wave = tpl_wave
_tpl_flux = tpl_flux
_tpl_sres = tpl_sres
_tpl_sres = None if _tpl_sres is None \
else SpectralResolution(_tpl_wave, _tpl_sres, log10=True)
# TODO: Allow spectral resolution to be spectrum dependent?
return _tpl_wave, _tpl_flux, _tpl_sres
@staticmethod
def check_objects(obj_wave, obj_flux, obj_ferr=None, obj_sres=None):
r"""
Check that the input object data is valid for use with pPXFFit.
Args:
obj_wave (numpy.ndarray): 1D vector of object wavelengths in
                angstroms.  Does NOT need to be the same as the template
                wavelengths.
obj_flux (numpy.ndarray): :math:`N_{\rm spec}\times N_{\rm
pix}` array of object spectra to fit. Can be a
numpy.ma.MaskedArray.
obj_ferr (numpy.ndarray): (**Optional**) :math:`N_{\rm
spec}\times N_{\rm pix}` array with the errors in the
object spectra. Can be a numpy.ma.MaskedArray.
obj_sres (numpy.ndarray): (**Optional**) 1D or 2D array with
the spectral resolution (:math:`R =
\lambda/\Delta\lambda`) at each wavelength for (each of)
the object spectra. Default is the resolution is not
provided and assumed to be same as the template
resolution.
Returns:
:obj:`tuple`: Returns four arrays: the object wavelength
vector, the object flux array, the object flux error and
the object spectral resolution. The latter two objects
can be None if the relevant object is None on input. All
arrays have the same shape as the input ``obj_flux``. If
``obj_sres`` is input as a 1D vector, the spectral
resolution is repeated for all input flux vectors. All
input arrays are **copies** of the input.
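
        A minimal call sketch::

            wave, flux, ferr, sres = PPXFFit.check_objects(obj_wave, obj_flux,
                                                           obj_ferr=obj_ferr,
                                                           obj_sres=obj_sres)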
"""
if len(obj_wave.shape) != 1:
raise ValueError('Input object wavelengths must be a vector; all spectra should '
'have the same wavelength solution.')
if obj_flux.shape[1] != len(obj_wave):
raise ValueError('The object spectra fluxes must have the same length as the '
'wavelength array.')
_obj_flux = obj_flux.copy() if isinstance(obj_flux, numpy.ma.MaskedArray) \
else numpy.ma.MaskedArray(obj_flux, copy=True)
if obj_ferr is not None and obj_ferr.shape != _obj_flux.shape:
raise ValueError('The shape of any provided error array must match the flux array.')
_obj_ferr = numpy.ma.MaskedArray(numpy.ones(_obj_flux.shape, dtype=float),
mask=numpy.ma.getmaskarray(_obj_flux).copy()) \
if obj_ferr is None else \
(obj_ferr.copy() if isinstance(obj_ferr, numpy.ma.MaskedArray) \
else numpy.ma.MaskedArray(obj_ferr.copy()))
if obj_sres is not None and obj_sres.shape != obj_wave.shape \
and obj_sres.shape != _obj_flux.shape:
raise ValueError('Provided object resolution vector does not have the correct shape.')
_obj_sres = None if obj_sres is None else obj_sres.copy()
if _obj_sres is not None and _obj_sres.shape != _obj_flux.shape:
_obj_sres = numpy.array([_obj_sres]*_obj_flux.shape[0])
return obj_wave, _obj_flux, _obj_ferr, _obj_sres
@staticmethod
def check_pixel_scale(tpl_wave, obj_wave, velscale_ratio=None, dvtol=1e-10):
"""
Confirm that the pixel scale of the template and object spectra
are identical within a certain tolerance, accounting for an
input pixel-scale ratio. Returns the velocity scale of the
object spectra and the velocity scale ratio wrt the template
spectra.
"""
# Get the pixel scale
velscale = spectrum_velocity_scale(obj_wave)
_velscale_ratio = 1 if velscale_ratio is None else int(velscale_ratio)
if not PPXFFit.obj_tpl_pixelmatch(velscale, tpl_wave, velscale_ratio=_velscale_ratio,
dvtol=dvtol):
raise ValueError('Pixel scale of the object and template spectra must be identical.')
return velscale, _velscale_ratio
@staticmethod
def losvd_limits(velscale):
r"""
Return the limits on the LOSVD parameters used by pPXF.
- Velocity limits are :math:`\pm 2000` km/s
            - Velocity-dispersion limits are from 1/100 of a pixel to 1000 km/s
            - Limits of the higher-order moments are from -0.3 to 0.3
"""
velocity_limits = numpy.array([-2e3, 2e3])
sigma_limits = numpy.array([0.01*velscale, 1e3])
gh_limits = numpy.array([-0.3, 0.3])
return velocity_limits, sigma_limits, gh_limits
@staticmethod
def reject_model_outliers(obj_flux, ppxf_result, rescale=False, local_sigma=False, boxcar=None,
nsigma=3.0, niter=9, loggers=None, quiet=False):
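        # Reject outliers with respect to the current best-fit models.  If
        # local_sigma is True, the rejection threshold is set by a
        # boxcar-smoothed local standard deviation of the residuals (via
        # BoxcarFilter); otherwise, a single standard deviation per spectrum
        # is used and the rejection is iterated up to niter times.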
if boxcar is None and local_sigma:
raise ValueError('For local sigma determination, must provide boxcar size.')
model_flux = PPXFFit.compile_model_flux(obj_flux, ppxf_result, rescale=rescale)
if local_sigma:
if not quiet:
log_output(loggers, 1, logging.INFO,
'Rejecting using local sigma (boxcar is {0} pixels).'.format(boxcar))
residual = obj_flux-model_flux # This should be masked where the data were not fit
bf = BoxcarFilter(boxcar, lo=nsigma, hi=nsigma, niter=niter, y=residual,
local_sigma=True)
obj_flux[bf.output_mask] = numpy.ma.masked
return obj_flux
if not quiet:
log_output(loggers, 1, logging.INFO, 'Rejecting full spectrum outliers.')
for i in range(niter):
residual = obj_flux-model_flux # This should be masked where the data were not fit
sigma = numpy.array([numpy.ma.std(residual, axis=1)]*residual.shape[1]).T
indx = numpy.absolute(residual) > nsigma*sigma
old_mask = numpy.ma.getmaskarray(obj_flux).copy()
obj_flux[indx] = numpy.ma.masked
if numpy.sum(numpy.ma.getmaskarray(obj_flux)) == numpy.sum(old_mask):
break
return obj_flux
@staticmethod
def compile_model_flux(obj_flux, ppxf_result, rescale=False):
"""
        Return the model flux by pulling the models out of the pPXF results.
        The model can be rescaled to the data based on the fitted pixels if
        ``rescale`` is True.
The output array is masked in the spectral regions below and
above the fitted wavelength range; any intervening pixels are
*not* masked, even if they're not included in the fit.
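
        For example, to compile the rescaled models for all fitted spectra::

            models = PPXFFit.compile_model_flux(obj_flux, ppxf_result, rescale=True)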
"""
model_flux = numpy.ma.MaskedArray(numpy.zeros(obj_flux.shape, dtype=float),
mask=numpy.ones(obj_flux.shape, dtype=bool))
for i in range(obj_flux.shape[0]):
if ppxf_result[i] is None or ppxf_result[i].fit_failed():
continue
s = ppxf_result[i].start
e = ppxf_result[i].end
g = ppxf_result[i].gpm
scale = optimal_scale(obj_flux[i,s:e][g], ppxf_result[i].bestfit[g]) if rescale else 1.
model_flux[i,s:e] = scale*ppxf_result[i].bestfit
return model_flux
@staticmethod
def convert_velocity(v, verr):
r"""
Convert kinematics from pPXF from pixel shifts to redshifts.
pPXF determines the velocity offset by making the approximation
that every pixel (logarithmically binned in wavelength) is a
constant change in velocity. Over large velocity shifts, this
approximation can become poor. An e-mail summary from Michele
Cappellari:
The velocity scale per pixel is input as
.. math::
\delta v = c \delta\ln\lambda = c (\ln\lambda_1 -
\ln\lambda_0)
        The velocities output by pPXF are:
.. math::
V = \delta v N_{\rm shift} = c \ln(\lambda_{N_{\rm
shift}}/\lambda_0)
which implies that the relation between PPXF output velocity and
redshift is
.. math::
            1 + z = \exp(V/c),

        which reduces to :math:`z \approx V/c` in the low-redshift limit.  This function
converts the :math:`V` values provided by pPXF to :math:`cz`
velocities.
.. note::
**IMPORTANT**: After this conversion, you must revert the
velocities back to the "pixel-based velocities" (using
:func:`_revert_velocity`) before using the velocities to
reconstruct the pPXF fitted model.
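
        A quick numerical sketch of the conversion (values are illustrative)::

            c = astropy.constants.c.to('km/s').value
            V = 3.0e4                          # pPXF pixel-based velocity (km/s)
            cz = (numpy.exp(V/c) - 1.) * c     # roughly 3.16e4 km/s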
"""
c=astropy.constants.c.to('km/s').value
return (numpy.exp(v/c)-1.0)*c, verr*numpy.absolute(numpy.exp(v/c))
@staticmethod
def revert_velocity(v, verr):
"""
Revert the velocity back to the "pixelized" velocity returned by
pPXF.
Order matters here. The error computation is NOT a true error
propagation; it's just the inverse of the "convert" operation
"""
c=astropy.constants.c.to('km/s').value
_v = c*numpy.log(v/c+1.0)
return _v, verr/numpy.absolute(numpy.exp(_v/c))
@staticmethod
def construct_models(tpl_wave, tpl_flux, obj_wave, obj_flux_shape, model_par, select=None,
redshift_only=False, deredshift=False, corrected_dispersion=False,
dvtol=1e-10):
"""
Construct models using the provided set of model parameters.
This is a wrapper for :class:`PPXFModel`.
Only the shape of the object data is needed, not the data
itself.
Allows for a replacement template library that must have the
same shape as :attr:`tpl_flux`.
The input velocities are expected to be cz, not "ppxf"
(pixelized) velocities.
If redshift_only is true, the provided dispersion is set to 1e-9
km/s, which is numerically identical to 0 (i.e., just shifting
the spectrum) in the tested applications. However, beware that
this is a HARDCODED number.
To convolve the model to the corrected dispersion, instead of
the uncorrected dispersion, set corrected_dispersion=True.
Correction *always* uses SIGMACORR_EMP data.
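
        A minimal usage sketch (``model_par`` is the parameter database
        returned by :func:`fit`)::

            models = PPXFFit.construct_models(tpl_wave, tpl_flux, obj_wave,
                                              obj_flux.shape, model_par,
                                              redshift_only=True)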
"""
if redshift_only and corrected_dispersion:
            raise ValueError('The redshift_only and corrected_dispersion options are mutually '
                             'exclusive.')
if deredshift:
raise NotImplementedError('Cannot yet deredshift models.')
# Check the spectral sampling
velscale_ratio = int(numpy.around(spectrum_velocity_scale(obj_wave)
/ spectrum_velocity_scale(tpl_wave)))
_velscale, _velscale_ratio \
= PPXFFit.check_pixel_scale(tpl_wave, obj_wave, velscale_ratio=velscale_ratio,
dvtol=dvtol)
# Check the input spectra
obj_flux = numpy.zeros(obj_flux_shape, dtype=float)
_obj_wave, _obj_flux, _, _ = PPXFFit.check_objects(obj_wave, obj_flux)
nobj = _obj_flux.shape[0]
_tpl_wave, _tpl_flux, _ = PPXFFit.check_templates(tpl_wave, tpl_flux,
velscale_ratio=_velscale_ratio)
ntpl = _tpl_flux.shape[0]
# Check the shape of the input model parameter database
if model_par['BINID'].size != nobj:
raise ValueError('Incorrect number of model-parameter sets.')
if model_par['TPLWGT'].shape[1] != ntpl:
raise ValueError('The number of weights does not match the number of templates.')
# Get the input pixel shift between the object and template
        # wavelength vectors; interpreted by pPXF as a base velocity
# shift between the two
vsyst = numpy.array([ -PPXFFit.ppxf_tpl_obj_voff(_tpl_wave, _obj_wave[s:e], _velscale,
velscale_ratio=_velscale_ratio)
if e > s else 0.0
for s,e in zip(model_par['BEGPIX'], model_par['ENDPIX'])])
# Get the additive and multiplicative degree of the polynomials
degree = model_par['ADDCOEF'].shape
degree = -1 if len(degree) == 1 else degree[1]-1
mdegree = model_par['MULTCOEF'].shape
mdegree = 0 if len(mdegree) == 1 else mdegree[1]
moments = model_par['KIN'].shape[1]
# Only produce selected models
skip = numpy.zeros(nobj, dtype=bool) if select is None else numpy.logical_not(select)
# Instantiate the output model array
models = numpy.ma.zeros(_obj_flux.shape, dtype=float)
# Initially mask everything
models[:,:] = numpy.ma.masked
# Set the kinematics
kin = model_par['KIN'].copy()
kin[:,0],_ = PPXFFit.revert_velocity(model_par['KIN'][:,0], model_par['KINERR'][:,0])
if redshift_only:
kin[:,1] = 1e-9
elif corrected_dispersion:
kin[:,1] = numpy.ma.sqrt(numpy.square(model_par['KIN'][:,1])
- numpy.square(model_par['SIGMACORR_EMP'])).filled(1e-9)
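        # I.e., the model is broadened by the dispersion after subtracting the
        # empirical correction in quadrature; masked (over-corrected) values
        # fall back to the same 1e-9 km/s floor used for redshift_only.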
# if deredshift:
# kin[:,0] = 0.0
# Construct the model for each (selected) object spectrum
for i in range(nobj):
if skip[i]:
continue
print('Constructing model for spectrum: {0}/{1}'.format(i+1,nobj), end='\r')
# This has to be redeclared every iteration because the
# starting and ending pixels might be different (annoying);
# as will the velocity offset; this means that the FFT of
# the templates is recalculated at every step...
f = PPXFModel(_tpl_flux.T,
_obj_flux.data[i,model_par['BEGPIX'][i]:model_par['ENDPIX'][i]],
_velscale, velscale_ratio=_velscale_ratio, vsyst=vsyst[i],
moments=moments, degree=degree, mdegree=mdegree)
models[i,model_par['BEGPIX'][i]:model_par['ENDPIX'][i]] \
= f(kin[i,:], model_par['TPLWGT'][i,:],
addpoly=None if degree < 0 else model_par['ADDCOEF'][i,:],
multpoly=None if mdegree < 1 else model_par['MULTCOEF'][i,:])
print('Constructing model for spectrum: {0}/{0}'.format(nobj))
return models