HPS-MC
 
Loading...
Searching...
No Matches
job.py
Go to the documentation of this file.
1"""! @package job
2Primary class for running and managing HPSMC jobs defined by a set of components.
3"""
4
5import os
6import sys
7import json
8import time
9import shutil
10import filecmp
11import argparse
12import getpass
13import logging
14import glob
15import subprocess
16import copy
17import pathlib
18
19from collections.abc import Sequence
20from os.path import expanduser
21
22from hpsmc._config import convert_config_value
23from hpsmc import global_config
24
25
26logger = logging.getLogger('hpsmc.job')
27
28
class JobConfig(object):
    """! Wrapper for accessing config information from parser."""

    def __init__(self):
        # NOTE(review): copy.copy() is a shallow copy, so this parser may still share
        # internal state with global_config - confirm that this is intended.
        self.parser = copy.copy(global_config)

    def __str__(self):
        """! Render every section and its settings as INI-like text."""
        parser_lines = ['Job configuration:']
        for section in self.parser.sections():
            parser_lines.append("[" + section + "]")
            parser_lines.extend("%s=%s" % (i, v) for i, v in self.parser.items(section))
            parser_lines.append('')
        return '\n'.join(parser_lines)

    def config(self, obj, section=None, required_names=None, allowed_names=None, require_section=True):
        """! Push config into an object by setting an attribute.
        @param obj object config is pushed to
        @param section name of the config section to read (defaults to the class name of obj)
        @param required_names list of names of required configurations
        @param allowed_names list of allowed config names (empty or None allows every name)
        @param require_section if True a missing section raises an Exception,
                               otherwise only a warning is logged
        """
        # None replaces the former mutable list defaults; behaviour for callers is the same.
        required_names = required_names or ()
        allowed_names = allowed_names or ()

        if not section:
            section = obj.__class__.__name__

        if not self.parser.has_section(section):
            if require_section:
                raise Exception("Missing required config section '%s'" % section)
            logger.warning('Config section not found: %s' % section)
            return

        # Read the section once instead of once per required name.
        items = list(self.parser.items(section))
        present = {name for name, _ in items}

        # Check that required settings are there.
        for req in required_names:
            if req not in present:
                raise Exception("Missing required config '%s'" % req)

        # Push each config item into the object by setting attribute.
        for name, value in items:
            if allowed_names and name not in allowed_names:
                raise Exception("Config name '%s' is not allowed for '%s'" % (name, section))
            setattr(obj, name, convert_config_value(value))
71
72
74 """!
75 Simple JSON based store of job data.
76 """
77
def __init__(self, path=None):
    """! Create the store and load jobs right away when a path is given.
    @param path optional path to a JSON job store file
    """
    self.path = path
    self.data = {}
    if not self.path:
        logger.warning('Path was not provided to job store - no jobs loaded!')
        return
    self.load(self.path)
85
def load(self, json_store):
    """! Load raw JSON data into this job store.

    The file must contain a JSON list of objects, each with a 'job_id' entry.
    An Exception is raised if the file does not exist, the top level is not a
    list, an entry has no job id, or a job id occurs twice.

    @param json_store json file containing raw job data
    """
    if not os.path.exists(json_store):
        raise Exception('JSON job store does not exist: {}'.format(json_store))
    with open(json_store, 'r') as f:
        json_data = json.load(f)
    # Iterating a JSON object would walk over its keys and fail with a confusing
    # TypeError further down, so reject anything that is not a list up front.
    if not isinstance(json_data, list):
        raise Exception('JSON job store must contain a list of jobs: {}'.format(json_store))
    for job_data in json_data:
        if not isinstance(job_data, dict) or 'job_id' not in job_data:
            raise Exception("Job data is missing a job id")
        # Job ids are always stored as int keys, whatever their JSON type.
        job_id = int(job_data['job_id'])
        if job_id in self.data:
            raise Exception(f"Duplicate job id: {job_id}")
        self.data[job_id] = job_data
        logger.debug("Loaded job {} data:\n {}".format(job_id, job_data))
    logger.info(f"Successfully loaded {len(self.data)} jobs from: {json_store}")
103
def get_job(self, job_id):
    """! Look up the data of a single job.
    @param job_id job ID
    @return job"""
    if self.has_job_id(job_id):
        return self.data[int(job_id)]
    raise Exception(f"Job ID does not exist: {job_id}")
