HPS-MC
batch.py
"""!
@package batch

Defines a set of classes and a command-line interface for submitting batch jobs.

Supported systems include serial execution locally, a multiprocessing pool,
Slurm, LSF, and Auger.
"""

import os
import argparse
import subprocess
import sys
import logging
import signal
import multiprocessing
import psutil
from pathlib import Path
import socket

import xml.etree.ElementTree as ET
from xml.dom import minidom
from xml.sax.saxutils import unescape
from shutil import which

from abc import ABC, abstractmethod

from hpsmc.job import Job, JobStore, JobScriptDatabase

logger = logging.getLogger("hpsmc.batch")

RUN_SCRIPT = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'job.py')

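# Example invocation (illustrative; the 'tritrig' script name and the file
# paths are hypothetical and depend on your local setup):
#
#   python batch.py slurm -c mc.cfg -r 1:100 tritrig jobs.json
#
# The first argument selects the batch system; see the system_dict mapping
# in the __main__ block at the bottom of this file.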


class Batch(ABC):
    """!
    Generic batch processing interface.
    """

    def __init__(self):

        parser = argparse.ArgumentParser(self.__class__.__name__,
                                         epilog='Available scripts: %s' % ', '.join(JobScriptDatabase().get_script_names()))

        parser.add_argument("-c", "--config-file", nargs='?', help="Config file", action='append')
        parser.add_argument("-l", "--log-dir", nargs='?', help="Log file output dir", required=False, default=str(Path(os.getcwd(), 'logs')))
        parser.add_argument("-d", "--run-dir", nargs='?', help="Base run dir for the jobs (must be an absolute path)", default=None)
        parser.add_argument("-D", "--debug", action='store_true', help="Enable debug settings", required=False)
        parser.add_argument("-o", "--check-output", action='store_true', required=False, help="Do not submit jobs whose output files already exist")
        parser.add_argument("-s", "--job-steps", type=int, default=None, required=False, help="Number of job steps to run")
        parser.add_argument("-r", "--job-range", nargs='?', help="Submit job numbers within a range (e.g. '1:100')", required=False)
        parser.add_argument("script", nargs='?', help="Name of job script")
        parser.add_argument("jobstore", nargs='?', help="Job store in JSON format")
        parser.add_argument("jobids", nargs="*", type=int, help="List of individual job IDs to submit (optional)")

        self.parser = parser

    def parse_args(self, args):
        """! Parse command line arguments and perform setup."""

        cl = self.parser.parse_args(args)

        logger.debug(str(cl))

        if cl.script is None:
            raise Exception('The script is a required argument.')
        self.script_name = cl.script  # Name of script
        script_db = JobScriptDatabase()
        if not script_db.exists(self.script_name):
            raise Exception('The script name is not valid: %s' % self.script_name)
        self.script = script_db.get_script_path(self.script_name)  # Path to script
        if not os.path.isfile(self.script):
            raise Exception('The job script does not exist: %s' % self.script)

        if cl.jobstore is None:
            raise Exception('The job store file is a required argument.')
        if not os.path.isfile(cl.jobstore):
            raise Exception('The job store does not exist: %s' % cl.jobstore)
        self.jobstore = JobStore(cl.jobstore)

        self.debug = cl.debug

        # Set the log dir, which is used for copying back log files generated by the batch system.
        self.log_dir = os.path.abspath(cl.log_dir)
        logger.info('log dir: {}'.format(self.log_dir))
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir)
            logger.info('Created log dir: {}'.format(self.log_dir))

        # Set the run dir, which is the root directory under which job directories will be created.
        self.run_dir = cl.run_dir
        if self.run_dir is not None:
            logger.info('run dir: {}'.format(self.run_dir))
            if not os.path.isabs(self.run_dir):
                # Require that the run dir is supplied as an absolute path.
                raise Exception("The run dir for batch processing must be an absolute path.")

        self.check_output = cl.check_output

        if cl.jobids:
            self.job_ids = list(map(int, cl.jobids))
        else:
            self.job_ids = []

        self.job_steps = cl.job_steps

        if cl.job_range:
            toks = cl.job_range.split(':')
            if len(toks) != 2:
                raise ValueError('Bad format for job range: ' + cl.job_range)
            self.start_job_num = int(toks[0])
            self.end_job_num = int(toks[1])
            if self.start_job_num > self.end_job_num:
                raise ValueError("The start job number must be <= the end job number when using a range.")
            if self.start_job_num < 0 or self.end_job_num < 0:
                raise ValueError("The job range numbers must be >= 0.")
        else:
            self.start_job_num = None
            self.end_job_num = None

        if cl.config_file:
            self.config_files = list(map(os.path.abspath, cl.config_file))
        else:
            self.config_files = []

        return cl
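    # Example (illustrative): '-r 20:29' submits only job numbers 20 through 29
    # (inclusive), while trailing positional IDs such as 'jobs.json 5 7' restrict
    # submission to jobs 5 and 7. The range filter takes precedence over explicit
    # IDs in _get_filtered_job_ids() below.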

    @abstractmethod
    def submit_job(self, job_id):
        """!
        Submit a single batch job and return the batch ID.

        This is abstract as each batch system will do this differently.

        Some batch systems don't implement this, but their sub-classes should
        override this method and make it a no-op so that they can be instantiated.
        """
        pass

    def submit(self):
        """!
        This is the generic batch submission function which gets a list of jobs to run based on command line
        arguments and submits them individually. It calls the abstract submit_job() method and prints the batch
        system ID that was returned, if any.
        """
        job_ids = self._get_filtered_job_ids()
        logger.info('Submitting jobs: %s' % str(job_ids))
        for job_id in job_ids:
            if not self.jobstore.has_job_id(job_id):
                raise Exception('Job ID was not found in job store: %s' % job_id)
            batch_id = self.submit_job(job_id)
            logger.info(f"Submitted job {job_id} with batch ID {str(batch_id)}")

    def default_rundir(self, job_id=None):
        """! Get the default run directory for a job."""
        if job_id is None:
            raise Exception('Missing valid job ID')
        return str(Path(os.getcwd(), 'scratch', str(job_id)))

    def build_cmd(self, job_id):
        """!
        This is the basic implementation of building a command to run the job from a batch system.
        """
        cmd = [sys.executable, RUN_SCRIPT, 'run']
        logfile = self._logfile(job_id)
        cmd.extend(['-o', f"{logfile}.out",
                    '-e', f"{logfile}.err"])
        if self.run_dir:
            # Set the job's base run dir explicitly from the user argument,
            # appending the job number as a subdirectory.
            job_dir = str(Path(self.run_dir, str(job_id)))
        else:
            # Set the job directory to the default.
            job_dir = self.default_rundir(job_id)

        logger.debug(f'job dir: {job_dir}')
        cmd.extend(['-d', job_dir])

        if len(self.config_files):
            for cfg in self.config_files:
                cmd.extend(['-c', cfg])
        if self.job_steps:
            cmd.extend(['--job-steps', str(self.job_steps)])
        cmd.extend(['-i', str(job_id)])
        cmd.append(self.script)
        cmd.append(os.path.abspath(self.jobstore.path))
        logger.debug("Job command: %s" % " ".join(cmd))
        return cmd
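    # Example output of build_cmd(42), with illustrative paths and script name:
    #
    #   python3 /path/to/hpsmc/job.py run -o /path/logs/job.42.out -e /path/logs/job.42.err \
    #       -d /path/scratch/42 -c /abs/mc.cfg -i 42 /path/scripts/tritrig.py /abs/jobs.json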

    def _logfile(self, job_id):
        """!
        Get the base name of a log file for the job.
        """
        return os.path.abspath(os.path.join(self.log_dir, 'job.%s' % str(job_id)))

    @staticmethod
    def _outputs_exist(job):
        """!
        Check if all output files exist for the given job. The argument is not a job ID but the full JSON job data.

        Returns False when the first missing output is found.
        """
        for src, dest in job["output_files"].items():
            if not os.path.isfile(os.path.join(job["output_dir"], dest)):
                logger.debug('Job output does not exist: %s -> %s' % (src, dest))
                return False
        return True
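    # For reference, the job data passed to _outputs_exist() is a dict like the
    # following (values are illustrative; only some of the keys used in this
    # module are shown):
    #
    #   {
    #       "job_id": 42,
    #       "input_files": {"/mss/hallb/hps/some/input.stdhep": "input.stdhep"},
    #       "output_files": {"events.slcio": "events_42.slcio"},
    #       "output_dir": "/work/hps/output"
    #   }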

    def _get_filtered_job_ids(self):
        """!
        Get a list of job IDs to submit based on parsed command line options and whether output files are being checked.
        """
        submit_ids = self.jobstore.get_job_ids()
        logger.debug('Initial pre-filtered job IDs: {}'.format(str(submit_ids)))
        if self.start_job_num is not None:
            submit_ids = [job_id for job_id in submit_ids
                          if self.start_job_num <= int(job_id) <= self.end_job_num]
        elif len(self.job_ids):
            submit_ids = self.job_ids
        logger.debug('Job IDs after range check: {}'.format(str(submit_ids)))
        if self.check_output:
            submit_ids = self._job_ids_missing_output(submit_ids)
            logger.info('Job IDs after output file check: {}'.format(str(submit_ids)))
        return submit_ids

    def _job_ids_missing_output(self, job_ids):
        """! Get a list of IDs for jobs that are missing output files."""
        return [job_id for job_id in job_ids if not self._outputs_exist(self.jobstore.get_job(job_id))]


class BatchSystem(Batch, ABC):
    """!
    Represents a batch processing system that requires submission, like Slurm or Auger.

    This subclasses Batch because it adds a number of parameters which do not apply to all
    of the batch system types (namely Pool and Local).
    """

    def __init__(self):

        super().__init__()

        self.parser.add_argument("-q", "--queue", nargs='?',
                                 help="Job queue or partition",
                                 required=False)
        self.parser.add_argument("-W", "--job-length", type=int, help="Max job length in hours", required=False, default=4)
        self.parser.add_argument("-m", "--memory", type=int, help="Max job memory allocation in MB", default=2000)
        self.parser.add_argument("-f", "--diskspace", type=int, help="Disk space needed for job in GB", default=20)
        self.parser.add_argument("-e", "--email", nargs='?', help="Email address for job notifications", required=False)

        self.parser.add_argument("-O", "--os", nargs='?', help="Operating system of batch nodes (Auger and LSF)")

        # Set site based on FQDN
        self.site = BatchSystem._site()

    def parse_args(self, args):
        """! Parse command line arguments and perform setup."""

        cl = super().parse_args(args)

        self.email = cl.email
        self.queue = cl.queue
        self.os = cl.os
        self.memory = cl.memory
        self.diskspace = cl.diskspace
        self.job_length = cl.job_length

        return cl

    @staticmethod
    def _site():
        fqdn = socket.getfqdn()
        site = None
        if 'slac.stanford.edu' in fqdn:
            site = 'slac'
        elif 'jlab.org' in fqdn:
            site = 'jlab'
        return site


class LSF(BatchSystem):
    """! Submit LSF batch jobs."""

    def __init__(self):
        super().__init__()

    def parse_args(self, args):
        super().parse_args(args)
        os.environ['LSB_JOB_REPORT_MAIL'] = 'Y' if self.email else 'N'

    def build_cmd(self, job_id):

        log_file = os.path.abspath(os.path.join(self.log_dir, 'job.%s.log' % str(job_id)))

        queue = self.queue
        if queue is None:
            queue = 'long'

        if self.os is not None:
            lsf_os = self.os
        else:
            lsf_os = 'centos7'

        cmd = ['bsub',
               '-W', str(self.job_length) + ':0',
               '-q', queue,
               '-R', lsf_os,
               '-o', log_file,
               '-e', log_file]

        if self.email:
            cmd.extend(['-u', self.email])

        cmd.extend(super().build_cmd(job_id))

        return cmd

    def submit_job(self, job_id):
        cmd = self.build_cmd(job_id)
        logger.info('Submitting job %s to LSF with command: %s' % (job_id, ' '.join(cmd)))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if err is not None and len(err):
            logger.warning(err)
        tokens = out.decode().split(" ")
        if tokens[0] != 'Job':
            raise Exception('Unexpected output from bsub command: %s' % out)
        batch_id = int(tokens[1].replace('<', '').replace('>', ''))
        return batch_id
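
    # Note: the parsing above assumes the standard bsub confirmation line, e.g.
    # "Job <12345> is submitted to queue <long>." (illustrative job ID).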


class Slurm(BatchSystem):
    """! Submit Slurm batch jobs."""

    def __init__(self):

        super().__init__()

        self.parser.add_argument("-S", "--sh-dir", nargs='?', help="Directory to hold generated shell scripts for Slurm", default=str(Path(os.getcwd(), 'sh')))
        self.parser.add_argument("-E", "--env", nargs='?', help="Full path to env setup script", required=False, default=None)
        self.parser.add_argument("-A", "--account", nargs='?', help="Account name for S3DF Slurm jobs", required=False, default=None)

    def parse_args(self, args):

        cl = super().parse_args(args)

        # Set Slurm env script
        self.env = cl.env
        self.account = cl.account

        # Set Slurm scripts dir
        self.sh_dir = os.path.abspath(cl.sh_dir)
        logger.info('Slurm sh dir: {}'.format(self.sh_dir))
        if not os.path.exists(self.sh_dir):
            os.makedirs(self.sh_dir)
            logger.info('Created Slurm sh dir: {}'.format(self.sh_dir))

    def default_rundir(self, job_id=None):
        """!
        Override the basic implementation for getting the default run directory.
        """
        if self.site == 'slac':
            run_dir = '$LSCRATCH'
        elif self.site == 'jlab':
            run_dir = '/scratch/slurm/$SLURM_JOBID'
        else:
            run_dir = os.getcwd() + "/scratch/$SLURM_JOBID"
        return run_dir

    def _default_queue(self):
        """! Get the default queue (partition) for the site or raise an exception if none can be chosen."""
        queue = self.queue
        if queue is None:
            if self.site == 'slac':
                queue = 'shared'
            elif self.site == 'jlab':
                queue = 'ifarm'
            else:
                raise Exception('No queue name was provided.')
        return queue

    def _sbatch(self, job_id):
        """! Build the base 'sbatch' command with resource and logging options."""
        log_file = self._logfile(job_id)
        sbatch_cmd = ['sbatch',
                      '--time=%s' % (str(self.job_length) + ':00:00'),
                      '--mem=%sM' % self.memory,
                      '--job-name=%i_%s' % (job_id, self.script_name),
                      '--output=%s.out' % log_file,
                      '--error=%s.err' % log_file]
        if self.queue:
            sbatch_cmd.append(f'--partition={self.queue}')
        if self.account:
            sbatch_cmd.append(f'--account={self.account}')
        if self.email:
            sbatch_cmd.extend([f'--mail-user={self.email}',
                               '--mail-type=ALL'])
        return sbatch_cmd
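
    # For job_id 42 with default settings, _sbatch() yields something like the
    # following (the log paths are illustrative and '42_tritrig' assumes a
    # script named 'tritrig'):
    #
    #   sbatch --time=4:00:00 --mem=2000M --job-name=42_tritrig \
    #       --output=/path/logs/job.42.out --error=/path/logs/job.42.err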

    def _sh_filename(self, job_id):
        """! Get the path of the generated shell script for a job."""
        return self.sh_dir + '/job.%i.sh' % job_id

    def build_cmd(self, job_id):
        """!
        Wrap submission of Slurm jobs using a generated script.
        """

        # Get the sbatch command
        cmd = self._sbatch(job_id)

        # Get name of shell script to generate
        sh_filename = self._sh_filename(job_id)

        # Build the basic job command for execution
        job_cmd = super().build_cmd(job_id)
        if self.run_dir is None:
            # The superclass will have already set this if the user provided an
            # explicit run dir. Here we set a default scratch directory if none
            # was given.
            job_cmd.extend(['-d', self.default_rundir()])

        # Write the job submission script out
        self._write_job_script(sh_filename, job_cmd)

        # Append job run script to Slurm command
        cmd.append(sh_filename)

        return cmd

    def _write_job_script(self, sh_filename, job_cmd):
        """!
        Write the shell script for Slurm job submission using the 'sbatch' command.
        """

        script_lines = ['#!/bin/bash',
                        '']
        if self.env:
            script_lines.append(f'source {self.env}')
        script_lines.extend(['echo Start time: `date`',
                             'echo PWD=`pwd`',
                             'echo ---- Start Environment ----',
                             'env | sort',
                             'echo ---- End Environment ----',
                             'time ' + ' '.join(job_cmd),
                             'echo End time: `date`'])

        logger.debug("Slurm submission script:\n" + str(script_lines))

        with open(sh_filename, 'w') as sh_file:
            for script_line in script_lines:
                sh_file.write(script_line + '\n')

        logger.debug('Wrote Slurm submission script to: {}'.format(str(Path(self.sh_dir, sh_filename))))
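
    # The generated submission script has this general shape (the job command
    # line is illustrative; a 'source <env script>' line is prepended when
    # --env is given):
    #
    #   #!/bin/bash
    #
    #   echo Start time: `date`
    #   echo PWD=`pwd`
    #   echo ---- Start Environment ----
    #   env | sort
    #   echo ---- End Environment ----
    #   time python3 /path/to/hpsmc/job.py run ... -i 42 ...
    #   echo End time: `date`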

    def submit_job(self, job_id):
        cmd = self.build_cmd(job_id)
        logger.info('Submitting job %s to Slurm with command: %s' % (job_id, ' '.join(cmd)))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if err is not None and len(err):
            logger.warning(err)
        tokens = out.decode().split(" ")
        if tokens[0] != 'Submitted':
            raise Exception('Unexpected output from sbatch command: %s' % out)
        batch_id = int(tokens[3].replace('<', '').replace('>', ''))
        return batch_id
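
    # Note: this parsing assumes the standard sbatch confirmation line, e.g.
    # "Submitted batch job 12345" (illustrative ID), taking the fourth
    # whitespace-separated token as the batch ID.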


class Auger(BatchSystem):
    """!
    Submit Auger batch jobs.

    Auger itself is deprecated and unavailable, but its submission XML format is supported by
    the Swif class (see below).
    """

    def __init__(self):

        super().__init__()

        self.setup_script = which('hps-mc-env.csh')

        if not self.setup_script:
            raise Exception("Failed to find 'hps-mc-env.csh' in environment.")

    def submit_job(self, job_id):
        """!
        Make this a no-op. Auger is a special case in that batch submission works with a
        generated XML file including all job IDs, so we do not implement single job submission.
        """
        pass

    def submit(self):
        """!
        Batch submission method for Auger.

        This differs from some of the other systems in that it doesn't loop over individual
        job IDs. Instead, a single XML file is submitted for all the jobs at once.
        """
        xml_filename = self._create_job_xml()  # write request to XML file
        auger_ids = self._jsub(xml_filename)  # execute jsub to submit jobs
        logger.info("Submitted Auger jobs: %s" % str(auger_ids))

    def _create_job_xml(self):
        """! Write the XML job request to a file and return the file name."""
        job_ids = self._get_filtered_job_ids()
        logger.info('Submitting jobs: %s' % str(job_ids))
        req = self._create_req(self.script_name)  # create request XML header
        for job_id in job_ids:
            if not self.jobstore.has_job_id(job_id):
                raise Exception('Job ID was not found in job store: %s' % job_id)
            job_params = self.jobstore.get_job(job_id)
            if self.check_output and Batch._outputs_exist(job_params):
                logger.warning("Skipping Auger submission for job "
                               "because outputs already exist: %d" % job_id)
            else:
                self._add_job(req, job_params)  # add job to request
        return self._write_req(req)  # write request to file

    def _jsub(self, xml_filename):
        """! Submit the XML file with the 'jsub' command and return the assigned Auger IDs."""
        cmd = ['jsub', '-xml', xml_filename]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        out, err = proc.communicate()
        auger_ids = self._get_auger_ids(out)
        return auger_ids

    def _get_auger_ids(self, out):
        """! Parse the Auger job IDs from the 'jsub' command output."""
        auger_ids = []
        for line in out.splitlines():
            if line.strip().startswith(b'<jsub>'):
                j = ET.fromstring(line)
                for req in j:
                    for child in req:
                        if child.tag == 'jobIndex':
                            auger_id = int(child.text)
                            auger_ids.append(auger_id)
                        elif child.tag == 'error':
                            # Submission failed, so raise an exception with the error message.
                            raise Exception(child.text)
                break
        return auger_ids
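
    # The loop above assumes 'jsub' echoes an XML status line of roughly this
    # shape (inferred from the parsing logic; the job index is illustrative):
    #
    #   <jsub><request><jobIndex>1234567</jobIndex></request></jsub>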

    def _write_req(self, req, filename='temp.xml'):
        """! Write the XML request to a pretty-printed file and return its name."""
        pretty = unescape(minidom.parseString(ET.tostring(req)).toprettyxml(indent=" "))
        with open(filename, 'w') as f:
            f.write(pretty)
        return f.name

    def _create_req(self, req_name):
        """! Create the XML header of the job request."""
        req = ET.Element("Request")
        name_elem = ET.SubElement(req, "Name")
        name_elem.set("name", req_name)
        prj = ET.SubElement(req, "Project")
        prj.set("name", "hps")
        trk = ET.SubElement(req, "Track")
        if self.debug:
            # The queue arg is not used when the debug flag is active.
            trk.set("name", "debug")
        else:
            # The queue name is used to set the job track.
            queue = 'simulation'
            if self.queue is not None:
                queue = self.queue
            trk.set("name", queue)
        if self.email:
            email = ET.SubElement(req, "Email")
            email.set("email", self.email)
            email.set("request", "true")
            email.set("job", "true")
        mem = ET.SubElement(req, "Memory")
        mem.set("space", str(self.memory))
        mem.set("unit", "MB")
        disk = ET.SubElement(req, "DiskSpace")
        disk.set("space", str(self.diskspace))
        disk.set("unit", "GB")
        limit = ET.SubElement(req, "TimeLimit")
        limit.set("time", str(self.job_length))
        limit.set("unit", "hours")
        os_elem = ET.SubElement(req, "OS")
        if self.os is not None:
            auger_os = self.os
        else:
            auger_os = 'general'
        os_elem.set("name", auger_os)
        return req
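
    # A request header produced with default settings looks roughly like this
    # ('tritrig' is an illustrative request name):
    #
    #   <Request>
    #     <Name name="tritrig"/>
    #     <Project name="hps"/>
    #     <Track name="simulation"/>
    #     <Memory space="2000" unit="MB"/>
    #     <DiskSpace space="20" unit="GB"/>
    #     <TimeLimit time="4" unit="hours"/>
    #     <OS name="general"/>
    #   </Request>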

    def build_cmd(self, job_id):
        """! Build the basic job command, without log file or run dir options, for embedding in the XML."""
        cmd = [sys.executable, RUN_SCRIPT, 'run']
        if len(self.config_files):
            for cfg in self.config_files:
                cmd.extend(['-c', cfg])
        if self.job_steps is not None:
            cmd.extend(['--job-steps', str(self.job_steps)])
        cmd.extend(['-i', str(job_id)])
        cmd.append(self.script)
        cmd.append(os.path.abspath(self.jobstore.path))
        logger.debug("Job command: %s" % " ".join(cmd))
        return cmd

    def _create_job(self, params):
        """! Needed for resolving ptag output sources."""
        j = Job()
        j.script = self.script
        j._load_params(params)
        j._load_script()
        return j

    def _add_job(self, req, job_params):
        """! Add a single job to the XML request."""
        job = ET.SubElement(req, "Job")
        job_id = job_params['job_id']
        year = ''  # TODO: change default to a number; the '%i' format below expects an int
        if 'year' in job_params:
            year = job_params['year']

        if 'input_files' in job_params:
            inputfiles = job_params['input_files']
            for src, dest in inputfiles.items():
                if not src.startswith('http'):
                    input_elem = ET.SubElement(job, "Input")
                    input_elem.set("dest", dest)
                    if src.startswith("/mss"):
                        src_file = "mss:%s" % src
                    else:
                        src_file = src
                    input_elem.set("src", src_file)
                else:
                    logger.warning("http input file will not be included in XML job descr: {}".format(src))
        outputfiles = job_params["output_files"]
        outputdir = job_params["output_dir"]
        # outputdir = os.path.realpath(outputdir)
        j = self._create_job(job_params)
        for src, dest in outputfiles.items():
            output_elem = ET.SubElement(job, "Output")
            res_src = j.resolve_output_src(src)
            output_elem.set("src", res_src)
            dest_file = os.path.abspath(os.path.join(outputdir, dest))
            if dest_file.startswith("/mss"):
                dest_file = "mss:%s" % dest_file
            logger.debug('Auger dest file: {} -> {}'.format(src, dest))
            output_elem.set("dest", dest_file)

        job_name = ET.SubElement(job, "Name")
        job_name.set("name", '%ihps%i' % (year, job_id))

        job_err = ET.SubElement(job, "Stderr")
        stdout_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.out" % job_id))
        stderr_file = os.path.abspath(os.path.join(self.log_dir, "job.%d.err" % job_id))
        job_err.set("dest", stderr_file)
        job_out = ET.SubElement(job, "Stdout")
        job_out.set("dest", stdout_file)

        cmd = ET.SubElement(job, "Command")
        cmd_lines = []
        cmd_lines.append("<![CDATA[")

        cmd_lines.append('pwd;\n')
        cmd_lines.append('env | sort;\n')
        cmd_lines.append('ls -lart;\n')
        cmd_lines.append("source %s;\n" % os.path.realpath(self.setup_script))
        cmd_lines.append("source %s/bin/jlab-env.csh;\n" % os.getenv('HPSMC_DIR'))

        job_cmd = self.build_cmd(job_id)

        # Write log file locally so it can be copied back with Output element
        # log_file = 'job.%d.log' % job_id
        # job_cmd.extend(['-l', '$PWD/%s' % log_file])
        # log_out_elem = ET.SubElement(job, "Output")
        # log_out_elem.set('src', log_file)
        # log_out_elem.set('dest', os.path.join(self.log_dir, log_file))

        cmd_lines.extend(job_cmd)
        cmd_lines.append(';\n')

        cmd_lines.append('ls -lart; \n')

        cmd_lines.append("]]>")

        # logger.debug(cmd_lines)

        cmd.text = ' '.join(cmd_lines)


class Swif(Auger):
    """!
    Submit using the 'swif2' command at JLab using an Auger file.

    This is just a thin wrapper of the parent class to call the swif2 commands with the generated Auger XML file.

    Existing workflows generated by this class should be fully canceled and removed before resubmitting using this
    interface.
    """

    def __init__(self):

        super().__init__()

        self.parser.add_argument("-w", "--workflow", nargs='?', help="Name of swif2 workflow", required=False)

    def parse_args(self, args):
        cl = super().parse_args(args)
        if cl.workflow:
            self.workflow = cl.workflow
        else:
            self.workflow = self.script_name
        logger.debug(f'swif workflow name set to: {self.workflow}')
        return cl

    def submit(self):

        logger.info("Submitting swif workflow: {}".format(self.workflow))

        # Write request to XML file
        xml_filename = self._create_job_xml()

        # Add jobs to the swif2 workflow using the Auger XML file
        cmd = ['swif2', 'add-jsub', self.workflow, '-script', xml_filename]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out = proc.communicate()[0]
        print("".join([s for s in out.decode().strip().splitlines(True) if s.strip()]))
        proc.wait()

        # Run the workflow
        run_cmd = ['swif2', 'run', self.workflow]
        proc = subprocess.Popen(run_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out = proc.communicate()[0]
        print("".join([s for s in out.decode().strip().splitlines(True) if s.strip()]))
        proc.wait()


class Local(Batch):
    """!
    Run local batch jobs sequentially.
    """

    def __init__(self):
        super().__init__()

    def submit_job(self, job_id):
        """! Run a single job locally."""
        cmd = self.build_cmd(job_id)
        logger.info(f"Executing local job: {job_id}")
        proc = subprocess.Popen(cmd, shell=False)
        proc.communicate()
        if proc.returncode:
            logger.error(f"Local execution of {job_id} returned error code: {proc.returncode}")


# Queue used to keep track of processes created by the batch pool.
mp_queue = multiprocessing.Queue()


def run_job_pool(cmd):
    """! Run the command in a new process whose PID is added to a global MP queue."""
    returncode = None
    try:
        sys.stdout.flush()
        # Start the job in its own session so the whole process tree can be killed on cleanup.
        proc = subprocess.Popen(cmd, preexec_fn=os.setsid)
        mp_queue.put(proc.pid)
        proc.wait()
        returncode = proc.returncode
    except subprocess.CalledProcessError as e:
        logger.error(str(e))
        sys.stdout.flush()
    return returncode


def is_running(proc):
    """!
    Check if a system process looks like it is still running.
    """
    return proc.status() in [psutil.STATUS_RUNNING,
                             psutil.STATUS_SLEEPING,
                             psutil.STATUS_DISK_SLEEP,
                             psutil.STATUS_IDLE]


class KillProcessQueue:
    """!
    Kill processes in the multiprocessing queue if the jobs are canceled.
    """

    def __init__(self, mp_queue):
        self.mp_queue = mp_queue

    def __enter__(self):
        return self

    def __exit__(self, type, val, tb):
        """! Kill processes on exit."""
        # Drain the queue and kill any recorded processes that are still running.
        while not self.mp_queue.empty():
            pid = self.mp_queue.get()
            try:
                parent = psutil.Process(pid)
                for child in parent.children(recursive=True):
                    if is_running(child):
                        print('Killing running process: %d' % child.pid)
                        child.kill()
                if is_running(parent):
                    parent.kill()
            except Exception:
                # This probably just means the process already finished.
                pass


class Pool(Batch):
    """!
    Run a set of jobs in a local multiprocessing pool using Python's multiprocessing module.

    The number of processes to spawn can be provided using the '-p' argument.
    """

    # Max wait in seconds when getting results
    max_wait = 999999

    def __init__(self):
        super().__init__()
        self.parser.add_argument("-p", "--pool-size", type=int,
                                 help="Job pool size (only applicable when running pool)", required=False,
                                 default=multiprocessing.cpu_count())

    def submit_job(self, job_id):
        """!
        Make this a no-op, as we do not implement single job submission for the processing pool.
        """
        pass

    def parse_args(self, args):
        cl = super().parse_args(args)
        self.pool_size = int(cl.pool_size)
        return cl

    def submit(self):
        """! Submit jobs to a local processing pool.

        This method will not return until all jobs are finished or execution
        is interrupted.
        """

        cmds = []
        for job_id in self._get_filtered_job_ids():
            cmd = self.build_cmd(job_id)
            cmds.append(cmd)

        # logger.debug('Running job commands in pool ...')
        # logger.debug('\n'.join([' '.join(cmd) for cmd in cmds]))

        if not len(cmds):
            raise Exception('No job IDs found to submit')

        # Run jobs in an MP pool and clean up child processes on exit
        with KillProcessQueue(mp_queue):
            # Ignore SIGINT while the workers fork so only the parent handles Ctrl-C.
            original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
            pool = multiprocessing.Pool(self.pool_size)
            signal.signal(signal.SIGINT, original_sigint_handler)
            try:
                logger.info("Running %d jobs in pool ..." % len(cmds))
                res = pool.map_async(run_job_pool, cmds)
                # A timeout must be passed to get(); a bare get() would block KeyboardInterrupt.
                logger.info("Pool results: " + str(res.get(Pool.max_wait)))
                logger.info("Normal termination")
                pool.close()
                pool.join()
            except KeyboardInterrupt:
                logger.fatal("Caught KeyboardInterrupt, terminating workers")
                pool.terminate()
            except Exception as e:
                logger.fatal("Caught Exception '%s', terminating workers" % (str(e)))
                pool.terminate()
            except BaseException:  # catch *all* exceptions
                e = sys.exc_info()[0]
                logger.fatal("Caught non-Python Exception '%s'" % (e))
                pool.terminate()


if __name__ == '__main__':
    system_dict = {
        "lsf": LSF,
        "slurm": Slurm,
        "auger": Auger,
        "local": Local,
        "pool": Pool,
        "swif": Swif
    }
    if len(sys.argv) > 1:
        system = sys.argv[1].lower()
        if system not in system_dict:
            raise Exception(f"The batch system {system} is not valid.")
        batch = system_dict[system]()
        args = sys.argv[2:]
        batch.parse_args(args)
        batch.submit()
    else:
        print("Usage: batch.py [system] [args]")
        print("    Available systems: {}".format(', '.join(system_dict.keys())))
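
# More example invocations (illustrative; the 'tritrig' script, config file and
# job store names are hypothetical):
#
#   python batch.py pool -p 8 tritrig jobs.json
#   python batch.py slurm -q shared -m 4000 -r 1:100 tritrig jobs.json
#   python batch.py swif -w my_workflow tritrig jobs.json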