HPS-MC
 
Loading...
Searching...
No Matches
batch.py
Go to the documentation of this file.
1"""!
2@package batch
3
4Defines a set of classes and a command-line interface for submitting batch jobs.
5
6Supported systems include serial execution locally, a multiprocessing pool,
7Slurm, LSF, and Auger.
8"""
9
10import os
11import argparse
12import subprocess
13import sys
14import logging
15import signal
16import multiprocessing
17import psutil
18from pathlib import Path
19import socket
20
21import xml.etree.ElementTree as ET
22from xml.dom import minidom
23from xml.sax.saxutils import unescape
24from distutils.spawn import find_executable
25
26from abc import ABC, abstractmethod
27
28from hpsmc.job import Job, JobStore, JobScriptDatabase
29
30logger = logging.getLogger("hpsmc.batch")
31
32RUN_SCRIPT = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'job.py')
33
34
class Batch(ABC):
    """!
    Generic batch processing interface.

    Builds the command line parser shared by every batch system, selects the
    job IDs to run from the job store and delegates the submission of each
    job to the abstract submit_job() method.
    """

    def __init__(self):
        """! Create the argument parser holding the options common to all batch systems."""
        parser = argparse.ArgumentParser(
            self.__class__.__name__,
            epilog='Available scripts: %s' % ', '.join(JobScriptDatabase().get_script_names()))

        parser.add_argument("-c", "--config-file", nargs='?', help="Config file", action='append')
        parser.add_argument("-l", "--log-dir", nargs='?', help="Log file output dir", required=False, default=str(Path(os.getcwd(), 'logs')))
        parser.add_argument("-d", "--run-dir", nargs='?', help="Base run dir for the jobs (must be an absolute path)", default=None)
        parser.add_argument("-D", "--debug", action='store_true', help="Enable debug settings", required=False)
        parser.add_argument("-o", "--check-output", action='store_true', required=False, help="Do not submit jobs where output files already exist")
        parser.add_argument("-s", "--job-steps", type=int, default=None, required=False)
        parser.add_argument("-r", "--job-range", nargs='?', help="Submit jobs numbers within range (e.g. '1:100')", required=False)
        parser.add_argument("script", nargs='?', help="Name of job script")
        parser.add_argument("jobstore", nargs='?', help="Job store in JSON format")
        parser.add_argument("jobids", nargs="*", type=int, help="List of individual job IDs to submit (optional)")

        self.parser = parser

    @staticmethod
    def _parse_job_range(job_range):
        """!
        Parse a job range given as 'start:end'.

        @param job_range Range string from the command line, or None/empty if not given
        @return Tuple (start, end) of ints, or (None, None) when no range was given
        @raise ValueError if the string is malformed, start > end, or a bound is negative
        """
        if not job_range:
            return None, None
        toks = job_range.split(':')
        if len(toks) != 2:
            raise ValueError('Bad format for job range: ' + job_range)
        start, end = int(toks[0]), int(toks[1])
        if start > end:
            raise ValueError("The start job number must be <= the end job number when using a range.")
        if start < 0 or end < 0:
            raise ValueError("The job range numbers must be >= 0.")
        return start, end

    def parse_args(self, args):
        """!
        Parse command line arguments and perform setup.

        @param args List of command line argument strings
        @return The namespace produced by the argument parser
        """
        cl = self.parser.parse_args(args)

        logger.debug(str(cl))

        if cl.script is None:
            raise Exception('The script is a required argument.')
        self.script_name = cl.script  # Name of script
        script_db = JobScriptDatabase()
        if not script_db.exists(self.script_name):
            raise Exception('The script name is not valid: %s' % self.script_name)
        self.script = script_db.get_script_path(self.script_name)  # Path to script
        if not os.path.isfile(self.script):
            raise Exception('The job script does not exist: %s' % self.script)

        if cl.jobstore is None:
            raise Exception('The job store file is a required argument.')
        if not os.path.isfile(cl.jobstore):
            raise Exception('The job store does not exist: %s' % cl.jobstore)
        self.jobstore = JobStore(cl.jobstore)

        self.debug = cl.debug

        # Set log dir which is for copying back log files generated by the batch system
        self.log_dir = os.path.abspath(cl.log_dir)
        logger.info('log dir: {}'.format(self.log_dir))
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir)
            logger.info('Created log dir: {}'.format(self.log_dir))

        # Set run dir which is a root directory under which job directories will be created
        self.run_dir = cl.run_dir
        if self.run_dir is not None:
            logger.info('run dir: {}'.format(self.run_dir))
            if not os.path.isabs(self.run_dir):
                # Require that the run dir is supplied as an abs path
                raise Exception("The run dir for batch processing must be an abs path.")

        self.check_output = cl.check_output

        self.job_ids = list(map(int, cl.jobids)) if cl.jobids else []

        self.job_steps = cl.job_steps

        # Both values are None when no range was requested.
        self.start_job_num, self.end_job_num = self._parse_job_range(cl.job_range)

        if cl.config_file:
            self.config_files = list(map(os.path.abspath, cl.config_file))
        else:
            self.config_files = []

        return cl

    @abstractmethod
    def submit_job(self, job_id):
        """!
        Submit a single batch job and return the batch ID.

        This is abstract as each batch system will do this differently.

        Some batch systems don't implement this but sub-classes should override this and make it a
        no-op so that they can be instantiated.
        """
        pass

    def submit(self):
        """!
        This is the generic batch submission function which gets a list of jobs to run based on command line
        arguments and submits them individually. It calls the abstract submit_job() method and prints the batch
        system ID that was returned, if any.
        """
        job_ids = self._get_filtered_job_ids()
        logger.info('Submitting jobs: %s' % str(job_ids))
        for job_id in job_ids:
            if not self.jobstore.has_job_id(job_id):
                raise Exception('Job ID was not found in job store: %s' % job_id)
            batch_id = self.submit_job(job_id)
            logger.info(f"Submitted job {job_id} with batch ID {str(batch_id)}")

    def default_rundir(self, job_id=None):
        """!
        Get the default run directory of a job, which is 'scratch/<job_id>' under the current directory.

        @param job_id ID of the job (required despite the default value)
        @return Path of the run directory as a string
        """
        if job_id is None:
            raise Exception('Missing valid job ID')
        return str(Path(os.getcwd(), 'scratch', str(job_id)))

    def build_cmd(self, job_id):
        """!
        This is the basic implementation of building a command to run the job from a batch system.

        @param job_id ID of the job in the job store
        @return List of command tokens invoking the job run script
        """
        cmd = [sys.executable, RUN_SCRIPT, 'run']
        logfile = self._logfile(job_id)
        cmd.extend(['-o', f"{logfile}.out",
                    '-e', f"{logfile}.err"])
        if self.run_dir:
            # Set the job's base run dir explicitly from user argument,
            # appending the job number as a subdirectory.
            job_dir = str(Path(self.run_dir, str(job_id)))
        else:
            # Set the job directory to the default.
            job_dir = self.default_rundir(job_id)

        logger.debug(f'job dir: {job_dir}')
        cmd.extend(['-d', job_dir])

        for cfg in self.config_files:
            cmd.extend(['-c', cfg])
        if self.job_steps:
            cmd.extend(['--job-steps', str(self.job_steps)])
        cmd.extend(['-i', str(job_id)])
        cmd.append(self.script)
        cmd.append(os.path.abspath(self.jobstore.path))
        logger.debug("Job command: %s" % " ".join(cmd))
        return cmd

    def _logfile(self, job_id):
        """!
        Get the base name of a log file for the job.
        """
        return os.path.abspath(os.path.join(self.log_dir, 'job.%s' % str(job_id)))

    @staticmethod
    def _outputs_exist(job):
        """!
        Check if all output files exist for the given job. This is not the job ID but the full JSON job data.

        Return False when first missing output is found.
        """
        for src, dest in job["output_files"].items():
            if not os.path.isfile(os.path.join(job["output_dir"], dest)):
                logger.debug('Job output does not exist: %s -> %s' % (src, dest))
                return False
        return True

    def _get_filtered_job_ids(self):
        """!
        Get a list of job IDs to submit based on parsed command line options and whether output files are being checked.
        """
        submit_ids = self.jobstore.get_job_ids()
        logger.debug('Initial pre-filtered job IDs: {}'.format(str(submit_ids)))
        # Compare against None so that a range starting at job 0 is still applied.
        if self.start_job_num is not None:
            submit_ids = [job_id for job_id in submit_ids
                          if self.start_job_num <= int(job_id) <= self.end_job_num]
        elif self.job_ids:
            submit_ids = self.job_ids
        logger.debug('job IDs after range check: {}'.format(str(submit_ids)))
        if self.check_output:
            submit_ids = self._job_ids_missing_output(submit_ids)
            logger.info('job IDs after output file check: {}'.format(str(submit_ids)))
        return submit_ids

    def _job_ids_missing_output(self, job_ids):
        """! Get a list of IDs for jobs that are missing output files."""
        return [job_id for job_id in job_ids if not self._outputs_exist(self.jobstore.get_job(job_id))]
