4 Defines a set of classes and a command-line interface for submitting batch jobs.
6 Supported systems include serial execution locally, a multiprocessing pool,
16 import multiprocessing
18 from pathlib
import Path
21 import xml.etree.ElementTree
as ET
22 from xml.dom
import minidom
23 from xml.sax.saxutils
import unescape
24 from distutils.spawn
import find_executable
26 from abc
import ABC, abstractmethod
28 from hpsmc.job import Job, JobStore, JobScriptDatabase
30 logger = logging.getLogger(
"hpsmc.batch")
32 RUN_SCRIPT = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'job.py')
37 Generic batch processing interface.
42 parser = argparse.ArgumentParser(self.__class__.__name__,
43 epilog=
'Available scripts: %s' %
', '.join(
JobScriptDatabase().get_script_names()))
45 parser.add_argument(
"-c",
"--config-file", nargs=
'?', help=
"Config file", action=
'append')
46 parser.add_argument(
"-l",
"--log-dir", nargs=
'?', help=
"Log file output dir", required=
False, default=str(Path(os.getcwd(),
'logs')))
47 parser.add_argument(
"-d",
"--run-dir", nargs=
'?', help=
"Base run dir for the jobs (must be an absolute path)", default=
None)
48 parser.add_argument(
"-D",
"--debug", action=
'store_true', help=
"Enable debug settings", required=
False)
49 parser.add_argument(
"-o",
"--check-output", action=
'store_true', required=
False, help=
"Do not submit jobs where output files already exist")
50 parser.add_argument(
"-s",
"--job-steps", type=int, default=
None, required=
False)
51 parser.add_argument(
"-r",
"--job-range", nargs=
'?', help=
"Submit jobs numbers within range (e.g. '1:100')", required=
False)
52 parser.add_argument(
"script", nargs=
'?', help=
"Name of job script")
53 parser.add_argument(
"jobstore", nargs=
'?', help=
"Job store in JSON format")
54 parser.add_argument(
"jobids", nargs=
"*", type=int, help=
"List of individual job IDs to submit (optional)")
59 """! Parse command line arguments and perform setup."""
66 raise Exception(
'The script is a required argument.')
69 if not script_db.exists(self.
script_namescript_name):
70 raise Exception(
'The script name is not valid: %s' % self.
script_namescript_name)
72 if not os.path.isfile(self.
scriptscript):
73 raise Exception(
'The job script does not exist: %s' % self.
scriptscript)
75 if cl.jobstore
is None:
76 raise Exception(
'The job store file is a required argument.')
77 if not os.path.isfile(cl.jobstore):
78 raise Exception(
'The job store does not exist: %s' % cl.jobstore)
84 self.
log_dirlog_dir = os.path.abspath(cl.log_dir)
85 logger.info(
'log dir: {}'.format(self.
log_dirlog_dir))
86 if not os.path.exists(self.
log_dirlog_dir):
87 os.makedirs(self.
log_dirlog_dir)
88 logger.info(
'Created log dir: {}'.format(self.
log_dirlog_dir))
92 if self.
run_dirrun_dir
is not None:
93 logger.info(
'run dir: {}'.format(self.
run_dirrun_dir))
94 if not os.path.isabs(self.
run_dirrun_dir):
96 raise Exception(
"The run dir for batch processing must be an abs path.")
101 self.
job_idsjob_ids = list(map(int, cl.jobids))
108 toks = cl.job_range.split(
':')
110 raise ValueError(
'Bad format for job range: ' + cl.job_range)
114 raise ValueError(
"The start job number must be >= the end job num when using a range.")
116 raise ValueError(
"The job range numbers must be > 0.")
122 self.
config_filesconfig_files = list(map(os.path.abspath, cl.config_file))
131 Submit a single batch job and return the batch ID.
133 This is abstract as each batch system will do this differently.
135 Some batch systems don't implement this but sub-classes should override this and make it a
136 no-op so that they can be instantiated.
142 This is the generic batch submission function which gets a list of jobs to run based on command line
143 arguments and submits them individually. It calls the abstract submit_job() method and prints the batch
144 system ID that was returned, if any.
146 job_ids = self._get_filtered_job_ids()
147 logger.info(
'Submitting jobs: %s' % str(job_ids))
148 for job_id
in job_ids:
149 if not self.jobstore.has_job_id(job_id):
150 raise Exception(
'Job ID was not found in job store: %s' % job_id)
151 job_data = self.jobstore.get_job(job_id)
152 batch_id = self.submit_job(job_id)
153 logger.info(f
"Submitted job {job_id} with batch ID {str(batch_id)}")
157 raise Exception(
'Missing valid job ID')
158 return str(Path(os.getcwd(),
'scratch', str(job_id)))
162 This is the basic implementation of building a command to run the job from a batch system.
164 cmd = [sys.executable, RUN_SCRIPT,
'run']
165 logfile = self.
_logfile_logfile(job_id)
166 cmd.extend([
'-o', f
"{logfile}.out",
167 '-e', f
"{logfile}.err"])
171 job_dir = str(Path(self.
run_dirrun_dir, str(job_id)))
176 logger.debug(f
'job dir: {job_dir}')
177 cmd.extend([
'-d', job_dir])
181 cmd.extend([
'-c', cfg])
183 cmd.extend([
'--job-steps', str(self.
job_stepsjob_steps)])
184 cmd.extend([
'-i', str(job_id)])
185 cmd.append(self.
scriptscript)
186 cmd.append(os.path.abspath(self.
jobstorejobstore.path))
187 logger.debug(
"Job command: %s" %
" ".join(cmd))
192 Get the base name of a log file for the job.
194 return os.path.abspath(os.path.join(self.
log_dirlog_dir,
'job.%s' % str(job_id)))
199 Check if all output files exist for the given job. This is not the job ID but the full JSON job data.
201 Return False when first missing output is found.
203 for src, dest
in job[
"output_files"].items():
204 if not os.path.isfile(os.path.join(job[
"output_dir"], dest)):
205 logger.debug(
'Job output does not exist: %s -> %s' % (src, dest))
211 Get a list of job IDs to submit based on parsed command line options and whether output files are being checked.
213 submit_ids = self.
jobstorejobstore.get_job_ids()
214 logger.debug(
'Initial pre-filtered job IDs: {}'.format(str(submit_ids)))
216 submit_ids = [job_id
for job_id
in submit_ids
219 submit_ids = self.
job_idsjob_ids
220 logger.debug(
'job IDs after range check: {}'.format(str(submit_ids)))
223 logger.info(
'job IDs after output file check: {}'.format(str(submit_ids)))
227 """! Get a list of IDs for jobs that are missing output files."""
228 return [job_id
for job_id
in job_ids
if not self.
_outputs_exist_outputs_exist(self.
jobstorejobstore.get_job(job_id))]
233 Represents a batch processing system that requires submission like Slurm or Auger.
235 This subclasses Batch because it adds a number of different parameters which do not apply to all the
236 batch system types (namely Pool and Local).
243 self.
parserparser.add_argument(
"-q",
"--queue", nargs=
'?',
244 help=
"Job queue or partition",
246 self.
parserparser.add_argument(
"-W",
"--job-length", type=int, help=
"Max job length in hours", required=
False, default=4)
247 self.
parserparser.add_argument(
"-m",
"--memory", type=int, help=
"Max job memory allocation in MB", default=2000)
248 self.
parserparser.add_argument(
"-f",
"--diskspace", type=int, help=
"Disk space needed for job in GB", default=20)
249 self.
parserparser.add_argument(
"-e",
"--email", nargs=
'?', help=
"Email address for job notifications", required=
False)
251 self.
parserparser.add_argument(
"-O",
"--os", nargs=
'?', help=
"Operating system of batch nodes (Auger and LSF)")
254 self.
sitesite = BatchSystem._site()
257 """! Parse command line arguments and perform setup."""
272 fqdn = socket.getfqdn()
274 if 'slac.stanford.edu' in fqdn:
276 elif 'jlab.org' in fqdn:
282 """! Submit LSF batch jobs."""
289 os.environ[
'LSB_JOB_REPORT_MAIL'] =
'Y' if self.
emailemail
else 'N'
293 log_file = os.path.abspath(os.path.join(self.
log_dirlog_dir,
'job.%s.log' % str(job_id)))
295 queue = self.
queuequeue
299 if self.
osos
is not None:
312 cmd.extend([
'-u', self.
emailemail])
314 cmd.extend(super().
build_cmd(self, job_id))
320 logger.info(
'Submitting job %s to LSF with command: %s' % (job_id,
' '.join(cmd)))
321 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
322 out, err = proc.communicate()
323 if err
is not None and len(err):
325 tokens = out.decode().split(
" ")
326 if tokens[0] !=
'Job':
327 raise Exception(
'Unexpected output from bsub command: %s' % out)
328 batch_id = int(tokens[1].replace(
'<',
'').replace(
'>',
''))
333 """! Submit Slurm batch jobs."""
339 self.
parserparser.add_argument(
"-S",
"--sh-dir", nargs=
'?', help=
"Directory to hold generated shell scripts for Slurm", default=str(Path(os.getcwd(),
'sh')))
340 self.
parserparser.add_argument(
"-E",
"--env", nargs=
'?', help=
"Full path to env setup script", required=
False, default=
None)
341 self.
parserparser.add_argument(
"-A",
"--account", nargs=
'?', help=
"Account name for s3df slurm jobs.", required=
False, default=
None)
352 self.
sh_dirsh_dir = os.path.abspath(cl.sh_dir)
353 logger.info(
'Slurm sh dir: {}'.format(self.
sh_dirsh_dir))
354 if not os.path.exists(self.
sh_dirsh_dir):
355 os.makedirs(self.
sh_dirsh_dir)
356 logger.info(
'Created Slurm sh dir: {}'.format(self.
sh_dirsh_dir))
360 Override the basic implementation for getting the default run directory.
363 run_dir =
'$LSCRATCH'
365 run_dir =
'/scratch/slurm/$SLURM_JOBID'
367 run_dir = os.getcwd() +
"/scratch/$SLURM_JOBID"
371 queue = self.
queuequeue
378 raise Exception(
'No queue name was provided.')
382 log_file = self.
_logfile_logfile(job_id)
383 sbatch_cmd = [
'sbatch',
384 '--time=%s' % (str(self.
job_lengthjob_length) +
':00:00'),
385 '--mem=%sM' % self.
memorymemory,
386 '--job-name=%i_%s' % (job_id, self.
script_namescript_name),
387 '--output=%s.out' % log_file,
388 '--error=%s.err' % log_file]
390 sbatch_cmd.extend([f
'--partition={self.queue}'])
392 sbatch_cmd.extend([f
'--account={self.account}'])
394 sbatch_cmd.extend([f
'--mail-user={self.email}',
399 return self.
sh_dirsh_dir +
'/job.%i.sh' % job_id
403 Wrap submission of Slurm jobs using a generated script.
407 cmd = self.
_sbatch_sbatch(job_id)
414 if self.
run_dirrun_dir
is None:
424 cmd.append(sh_filename)
430 Write the shell script for Slurm job submission using the 'sbatch' command.
433 script_lines = [
'#!/bin/bash',
436 script_lines.append(f
'source {self.env}')
437 script_lines.extend([
'echo Start time: `date`',
439 'echo ---- Start Environment ----',
441 'echo ---- End Environment ----',
442 'time ' +
' '.join(job_cmd),
443 'echo End time: `date`'])
445 logger.debug(
"Slurm submission script:\n" + str(script_lines))
447 with open(sh_filename,
'w')
as sh_file:
448 for script_line
in script_lines:
449 sh_file.write(script_line +
'\n')
451 logger.debug(
'Wrote Slurm submission script to: '.format(str(Path(self.
sh_dirsh_dir, sh_filename))))
455 logger.info(
'Submitting job %s to Slurm with command: %s' % (job_id,
' '.join(cmd)))
456 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
457 out, err = proc.communicate()
458 if err
is not None and len(err):
460 tokens = out.decode().split(
" ")
461 if tokens[0] !=
'Submitted':
462 raise Exception(
'Unexpected output from sbatch command: %s' % out)
463 batch_id = int(tokens[3].replace(
'<',
'').replace(
'>',
''))
469 Submit Auger batch jobs.
471 Auger itself is actually deprecated and unavailable but its submission XML format is supported by
472 the Swif class (see below).
482 raise Exception(
"Failed to find 'hps-mc-env.csh' in environment.")
486 Make this a no-op. Auger is a bit of a special case in terms of how batch submission works with a
487 generated XML file including all job IDs, so we do not implement single job submission.
493 Batch submission method for Auger.
495 This differs from some of the other systems in that it doesn't loop over individual
496 job IDs. Instead a single XML file is submitted for all the jobs at once.
498 xml_filename = self._create_job_xml()
499 auger_ids = self._jsub(xml_filename)
500 logger.info(
"Submitted Auger jobs: %s" % str(auger_ids))
504 logger.info(
'Submitting jobs: %s' % str(job_ids))
506 for job_id
in job_ids:
507 if not self.
jobstorejobstore.has_job_id(job_id):
508 raise Exception(
'Job ID was not found in job store: %s' % job_id)
509 job_params = self.
jobstorejobstore.get_job(job_id)
510 if self.
check_outputcheck_output
and Batch._outputs_exist(job_params):
511 logger.warning(
"Skipping Auger submission for job "
512 "because outputs already exist: %d" % job_id)
514 self.
_add_job_add_job(req, job_params)
518 cmd = [
'jsub',
'-xml', xml_filename]
519 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
520 out, err = proc.communicate()
526 for line
in out.splitlines():
527 if line.strip().startswith(b
'<jsub>'):
528 j = ET.fromstring(line)
529 for req
in j.getchildren():
530 for child
in req.getchildren():
531 if child.tag ==
'jobIndex':
532 auger_id = int(child.text)
533 auger_ids.append(auger_id)
534 elif child.tag ==
'error':
536 raise Exception(child.text)
541 pretty = unescape(minidom.parseString(ET.tostring(req)).toprettyxml(indent=
" "))
542 with open(filename,
'w')
as f:
547 req = ET.Element(
"Request")
548 name_elem = ET.SubElement(req,
"Name")
549 name_elem.set(
"name", req_name)
550 prj = ET.SubElement(req,
"Project")
551 prj.set(
"name",
"hps")
552 trk = ET.SubElement(req,
"Track")
555 trk.set(
"name",
"debug")
559 if self.
queuequeue
is not None:
560 queue = self.
queuequeue
561 trk.set(
"name", queue)
563 email = ET.SubElement(req,
"Email")
564 email.set(
"email", self.
emailemail)
565 email.set(
"request",
"true")
566 email.set(
"job",
"true")
567 mem = ET.SubElement(req,
"Memory")
568 mem.set(
"space", str(self.
memorymemory))
569 mem.set(
"unit",
"MB")
570 disk = ET.SubElement(req,
"DiskSpace")
571 disk.set(
"space", str(self.
diskspacediskspace))
572 disk.set(
"unit",
"GB")
573 limit = ET.SubElement(req,
"TimeLimit")
574 limit.set(
"time", str(self.
job_lengthjob_length))
575 limit.set(
"unit",
"hours")
576 os_elem = ET.SubElement(req,
"OS")
577 if self.
osos
is not None:
581 os_elem.set(
"name", auger_os)
585 cmd = [sys.executable, RUN_SCRIPT,
'run']
588 cmd.extend([
'-c', cfg])
590 cmd.extend([
'--job-steps', str(self.
job_stepsjob_steps)])
591 cmd.extend([
'-i', str(job_id)])
592 cmd.append(self.
scriptscript)
593 cmd.append(os.path.abspath(self.
jobstorejobstore.path))
594 logger.debug(
"Job command: %s" %
" ".join(cmd))
598 """! Needed for resolving ptag output sources."""
600 j.script = self.
scriptscript
601 j._load_params(params)
606 job = ET.SubElement(req,
"Job")
607 job_id = job_params[
'job_id']
609 if 'year' in job_params.keys():
610 year = job_params[
'year']
612 if 'input_files' in list(job_params.keys()):
613 inputfiles = job_params[
'input_files']
614 for src, dest
in inputfiles.items():
615 if not src.startswith(
'http'):
616 input_elem = ET.SubElement(job,
"Input")
617 input_elem.set(
"dest", dest)
618 if src.startswith(
"/mss"):
619 src_file =
"mss:%s" % src
622 input_elem.set(
"src", src_file)
624 logger.warning(
"http input file will not be included in XML job descr: {}".format(src))
625 outputfiles = job_params[
"output_files"]
626 outputdir = job_params[
"output_dir"]
629 for src, dest
in outputfiles.items():
630 output_elem = ET.SubElement(job,
"Output")
631 res_src = j.resolve_output_src(src)
632 output_elem.set(
"src", res_src)
633 dest_file = os.path.abspath(os.path.join(outputdir, dest))
634 if dest_file.startswith(
"/mss"):
635 dest_file =
"mss:%s" % dest_file
636 logger.debug(
'Auger dest file: {} -> {}'.format(src, dest))
637 output_elem.set(
"dest", dest_file)
639 job_name = ET.SubElement(job,
"Name")
640 job_name.set(
"name",
'%ihps%i' % (year, job_id))
642 job_err = ET.SubElement(job,
"Stderr")
643 stdout_file = os.path.abspath(os.path.join(self.
log_dirlog_dir,
"job.%d.out" % job_id))
644 stderr_file = os.path.abspath(os.path.join(self.
log_dirlog_dir,
"job.%d.err" % job_id))
645 job_err.set(
"dest", stderr_file)
646 job_out = ET.SubElement(job,
"Stdout")
647 job_out.set(
"dest", stdout_file)
649 cmd = ET.SubElement(job,
"Command")
651 cmd_lines.append(
"<![CDATA[")
653 cmd_lines.append(
'pwd;\n')
654 cmd_lines.append(
'env | sort;\n')
655 cmd_lines.append(
'ls -lart;\n')
656 cmd_lines.append(
"source %s;\n" % os.path.realpath(self.
setup_scriptsetup_script))
657 cmd_lines.append(
"source %s/bin/jlab-env.csh;\n" % os.getenv(
'HPSMC_DIR'))
668 cmd_lines.extend(job_cmd)
669 cmd_lines.append(
';\n')
671 cmd_lines.append(
'ls -lart; \n')
673 cmd_lines.append(
"]]>")
677 cmd.text =
' '.join(cmd_lines)
682 Submit using the 'swif2' command at JLAB using an Auger file.
684 This is just a thin wrapper of the parent class to call the swif2 commands with the generated Auger XML file.
686 Existing workflows generated by this class should be fully canceled and removed before resubmitting using this
694 self.
parserparser.add_argument(
"-w",
"--workflow", nargs=
'?', help=
"Name of swif2 workflow", required=
False)
702 logger.debug(f
'swif workflow name set to: {self.workflow}')
707 logger.info(
"Submitting swif workflow: {}".format(self.
workflowworkflow))
713 cmd = [
'swif2',
'add-jsub', self.
workflowworkflow,
'-script', xml_filename]
714 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
715 out = proc.communicate()[0]
716 print(
"".join([s
for s
in out.decode().strip().splitlines(
True)
if s.strip()]))
720 run_cmd = [
'swif2',
'run', self.
workflowworkflow]
721 proc = subprocess.Popen(run_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
722 out = proc.communicate()[0]
723 print(
"".join([s
for s
in out.decode().strip().splitlines(
True)
if s.strip()]))
729 Run local batch jobs sequentially.
736 """! Run a single job locally."""
739 logger.info(f
"Executing local job: {job_id}")
740 proc = subprocess.Popen(cmd, shell=
False)
743 logger.error(f
"Local execution of {job_id} returned error code: {proc.returncode}")
747 mp_queue = multiprocessing.Queue()
751 """! Run the command in a new process whose PID is added to a global MP queue."""
754 proc = subprocess.Popen(cmd, preexec_fn=os.setsid)
755 mp_queue.put(proc.pid)
757 returncode = proc.returncode
758 except subprocess.CalledProcessError
as e:
767 Check if a system process looks like it is still running.
769 return proc.status()
in [psutil.STATUS_RUNNING,
770 psutil.STATUS_SLEEPING,
771 psutil.STATUS_DISK_SLEEP,
777 Kill processes in the multiprocessing queue if the jobs are canceled.
787 """! Kill processes on exit."""
791 parent = psutil.Process(pid)
792 for child
in parent.children(recursive=
True):
794 print(
'Killing running process: %d' % child.pid)
798 except Exception
as e:
808 Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.
810 The number of processes to spawn can be provided using the '-p' argument.
818 self.
parserparser.add_argument(
"-p",
"--pool-size", type=int,
819 help=
"Job pool size (only applicable when running pool)", required=
False,
820 default=multiprocessing.cpu_count())
824 Make this a no-op as we do not implement single job submission for the processing pool.
834 """! Submit jobs to a local processing pool.
836 This method will not return until all jobs are finished or execution
849 raise Exception(
'No job IDs found to submit')
853 original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
854 pool = multiprocessing.Pool(self.
pool_sizepool_size)
855 signal.signal(signal.SIGINT, original_sigint_handler)
857 logger.info(
"Running %d jobs in pool ..." % len(cmds))
858 res = pool.map_async(run_job_pool, cmds)
860 logger.info(
"Pool results: " + str(res.get(Pool.max_wait)))
861 logger.info(
"Normal termination")
864 except KeyboardInterrupt:
865 logger.fatal(
"Caught KeyboardInterrupt, terminating workers")
867 except Exception
as e:
868 logger.fatal(
"Caught Exception '%s', terminating workers" % (str(e)))
870 except BaseException:
871 e = sys.exc_info()[0]
872 logger.fatal(
"Caught non-Python Exception '%s'" % (e))
876 if __name__ ==
'__main__':
885 if len(sys.argv) > 1:
886 system = sys.argv[1].lower()
887 if system
not in list(system_dict.keys()):
888 raise Exception(f
"The batch system {system} is not valid.")
889 batch = system_dict[system]()
891 batch.parse_args(args)
894 print(
"Usage: batch.py [system] [args]")
895 print(
" Available systems: {}".format(
', '.join(list(system_dict.keys()))))
def submit_job(self, job_id)
Make this a no-op.
def _write_req(self, req, filename='temp.xml')
def submit(self)
Batch submission method for Auger.
def _jsub(self, xml_filename)
def build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
def _create_job_xml(self)
def _create_job(self, params)
Needed for resolving ptag output sources.
def _get_auger_ids(self, out)
def _create_req(self, req_name)
def _add_job(self, req, job_params)
Represents a batch processing system that requires submission like Slurm or Auger.
def parse_args(self, args)
Parse command line arguments and perform setup.
Generic batch processing interface.
def submit_job(self, job_id)
Submit a single batch job and return the batch ID.
def _job_ids_missing_output(self, job_ids)
Get a list of IDs for jobs that are missing output files.
def _outputs_exist(job)
Check if all output files exist for the given job.
def default_rundir(self, job_id=None)
def submit(self)
This is the generic batch submission function which gets a list of jobs to run based on command line ...
def build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
def _get_filtered_job_ids(self)
Get a list of job IDs to submit based on parsed command line options and whether output files are bei...
def parse_args(self, args)
Parse command line arguments and perform setup.
def _logfile(self, job_id)
Get the base name of a log file for the job.
Kill processes in the multiprocessing queue if the jobs are canceled.
def __init__(self, mp_queue)
def __exit__(self, type, val, tb)
Kill processes on exit.
def submit_job(self, job_id)
Submit a single batch job and return the batch ID.
def build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
def parse_args(self, args)
Parse command line arguments and perform setup.
Run local batch jobs sequentially.
def submit_job(self, job_id)
Run a single job locally.
Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.
def submit_job(self, job_id)
Make this a no-op as we do not implement single job submission for the processing pool.
def submit(self)
Submit jobs to a local processing pool.
def parse_args(self, args)
Parse command line arguments and perform setup.
def submit_job(self, job_id)
Submit a single batch job and return the batch ID.
def default_rundir(self, job_id=None)
Override the basic implementation for getting the default run directory.
def _sbatch(self, job_id)
def _sh_filename(self, job_id)
def build_cmd(self, job_id)
Wrap submission of Slurm jobs using a generated script.
def parse_args(self, args)
Parse command line arguments and perform setup.
def _write_job_script(self, sh_filename, job_cmd)
Write the shell script for Slurm job submission using the 'sbatch' command.
Submit using the 'swif2' command at JLAB using an Auger file.
def submit(self)
Batch submission method for Auger.
def parse_args(self, args)
Parse command line arguments and perform setup.
Simple JSON based store of job data.
Primary class to run HPS jobs from a Python script.
def run_job_pool(cmd)
Run the command in a new process whose PID is added to a global MP queue.
def is_running(proc)
Check if a system process looks like it is still running.