HPS-MC
job.py
1"""! @package job
2Primary class for running and managing HPSMC jobs defined by a set of components.
3"""
4
5import os
6import sys
7import json
8import time
9import shutil
10import filecmp
11import argparse
12import getpass
13import logging
14import glob
15import subprocess
16import copy
17import pathlib
18
19from collections.abc import Sequence
20from os.path import expanduser
21
22from hpsmc._config import convert_config_value
23from hpsmc import global_config
24
25
26logger = logging.getLogger('hpsmc.job')
27
28
class JobConfig(object):
    """! Wrapper for accessing config information from parser."""

    def __init__(self):
        self.parser = copy.copy(global_config)

    def __str__(self):
        parser_lines = ['Job configuration:']
        for section in self.parser.sections():
            parser_lines.append("[" + section + "]")
            for i, v in self.parser.items(section):
                parser_lines.append("%s=%s" % (i, v))
            parser_lines.append('')
        return '\n'.join(parser_lines)

    def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True):
        """! Push config into an object by setting an attribute.
        @param obj object the config is pushed to
        @param section config section name; defaults to the class name of obj
        (typically the name of the component being configured)
        @param required_names list of names of required configurations
        @param allowed_names list of allowed config names
        @param require_section if True, raise an exception when the section is missing
        """
        if not section:
            section = obj.__class__.__name__
        if self.parser.has_section(section):
            # Check that required settings are there.
            for req in required_names:
                if req not in dict(self.parser.items(section)):
                    raise Exception("Missing required config '%s'" % req)
            # Push each config item into the object by setting an attribute.
            for name, value in self.parser.items(section):
                if len(allowed_names) and name not in allowed_names:
                    raise Exception("Config name '%s' is not allowed for '%s'" % (name, section))
                setattr(obj, name, convert_config_value(value))
                # logger.info("%s:%s:%s=%s" % (obj.__class__.__name__,
                #                              name,
                #                              getattr(obj, name).__class__.__name__,
                #                              getattr(obj, name)))
        elif require_section:
            raise Exception("Missing required config section '%s'" % section)
        else:
            logger.warning('Config section not found: %s' % section)

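# Example (illustrative sketch, not executed): given a config file containing a
# section named after a component class, e.g.
#
#   [MyComponent]
#   nevents = 1000
#   seed = 42
#
# JobConfig.config() pushes each setting onto the object as an attribute, with
# values converted by convert_config_value(). 'MyComponent', 'nevents' and
# 'seed' are hypothetical names used only for this example:
#
#   cfg = JobConfig()
#   comp = MyComponent()
#   cfg.config(comp, required_names=['nevents'])
#   print(comp.nevents, comp.seed)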
74 """!
75 Simple JSON based store of job data.
76 """
77
78 def __init__(self, path=None):
79 self.path = path
80 self.data = {}
81 if self.path:
82 self.load(self.path)
83 else:
84 logger.warning('Path was not provided to job store - no jobs loaded!')
85
86 def load(self, json_store):
87 """! Load raw JSON data into this job store.
88 @param json_store json file containing raw job data
89 """
90 if not os.path.exists(json_store):
91 raise Exception('JSON job store does not exist: {}'.format(json_store))
92 with open(json_store, 'r') as f:
93 json_data = json.loads(f.read())
94 for job_data in json_data:
95 if 'job_id' not in job_data:
96 raise Exception(f"Job data is missing a job id")
97 job_id = int(job_data['job_id'])
98 if job_id in self.data:
99 raise Exception(f"Duplicate job id: {job_id}")
100 self.data[job_id] = job_data
101 logger.debug("Loaded job {} data:\n {}".format(job_id, job_data))
102 logger.info(f"Successfully loaded {len(self.data)} jobs from: {json_store}")
103
104 def get_job(self, job_id):
105 """! Get a job by its job ID.
106 @param job_id job ID
107 @return job"""
108 if not self.has_job_id(job_id):
109 raise Exception(f"Job ID does not exist: {job_id}")
110 return self.data[int(job_id)]
111
112 def get_job_data(self):
113 """! Get the raw dict containing all the job data."""
114 return self.data
115
116 def get_job_ids(self):
117 """! Get a sorted list of job IDs."""
118 return sorted(self.data.keys())
119
120 def has_job_id(self, job_id):
121 """! Return true if the job ID exists in the store."""
122 return int(job_id) in list(self.data.keys())
123
124
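# Example (illustrative sketch): a job store is a JSON file holding a list of
# job definitions, each with a unique 'job_id'. The file name and parameters
# below are hypothetical:
#
#   [
#       {"job_id": 1, "seed": 1, "output_files": {"events.slcio": "events_1.slcio"}},
#       {"job_id": 2, "seed": 2, "output_files": {"events.slcio": "events_2.slcio"}}
#   ]
#
#   store = JobStore('jobs.json')
#   store.get_job_ids()      # -> [1, 2]
#   params = store.get_job(2)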
126 """! Database of job scripts.
127 """
128
129 def __init__(self):
130 if 'HPSMC_DIR' not in os.environ:
131 raise Exception('HPSMC_DIR is not set in the environ.')
132 hps_mc_dir = os.environ['HPSMC_DIR']
133 script_dir = os.path.join(hps_mc_dir, 'lib', 'python', 'jobs')
134
135 self.scripts = {}
136 for f in glob.glob(os.path.join(script_dir, '*_job.py')):
137 script_name = os.path.splitext(os.path.basename(f))[0].replace('_job', '')
138 self.scripts[script_name] = f
139
140 def get_script_path(self, name):
141 """! Get path to job script from job name.
142 @param job name
143 @return path to job script"""
144 return self.scripts[name]
145
147 """! Get list of all script names."""
148 return list(self.scripts.keys())
149
150 def get_scripts(self):
151 """! Get dict containing paths to scripts sorted by script names."""
152 return self.scripts
153
154 def exists(self, name):
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
157 return name in self.scripts
158
159
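# Example (illustrative sketch): job scripts installed as
# $HPSMC_DIR/lib/python/jobs/<name>_job.py are registered under <name>, so a
# script file such as 'slic_job.py' (assumed here for illustration) would be
# looked up as:
#
#   db = JobScriptDatabase()
#   if db.exists('slic'):
#       print(db.get_script_path('slic'))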
class Job(object):
    """!
    Primary class to run HPS jobs from a Python script.

    Jobs are run by executing a series of components
    which are configured using a config file with parameters
    provided by a JSON job file.
    """

    """
    _config_names = ['enable_copy_output_files',
                     'enable_copy_input_files',
                     'enable_cleanup',
                     'delete_existing',
                     'delete_rundir',
                     'dry_run',
                     'ignore_return_codes',
                     'check_output_files',
                     'enable_file_chaining',
                     'enable_env_config']
    """

    # Prefix to indicate ptag in job param file.
    PTAG_PREFIX = 'ptag:'

    def __init__(self, args=sys.argv, **kwargs):

        if 'HPSMC_DIR' not in os.environ:
            raise Exception('HPSMC_DIR is not set in the environ.')
        self.hpsmc_dir = os.environ['HPSMC_DIR']

        self.args = args
        self.job_config = JobConfig()

        self.description = "HPS MC Job"
        self.job_id = None
        self.param_file = None
        self.components = []
        self.rundir = os.getcwd()
        self.params = {}
        self.output_dir = os.getcwd()
        self.input_files = {}
        self.output_files = {}
        self.ptags = {}
        self.component_out = sys.stdout
        self.component_err = sys.stderr
        self.script = None
        self.job_steps = None
        self.hps_fieldmaps_dir = None

        # These attributes can all be set in the config file.
        self.enable_copy_output_files = True
        self.enable_copy_input_files = True
        self.enable_cleanup = True
        self.delete_existing = False
        self.delete_rundir = False
        self.dry_run = False
        self.ignore_return_codes = False  # assumed default
        self.check_output_files = True  # assumed default
        self.check_commands = False
        self.enable_file_chaining = True
        self.enable_env_config = False

    def add(self, component):
        """!
        Public method for adding components to the job.
        """
        if isinstance(component, Sequence) and not isinstance(component, str):
            self.components.extend(component)
        else:
            self.components.append(component)

    def set_parameters(self, params):
        """!
        Add parameters to the job, overriding values if they exist already.

        This method can be used in job scripts to define default values.
        """
        for k, v in params.items():
            if k in self.params:
                logger.debug("Setting new value '%s' for parameter '%s' with existing value '%s'."
                             % (str(v), str(k), self.params[k]))
            self.params[k] = v

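    # Example (illustrative sketch): pushing a dict of parameters onto the job;
    # keys that already exist in job.params are overwritten with the new values.
    # The parameter names are hypothetical:
    #
    #   job.set_parameters({'nevents': 1000, 'seed': 1})
    #   job.params['nevents']   # -> 1000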
    def parse_args(self):
        """!
        Configure the job from command line arguments.
        """

        parser = argparse.ArgumentParser(description=self.description)
        parser.add_argument("-c", "--config-file", help="Config file locations", action='append')
        parser.add_argument("-d", "--run-dir", nargs='?', help="Job run dir")
        parser.add_argument("-o", "--out", nargs='?', help="File for component stdout (default prints to console)")
        parser.add_argument("-e", "--err", nargs='?', help="File for component stderr (default prints to console)")
        parser.add_argument("-s", "--job-steps", type=int, default=None, help="Job steps to run (single number)")
        parser.add_argument("-i", "--job-id", type=int, help="Job ID from JSON job store", default=None)
        parser.add_argument("script", nargs='?', help="Path or name of job script")
        parser.add_argument("params", nargs='?', help="Job param file in JSON format")

        cl = parser.parse_args(self.args)

        # Read in job configuration files
        if cl.config_file:
            config_files = list(map(os.path.abspath, cl.config_file))
            logger.info("Reading additional config from: {}".format(config_files))
            self.job_config.parser.read(config_files)

        # Set file for stdout from components
        if cl.out:
            out_file = cl.out
            if not os.path.isabs(out_file):
                out_file = os.path.abspath(out_file)
            logger.info('Job output will be written to: %s' % out_file)
            self.out = open(out_file, 'w')

        # Set file for stderr from components
        if cl.err:
            err_file = cl.err
            if not os.path.isabs(err_file):
                err_file = os.path.abspath(err_file)
            logger.info('Job error will be written to: %s' % err_file)
            self.err = open(err_file, 'w')

        if cl.run_dir:
            self.rundir = os.path.abspath(cl.run_dir)

        self.job_steps = cl.job_steps
        if self.job_steps is not None and self.job_steps < 1:
            raise Exception("Invalid job steps argument (must be > 0): {}".format(self.job_steps))

        if cl.script:
            self.script = cl.script
        else:
            raise Exception('Missing required script name or location.')

        # Params are actually optional as some job scripts might not need them.
        if cl.params:
            self.param_file = os.path.abspath(cl.params)
            params = {}
            if cl.job_id:
                # Load data from a job store containing multiple jobs.
                self.job_id = cl.job_id
                logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
                jobstore = JobStore(self.param_file)
                if jobstore.has_job_id(self.job_id):
                    params = jobstore.get_job(self.job_id)
                else:
                    raise Exception("No job id %d was found in the job store '%s'" % (self.job_id, self.param_file))
            else:
                # Load data from a JSON file with a single job definition.
                logger.info('Loading job parameters from file: %s' % self.param_file)
                params = json.loads(open(self.param_file, 'r').read())
                if not isinstance(params, dict):
                    raise Exception('Job ID must be provided when running from a job store.')

            self._load_params(params)

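    # Example (illustrative sketch): a typical command line handled by
    # parse_args() when invoked through the 'run' command at the bottom of this
    # module. The file names and script name are hypothetical:
    #
    #   python job.py run -c job.cfg -d /scratch/run01 -i 2 tritrig jobs.json
    #
    # This reads additional config from job.cfg, runs in /scratch/run01, and
    # loads the parameters of job ID 2 from the job store jobs.json before
    # executing the 'tritrig' job script.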
    def _load_params(self, params):
        """!
        Load the job parameters from JSON data.
        """

        self.set_parameters(params)

        # logger.info(json.dumps(self.params, indent=4, sort_keys=False))

        if 'output_dir' in self.params:
            self.output_dir = self.params['output_dir']
            if not os.path.isabs(self.output_dir):
                self.output_dir = os.path.abspath(self.output_dir)
                logger.debug("Changed output dir to abs path: %s" % self.output_dir)

        if 'job_id' in self.params:
            self.job_id = self.params['job_id']

        if 'input_files' in self.params:
            self.input_files = self.params['input_files']

        if 'output_files' in self.params:
            self.output_files = self.params['output_files']

368 """!
369 Prepare dictionary of input files.
370
371 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
372 """
373 input_files_dict = {}
374 for file_key, file_name in self.input_files.items():
375 if 'https' in file_key:
376 logger.info("Downloading input file from: %s" % file_key)
377 file_name_path = self.rundir + "/" + file_name
378 # \todo FIXME: We need to make sure wget is installed locally during the build or use a python lib like requests.
379 subprocess.check_output(['wget', '-q', '-O', file_name_path, file_key])
380 input_files_dict.update({file_name: file_name})
381 else:
382 input_files_dict.update({file_key: file_name})
383 self.input_files = input_files_dict
384
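    # Example (illustrative sketch): an 'input_files' parameter that mixes a
    # download URL with a regular file (all names hypothetical):
    #
    #   {"https://example.org/beam.stdhep": "beam.stdhep",
    #    "/data/run/events.slcio": "events.slcio"}
    #
    # After _set_input_files() downloads the first entry into the run dir, the
    # dict becomes:
    #
    #   {"beam.stdhep": "beam.stdhep",
    #    "/data/run/events.slcio": "events.slcio"}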
    def _initialize(self):
        """!
        Perform basic initialization before the job script is loaded.
        """

        if not os.path.isabs(self.rundir):
            self.rundir = os.path.abspath(self.rundir)
            logger.info('Changed run dir to abs path: %s' % self.rundir)
            # raise Exception('The run dir is not an absolute path: %s' % self.rundir)

        # Set run dir if running inside LSF
        if "LSB_JOBID" in os.environ:
            self.rundir = os.path.join("/scratch", getpass.getuser(), os.environ["LSB_JOBID"])
            logger.info('Set run dir for LSF: %s' % self.rundir)
            self.delete_rundir = True

        # Create run dir if it does not exist
        if not os.path.exists(self.rundir):
            logger.info('Creating run dir: %s' % self.rundir)
            os.makedirs(self.rundir)

    def _configure(self):
        """!
        Configure job class and components.
        """

        # Configure job class
        self.job_config.config(self, require_section=False)

        # Configure the location of the fieldmap files
        self._config_fieldmap_dir()

        # Configure each of the job components
        for component in self.components:

            # Configure logging for the component.
            component.config_logging(self.job_config.parser)

            # Configure the component from job configuration.
            component.config(self.job_config.parser)

            if self.enable_env_config:
                # Configure from env vars, if enabled.
                component.config_from_environ()

            # Check that the config is acceptable.
            component.check_config()

    def _load_script(self):
        """!
        Load the job script.
        """
        # This might be okay if the user is manually adding components to a job for testing
        # without the command line interface. If no components are added before the job is
        # run, then this will be caught later.
        if self.script is None:
            logger.warning("No job script was provided!")
            return

        if not self.script.endswith('.py'):
            # Script is a name.
            script_db = JobScriptDatabase()
            if not script_db.exists(self.script):
                raise Exception("The script name is not valid: %s" % self.script)
            script_path = script_db.get_script_path(self.script)
            logger.debug("Found script '%s' from name '%s'" % (script_path, self.script))
        else:
            # Script is a path.
            script_path = self.script

        if not os.path.exists(script_path):
            raise Exception('Job script does not exist: %s' % script_path)

        logger.info('Loading job script: %s' % script_path)

        exec(compile(open(script_path, "rb").read(), script_path, 'exec'), {'job': self})

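    # Example (illustrative sketch): _load_script() executes the job script
    # with the global name 'job' bound to this Job instance, so a minimal job
    # script only needs to add its components. The component classes shown are
    # assumed for illustration:
    #
    #   # my_job.py
    #   from hpsmc.tools import SLIC, JobManager
    #   job.description = 'simulation plus reconstruction'
    #   job.add([SLIC(), JobManager()])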
    def run(self):
        """!
        This is the primary execution method for running the job.
        """

        logger.info('Job ID: ' + str(self.job_id))
        logger.info('Description: %s' % self.description)

        # Print config to the log
        logger.info(str(self.job_config))

        # Initialize after CL parameters were parsed.
        self._initialize()

        # Load the job components from the script
        self._load_script()

        if not len(self.components):
            raise Exception("Job has no components to execute.")

        # Print list of job components
        logger.info("Job components loaded: %s" % ([c.name for c in self.components]))

        # Print job parameters.
        if len(self.params) > 0:
            logger.info("Job parameters loaded: %s" % str(self.params))
        else:
            logger.info("No job parameters were specified!")

        # This will configure the Job class and its components by copying
        # information into them from loaded config files.
        self._configure()

        # This (re)sets the input files correctly
        self._set_input_files()

        # Set component parameters from job JSON file.
        self._set_parameters()

        # Perform component setup to prepare for execution.
        # May use config and parameters that were set from above.
        self._setup()

        if not self.dry_run:
            if self.enable_copy_input_files:
                # Copy input files to the run dir.
                self._copy_input_files()
            else:
                # Symlink input files if copying is disabled.
                self._symlink_input_files()

        # Save job start time
        start_time = time.time()

        # Execute the job.
        self._execute()

        # Copy the output files to the output dir if enabled and not in dry run.
        if not self.dry_run:

            # Print job timer info
            stop_time = time.time()
            elapsed = stop_time - start_time
            logger.info("Job execution took {} seconds".format(round(elapsed, 4)))

            # Copy by file path or ptag
            if self.enable_copy_output_files:
                self._copy_output_files()
            else:
                logger.warning('Copy output files is disabled!')

        # Perform job cleanup.
        if self.enable_cleanup:
            self._cleanup()

        logger.info('Successfully finished running job: %s' % self.description)

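    # Example (illustrative sketch): driving a Job without the command line
    # interface, e.g. for testing. The component and paths are hypothetical and
    # real components may require entries in the job config:
    #
    #   job = Job(args=[])
    #   job.rundir = '/tmp/testjob'
    #   job.add(MyComponent())
    #   job.run()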
    def _execute(self):
        """!
        Execute all components in the job.

        If dry_run is set to True, the components will not be executed;
        a list of their commands will be printed instead.
        """
        if not self.dry_run:

            for component in self.components:

                logger.info("Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
                # logger.info("Component IO: {} -> {}".format(str(component.input_files(), component.output_files())))

                # Print header to stdout
                self.component_out.write('================ Component: %s ================\n' % component.name)
                self.component_out.flush()

                # Print header to stderr if output is going to a file
                if self.component_err != sys.stderr:
                    self.component_err.write('================ Component: %s ================\n' % component.name)
                    self.component_err.flush()

                start = time.time()
                returncode = component.execute(self.component_out, self.component_err)
                end = time.time()
                elapsed = end - start
                logger.info("Execution of {} took {} second(s) with return code: {}"
                            .format(component.name, round(elapsed, 4), str(returncode)))

                if not self.ignore_return_codes and returncode:
                    raise Exception("Non-zero return code %d from '%s'" % (returncode, component.name))

                if self.check_output_files:
                    for outputfile in component.output_files():
                        if not os.path.isfile(outputfile):
                            raise Exception("Output file '%s' is missing after execution." % outputfile)
        else:
            # Dry run mode. Just print the component commands but do not execute them.
            logger.info("Dry run enabled. Components will NOT be executed!")
            for component in self.components:
                logger.info("'%s' with args: %s (DRY RUN)" % (component.name, ' '.join(component.cmd_args())))

    def _setup(self):
        """!
        Necessary setup before job can be executed.
        """

        # Change to run dir
        logger.info('Changing to run dir: %s' % self.rundir)
        os.chdir(self.rundir)

        # Create a symlink to the fieldmap directory
        self._symlink_fieldmap_dir()

        # Limit components according to job steps
        if self.job_steps is not None:
            if self.job_steps > 0:
                self.components = self.components[0:self.job_steps]
                logger.info("Job is limited to first %d steps." % self.job_steps)

        if self.enable_file_chaining:
            self._config_file_pipeline()

        # Run setup methods of each component
        for component in self.components:
            logger.debug('Setting up component: %s' % (component.name))
            component.rundir = self.rundir
            component.setup()
            if self.check_commands and not component.cmd_exists():
                raise Exception("Command '%s' does not exist for '%s'." % (component.command, component.name))

612
614 """!
615 Pipe component outputs to inputs automatically.
616 """
617 for i in range(0, len(self.components)):
618 component = self.components[i]
619 logger.debug("Configuring file IO for component '%s' with order %d" % (component. name, i))
620 if i == 0:
621 logger.debug("Setting inputs on '%s' to: %s"
622 % (component.name, str(list(self.input_files.values()))))
623 if not len(component.inputs):
624 component.inputs = list(self.input_files.values())
625 elif i > -1:
626 logger.debug("Setting inputs on '%s' to: %s"
627 % (component.name, str(self.components[i - 1].output_files())))
628 if len(component.inputs) == 0:
629 component.inputs = self.components[i - 1].output_files()
630
632 """!
633 Push JSON job parameters to components.
634 """
635 for component in self.components:
636 component.set_parameters(self.params)
637
    def _cleanup(self):
        """!
        Perform post-job cleanup.
        """
        for component in self.components:
            logger.info('Running cleanup for component: %s' % str(component.name))
            component.cleanup()
        if self.delete_rundir:
            logger.info('Deleting run dir: %s' % self.rundir)
            if os.path.exists("%s/__swif_env__" % self.rundir):
                for f in os.listdir(self.rundir):
                    if ('.log' not in f) and ('__swif_' not in f):
                        os.system('rm -r %s' % f)
            else:
                shutil.rmtree(self.rundir)
        if self.component_out != sys.stdout:
            try:
                self.component_out.flush()
                self.component_out.close()
            except Exception as e:
                logger.warn(e)

        if self.component_err != sys.stderr:
            try:
                self.component_err.flush()
                self.component_err.close()
            except Exception as e:
                logger.warn(e)

668 """!
669 Copy output files to output directory, handling ptags if necessary.
670 """
671
672 if not os.path.exists(self.output_dir):
673 logger.debug('Creating output dir: %s' % self.output_dir)
674 os.makedirs(self.output_dir, 0o755)
675
676 for src, dest in self.output_files.items():
677 src_file = src
678 if Job.is_ptag(src):
679 ptag_src = Job.get_ptag_from_src(src)
680 if ptag_src in list(self.ptags.keys()):
681 src_file = self.ptags[ptag_src]
682 logger.info("Resolved ptag: {} -> {}".format(ptag_src, src_file))
683 else:
684 raise Exception('Undefined ptag used in job params: %s' % ptag_src)
685 self._copy_output_file(src_file, dest)
686
    def _copy_output_file(self, src, dest):
        """!
        Copy an output file from src to dest.
        """

        src_file = os.path.join(self.rundir, src)
        dest_file = os.path.join(self.output_dir, dest)

        # Create the directory if it does not exist; this allows relative path segments
        # in output file strings.
        if not os.path.exists(os.path.dirname(dest_file)):
            os.makedirs(os.path.dirname(dest_file), 0o755)

        # Check if the file is already there and does not need copying (e.g. if running in local dir)
        samefile = False
        if os.path.exists(dest_file):
            if os.path.samefile(src_file, dest_file):
                samefile = True

        # If the target file already exists then see if it can be deleted; otherwise raise an error
        if os.path.isfile(dest_file):
            if self.delete_existing:
                logger.debug('Deleting existing file: %s' % dest_file)
                os.remove(dest_file)
            else:
                raise Exception('Output file already exists: %s' % dest_file)

        # Copy the file to the destination dir if not already created by the job
        logger.info("Copying '%s' to '%s'" % (src_file, dest_file))
        if not samefile:
            # shutil will throw an error if the copy obviously fails
            shutil.copyfile(src_file, dest_file)
            # take the time to double-check that the copy is identical to the original;
            # this catches any sneaky network-dropping related copy failures
            if not filecmp.cmp(src_file, dest_file, shallow=False):
                raise Exception("Copy from '%s' to '%s' failed." % (src_file, dest_file))
        else:
            logger.warning("Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))

727 """!
728 Copy input files to the run dir.
729 """
730 for src, dest in self.input_files.items():
731 # if not os.path.isabs(src):
732
734 if os.path.dirname(dest):
735 raise Exception("The input file destination '%s' is not valid." % dest)
736 logger.info("Copying input file: %s -> %s" % (src, os.path.join(self.rundir, dest)))
737 if '/mss/' in src:
738 src = src.replace('/mss/', '/cache/')
739 if os.path.exists(os.path.join(self.rundir, dest)):
740 logger.info("The input file '%s' already exists at destination '%s'" % (dest, self.rundir))
741 os.chmod(os.path.join(self.rundir, dest), 0o666)
742 else:
743 shutil.copyfile(src, os.path.join(self.rundir, dest))
744 os.chmod(dest, 0o666)
745
747 """!
748 Symlink input files.
749 """
750 for src, dest in self.input_files.items():
751 if not os.path.isabs(src):
752 raise Exception('The input source file is not an absolute path: %s' % src)
753 if os.path.dirname(dest):
754 raise Exception('The input file destination is not valid: %s' % dest)
755 logger.debug("Symlinking input '%s' to '%s'" % (src, os.path.join(self.rundir, dest)))
756 os.symlink(src, os.path.join(self.rundir, dest))
757
    def ptag(self, tag, filename):
        """!
        Map a key to an output file name so a user can reference it in their job params.
        """
        if tag not in list(self.ptags.keys()):
            self.ptags[tag] = filename
            logger.info("Added ptag %s -> %s" % (tag, filename))
        else:
            raise Exception('The ptag already exists: %s' % tag)

    @staticmethod
    def is_ptag(src):
        return src.startswith(Job.PTAG_PREFIX)

    @staticmethod
    def get_ptag_from_src(src):
        if src.startswith(Job.PTAG_PREFIX):
            return src[len(Job.PTAG_PREFIX):]
        else:
            raise Exception('File src is not a ptag: %s' % src)

    def resolve_output_src(self, src):
        if Job.is_ptag(src):
            return self.ptags[Job.get_ptag_from_src(src)]
        else:
            return src

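    # Example (illustrative sketch): a job script registers a ptag for a file
    # it produces, and the JSON job parameters can then refer to it with the
    # 'ptag:' prefix (file names hypothetical):
    #
    #   job.ptag('events', 'tritrig_events.slcio')                  # in the job script
    #
    #   "output_files": {"ptag:events": "tritrig_run01.slcio"}      # in the JSON params
    #
    # During _copy_output_files() the key 'ptag:events' resolves to
    # 'tritrig_events.slcio' before the copy to the output directory.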
786 """!
787 Set fieldmap dir to install location if not provided in config
788 """
789 if self.hps_fieldmaps_dir is None:
790 self.hps_fieldmaps_dir = "{}/share/fieldmap".format(self.hpsmc_dir)
791 if not os.path.isdir(self.hps_fieldmaps_dir):
792 raise Exception("The fieldmaps dir does not exist: {}".format(self.hps_fieldmaps_dir))
793 logger.debug("Using fieldmap dir from install: {}".format(self.hps_fieldmaps_dir))
794 else:
795 logger.debug("Using fieldmap dir from config: {}".format(self.hps_fieldmaps_dir))
796
798 """!
799 Symlink to the fieldmap directory
800 """
801 fieldmap_symlink = pathlib.Path(os.getcwd(), "fieldmap")
802 if not fieldmap_symlink.exists():
803 logger.debug("Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
804 os.symlink(self.hps_fieldmaps_dir, "fieldmap")
805 else:
806 if fieldmap_symlink.is_dir() or os.path.islink(fieldmap_symlink):
807 logger.debug("Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
808 else:
809 raise Exception("A file called 'fieldmap' exists but it is not a symlink or directory!")
810
811
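# Example (illustrative sketch): the fieldmap location can be overridden in a
# job config file through the [Job] section (path hypothetical); otherwise the
# install location $HPSMC_DIR/share/fieldmap is used:
#
#   [Job]
#   hps_fieldmaps_dir = /opt/hps/fieldmaps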
cmds = {
    'run': 'Run a job script',
    'script': 'Show list of available job scripts (provide script name for detailed info)',
    'component': 'Show list of available components (provide component name for detailed info)'}


def print_usage():
    print("Usage: job.py [command] [args]")
    print("    command:")
    for name, descr in cmds.items():
        print("        %s: %s" % (name, descr))


if __name__ == '__main__':
    if len(sys.argv) > 1:
        cmd = sys.argv[1]
        if cmd not in list(cmds.keys()):
            print_usage()
            raise Exception('The job command is not valid: %s' % cmd)
        args = sys.argv[2:]
        if cmd == 'run':
            job = Job(args)
            job.parse_args()
            job.run()
        elif cmd == 'script':
            if len(sys.argv) > 2:
                script = sys.argv[2]
                from hpsmc.help import print_job_script
                print_job_script(script)
            else:
                scriptdb = JobScriptDatabase()
                print("AVAILABLE JOB SCRIPTS: ")
                for name in sorted(scriptdb.get_script_names()):
                    print('    %s: %s' % (name, scriptdb.get_script_path(name)))
        elif cmd == 'component':
            if len(sys.argv) > 2:
                from hpsmc.help import print_component
                component_name = sys.argv[2]
                print_component(component_name)
            else:
                from hpsmc.help import print_components
                print_components()

    else:
        print_usage()
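# Example (illustrative sketch): command line entry points provided above
# (script and file names hypothetical):
#
#   python job.py script                                  # list available job scripts
#   python job.py script tritrig                          # details for one script
#   python job.py component                               # list available components
#   python job.py run -c job.cfg myscript.py params.json  # run a job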