229
230
class BatchSystem(Batch, ABC):
    """!
    Represents a batch processing system that requires submission like Slurm or Auger.

    Extends Batch with options (queue, job length, memory, disk space, email, OS) that
    do not apply to the Pool and Local implementations.
    """

    def __init__(self):
        """! Register the batch system options and determine the computing site."""
        super().__init__()

        add = self.parser.add_argument
        add("-q", "--queue", nargs='?', help="Job queue or partition", required=False)
        add("-W", "--job-length", type=int, help="Max job length in hours", required=False, default=4)
        add("-m", "--memory", type=int, help="Max job memory allocation in MB", default=2000)
        add("-f", "--diskspace", type=int, help="Disk space needed for job in GB", default=20)
        add("-e", "--email", nargs='?', help="Email address for job notifications", required=False)
        add("-O", "--os", nargs='?', help="Operating system of batch nodes (Auger and LSF)")

        # Set site based on FQDN
        self.site = BatchSystem._site()

    def parse_args(self, args):
        """!
        Parse command line arguments and perform setup.

        @param args List of command line argument strings
        @return The namespace produced by the argument parser
        """
        parsed = super().parse_args(args)

        self.job_length = parsed.job_length
        self.diskspace = parsed.diskspace
        self.memory = parsed.memory
        self.os = parsed.os
        self.queue = parsed.queue
        self.email = parsed.email

        return parsed

    @staticmethod
    def _site():
        """!
        Determine the site name from the fully qualified domain name of this host.

        @return 'slac', 'jlab', or None if the domain is not recognized
        """
        fqdn = socket.getfqdn()
        # Checked in order, so the SLAC domain takes precedence.
        for domain, name in (('slac.stanford.edu', 'slac'), ('jlab.org', 'jlab')):
            if domain in fqdn:
                return name
        return None
