# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
.. include:: ../include/parset_usage.rst
.. todo::
- Add range and length parameters allowing one to define the range
allowed for the parameter values and number of elements required
(if the parameter is an array)
- Allow for a from_par_file classmethod to initialize the parameter
set based on a yanny parameter file.
- Save the defaults and allow for a revert_to_default function.
- Write an __add__ function that will allow you to add multiple
parameter sets.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
import inspect
import warnings
import textwrap
from IPython import embed
import numpy
from configobj import ConfigObj
from .util import recursive_dict_evaluate
class ParSet:
    """
    Generic base class to handle and manipulate a list of operational
    parameters.  A glorified dictionary that constrains and types its
    components.

    Args:
        pars (:obj:`list`):
            A list of keywords for a list of parameter values.
        values (:obj:`list`, optional):
            Initialize the parameters to these values.  If not provided,
            all parameters are initialized to `None` or the provided
            default.
        defaults (:obj:`list`, optional):
            For any parameters not provided in the *values* list, use
            these default values.  If not provided, no defaults are
            assumed.
        options (:obj:`list`, optional):
            Force the parameters to be one of a list of options.  Each
            element in the list can be a list itself.  If not provided,
            all parameters are allowed to take on any value within the
            allowed data type.
        dtypes (:obj:`list`, optional):
            Force the parameter to be one of a list of data types.  Each
            element in the list can be a list itself.  If not provided,
            all parameters are allowed to have any data type.
        can_call (:obj:`list`, optional):
            Flag that the parameters are callable operations.  Default
            is False.
        descr (:obj:`list`, optional):
            A list of parameter descriptions.  Empty strings by default.
        cfg_section (:obj:`str`, optional):
            The top-level designation for a configuration section
            written based on the contents of this parameter set.
        cfg_comment (:obj:`str`, optional):
            Comment to be placed at the top-level of the configuration
            section written based on the contents of this parameter set.

    Raises:
        TypeError:
            Raised if the input parameters are not lists or if the input
            keys are not strings.
        ValueError:
            Raised if any of the optional arguments do not have the same
            length as the input list of parameter keys.

    Attributes:
        npar (:obj:`int`):
            Number of parameters
        data (:obj:`dict`):
            Dictionary with the parameter values
        default (:obj:`dict`):
            Dictionary with the default values
        options (:obj:`dict`):
            Dictionary with the allowed options for the parameter values
        dtype (:obj:`dict`):
            Dictionary with the allowed data types for the parameters
        can_call (:obj:`dict`):
            Dictionary with the callable flags
        descr (:obj:`dict`):
            Dictionary with the description of each parameter.
        cfg_section (:obj:`str`):
            The top-level designation for a configuration section
            written based on the contents of this parameter set.
        cfg_comment (:obj:`str`):
            Comment to be placed at the top-level of the configuration
            section written based on the contents of this parameter set.
    """

    prefix = 'PAR'
    """
    Class prefix for header keywords when writing the parset to an
    `astropy.io.fits.Header`_ object.
    """
    def __init__(self, pars, values=None, defaults=None, options=None, dtypes=None, can_call=None,
                 descr=None, cfg_section=None, cfg_comment=None):
        """
        Instantiate the parameter set; see the class docstring for the
        full description of the arguments.
        """
        # Check that the list of input parameters is a list of strings
        if not isinstance(pars, list):
            raise TypeError('Input parameter keys must be provided as a list.')
        for key in pars:
            if not isinstance(key, str):
                raise TypeError('Input parameter keys must be strings.')
        # Get the length of the parameter list and make sure the list
        # has unique values
        self.npar = len(pars)
        if len(numpy.unique(numpy.array(pars))) != self.npar:
            raise ValueError('All input parameter keys must be unique.')
        # Check that the other lists, if provided, have the correct type
        # and length
        if values is not None and (not isinstance(values, list) or len(values) != self.npar):
            raise ValueError('Values must be a list with the same length as the keys list.')
        if defaults is not None and (not isinstance(defaults, list) or len(defaults) != self.npar):
            raise ValueError('Defaults must be a list with the same length as the keys list.')
        if options is not None and (not isinstance(options, list) or len(options) != self.npar):
            raise ValueError('Options must be a list with the same length as the keys list.')
        if dtypes is not None and (not isinstance(dtypes, list) or len(dtypes) != self.npar):
            raise ValueError('Data types list must have the same length as the keys list.')
        if can_call is not None and (not isinstance(can_call, list) or len(can_call) != self.npar):
            raise ValueError('List of callable flags must have the same length as keys list.')
        if descr is not None and (not isinstance(descr, list) or len(descr) != self.npar):
            raise ValueError('List of parameter descriptions must have the same length as '
                             'keys list.')
        # Set up dummy lists for no input
        _values = [None]*self.npar if values is None else values
        _defaults = [None]*self.npar if defaults is None else defaults
        _options = [None]*self.npar if options is None else options
        _dtypes = [None]*self.npar if dtypes is None else dtypes
        _can_call = [False]*self.npar if can_call is None else can_call
        _descr = ['']*self.npar if descr is None else descr
        # Set the defaults
        self.default = dict([ (p, d) for p, d in zip(pars, _defaults) ])
        # Set the valid options; a scalar option is wrapped in a
        # single-element list so the per-key entry is always None or a
        # list
        self.options = dict([ (p, [o]) if o is not None and not isinstance(o, list) else (p, o) \
                                for p, o in zip(pars, _options) ])
        # Set the valid types, also normalized to None or a list
        self.dtype = dict([ (p, [t]) if not isinstance(t, list) else (p, t) \
                                for p, t in zip(pars, _dtypes) ])
        # Set the calling flags
        self.can_call = dict([ (p, t) for p, t in zip(pars, _can_call) ])
        # Set the parameter descriptions
        self.descr = dict([ (p, t) for p, t in zip(pars, _descr) ])
        # Set the data dictionary using the overloaded
        # __setitem__function so that value checking is performed
        self.data = dict.fromkeys(pars)
        for p, d, v, t in zip(pars, _defaults, _values, _dtypes):
            # Check if 'None' is an allowed option; if it is, a None
            # value is assigned directly instead of falling back to the
            # default
            none_allowed = False
            if type(t) is list:
                if type(None) in t:
                    none_allowed = True
            if v is None and not none_allowed:
                # No usable value provided, so use the default
                self.__setitem__(p, d)
                continue
            self.__setitem__(p, v)
        # Save the configuration file section details
        self.cfg_section = cfg_section
        self.cfg_comment = cfg_comment