111
112 def get_job_data(self):
113 """! Get the raw dict containing all the job data."""
114 return self.data
115
def get_job_ids(self):
    """! Return all job IDs held by the store as a list in ascending order."""
    job_ids = list(self.data)
    job_ids.sort()
    return job_ids
119
def has_job_id(self, job_id):
    """! Return true if the job ID exists in the store.
    @param job_id job ID; converted with int() before the lookup
    @return True if the converted ID is a key of the job data dict
    """
    # Test membership on the dict itself instead of building a list of all keys.
    return int(job_id) in self.data
123
124
126 """! Database of job scripts.
127 """
128
def __init__(self):
    """! Build the map from job script names to job script paths.

    Scans $HPSMC_DIR/lib/python/jobs for files matching '*_job.py'. The script
    name is the file name without the trailing '_job.py'.
    Raises an Exception if HPSMC_DIR is not set in the environment.
    """
    if 'HPSMC_DIR' not in os.environ:
        raise Exception('HPSMC_DIR is not set in the environ.')
    hps_mc_dir = os.environ['HPSMC_DIR']
    script_dir = os.path.join(hps_mc_dir, 'lib', 'python', 'jobs')

    suffix = '_job'
    # dict mapping script name -> path of the job script
    self.scripts = {}
    for f in glob.glob(os.path.join(script_dir, '*_job.py')):
        stem = os.path.splitext(os.path.basename(f))[0]
        # The glob guarantees the stem ends with '_job'. Strip only that trailing
        # suffix so names that contain '_job' elsewhere are kept intact.
        script_name = stem[:-len(suffix)]
        self.scripts[script_name] = f
139
140 def get_script_path(self, name):
141 """! Get path to job script from job name.
142 @param job name
143 @return path to job script"""
144 return self.scripts[name]
145
147 """! Get list of all script names."""
148 return list(self.scripts.keys())
149
150 def get_scripts(self):
151 """! Get dict containing paths to scripts sorted by script names."""
152 return self.scripts
153
154 def exists(self, name):
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
157 return name in self.scripts
158
159
160class Job(object):
161 """!
162 Primary class to run HPS jobs from a Python script.
163
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
167 """
168
169
171 """
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
174 'enable_cleanup',
175 'delete_existing',
176 'delete_rundir',
177 'dry_run',
178 'ignore_return_codes',
179 'check_output_files',
180 'enable_file_chaining',
181 'enable_env_config']
182 """
183
184 # Prefix to indicate ptag in job param file.
185 PTAG_PREFIX = 'ptag:'
186
187 def __init__(self, args=sys.argv, **kwargs):
188
189 if 'HPSMC_DIR' not in os.environ:
190 raise Exception('HPSMC_DIR is not set in the environ.')
191 self.hpsmc_dir = os.environ['HPSMC_DIR']
192
193
194 self.args = args
195
197
198 self.description = "HPS MC Job"
199
200 self.job_id = None
201
202 self.param_file = None
203
204 self.components = []
205
206 self.rundir = os.getcwd()
207
208 self.params = {}
209
210 self.output_dir = os.getcwd()
211
212 self.input_files = {}
213
214 self.output_files = {}
215
216 self.ptags = {}
217
218 self.component_out = sys.stdout
219
220 self.component_err = sys.stderr
221
222 self.script = None
223
224 self.job_steps = None
225
227
228
231 self.enable_cleanup = True
232 self.delete_existing = False
233 self.delete_rundir = False
234 self.dry_run = False
237 self.check_commands = False
239 self.enable_env_config = False
240
def add(self, component):
    """!
    Public method for adding components to the job.

    A non-string sequence is added element by element; anything else is
    added as a single component.
    """
    is_collection = isinstance(component, Sequence) and not isinstance(component, str)
    self.components.extend(component if is_collection else [component])
249
def set_parameters(self, params):
    """!
    Add parameters to the job, overriding values if they exist already.

    This method can be used in job scripts to define default values.

    @param params dict of parameter names to values
    """
    for k, v in params.items():
        if k in self.params:
            # Report the value that is being replaced (self.params[k]), not the
            # incoming one, as the "existing value".
            logger.debug("Setting new value '%s' for parameter '%s' with existing value '%s'.",
                         v, k, self.params[k])
        self.params[k] = v