279
280
282 """! Submit LSF batch jobs."""
283
284 def __init__(self):
285 super().__init__()
286
287 def parse_args(self, args):
288 super().parse_args(args)
289 os.environ['LSB_JOB_REPORT_MAIL'] = 'Y' if self.email else 'N'
290
291 def build_cmd(self, job_id):
292
293 log_file = os.path.abspath(os.path.join(self.log_dir, 'job.%s.log' % str(job_id)))
294
295 queue = self.queue
296 if queue is None:
297 queue = 'long'
298
299 if self.os is not None:
300 lsf_os = self.os
301 else:
302 lsf_os = 'centos7'
303
304 cmd = ['bsub',
305 '-W', str(self.job_lengthjob_length) + ':0',
306 '-q', queue,
307 '-R', lsf_os,
308 '-o', log_file,
309 '-e', log_file]
310
311 if self.email:
312 cmd.extend(['-u', self.email])
313
314 cmd.extend(super().build_cmd(self, job_id))
315
316 return cmd
317
318 def submit_job(self, job_id):
319 cmd = self.build_cmdbuild_cmd(job_id)
320 logger.info('Submitting job %s to LSF with command: %s' % (job_id, ' '.join(cmd)))
321 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
322 out, err = proc.communicate()
323 if err is not None and len(err):
324 logger.warning(err)
325 tokens = out.decode().split(" ")
326 if tokens[0] != 'Job':
327 raise Exception('Unexpected output from bsub command: %s' % out)
328 batch_id = int(tokens[1].replace('<', '').replace('>', ''))
329 return batch_id
330
331
333 """! Submit Slurm batch jobs."""
334
335 def __init__(self):
336
337 super().__init__()
338
339 self.parser.add_argument("-S", "--sh-dir", nargs='?', help="Directory to hold generated shell scripts for Slurm", default=str(Path(os.getcwd(), 'sh')))
340 self.parser.add_argument("-E", "--env", nargs='?', help="Full path to env setup script", required=False, default=None)
341 self.parser.add_argument("-A", "--account", nargs='?', help="Account name for s3df slurm jobs.", required=False, default=None)
342
343 def parse_args(self, args):
344
345 cl = super().parse_args(args)
346
347 # Set Slurm env script
348 self.env = cl.env
349 self.account = cl.account
350
351 # Set Slurm scripts dir
352 self.sh_dir = os.path.abspath(cl.sh_dir)
353 logger.info('Slurm sh dir: {}'.format(self.sh_dir))
354 if not os.path.exists(self.sh_dir):
355 os.makedirs(self.sh_dir)
356 logger.info('Created Slurm sh dir: {}'.format(self.sh_dir))
357
358 def default_rundir(self, job_id=None):
359 """!
360 Override the basic implementation for getting the default run directory.
361 """
362 if self.sitesite == 'slac':
363 run_dir = '$LSCRATCH'
364 elif self.sitesite == 'jlab':
365 run_dir = '/scratch/slurm/$SLURM_JOBID'
366 else:
367 run_dir = os.getcwd() + "/scratch/$SLURM_JOBID"
368 return run_dir
369
370 def _default_queue(self):
371 queue = self.queue
372 if queue is None:
373 if self.sitesite == 'slac':
374 queue = 'shared'
375 elif self.sitesite == 'jlab':
376 queue = 'ifarm'
377 else:
378 raise Exception('No queue name was provided.')
379 return queue
380
381 def _sbatch(self, job_id):
382 log_file = self._logfile(job_id)
383 sbatch_cmd = ['sbatch',
384 '--time=%s' % (str(self.job_lengthjob_length) + ':00:00'),
385 '--mem=%sM' % self.memorymemory,
386 '--job-name=%i_%s' % (job_id, self.script_namescript_name),
387 '--output=%s.out' % log_file,
388 '--error=%s.err' % log_file]
389 if self.queue:
390 sbatch_cmd.extend([f'--partition={self.queue}'])
391 if self.account:
392 sbatch_cmd.extend([f'--account={self.account}'])
393 if self.email:
394 sbatch_cmd.extend([f'--mail-user={self.email}',
395 f'--mail-type=ALL'])
396 return sbatch_cmd
397
398 def _sh_filename(self, job_id):
399 return self.sh_dir + '/job.%i.sh' % job_id
400
401 def build_cmd(self, job_id):
402 """!
403 Wrap submission of Slurm jobs using a generated script.
404 """
405
406 # Get the sbatch command
407 cmd = self._sbatch(job_id)
408
409 # Get name of shell script to generate
410 sh_filename = self._sh_filename(job_id)
411
412 # Build the basic job command for execution
413 job_cmd = super().build_cmd(job_id)
414 if self.run_dir is None:
415 # The superclass will have already set this if the user provided an
416 # explicit run dir. Here we set a default scratch directory if none
417 # was given.
418 job_cmd.extend(['-d', self.default_rundirdefault_rundir()])
419
420 # Write the job submission script out
421 self._write_job_script(sh_filename, job_cmd)
422
423 # Append job run script to Slurm command
424 cmd.append(sh_filename)
425
426 return cmd
427
428 def _write_job_script(self, sh_filename, job_cmd):
429 """!
430 Write the shell script for Slurm job submission using the 'sbatch' command.
431 """
432
433 script_lines = ['#!/bin/bash',
434 '']
435 if self.env:
436 script_lines.append(f'source {self.env}')
437 script_lines.extend(['echo Start time: `date`',
438 'echo PWD=`pwd`',
439 'echo ---- Start Environment ----',
440 'env | sort',
441 'echo ---- End Environment ----',
442 'time ' + ' '.join(job_cmd),
443 'echo End time: `date`'])
444
445 logger.debug("Slurm submission script:\n" + str(script_lines))
446
447 with open(sh_filename, 'w') as sh_file:
448 for script_line in script_lines:
449 sh_file.write(script_line + '\n')
450
451 logger.debug('Wrote Slurm submission script to: '.format(str(Path(self.sh_dir, sh_filename))))
452
453 def submit_job(self, job_id):
454 cmd = self.build_cmdbuild_cmd(job_id)
455 logger.info('Submitting job %s to Slurm with command: %s' % (job_id, ' '.join(cmd)))
456 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
457 out, err = proc.communicate()
458 if err is not None and len(err):
459 logger.warning(err)
460 tokens = out.decode().split(" ")
461 if tokens[0] != 'Submitted':
462 raise Exception('Unexpected output from sbatch command: %s' % out)
463 batch_id = int(tokens[3].replace('<', '').replace('>', ''))
464 return batch_id
465
466
468 """!
469 Submit Auger batch jobs.
470
471 Auger itself is actually deprecated and unavailable but its submission XML format is supported by
472 the Swif class (see below).
473 """
474
475 def __init__(self):
476
477 super().__init__()
478
479 self.setup_script = find_executable('hps-mc-env.csh')
480
481 if not self.setup_script:
482 raise Exception("Failed to find 'hps-mc-env.csh' in environment.")
483
484 def submit_job(self, job_id):
485 """!
486 Make this a no-op. Auger is a bit of a special case in terms of how batch submission works with a
487 generated XML file including all job IDs, so we do not implement single job submission.
488 """
489 pass
490
491 def submit(self):
492 """!
493 Batch submission method for Auger.
494
495 This differs from some of the other systems in that it doesn't loop over individual
496 job IDs. Instead a single XML file is submitted for all the jobs at once.
497 """
498 xml_filename = self._create_job_xml() # write request to XML file
499 auger_ids = self._jsub(xml_filename) # execute jsub to submit jobs
500 logger.info("Submitted Auger jobs: %s" % str(auger_ids))
501
503 job_ids = self._get_filtered_job_ids()
504 logger.info('Submitting jobs: %s' % str(job_ids))
505 req = self._create_req(self.script_name) # create request XML header
506 for job_id in job_ids:
507 if not self.jobstore.has_job_id(job_id):
508 raise Exception('Job ID was not found in job store: %s' % job_id)
509 job_params = self.jobstore.get_job(job_id)
510 if self.check_output and Batch._outputs_exist(job_params):
511 logger.warning("Skipping Auger submission for job "
512 "because outputs already exist: %d" % job_id)
513 else:
514 self._add_job(req, job_params) # add job to request
515 return self._write_req(req) # write request to file
516
517 def _jsub(self, xml_filename):
518 cmd = ['jsub', '-xml', xml_filename]
519 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
520 out, err = proc.communicate()
521 auger_ids = self._get_auger_ids(out)
522 return auger_ids
523
524 def _get_auger_ids(self, out):
525 auger_ids = []
526 for line in out.splitlines():
527 if line.strip().startswith(b'<jsub>'):
528 j = ET.fromstring(line)
529 for req in j.getchildren():
530 for child in req.getchildren():
531 if child.tag == 'jobIndex':
532 auger_id = int(child.text)
533 auger_ids.append(auger_id)
534 elif child.tag == 'error':
535 # Submission failed so raise an exception with the error msg
536 raise Exception(child.text)
537 break
538 return auger_ids
539
540 def _write_req(self, req, filename='temp.xml'):
541 pretty = unescape(minidom.parseString(ET.tostring(req)).toprettyxml(indent=" "))
542 with open(filename, 'w') as f:
543 f.write(pretty)
544 return f.name
545
546 def _create_req(self, req_name):
547 req = ET.Element("Request")
548 name_elem = ET.SubElement(req, "Name")
549 name_elem.set("name", req_name)
550 prj = ET.SubElement(req, "Project")
551 prj.set("name", "hps")
552 trk = ET.SubElement(req, "Track")
553 if self.debug:
554 # Queue arg is not used when debug flag is active.
555 trk.set("name", "debug")
556 else:
557 # Queue name is used to set job track.
558 queue = 'simulation'
559 if self.queue is not None:
560 queue = self.queue
561 trk.set("name", queue)
562 if self.emailemail:
563 email = ET.SubElement(req, "Email")
564 email.set("email", self.emailemail)
565 email.set("request", "true")
566 email.set("job", "true")
567 mem = ET.SubElement(req, "Memory")
568 mem.set("space", str(self.memorymemory))
569 mem.set("unit", "MB")
570 disk = ET.SubElement(req, "DiskSpace")
571 disk.set("space", str(self.diskspacediskspace))
572 disk.set("unit", "GB")
573 limit = ET.SubElement(req, "TimeLimit")
574 limit.set("time", str(self.job_lengthjob_length))
575 limit.set("unit", "hours")
576 os_elem = ET.SubElement(req, "OS")
577 if self.os is not None:
578 auger_os = self.os
579 else:
580 auger_os = 'general'
581 os_elem.set("name", auger_os)
582 return req
583
584 def build_cmd(self, job_id):
585 cmd = [sys.executable, RUN_SCRIPT, 'run']
587 for cfg in self.config_filesconfig_files:
588 cmd.extend(['-c', cfg])
589 if self.job_stepsjob_steps is not None:
590 cmd.extend(['--job-steps', str(self.job_stepsjob_steps)])
591 cmd.extend(['-i', str(job_id)])
592 cmd.append(self.scriptscript)
593 cmd.append(os.path.abspath(self.jobstore.path))
594 logger.debug("Job command: %s" % " ".join(cmd))
595 return cmd
596
597 def _create_job(self, params):
598 """! Needed for resolving ptag output sources."""
599 j = Job()
600 j.script = self.scriptscript
601 j._load_params(params)
602 j._load_script()
603 return j
604
605 def _add_job(self, req, job_params):
606 job = ET.SubElement(req, "Job")
607 job_id = job_params['job_id']
608 year = '' # /todo change to number
609 if 'year' in job_params.keys():
610 year = job_params['year']
611
612 if 'input_files' in list(job_params.keys()):
613 inputfiles = job_params['input_files']
614 for src, dest in inputfiles.items():
615 if not src.startswith('http'):
616 input_elem = ET.SubElement(job, "Input")
617 input_elem.set("dest", dest)
618 if src.startswith("/mss"):
619 src_file = "mss:%s" % src
620 else:
621 src_file = src
622 input_elem.set("src", src_file)
623 else:
624 logger.warning("http input file will not be included in XML job descr: {}".format(src))
625 outputfiles = job_params["output_files"]
626 outputdir = job_params["output_dir"]
627 # outputdir = os.path.realpath(outputdir)
628 j = self._create_job(job_params)
629 for src, dest in outputfiles.items():
630 output_elem = ET.SubElement(job, "Output")
631 res_src = j.resolve_output_src(src)
632 output_elem.set("src", res_src)
633 dest_file = os.path.abspath(os.path.join(outputdir, dest))
634 if dest_file.startswith("/mss"):
635 dest_file = "mss:%s" % dest_file
636 logger.debug('Auger dest file: {} -> {}'.format(src, dest))
637 output_elem.set("dest", dest_file)
638
639 job_name = ET.SubElement(job, "Name")
640 job_name.set("name", '%ihps%i' % (year, job_id))
641
642 job_err = ET.SubElement(job, "Stderr")
643 stdout_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.out" % job_id))
644 stderr_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.err" % job_id))
645 job_err.set("dest", stderr_file)
646 job_out = ET.SubElement(job, "Stdout")
647 job_out.set("dest", stdout_file)
648
649 cmd = ET.SubElement(job, "Command")
650 cmd_lines = []
651 cmd_lines.append("<![CDATA[")
652
653 cmd_lines.append('pwd;\n')
654 cmd_lines.append('env | sort;\n')
655 cmd_lines.append('ls -lart;\n')
656 cmd_lines.append("source %s;\n" % os.path.realpath(self.setup_script))
657 cmd_lines.append("source %s/bin/jlab-env.csh;\n" % os.getenv('HPSMC_DIR'))
658
659 job_cmd = self.build_cmdbuild_cmd(job_id)
660
661 # Write log file locally so it can be copied back with Output element
662 # log_file = 'job.%d.log' % job_id
663 # job_cmd.extend(['-l', '$PWD/%s' % log_file])
664 # log_out_elem = ET.SubElement(job, "Output")
665 # log_out_elem.set('src', log_file)
666 # log_out_elem.set('dest', os.path.join(self.log_dir, log_file))
667
668 cmd_lines.extend(job_cmd)
669 cmd_lines.append(';\n')
670
671 cmd_lines.append('ls -lart; \n')
672
673 cmd_lines.append("]]>")
674
675 # logger.debug(cmd_lines)
676
677 cmd.text = ' '.join(cmd_lines)
678
679
class Swif(Auger):
    """!
    Submit using the 'swif2' command at JLAB using an Auger file.

    This is a thin wrapper of the parent class which passes the generated Auger XML
    file to the swif2 commands.

    Existing workflows generated by this class should be fully canceled and removed before resubmitting using this
    interface.
    """

    def __init__(self):
        """! Register the workflow name option."""
        super().__init__()

        self.parser.add_argument("-w", "--workflow", nargs='?', help="Name of swif2 workflow", required=False)

    def parse_args(self, args):
        """!
        Parse command line arguments and set the workflow name.

        The workflow name defaults to the script name when it is not given.

        @param args List of command line argument strings
        @return The namespace produced by the argument parser
        """
        parsed = super().parse_args(args)
        self.workflow = parsed.workflow if parsed.workflow else self.script_name
        logger.debug(f'swif workflow name set to: {self.workflow}')
        return parsed

    @staticmethod
    def _run_and_print(cmd):
        """! Run a command with stderr merged into stdout and print its non-blank output lines."""
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = proc.communicate()[0]
        kept_lines = [s for s in output.decode().strip().splitlines(True) if s.strip()]
        print("".join(kept_lines))
        proc.wait()

    def submit(self):
        """! Add the jobs to the swif2 workflow using the Auger XML file and run the workflow."""
        logger.info("Submitting swif workflow: {}".format(self.workflow))

        # Write request to XML file
        xml_filename = self._create_job_xml()

        # Add job to swif2 workflow using Auger XML file
        self._run_and_print(['swif2', 'add-jsub', self.workflow, '-script', xml_filename])

        # Run the workflow
        self._run_and_print(['swif2', 'run', self.workflow])