def __getitem__(self, key):
"""
Return the value of the designated key.
Args:
key (:obj:`str`):
Key for new parameter
"""
return self.data[key]
    def __setitem__(self, key, value):
        """
        Set the value for a key.

        Args:
            key (:obj:`str`):
                Key for new parameter
            value (object):
                Parameter value, must have the data type specified by
                :attr:`dtype` when instantiating the object, if any were
                provided.

        Raises:
            KeyError:
                Raised if the keyword is not valid for this class.
            ValueError:
                Raised if the parameter value is not among the allowed
                options (:attr:`options`).
            TypeError:
                Raised if the parameter value does not have an allowed
                data type (:attr:`dtype`) or if the provided value is
                not a callable object, but is expected to be by
                :attr:`can_call`.
        """
        if key not in self.keys():
            raise KeyError('{0} is not a valid key for {1}.'.format(key, self.__class__.__name__))
        # None is always accepted and bypasses the option/type/callable
        # checks below
        if value is None:
            self.data[key] = value
            return
        if isinstance(value, list):
            # A list mixing ParSet/dict elements with other types cannot
            # be rendered consistently, so warn about it up front
            is_parset_or_dict = [ isinstance(v, (ParSet, dict)) for v in value ]
            if numpy.any(is_parset_or_dict) and not numpy.all(is_parset_or_dict):
                warnings.warn('List includes a mix of ParSet and dicts with other types. '
                              'Displaying and writing the ParSet will not be correct!')
        # Enforce the allowed options, if any
        if self.options[key] is not None and value not in self.options[key]:
            raise ValueError('Input value for {0} invalid: {1}.\nOptions are: {2}'.format(
                                key, value, self.options[key]))
        # Enforce the allowed data types, if any
        if self.dtype[key] is not None \
                and not any([ d is None or isinstance(value, d) for d in self.dtype[key]]):
            raise TypeError('Input value for {0} has incorrect type: {1}.'.format(key, value) +
                            '\nValid types are: {0}'.format(self.dtype[key]))
        # Enforce that the value is callable, if required
        if self.can_call[key] and not callable(value):
            raise TypeError('{0} is not a callable object.'.format(value))
        self.data[key] = value