261
262 def parse_args(self):
263 """!
264 Configure the job from command line arguments.
265 """
266
267 parser = argparse.ArgumentParser(description=self.description)
268 parser.add_argument("-c", "--config-file", help="Config file locations", action='append')
269 parser.add_argument("-d", "--run-dir", nargs='?', help="Job run dir")
270 parser.add_argument("-o", "--out", nargs='?', help="File for component stdout (default prints to console)")
271 parser.add_argument("-e", "--err", nargs='?', help="File for component stderr (default prints to console)")
272 parser.add_argument("-s", "--job-steps", type=int, default=None, help="Job steps to run (single number)")
273 parser.add_argument("-i", "--job-id", type=int, help="Job ID from JSON job store", default=None)
274 parser.add_argument("script", nargs='?', help="Path or name of job script")
275 parser.add_argument("params", nargs='?', help="Job param file in JSON format")
276
277
283
284 cl = parser.parse_args(self.args)
285
286 # Read in job configuration files
287 if cl.config_file:
288 config_files = list(map(os.path.abspath, cl.config_file))
289 logger.info("Reading additional config from: {}".format(config_files))
290 self.job_config.parser.read(config_files)
291
292 # Set file for stdout from components
293 if cl.out:
294 out_file = cl.out
295 if not os.path.isabs(out_file):
296 out_file = os.path.abspath(out_file)
297 logger.info('Job output will be written to: %s' % out_file)
298 self.component_out = open(out_file, 'w')
299
300 # Set file for stderr from components
301 if cl.err:
302 err_file = cl.err
303 if not os.path.isabs(err_file):
304 err_file = os.path.abspath(err_file)
305 logger.info('Job error will be written to: %s' % err_file)
306 self.component_err = open(err_file, 'w')
307
308 if cl.run_dir:
309 self.rundir = os.path.abspath(cl.run_dir)
310
311 self.job_steps = cl.job_steps
312 if self.job_steps is not None and self.job_steps < 1:
313 raise Exception("Invalid job steps argument (must be > 0): {}".format(self.job_steps))
314
315 if cl.script:
316
317 self.script = cl.script
318 else:
319 raise Exception('Missing required script name or location.')
320
321 # Params are actually optional as some job scripts might not need them.
322 if cl.params:
323 self.param_file = os.path.abspath(cl.params)
324 params = {}
325 if cl.job_id is not None:
326 # Load data from a job store containing multiple jobs.
327 self.job_id = cl.job_id
328 logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
329 jobstore = JobStore(self.param_file)
330 if jobstore.has_job_id(self.job_id):
331 params = jobstore.get_job(self.job_id)
332 else:
333 raise Exception("No job id %d was found in the job store '%s'" % (self.job_id, self.param_file))
334 else:
335 # Load data from a JSON file with a single job definition.
336 logger.info('Loading job parameters from file: %s' % self.param_file)
337 params = json.loads(open(self.param_file, 'r').read())
338 if isinstance(params, list):
339 params = params[0]
340 if not isinstance(params, dict):
341 raise Exception('Job ID must be provided when running from a job store.')
342
343 self._load_params(params)
344
def _load_params(self, params):
    """!
    Load the job parameters from JSON data.

    The parameters are merged into the job first; afterwards the entries
    'output_dir', 'job_id', 'input_files' and 'output_files' are copied onto
    the job attributes of the same name when they are present.
    """
    self.set_parameters(params)
    loaded = self.params

    if 'output_dir' in loaded:
        out_dir = loaded['output_dir']
        self.output_dir = out_dir
        if not os.path.isabs(out_dir):
            self.output_dir = os.path.abspath(out_dir)
            logger.debug("Changed output dir to abs path: %s" % self.output_dir)

    # The remaining entries are taken over without any conversion.
    for key in ('job_id', 'input_files', 'output_files'):
        if key in loaded:
            setattr(self, key, loaded[key])
