# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
Implements an emission-line profile fitting class.
.. warning::
Although it may still be possible to use :class:`Elric`, it is
currently not actively used by the survey-level operation of the
MaNGA DAP. See instead :class:`mangadap.proc.sasuke.Sasuke`.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
import time
import logging
import numpy
from scipy import interpolate, integrate, optimize
from scipy.special import erf
import astropy.constants
from astropy.modeling.polynomial import Legendre1D
from ..par.parset import KeywordParSet
from ..par.emissionlinedb import EmissionLineDB
from ..util.fileio import init_record_array
from ..util.log import log_output
from ..util.pixelmask import SpectralPixelMask
from ..util import lineprofiles
from .spatiallybinnedspectra import SpatiallyBinnedSpectra
from .stellarcontinuummodel import StellarContinuumModel
from .spectralfitting import EmissionLineFit
from .util import sample_growth
from matplotlib import pyplot
# Add strict versioning
# from distutils.version import StrictVersion
# Main fitting engine --------------------------------------------------
[docs]
class LineProfileFit:
r"""
Simultaneously fit multiple line profiles. Currently only allows
one to fit using an
:class:`~mangadap.util.lineprofiles.NCompLineProfile` object. The
fitting algorithm used is `scipy.optimize.least_squares`_ with
fitting method 'trf' to allow for bounds.
.. todo::
For multicomponent lines, set the first normalization to be the
normalization for the sum of all components, then force the
normalization of the subcomponents to be ordered from highest to
lowest and bounded from 0 to 1.
Args:
x (array-like):
1D vector with the independent variable
y (array-like):
1D vector with the dependent variable
profile_list (:obj:`list`, :class:`~mangadap.util.lineprofiles.NCompLineProfile`):
The profile(s) to fit to the dependent variable.
base_order (int): (**Optional**) The order of the Legendre
polynomial to include in the model for the baseline trend in
y below the fitted line profile(s).
error (array-like, optional):
1D vector with the error in the dependent variables. If
not provided, no error weighting is performed during the
fitting process, and the covariance *will not* be
constructed.
mask (array-like, optional):
1D boolean array used to ignore values in `y` during the
fit.
par (array-like, optional):
1D vector with the initial guess for model parameters.
The number of parameters much match the expectation based
on the provided list of profiles and the order of the
baseline polynomial. If not provided, the parameters are
initialized to 0.
fixed (array-like, optional):
1D vector with flags used to fix parameters during the
fit. The number of parameters much match the expectation
based on the provided list of profiles and the order of
the baseline polynomial. If not provided, all parameters
are freely fit.
bounds (:obj:`tuple`): (**Optional**) 2-tuple with two
array-like elements giving the upper and lower bound for
each parameter. The length of each array element must
match the number of parameters. For an unbounded problem,
set ``bounds=None``, or use numpy.inf with an appropriate
sign to disable bounds on all or some variables.
run_fit (bool): (**Optional**) Flag to run the fit upon
instantiation of the object, which defaults to True. If set
to False, the object is initialized but the fit is not
executed, and can be executed later using :func:`fit`.
construct_covariance (bool): (**Optional**) Flag to construct
the covariance matrix based on the result object provided by
`scipy.optimize.least_squares`_, which defaults to True. If
set to False, the covariance matrix is set to None.
verbose (int): (**Optional**) Verbosity level for
`scipy.optimize.least_squares`_; default is 0.
Attributes:
x (`numpy.ndarray`_): Independent variable of length :math:`M`.
y (`numpy.ndarray`_): Dependent variable to be fit of length
:math:`M`.
err (`numpy.ndarray`_): Error, :math:`\sigma`, in the dependent
variable of length :math:`M`.
mask (`numpy.ndarray`_): Flag to fit dependent variables of length
:math:`M`.
nlines (int): Number of lines being fit.
base_order (int): The order of the Legendre polynomial include
in the model; see `astropy.modeling.polynomial.Legendre1D`_.
model (`astropy.modeling.models.CompoundModel`_): The compound
model being fit composed of the line profiles and the
baseline (if a baseline is being fit). That is, this
defines the function :math:`f(\mathbf{x}|\mathbf{\theta})`,
where :math:`\mathbf{x}` is the dependent variable and
:math:`\mathbf{\theta}` is the list of variables. The
fitting algorithm minimizes the (error-weighted) residuals
to approximate :math:`y=f(\mathbf{x}|\mathbf{\theta})`.
npar (int): Total number of parameters in the model. The is the
number of parameters per line and the number of parameters
included in the baseline.
nfitpar (int): Number of *free* parameters, defined as
:math:`N`.
par (`numpy.ndarray`_): Full list of model parameters, including
those parameters that have been fixed.
fixed (`numpy.ndarray`_): Flags to fit (False) or fix (True) a
given parameter.
bounds (:obj:`tuple`): 2-tuple with two array-like
elements giving the upper and lower bound for each
parameter. For an unbounded problem, this is set to
``bounds=(-numpy.inf,numpy.inf)``.
result (`scipy.optimize.OptimizeResult`_): Object with the
results from `scipy.optimize.least_squares`_. The
best-fitting parameters, :math:`\mathbf{\theta}`, is
returned as ``result.x``.
cov (`numpy.ndarray`_): The formal covariance matrix for the fit.
The `scipy.optimize.OptimizeResult`_ object provides the
Jacobian of the model, an :math:`M \times N` array with
elements
.. math::
J_{ij} = \left.\frac{\partial f_i}{\partial
\theta_j}\right|_{\mathbf{\theta}}
at location in parameter space of the best-fitting model.
This is used to construct the covariance matrix by taking
the inverse of the curvature matrix:
.. math::
\mathbf{\alpha}_{kl} = \left[\frac{1}{\sigma_{i}}
J_{ik}\right]^{\rm T} \left[\frac{1}{\sigma_{i}}
J_{il}\right].
That is, :math:`\mathbf{C} = \mathbf{\alpha}^{-1}`.
Raises:
TypeError:
Raised if the provided profile objects are not instances
of :class:`~mangadap.util.lineprofiles.NCompLineProfile`.
ValueError:
Raised if any of the provided parameter arrays (par,
fixed) are not one-dimensional or the number of
parameters is not as expected based on the number of
profile and baseline parameters.
.. todo::
- Provide a better initialization for the parameters.
- Need to provide the best-fitting parameters and errors as full
vectors, including the fixed parameters.
"""
def __init__(self, x, y, profile_list, base_order=None, error=None, mask=None, par=None,
fixed=None, bounds=None, run_fit=True, construct_covariance=True, verbose=0):
# Check the input list of profiles
_profile_list = profile_list if isinstance(profile_list, list) else [profile_list]
self.nlines = len(_profile_list)
for i in range(self.nlines):
if not isinstance(_profile_list[i], lineprofiles.NCompLineProfile):
raise TypeError('LineProfileFit only works with NCompLineProfile objects.')
# Construct and check the parameter vectors
self.npar = numpy.sum([ p.npar for p in _profile_list])
self.base_order = -1 if base_order is None else base_order
if self.base_order > -1:
self.npar += (self.base_order + 1)
if par is not None and len(par.shape) != 1:
raise ValueError('Input parameter array should only be one-dimensional.')
if fixed is not None and fixed.shape != par.shape:
raise ValueError('Input parameter and fixed flags should have the same shape.')
_par = None if par is None else par.ravel()
if _par is not None and len(_par) != self.npar:
raise ValueError('Incorrect number of parameters provided')
self.par = numpy.zeros(self.npar, dtype=float) \
if _par is None else _par.astype(float)
self.fixed = numpy.zeros(self.npar, dtype=bool) \
if fixed is None else fixed.astype(bool)
if not isinstance(bounds, tuple):
raise ValueError('Bounds must have type tuple.')
if len(bounds) != 2:
raise ValueError('Bounds must only have two elements.')
if len(bounds[0]) != self.npar or len(bounds[1]) != self.npar:
raise ValueError('Lower and/or upper bounds have an incorrect number of parameters.')
self.bounded = bounds is not None
self.bounds = (-numpy.inf, numpy.inf) if bounds is None else bounds
# Get the number of free parameters
self.nfitpar = numpy.sum(numpy.invert(self.fixed))
# Construct the model
self.model = _profile_list[0].profile
for i in range(1,self.nlines):
self.model += _profile_list[i].profile
if self.base_order > -1:
self.model += Legendre1D(self.base_order)
# Fix the parameters
if numpy.sum(self.fixed) > 0:
self.model.fixed[ self.model.param_names[fixed] ] = True
# print(self.model)
# Need to setup how to tie fluxes, velocities, dispersions
self.x = None
self.y = None
self.err = None
self.mask = None
self.result = None
self.cov = None
if run_fit:
self.fit(x, y, error=error, mask=mask, construct_covariance=construct_covariance,
verbose=verbose)
[docs]
def _assign_par(self, par):
"""
Fixed parameters are ignored!
"""
if len(par) not in [ self.npar, self.nfitpar ]:
raise ValueError('Incorrect number of parameters provided')
self.par[numpy.invert(self.fixed)] = par[numpy.invert(self.fixed)] \
if len(par) == self.npar else par
# Then tie parameters...
[docs]
def _chi(self, par):
return self._resid(par)/self.err
[docs]
def _resid(self, par):
self._assign_par(par)
ymodel = self._quick_sample(self.x)
return (self.y-ymodel)
[docs]
def _calculate_covariance_matrix(self, verbose=0):
if self.result is None or self.err is None:
if self.result is not None:
warnings.warn('Cannot calculate covariance matrix without error vector!')
return None
# invsig = numpy.array([numpy.ma.power(self.err,-1.)]*self.result.jac.shape[1]).T
# a = (invsig.ravel()*self.result.jac.ravel()).reshape(self.result.jac.shape)
# a = numpy.dot(a.T,a)
a = numpy.dot(self.result.jac.T,self.result.jac)
if verbose > 0:
print('Curvature:')
print(' ', a)
try:
return numpy.linalg.inv(a)
except:
return numpy.linalg.pinv(a)
[docs]
def _quick_sample(self, x):
"""
Sample without providing/checking new input parameters.
"""
#print(*self.par.ravel())
return self.model.evaluate(x, *self.par.ravel())
[docs]
def sample(self, x, par=None):
if par is not None:
self._assign_par(par)
return self._quick_sample(x)
[docs]
def fit(self, x, y, error=None, mask=None, construct_covariance=True, verbose=0):
"""
Fit the line profiles provided upon initialization to the data
using `scipy.optimize.least_squares`_.
"""
# Check that the lengths of all the arrays match
if len(x.shape) != 1:
raise ValueError('Can only fit single spectra!')
if x.shape != y.shape \
or (error is not None and error.shape != y.shape) \
or (mask is not None and mask.shape != y.shape):
raise ValueError('All vector shapes must be identical.')
# If the input is a masked array, combine the two masks; the
# first line below works for both normal numpy.ndarrays and
# MaskedArray
_mask = numpy.ma.getmaskarray(y)
if mask is not None:
_mask |= mask
# Set the internal vectors to be numpy.ndarrays that only include the unmasked values
indx = numpy.invert(_mask)
self.x = numpy.array(x[indx])
self.y = numpy.array(y[indx])
self.err = None if error is None else numpy.array(error[indx])
if self.y.size <= self.npar:
raise ValueError('Insufficient data points ({0}) to fit with this function ' \
'(npar={1})!'.format(self.y.size, self.npar))
# print('Initial guess parameters inside LineProfileFit:')
# print(self.par)
# print(self.bounds)
try:
self.result = optimize.least_squares(self._resid if self.err is None else self._chi,
self.par, bounds=self.bounds, jac='2-point',
method='trf', loss='linear', verbose=verbose)
except ValueError as e:
print('ValueError: {0}'.format(e))
print(self.par)
print(self.bounds)
self.result = optimize.OptimizeResult(success=False)
return
# try:
# self.result = optimize.least_squares(self._resid if self.err is None else self._chi,
# self.par, bounds=self.bounds, jac='2-point',
# #method='lm', loss='linear', verbose=1)
# method='trf', loss='linear', verbose=2)
# except:
# exc_info = sys.exc_info()
# warnings.warn('Error during fit: {0}:: {1}'.format(exc_info[0], exc_info[1]))
# self.result = OptimizeResult(x=self.par, cost=None, fun=None, jac=None, grad=None,
# optimality=None, active_mask=None, nfev=None, njev=None,
# status=None, success=False)
# return
self.cov = None
if construct_covariance:
self.cov = self._calculate_covariance_matrix(verbose=verbose)
if verbose > 0:
print('Result: ')
print(' ', self.result.x)
if self.cov is not None:
print('Covariance: ')
print(' ', self.cov)
[docs]
class ElricPar(KeywordParSet):
"""
Elric emission-line fitting parameters.
The defined parameters are:
.. include:: ../tables/elricpar.rst
"""
def __init__(self, emission_lines=None, base_order=None, window_buffer=None,
guess_redshift=None, guess_dispersion=None, minimum_snr=None, pixelmask=None,
stellar_continuum=None):
arr_in_fl = [ numpy.ndarray, list, int, float ]
in_fl = [ int, float ]
pars = [ 'emission_lines', 'base_order', 'window_buffer', 'guess_redshift',
'guess_dispersion', 'minimum_snr', 'pixelmask', 'stellar_continuum' ]
values = [ emission_lines, base_order, window_buffer, guess_redshift, guess_dispersion,
minimum_snr, pixelmask, stellar_continuum ]
defaults = [ None, -1, 25.0, None, None, 0.0, None, None ]
dtypes = [ EmissionLineDB, int, in_fl, arr_in_fl, arr_in_fl, in_fl, SpectralPixelMask,
StellarContinuumModel ]
super(ElricPar, self).__init__(pars, values=values, defaults=defaults, dtypes=dtypes)
[docs]
class TiedLineProfile:
def __init__(self, profile, base_profile, relative_flux=None, mean_separation=None,
relative_stddev=None):
self.profile = profile
self.base_profile = base_profile
self.relative_flux = relative_flux
self.mean_separation = mean_separation
self.relative_stddev = relative_stddev
[docs]
def tie_flux(self):
self.profile.set_flux(self.relative_flux*self.base_profile.flux())
[docs]
def tie_mean(self):
self.profile.set_mean(self.base_profile.moment(order=1)+self.velocity_separation)
[docs]
def tie_stddev(self):
self.profile.set_stddev(self.relative_stddev*self.base_profile.moment(order=2))
[docs]
class ElricFittingWindow:
r"""
A utility class for the fitting windows used by Elric
The class can be instantiated as fully None.
Args:
nlines (int): (**Optional**) The number of lines to be fit
simultaneously.
db_indx (`numpy.ndarray`_): (**Optional**) An array with the
database index (0-based) of each line to be fit.
line_index (`numpy.ndarray`_): (**Optional**) An array with the
index numbers, read from the database, of each line to be
fit.
restwave (`numpy.ndarray`_): (**Optional**) An array with the rest
wavelengths of each line to be fit.
profile_set (`numpy.ndarray`_): (**Optional**) An array with the
profile objects that define the functional form of each
line.
fixed_par (`numpy.ndarray`_): (**Optional**) An array with the
fixed parameters for *all* parameters in the model to be
fit.
bounds (`numpy.ndarray`_): (**Optional**) A two-column array with
the lower (first column) and upper (second column) bounds
for the fit parameters.
log_bounds (`numpy.ndarray`_): (**Optional**) The range of the
boundary should be considered as logarithmic when testing if
a parameter is near its boundary.
output_model (bool): (**Optional**) Include the best-fitting
model in the composite emission-line model for each
spectrum. This is only flagged as true if ALL the emission
lines in the fitting window are to be included according to
the emission-line database.
tied_pairs (`numpy.ndarray`_): (**Optional**) The series of
tied profiles (:class:`TiedLineProfile` objects) that are
used to tie parameters of the model.
tied_funcs (`numpy.ndarray`_): (**Optional**) The member functions
of the :class:`TiedLineProfile` objects that should be
called *sequentially* to tie model parameters.
"""
def __init__(self, nlines=None, db_indx=None, line_index=None, restwave=None, profile_set=None,
fixed_par=None, bounds=None, log_bounds=None, output_model=None, tied_pairs=None,
tied_funcs=None):
self.nlines = nlines
self.db_indx = db_indx
self.line_index = line_index
self.restwave = restwave
self.profile_set = profile_set
self.init_par = numpy.array([ p.par.copy().ravel() for p in self.profile_set ])
self.fixed_par = fixed_par
self.bounds = bounds
self.log_bounds = log_bounds
self.output_model = output_model
self.tied_pairs = tied_pairs
self.tied_funcs = tied_funcs
[docs]
def append(self, db_indx, line_index, restwave, profile, fixed_par, bounds, log_bounds,
output_model):
"""
Append another line to the fitting window. Must be a single line!
"""
# Check that input is for a single line
if not isinstance(db_indx, int) or not isinstance(line_index, (int, numpy.integer)):
raise TypeError('Appended index must be a single integer.')
if not isinstance(restwave, float):
raise TypeError('Appended restwavelength must be a single float.')
if not isinstance(profile, lineprofiles.NCompLineProfile):
raise TypeError('Appended profile must have type NCompLineProfile.')
if len(fixed_par.shape) != 1 or len(fixed_par) != profile.npar:
raise TypeError('Appended fixed parameter must be a vector with the correct length.')
if len(bounds.shape) != 2 or bounds.shape[0] != profile.npar or bounds.shape[1] != 2:
raise TypeError('Appended bounds must have shape: {0}'.format(tuple(profile.npar,2)))
if len(log_bounds.shape) != 1 or len(log_bounds) != profile.npar:
raise TypeError('Appended log_bounds parameter must be a vector of the correct length.')
if not isinstance(output_model, bool):
raise TypeError('Flag to output model must be boolean.')
self.nlines += 1
self.db_indx = numpy.append(self.db_indx, db_indx)
self.line_index = numpy.append(self.line_index, line_index)
self.restwave = numpy.append(self.restwave, restwave)
self.profile_set = numpy.append(self.profile_set, profile)
self.init_par = numpy.append(self.init_par, profile.par.copy(), axis=0)
self.fixed_par = numpy.append(self.fixed_par, fixed_par.astype(bool).reshape(1,-1), axis=0)
self.bounds = numpy.append(self.bounds, bounds.reshape(1,-1,2), axis=0)
self.log_bounds = numpy.append(self.log_bounds, log_bounds.astype(bool).reshape(1,-1),
axis=0)
self.output_model &= output_model
[docs]
def reset_init_mean(self, indx, newmean):
self.profile_set[indx].set_mean(newmean)
self.init_par[indx,:] = self.profile_set[indx].par
[docs]
def reinit_profiles(self):
for i in range(self.nlines):
self.profile_set[i].assign_par(self.init_par[i,:])
[docs]
class Elric(EmissionLineFit):
"""
ELRIC: Emission-Line Regression and Inference Class
https://en.wikipedia.org/wiki/Edward_Elric
Use LineProfileFit to fit the emission-line properties in a set of
spectra.
.. todo::
- Implement some scheme to penalize multicomponent fits at low S/N
"""
def __init__(self, bitmask, wave=None, flux=None, emission_lines=None, error=None, mask=None,
stellar_continuum=None, base_order=-1, window_buffer=25, guess_redshift=None,
guess_dispersion=None, default_dispersion=20.0, run_fit=False, loggers=None,
quiet=False):
EmissionLineFit.__init__(self, 'elric', bitmask)
# Attributes kept by SpectralFitting:
# fit_type='emission_line', bitmask=bitmask, par=None
# Attributes kept by EmissionLineFit:
# fit_method='elric'
# Declare the attributes kept for convenience
self.default_dispersion = default_dispersion
self.loggers=None
self.quiet=None
self.nspec = None
self.nwave = None
self.emission_lines = None
self.neml = None
self.nwindows = None
self.fitting_window = None
self.wave = None
self.sres = None
self.flux = None
self.fluxnc = None
self.error = None
self.mask = None
self.continuum = None
self.no_continuum = None
self.base_order = None
self.window_buffer = None
self.redshift = None
self.dispersion = None
self.bestfit = None
if not run_fit:
return
if wave is None or flux is None or emission_lines is None:
raise ValueError('Cannot run fit without wavelength and flux vectors, and ' \
'emission-line database.')
self.fit(wave, flux, emission_lines, error=error, mask=mask,
stellar_continuum=stellar_continuum, base_order=base_order,
window_buffer=window_buffer, guess_redshift=guess_redshift,
guess_dispersion=guess_dispersion, loggers=loggers, quiet=quiet)
# TODO: Convert this to a DataTable
[docs]
@staticmethod
def _per_fitting_window_dtype(nwin, max_npar, mask_dtype):
r"""
Construct the record array data type for the output fits
extension.
"""
return [ ('BINID',int),
('BINID_INDEX',int),
('MASK', mask_dtype, (nwin,)),
('NPIXFIT',int,(nwin,)),
('PAR',float,(nwin,max_npar)),
('ERR',float,(nwin,max_npar)),
('LOBND',float,(nwin,max_npar)),
('UPBND',float,(nwin,max_npar)),
('FIXED',bool,(nwin,max_npar)),
# ('TIED',int,(nwin,max_npar)),
('IGNORE',bool,(nwin,max_npar)),
('CHI2',float,(nwin,)),
('RCHI2',float,(nwin,)),
('RMS',float,(nwin,)),
('RESID',float,(nwin,7)),
('FRAC_RMS',float,(nwin,)),
('FRAC_RESID',float,(nwin,7))
]
[docs]
@staticmethod
def _find_tied_index(index, emission_lines):
i = numpy.where(emission_lines['index'] == index)[0]
if len(i) > 1:
raise ValueError('Line indices are not unique!')
if emission_lines['mode'][i[0]] == 'f':
return i[0], index
tied = int(emission_lines['mode'][i[0]][1:])
j = numpy.where(emission_lines['index'] == tied)[0]
if len(j) > 1:
raise ValueError('Line indices are not unique!')
return (j[0], tied) if emission_lines['mode'][j[0]] == 'f' \
else Elric._find_tied_index(tied, emission_lines)
[docs]
@staticmethod
def _set_profile_ties(base_profiles, base_restwave, base_fixed_par, tied_profiles,
tied_restwave, tied_fixed_par, mode, flux):
nprof = len(base_profiles)
if len(base_restwave) != nprof or len(tied_profiles) != nprof \
or len(tied_restwave) != nprof or len(mode) != nprof or len(flux) != nprof:
raise ValueError('Must provide same length arrays to _tied_funcs.')
tied_pairs = []
tied_functions = []
for i in range(nprof):
if mode[i][0] == 'w':
continue
tied_pars += [ TiedLineProfile(tied_profiles[i], base_profiles[i]) ]
tied_fixed_par[i,:] |= base_fixed_par[i,:]
if mode[i][0] in [ 'a', 'x' ]:
tied_pairs[i].relative_flux = flux[i]
tied_functions += [ tied_pairs[i].tie_flux ]
tied_fixed_par[i,:] |= tied_pairs[i].profile.fix_flux()
if mode[i][0] in [ 'a', 'k', 'v' ]:
tied_pairs[i].mean_separation \
= (tied_restwave[i]/base_restwave[i]-1)*astropy.constants.c.to('km/s').value
tied_functions += [ tied_pairs[i].tie_mean ]
tied_fixed_par[i,:] |= tied_pairs[i].profile.fix_mean()
if mode[i][0] in [ 'a', 'x', 'k', 's' ]:
tied_pairs[i].relative_stddev = 1.0
tied_functions += [ tied_pairs[i].tie_stddev ]
tied_fixed_par[i,:] |= tied_pairs[i].profile.fix_stddev()
return numpy.array(tied_pairs), numpy.array(tied_functions), tied_fixed_par
[docs]
@staticmethod
def _line_velocity_offset(restwave, restwave_primary):
return (restwave/restwave_primary - 1.0) * astropy.constants.c.to('km/s').value
[docs]
def _parse_emission_line_models(self):
r"""
Parse the input emission-line file into a set of windows --- the
number of windows is :math:`N_{\rm win}`) --- that are fit for
each spectrum.
"""
# Setup the number of windows to fit based on the primary lines
primary_line = self.emission_lines['mode'] == 'f'
primary_index = self.emission_lines['index'][primary_line]
self.nwindows = numpy.sum(primary_line)
if not self.quiet:
log_output(self.loggers, 1, logging.INFO,
'Number of emission lines to fit: {0}'.format(self.neml))
log_output(self.loggers, 1, logging.INFO,
'Number of fitting windows: {0}'.format(self.nwindows))
# Set up the primary line data using the guess parameters from
# the database
self.fitting_window = numpy.empty(self.nwindows, dtype=object)
_db_indx = numpy.arange(self.neml)[primary_line]
for i in range(self.nwindows):
db_indx = numpy.array([_db_indx[i]])
line_index = numpy.array([primary_index[i]])
restwave = numpy.array([ self.emission_lines['restwave'][primary_line][i] ])
profile_set = numpy.array([ lineprofiles.NCompLineProfile(
self.emission_lines['ncomp'][primary_line][i],
par=self.emission_lines['par'][primary_line][i],
profile=eval('lineprofiles.'
+self.emission_lines['profile'][primary_line][i])) ])
fixed_par = self.emission_lines['fix'][primary_line][i].astype(bool).reshape(1,-1)
bounds = numpy.array([ [l,u] \
for l,u in zip(self.emission_lines['lobnd'][primary_line][i],
self.emission_lines['hibnd'][primary_line][i])
]).reshape(1,-1,2)
log_bounds = self.emission_lines['log_bnd'][primary_line][i].astype(bool).reshape(1,-1)
output_model = self.emission_lines['output_model'][primary_line][i]
self.fitting_window[i] = ElricFittingWindow(nlines=1, db_indx=db_indx,
line_index=line_index, restwave=restwave,
profile_set=profile_set,
fixed_par=fixed_par, bounds=bounds,
log_bounds=log_bounds,
output_model=output_model)
# Return if nothing is tied
if self.nwindows == self.neml:
return
# Append additional profiles
for i, e in enumerate(self.emission_lines):
if primary_line[i]:
continue
# Basic check
if e['index'] == int(e['mode'][1:]):
raise ValueError('Cannot tie line to itself!')
# Run up the depenency chain to find the primary line
# associated with this one
indx, tied_index = self._find_tied_index(e['index'], self.emission_lines)
base_indx = numpy.where( primary_index == tied_index)[0][0]
# Append the line to this fitting window
self.fitting_window[base_indx].append(i, e['index'], e['restwave'],
lineprofiles.NCompLineProfile(e['ncomp'],
par=e['par'],
profile=eval('lineprofiles.'
+e['profile'])),
e['fix'],
numpy.array([ [l,u] for l,u in zip(e['lobnd'], e['hibnd']) ]).reshape(-1,2),
e['log_bnd'], bool(e['output_model']))
# Set the guess velocity of the profile based on the
# velocity relative to the main line
self.fitting_window[base_indx].reset_init_mean(-1,
self._line_velocity_offset(self.fitting_window[base_indx].restwave[-1],
self.fitting_window[base_indx].restwave[0]))
# Finalize by setting up any tied parameters
for i in range(self.nwindows):
if self.fitting_window[i].nlines == 1:
continue
# Basic debug check
if self.emission_lines['mode'][self.fitting_window[i].db_indx[0]] != 'f':
raise ValueError('DEBUG')
# The order is important. Start with the primary line:
tied = numpy.zeros(self.fitting_window[i].nlines, dtype=bool)
tied[0] = True # Primary line already "tied" to the set
# Iterate until all lines are tied
while numpy.sum(tied) != self.fitting_window[i].nlines:
# Find the indices of lines that are directly tied to
# any previously tied lines. At the beginning, this
# only selects lines that are tied to the primary line.
indx = numpy.where([
not tied[j] and numpy.any(int(mode[1:]) \
== self.fitting_window[i].line_index[tied]) \
for j,mode in enumerate(
self.emission_lines['mode'][self.fitting_window[i].db_indx[:]])
])[0]
# For these lines, find the indices in the list of lines
# for this window with their associated lines to tie to
tied_indx = [ int(mode[1:]) for mode in \
self.emission_lines['mode'][self.fitting_window[i].db_indx[indx]]]
profile_indx = [ numpy.where(t == self.fitting_window[i].line_index[:])[0][0] \
for t in tied_indx ]
# Run the maing function that sets up the seried of
# tying classes/functions and updates the parameters
# that should be "fixed" during the fit.
_tied_pairs, _tied_funcs, _fixed_par \
= self._set_profile_ties(self.fitting_window[i].profile_set[profile_indx],
self.fitting_window[i].restwave[profile_indx],
numpy.take(self.fitting_window[i].fixed_par, profile_indx, axis=0),
self.fitting_window[i].profile_set[indx],
self.fitting_window[i].restwave[indx],
numpy.take(self.fitting_window[i].fixed_par, indx, axis=0),
self.emission_lines['mode'][self.fitting_window[i].db_indx[indx]],
self.emission_lines['flux'][self.fitting_window[i].db_indx[indx]])
# Add these to the main output
self.fitting_window[i].tied_pairs = numpy.append(self.fitting_window[i].tied_pairs,
_tied_pairs)
self.fitting_window[i].tied_funcs = numpy.append(self.fitting_window[i].tied_funcs,
_tied_funcs)
for j, jj in enumerate(indx):
self.fitting_window[i].fixed_par[jj,:] = _fixed_par[j,:]
# These lines are now tied!
tied[indx] = True
[docs]
@staticmethod
def _velocity_vectors(wave, fitting_window):
return numpy.array([ (wave/fw.restwave[0]-1.0)*astropy.constants.c.to('km/s').value \
for fw in fitting_window ])
[docs]
@staticmethod
def _fit_masks(wave, fitting_window, redshift, window_buffer):
# Input redshift must be a single value
if not isinstance(redshift, float):
raise TypeError('redshift must be a single float')
nwindows = len(fitting_window)
fitting_mask = numpy.array([ numpy.logical_and(
wave>(fw.restwave[0]-window_buffer)*(1+redshift),
wave<(fw.restwave[0]+window_buffer)*(1+redshift)) \
for fw in fitting_window ])
# print(fitting_mask.shape)
for i in range(nwindows):
if len(fitting_window[i].restwave) == 1:
continue
fitting_mask[i,:] |= numpy.any( numpy.array([ numpy.logical_and(
wave>(rw-window_buffer)*(1+redshift),
wave<(rw+window_buffer)*(1+redshift)) \
for rw in fitting_window[i].restwave ]), axis=0)
return fitting_mask
# @staticmethod
# def _check_db(emission_lines):
#
# # Check the input type
# if not isinstance(emission_lines, EmissionLineDB):
# raise TypeError('Input database must have type EmissionLineDB.')
#
# # Check the database itself
# neml = emission_lines.nsets
# for i in range(neml):
# profile = eval(emission_lines['profile'][i])
# npar = len(profile.param_names)
# if emission_lines['par'][i].size != npar*emission_lines['ncomp'][i]:
# raise ValueError('Provided {0} parameters, but expected {1}.'.format(
# emission_lines['par'][i].size, npar*emission_lines['ncomp'][i]))
# if emission_lines['fix'][i].size != npar*emission_lines['ncomp'][i]:
# raise ValueError('Provided {0} fix flags, but expected {1}.'.format(
# emission_lines['fix'][i].size, npar*emission_lines['ncomp'][i]))
# if numpy.any([f not in [0, 1] for f in emission_lines['fix'][i] ]):
# warnings.warn('Fix values should only be 0 or 1; non-zero values interpreted as 1.')
# if emission_lines['lobnd'][i].size != npar*emission_lines['ncomp'][i]:
# raise ValueError('Provided {0} lower bounds, but expected {1}.'.format(
# emission_lines['lobnd'][i].size,
# npar*emission_lines['ncomp'][i]))
# if emission_lines['hibnd'][i].size != npar*emission_lines['ncomp'][i]:
# raise ValueError('Provided {0} upper bounds, but expected {1}.'.format(
# emission_lines['hibnd'][i].size,
# npar*emission_lines['ncomp'][i]))
# if emission_lines['log_bnd'][i].size != npar*emission_lines['ncomp'][i]:
# raise ValueError('Provided {0} log boundaries designations, but expected '
# '{1}.'.format(emission_lines['log_bnd'][i].size,
# npar*emission_lines['ncomp'][i]))
[docs]
@staticmethod
def _is_near_bound(par, lbnd, ubnd, logbounded, rtol=1e-2, atol=1e-4):
"""
Determine of any of the parameters within start and start+npar
are "near" an imposed boundary.
"""
# print('par: ', par)
# print('lbnd: ', lbnd)
lbounded = numpy.invert(numpy.isinf(lbnd))
# print('L bounded: ', lbounded)
# print('ubnd: ', ubnd)
ubounded = numpy.invert(numpy.isinf(ubnd))
# print('U bounded: ', ubounded)
if numpy.sum(lbounded) == 0 and numpy.sum(ubounded) == 0:
# print('No bounds')
return False
if numpy.any(lbounded & numpy.invert(ubounded) & (par - lbnd < atol)):
# print('Near lower bound')
return True
if numpy.any(numpy.invert(lbounded) & ubounded & (ubnd - par < atol)):
# print('Near upper bound')
return True
ulbounded = lbounded & ubounded
# print('UL bounded', ulbounded)
if numpy.sum(ulbounded) == 0:
# print('None bounded from both sides')
return False
# print('log bounded', logbounded)
# print('UL & log bounded', ulbounded & logbounded)
# print('UL & not log bounded', ulbounded & ~logbounded)
Dp = numpy.zeros(ulbounded.size, dtype=float)
indx = ulbounded & logbounded
Dp[indx] = numpy.log10(ubnd[indx]) - numpy.log10(lbnd[indx])
indx = ulbounded & numpy.invert(logbounded)
Dp[indx] = ubnd[indx] - lbnd[indx]
tol = Dp*rtol
# print(tol)
# print(tol[ulbounded & logbounded])
# print((numpy.ma.log10(par) - numpy.ma.log10(lbnd))[ulbounded & logbounded])
# print((numpy.ma.log10(ubnd) - numpy.ma.log10(par))[ulbounded & logbounded])
if numpy.any( ulbounded & logbounded & ( (numpy.ma.log10(par) - numpy.ma.log10(lbnd) < tol)
| (numpy.ma.log10(ubnd) - numpy.ma.log10(par) < tol))):
# print('Near boundary in log')
return True
# print(tol[ulbounded & ~logbounded])
# print((par - lbnd)[ulbounded & ~logbounded])
# print((ubnd - par)[ulbounded & ~logbounded])
if numpy.any(ulbounded & numpy.invert(logbounded) & ((par-lbnd < tol) | (ubnd-par < tol))):
# print('Near boundary in linear')
return True
# print('Not near boundary')
return False
# # Check if the parameters are near a boundary
# if self.bestfit[i,j].bounded:
# model_fit_par['LOBND'][i,j,:npar] = self.bestfit[i,j].bounds[0]
# model_fit_par['UPBND'][i,j,:npar] = self.bestfit[i,j].bounds[1]
# for k in range(npar):
# if not numpy.isinf(self.bestfit[i,j].bounds[0][k]) \
# and model_fit_par['PAR'][i,j,k] - model_fit_par['ERR'][i,j,k] \
# <= self.bestfit[i,j].bounds[0][k]:
# near_bound = True
# break
# if not numpy.isinf(self.bestfit[i,j].bounds[1][k]) \
# and model_fit_par['PAR'][i,j,k] + model_fit_par['ERR'][i,j,k] \
# >= self.bestfit[i,j].bounds[1][k]:
# near_bound = True
# break
# if self.bestfit[i,j].bounded:
# if numpy.any(~numpy.isinf(self.bestfit[i,j].bounds[0][pl:pl+nlinepar[k]]) \
# & (model_fit_par['PAR'][i,j,pl:pl+nlinepar[k]]
# - model_fit_par['ERR'][i,j,pl:pl+nlinepar[k]]
# <= self.bestfit[i,j].bounds[0][pl:pl+nlinepar[k]])):
# model_eml_par['MASK'][i,emlj] \
# = self.bitmask.turn_on(model_eml_par['MASK'][i,emlj], 'NEAR_BOUND')
# near_bound = True
# if numpy.any(~numpy.isinf(self.bestfit[i,j].bounds[1][pl:pl+nlinepar[k]]) \
# & (model_fit_par['PAR'][i,j,pl:pl+nlinepar[k]]
# + model_fit_par['ERR'][i,j,pl:pl+nlinepar[k]] \
# >= self.bestfit[i,j].bounds[1][pl:pl+nlinepar[k]])):
# model_eml_par['MASK'][i,emlj] \
# = self.bitmask.turn_on(model_eml_par['MASK'][i,emlj], 'NEAR_BOUND')
# near_bound = True
[docs]
@staticmethod
def _correct_subeml_par(restwave_0, restwave_k, par, err=None):
"""
Correct the parameters fit assuming a rest wavelength of
restwave_0, when it's actually restwave_k. Input parameters are
expected to be flux, velocity, velocity dispersion.
"""
c = astropy.constants.c.to('km/s').value
_par = par.copy()
_par[0] *= restwave_k/restwave_0
_par[1] = (restwave_0*(par[1]/c + 1)/restwave_k - 1)*c
_par[2] *= restwave_0/restwave_k
if err is None:
return _par
_err = err.copy()
_err[0] *= restwave_k/restwave_0
_err[1] *= restwave_0/restwave_k
_err[2] *= restwave_0/restwave_k
return _par, _err
[docs]
def _assess_and_save_fit(self, i, j, model_fit_par, model_eml_par):
"""
Assess the result of the LineProfileFit results.
- (DONE) Check the success failure
- (DONE) Calculate the chi2, rchi2, rms, fractional_rms,
residuals, fractional_residuals
- (DONE) Compare the fit parameters to the bounds
- Check the chi-square and residuals?
- Check the velocity offset wrt the input
- For multiple lines, check the order of the lines matches
the guess parameters
- For multiple component lines, check the ordering of the
subcomponents
"""
# Instantiate the returned flag that the parameter are near the
# imposed boundary; only meaningful if the fit did not fail
near_bound = False
# Assign the fitting window
model_eml_par['FIT_INDEX'][i,self.fitting_window[j].db_indx] = j
# print(model_eml_par['FIT_INDEX'][i,:])
# If the parameters are bounded, save the bounds
npar = self.bestfit[i,j].npar
if self.bestfit[i,j].bounded:
model_fit_par['LOBND'][i,j,:npar] = self.bestfit[i,j].bounds[0]
model_fit_par['UPBND'][i,j,:npar] = self.bestfit[i,j].bounds[1]
# Save which parameters were fixed or should be ignored
model_fit_par['IGNORE'][i,j,npar:] = True
# include_model = True
if not self.bestfit[i,j].result.success:
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'FIT_FAILED')
if not self.quiet:
log_output(self.loggers, 1, logging.ERROR,
'FIT FAILED: Spec: {0}/{1} ({2} : {3})'.format(i+1,self.nspec,
self.fitting_window[j].line_index,
self.emission_lines['name'][self.fitting_window[j].db_indx]))
return near_bound
# Get the full list of parameters (including fixed and tied values)
self.bestfit[i,j]._assign_par(self.bestfit[i,j].result.x)
par = self.bestfit[i,j].par
model_fit_par['PAR'][i,j,:npar] = par[:]
model_fit_par['FIXED'][i,j,:npar] = self.bestfit[i,j].fixed
model_fit_par['FIXED'][i,j,npar:] = True
model_fit_par['ERR'][i,j,:npar] = numpy.zeros(npar, dtype=float)
if self.error is not None:
tmperr = numpy.diag(self.bestfit[i,j].cov)
if not numpy.any(tmperr < 0):
model_fit_par['ERR'][i,j,numpy.invert(model_fit_par['FIXED'][i,j])] \
= numpy.sqrt(numpy.diag(self.bestfit[i,j].cov))
else:
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'UNDEFINED_COVAR')
# print('PAR: ', model_fit_par['PAR'][i,j,:npar])
# print('ERR: ', model_fit_par['ERR'][i,j,:npar])
# Get the per-line parameters
nlinepar = numpy.array([ p.npar for p in self.fitting_window[j].profile_set ])
for k in range(self.fitting_window[j].nlines):
pl = 0 if k == 0 else numpy.sum(nlinepar[:k])
emlj = self.fitting_window[j].db_indx[k]
model_eml_par['FLUX'][i,emlj] \
= self.fitting_window[j].profile_set[k].moment(order=0,
par=par[pl:pl+nlinepar[k]])
model_eml_par['FLUXERR'][i,emlj] \
= self.fitting_window[j].profile_set[k].moment_err(order=0,
par=par[pl:pl+nlinepar[k]],
err=model_fit_par['ERR'][i,j,pl:pl+nlinepar[k]])
# Flux unit conversion
# print(astropy.constants.c.to('km/s').value/self.fitting_window[j].restwave[k])
# /= astropy.constants.c.to('km/s').value/self.fitting_window[j].restwave[k]
model_eml_par['FLUX'][i,emlj] \
*= self.fitting_window[j].restwave[0]/astropy.constants.c.to('km/s').value
model_eml_par['FLUXERR'][i,emlj] \
*= self.fitting_window[j].restwave[0]/astropy.constants.c.to('km/s').value
model_eml_par['KIN'][i,emlj,0] \
= self.fitting_window[j].profile_set[k].moment(order=1,
par=par[pl:pl+nlinepar[k]])
model_eml_par['KINERR'][i,emlj,0] \
= self.fitting_window[j].profile_set[k].moment_err(order=1,
par=par[pl:pl+nlinepar[k]],
err=model_fit_par['ERR'][i,j,pl:pl+nlinepar[k]])
model_eml_par['KIN'][i,emlj,1] \
= self.fitting_window[j].profile_set[k].moment(order=2,
par=par[pl:pl+nlinepar[k]])
model_eml_par['KINERR'][i,emlj,1] \
= self.fitting_window[j].profile_set[k].moment_err(order=2,
par=par[pl:pl+nlinepar[k]],
err=model_fit_par['ERR'][i,j,pl:pl+nlinepar[k]])
# Correct for the fact that the fit was done using a
# velocity vector defined by the primary line
# TODO: Need to change how fitting is done so that this is
# NOT necessary!
if k > 0:
line_par, line_parerr \
= self._correct_subeml_par(self.fitting_window[j].restwave[0],
self.fitting_window[j].restwave[k],
numpy.array([ model_eml_par['FLUX'][i,emlj],
model_eml_par['KIN'][i,emlj,0],
model_eml_par['KIN'][i,emlj,1] ]),
err= numpy.array([
model_eml_par['FLUXERR'][i,emlj],
model_eml_par['KINERR'][i,emlj,0],
model_eml_par['KINERR'][i,emlj,1] ]))
model_eml_par['FLUX'][i,emlj], model_eml_par['KIN'][i,emlj,0], \
model_eml_par['KIN'][i,emlj,1] = tuple(line_par)
model_eml_par['FLUXERR'][i,emlj], model_eml_par['KINERR'][i,emlj,0], \
model_eml_par['KINERR'][i,emlj,1] = tuple(line_parerr)
if self.bestfit[i,j].bounded \
& self._is_near_bound(model_fit_par['PAR'][i,j,pl:pl+nlinepar[k]],
self.bestfit[i,j].bounds[0][pl:pl+nlinepar[k]],
self.bestfit[i,j].bounds[1][pl:pl+nlinepar[k]],
self.fitting_window[j].log_bounds.ravel().copy()[pl:pl+nlinepar[k]]):
model_eml_par['MASK'][i,emlj] \
= self.bitmask.turn_on(model_eml_par['MASK'][i,emlj], 'NEAR_BOUND')
near_bound = True
# print('Any line in window near boundary: ', near_bound)
if near_bound:
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'NEAR_BOUND')
# Set the fit statistics
model_fit_par['CHI2'][i,j] = 0.0 if self.error is None else \
numpy.sum(numpy.square(self.bestfit[i,j]._chi(par)))
# print('CHI2:', model_fit_par['CHI2'][i,j])
model_fit_par['RCHI2'][i,j] = model_fit_par['CHI2'][i,j] \
/ (model_fit_par['NPIXFIT'][i,j] - npar)
# print('RCHI2:', model_fit_par['RCHI2'][i,j])
resid = self.bestfit[i,j]._resid(par)
model_fit_par['RMS'][i,j] = numpy.sqrt(numpy.mean(numpy.square(resid)))
# print('RMS:', model_fit_par['RMS'][i,j])
model_fit_par['RESID'][i,j] = sample_growth(numpy.ma.absolute(resid),
[0.0, 0.25, 0.50, 0.75, 0.90, 0.99, 1.0])
# model_fit_par['RESID'][i,j] = residual_growth(resid, [0.25, 0.50, 0.75, 0.90, 0.99])
# print('RESID: ', model_fit_par['RESID'][i,j])
fit = self.bestfit[i,j].sample(self.bestfit[i,j].x, par)
indx = numpy.absolute(fit) > 1e-4
if numpy.sum(indx) > 0:
frac_resid = resid[indx]/fit[indx]
model_fit_par['FRAC_RMS'][i,j] = numpy.sqrt(numpy.mean(numpy.square(frac_resid)))
# print('FRMS:', model_fit_par['FRAC_RMS'][i,j])
if numpy.sum(indx) > 1:
model_fit_par['FRAC_RESID'][i,j] = sample_growth(numpy.ma.absolute(frac_resid),
[0.0, 0.25, 0.50, 0.75, 0.90, 0.99, 1.0])
# model_fit_par['FRAC_RESID'][i,j] = residual_growth(frac_resid,
# [0.25, 0.50, 0.75, 0.90, 0.99])
# print('FRAC_RESID: ', model_fit_par['FRAC_RESID'][i,j])
return near_bound
# def _instrumental_dispersion_correction(self, model_eml_par):
# """
# Determine the instrumental velocity dispersion at the centroids
# of the fitted emission lines.
# """
# if self.sres is None:
# return
#
# interpolator = interpolate.interp1d(self.wave, self.sres, fill_value='extrapolate',
# assume_sorted=True)
# restwave = numpy.array([self.emission_lines['restwave']]*self.nspec)
# cnst = constants()
#
# # Get the instrumental dispersion at each (valid) line center
# indx = ~self.bitmask.flagged(model_eml_par['MASK'],
# flag=['INSUFFICIENT_DATA', 'FIT_FAILED', 'NEAR_BOUND',
# 'UNDEFINED_COVAR' ])
# model_eml_par['SINST'][indx] = astropy.constants.c.to('km/s') \
# / interpolator((model_eml_par['KIN'][indx,0]
# / astropy.constants.c.to('km/s').value+1.0)
# *restwave[indx])/cnst.sig2fwhm
# z = model_eml_par['KIN'][indx,0] / astropy.constants.c.to('km/s').value
# pyplot.scatter(restwave[indx]*(1.0+z), model_eml_par['SINST'][indx], marker='.', s=30,
# color='k')
# pyplot.show()
#
# pyplot.scatter(model_eml_par['SINST'][indx], model_eml_par['KIN'][indx,1], marker='.',
# s=30, color='k')
# pyplot.show()
# NEVER TESTED
# def _correct_velocity_dispersion(self, model_eml_par):
# """
# Use the **previously calculated** instrumental velocity
# dispersion to correct the measured velocity dispersions of the
# lines to the astrophysical velocity dispersion.
# """
#
# # Correct the second moments for the instrumental dispersion
# defined = numpy.zeros(indx.shape, dtype=numpy.bool)
# defined[indx] = model_eml_par['SINST'][indx] < model_eml_par['KIN'][indx,1]
# nonzero = numpy.zeros(indx.shape, dtype=numpy.bool)
# nonzero[indx] = numpy.absolute(model_eml_par['SINST'][indx]
# - model_eml_par['KIN'][indx,1]) > 0
#
## orig = model_eml_par['KIN'].copy()
#
# model_eml_par['KINERR'][nonzero,1] \
# = 2.0*model_eml_par['KIN'][nonzero,1]*model_eml_par['KINERR'][nonzero,1]
# model_eml_par['KIN'][nonzero,1] = numpy.square(model_eml_par['KIN'][nonzero,1]) \
# - numpy.square(model_eml_par['SINST'][nonzero])
# model_eml_par['KIN'][nonzero,1] = model_eml_par['KIN'][nonzero,1] \
# / numpy.sqrt(numpy.absolute(model_eml_par['KIN'][nonzero,1]))
# model_eml_par['KINERR'][nonzero,1] /= numpy.absolute(2.*model_eml_par['KIN'][nonzero,1])
#
# # Flag undefined values
# model_eml_par['MASK'][indx & ~defined] \
# = self.bitmask.turn_on(model_eml_par['MASK'][indx & ~defined], 'UNDEFINED_SIGMA')
#
## flg = self.bitmask.flagged(model_eml_par['MASK'], flag='UNDEFINED_SIGMA')
## pyplot.scatter(orig[nonzero & flg,1], model_eml_par['KIN'][nonzero & flg,1], marker='.',
## s=30, color='0.8')
## pyplot.scatter(orig[nonzero & ~flg,1], model_eml_par['KIN'][nonzero & ~flg,1], marker='.',
## s=30, color='k')
## pyplot.show()
[docs]
def fit_SpatiallyBinnedSpectra(self, binned_spectra, par=None, loggers=None, quiet=False):
"""
This is a basic interface that is geared for the DAP that
interacts with the rest of the, more general, parts of the
class.
This should not declare anything to self!
"""
# Assign the parameters if provided
if par is None:
raise ValueError('Required parameters for LineProfileFit have not been defined.')
# SpatiallyBinnedSpectra object always needed
if binned_spectra is None:
raise ValueError('Must provide spectra object for fitting.')
if not isinstance(binned_spectra, SpatiallyBinnedSpectra):
raise TypeError('Must provide a valid SpatiallyBinnedSpectra object!')
if binned_spectra.hdu is None:
raise ValueError('Provided SpatiallyBinnedSpectra object is undefined!')
# Continuum accounts for underlying absorption
if par['stellar_continuum'] is not None \
and not isinstance(par['stellar_continuum'], StellarContinuumModel):
raise TypeError('Must provide a valid StellarContinuumModel object.')
continuum = None if par['stellar_continuum'] is None \
else par['stellar_continuum'].fill_to_match(binned_spectra['BINID'].data,
missing=binned_spectra.missing_bins)
if continuum is not None:
if continuum.shape != binned_spectra['FLUX'].data.shape:
raise ValueError('Provided continuum does not match shape of the binned spectra.')
if not isinstance(continuum, numpy.ma.MaskedArray):
continuum = numpy.ma.MaskedArray(continuum)
# Get the data arrays to fit
good_snr = binned_spectra.above_snr_limit(par['minimum_snr'])
wave, flux, ivar, sres = EmissionLineFit.get_spectra_to_fit(binned_spectra,
pixelmask=par['pixelmask'],
select=good_snr)
# Return the fitted data
model_wave, model_flux, model_base, model_mask, model_fit_par, model_eml_par \
= self.fit(binned_spectra['WAVE'].data, flux, par['emission_lines'],
ivar=ivar, sres=sres, continuum=continuum[good_snr,:],
base_order=par['base_order'], window_buffer=par['window_buffer'],
guess_redshift=par['guess_redshift'][good_snr],
guess_dispersion=par['guess_dispersion'][good_snr], loggers=loggers,
quiet=quiet)
# Save the the bin ID numbers indices based on the spectra
# selected to be fit
model_fit_par['BINID'] = binned_spectra['BINS'].data['BINID'][good_snr]
model_fit_par['BINID_INDEX'] = numpy.arange(binned_spectra.nbins)[good_snr]
model_eml_par['BINID'] = binned_spectra['BINS'].data['BINID'][good_snr]
model_eml_par['BINID_INDEX'] = numpy.arange(binned_spectra.nbins)[good_snr]
# Add the equivalent width data
EmissionLineFit.measure_equivalent_width(binned_spectra['WAVE'].data, flux,
par['emission_lines'], model_eml_par,
redshift=par['guess_redshift'][good_snr],
bitmask=self.bitmask)
# Only return model and model parameters for the *fitted*
# spectra
return model_flux, model_base, model_mask, model_fit_par, model_eml_par, None
[docs]
def fit(self, wave, flux, emission_lines, ivar=None, mask=None, sres=None,
continuum=None, base_order=-1, window_buffer=25, guess_redshift=None,
guess_dispersion=None, loggers=None, quiet=False):
"""
The flux array is expected to have size Nspec x Nwave.
Raises:
ValueError: Raised if the length of the spectra, errors, or
mask does not match the length of the wavelength array;
raised if the wavelength, redshift, or dispersion arrays
are not 1D vectors; and raised if the number of
redshifts or dispersions is not a single value or the
same as the number of input spectra.
"""
# Initialize the reporting
if loggers is not None:
self.loggers = loggers
self.quiet = quiet
# Check the input emission-line database
EmissionLineFit.check_emission_line_database(emission_lines)
self.emission_lines = emission_lines
self.neml = self.emission_lines.neml
# Prepare the spectra for fitting
self.wave = wave
self.flux, self.error, self.sres, self.continuum, self.redshift, self.dispersion \
= EmissionLineFit.check_and_prep_input(self.wave, flux, ivar=ivar, mask=mask,
sres=sres, continuum=continuum,
redshift=guess_redshift,
dispersion=guess_dispersion,
default_dispersion=self.default_dispersion)
# Keep the shape
self.nspec, self.nwave = self.flux.shape
# Subtract the continuum
self.fluxnc, self.no_continuum = EmissionLineFit.subtract_continuum(self.flux,
self.continuum)
# No continuum for any spaxel!
if self.no_continuum is None:
self.no_continuum = numpy.ones(self.flux.shape, dtype=bool)
# pyplot.step(self.wave, self.flux[0,:], where='mid', color='k', lw=0.5, linestyle='-')
# pyplot.step(self.wave, self.fluxnc[0,:], where='mid', color='g', lw=0.5, linestyle='-')
# pyplot.plot(self.wave, self.no_continuum[0,:], color='r', lw=0.5, linestyle='-')
# pyplot.show()
# exit()
# Build the emission-line fitting windows
self._parse_emission_line_models()
max_npar = numpy.amax([ fw.fixed_par.size for fw in self.fitting_window ])
self.base_order = base_order
if self.base_order > -1:
max_npar += base_order+1
self.window_buffer = window_buffer
# Initialize the output arrays
# Model flux:
model_flux = numpy.zeros(self.flux.shape, dtype=float)
model_base = numpy.zeros(self.flux.shape, dtype=float)
# Model mask:
model_mask = numpy.zeros(self.flux.shape, dtype=self.bitmask.minimum_dtype())
indx = numpy.ma.getmaskarray(self.flux)
model_mask[indx] = self.bitmask.turn_on(model_mask[indx], 'DIDNOTUSE')
model_mask[self.no_continuum] = self.bitmask.turn_on(model_mask[self.no_continuum],
'NOCONTINUUM')
# Model parameters and fit quality
model_fit_par = init_record_array(self.nspec,
Elric._per_fitting_window_dtype(self.nwindows, max_npar,
self.bitmask.minimum_dtype()))
model_fit_par['BINID'] = numpy.arange(self.nspec)
model_fit_par['BINID_INDEX'] = numpy.arange(self.nspec)
# Emission-line parameters
model_eml_par = self.emission_line_datatable(self.neml, 2, self.bitmask.minimum_dtype(),
shape=self.nspec)
# model_eml_par = init_record_array(self.nspec,
# self._per_emission_line_dtype(self.neml, 2,
# self.bitmask.minimum_dtype()))
model_eml_par['CONTMPLY'] = numpy.ones(model_eml_par['CONTMPLY'].shape, dtype=float)
model_eml_par['BINID'] = numpy.arange(self.nspec)
model_eml_par['BINID_INDEX'] = numpy.arange(self.nspec)
t = time.perf_counter()
# Do the fit
velocity = self._velocity_vectors(self.wave, self.fitting_window)
self.bestfit = numpy.empty((self.nspec,self.nwindows), dtype=object)
for i in range(self.nspec):
# for i in range(5):
# for j in range(self.nwindows):
# print('Window: {0}/{1}'.format(j+1, self.nwindows))
# for p in self.fitting_window[j].profile_set:
# print(p.par)
# if not self.quiet:
# log_output(self.loggers, 1, logging.INFO, 'Fit: {0}/{1}'.format(i+1,self.nspec))
print('Fit: {0}/{1}'.format(i+1,self.nspec), end='\r')
# Get the fitting mask for each emission-line in each window
fitting_mask = self._fit_masks(self.wave, self.fitting_window, self.redshift[i],
self.window_buffer)
# Set the mask for any pixels that are NEVER fit for this
# spectrum
indx = numpy.invert(numpy.any(fitting_mask, axis=0))
model_mask[i,indx] = self.bitmask.turn_on(model_mask[i,indx], 'OUTSIDE_RANGE')
model_jump = numpy.array([
numpy.sum(fm & self.no_continuum[i,:]) not in [0, numpy.sum(fm)] \
for fm in numpy.take(fitting_mask, numpy.arange(self.nwindows), axis=0) ])
spec_to_fit = numpy.ma.array([ self.flux[i,:] if mj else self.fluxnc[i,:] \
for mj in model_jump ])
cz = self.redshift[i]*astropy.constants.c.to('km/s').value
# total_near_bound = 0
# Fit each window
for j in range(self.nwindows):
# Rest the profile to the initial parameters
self.fitting_window[j].reinit_profiles()
# print('Fitting: ', self.emission_lines['name'][self.fitting_window[j].db_indx])
# No observed pixels in fitting window
# print('Sum of fitting mask: {0}'.format(numpy.sum(fitting_mask[j,:])))
if numpy.sum(fitting_mask[j,:]) == 0:
warnings.warn('No valid data for fit to line(s) {0} ({1}). '
'Continuuing'.format(self.fitting_window[j].line_index,
self.emission_lines['name'][self.fitting_window[j].db_indx]))
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'INSUFFICIENT_DATA')
continue
# w,h = pyplot.figaspect(1)
# fig = pyplot.figure(figsize=(1.5*w,1.5*h))
# ax = fig.add_axes([0.1, 0.3, 0.8, 0.4])
# ax.set_xlim(velocity[j,0], velocity[j,-1])
# ax.step(velocity[j,:], spec_to_fit[j,:], where='mid', color='k', linestyle='-',
# lw=0.5)
# ax.plot(velocity[j,:], fitting_mask[j,:], color='r')
# axt = ax.twiny()
# axt.set_xlim(self.wave[0], self.wave[-1])
# pyplot.show()
# Shift the guess velocity based on the redshift and
# adjust the guess flux
# TODO: Use emission-line moment results for the
# guesses, if provided
# TODO: Use self.dispersion to set the initial
# dispersion guesses? Currently uses guesses from
# emission-line database (see
# _parse_emission_line_models)
for k,p in enumerate(self.fitting_window[j].profile_set):
p.shift_mean(cz)
vi = numpy.argsort(numpy.absolute(velocity[j,:]-p.moment(order=1)))[0]
# print('guess flux: ', (spec_to_fit[j,vi] if spec_to_fit[j,vi] > 0.1 else 0.1) \
# * numpy.sqrt(2*numpy.pi) * p.moment(order=2))
p.set_flux((spec_to_fit[j,vi] if spec_to_fit[j,vi] > 0.1 else 0.1) \
* numpy.sqrt(2*numpy.pi) * p.moment(order=2))
# Setup the guess parameters and bounds
_guess_par = numpy.array([ p.par.copy().ravel() \
for p in self.fitting_window[j].profile_set ]).ravel()
_fixed_par = self.fitting_window[j].fixed_par.ravel().copy()
_bounds = self.fitting_window[j].bounds.copy().reshape(-1,2)
# print(_guess_par)
# print(_fixed_par)
# print(_bounds)
# For individual lines, bound the line mean to be within
# the fitting window
if self.fitting_window[j].nlines == 1:
indx = self.fitting_window[j].profile_set[0].mean_indx()
_bounds[indx,0] = velocity[j,fitting_mask[j,:]][0]
_bounds[indx,1] = velocity[j,fitting_mask[j,:]][-1]
# For multiple lines, bound the line mean to be within
# its own region of the window defined by the +/- 2/3 of
# the velocity separation to the nearest line. These
# borders overlap!
else:
indx = numpy.array(self.fitting_window[j].profile_set[0].mean_indx())
for k in range(1,self.fitting_window[j].nlines):
indx = numpy.append(indx,
numpy.array(self.fitting_window[j].profile_set[0].mean_indx()) \
+ sum([self.fitting_window[j].profile_set[kk].npar for kk in range(k)]))
srt = numpy.argsort(_guess_par[indx])
borders = []
for k in range(len(indx)-1):
diff = _guess_par[indx][srt[k+1]]-_guess_par[indx][srt[k]]
borders += [ _guess_par[indx][srt[k]] + 2.0*diff/3.0,
_guess_par[indx][srt[k]] + diff/3.0 ]
borders = numpy.array([ velocity[j,fitting_mask[j,:]][0], *borders,
velocity[j,fitting_mask[j,:]][-1] ])
_bounds[indx,0] = borders[2*srt]
_bounds[indx,1] = borders[2*srt+1]
# Add the parameters for the baseline
if base_order > -1:
_guess_par = numpy.append(_guess_par, numpy.zeros(base_order+1))
# _guess_par[-base_order-1] = numpy.median(spec_to_fit[j,fitting_mask[j,:]])
# print(0.0, _guess_par[-base_order-1])
# pyplot.scatter(velocity[j,fitting_mask[j,:]], spec_to_fit[j,fitting_mask[j,:]],
# marker='.', s=100, color='k')
# y = velocity.copy()[j,fitting_mask[j,:]]
# y[:] = numpy.median(spec_to_fit[j,fitting_mask[j,:]])
# pyplot.plot(velocity[j,fitting_mask[j,:]], y, color='r')
# pyplot.show()
# exit()
_fixed_par = numpy.append(self.fitting_window[j].fixed_par.ravel(),
numpy.zeros(base_order+1).astype(bool))
_bounds = numpy.append(_bounds,
numpy.array([-numpy.inf,numpy.inf]*(base_order+1)).reshape(-1,2),axis=0)
# Check there is sufficient data to perform the fit
model_fit_par['NPIXFIT'][i,j] = len(spec_to_fit[j,fitting_mask[j,:]].compressed())
if model_fit_par['NPIXFIT'][i,j] <= _guess_par.size:
warnings.warn('Insufficient data points ({0}) to fit with this function ' \
'(npar={1})!'.format(
len(spec_to_fit[j,fitting_mask[j,:]].compressed()),
_guess_par.size))
self.bestfit[i,j] = None
# Flag that the fit was not performed
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'INSUFFICIENT_DATA')
continue
# Make sure that the guess is still in the bounds:
# TODO: How do I deal with this:
# - throw a warning and adjust the guess
# - as above and include a mask bit
# - don't perform the fit because it will be bogus
indx = numpy.logical_or( _guess_par < _bounds[:,0], _guess_par > _bounds[:,1])
if numpy.sum(indx) > 0:
warnings.warn('Initial guess outside bounds for line(s) {0} ({1}). '
'Adjusted to center of bounds.'.format(
self.fitting_window[j].line_index,
self.emission_lines['name'][self.fitting_window[j].db_indx]))
_guess_par[indx] = numpy.mean(_bounds[indx,:], axis=1)
# TODO: Get rid of this debugging issue...
if not numpy.all(_bounds[:,0]<_bounds[:,1]):
print('input')
print(_bounds)
print(self.fitting_window[j].bounds.reshape(-1,2))
print(numpy.sum(fitting_mask[j,:]))
if self.fitting_window[j].nlines == 1:
indx = self.fitting_window[j].profile_set[0].mean_indx()
_bounds[indx,0] = velocity[j,fitting_mask[j,:]][0]
_bounds[indx,1] = velocity[j,fitting_mask[j,:]][-1]
else:
indx = numpy.array(self.fitting_window[j].profile_set[0].mean_indx())
for k in range(1,self.fitting_window[j].nlines):
indx = numpy.append(indx,
numpy.array(self.fitting_window[j].profile_set[0].mean_indx()) \
+ sum([self.fitting_window[j].profile_set[kk].npar
for kk in range(k)]))
srt = numpy.argsort(_guess_par[indx])
borders = []
for k in range(len(indx)-1):
diff = _guess_par[indx][srt[k+1]]-_guess_par[indx][srt[k]]
borders += [ _guess_par[indx][srt[k]] + 2.0*diff/3.0,
_guess_par[indx][srt[k]] + diff/3.0 ]
borders = numpy.array([ velocity[j,fitting_mask[j,:]][0], *borders,
velocity[j,fitting_mask[j,:]][-1] ])
print('borders')
print(borders)
_bounds[indx,0] = borders[2*srt]
_bounds[indx,1] = borders[2*srt+1]
print('output')
print(_bounds)
raise ValueError('WTF')
# print(_guess_par)
# print(_bounds)
# pyplot.errorbar(velocity[j,fitting_mask[j,:]], spec_to_fit[j,fitting_mask[j,:]],
# yerr=_error[i,fitting_mask[j,:]], marker='.', color='k',
# linestyle='', capsize=0)
# pyplot.title('Spec: {0}; Line (set): {1}'.format(i+1,j+1))
# pyplot.show()
# print('N valid pixels: {0}'.format(
# len(spec_to_fit[j,fitting_mask[j,:]].compressed())))
# Run the fit
# print('guess par: {0} {1}'.format(i, j))
# print(_guess_par)
# print(' ')
self.bestfit[i,j] = LineProfileFit(velocity[j,:], spec_to_fit[j,:],
self.fitting_window[j].profile_set.tolist(),
error=None if self.error is None else self.error[i,:],
base_order=base_order,
mask=numpy.invert(fitting_mask[j,:]),
par=_guess_par,
bounds=(_bounds[:,0], _bounds[:,1]),
construct_covariance=True)
#, verbose=2)#, fixed=_fixed_par)
# print('Spec: {0}; Line (set): {1}; Success: {2}'.format(i+1,j+1,
# self.bestfit[i,j].result.success))
near_bound = self._assess_and_save_fit(i, j, model_fit_par, model_eml_par)
if near_bound:
# print('Flagging mask as near bound')
model_mask[i,fitting_mask[j,:]] \
= self.bitmask.turn_on(model_mask[i,fitting_mask[j,:]], 'NEAR_BOUND')
# total_near_bound += 1
# Add the best-fit to the lines to the composite model
# for this spectrum
if self.fitting_window[j].output_model and not self.bestfit[i,j].result.success:
model_mask[i,fitting_mask[j,:]] \
= self.bitmask.turn_on(model_mask[i,fitting_mask[j,:]], 'FIT_FAILED')
elif not self.fitting_window[j].output_model or model_fit_par['MASK'][i,j] != 0:
model_fit_par['MASK'][i,j] = self.bitmask.turn_on(model_fit_par['MASK'][i,j],
'EXCLUDED_FROM_MODEL')
elif self.fitting_window[j].output_model and model_fit_par['MASK'][i,j] == 0:
# Get the full model (eventually will only be the
# baseline)
p = self.bestfit[i,j].result.x.copy()
model_base[i,fitting_mask[j,:]] \
+= self.bestfit[i,j].sample(velocity[j,fitting_mask[j,:]], par=p)
# Remove the baseline and only get the line profile
p[-base_order-1:] = 0.0
model_flux[i,fitting_mask[j,:]] \
+= self.bestfit[i,j].sample(velocity[j,fitting_mask[j,:]], par=p)
# if 44 in self.fitting_window[j].line_index:
# pyplot.errorbar(velocity[j,fitting_mask[j,:]], spec_to_fit[j,fitting_mask[j,:]],
# yerr=self.error[i,fitting_mask[j,:]], marker='.', color='k',
# linestyle='', capsize=0)
# pyplot.plot(velocity[j,fitting_mask[j,:]],
# self.bestfit[i,j].sample(velocity[j,fitting_mask[j,:]],
# par=self.bestfit[i,j].result.x), linestyle='-', color='g')
# pyplot.plot(velocity[j,fitting_mask[j,:]],
# self.bestfit[i,j].sample(velocity[j,fitting_mask[j,:]],
# par=_guess_par), linestyle='-', color='r')
# print(_guess_par)
# print(self.bestfit[i,j].result.x)
# pyplot.title('Spec: {0}; Line (set): {1}'.format(i+1,j+1))
# pyplot.show()
# pyplot.plot(wave, model_flux[i,:], linestyle='-', color='g')
# pyplot.plot(wave, model_base[i,:], linestyle='-', color='r')
# pyplot.show()
# print('Windows with parameters near boundary: {0}'.format(total_near_bound))
# Determine the instrumental dispersion correction at the line centers
indx = numpy.invert(self.bitmask.flagged(model_eml_par['MASK'][i,:],
flag=['INSUFFICIENT_DATA', 'FIT_FAILED',
'NEAR_BOUND', 'UNDEFINED_COVAR' ]))
model_eml_par['SIGMACORR'][i,indx] \
= EmissionLineFit.instrumental_dispersion(self.wave, self.sres[i,:],
emission_lines['restwave'][indx],
model_eml_par['KIN'][i,indx,0])
# With these definitions, the instrumental sigma and the sigma correction are
# identical, and the "template" sigma are all zero
model_eml_par['SIGMAINST'] = model_eml_par['SIGMACORR'].copy()
print('Fit: {0}/{0}'.format(i+1,self.nspec))
# Remove the lines from the full model to just provide the
# baseline model
model_base -= model_flux
# pyplot.plot(wave, model_flux[0,:], linestyle='-', color='g')
# pyplot.plot(wave, model_base[0,:], linestyle='-', color='r')
# pyplot.show()
if not self.quiet:
log_output(self.loggers, 1, logging.INFO, 'Fits completed in {0:.4e} min.'.format(
(time.perf_counter() - t)/60))
return self.wave, model_flux, model_base, model_mask, model_fit_par, model_eml_par