HPS-MC
job.py
Go to the documentation of this file.
1 """! @package job
2 Primary class for running and managing HPSMC jobs defined by a set of components.
3 """
4 
5 import os
6 import sys
7 import json
8 import time
9 import shutil
10 import filecmp
11 import argparse
12 import getpass
13 import logging
14 import glob
15 import subprocess
16 import copy
17 import pathlib
18 
19 from collections.abc import Sequence
20 from os.path import expanduser
21 
22 from hpsmc._config import convert_config_value
23 from hpsmc import global_config
24 
25 
26 logger = logging.getLogger('hpsmc.job')
27 
28 
class JobConfig(object):
    """! Wrapper for accessing config information from parser."""

    def __init__(self):
        # Work on a (shallow) copy of the package-wide parser so that this
        # object holds its own parser reference.
        self.parser = copy.copy(global_config)

    def __str__(self):
        """! Render all sections and their settings in an ini-like layout."""
        lines = ['Job configuration:']
        for section in self.parser.sections():
            lines.append("[" + section + "]")
            for name, value in self.parser.items(section):
                lines.append("%s=%s" % (name, value))
            lines.append('')
        return '\n'.join(lines)

    def config(self, obj, section=None, required_names=(), allowed_names=(), require_section=True):
        """! Push config into an object by setting an attribute.
        @param obj object config is pushed to
        @param section name of the config section to read; defaults to the class name of obj
        @param required_names names that must be present in the section
        @param allowed_names if non-empty, the only names that may appear in the section
        @param require_section if True a missing section is an error, otherwise only a warning is logged
        """
        if not section:
            section = obj.__class__.__name__

        if not self.parser.has_section(section):
            if require_section:
                raise Exception("Missing required config section '%s'" % section)
            logger.warning('Config section not found: %s' % section)
            return

        # Read the section once; it is used for both the required-name check
        # and the attribute assignment below.
        items = self.parser.items(section)
        present = dict(items)

        # Check that required settings are there.
        for req in required_names:
            if req not in present:
                raise Exception("Missing required config '%s'" % req)

        # Push each config item into the object by setting attribute.
        for name, value in items:
            if len(allowed_names) and name not in allowed_names:
                raise Exception("Config name '%s' is not allowed for '%s'" % (name, section))
            setattr(obj, name, convert_config_value(value))
71 
72 
class JobStore:
    """!
    Simple JSON based store of job data.
    """

    def __init__(self, path=None):
        """!
        @param path optional JSON file to load immediately; without it the store starts empty
        """
        self.path = path
        # Maps integer job ID -> raw job dict as read from the JSON file.
        self.data = {}
        if self.path:
            self.load(self.path)
        else:
            logger.warning('Path was not provided to job store - no jobs loaded!')

    def load(self, json_store):
        """! Load raw JSON data into this job store.
        @param json_store json file containing raw job data (a list of job dicts, each with a 'job_id')
        """
        if not os.path.exists(json_store):
            raise Exception('JSON job store does not exist: {}'.format(json_store))
        with open(json_store, 'r') as f:
            json_data = json.load(f)
        # A single-job file is a dict; iterating it would walk its keys and
        # fail with an unrelated error, so reject it explicitly.
        if not isinstance(json_data, list):
            raise Exception('JSON job store must contain a list of jobs: {}'.format(json_store))
        for job_data in json_data:
            if 'job_id' not in job_data:
                raise Exception("Job data is missing a job id")
            job_id = int(job_data['job_id'])
            if job_id in self.data:
                raise Exception(f"Duplicate job id: {job_id}")
            self.data[job_id] = job_data
            logger.debug("Loaded job {} data:\n {}".format(job_id, job_data))
        logger.info(f"Successfully loaded {len(self.data)} jobs from: {json_store}")

    def get_job(self, job_id):
        """! Get a job by its job ID.
        @param job_id job ID
        @return job"""
        if not self.has_job_id(job_id):
            raise Exception(f"Job ID does not exist: {job_id}")
        return self.data[int(job_id)]

    def get_job_data(self):
        """! Get the raw dict containing all the job data."""
        return self.data

    def get_job_ids(self):
        """! Get a sorted list of job IDs."""
        return sorted(self.data)

    def has_job_id(self, job_id):
        """! Return true if the job ID exists in the store.

        IDs that cannot be converted to an integer are reported as absent.
        """
        try:
            return int(job_id) in self.data
        except (TypeError, ValueError):
            return False