368
370 """!
371 Prepare dictionary of input files.
372
373 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
374 """
375 input_files_dict = {}
376 for file_key, file_name in self.input_files.items():
377 if 'https' in file_key:
378 logger.info("Downloading input file from: %s" % file_key)
379 file_name_path = self.rundir + "/" + file_name
380 # \todo FIXME: We need to make sure wget is installed locally during the build or use a python lib like requests.
381 subprocess.check_output(['wget', '-q', '-O', file_name_path, file_key])
382 input_files_dict.update({file_name: file_name})
383 else:
384 input_files_dict.update({file_key: file_name})
385 self.input_files = input_files_dict
386
def _initialize(self):
    """!
    Perform basic initialization before the job script is loaded.

    Makes the run dir absolute, redirects it to a per-job scratch dir when
    LSB_JOBID is set in the environment and creates it if it is missing.
    """
    if not os.path.isabs(self.rundir):
        self.rundir = os.path.abspath(self.rundir)
        logger.info('Changed run dir to abs path: %s' % self.rundir)

    # Set run dir if running inside LSF
    lsf_job_id = os.environ.get("LSB_JOBID")
    if lsf_job_id is not None:
        self.rundir = os.path.join("/scratch", getpass.getuser(), lsf_job_id)
        logger.info('Set run dir for LSF: %s' % self.rundir)
        self.delete_rundir = True

    # Nothing left to do when the run dir is already there
    if os.path.exists(self.rundir):
        return
    logger.info('Creating run dir: %s' % self.rundir)
    os.makedirs(self.rundir)
407
408 def _configure(self):
409 """!
410 Configure job class and components.
411 """
412
413 # Configure job class
414 self.job_config.config(self, require_section=False)
415
416 # Configure the location of the fieldmap files
418
419 # Configure each of the job components
420 for component in self.components:
421
422 # Configure logging for the component.
423 component.config_logging(self.job_config.parser)
424
425 # Configure the component from job configuration.
426 component.config(self.job_config.parser)
427
428
429 if self.enable_env_config:
430 # Configure from env vars, if enabled.
431 component.config_from_environ()
432
433 # Check that the config is acceptable.
434 component.check_config()
435
def _load_script(self):
    """!
    Load the job script.

    The script is either a path ending in '.py' or a script name which is
    resolved through the JobScriptDatabase. It is executed with this job
    available under the global name 'job'.
    Raises an Exception if the name is unknown or the file does not exist.
    """
    # This might be okay if the user is manually adding components to a job for testing
    # without the command line interface. If no components are added before the job is
    # run, then this will be caught later.
    if self.script is None:
        logger.warning("No job script was provided!")
        return

    if not self.script.endswith('.py'):
        # Script is a name.
        script_db = JobScriptDatabase()
        if not script_db.exists(self.script):
            raise Exception("The script name is not valid: %s" % self.script)
        script_path = script_db.get_script_path(self.script)
        logger.debug("Found script '%s' from name '%s'" % (script_path, self.script))
    else:
        # Script is a path.
        script_path = self.script

    if not os.path.exists(script_path):
        raise Exception('Job script does not exist: %s' % script_path)

    logger.info('Loading job script: %s' % script_path)

    # Read inside a context manager so the file handle is closed before the script runs.
    with open(script_path, "rb") as script_file:
        source = script_file.read()

    # NOTE: exec() runs the script inside this process, so only trusted job
    # scripts should ever be loaded here.
    exec(compile(source, script_path, 'exec'), {'job': self})