def __len__(self):
"""Return the number of parameters."""
return self.npar
def __iter__(self):
"""Return an iterable to the parameter values."""
return iter(self.data.values())
def __repr__(self):
"""Return a string representation of the parameters."""
return self._output_string(header=self.cfg_section)
[docs]
def _output_string(self, header=None, value_only=False):
"""
Constructs the short-format table strings for the
``__repr__`` method.
Args:
header (:obj:`str`, optional):
String header to provide for the table. This is
typically the name of the configuration section.
value_only (:obj:`bool`, optional):
By default, the table includes the parameter key, its
current value, the default value, its data type, and if
the value can be a callable function. If
`value_only=True`, only the parameter key and current
value are returned.
Returns:
str: Single long string with the parameter table for the
``__repr__`` method.
"""
additional_par_strings = []
ncol = 2 if value_only else 5
data_table = numpy.empty((self.npar+1, ncol), dtype=object)
data_table[0,:] = ['Parameter', 'Value'] if value_only \
else ['Parameter', 'Value', 'Default', 'Type', 'Callable']
for i, k in enumerate(self.keys()):
data_table[i+1,0] = k
if isinstance(self.data[k], ParSet):
_header = k if header is None else '{0}:{1}'.format(header, k)
additional_par_strings += [ self.data[k]._output_string(header=_header,
value_only=value_only) ]
data_table[i+1,1] = 'see below'
if not value_only:
data_table[i+1,2] = 'see below'
else:
data_table[i+1,1] = ParSet._data_string(self.data[k])
if not value_only:
data_table[i+1,2] = ParSet._data_string(self.default[k])
if value_only:
continue
data_table[i+1,3] = ', '.join(self._types_list(k))
data_table[i+1,4] = self.can_call[k].__repr__()
output = [ParSet._data_table_string(data_table)]
if header is not None:
output = [header] + output
if len(additional_par_strings) > 0:
output += additional_par_strings
return '\n'.join(output)
[docs]
@staticmethod
def _data_table_string(data_table, delimiter='print'):
"""
Provided the array of data, format it with equally spaced
columns and add a header (first row) and contents delimiter.
Args:
data_table (`numpy.ndarray`_):
Array of string representations of the data to print.
Returns:
:obj:`str`: Single long string with the data table.
"""
nrows, ncols = data_table.shape
col_width = [ numpy.amax([ len(dij) for dij in dj]) for dj in data_table.T ]
row_string = ['']*(nrows+1) if delimiter == 'print' else ['']*(nrows+3)
start = 2 if delimiter == 'print' else 3
for i in range(start,nrows+start-1):
row_string[i] = ' '.join([ data_table[1+i-start,j].ljust(col_width[j])
for j in range(ncols)])
if delimiter == 'print':
# Heading row
row_string[0] = ' '.join([ data_table[0,j].ljust(col_width[j]) for j in range(ncols)])
# Delimiter
row_string[1] = '-'*len(row_string[0])
return '\n'.join(row_string)+'\n'
# For an rst table
row_string[0] = ' '.join([ '='*col_width[j] for j in range(ncols)])
row_string[1] = ' '.join([ data_table[0,j].ljust(col_width[j]) for j in range(ncols)])
row_string[2] = row_string[0]
row_string[-1] = row_string[0]
return '\n'.join(row_string)+'\n'
[docs]
@staticmethod
def _data_string(data, use_repr=True, verbatum=False):
"""
Convert a single datum into a string
Simply return strings, recursively convert the elements of
any objects with a ``__len__`` attribute, or use the object's
own ``__repr__`` attribute for all other objects.
Args:
data (object):
The object to stringify.
"""
if isinstance(data, str):
return data if not verbatum else '``' + data + '``'
if hasattr(data, '__len__'):
return '[]' if isinstance(data, list) and len(data) == 0 \
else ', '.join([ ParSet._data_string(d, use_repr=use_repr,
verbatum=verbatum) for d in data ])
if use_repr:
return data.__repr__()
return str(data)
[docs]
def _wrap_print(self, head, output, tcols):
"""
Wrap the contents of an output string for a fixed terminal
width. Used for the long-format :func:`info` method.
Args:
head (:obj:`str`):
The inline header for the output. Can be an empty
string, but cannot be ``None``.
output (:obj:`str`):
The main body of the text to write.
tcols (:obj:`int`):
The allowed width for the output.
"""
tail = ' '*len(head)
if tcols is not None:
lines = textwrap.wrap('{0}'.format(output), tcols-len(head))
if len(lines) == 0:
print('{0}None'.format(head))
else:
_head = [ head ] + [ tail ]*(len(lines)-1)
print('\n'.join([ h+l for h,l in zip(_head, lines)]))
else:
print(head+'{0}'.format(output))
[docs]
def _types_list(self, key):
"""Return the string names for the specified data types."""
return ['Undefined' if t is None else t.__name__ for t in self.dtype[key]]
[docs]
@staticmethod
def config_lines(par, section_name=None, section_comment=None, section_level=0,
exclude_defaults=False, include_descr=True):
"""
Recursively generate the lines of a configuration file based on
the provided ParSet or dict (par).
Args:
section_name (:obj:`str`, optional):
Name to give to the top-level of the configuration
output.
section_comment (:obj:`str`, optional):
Description to provide for the top-level configuration
output.
section_level (:obj:`int`, optional):
The level for the configuration output. Sets the
indentation level and the number of square brackets
assigned to the section name.
exclude_defaults (:obj:`bool`, optional):
Do not include any parameters that are identical to the
defaults.
include_descr (:obj:`bool`, optional):
Include the descriptions of each parameter as comments.
Returns:
:obj:`list`: The list of the lines to write to a
configuration file.
"""
# Get the list of parameters that are ParSets
parset_keys = [ k for k in par.keys() if isinstance(par[k], (ParSet, dict)) ]
n_parsets = len(parset_keys)
# Set the top-level comment and section name
section_indent = ' '*4*section_level
component_indent = section_indent + ' '*4
lines = [] if section_comment is None \
else ParSet._config_comment(section_comment, section_indent)
lines += [ section_indent + '['*(section_level+1) + section_name
+ ']'*(section_level+1) ]
min_lines = len(lines)
# Add all the parameters that are not ParSets
for k in par.keys():
# Skip it if this element is a ParSet
if n_parsets > 0 and k in parset_keys:
continue
# If the value is a list, determine if all the elements of
# the list are also dictionaries or ParSets
if isinstance(par[k], list) and len(par[k]) > 0:
is_parset_or_dict = [ isinstance(v, (ParSet, dict)) for v in par[k] ]
if numpy.all(is_parset_or_dict):
ndig = int(numpy.log10(len(par[k])))+1
for i, v in enumerate(par[k]):
indx = str(i+1).zfill(ndig)
# Try to add the section comment
section_comment = None
if include_descr:
try:
section_comment = par.descr[k] + ': ' + indx
except:
pass
lines += ParSet.config_lines(v, section_name=k+indx,
section_comment=section_comment,
section_level=section_level+1,
exclude_defaults=exclude_defaults,
include_descr=include_descr)
continue
# Working with a single element
# Try to add the description for this parameter
try:
if par.descr[k] is not None and include_descr:
lines += ParSet._config_comment(par.descr[k], component_indent)
except:
pass
if not exclude_defaults or par[k] != par.default[k]:
lines += [ component_indent + k + ' = ' + ParSet._data_string(par[k]) ]
# Then add the items that are ParSets as subsections
for k in parset_keys:
section_comment = None
if include_descr:
try:
section_comment = par.descr[k]
except:
pass
lines += ParSet.config_lines(par[k], section_name=k, section_comment=section_comment,
section_level=section_level+1,
exclude_defaults=exclude_defaults,
include_descr=include_descr)
return lines if len(lines) > min_lines else []
[docs]
def info(self, basekey=None):
"""
A long-form version of __repr__ that includes the parameter descriptions.
"""
# Try to get the width of the available space to print
try:
tr, tcols = numpy.array(os.popen('stty size', 'r').read().split()).astype(int)
tcols -= int(tcols*0.1)
except:
tr = None
tcols = None
for k in self.data.keys():
if isinstance(self.data[k], ParSet):
self.data[k].info(basekey=k)
continue
print('{0}'.format(k) if basekey is None else '{0}:{1}'.format(basekey,k))
self._wrap_print(' Value: ', self.data[k], tcols)
self._wrap_print(' Default: ', self.default[k], tcols)
self._wrap_print(' Options: ', 'None' if self.options[k] is None
else ', '.join(self.options[k]), tcols)
self._wrap_print(' Valid Types: ', 'None' if self.dtype[k] is None
else ', '.join(self._types_list(k)), tcols)
self._wrap_print(' Callable: ', self.can_call[k], tcols)
self._wrap_print(' Description: ', self.descr[k], tcols)
print(' ')
[docs]
def keys(self):
"""Return the list of parameter set keys."""
return list(self.data.keys())
[docs]
def add(self, key, value, default=None, options=None, dtype=None, can_call=None, descr=None):
"""
Add a new parameter.
Args:
key (:obj:`str`):
Key for new parameter
value (object):
Parameter value, must have a type in the list provided
by ``dtype``, if the list is provided
default (object, optional):
Define a default value for the parameter, must have a
type in the list provided by ``dtype``, if the list is
provided. No default if not provided.
options (:obj:`list`, optional):
List of discrete values that the parameter is allowed to
have. Allowed to be anything if not provided.
dtype (:obj:`list`, optional):
List of allowed data types that the parameter can have.
Allowed to be anything if not provided.
can_call (:obj:`bool`, optional):
Flag that the parameters are callable operations.
Default is False.
descr (:obj:`str`, optional):
Parameter description. Default is that no description
is added.
Raises:
ValueError:
Raised if the keyword alread exists.
"""
if key in self.data.keys():
raise ValueError('Keyword {0} already exists and cannot be added!')
self.npar += 1
self.default[key] = None if default is None else default
self.options[key] = [options] if options is not None and not isinstance(options, list) \
else options
self.dtype[key] = [dtype] if dtype is not None and not isinstance(dtype, list) else dtype
self.can_call[key] = False if can_call is None else can_call
self.descr[key] = None if descr is None else descr
self.data[key] = None
try:
self.__setitem__(key, value)
except:
# Delete the added components
del self.default[key]
del self.options[key]
del self.dtype[key]
del self.can_call[key]
del self.descr[key]
# Re-raise the exception
raise
[docs]
def to_config(self, cfg_file=None, section_name=None, section_comment=None, section_level=0,
append=False, quiet=False, exclude_defaults=False, include_descr=True):
"""
Write/Append the parameter set to a configuration file.
Args:
cfg_file (:obj:`str`, optional):
The name of the file to write/append to. If None
(default), the function will just return the list of
strings that would have been written to the file. These
lines can be used to construct a `configobj.ConfigObj`_
instance.
section_name (:obj:`str`, optional):
The top-level name for the config section. This must be
provided if :attr:`cfg_section` is None or any of the
parameters are not also :class:`ParSet` instances
themselves.
section_comment (:obj:`str`, optional):
The top-level comment for the config section based on
this :class:`ParSet`.
section_level (:obj:`int`, optional):
The top level of this :class:`ParSet`. Used for
recursive output of nested :class:`ParSet` objects.
append (:obj:`bool`, optional):
Append this configuration output of this :class:`ParSet`
to the file. False by default. If not appending and
the file exists, the file is automatically overwritten.
quiet (:obj:`bool`, optional):
Suppress all standard output from the function.
exclude_defaults (:obj:`bool`, optional):
Do not include any parameters that are identical to the
defaults.
include_descr (:obj:`bool`, optional):
Include the descriptions of each parameter as comments.
Raises:
ValueError:
Raised if there are types other than :class:`ParSet` in
the parameter list, :attr:`cfg_section` is ``None``, and
no section_name argument was provided.
"""
if cfg_file is not None and os.path.isfile(cfg_file) and not append and not quiet:
warnings.warn('Selected configuration file already exists and will be overwritten!')
config_output = []
if numpy.all([ isinstance(d, ParSet) or d is None for d in self.data.values() ]):
# All the elements are ParSets themselves, so just iterate
# through each one
for k in self.keys():
if self.data[k] is None:
continue
section_comment = self.descr[k] if include_descr else None
config_output += ParSet.config_lines(self.data[k], section_name=k,
section_comment=section_comment,
section_level=section_level,
exclude_defaults=exclude_defaults,
include_descr=include_descr)
else:
# Cannot write the parameters as a configuration file
# without a top-level configuration section
if section_name is None and self.cfg_section is None:
warnings.warn('No top-level section name available; using [default].')
section_name = 'default'
_section_name = self.cfg_section if section_name is None else section_name
_section_comment = self.cfg_comment if section_comment is None else section_comment
config_output += ParSet.config_lines(self, section_name=_section_name,
section_comment=_section_comment,
section_level=section_level,
exclude_defaults=exclude_defaults,
include_descr=include_descr)
if cfg_file is None:
# Only return the list of lines for the output file. Useful
# if you want to use instantly create a new ConfigObj
# instance without having to write a file
return config_output
# Write the file
with open(cfg_file, 'a' if append else 'w') as f:
f.write('\n'.join(config_output))
[docs]
@classmethod
def from_config(cls, cfg, section_name='default', evaluate=True):
"""
Construct the parameter set using a configuration file.
Args:
cfg (:obj:`str`, :obj:`list`):
Either a single string with a file name to read, or a
list of configuration-file-style strings with the
parameters.
section_name (:obj:`str`, optional):
The configuration file section with the parameters.
evaluate (:obj:`bool`, optional):
Evaluate the values in the config object before
assigning them in the subsequent parameter sets. The
parameters in the config file are *always* read as
strings, so this should almost always be true; however,
see the warning below.
.. warning::
When ``evaluate`` is true, the function runs
``eval()`` on all the entries in the `ConfigObj`
dictionary, done using
:func:`mangadap.par.util.recursive_dict_evaluate`.
This has the potential to go haywire if the name of
a parameter unintentionally happens to be identical
to an imported or system-level function. Of course,
this can be useful by allowing one to define the
function to use as a parameter, but it also means
one has to be careful with the values that the
parameters should be allowed to have. The current
way around this is to provide a list of strings that
should be ignored during the evaluation, done using
:func:`mangadap.par.util._eval_ignore`.
Returns:
:class:`ParSet`: The instance of the parameter set.
"""
# Instantiate the ConfigObj instance
_cfg = ConfigObj(cfg)[section_name]
# Evaluate the strings, if requested
if evaluate:
_cfg = recursive_dict_evaluate(_cfg)
# Instantiate the object based on the configuration dictionary
return cls.from_dict(_cfg)
[docs]
@staticmethod
def _rst_class_name(p):
return ':class:`' + type(p).__module__ + '.' + type(p).__name__ + '`'
[docs]
def to_rst_table(self, parsets_listed=[], header=True, class_link=True, nested=True):
r"""
Construct a reStructuredText table with the :class:`ParSet`
data.
This method is mostly meant for documentation purposes, as way
of showing the format and default parameters of a given single
:class:`ParSet` or nested set of :class:`ParSet` objects.
Args:
parsets_listed (:obj:`list`, optional):
For nested :class:`ParSet` objects, this is used to keep
a log of :class:`ParSet` objects that have already been
included in the rst table, forcing the table to only
appear once.
header (:obj:`bool`, optional):
Include a section header
class_link (:obj:`bool`, optional):
Include an rst-style link to the class instantiation
documentation.
Returns:
:obj:`list`: A list of strings containing each line of the
rst table. To print the table::
print('\n'.join(p.to_rst_table()))
where ``p`` is a :class:`ParSet` instance.
"""
new_parsets = []
data_table = numpy.empty((self.npar+1, 5), dtype=object)
data_table[0,:] = ['Key', 'Type', 'Options', 'Default', 'Description']
for i,k in enumerate(self.keys()):
data_table[i+1,0] = ParSet._data_string(k, use_repr=False, verbatum=True)
if nested and isinstance(self.data[k], ParSet):
if type(self.data[k]).__name__ not in parsets_listed:
new_parsets += [k]
parsets_listed += [ type(self.data[k]).__name__ ]
data_table[i+1,1] = ParSet._rst_class_name(self.data[k])
data_table[i+1,3] = '`{0} Keywords`_'.format(type(self.data[k]).__name__)
else:
data_table[i+1,1] = ', '.join(self._types_list(k))
data_table[i+1,3] = '..' if self.default[k] is None \
else ParSet._data_string(self.default[k], use_repr=False,
verbatum=True)
data_table[i+1,2] = '..' if self.options[k] is None \
else ParSet._data_string(self.options[k], use_repr=False,
verbatum=True)
data_table[i+1,4] = '..' if self.descr[k] is None \
else ParSet._data_string(self.descr[k])
output = ['']
if header:
output += ['{0} Keywords'.format(type(self).__name__)]
output += ['-'*len(output[0])]
output += ['']
if class_link:
output += ['Class Instantiation: ' + ParSet._rst_class_name(self)]
output += ['']
output += [ParSet._data_table_string(data_table, delimiter='rst')]
output += ['']
for k in new_parsets:
output += ['----']
output += ['']
output += self.data[k].to_rst_table(parsets_listed=parsets_listed)
return output
[docs]
def validate_keys(self, required=None, can_be_None=None):
"""
Validate the keys in the :class:`ParSet`.
Args:
required (:obj:`list`, optional):
A list of required keys.
can_be_None (:obj:`list`, optional):
A list of keys with values that are allowed to be None.
All other keys are expected to have defined values.
Raises:
ValueError:
Raised if required keys are not present or if keys have
``None`` values and are not expected to be.
"""
if required is None and can_be_None is None:
# No validation rules, so implicitly valid
return
if required is not None:
not_defined = numpy.array([ k not in self.keys() for k in required ])
if numpy.any(not_defined):
raise ValueError('Required keys were not defined: {0}'.format(
numpy.asarray(required)[not_defined].tolist()))
if can_be_None is not None:
should_not_be_None = numpy.array([ self.data[k] is None and k not in can_be_None
for k in self.keys()])
if numpy.any(should_not_be_None):
raise ValueError('These keys should not be None: {0}'.format(
numpy.asarray(self.keys())[should_not_be_None].tolist()))
[docs]
def to_dict(self):
"""
Return a dictionary with the parameters.
.. warning::
This simply returns a pointer to the internal object
dictionary, :attr:`data`.
"""
# TODO: Return a copy?
return self.data
[docs]
@classmethod
def from_dict(cls, data):
"""
Create a :class:`ParSet` from a dictionary.
Objects built in this way are nearly identical to a normal
dictionary, except that one cannot add keys in the same way.
"""
return cls([*data.keys()], values=[*data.values()])
[docs]
@staticmethod
def parse_par_from_hdr(hdr, prefix):
"""
Parse the dictionary of parameters written to a header.
Args:
hdr (`astropy.io.fits.Header`_):
Header object to parse.
prefix (:obj:`str`):
The prefix used for the header keywords.
Returns:
:obj:`dict`: A dictionary with the parameter keywords and
values.
"""
par = {}
for k, v in hdr.items():
# Check if this header keyword starts with the required
# prefix
if k[:len(prefix)] == prefix:
try:
# Try to convert the keyword without the prefix into
# an integer.
i = int(k[len(prefix):])-1
except ValueError:
# Assume the value is some other random keyword that
# starts with the prefix but isn't a parameter
continue
# Assume we've found a parameter. Parse the parameter
# name from the header comment and add it to the
# dictionary.
par_key = hdr.comments[k].split(':')[-1].strip()
par[par_key] = v
return par
class KeywordParSet(ParSet):
    """
    An abstract class that uses :class:`ParSet` as its base.

    The main purpose of this class is to redefine the
    :func:`ParSet.from_dict` method and disallow adding new parameters.
    """
