"""
Defines the class used to automate the batch execution of the MaNGA DAP.
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
from pathlib import Path
import time
import shutil
import warnings
from IPython import embed
import numpy
try:
    import pbs.queue
except ImportError:
    # pbs is only needed for cluster submission; a plain DAP run does not
    # require it, so degrade to a warning.  Catch ImportError specifically
    # so that genuine failures (e.g. KeyboardInterrupt, or an unrelated
    # error raised while pbs itself imports) are not silently swallowed.
    warnings.warn('Could not import pbs.queue! Any cluster submission will fail!', ImportWarning)
from ..config import manga, manga_environ, python_versions
from .drpcomplete import DRPComplete
from ..util.parser import arginp_to_list
from ..util.fileio import create_symlink
[docs]
class rundap:
r"""
Class used for automated execution of the MaNGA DAP via submission
of scripts to a cluster.
[ More complete explanation of the class here. ]
.. note::
This is purely a survey-level utility for running the DAP at
Utah, including submitting jobs to the cluster. No user-level
usage of the DAP should be considered here, apart from not
hindering such usage of the primary python programs!
.. todo::
- Have :func:`_read_arg` return the boolean for :attr:`version`
instead of keeping it as an attribute.
- Ditch ``console`` and :func:`_read_arg` in favor of the
more typical ``parser`` and ``main`` pairing.
Args:
overwrite (:obj:`bool`, optional):
Overwrite and redo any existing analysis.
console (:obj:`bool`, optional):
The class has been executed from the terminal with
command-line arguments that should be parsed.
quiet (:obj:`bool`, optional):
Suppress output
print_version (:obj:`bool`, optional):
Print the class version and return.
drpver (:obj:`str`, optional):
DRP version to analyze. Default is defined by
:func:`mangadap.config.manga.drp_version`
redux_path (:obj:`str`, optional):
The top-level path with the DRP files used to override the
default defined by
:func:`mangadap.config.manga.drp_redux_path`.
dapver (:obj:`str`, optional):
The DAP version to use for the analysis, used to override
the default defined by
:func:`mangadap.config.manga.dap_version`. *Only
used to define the file paths, not used to switch to the
specified code version!*
analysis_path (:obj:`str`, optional):
The top-level path for the DAP output files, used to
override the default defined by
:func:`mangadap.config.manga.dap_analysis_path`.
plan_file (:obj:`str`, optional):
Name of the plan file to use for *all* DRP data in this run,
used to override the default defined by
:func:`mangadap.par.analysisplan.AnalysisPlanSet.default`.
platelist (:obj:`int`, :obj:`str`, optional):
List of plates to analyze; default is to search the
:attr:`redux_path` for any DRP files; see
:class:`mangadap.survey.drpcomplete.DRPComplete`.
ifudesignlist (:obj:`int`, :obj:`str`, optional):
List of ifudesigns to analyze: default is to search the
:attr:`redux_path` for any DRP files; see
:class:`mangadap.survey.drpcomplete.DRPComplete`.
combinatorics (:obj:`bool`, optional):
Use all unique combinations of the entered
plate/ifudesign/mode lists to create the full list of DRP
files to analyze.
list_file (:obj:`str`, optional):
File with a list of plates and ifudesigns to analyze. The
file must have two columns, with one plate-ifu
combination per file row.
sres_ext (:obj:`str`, optional):
The extension to use when constructing the
spectral resolution vectors for the MaNGA datacubes. See
:func:`mangadap.datacube.manga.MaNGADataCube.spectral_resolution`.
sres_fill (:obj:`bool`, optional):
Fill masked spectral-resolution data by simple linear
interpolation.
covar_ext (:obj:`str`, optional):
Extension in the MaNGA DRP CUBE file to use as the single
spatial correlation matrix for all wavelength channels.
on_disk (:obj:`bool`, optional):
When searching for available files to analyze, search the
DRP directory path instead of using the data in the DRPall
file.
can_analyze (:obj:`bool`, optional):
Only construct script files for datacubes that can/should be
analyzed by the DAP. See
:func:`~mangadap.survey.drpcomplete.DRPComplete.can_analyze`.
log (:obj:`bool`, optional):
Flag to create the DAP log files.
dapproc (:obj:`bool`, optional):
Flag to execute the main DAP processing.
pltifu_plots (:obj:`bool`, optional):
Create the QA plate-ifu specific plots.
post_process (:obj:`bool`, optional):
Prepare (and possibly perform) the post processing steps:
create the DAPall file and its associated QA plots, and the
fit QA plots for each plate.
post_plots (:obj:`bool`, optional):
Create the post-processing QA plots.
report_progress (:obj:`bool`, optional):
Report the progress of the analysis.
verbose (:obj:`int`, optional):
Verbosity level between 0 and 2.
label (:obj:`str`, optional):
Label to use in cluster queue.
nodes (:obj:`int`, optional):
Number of cluster nodes to use.
qos (:obj:`str`, optional):
Select a specific processor (e.g. sdss-fast when the manga
user). This option is ignored if :attr:`q` is set.
umask (:obj:`str`, optional):
umask to set for output.
walltime (:obj:`str`, optional):
Maximum wall time for cluster job.
hard (:obj:`bool`, optional):
Same as hard keyword in cluster submission; see
:func:`pbs.queue.commit`. This keyword is in the
**opposite** sense of the "--toughness" command-line
options; i.e. including --toughness on the command line sets
hard=False.
create (:obj:`bool`, optional):
Use the pbs package to create the scripts.
submit (:obj:`bool`, optional):
Submit all jobs to the cluster.
queue (:obj:`str`, optional):
Name of the destination queue. When submitting jobs at
Utah, this should **not** be set (leaving it at the default
of None). When submitting jobs at Portsmouth, this can be
used to select either sciama1.q, cluster.q (default), or
sciama3.q.
Attributes:
overwrite (bool): **Only used with :attr:`all`**. Overwrite and
redo any existing analysis.
quiet (bool): Suppress output
print_version (bool): Print the version and then return
mpl (:class:`mangadap.survey.mangampl.MaNGAMPL`): MPL version;
see above.
redux_path (str): The top-level path with the DRP files.
dapver (str): The DAP version.
analysis_path (str): The top-level path for the DAP output
files.
plan_file (str): Name of the plan file to use for ALL DRP data
in this run. Will be None if using the default plan.
plan (:class:`AnalysisPlanSet`): The set of analysis plans
daptypes (:obj:`list`): This list of daptypes, one per plan
platelist (list): List of plates to analyze.
ifudesignlist (list): List of ifudesigns to analyze.
combinatorics (bool): Use all unique combinations of the entered
plate/ifudesign/mode lists to create the full list of DRP
files to analyze.
on_disk (bool): See above
can_analyze (bool): See above
log (str): See above
dapproc (bool): Flag to execute the main DAP processing
pltifu_plots (bool): Create the QA plate-ifu specific plots
post_process (:obj:`bool`): See above
post_plots (:obj:`bool`): See above
report_progress (:obj:`bool`): See above
label (str): Label to use in cluster queue.
nodes (int): Number of cluster nodes to use.
qos (str): Specific processor to use (Utah specific)
umask (str): umask to set for output.
walltime (str): Maximum wall time for cluster job.
hard (bool): Same as hard keyword in cluster submission; see
:func:`pbs.queue.commit`.
create (bool): Use the pbs package to create the submission
scripts.
submit (bool): Submit all jobs to the cluster.
q (str): Queue destination (Portsmouth specific)
drpc (:class:`mangadap.survey.drpcomplete.DRPComplete`):
Database of the available DRP files and the parameters
necessary to write the DAP par files.
Raises:
FileNotFoundError:
Raised if a plate-ifu list file is provided but it does not
exist, or if the plan file does not exist.
ValueError:
Raised if all processing steps are turned off, if the
selected MPL is undefined; if the fast node is selected with
nodes > 1; if multiple run modes are selected; or if the
plate list, ifudesign list, or mode list are not provided
during a redo mode or if they are provided with any other
mode.
"""
def __init__(self, overwrite=None, quiet=False, drpver=None, redux_path=None, dapver=None,
             analysis_path=None, plan_file=None, platelist=None, ifudesignlist=None,
             combinatorics=False, list_file=None, sres_ext=None, sres_fill=None,
             covar_ext=None, on_disk=False, can_analyze=False, log=False, dapproc=True,
             pltifu_plots=True, post_process=False, post_plots=False, report_progress=False,
             verbose=0, label='mangadap', nodes=1, cpus=None, qos=None, umask='0027',
             walltime='240:00:00', hard=True, create=False, submit=False, queue=None):
    """
    Instantiate the object and immediately perform the survey-level run
    preparation: save all run-mode and cluster options, update the
    DRPComplete database, build the processing and post-processing
    script queues, and, if requested, submit them to the cluster.

    See the class docstring for the description of the arguments.
    """
    # Save run-mode options
    self.overwrite = overwrite
    self.quiet = quiet
    # Override environment
    self.drpver = drpver
    self.redux_path = None if redux_path is None else Path(redux_path).resolve()
    self.dapver = dapver
    self.analysis_path = None if analysis_path is None else Path(analysis_path).resolve()
    # List of files to analyze
    self.plan_file = None if plan_file is None else Path(plan_file).resolve()
    _list_file = None if list_file is None else Path(list_file).resolve()
    if _list_file is not None and not _list_file.exists():
        raise FileNotFoundError(f'No file: {_list_file}')
    # A plate-ifu list file takes precedence over directly provided
    # plate/ifudesign lists.
    if list_file is None:
        self.platelist = arginp_to_list(platelist, evaluate=True)
        self.ifudesignlist = arginp_to_list(ifudesignlist, evaluate=True)
    else:
        if platelist is not None or ifudesignlist is not None:
            warnings.warn('Provided both file and direct list of plate-ifus to process; '
                          'file input takes precedence.')
        self.platelist, self.ifudesignlist = self._read_file_list(_list_file)
    self.combinatorics = combinatorics
    self.sres_ext = sres_ext
    self.sres_fill = sres_fill
    self.covar_ext = covar_ext
    self.on_disk = on_disk
    self.can_analyze = can_analyze
    # Set the options for output
    self.log = log
    self.dapproc = dapproc
    self.pltifu_plots = pltifu_plots
    self.post_process = post_process
    self.post_plots = post_plots
    # Report the progress of the cluster, otherwise just finish
    # script
    self.report_progress = report_progress
    self.verbose = verbose
    # Cluster queue keywords
    self.label = label
    self.nodes = nodes
    self.cpus = cpus
    self.qos = qos
    self.umask = umask
    self.walltime = walltime
    self.hard = hard
    self.create = create
    self.submit = submit
    self.q = queue
    # If submitting to the cluster, make sure that scripts will be
    # created!
    if not self.create and self.submit:
        self.create = True
    # If setting qos, the number of nodes must be 1
    if self.qos is not None and self.nodes > 1:
        raise ValueError('When selecting the fast node, must have nodes=1!')
    # Check processing steps: at least one of the four processing
    # flags must be enabled; `processes` is only used for the summary
    # printout below.
    nproc = 0
    processes = []
    if self.dapproc:
        nproc += 1
        processes += [ 'Main DAP blocks' ]
    if self.pltifu_plots:
        nproc += 1
        processes += [ 'PLATEIFU-specific QA plots' ]
    if self.post_process:
        nproc += 1
        processes += [ 'DAPall' ]
    if self.post_plots:
        nproc += 1
        processes += [ 'DAPall and PLATE-specific QA plots' ]
    if nproc < 1:
        raise ValueError('No processing steps requested!')
    # Post-processing must wait for the main processing to finish, so
    # progress reporting is forced on when both are submitted.
    if (self.dapproc or self.pltifu_plots) and self.submit and self.post_process \
            and not self.report_progress:
        warnings.warn('If both processing and post processing, need to report progress. '
                      'Setting to report progress.')
        self.report_progress = True
    # Set versions
    if self.drpver is None:
        self.drpver = manga.drp_version()
    if self.dapver is None:
        self.dapver = manga.dap_version()
    # Set the output paths
    self.redux_path = manga.drp_redux_path(self.drpver) \
                        if self.redux_path is None else self.redux_path
    self.analysis_path = manga.dap_analysis_path(self.drpver, self.dapver) \
                            if self.analysis_path is None else self.analysis_path
    # Set the subdirectory from which the scripts are sourced and
    # the various log files are written. Currently based on start
    # UTC; e.g., analysis_path/log/19May2016T09.17.42UTC
    self.calling_path = self.analysis_path / 'log' \
                            / time.strftime('%d%b%YT%H.%M.%SUTC',time.gmtime())
    # Create the calling path. The existence test is executed, even
    # though time stamp should mean that calling_path never exists.
    # TODO: allow argument to set calling_path directly?
    if not self.calling_path.exists():
        self.calling_path.mkdir(parents=True)
    # Allow for the default plan.
    if self.plan_file is None:
        warnings.warn('No analysis plan was provided. DAP executions will adopt the default '
                      'plan.')
    else:
        if not self.plan_file.exists():
            raise FileNotFoundError(f'Plan file does not exist: {self.plan_file}')
        # Copy the plan file into the calling path so the run is
        # reproducible from the log directory alone.
        _plan_file = self.plan_file
        self.plan_file = self.calling_path / self.plan_file.name
        if not self.plan_file.exists():
            shutil.copy2(_plan_file, self.plan_file)
    # Construct the DAPTYPEs and check that the plans yield a fully
    # unique list
    self.plan = manga.MaNGAAnalysisPlan.default(analysis_path=self.analysis_path) \
                    if self.plan_file is None \
                    else manga.MaNGAAnalysisPlan.from_toml(self.plan_file,
                                                           analysis_path=self.analysis_path)
    self.daptypes = [self.plan[key]['key'] for key in self.plan.keys()]
    if len(numpy.unique(self.daptypes)) != self.plan.nplans:
        raise ValueError('{0} do not yield unique DAPTYPEs.'.format(
                            'Default plans' if self.plan_file is None
                            else f'Plans in {self.plan_file} '))
    # Alert the user of the versions to be used
    if self.q is not None:
        print(f'Attempting to submit to queue: {self.q}')
    print('Versions:')
    for key in python_versions.keys():
        print(f' {key:>12}: {python_versions[key]:<30}')
    print(f' {"DRP":>12}: {self.drpver:<30}')
    print(f' {"DAP":>12}: {self.dapver:<30}')
    print('Paths:')
    print(f' REDUX: {self.redux_path}')
    print(f' ANALYSIS: {self.analysis_path}')
    print(f' CALLING: {self.calling_path}')
    if self.plan_file is not None:
        print(f' PLAN FILE: {self.plan_file}')
    print('Processing steps added to scripts:')
    print(f' {processes}')
    # Create and update the DRPComplete file if necessary
    self.drpc = DRPComplete(drpver=self.drpver, redux_path=self.redux_path,
                            dapver=self.dapver, analysis_path=self.analysis_path)
    # Update the DRPComplete list; force an update if platetarget
    # files are provided
    self.drpc.update(platelist=self.platelist, ifudesignlist=self.ifudesignlist,
                     combinatorics=self.combinatorics, on_disk=self.on_disk)
    # Hereafter, self.platelist and self.ifuplatelist are the INPUT
    # values, whereas self.drpc.platelist and self.drpc.ifuplatelist
    # are the actual DRP reductions to analyze (including
    # combinatorics and file existence tests). See, e.g.,
    # selected_drpfile_list().
    # Find the rows in the DRPComplete database with the DRP file
    # meta data
    drpc_rows = self._selected_drpfile_rows(can_analyze=self.can_analyze)
    print(f'Number of DRP files to process: {len(drpc_rows)}')
    if len(drpc_rows) == 0:
        # If coded correctly, this function should never return here.
        return
    # Construct the post-processing scripts before
    # constructing/submitting datacube processing scripts
    if self.post_process or self.post_plots:
        post_queue = self._build_post_queue(drpc_rows)
    # Process the relevant datacubes
    if self.dapproc or self.pltifu_plots:
        # Create the script files for the set of drpfiles, and
        # submit the scripts to the queue if self.submit is true
        proc_queue = self._build_proc_queue(drpc_rows)
        # Submit the queue to the cluster
        if self.create:
            proc_queue.commit(hard=self.hard, submit=self.submit)
        # If not reporting progress, done
        if not self.report_progress:
            return
        # Wait until the queue is finished, polling once per minute
        running = True
        while running and self.submit:
            time.sleep(60)
            percent_complete = proc_queue.get_percent_complete()
            print(f'Percent complete ... {percent_complete:.1f}%', end='\r')
            if percent_complete == 100:
                running = False
        # NOTE(review): if report_progress is True but submit is False,
        # the loop above never runs and percent_complete is unassigned,
        # so this print raises NameError -- confirm intended usage.
        print(f'Percent complete ... {percent_complete:.1f}%')
    if not self.post_process and not self.post_plots:
        # Don't perform the post-processing
        return
    # Submit the post-processing queue to the cluster
    if self.create:
        post_queue.commit(hard=self.hard, submit=self.submit)
# ******************************************************************
# UTILITY FUNCTIONS
# ******************************************************************
[docs]
def _read_file_list(self, list_file):
db = numpy.genfromtxt(str(list_file), dtype=object)
if db.shape[1] != 2:
raise ValueError('Input file should contain 2 columns, plate and ifu.')
return db[:,0].astype(int).tolist(), db[:,1].astype(int).tolist()
[docs]
def _create_queue(self):
    """
    Create and configure a queue instance.

    Written to support submission both to the Utah cluster queues and
    to the sciama cluster at Portsmouth.  The available queues are:

    +-----------+--------+------+-----------+
    | Queue     | Nodes  | PPN  | GB/core   |
    +===========+========+======+===========+
    | ember     | ??     | 12   | ??        |
    +-----------+--------+------+-----------+
    | sciama1.q | 80     | 12   | 2         |
    +-----------+--------+------+-----------+
    | sciama2.q | 96     | 16   | 4         |
    +-----------+--------+------+-----------+
    | sciama3.q | 48     | 20   | 3.2       |
    +-----------+--------+------+-----------+

    One should **not** submit to sciama3.q without permission from
    Will Percival.  The default queue is cluster.q (sciama2.q).
    Submissions to Utah should have their queue destination set to
    None; there, the fast node can be selected using :attr:`qos`.
    """
    cluster_queue = pbs.queue(verbose=not self.quiet)

    # Processors per node depends on the destination queue.
    if self.q in ['ember', 'sciama1.q']:
        ppn = 12
    elif self.q == 'sciama3.q':
        ppn = 20
    else:
        ppn = 16
    # Never request more CPUs than a single node provides.
    cpus = ppn if self.cpus is None else min(self.cpus, ppn)

    if self.q == 'ember':
        # Utah ember cluster, which caps the wall time at 72 hours.
        walltime = '72:00:00' if int(self.walltime.split(':')[0]) >= 72 else self.walltime
        cluster_queue.create(label=self.label, nodes=self.nodes, qos=self.qos,
                             umask=self.umask, walltime=walltime, ppn=ppn, cpus=cpus,
                             partition='ember', alloc='sdss')
    elif self.q is not None:
        # All other named queues are on the Portsmouth cluster,
        # sciama; qos is not used there.
        cluster_queue.create(label=self.label, nodes=self.nodes, umask=self.umask,
                             walltime=self.walltime, queue=self.q, ppn=ppn, cpus=cpus)
    else:
        # Default destination, valid for both Utah and Portsmouth; qos
        # should only be set when submitting to Utah.
        cluster_queue.create(label=self.label, nodes=self.nodes, qos=self.qos,
                             umask=self.umask, walltime=self.walltime, ppn=ppn, cpus=cpus)
    return cluster_queue
# Files:
# - mangadap-{plate}-{ifudesign}-LOG{mode} = script file = *
# - *.ready = script/directory is ready for queue submission
# - *.queued = script submitted to queue
# - *.started = script assigned a node/processor and has started running
# - *.done = completed execution of the full script
# Other:
# - *.par = parameter file
# - *.out, *.err = stdout and stderr output from the script
[docs]
def _build_proc_queue(self, drpc_rows):
    """
    Build the queue with the main datacube-processing scripts.

    For every row in ``drpc_rows``, write the compute script using
    :func:`write_compute_script` and flag the plate-ifu as ready.
    When :attr:`create` is True, each script is also appended to a
    newly created queue object (see :func:`_create_queue`) and the
    plate-ifu is flagged as queued.

    .. todo::
        - This algorithm is slow because it has to search through
          the drpcomplete fits file each time to find the
          plate/ifudesign combo. Should make this more efficient.

    Args:
        drpc_rows (`numpy.ndarray`):
            A vector with the rows in the
            :class:`mangadap.survey.drpcomplete.DRPComplete`
            database with the DRP files to analyze.

    Returns:
        The queue object with the appended jobs, or None when no
        queue was created.
    """
    # The queue object is system dependent at creation, but is
    # treated identically thereafter for the Utah and Portsmouth
    # clusters.
    proc_queue = self._create_queue() if self.create else None
    nrows = len(drpc_rows)
    for i, row in enumerate(drpc_rows):
        # write_compute_script also checks that the path exists!
        script, stdout_file, stderr_file \
                = self.write_compute_script(row, dapproc=self.dapproc,
                                            plots=self.pltifu_plots,
                                            overwrite=self.overwrite)
        plt = self.drpc['PLATE'][row]
        ifu = self.drpc['IFUDESIGN'][row]
        # Flag the plate-ifu as ready for processing
        self.set_pltifu_status(plt, ifu, status='ready')
        if proc_queue is None:
            # Scripts only; report preparation progress
            print(f'Preparing for analysis...{(i+1)*100/nrows:.1f}%', end='\r')
            continue
        # Append the job to the queue and flag the plate-ifu as queued
        proc_queue.append(f'source {script}', outfile=stdout_file, errfile=stderr_file)
        self.set_pltifu_status(plt, ifu, status='queued')
    if proc_queue is None:
        print('Preparing for analysis...100%')
    return proc_queue
[docs]
def _build_post_queue(self, drpc_rows):
    """
    Build the queue that performs the post-processing steps: the
    DAPall construction script and, when requested, one QA plot
    script per unique plate.

    Args:
        drpc_rows (`numpy.ndarray`):
            Rows in the DRPComplete database included in this run;
            used to find the unique plates for the QA plots.

    Returns:
        The queue object with the appended jobs, or None when no
        queue was created.
    """
    post_queue = self._create_queue() if self.create else None
    if self.post_process:
        # Write the DAPall script and set its status
        scr, out, err = self.write_dapall_script(plots=self.post_plots,
                                                 overwrite=self.overwrite)
        self.set_status(scr, status='ready')
        if post_queue is not None:
            # Add it to the queue
            post_queue.append(f'source {scr}', outfile=out, errfile=err)
            self.set_status(scr, status='queued')
    if self.post_plots:
        # Write one plot script per unique plate
        for plt in numpy.unique(self.drpc['PLATE'][drpc_rows]):
            scr, out, err = self.write_plate_qa_script(plt, overwrite=self.overwrite)
            self.set_status(scr, status='ready')
            if post_queue is not None:
                post_queue.append(f'source {scr}', outfile=out, errfile=err)
                self.set_status(scr, status='queued')
    return post_queue
# ******************************************************************
# Management
# ******************************************************************
[docs]
def set_pltifu_status(self, plate, ifudesign, status='queued'):
    """
    Generate a touch file signifying the status of a given
    plate/ifudesign combination.

    Args:
        plate (:obj:`int`):
            Plate number
        ifudesign (:obj:`int`):
            IFU design number
        status (:obj:`str`, optional):
            The status signifier for the touch file.
    """
    # The touch-file root matches the DRP configuration root for this
    # plate-ifu combination, placed in the per-plate-ifu log directory.
    cfg = manga.MaNGAConfig(plate, ifudesign, drpver=self.drpver,
                            redux_path=self.redux_path)
    status_root = self.calling_path / str(plate) / str(ifudesign) / cfg.cfg_root
    self.set_status(str(status_root), status)
[docs]
def set_status(self, root, status='queued'):
"""
Generate a general touch file.
Args:
root (:obj:`str`): Full path and root name of the touch file
status (:obj:`str`, optional):
The status signifier for the touch file.
"""
# Touch the status file
Path(f'{root}.{status}').touch()
[docs]
def _selected_drpfile_rows(self, can_analyze=False):
"""
Find the rows in the
:class:`mangadap.survey.drpcomplete.DRPComplete` database with
the selected plates and IFUs to analyze.
Args:
can_analyze (:obj:`bool`, optional):
In addition to returning the selected rows, only include those
rows with DRP datacubes that can be analyzed. See
:func:`mangadap.survey.drpcomplete.DRPComplete.can_analyze`.
Returns:
`numpy.ndarray`_: A vector with the rows in the DRPComplete object
with the relevant DRP file data.
"""
# Find the rows in the DRPComplete database with the specified
# plate and IFUS
# All available PLATEIFUs
drpc_pltifu = numpy.array([f'{p}-{i}' for p,i in
zip(self.drpc['PLATE'],self.drpc['IFUDESIGN'])])
# The PLATEIFUs to analyze
this_pltifu = numpy.array([f'{p}-{i}' for p,i in
zip(self.drpc.platelist, self.drpc.ifudesignlist)])
# The rows with the PLATEIFUs to analyze
rows = numpy.array([numpy.where(drpc_pltifu == pi)[0][0] if pi in drpc_pltifu else -1 \
for pi in this_pltifu])
if numpy.any(rows < 0):
raise ValueError('Should be able to find all plates and ifus.')
if can_analyze:
keep = self.drpc.can_analyze()[rows]
rows = rows[keep]
return rows
[docs]
def write_compute_script(self, index, dapproc=True, plots=True, overwrite=False,
                         relative_symlink=True):
    """
    Write the MaNGA DAP script file that is sent to a single CPU to
    analyze a single DRP file with a given plate and ifudesign.

    Args:
        index (:obj:`int`):
            Row in the
            :class:`mangadap.survey.drpcomplete.DRPComplete`
            database with the DRP file to analyze.
        dapproc (:obj:`bool`, optional):
            Flag to execute the main DAP processing.
        plots (:obj:`bool`, optional):
            Create the QA plots.
        overwrite (:obj:`bool`, optional):
            Flag to overwrite any existing files.
        relative_symlink (:obj:`bool`, optional):
            Set the symlink to the par file to be a relative path
            instead of the absolute path.

    Returns:
        :obj:`tuple`: Three items: the name of the written script
        file, the file for the output sent to STDOUT, and the file
        for the output sent to STDERR.
    """
    # Fake out the plan cube to just require the configuration information
    self.plan.cube = manga.MaNGAConfig(self.drpc['PLATE'][index],
                                       self.drpc['IFUDESIGN'][index], drpver=self.drpver,
                                       redux_path=self.redux_path)
    cfgfile = self.plan.common_path() / f'{self.plan.cube.cfg_root}.ini'
    # Write the configuration file if it doesn't exist
    if not cfgfile.exists() or overwrite:
        # overwrite defaults to True
        self.drpc.write_config(cfgfile, index=index, sres_ext=self.sres_ext,
                               sres_fill=self.sres_fill, covar_ext=self.covar_ext)
        # and create symlinks to it in each plan's reference directory
        for i, key in enumerate(self.plan.keys()):
            method_ref_dir = self.plan.method_path(plan_index=i, ref=True)
            create_symlink(str(cfgfile), str(method_ref_dir),
                           relative_symlink=relative_symlink, overwrite=overwrite, quiet=True)
    # Set the root path for the scripts, inputs, outputs, and logs
    _calling_path = self.calling_path / str(self.plan.cube.plate) \
                        / str(self.plan.cube.ifudesign)
    # Set the names for the script, stdout, and stderr files
    scriptfile = _calling_path / f'{self.plan.cube.cfg_root}'
    stdoutfile = Path(f'{scriptfile}.out').resolve()
    stderrfile = Path(f'{scriptfile}.err').resolve()
    ################################################################
    # Script file already exists, so just return
    if scriptfile.exists() and not overwrite:
        return scriptfile, stdoutfile, stderrfile
    # Main script components are:
    #   - (dapproc) Run the DAP processing code
    #   - (plots) Run the plotting scripts to plot the output
    # ONE OF THESE 2 MUST BE TRUE
    ################################################################
    # Open the script file and write the date as a commented header
    # line
    if not scriptfile.parent.exists():
        scriptfile.parent.mkdir(parents=True)
    file = open(scriptfile, 'w')
    file.write('# Auto-generated batch file\n')
    file.write(f'# {time.strftime("%a %d %b %Y %H:%M:%S",time.localtime())}\n')
    file.write('\n')
    # Create the started touch file
    startfile = f'{scriptfile}.started'
    file.write(f'touch {startfile}\n')
    file.write('\n')
    # Command that runs the DAP
    if dapproc:
        command = f'OMP_NUM_THREADS=1 manga_dap -c {cfgfile} -o {self.analysis_path}'
        if self.plan_file is not None:
            command += f' -p {self.plan_file}'
        if self.log:
            command += f' --log {scriptfile}.log'
        if self.verbose > 0:
            command += (' -'+'v'*self.verbose)
        file.write(f'{command}\n')
        file.write('\n')
    # Plotting scripts; each takes the same config/plan arguments
    if plots:
        command = f'OMP_NUM_THREADS=1 dap_ppxffit_qa -c {cfgfile} -o {self.analysis_path}'
        if self.plan_file is not None:
            command += f' -p {self.plan_file}'
        command += f' -b 2.5'
        file.write(f'{command}\n')
        file.write('\n')
        command = f'OMP_NUM_THREADS=1 spotcheck_dap_maps -c {cfgfile} -o {self.analysis_path}'
        if self.plan_file is not None:
            command += f' -p {self.plan_file}'
        command += f' -b 2.5'
        file.write(f'{command}\n')
        file.write('\n')
        command = f'OMP_NUM_THREADS=1 dap_fit_residuals -c {cfgfile} -o {self.analysis_path}'
        if self.plan_file is not None:
            command += f' -p {self.plan_file}'
        command += f' -b 2.5'
        file.write(f'{command}\n')
        file.write('\n')
    # Touch the done file
    donefile = f'{scriptfile}.done'
    file.write(f'touch {donefile}\n')
    file.write('\n')
    # Done
    file.close()
    # Return the script file, file for stdout, and file for stderr
    return scriptfile, stdoutfile, stderrfile
[docs]
def write_dapall_script(self, plots=True, overwrite=False):
"""
Write the script used to construct the DAPall file and its
associated quality assessment plots.
Args:
plots (:obj:`bool`, optional):
Create the QA plots. Default is True.
overwrite (:obj:`bool`, optional):
Flag to overwrite any existing files.
Returns:
:obj:`str`: Three strings with the name of the written
script file, the file for the output sent to STDOUT, and the
file for the output sent to STDERR.
"""
# Check that the path exists, creating it if not
if not self.calling_path.exists():
self.calling_path.mkdir(parents=True)
# Set the names for the script, stdout, and stderr files
scriptfile = self.calling_path / 'build_dapall'
stdoutfile = f'{scriptfile}.out'
stderrfile = f'{scriptfile}.err'
# Script file already exists, so just return
if scriptfile.exists() and not overwrite:
return scriptfile, stdoutfile, stderrfile
# Open the script file and write the date as a commented header
# line
file = open(scriptfile, 'w')
file.write('# Auto-generated batch file\n')
file.write(f'# {time.strftime("%a %d %b %Y %H:%M:%S",time.localtime())}\n')
file.write('\n')
# Create the started touch file
startfile = f'{scriptfile}.started'
file.write(f'touch {startfile}\n')
file.write('\n')
# Command that constructs the DAPall file
command = 'OMP_NUM_THREADS=1 '
command += f'construct_dapall --drpver {self.drpver} -r {self.redux_path} ' \
f'--dapver {self.dapver} -a {self.analysis_path}'
if self.plan_file is not None:
command += f' --plan_file {self.plan_file}'
if self.verbose > 0:
command += (' -'+'v'*self.verbose )
file.write(f'{command}\n')
file.write('\n')
# Add the plotting commands
if plots:
command = f'OMP_NUM_THREADS=1 dapall_qa --drpver {self.drpver} ' \
f'--redux_path {self.redux_path} --dapver {self.dapver} ' \
f'--analysis_path {self.analysis_path}'
if self.plan_file is not None:
command += f' --plan_file {self.plan_file}'
file.write(f'{command}\n')
file.write('\n')
# Touch the done file
donefile = f'{scriptfile}.done'
file.write(f'touch {donefile}\n')
file.write('\n')
file.close()
################################################################
# Return the script file, file for stdout, and file for stderr
return scriptfile, stdoutfile, stderrfile
[docs]
def write_plate_qa_script(self, plate, overwrite=False):
"""
Write the script used to create the plate fit QA plot.
Args:
plate (:obj:`int`):
The plate number for the plot.
overwrite (:obj:`bool`, optional):
Flag to overwrite any existing files.
Returns:
:obj:`str`: Three strings with the name of the written
script file, the file for the output sent to STDOUT, and the
file for the output sent to STDERR.
"""
# Check that the path exists, creating it if not
path = self.calling_path / str(plate)
if not path.exists():
path.mkdir(parents=True)
# Set the names for the script, stdout, and stderr files
scriptfile = self.calling_path / str(plate) / f'{plate}_fitqa'
stdoutfile = f'{scriptfile}.out'
stderrfile = f'{scriptfile}.err'
# Script file already exists, so just return
if scriptfile.exists() and not overwrite:
return scriptfile, stdoutfile, stderrfile
# Open the script file and write the date as a commented header
# line
file = open(scriptfile, 'w')
file.write('# Auto-generated batch file\n')
file.write(f'# {time.strftime("%a %d %b %Y %H:%M:%S",time.localtime())}\n')
file.write('\n')
# Create the started touch file
startfile = f'{scriptfile}.started'
file.write(f'touch {startfile}\n')
file.write('\n')
# Command that creates the plots
command = f'OMP_NUM_THREADS=1 dap_plate_fit_qa {plate} --analysis_path {self.analysis_path}'
if self.plan_file is not None:
command += f' --plan_file {self.plan_file}'
file.write(f'{command}\n')
file.write('\n')
# Touch the done file
donefile = f'{scriptfile}.done'
file.write(f'touch {donefile}\n')
file.write('\n')
file.close()
################################################################
# Return the script file, file for stdout, and file for stderr
return scriptfile, stdoutfile, stderrfile