464
465 def run(self):
466 """!
467 This is the primary execution method for running the job.
468 """
469
470 logger.info('Job ID: ' + str(self.job_id))
471 logger.info('Description: %s' % self.description)
472
473 # Print config to the log
474 logger.info(str(self.job_config))
475
476 # Initialize after CL parameters were parsed.
477 self._initialize()
478
479 # Load the job components from the script
480 self._load_script()
481
482 if not len(self.components):
483 raise Exception("Job has no components to execute.")
484
485 # Print list of job components
486 logger.info("Job components loaded: %s" % ([c.name for c in self.components]))
487
488 # Print job parameters.
489 if len(self.params) > 0:
490 logger.info("Job parameters loaded: %s" % str(self.params))
491 else:
492 logger.info("No job parameters were specified!")
493
494 # This will configure the Job class and its components by copying
495 # information into them from loaded config files.
496 self._configure()
497
498 # This (re)sets the input correctly
499 self._set_input_files()
500
501 # Set component parameters from job JSON file.
502 self._set_parameters()
503
504 # Perform component setup to prepare for execution.
505 # May use config and parameters that were set from above.
506 self._setup()
507
508 if not self.dry_run:
510 # Copy input files to the run dir.
511 self._copy_input_files()
512 else:
513 # Symlink input files if copying is disabled.
515
516 # Save job start time
517 start_time = time.time()
518
519 # Execute the job.
520 self._execute()
521
522 # Copy the output files to the output dir if enabled and not in dry run.
523 if not self.dry_run:
524
525 # Print job timer info
526 stop_time = time.time()
527 elapsed = stop_time - start_time
528 logger.info("Job execution took {} seconds".format(round(elapsed, 4)))
529
530 # Copy by file path or ptag
532
533 self._copy_output_files()
534 else:
535 logger.warning('Copy output files is disabled!')
536
537 # Perform job cleanup.
538 if self.enable_cleanup:
539 self._cleanup()
540
541 logger.info('Successfully finished running job: %s' % self.description)
542
def _execute(self):
    """!
    Execute all components in job.

    If dry_run is set to True, the components will not be executed;
    the list of components is logged instead.
    """
    if self.dry_run:
        # Dry run mode. Just print component command but do not execute it.
        logger.info("Dry run enabled. Components will NOT be executed!")
        for step in self.components:
            logger.info("'%s' with args: %s (DRY RUN)" % (step.name, ' '.join(step.cmd_args())))
        return

    for step in self.components:

        logger.info("Executing '%s' with command: %s" % (step.name, step.cmd_line_str()))

        # The same header goes to stdout and, if it is redirected, to stderr
        banner = '================ Component: %s ================\n' % step.name
        self.component_out.write(banner)
        self.component_out.flush()
        if self.component_err != sys.stderr:
            self.component_err.write(banner)
            self.component_err.flush()

        started = time.time()
        returncode = step.execute(self.component_out, self.component_err)
        elapsed = time.time() - started
        logger.info("Execution of {} took {} second(s) with return code: {}"
                    .format(step.name, round(elapsed, 4), str(returncode)))

        # A non-zero return code aborts the job unless return codes are ignored
        if returncode and not self.ignore_return_codes:
            raise Exception("Non-zero return code %d from '%s'" % (returncode, step.name))

        if not self.check_output_files:
            continue
        for produced in step.output_files():
            if os.path.isfile(produced):
                continue
            raise Exception("Output file '%s' is missing after execution." % produced)
585
586 def _setup(self):
587 """!
588 Necessary setup before job can be executed.
589 """
590
591 # Change to run dir
592 logger.info('Changing to run dir: %s' % self.rundir)
593 os.chdir(self.rundir)
594
595 # Create a symlink to the fieldmap directory
597
598 # Limit components according to job steps
599 if self.job_steps is not None:
600 if self.job_steps > 0:
601 self.components = self.components[0:self.job_steps]
602 logger.info("Job is limited to first %d steps." % self.job_steps)
603
604 if self.enable_file_chaining:
606
607 # Run setup methods of each component
608 for component in self.components:
609 logger.debug('Setting up component: %s' % (component.name))
610 component.rundir = self.rundir
611 component.setup()
612 if self.check_commands and not component.cmd_exists():
613 raise Exception("Command '%s' does not exist for '%s'." % (component.command, component.name))
614
616 """!
617 Pipe component outputs to inputs automatically.
618 """
619 for i in range(0, len(self.components)):
620 component = self.components[i]
621 logger.debug("Configuring file IO for component '%s' with order %d" % (component. name, i))
622 if i == 0:
623 logger.debug("Setting inputs on '%s' to: %s"
624 % (component.name, str(list(self.input_files.values()))))
625 if not len(component.inputs):
626 component.inputs = list(self.input_files.values())
627 elif i > -1:
628 logger.debug("Setting inputs on '%s' to: %s"
629 % (component.name, str(self.components[i - 1].output_files())))
630 if len(component.inputs) == 0:
631 component.inputs = self.components[i - 1].output_files()
632
634 """!
635 Push JSON job parameters to components.
636 """
637 for component in self.components:
638 component.set_parameters(self.params)
639
def _cleanup(self):
    """!
    Perform post-job cleanup.

    Runs the cleanup of every component, optionally deletes the run dir and
    closes the component output and error streams if they are not the
    standard streams.
    """
    for component in self.components:
        logger.info('Running cleanup for component: %s' % str(component.name))
        component.cleanup()

    if self.delete_rundir:
        logger.info('Deleting run dir: %s' % self.rundir)
        if os.path.exists(os.path.join(self.rundir, "__swif_env__")):
            # When the '__swif_env__' marker is present only entries that are
            # neither logs nor '__swif_' files are removed.
            for f in os.listdir(self.rundir):
                if ('.log' not in f) and ('__swif_' not in f):
                    # Resolve against the run dir (not the current working dir) and
                    # delete without a shell so any file name is handled safely.
                    path = os.path.join(self.rundir, f)
                    try:
                        if os.path.isdir(path) and not os.path.islink(path):
                            shutil.rmtree(path)
                        else:
                            os.remove(path)
                    except OSError as e:
                        # Removal stays best-effort, as it was with 'rm -r'.
                        logger.warning(e)
        else:
            shutil.rmtree(self.rundir)

    # Close redirected streams but never the interpreter's own stdout/stderr.
    for stream, standard in ((self.component_out, sys.stdout), (self.component_err, sys.stderr)):
        if stream != standard:
            try:
                stream.flush()
                stream.close()
            except Exception as e:
                logger.warning(e)