[docs]
@classmethod
def from_dict(cls, data, ignore_extra=True):
"""
Construct the object using a dictionary.
"""
if 'key' in data.keys() and data['key'] is None:
return None
k = numpy.array([*data.keys()])
parkeys = [*inspect.signature(cls).parameters.keys()]
if not ignore_extra:
badkeys = numpy.array([pk not in parkeys for pk in k])
if numpy.any(badkeys):
raise ValueError('{0} not recognized key(s) for {1}.'.format(k[badkeys],
cls.__class__.__name__))
kwargs = {}
if len(k) > 0:
for pk in parkeys:
if pk not in k:
continue
kwargs[pk] = data[pk]
return cls(**kwargs)
[docs]
def add(self, *args, **kwargs):
"""
Disallow functionality of base class that adds new parameters.
"""
raise NotImplementedError('Cannot add parameters to a {0} instance.'.format(
self.__class__.__name__))
# TODO: Change this to a DataTable?
class ParDatabase:
    """
    Class used as a list of ParSets in a glorified structured numpy
    array.

    Very similar to yanny when converted to a numpy array.

    Can be initialized using a list of ParSet objects, or an SDSS
    parameter file.

    .. todo::
        - Check that the data types are the same for all ParSet objects
          in the list
        - Better handle the NaN values when converting None to a float
          type
        - Add from_par_file classmethod?
    """