725
726
728 """!
729 Run local batch jobs sequentially.
730 """
731
732 def __init__(self):
733 super().__init__()
734
735 def submit_job(self, job_id):
736 """! Run a single job locally."""
737 cmd = self.build_cmd(job_id)
738 if self.submit:
739 logger.info(f"Executing local job: {job_id}")
740 proc = subprocess.Popen(cmd, shell=False)
741 proc.communicate()
742 if proc.returncode:
743 logger.error(f"Local execution of {job_id} returned error code: {proc.returncode}")
744
745
746# Queue used to keep track of processes created by batch pool.
747mp_queue = multiprocessing.Queue()
748
749
def run_job_pool(cmd):
    """!
    Run the command in a new process whose PID is added to a global MP queue.

    @param cmd List of command tokens to execute
    @return Return code of the process, or 1 if the process could not be started
    """
    # Defined up front so a value is returned even when the process fails to start.
    returncode = 1
    try:
        sys.stdout.flush()
        # Start the child in its own session (the child calls setsid() before exec).
        proc = subprocess.Popen(cmd, start_new_session=True)
        mp_queue.put(proc.pid)
        proc.wait()
        returncode = proc.returncode
    except (OSError, subprocess.SubprocessError) as e:
        # Popen raises OSError, e.g. when the executable does not exist.
        logger.error(str(e))
        sys.stdout.flush()
    return returncode