123 
124 
class JobScriptDatabase:
    """! Database of job scripts.
    """

    def __init__(self):
        """! Index every '*_job.py' file found under $HPSMC_DIR/lib/python/jobs."""
        if 'HPSMC_DIR' not in os.environ:
            raise Exception('HPSMC_DIR is not set in the environ.')
        hps_mc_dir = os.environ['HPSMC_DIR']
        script_dir = os.path.join(hps_mc_dir, 'lib', 'python', 'jobs')

        # dict of paths to job scripts keyed by script name
        self.scripts = {}
        suffix = '_job'
        for f in glob.glob(os.path.join(script_dir, '*_job.py')):
            stem = os.path.splitext(os.path.basename(f))[0]
            # The glob guarantees the stem ends in '_job'; strip only that
            # suffix so a '_job' elsewhere in the name is preserved.
            self.scripts[stem[:-len(suffix)]] = f

    def get_script_path(self, name):
        """! Get path to job script from job name.
        @param name job name
        @return path to job script"""
        return self.scripts[name]

    def get_script_names(self):
        """! Get list of all script names."""
        return list(self.scripts)

    def get_scripts(self):
        """! Get dict containing paths to scripts keyed by script names."""
        return self.scripts

    def exists(self, name):
        """! Test if job script exists in dict.
        @return True if job name is key in job dict"""
        return name in self.scripts
