"""
Defines a set of classes and a command-line interface for submitting batch jobs.

Supported systems include serial execution locally, a multiprocessing pool, and
batch schedulers such as LSF, Slurm, and Auger/Swif.
"""
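# Usage sketch (illustrative only; the valid system names come from the
# 'system_dict' defined in the __main__ block below and are assumed here):
#
#   python batch.py slurm -l logs -r 1:100 <script name> <jobstore.json>
#
# The first positional argument selects the batch system; the remaining options
# are defined in Batch.__init__() and its subclasses.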
import argparse
import logging
import multiprocessing
import os
import signal
import socket
import subprocess
import sys

from pathlib import Path
import xml.etree.ElementTree as ET
from xml.dom import minidom
from xml.sax.saxutils import unescape
from distutils.spawn import find_executable
from abc import ABC, abstractmethod

import psutil

from hpsmc.job import Job, JobStore, JobScriptDatabase
logger = logging.getLogger("hpsmc.batch")

RUN_SCRIPT = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'job.py')
class Batch(ABC):
    """!
    Generic batch processing interface.
    """

    def __init__(self):
        parser = argparse.ArgumentParser(
            self.__class__.__name__,
            epilog='Available scripts: %s' % ', '.join(JobScriptDatabase().get_script_names()))
        parser.add_argument("-c", "--config-file", nargs='?', help="Config file", action='append')
        parser.add_argument("-l", "--log-dir", nargs='?', help="Log file output dir", required=False,
                            default=str(Path(os.getcwd(), 'logs')))
        parser.add_argument("-d", "--run-dir", nargs='?',
                            help="Base run dir for the jobs (must be an absolute path)", default=None)
        parser.add_argument("-D", "--debug", action='store_true', help="Enable debug settings", required=False)
        parser.add_argument("-o", "--check-output", action='store_true', required=False,
                            help="Do not submit jobs where output files already exist")
        parser.add_argument("-s", "--job-steps", type=int, default=None, required=False)
        parser.add_argument("-r", "--job-range", nargs='?',
                            help="Submit job numbers within a range (e.g. '1:100')", required=False)
        parser.add_argument("script", nargs='?', help="Name of job script")
        parser.add_argument("jobstore", nargs='?', help="Job store in JSON format")
        parser.add_argument("jobids", nargs="*", type=int, help="List of individual job IDs to submit (optional)")
        self.parser = parser
    def parse_args(self, args):
        """! Parse command line arguments and perform setup."""
        cl = self.parser.parse_args(args)
        if cl.script is None:
            raise Exception('The script is a required argument.')
        # (script-name lookup against the script database elided)
        raise Exception('The script name is not valid: %s' % self.script_name)
        if not os.path.isfile(self.script):
            raise Exception('The job script does not exist: %s' % self.script)
        if cl.jobstore is None:
            raise Exception('The job store file is a required argument.')
        if not os.path.isfile(cl.jobstore):
            raise Exception('The job store does not exist: %s' % cl.jobstore)
        self.log_dir = os.path.abspath(cl.log_dir)
        logger.info('log dir: {}'.format(self.log_dir))
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir)
            logger.info('Created log dir: {}'.format(self.log_dir))
        logger.info('run dir: {}'.format(self.run_dir))
        if not os.path.isabs(self.run_dir):
            raise Exception("The run dir for batch processing must be an abs path.")
        # e.g. '-r 1:100' selects job IDs 1 through 100
        toks = cl.job_range.split(':')
        if len(toks) != 2:
            raise ValueError('Bad format for job range: ' + cl.job_range)
        # (the conditions guarding the following range checks are elided)
        raise ValueError("The end job number must be >= the start job number when using a range.")
        raise ValueError("The job range numbers must be > 0.")
    @abstractmethod
    def submit_job(self, job_id):
        """!
        Submit a single batch job and return the batch ID.

        This is abstract as each batch system will do this differently.

        Some batch systems don't implement this, but sub-classes should override it and make it a
        no-op so that they can be instantiated.
        """

    def submit(self):
        """!
        This is the generic batch submission function which gets a list of jobs to run based on command line
        arguments and submits them individually. It calls the abstract submit_job() method and prints the batch
        system ID that was returned, if any.
        """
        job_ids = self._get_filtered_job_ids()
        logger.info('Submitting jobs: %s' % str(job_ids))
        for job_id in job_ids:
            if not self.jobstore.has_job_id(job_id):
                raise Exception('Job ID was not found in job store: %s' % job_id)
            job_data = self.jobstore.get_job(job_id)
            batch_id = self.submit_job(job_id)
            logger.info(f"Submitted job {job_id} with batch ID {str(batch_id)}")
    def default_rundir(self, job_id=None):
        if job_id is None:
            raise Exception('Missing valid job ID')
        return str(Path(os.getcwd(), 'scratch', str(job_id)))
    def build_cmd(self, job_id):
        """!
        This is the basic implementation of building a command to run the job from a batch system.
        """
        cmd = [sys.executable, RUN_SCRIPT, 'run']
        logfile = self._logfile(job_id)
        cmd.extend(['-o', f"{logfile}.out",
                    '-e', f"{logfile}.err"])
        job_dir = str(Path(self.run_dir, str(job_id)))
        logger.debug(f'job dir: {job_dir}')
        cmd.extend(['-d', job_dir])
        cmd.extend(['-c', cfg])  # 'cfg' iterates over any config files passed with -c (loop elided)
        cmd.extend(['--job-steps', str(self.job_steps)])
        cmd.extend(['-i', str(job_id)])
        cmd.append(os.path.abspath(self.jobstore.path))
        logger.debug("Job command: %s" % " ".join(cmd))
        return cmd
    def _logfile(self, job_id):
        """!
        Get the base name of a log file for the job.
        """
        return os.path.abspath(os.path.join(self.log_dir, 'job.%s' % str(job_id)))
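    # Illustration only (paths and values assumed, not taken from a real run): for job
    # ID 42, build_cmd() above assembles something like
    #   <python> .../job.py run -o <log_dir>/job.42.out -e <log_dir>/job.42.err \
    #       -d <run_dir>/42 -c <config> --job-steps <N> -i 42 <abs path to job store JSON>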
    @staticmethod
    def _outputs_exist(job):
        """!
        Check if all output files exist for the given job. This is not the job ID but the full JSON job data.

        Return False when the first missing output is found.
        """
        for src, dest in job["output_files"].items():
            if not os.path.isfile(os.path.join(job["output_dir"], dest)):
                logger.debug('Job output does not exist: %s -> %s' % (src, dest))
                return False
        return True
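    # Hypothetical job-store entry illustrating the fields used above (the key names
    # appear in this file; the values and full schema are assumptions):
    #   {"job_id": 1,
    #    "output_dir": "/path/to/output",
    #    "output_files": {"events.slcio": "job_0001_events.slcio"},
    #    "input_files": {"/mss/hallb/hps/.../input.stdhep": "input.stdhep"}}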
    def _get_filtered_job_ids(self):
        """!
        Get a list of job IDs to submit based on parsed command line options and whether output files are being checked.
        """
        submit_ids = self.jobstore.get_job_ids()
        logger.debug('Initial pre-filtered job IDs: {}'.format(str(submit_ids)))
        submit_ids = [job_id for job_id in submit_ids]  # (job-range filter condition elided)
        logger.debug('job IDs after range check: {}'.format(str(submit_ids)))
        logger.info('job IDs after output file check: {}'.format(str(submit_ids)))
        return submit_ids

    def _job_ids_missing_output(self, job_ids):
        """! Get a list of IDs for jobs that are missing output files."""
class BatchSystem(Batch):
    """!
    Represents a batch processing system that requires submission like Slurm or Auger.

    This subclasses Batch because it adds a number of different parameters which do not apply to all the
    batch system types (namely Pool and Local).
    """

    def __init__(self):
        super().__init__()
        self.parser.add_argument("-q", "--queue", nargs='?',
                                 help="Job queue or partition")
        self.parser.add_argument("-W", "--job-length", type=int, help="Max job length in hours",
                                 required=False, default=4)
        self.parser.add_argument("-m", "--memory", type=int, help="Max job memory allocation in MB", default=2000)
        self.parser.add_argument("-f", "--diskspace", type=int, help="Disk space needed for job in GB", default=20)
        self.parser.add_argument("-e", "--email", nargs='?', help="Email address for job notifications",
                                 required=False)
        self.parser.add_argument("-O", "--os", nargs='?', help="Operating system of batch nodes (Auger and LSF)")
        self.site = BatchSystem._site()

    def parse_args(self, args):
        """! Parse command line arguments and perform setup."""

    @staticmethod
    def _site():
        fqdn = socket.getfqdn()
        if 'slac.stanford.edu' in fqdn:
            ...  # SLAC site value (elided)
        elif 'jlab.org' in fqdn:
            ...  # JLab site value (elided)
class LSF(BatchSystem):
    """! Submit LSF batch jobs."""

    def parse_args(self, args):
        super().parse_args(args)
        os.environ['LSB_JOB_REPORT_MAIL'] = 'Y' if self.email else 'N'

    def build_cmd(self, job_id):
        log_file = os.path.abspath(os.path.join(self.log_dir, 'job.%s.log' % str(job_id)))
        # (start of the bsub command and its resource options elided)
        if self.os is not None:
            ...  # (OS requirement added to the bsub command; elided)
        cmd.extend(['-u', self.email])
        cmd.extend(super().build_cmd(job_id))
        return cmd

    def submit_job(self, job_id):
        cmd = self.build_cmd(job_id)
        logger.info('Submitting job %s to LSF with command: %s' % (job_id, ' '.join(cmd)))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if err is not None and len(err):
            ...  # (stderr reporting elided)
        tokens = out.decode().split(" ")
        if tokens[0] != 'Job':
            raise Exception('Unexpected output from bsub command: %s' % out)
        batch_id = int(tokens[1].replace('<', '').replace('>', ''))
        return batch_id
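    # Note on the parsing above (based on typical LSF behavior, not on this source):
    # a successful bsub prints a line like "Job <12345> is submitted to queue <myqueue>.",
    # so the second token with the angle brackets stripped is the numeric batch ID.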
class Slurm(BatchSystem):
    """! Submit Slurm batch jobs."""

    def __init__(self):
        super().__init__()
        self.parser.add_argument("-S", "--sh-dir", nargs='?',
                                 help="Directory to hold generated shell scripts for Slurm",
                                 default=str(Path(os.getcwd(), 'sh')))
        self.parser.add_argument("-E", "--env", nargs='?', help="Full path to env setup script",
                                 required=False, default=None)
        self.parser.add_argument("-A", "--account", nargs='?', help="Account name for s3df slurm jobs.",
                                 required=False, default=None)

    def parse_args(self, args):
        super().parse_args(args)
        logger.info('Slurm sh dir: {}'.format(self.sh_dir))
        if not os.path.exists(self.sh_dir):
            os.makedirs(self.sh_dir)
            logger.info('Created Slurm sh dir: {}'.format(self.sh_dir))

    def default_rundir(self, job_id=None):
        """!
        Override the basic implementation for getting the default run directory.
        """
        # The run dir depends on the detected site (selection logic elided):
        run_dir = '$LSCRATCH'
        run_dir = '/scratch/slurm/$SLURM_JOBID'
        run_dir = os.getcwd() + "/scratch/$SLURM_JOBID"
        return run_dir

    # Assembly of the sbatch command (the enclosing method is not shown in this fragment):
        raise Exception('No queue name was provided.')
        sbatch_cmd = ['sbatch',
                      # (resource and time options elided)
                      '--output=%s.out' % log_file,
                      '--error=%s.err' % log_file]
        sbatch_cmd.extend([f'--partition={self.queue}'])
        sbatch_cmd.extend([f'--account={self.account}'])
        sbatch_cmd.extend([f'--mail-user={self.email}'])  # (additional mail options elided)

    def _sh_filename(self, job_id):
        return self.sh_dir + '/job.%i.sh' % job_id
    def build_cmd(self, job_id):
        """!
        Wrap submission of Slurm jobs using a generated script.
        """
        # (generation of the job script and assembly of 'cmd' elided)
        cmd.append(sh_filename)
        return cmd

    def _write_job_script(self, sh_filename, job_cmd):
        """!
        Write the shell script for Slurm job submission using the 'sbatch' command.
        """
        script_lines = ['#!/bin/bash']
        if self.env:
            script_lines.append(f'source {self.env}')
        script_lines.extend(['echo Start time: `date`',
                             'echo ---- Start Environment ----',
                             'echo ---- End Environment ----',
                             'time ' + ' '.join(job_cmd),
                             'echo End time: `date`'])
        logger.debug("Slurm submission script:\n" + str(script_lines))
        with open(sh_filename, 'w') as sh_file:
            for script_line in script_lines:
                sh_file.write(script_line + '\n')
        logger.debug('Wrote Slurm submission script to: {}'.format(str(Path(self.sh_dir, sh_filename))))
    def submit_job(self, job_id):
        cmd = self.build_cmd(job_id)
        logger.info('Submitting job %s to Slurm with command: %s' % (job_id, ' '.join(cmd)))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if err is not None and len(err):
            ...  # (stderr reporting elided)
        tokens = out.decode().split(" ")
        if tokens[0] != 'Submitted':
            raise Exception('Unexpected output from sbatch command: %s' % out)
        batch_id = int(tokens[3].replace('<', '').replace('>', ''))
        return batch_id
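    # Note on the parsing above: sbatch normally prints "Submitted batch job 123456",
    # so the fourth token is the numeric batch ID; stripping angle brackets is
    # harmless for this format.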
class Auger(BatchSystem):
    """!
    Submit Auger batch jobs.

    Auger itself is actually deprecated and unavailable, but its submission XML format is
    supported by the Swif class (see below).
    """

        # (setup-script lookup elided)
        raise Exception("Failed to find 'hps-mc-env.csh' in environment.")

    def submit_job(self, job_id):
        """!
        Make this a no-op. Auger is a bit of a special case in terms of how batch submission works with a
        generated XML file including all job IDs, so we do not implement single job submission.
        """
    def submit(self):
        """!
        Batch submission method for Auger.

        This differs from some of the other systems in that it doesn't loop over individual
        job IDs. Instead a single XML file is submitted for all the jobs at once.
        """
        xml_filename = self._create_job_xml()
        auger_ids = self._jsub(xml_filename)
        logger.info("Submitted Auger jobs: %s" % str(auger_ids))

    def _create_job_xml(self):
        job_ids = self._get_filtered_job_ids()
        logger.info('Submitting jobs: %s' % str(job_ids))
        for job_id in job_ids:
            if not self.jobstore.has_job_id(job_id):
                raise Exception('Job ID was not found in job store: %s' % job_id)
            job_params = self.jobstore.get_job(job_id)
            if self.check_output and Batch._outputs_exist(job_params):
                logger.warning("Skipping Auger submission for job "
                               "because outputs already exist: %d" % job_id)
                continue
    def _jsub(self, xml_filename):
        cmd = ['jsub', '-xml', xml_filename]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        out, err = proc.communicate()
        return self._get_auger_ids(out)

    def _get_auger_ids(self, out):
        auger_ids = []
        for line in out.splitlines():
            if line.strip().startswith(b'<jsub>'):
                j = ET.fromstring(line)
                for req in j:  # iterate directly; Element.getchildren() was removed in Python 3.9
                    for child in req:
                        if child.tag == 'jobIndex':
                            auger_id = int(child.text)
                            auger_ids.append(auger_id)
                        elif child.tag == 'error':
                            raise Exception(child.text)
        return auger_ids

    def _write_req(self, req, filename='temp.xml'):
        pretty = unescape(minidom.parseString(ET.tostring(req)).toprettyxml(indent=" "))
        with open(filename, 'w') as f:
            f.write(pretty)
    def _create_req(self, req_name):
        req = ET.Element("Request")
        name_elem = ET.SubElement(req, "Name")
        name_elem.set("name", req_name)
        prj = ET.SubElement(req, "Project")
        prj.set("name", "hps")
        trk = ET.SubElement(req, "Track")
        trk.set("name", "debug")
        if self.queue is not None:
            trk.set("name", queue)
        email = ET.SubElement(req, "Email")
        email.set("request", "true")
        email.set("job", "true")
        mem = ET.SubElement(req, "Memory")
        mem.set("unit", "MB")
        disk = ET.SubElement(req, "DiskSpace")
        disk.set("unit", "GB")
        limit = ET.SubElement(req, "TimeLimit")
        limit.set("unit", "hours")
        os_elem = ET.SubElement(req, "OS")
        if self.os is not None:
            os_elem.set("name", auger_os)
        return req
    def build_cmd(self, job_id):
        cmd = [sys.executable, RUN_SCRIPT, 'run']
        cmd.extend(['-c', cfg])  # 'cfg' iterates over any config files (loop elided)
        cmd.extend(['-i', str(job_id)])
        cmd.append(os.path.abspath(self.jobstore.path))
        logger.debug("Job command: %s" % " ".join(cmd))
        return cmd

    def _create_job(self, params):
        """! Needed for resolving ptag output sources."""
        j = Job()
        j._load_params(params)
        return j
    def _add_job(self, req, job_params):
        job = ET.SubElement(req, "Job")
        job_id = job_params['job_id']
        if 'year' in job_params.keys():
            year = job_params['year']
        if 'input_files' in list(job_params.keys()):
            inputfiles = job_params['input_files']
            for src, dest in inputfiles.items():
                if not src.startswith('http'):
                    input_elem = ET.SubElement(job, "Input")
                    input_elem.set("dest", dest)
                    if src.startswith("/mss"):
                        src_file = "mss:%s" % src
                    else:
                        src_file = src
                    input_elem.set("src", src_file)
                else:
                    logger.warning("http input file will not be included in XML job descr: {}".format(src))
        outputfiles = job_params["output_files"]
        outputdir = job_params["output_dir"]
        # Reconstructed from context: 'j' is the Job used to resolve ptag output sources.
        j = self._create_job(job_params)
        for src, dest in outputfiles.items():
            output_elem = ET.SubElement(job, "Output")
            res_src = j.resolve_output_src(src)
            output_elem.set("src", res_src)
            dest_file = os.path.abspath(os.path.join(outputdir, dest))
            if dest_file.startswith("/mss"):
                dest_file = "mss:%s" % dest_file
            logger.debug('Auger dest file: {} -> {}'.format(src, dest))
            output_elem.set("dest", dest_file)
        job_name = ET.SubElement(job, "Name")
        job_name.set("name", '%ihps%i' % (year, job_id))
        job_err = ET.SubElement(job, "Stderr")
        stdout_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.out" % job_id))
        stderr_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.err" % job_id))
        job_err.set("dest", stderr_file)
        job_out = ET.SubElement(job, "Stdout")
        job_out.set("dest", stdout_file)
        cmd = ET.SubElement(job, "Command")
        # Reconstructed from context: 'job_cmd' is the command assembled by build_cmd().
        job_cmd = self.build_cmd(job_id)
        cmd_lines = []
        cmd_lines.append("<![CDATA[")
        cmd_lines.append('pwd;\n')
        cmd_lines.append('env | sort;\n')
        cmd_lines.append('ls -lart;\n')
        cmd_lines.append("source %s;\n" % os.path.realpath(self.setup_script))
        cmd_lines.append("source %s/bin/jlab-env.csh;\n" % os.getenv('HPSMC_DIR'))
        cmd_lines.extend(job_cmd)
        cmd_lines.append(';\n')
        cmd_lines.append('ls -lart; \n')
        cmd_lines.append("]]>")
        cmd.text = ' '.join(cmd_lines)
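    # Sketch of the XML produced by _create_req()/_add_job() above (element and
    # attribute names come from the code; the values shown are illustrative assumptions):
    #   <Request>
    #     <Name name="..."/> <Project name="hps"/> <Track name="..."/> <Email .../>
    #     <Memory unit="MB" .../> <DiskSpace unit="GB" .../> <TimeLimit unit="hours" .../> <OS .../>
    #     <Job>
    #       <Input src="mss:/mss/..." dest="input.stdhep"/>
    #       <Output src="events.slcio" dest="mss:/mss/.../job_0001_events.slcio"/>
    #       <Name name="2019hps1"/> <Stderr dest=".../job.1.err"/> <Stdout dest=".../job.1.out"/>
    #       <Command><![CDATA[ ... ]]></Command>
    #     </Job>
    #   </Request>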
class Swif(Auger):
    """!
    Submit using the 'swif2' command at JLAB using an Auger file.

    This is just a thin wrapper of the parent class to call the swif2 commands with the generated Auger XML file.

    Existing workflows generated by this class should be fully canceled and removed before resubmitting using this
    class.
    """

    def __init__(self):
        super().__init__()
        self.parser.add_argument("-w", "--workflow", nargs='?', help="Name of swif2 workflow", required=False)

    def parse_args(self, args):
        super().parse_args(args)
        logger.debug(f'swif workflow name set to: {self.workflow}')

    def submit(self):
        logger.info("Submitting swif workflow: {}".format(self.workflow))
        xml_filename = self._create_job_xml()
        cmd = ['swif2', 'add-jsub', self.workflow, '-script', xml_filename]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out = proc.communicate()[0]
        print("".join([s for s in out.decode().strip().splitlines(True) if s.strip()]))
        run_cmd = ['swif2', 'run', self.workflow]
        proc = subprocess.Popen(run_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out = proc.communicate()[0]
        print("".join([s for s in out.decode().strip().splitlines(True) if s.strip()]))
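    # Sketch of the resulting shell invocations (workflow and file names assumed):
    #   swif2 add-jsub <workflow> -script <auger XML file>
    #   swif2 run <workflow>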
class Local(Batch):
    """! Run local batch jobs sequentially."""

    def submit_job(self, job_id):
        """! Run a single job locally."""
        cmd = self.build_cmd(job_id)
        logger.info(f"Executing local job: {job_id}")
        proc = subprocess.Popen(cmd, shell=False)
        proc.wait()
        if proc.returncode:
            logger.error(f"Local execution of {job_id} returned error code: {proc.returncode}")
mp_queue = multiprocessing.Queue()


def run_job_pool(cmd):
    """! Run the command in a new process whose PID is added to a global MP queue."""
    returncode = None
    try:
        proc = subprocess.Popen(cmd, preexec_fn=os.setsid)
        mp_queue.put(proc.pid)
        proc.communicate()
        returncode = proc.returncode
    except subprocess.CalledProcessError as e:
        ...  # (error handling elided)
    return returncode


def is_running(proc):
    """!
    Check if a system process looks like it is still running.
    """
    return proc.status() in [psutil.STATUS_RUNNING,
                             psutil.STATUS_SLEEPING,
                             psutil.STATUS_DISK_SLEEP]  # (any additional states elided)
# (The class statement for the following context manager, which owns __exit__, is not shown in this fragment.)
    """!
    Kill processes in the multiprocessing queue if the jobs are canceled.
    """

    def __exit__(self, type, val, tb):
        """! Kill processes on exit."""
        # (iteration that pulls each 'pid' from mp_queue elided)
        try:
            parent = psutil.Process(pid)
            for child in parent.children(recursive=True):
                if is_running(child):
                    print('Killing running process: %d' % child.pid)
                    child.kill()
        except Exception as e:
            ...  # (error handling elided)
class Pool(Batch):
    """!
    Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.

    The number of processes to spawn can be provided using the '-p' argument.
    """

    def __init__(self):
        super().__init__()
        self.parser.add_argument("-p", "--pool-size", type=int,
                                 help="Job pool size (only applicable when running pool)", required=False,
                                 default=multiprocessing.cpu_count())

    def submit_job(self, job_id):
        """!
        Make this a no-op as we do not implement single job submission for the processing pool.
        """

    def submit(self):
        """! Submit jobs to a local processing pool.

        This method will not return until all jobs are finished or execution is interrupted.
        """
        # (collection of job IDs and construction of the per-job commands in 'cmds' elided)
        raise Exception('No job IDs found to submit')
        # Ignore SIGINT while the worker processes are forked so Ctrl-C is handled
        # by the parent, then restore the original handler.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        pool = multiprocessing.Pool(self.pool_size)
        signal.signal(signal.SIGINT, original_sigint_handler)
        try:
            logger.info("Running %d jobs in pool ..." % len(cmds))
            res = pool.map_async(run_job_pool, cmds)
            logger.info("Pool results: " + str(res.get(Pool.max_wait)))
            logger.info("Normal termination")
        except KeyboardInterrupt:
            logger.fatal("Caught KeyboardInterrupt, terminating workers")
            pool.terminate()
        except Exception as e:
            logger.fatal("Caught Exception '%s', terminating workers" % (str(e)))
            pool.terminate()
        except BaseException:
            e = sys.exc_info()[0]
            logger.fatal("Caught non-Python Exception '%s'" % (e))
            pool.terminate()
if __name__ == '__main__':
    # ('system_dict', mapping system names to the batch classes above, is defined in an elided section)
    if len(sys.argv) > 1:
        system = sys.argv[1].lower()
        if system not in list(system_dict.keys()):
            raise Exception(f"The batch system {system} is not valid.")
        batch = system_dict[system]()
        args = sys.argv[2:]
        batch.parse_args(args)
        batch.submit()
    else:
        print("Usage: batch.py [system] [args]")
        print(" Available systems: {}".format(', '.join(list(system_dict.keys()))))