763
764
def is_running(proc):
    """!
    Check if a system process looks like it is still running.

    @param proc Object providing a status() method returning a psutil status constant
    @return True if the status is running, sleeping, disk sleep or idle
    """
    alive_states = (psutil.STATUS_RUNNING,
                    psutil.STATUS_SLEEPING,
                    psutil.STATUS_DISK_SLEEP,
                    psutil.STATUS_IDLE)
    current = proc.status()
    return current in alive_states
773
774
776 """!
777 Kill processes in the multiprocessing queue if the jobs are canceled.
778 """
779
780 def __init__(self, mp_queue):
781 self.mp_queue = mp_queue
782
783 def __enter__(self):
784 return self
785
786 def __exit__(self, type, val, tb):
787 """! Kill processes on exit."""
788 while True:
789 pid = mp_queue.get()
790 try:
791 parent = psutil.Process(pid)
792 for child in parent.children(recursive=True):
793 if is_running(child):
794 print('Killing running process: %d' % child.pid)
795 child.kill()
796 if is_running(parent):
797 parent.kill()
798 except Exception as e:
799 # This probably just means it already finished.
800 pass
801
802 if mp_queue.empty():
803 break
804
805
class Pool(Batch):
    """!
    Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.

    The number of processes to spawn can be provided using the '-p' argument.
    """

    # Max wait in seconds when getting results
    max_wait = 999999

    def __init__(self):
        """! Register the pool size option, which defaults to the number of CPUs."""
        super().__init__()
        self.parser.add_argument(
            "-p", "--pool-size",
            type=int,
            help="Job pool size (only applicable when running pool)",
            required=False,
            default=multiprocessing.cpu_count())

    def submit_job(self, job_id):
        """!
        Do nothing and return None, as single job submission is not implemented for the processing pool.
        """
        return None

    def parse_args(self, args):
        """!
        Parse command line arguments and set the pool size.

        @param args List of command line argument strings
        @return The namespace produced by the argument parser
        """
        parsed = super().parse_args(args)
        self.pool_size = int(parsed.pool_size)
        return parsed

    def submit(self):
        """! Submit jobs to a local processing pool.

        This method will not return until all jobs are finished or execution
        is interrupted.
        """
        cmds = [self.build_cmd(job_id) for job_id in self._get_filtered_job_ids()]

        if len(cmds) == 0:
            raise Exception('No job IDs found to submit')

        # Run jobs in an MP pool and cleanup child processes on exit
        with KillProcessQueue(mp_queue):
            # SIGINT is ignored while the pool is created and the previous handler
            # is restored right afterwards.
            previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
            pool = multiprocessing.Pool(self.pool_size)
            signal.signal(signal.SIGINT, previous_handler)
            try:
                logger.info("Running %d jobs in pool ..." % len(cmds))
                async_result = pool.map_async(run_job_pool, cmds)
                # timeout must be properly set, otherwise tasks will crash
                results = async_result.get(Pool.max_wait)
                logger.info("Pool results: " + str(results))
                logger.info("Normal termination")
                pool.close()
                pool.join()
            except KeyboardInterrupt:
                logger.fatal("Caught KeyboardInterrupt, terminating workers")
                pool.terminate()
            except Exception as e:
                logger.fatal("Caught Exception '%s', terminating workers" % (str(e)))
                pool.terminate()
            except BaseException as exc:  # catch *all* exceptions
                logger.fatal("Caught non-Python Exception '%s'" % (exc.__class__))
                pool.terminate()