158 
159 
class Job(object):
    """!
    Primary class to run HPS jobs from a Python script.

    Jobs are run by executing a series of components
    which are configured using a config file with parameters
    provided by a JSON job file.
    """

    # Reference list of Job settings. It was disabled in the original source
    # (wrapped in a string literal) and is not enforced: _configure() passes
    # no allowed_names when configuring the job.
    # _config_names = ['enable_copy_output_files',
    #                  'enable_copy_input_files',
    #                  'delete_existing',
    #                  'delete_rundir',
    #                  'dry_run',
    #                  'ignore_return_codes',
    #                  'check_output_files',
    #                  'enable_file_chaining',
    #                  'enable_env_config']

    # Prefix to indicate ptag in job param file.
    PTAG_PREFIX = 'ptag:'

    def __init__(self, args=sys.argv, **kwargs):
        """!
        Create a job with default settings.

        @param args command line arguments handed to argparse by parse_args()
        @param kwargs accepted but not used by this constructor
        """
        if 'HPSMC_DIR' not in os.environ:
            raise Exception('HPSMC_DIR is not set in the environ.')
        self.hpsmc_dir = os.environ['HPSMC_DIR']

        self.args = args

        # Job configuration
        self.job_config = JobConfig()
        # short description of job, should be overridden by the job script
        self.description = "HPS MC Job"
        # job ID
        self.job_id = None
        # path to parameter file
        self.param_file = None
        # list of components in job
        self.components = []
        # rundir is current working directory
        self.rundir = os.getcwd()
        # dict of parameters
        self.params = {}
        self.output_dir = os.getcwd()
        # dict of input files
        self.input_files = {}
        self.output_files = {}
        # maps ptag name -> output file name, see ptag()
        self.ptags = {}
        # output for component printouts
        self.component_out = sys.stdout
        # output for component error messages
        self.component_err = sys.stderr
        # script containing component initializations
        self.script = None
        # job steps
        self.job_steps = None
        # fieldmaps dir
        self.hps_fieldmaps_dir = None

        # Settings that may be overridden from the job config section.
        self.enable_copy_output_files = True
        self.enable_copy_input_files = True
        self.delete_existing = False
        self.delete_rundir = False
        self.dry_run = False
        self.ignore_return_codes = True
        self.check_output_files = True
        self.check_commands = False
        self.enable_file_chaining = True
        self.enable_env_config = False

    def add(self, component):
        """!
        Public method for adding components to the job.

        @param component a single component or a (non-string) sequence of components
        """
        if isinstance(component, Sequence) and not isinstance(component, str):
            self.components.extend(component)
        else:
            self.components.append(component)

    def set_parameters(self, params):
        """!
        Add parameters to the job, overriding values if they exist already.

        This method can be used in job scripts to define default values.
        """
        for k, v in params.items():
            if k in self.params:
                # Report the value being replaced, not the incoming one.
                logger.debug("Setting new value '%s' for parameter '%s' with existing value '%s'."
                             % (str(v), str(k), self.params[k]))
            self.params[k] = v

    def parse_args(self):
        """!
        Configure the job from command line arguments.
        """

        parser = argparse.ArgumentParser(description=self.description)
        parser.add_argument("-c", "--config-file", help="Config file locations", action='append')
        parser.add_argument("-d", "--run-dir", nargs='?', help="Job run dir")
        parser.add_argument("-o", "--out", nargs='?', help="File for component stdout (default prints to console)")
        parser.add_argument("-e", "--err", nargs='?', help="File for component stderr (default prints to console)")
        parser.add_argument("-s", "--job-steps", type=int, default=None, help="Job steps to run (single number)")
        parser.add_argument("-i", "--job-id", type=int, help="Job ID from JSON job store", default=None)
        parser.add_argument("script", nargs='?', help="Path or name of job script")
        parser.add_argument("params", nargs='?', help="Job param file in JSON format")

        cl = parser.parse_args(self.args)

        # Read in job configuration files
        if cl.config_file:
            config_files = list(map(os.path.abspath, cl.config_file))
            logger.info("Reading additional config from: {}".format(config_files))
            self.job_config.parser.read(config_files)

        # Set file for stdout from components. The stream must land on
        # component_out because that is what _execute() and _cleanup() use;
        # 'out' is kept as an alias of the same file object.
        if cl.out:
            out_file = os.path.abspath(cl.out)
            logger.info('Job output will be written to: %s' % out_file)
            self.component_out = self.out = open(out_file, 'w')

        # Set file for stderr from components (same reasoning as above).
        if cl.err:
            err_file = os.path.abspath(cl.err)
            logger.info('Job error will be written to: %s' % err_file)
            self.component_err = self.err = open(err_file, 'w')

        if cl.run_dir:
            self.rundir = os.path.abspath(cl.run_dir)

        self.job_steps = cl.job_steps
        if self.job_steps is not None and self.job_steps < 1:
            raise Exception("Invalid job steps argument (must be > 0): {}".format(self.job_steps))

        if cl.script:
            self.script = cl.script
        else:
            raise Exception('Missing required script name or location.')

        # Params are actually optional as some job scripts might not need them.
        if cl.params:
            self.param_file = os.path.abspath(cl.params)
            params = {}
            # Compare against None so that job ID 0 is honoured.
            if cl.job_id is not None:
                # Load data from a job store containing multiple jobs.
                self.job_id = cl.job_id
                logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
                jobstore = JobStore(self.param_file)
                if jobstore.has_job_id(self.job_id):
                    params = jobstore.get_job(self.job_id)
                else:
                    raise Exception("No job id %d was found in the job store '%s'" % (self.job_id, self.param_file))
            else:
                # Load data from a JSON file with a single job definition.
                logger.info('Loading job parameters from file: %s' % self.param_file)
                with open(self.param_file, 'r') as f:
                    params = json.load(f)
                if not isinstance(params, dict):
                    raise Exception('Job ID must be provided when running from a job store.')

            self._load_params(params)

    def _load_params(self, params):
        """!
        Load the job parameters from JSON data.

        Besides storing all parameters, the keys 'output_dir', 'job_id',
        'input_files' and 'output_files' are copied to the matching attributes.
        """

        self.set_parameters(params)

        if 'output_dir' in self.params:
            self.output_dir = self.params['output_dir']
            if not os.path.isabs(self.output_dir):
                self.output_dir = os.path.abspath(self.output_dir)
                logger.debug("Changed output dir to abs path: %s" % self.output_dir)

        if 'job_id' in self.params:
            self.job_id = self.params['job_id']

        if 'input_files' in self.params:
            self.input_files = self.params['input_files']

        if 'output_files' in self.params:
            self.output_files = self.params['output_files']

    def _set_input_files(self):
        """!
        Prepare dictionary of input files.

        If a link to a download location is given as input, the file is downloaded into the run directory
        before the file name is added to the input_files dict. If a regular file is provided, it is added
        to the dict without any additional action.
        """
        input_files_dict = {}
        for file_key, file_name in self.input_files.items():
            if 'https' in file_key:
                logger.info("Downloading input file from: %s" % file_key)
                file_name_path = self.rundir + "/" + file_name
                # \todo FIXME: We need to make sure wget is installed locally during the build
                # or use a python lib like requests.
                subprocess.check_output(['wget', '-q', '-O', file_name_path, file_key])
                # The file now lives in the run dir, so source and destination coincide.
                input_files_dict[file_name] = file_name
            else:
                input_files_dict[file_key] = file_name
        self.input_files = input_files_dict

    def _initialize(self):
        """!
        Perform basic initialization before the job script is loaded.
        """

        if not os.path.isabs(self.rundir):
            self.rundir = os.path.abspath(self.rundir)
            logger.info('Changed run dir to abs path: %s' % self.rundir)

        # Set run dir if running inside LSF
        if "LSB_JOBID" in os.environ:
            self.rundir = os.path.join("/scratch", getpass.getuser(), os.environ["LSB_JOBID"])
            logger.info('Set run dir for LSF: %s' % self.rundir)
            self.delete_rundir = True

        # Create run dir if it does not exist
        if not os.path.exists(self.rundir):
            logger.info('Creating run dir: %s' % self.rundir)
            os.makedirs(self.rundir)

    def _configure(self):
        """!
        Configure job class and components.
        """

        # Configure job class
        self.job_config.config(self, require_section=False)

        # Configure the location of the fieldmap files
        self._config_fieldmap_dir()

        # Configure each of the job components
        for component in self.components:

            # Configure logging for the component.
            component.config_logging(self.job_config.parser)

            # Configure the component from job configuration.
            component.config(self.job_config.parser)

            if self.enable_env_config:
                # Configure from env vars, if enabled.
                component.config_from_environ()

            # Check that the config is acceptable.
            component.check_config()

    def _load_script(self):
        """!
        Load the job script.

        The script is executed with this job bound to the global name 'job'.
        """
        # This might be okay if the user is manually adding components to a job for testing
        # without the command line interface. If no components are added before the job is
        # run, then this will be caught later.
        if self.script is None:
            logger.warning("No job script was provided!")
            return

        if not self.script.endswith('.py'):
            # Script is a name.
            script_db = JobScriptDatabase()
            if not script_db.exists(self.script):
                raise Exception("The script name is not valid: %s" % self.script)
            script_path = script_db.get_script_path(self.script)
            logger.debug("Found script '%s' from name '%s'" % (script_path, self.script))
        else:
            # Script is a path.
            script_path = self.script

        if not os.path.exists(script_path):
            raise Exception('Job script does not exist: %s' % script_path)

        logger.info('Loading job script: %s' % script_path)

        with open(script_path, "rb") as f:
            source = f.read()
        # NOTE: runs arbitrary code from the given script; only load trusted job scripts.
        exec(compile(source, script_path, 'exec'), {'job': self})

    def run(self):
        """!
        This is the primary execution method for running the job.
        """

        logger.info('Job ID: ' + str(self.job_id))
        logger.info('Description: %s' % self.description)

        # Print config to the log
        logger.info(str(self.job_config))

        # Initialize after CL parameters were parsed.
        self._initialize()

        # Load the job components from the script
        self._load_script()

        if not len(self.components):
            raise Exception("Job has no components to execute.")

        # Print list of job components
        logger.info("Job components loaded: %s" % ([c.name for c in self.components]))

        # Print job parameters.
        if len(self.params) > 0:
            logger.info("Job parameters loaded: %s" % str(self.params))
        else:
            logger.info("No job parameters were specified!")

        # This will configure the Job class and its components by copying
        # information into them from loaded config files.
        self._configure()

        # This (re)sets the input correctly
        self._set_input_files()

        # Set component parameters from job JSON file.
        self._set_parameters()

        # Perform component setup to prepare for execution.
        # May use config and parameters that were set from above.
        self._setup()

        if not self.dry_run:
            if self.enable_copy_input_files:
                # Copy input files to the run dir.
                self._copy_input_files()
            else:
                # Symlink input files if copying is disabled.
                self._symlink_input_files()

            # Save job start time
            start_time = time.time()

        # Execute the job.
        self._execute()

        # Copy the output files to the output dir if enabled and not in dry run.
        if not self.dry_run:

            # Print job timer info
            stop_time = time.time()
            elapsed = stop_time - start_time
            logger.info("Job execution took {} seconds".format(round(elapsed, 4)))

            # Copy by file path or ptag
            if self.enable_copy_output_files:
                self._copy_output_files()
            else:
                logger.warning('Copy output files is disabled!')

            # Perform job cleanup.
            self._cleanup()

        logger.info('Successfully finished running job: %s' % self.description)

    def _execute(self):
        """!
        Execute all components in job.

        If dry_run is set to True, the components will not be executed,
        list of components will be put out instead.
        """
        if self.dry_run:
            # Dry run mode. Just print component command but do not execute it.
            logger.info("Dry run enabled. Components will NOT be executed!")
            for component in self.components:
                logger.info("'%s' with args: %s (DRY RUN)" % (component.name, ' '.join(component.cmd_args())))
            return

        for component in self.components:

            logger.info("Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))

            # Print header to stdout
            self.component_out.write('================ Component: %s ================\n' % component.name)
            self.component_out.flush()

            # Print header to stderr if output is going to a file
            if self.component_err != sys.stderr:
                self.component_err.write('================ Component: %s ================\n' % component.name)
                self.component_err.flush()

            start = time.time()
            returncode = component.execute(self.component_out, self.component_err)
            elapsed = time.time() - start
            logger.info("Execution of {} took {} second(s) with return code: {}"
                        .format(component.name, round(elapsed, 4), str(returncode)))

            if not self.ignore_return_codes and returncode:
                raise Exception("Non-zero return code %d from '%s'" % (returncode, component.name))

            if self.check_output_files:
                for outputfile in component.output_files():
                    if not os.path.isfile(outputfile):
                        raise Exception("Output file '%s' is missing after execution." % outputfile)

    def _setup(self):
        """!
        Necessary setup before job can be executed.
        """

        # Change to run dir
        logger.info('Changing to run dir: %s' % self.rundir)
        os.chdir(self.rundir)

        # Create a symlink to the fieldmap directory
        self._symlink_fieldmap_dir()

        # Limit components according to job steps
        if self.job_steps is not None:
            if self.job_steps > 0:
                self.components = self.components[0:self.job_steps]
                logger.info("Job is limited to first %d steps." % self.job_steps)

        if self.enable_file_chaining:
            self._config_file_pipeline()

        # Run setup methods of each component
        for component in self.components:
            logger.debug('Setting up component: %s' % (component.name))
            component.rundir = self.rundir
            component.setup()
            if self.check_commands and not component.cmd_exists():
                raise Exception("Command '%s' does not exist for '%s'." % (component.command, component.name))

    def _config_file_pipeline(self):
        """!
        Pipe component outputs to inputs automatically.

        The first component receives the job's input files and every later one
        receives the outputs of its predecessor. Components that already have
        inputs set are left untouched.
        """
        for i, component in enumerate(self.components):
            logger.debug("Configuring file IO for component '%s' with order %d" % (component.name, i))
            if i == 0:
                upstream = list(self.input_files.values())
            else:
                upstream = self.components[i - 1].output_files()
            logger.debug("Setting inputs on '%s' to: %s" % (component.name, str(upstream)))
            if len(component.inputs) == 0:
                component.inputs = upstream

    def _set_parameters(self):
        """!
        Push JSON job parameters to components.
        """
        for component in self.components:
            component.set_parameters(self.params)

    @staticmethod
    def _remove_path(path):
        """! Best-effort removal of a file, symlink or directory tree."""
        try:
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        except OSError as e:
            # Cleanup must not abort the job; just report what could not be removed.
            logger.warning("Failed to remove '%s': %s" % (path, e))

    def _cleanup(self):
        """!
        Perform post-job cleanup.
        """
        for component in self.components:
            logger.info('Running cleanup for component: %s' % str(component.name))
            component.cleanup()
        if self.delete_rundir:
            logger.info('Deleting run dir: %s' % self.rundir)
            if os.path.exists("%s/__swif_env__" % self.rundir):
                # A '__swif_env__' marker is present: keep the run dir itself and
                # anything named like a log or a '__swif_' file, remove the rest.
                for f in os.listdir(self.rundir):
                    if ('.log' not in f) and ('__swif_' not in f):
                        self._remove_path(os.path.join(self.rundir, f))
            else:
                shutil.rmtree(self.rundir)
        if self.component_out is not sys.stdout:
            try:
                self.component_out.flush()
                self.component_out.close()
            except Exception as e:
                logger.warning(e)

        if self.component_err is not sys.stderr:
            try:
                self.component_err.flush()
                self.component_err.close()
            except Exception as e:
                logger.warning(e)

    def _copy_output_files(self):
        """!
        Copy output files to output directory, handling ptags if necessary.
        """

        if not os.path.exists(self.output_dir):
            logger.debug('Creating output dir: %s' % self.output_dir)
            os.makedirs(self.output_dir, 0o755)

        for src, dest in self.output_files.items():
            src_file = src
            if Job.is_ptag(src):
                ptag_src = Job.get_ptag_from_src(src)
                if ptag_src in self.ptags:
                    src_file = self.ptags[ptag_src]
                    logger.info("Resolved ptag: {} -> {}".format(ptag_src, src_file))
                else:
                    raise Exception('Undefined ptag used in job params: %s' % ptag_src)
            self._copy_output_file(src_file, dest)

    def _copy_output_file(self, src, dest):
        """!
        Copy an output file from src to dest.

        @param src file name relative to the run dir
        @param dest file name relative to the output dir
        """

        src_file = os.path.join(self.rundir, src)
        dest_file = os.path.join(self.output_dir, dest)

        # Create directory if not exists; this allows relative path segments
        # in output file strings.
        if not os.path.exists(os.path.dirname(dest_file)):
            os.makedirs(os.path.dirname(dest_file), 0o755)

        # The file is already in place (e.g. running in the output dir). This
        # must be checked before the delete-existing handling below, which
        # would otherwise remove the only copy of the file.
        if os.path.exists(dest_file) and os.path.samefile(src_file, dest_file):
            logger.warning("Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
            return

        # If target file already exists then see if it can be deleted; otherwise raise an error
        if os.path.isfile(dest_file):
            if self.delete_existing:
                logger.debug('Deleting existing file: %s' % dest_file)
                os.remove(dest_file)
            else:
                raise Exception('Output file already exists: %s' % dest_file)

        logger.info("Copying '%s' to '%s'" % (src_file, dest_file))
        # shutil will throw an error if the copy obviously fails
        shutil.copyfile(src_file, dest_file)
        # take the time to double-check that the copy is identical to the original
        # this catches any sneaky network-dropping related copy failures
        if not filecmp.cmp(src_file, dest_file, shallow=False):
            raise Exception("Copy from '%s' to '%s' failed." % (src_file, dest_file))

    def _copy_input_files(self):
        """!
        Copy input files to the run dir.
        """
        for src, dest in self.input_files.items():
            if os.path.dirname(dest):
                raise Exception("The input file destination '%s' is not valid." % dest)
            dest_path = os.path.join(self.rundir, dest)
            logger.info("Copying input file: %s -> %s" % (src, dest_path))
            if '/mss/' in src:
                src = src.replace('/mss/', '/cache/')
            if os.path.exists(dest_path):
                logger.info("The input file '%s' already exists at destination '%s'" % (dest, self.rundir))
            else:
                shutil.copyfile(src, dest_path)
            # Use the full path so this does not depend on the current working directory.
            os.chmod(dest_path, 0o666)

    def _symlink_input_files(self):
        """!
        Symlink input files.
        """
        for src, dest in self.input_files.items():
            if not os.path.isabs(src):
                raise Exception('The input source file is not an absolute path: %s' % src)
            if os.path.dirname(dest):
                raise Exception('The input file destination is not valid: %s' % dest)
            link_path = os.path.join(self.rundir, dest)
            logger.debug("Symlinking input '%s' to '%s'" % (src, link_path))
            os.symlink(src, link_path)

    def ptag(self, tag, filename):
        """!
        Map a key to an output file name so a user can reference it in their job params.
        """
        if tag in self.ptags:
            raise Exception('The ptag already exists: %s' % tag)
        self.ptags[tag] = filename
        logger.info("Added ptag %s -> %s" % (tag, filename))

    @staticmethod
    def is_ptag(src):
        """! Return True if src starts with the ptag prefix."""
        return src.startswith(Job.PTAG_PREFIX)

    @staticmethod
    def get_ptag_from_src(src):
        """! Strip the ptag prefix from src; raises if src is not a ptag."""
        if src.startswith(Job.PTAG_PREFIX):
            return src[len(Job.PTAG_PREFIX):]
        else:
            raise Exception('File src is not a ptag: %s' % src)

    def resolve_output_src(self, src):
        """! Return the file name registered for a ptag, or src itself if it is not a ptag."""
        if Job.is_ptag(src):
            return self.ptags[Job.get_ptag_from_src(src)]
        else:
            return src

    def _config_fieldmap_dir(self):
        """!
        Set fieldmap dir to install location if not provided in config
        """
        if self.hps_fieldmaps_dir is None:
            self.hps_fieldmaps_dir = "{}/share/fieldmap".format(self.hpsmc_dir)
            if not os.path.isdir(self.hps_fieldmaps_dir):
                raise Exception("The fieldmaps dir does not exist: {}".format(self.hps_fieldmaps_dir))
            logger.debug("Using fieldmap dir from install: {}".format(self.hps_fieldmaps_dir))
        else:
            logger.debug("Using fieldmap dir from config: {}".format(self.hps_fieldmaps_dir))

    def _symlink_fieldmap_dir(self):
        """!
        Symlink to the fieldmap directory
        """
        fieldmap_symlink = pathlib.Path(os.getcwd(), "fieldmap")
        # Test for a link first: Path.exists() is False for a dangling symlink,
        # and trying to create the link again would then fail.
        if fieldmap_symlink.is_dir() or os.path.islink(fieldmap_symlink):
            logger.debug("Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
        elif fieldmap_symlink.exists():
            raise Exception("A file called 'fieldmap' exists but it is not a symlink or directory!")
        else:
            logger.debug("Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
            os.symlink(self.hps_fieldmaps_dir, "fieldmap")
808 
# Sub-commands understood by this script, mapped to their help text.
cmds = {
    'run': 'Run a job script',
    'script': 'Show list of available job scripts (provide script name for detailed info)',
    'component': 'Show list of available components (provide component name for detailed info)'}


def print_usage():
    """! Print the list of available commands to stdout."""
    print("Usage: job.py [command] [args]")
    print(" command:")
    for name, descr in cmds.items():
        print(" %s: %s" % (name, descr))


if __name__ == '__main__':
    if len(sys.argv) > 1:
        cmd = sys.argv[1]
        if cmd not in cmds:
            print_usage()
            raise Exception('The job command is not valid: %s' % cmd)
        args = sys.argv[2:]
        if cmd == 'run':
            job = Job(args)
            job.parse_args()
            job.run()
        elif cmd == 'script':
            if len(sys.argv) > 2:
                # Detailed info on a single job script.
                script = sys.argv[2]
                from hpsmc.help import print_job_script
                print_job_script(script)
            else:
                scriptdb = JobScriptDatabase()
                print("AVAILABLE JOB SCRIPTS: ")
                for name in sorted(scriptdb.get_script_names()):
                    print(' %s: %s' % (name, scriptdb.get_script_path(name)))
        elif cmd == 'component':
            if len(sys.argv) > 2:
                # Detailed info on a single component.
                from hpsmc.help import print_component
                component_name = sys.argv[2]
                print_component(component_name)
            else:
                from hpsmc.help import print_components
                print_components()

    else:
        print_usage()
Wrapper for accessing config information from parser.
Definition: job.py:29
def __str__(self)
Definition: job.py:35
def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
Definition: job.py:44
def __init__(self)
Definition: job.py:32
Database of job scripts.
Definition: job.py:125
def get_script_path(self, name)
Get path to job script from job name.
Definition: job.py:140
def get_script_names(self)
Get list of all script names.
Definition: job.py:146
scripts
dict of paths to job scripts sorted by name
Definition: job.py:135
def exists(self, name)
Test if job script exists in dict.
Definition: job.py:154
def get_scripts(self)
Get dict containing paths to scripts sorted by script names.
Definition: job.py:150
Simple JSON based store of job data.
Definition: job.py:73
def __init__(self, path=None)
Definition: job.py:78
def load(self, json_store)
Load raw JSON data into this job store.
Definition: job.py:86
def get_job_data(self)
Get the raw dict containing all the job data.
Definition: job.py:112
def get_job(self, job_id)
Get a job by its job ID.
Definition: job.py:104
def has_job_id(self, job_id)
Return true if the job ID exists in the store.
Definition: job.py:120
def get_job_ids(self)
Get a sorted list of job IDs.
Definition: job.py:116
Primary class to run HPS jobs from a Python script.
Definition: job.py:160
def _configure(self)
Configure job class and components.
Definition: job.py:404
hps_fieldmaps_dir
fieldmaps dir
Definition: job.py:225
def _copy_input_files(self)
Copy input files to the run dir.
Definition: job.py:723
component_err
output for component error messages
Definition: job.py:219
def _config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
Definition: job.py:782
enable_file_chaining
Definition: job.py:236
description
short description of job, should be overridden by the job script
Definition: job.py:197
def _cleanup(self)
Perform post-job cleanup.
Definition: job.py:635
input_files
dict of input files
Definition: job.py:211
def _config_file_pipeline(self)
Pipe component outputs to inputs automatically.
Definition: job.py:610
components
list of components in job
Definition: job.py:203
job_config
Job configuration.
Definition: job.py:195
def _load_script(self)
Load the job script.
Definition: job.py:432
params
dict of parameters
Definition: job.py:207
job_id
job ID
Definition: job.py:199
job_steps
job steps
Definition: job.py:223
param_file
path to parameter file
Definition: job.py:201
def _copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
Definition: job.py:664
script
script containing component initializations
Definition: job.py:221
check_commands
Definition: job.py:235
def add(self, component)
Public method for adding components to the job.
Definition: job.py:239
rundir
rundir is current working directory
Definition: job.py:205
def get_ptag_from_src(src)
Definition: job.py:770
def _set_input_files(self)
Prepare dictionary of input files.
Definition: job.py:365
def _setup(self)
Necessary setup before job can be executed.
Definition: job.py:581
def is_ptag(src)
Definition: job.py:766
output_files
dict of output files
Definition: job.py:213
args
(passed) job arguments
Definition: job.py:193
def _copy_output_file(self, src, dest)
Copy an output file from src to dest.
Definition: job.py:684
def _initialize(self)
Perform basic initialization before the job script is loaded.
Definition: job.py:383
def ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
Definition: job.py:755
component_out
output for component printouts
Definition: job.py:217
def resolve_output_src(self, src)
Definition: job.py:776
check_output_files
Definition: job.py:234
def __init__(self, args=sys.argv, **kwargs)
Definition: job.py:186
def parse_args(self)
Configure the job from command line arguments.
Definition: job.py:260
def _execute(self)
Execute all components in job.
Definition: job.py:538
enable_env_config
Definition: job.py:237
def _load_params(self, params)
Load the job parameters from JSON data.
Definition: job.py:341
def run(self)
This is the primary execution method for running the job.
Definition: job.py:461
delete_existing
Definition: job.py:230
def _set_parameters(self)
Push JSON job parameters to components.
Definition: job.py:628
def _symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
Definition: job.py:794
def _symlink_input_files(self)
Symlink input files.
Definition: job.py:743
def set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
Definition: job.py:248
delete_rundir
Definition: job.py:231
enable_copy_output_files
These attributes can all be set in the config file.
Definition: job.py:228
output_dir
output_dir is current working directory
Definition: job.py:209
ignore_return_codes
Definition: job.py:233
enable_copy_input_files
Definition: job.py:229
ptags
dict with keys to output filenames
Definition: job.py:215
def convert_config_value(val)
Convert config value to Python readable value.
Definition: _config.py:24
Utility script for printing help about component classes.
Definition: help.py:1
def print_component(v)
Accepts Component class and prints info about it.
Definition: help.py:14
def print_job_script(script_path)
Definition: help.py:62
def print_components()
Print info for all Component classes.
Definition: help.py:52
def print_usage()
Definition: job.py:815