def __init__(self, inp):
"""
nsets - number of parameter sets
npar - number of parameters in each parameter set
data - parameter set data
options - allowed options for values
dtype - allowed datatypes
can_call - parameter is a callable function
"""
_inp = [inp] if isinstance(inp, ParSet) else inp
if not isinstance(_inp, list):
raise TypeError('Input must be a list.')
for i in _inp:
if not isinstance(i, ParSet):
raise TypeError('Input must be a list of ParSet objects.')
self.npar = _inp[0].npar
self.nsets = len(_inp)
keys = _inp[0].keys()
dtypes = []
for i in range(self.nsets):
if _inp[i].npar != self.npar:
raise ValueError('Not all ParSet objects have the same number of parameters.')
if _inp[i].keys() != keys:
raise ValueError('Not all ParSet objects have the same keys.')
# Other checks?
dtypes += [self._set_dtypes(_inp, i)]
for j in range(self.npar):
t = dtypes[0][j][1]
for i in range(1, self.nsets):
if len(dtypes[i][j]) != len(dtypes[0][j]):
raise ValueError('Data types are inconsistent, mixing scalars, lists, arrays.')
if dtypes[i][j][1] != t:
dtypes[0][j] = (dtypes[0][j][0], numpy.dtype(object), dtypes[0][j][2]) \
if len(dtypes[0][j]) == 3 \
else (dtypes[0][j][0], numpy.dtype(object))
record_dtypes = dtypes[0]
data = []
for i in range(self.nsets):
data += [ tuple([_inp[i][k] for k in keys]) ]
# WARNING: None values are converted to nan if data type is
# float
self.data = numpy.array(data, dtype=record_dtypes).view(numpy.recarray)
self.options = inp[0].options.copy()
self.dtype = inp[0].dtype.copy()
self.can_call = inp[0].can_call.copy()
def __getitem__(self, key):
"""
Return the value of the designated key.
Args:
key (str) : Key for new parameter
"""
return self.data[key]
def __len__(self):
return self.nsets
[docs]
def keys(self):
return list(self.data.dtype.names)
[docs]
@staticmethod
def _set_dtypes(inp, i):
keys = inp[i].keys()
dtypes = []
for k in keys:
if inp[i].dtype[k] is None:
dtypes += [(k,object)]
continue
# inp.dtype is always a list
if any([t in inp[i].dtype[k] for t in [int , float]]) \
and any([t in inp[i].dtype[k] for t in [list, numpy.ndarray]]):
warnings.warn('Parameter set has elements that can be either individual ' \
'ints/floats or lists/arrays. Database column {0} will have type ' \
'\'object\'.'.format(k))
dtypes += [(k,object)]
elif len(list({int, float} - set(inp[i].dtype[k]))) == 0:
dtypes += [(k,float)]
elif len(list({list, numpy.ndarray} - set(inp[i].dtype[k]))) == 0 \
or inp[i].dtype[k] == numpy.ndarray:
_inp = numpy.asarray(inp[i][k])
dtypes += [(k,_inp.dtype,_inp.shape)]
elif isinstance(inp[i][k], str):
if any([ _inp[k] is None for _inp in inp]):
dtypes += [(k, object)]
else:
dtypes += [(k,'<U{0:d}'.format(max([ len(_inp[k]) for _inp in inp])))]
else:
dtypes += [(k,type(inp[i][k]))]
return dtypes
[docs]
def append(self, db):
if not isinstance(db, ParDatabase):
raise TypeError('Can only append ParDatabase object.')
try:
self.data = numpy.append(self.data, db.data)
except TypeError as e:
raise TypeError('Could not append data:: {0}'.format(e))