874
875
if __name__ == '__main__':
    # Batch system keyword from the command line mapped to its implementation
    system_dict = {
        "lsf": LSF,
        "slurm": Slurm,
        "auger": Auger,
        "local": Local,
        "pool": Pool,
        "swif": Swif
    }
    if len(sys.argv) <= 1:
        print("Usage: batch.py [system] [args]")
        print(" Available systems: {}".format(', '.join(system_dict)))
    else:
        # First argument selects the batch system, the rest is passed to its parser
        system = sys.argv[1].lower()
        if system not in system_dict:
            raise Exception(f"The batch system {system} is not valid.")
        batch = system_dict[system]()
        args = sys.argv[2:]
        batch.parse_args(args)
        batch.submit()
Submit Auger batch jobs.
Definition batch.py:467
_create_req(self, req_name)
Definition batch.py:546
build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
Definition batch.py:584
submit_job(self, job_id)
Make this a no-op.
Definition batch.py:484
_create_job(self, params)
Needed for resolving ptag output sources.
Definition batch.py:597
_get_auger_ids(self, out)
Definition batch.py:524
submit(self)
Batch submission method for Auger.
Definition batch.py:491
_create_job_xml(self)
Definition batch.py:502
_add_job(self, req, job_params)
Definition batch.py:605
_write_req(self, req, filename='temp.xml')
Definition batch.py:540
_jsub(self, xml_filename)
Definition batch.py:517
Represents a batch processing system that requires submission like Slurm or Auger.
Definition batch.py:231
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:256
Generic batch processing interface.
Definition batch.py:35
_get_filtered_job_ids(self)
Get a list of job IDs to submit based on parsed command line options and whether output files are bei...
Definition batch.py:209
build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
Definition batch.py:160
submit_job(self, job_id)
Submit a single batch job and return the batch ID.
Definition batch.py:129
submit(self)
This is the generic batch submission function which gets a list of jobs to run based on command line ...
Definition batch.py:140
default_rundir(self, job_id=None)
Definition batch.py:155
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:58
_outputs_exist(job)
Check if all output files exist for the given job.
Definition batch.py:197
_logfile(self, job_id)
Get the base name of a log file for the job.
Definition batch.py:190
__init__(self)
Definition batch.py:40
_job_ids_missing_output(self, job_ids)
Get a list of IDs for jobs that are missing output files.
Definition batch.py:226
Kill processes in the multiprocessing queue if the jobs are canceled.
Definition batch.py:775
__exit__(self, type, val, tb)
Kill processes on exit.
Definition batch.py:786
__init__(self, mp_queue)
Definition batch.py:780
Submit LSF batch jobs.
Definition batch.py:281
build_cmd(self, job_id)
This is the basic implementation of building a command to run the job from a batch system.
Definition batch.py:291
submit_job(self, job_id)
Submit a single batch job and return the batch ID.
Definition batch.py:318
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:287
__init__(self)
Definition batch.py:284
Run local batch jobs sequentially.
Definition batch.py:727
submit_job(self, job_id)
Run a single job locally.
Definition batch.py:735
Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.
Definition batch.py:806
submit_job(self, job_id)
Make this a no-op as we do not implement single job submission for the processing pool.
Definition batch.py:822
submit(self)
Submit jobs to a local processing pool.
Definition batch.py:833
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:828
__init__(self)
Definition batch.py:816
Submit Slurm batch jobs.
Definition batch.py:332
build_cmd(self, job_id)
Wrap submission of Slurm jobs using a generated script.
Definition batch.py:401
submit_job(self, job_id)
Submit a single batch job and return the batch ID.
Definition batch.py:453
_sbatch(self, job_id)
Definition batch.py:381
default_rundir(self, job_id=None)
Override the basic implementation for getting the default run directory.
Definition batch.py:358
_default_queue(self)
Definition batch.py:370
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:343
_write_job_script(self, sh_filename, job_cmd)
Write the shell script for Slurm job submission using the 'sbatch' command.
Definition batch.py:428
_sh_filename(self, job_id)
Definition batch.py:398
Submit using the 'swif2' command at JLAB using an Auger file.
Definition batch.py:680
submit(self)
Batch submission method for Auger.
Definition batch.py:705
parse_args(self, args)
Parse command line arguments and perform setup.
Definition batch.py:696
__init__(self)
Definition batch.py:690
Database of job scripts.
Definition job.py:125
Simple JSON based store of job data.
Definition job.py:73
Primary class to run HPS jobs from a Python script.
Definition job.py:160
run_job_pool(cmd)
Run the command in a new process whose PID is added to a global MP queue.
Definition batch.py:750
is_running(proc)
Check if a system process looks like it is still running.
Definition batch.py:765