668
670 """!
671 Copy output files to output directory, handling ptags if necessary.
672 """
673
674 if not os.path.exists(self.output_dir):
675 logger.debug('Creating output dir: %s' % self.output_dir)
676 os.makedirs(self.output_dir, 0o755)
677
678 for src, dest in self.output_files.items():
679 src_file = src
680 if Job.is_ptag(src):
681 ptag_src = Job.get_ptag_from_src(src)
682 if ptag_src in list(self.ptags.keys()):
683 src_file = self.ptags[ptag_src]
684 logger.info("Resolved ptag: {} -> {}".format(ptag_src, src_file))
685 else:
686 raise Exception('Undefined ptag used in job params: %s' % ptag_src)
687 self._copy_output_file(src_file, dest)
688
def _copy_output_file(self, src, dest):
    """!
    Copy an output file from src to dest.

    @param src source file, joined onto the run dir
    @param dest destination file, joined onto the output dir

    Raises an Exception if the destination already exists and delete_existing
    is not set, or if the copy differs from the source afterwards.
    """

    src_file = os.path.join(self.rundir, src)
    dest_file = os.path.join(self.output_dir, dest)

    # Create directory if not exists; this allows relative path segments
    # in output file strings.
    dest_dir = os.path.dirname(dest_file)
    if dest_dir and not os.path.exists(dest_dir):
        os.makedirs(dest_dir, 0o755)

    # Check if the file is already there and does not need copying (e.g. if running
    # in local dir). This must happen before the "already exists" handling below,
    # otherwise the only copy of the file would be deleted or an error raised.
    if os.path.exists(dest_file) and os.path.samefile(src_file, dest_file):
        logger.warning("Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
        return

    # If target file already exists then see if it can be deleted; otherwise raise an error
    if os.path.isfile(dest_file):
        if not self.delete_existing:
            raise Exception('Output file already exists: %s' % dest_file)
        logger.debug('Deleting existing file: %s' % dest_file)
        os.remove(dest_file)

    logger.info("Copying '%s' to '%s'" % (src_file, dest_file))
    # shutil will throw an error if the copy obviously fails
    shutil.copyfile(src_file, dest_file)
    # take the time to double-check that the copy is identical to the original
    # this catches any sneaky network-dropping related copy failures
    if not filecmp.cmp(src_file, dest_file, shallow=False):
        raise Exception("Copy from '%s' to '%s' failed." % (src_file, dest_file))
727
729 """!
730 Copy input files to the run dir.
731 """
732 for src, dest in self.input_files.items():
733 # if not os.path.isabs(src):
734
736 if os.path.dirname(dest):
737 raise Exception("The input file destination '%s' is not valid." % dest)
738 logger.info("Copying input file: %s -> %s" % (src, os.path.join(self.rundir, dest)))
739 if '/mss/' in src:
740 src = src.replace('/mss/', '/cache/')
741 if os.path.exists(os.path.join(self.rundir, dest)):
742 logger.info("The input file '%s' already exists at destination '%s'" % (dest, self.rundir))
743 os.chmod(os.path.join(self.rundir, dest), 0o666)
744 else:
745 shutil.copyfile(src, os.path.join(self.rundir, dest))
746 os.chmod(dest, 0o666)
747
749 """!
750 Symlink input files.
751 """
752 for src, dest in self.input_files.items():
753 if not os.path.isabs(src):
754 raise Exception('The input source file is not an absolute path: %s' % src)
755 if os.path.dirname(dest):
756 raise Exception('The input file destination is not valid: %s' % dest)
757 logger.debug("Symlinking input '%s' to '%s'" % (src, os.path.join(self.rundir, dest)))
758 os.symlink(src, os.path.join(self.rundir, dest))
759
def ptag(self, tag, filename):
    """!
    Map a key to an output file name so a user can reference it in their job params.

    Registering a tag that already exists raises an Exception.
    """
    if tag in self.ptags:
        raise Exception('The ptag already exists: %s' % tag)
    self.ptags[tag] = filename
    logger.info("Added ptag %s -> %s" % (tag, filename))
769
770 @staticmethod
771 def is_ptag(src):
772 return src.startswith(Job.PTAG_PREFIX)
773
774 @staticmethod
776 if src.startswith(Job.PTAG_PREFIX):
777 return src[len(Job.PTAG_PREFIX):]
778 else:
779 raise Exception('File src is not a ptag: %s' % src)
780
781 def resolve_output_src(self, src):
782 if Job.is_ptag(src):
783 return self.ptags[Job.get_ptag_from_src(src)]
784 else:
785 return src
786
788 """!
789 Set fieldmap dir to install location if not provided in config
790 """
791 if self.hps_fieldmaps_dir is None:
792 self.hps_fieldmaps_dir = "{}/share/fieldmap".format(self.hpsmc_dir)
793 if not os.path.isdir(self.hps_fieldmaps_dir):
794 raise Exception("The fieldmaps dir does not exist: {}".format(self.hps_fieldmaps_dir))
795 logger.debug("Using fieldmap dir from install: {}".format(self.hps_fieldmaps_dir))
796 else:
797 logger.debug("Using fieldmap dir from config: {}".format(self.hps_fieldmaps_dir))
798
800 """!
801 Symlink to the fieldmap directory
802 """
803 fieldmap_symlink = pathlib.Path(os.getcwd(), "fieldmap")
804 if not fieldmap_symlink.exists():
805 logger.debug("Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
806 os.symlink(self.hps_fieldmaps_dir, "fieldmap")
807 else:
808 if fieldmap_symlink.is_dir() or os.path.islink(fieldmap_symlink):
809 logger.debug("Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
810 else:
811 raise Exception("A file called 'fieldmap' exists but it is not a symlink or directory!")
812
813
814cmds = {
815 'run': 'Run a job script',
816 'script': 'Show list of available job scripts (provide script name for detailed info)',
817 'component': 'Show list of available components (provide component name for detailed info)'}
818
819
821 print("Usage: job.py [command] [args]")
822 print(" command:")
823 for name, descr in cmds.items():
824 print(" %s: %s" % (name, descr))
825
826
827if __name__ == '__main__':
828 if len(sys.argv) > 1:
829 cmd = sys.argv[1]
830 if cmd not in list(cmds.keys()):
832 raise Exception('The job command is not valid: %s' % cmd)
833 args = sys.argv[2:]
834 if cmd == 'run':
835 job = Job(args)
836 job.parse_args()
837 job.run()
838 elif cmd == 'script':
839 if len(sys.argv) > 2:
840 script = sys.argv[2]
841 from hpsmc.help import print_job_script
842 print_job_script(script)
843 else:
844 scriptdb = JobScriptDatabase()
845 print("AVAILABLE JOB SCRIPTS: ")
846 for name in sorted(scriptdb.get_script_names()):
847 print(' %s: %s' % (name, scriptdb.get_script_path(name)))
848 elif cmd == 'component':
849 if len(sys.argv) > 2:
850 from hpsmc.help import print_component
851 component_name = sys.argv[2]
852 print_component(component_name)
853 else:
854 from hpsmc.help import print_components
855 print_components()
856
857 else:
Wrapper for accessing config information from parser.
Definition job.py:29
__str__(self)
Definition job.py:35
__init__(self)
Definition job.py:32
config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
Definition job.py:44
Database of job scripts.
Definition job.py:125
exists(self, name)
Test if job script exists in dict.
Definition job.py:154
get_scripts(self)
Get dict containing paths to scripts sorted by script names.
Definition job.py:150
scripts
dict of paths to job scripts sorted by name
Definition job.py:135
get_script_path(self, name)
Get path to job script from job name.
Definition job.py:140
get_script_names(self)
Get list of all script names.
Definition job.py:146
Simple JSON based store of job data.
Definition job.py:73
__init__(self, path=None)
Definition job.py:78
load(self, json_store)
Load raw JSON data into this job store.
Definition job.py:86
has_job_id(self, job_id)
Return true if the job ID exists in the store.
Definition job.py:120
get_job_ids(self)
Get a sorted list of job IDs.
Definition job.py:116
get_job_data(self)
Get the raw dict containing all the job data.
Definition job.py:112
get_job(self, job_id)
Get a job by its job ID.
Definition job.py:104
Primary class to run HPS jobs from a Python script.
Definition job.py:160
hps_fieldmaps_dir
fieldmaps dir
Definition job.py:226
run(self)
This is the primary execution method for running the job.
Definition job.py:465
parse_args(self)
Configure the job from command line arguments.
Definition job.py:262
_copy_output_file(self, src, dest)
Copy an output file from src to dest.
Definition job.py:689
component_err
output for component error messages
Definition job.py:220
_copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
Definition job.py:669
enable_file_chaining
Definition job.py:238
description
short description of job, should be overridden by the job script
Definition job.py:198
input_files
dict of input files
Definition job.py:212
_config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
Definition job.py:787
components
list of components in job
Definition job.py:204
add(self, component)
Public method for adding components to the job.
Definition job.py:241
resolve_output_src(self, src)
Definition job.py:781
job_config
Job configuration.
Definition job.py:196
params
dict of parameters
Definition job.py:208
job_id
job ID
Definition job.py:200
job_steps
job steps
Definition job.py:224
_config_file_pipeline(self)
Pipe component outputs to inputs automatically.
Definition job.py:615
_symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
Definition job.py:799
param_file
path to parameter file
Definition job.py:202
script
script containing component initializations
Definition job.py:222
_load_script(self)
Load the job script.
Definition job.py:436
_copy_input_files(self)
Copy input files to the run dir.
Definition job.py:728
_cleanup(self)
Perform post-job cleanup.
Definition job.py:640
rundir
rundir is current working directory
Definition job.py:206
_setup(self)
Necessary setup before job can be executed.
Definition job.py:586
_initialize(self)
Perform basic initialization before the job script is loaded.
Definition job.py:387
get_ptag_from_src(src)
Definition job.py:775
output_files
dict of output files
Definition job.py:214
set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
Definition job.py:250
__init__(self, args=sys.argv, **kwargs)
Definition job.py:187
args
(passed) job arguments
Definition job.py:194
component_out
output for component printouts
Definition job.py:218
check_output_files
Definition job.py:236
_load_params(self, params)
Load the job parameters from JSON data.
Definition job.py:345
enable_env_config
Definition job.py:239
ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
Definition job.py:760
_execute(self)
Execute all components in job.
Definition job.py:543
delete_existing
Definition job.py:232
_set_parameters(self)
Push JSON job parameters to components.
Definition job.py:633
_configure(self)
Configure job class and components.
Definition job.py:408
enable_copy_output_files
These attributes can all be set in the config file.
Definition job.py:229
is_ptag(src)
Definition job.py:771
_symlink_input_files(self)
Symlink input files.
Definition job.py:748
output_dir
output_dir is current working directory
Definition job.py:210
_set_input_files(self)
Prepare dictionary of input files.
Definition job.py:369
ignore_return_codes
Definition job.py:235
enable_copy_input_files
Definition job.py:230
ptags
dict with keys to output filenames
Definition job.py:216
Utility script for printing help about component classes.
Definition help.py:1
print_usage()
Definition job.py:820