HPS-MC
 
Loading...
Searching...
No Matches
job.py
Go to the documentation of this file.
1"""! @package job
2Primary class for running and managing HPSMC jobs defined by a set of components.
3"""
4
5import os
6import sys
7import json
8import time
9import shutil
10import filecmp
11import argparse
12import getpass
13import logging
14import glob
15import subprocess
16import copy
17import pathlib
18
19from collections.abc import Sequence
20from os.path import expanduser
21
22from hpsmc._config import convert_config_value
23from hpsmc import global_config
24
25
# Module-level logger used by every class and function in this module.
logger = logging.getLogger(name='hpsmc.job')
27
28
class JobConfig(object):
    """! Wrapper for accessing config information from parser."""

    def __init__(self):
        # NOTE(review): copy.copy is a shallow copy, so this parser shares its
        # section dicts with global_config; reading more config files into it
        # (as Job.parse_args does) also changes global_config. Confirm whether
        # that sharing is intended before switching to a deep copy.
        self.parser = copy.copy(global_config)

    def __str__(self):
        """! Render every section as '[section]' followed by its 'name=value' lines."""
        parser_lines = ['Job configuration:']
        for section in self.parser.sections():
            parser_lines.append("[" + section + "]")
            for i, v in self.parser.items(section):
                parser_lines.append("%s=%s" % (i, v))
            parser_lines.append('')
        return '\n'.join(parser_lines)

    def config(self, obj, section=None, required_names=None, allowed_names=None, require_section=True):
        """! Push config into an object by setting an attribute.

        Every option in the section is converted with convert_config_value() and
        set as an attribute of the same name on obj.

        @param obj object config is pushed to
        @param section config section name; defaults to the class name of obj
        @param required_names names that must be present in the section (default: none)
        @param allowed_names if non-empty, the only option names permitted in the section
        @param require_section if True a missing section raises, otherwise only a warning is logged
        @raise Exception if the section or a required name is missing, or a name is not allowed
        """
        # None defaults instead of shared mutable [] defaults.
        required_names = required_names or []
        allowed_names = allowed_names or []
        if not section:
            section = obj.__class__.__name__
        if self.parser.has_section(section):
            # Read the section once and reuse it for both checks.
            items = self.parser.items(section)
            present_names = {name for name, _ in items}
            # Check that required settings are there.
            for req in required_names:
                if req not in present_names:
                    raise Exception("Missing required config '%s'" % req)
            # Push each config item into the object by setting attribute.
            for name, value in items:
                if allowed_names and name not in allowed_names:
                    raise Exception("Config name '%s' is not allowed for '%s'" % (name, section))
                setattr(obj, name, convert_config_value(value))
        elif require_section:
            raise Exception("Missing required config section '%s'" % section)
        else:
            logger.warning('Config section not found: %s' % section)
72
class JobStore(object):
    """!
    Simple JSON based store of job data.

    The JSON file is iterated as a sequence of job dicts; each must carry a
    'job_id' entry, and jobs are indexed by the integer value of that ID.
    """

    def __init__(self, path=None):
        """! Create the store and load jobs from path if one is given.
        @param path path to a JSON job store file (optional)
        """
        self.path = path
        self.data = {}
        if self.path:
            self.load(self.path)
        else:
            logger.warning('Path was not provided to job store - no jobs loaded!')

    def load(self, json_store):
        """! Load raw JSON data into this job store.
        @param json_store json file containing raw job data
        @raise Exception if the file does not exist, a job has no 'job_id',
               or a job ID is already present in the store
        """
        if not os.path.exists(json_store):
            raise Exception('JSON job store does not exist: {}'.format(json_store))
        with open(json_store, 'r') as f:
            json_data = json.load(f)
        for job_data in json_data:
            if 'job_id' not in job_data:
                raise Exception("Job data is missing a job id")
            job_id = int(job_data['job_id'])
            if job_id in self.data:
                raise Exception(f"Duplicate job id: {job_id}")
            self.data[job_id] = job_data
            logger.debug("Loaded job {} data:\n {}".format(job_id, job_data))
        logger.info(f"Successfully loaded {len(self.data)} jobs from: {json_store}")

    def get_job(self, job_id):
        """! Get a job by its job ID.
        @param job_id job ID (anything int() accepts)
        @return job data dict
        @raise Exception if the job ID is not in the store"""
        if not self.has_job_id(job_id):
            raise Exception(f"Job ID does not exist: {job_id}")
        return self.data[int(job_id)]

    def get_job_data(self):
        """! Get the raw dict containing all the job data, keyed by int job ID."""
        return self.data

    def get_job_ids(self):
        """! Get a sorted list of job IDs."""
        return sorted(self.data.keys())

    def has_job_id(self, job_id):
        """! Return true if the job ID exists in the store."""
        # Direct dict membership is O(1); building a list of keys was O(n).
        return int(job_id) in self.data
123
124
class JobScriptDatabase(object):
    """! Database of job scripts.

    Job scripts are files named '<name>_job.py' in $HPSMC_DIR/lib/python/jobs
    and are looked up by '<name>'.
    """

    def __init__(self):
        """! Scan the job script directory.
        @raise Exception if HPSMC_DIR is not set in the environment
        """
        if 'HPSMC_DIR' not in os.environ:
            raise Exception('HPSMC_DIR is not set in the environ.')
        hps_mc_dir = os.environ['HPSMC_DIR']
        script_dir = os.path.join(hps_mc_dir, 'lib', 'python', 'jobs')

        ## dict of paths to job scripts keyed by script name
        self.scripts = {}
        for f in glob.glob(os.path.join(script_dir, '*_job.py')):
            stem = os.path.splitext(os.path.basename(f))[0]
            # Strip only the trailing '_job' (guaranteed by the glob pattern);
            # str.replace would also drop an earlier '_job' ('simp_jobs_job' -> 'simps').
            script_name = stem[:-len('_job')]
            self.scripts[script_name] = f

    def get_script_path(self, name):
        """! Get path to job script from job name.
        @param name job script name
        @return path to job script
        @raise KeyError if no script has that name"""
        return self.scripts[name]

    def get_script_names(self):
        """! Get list of all script names (unsorted)."""
        return list(self.scripts.keys())

    def get_scripts(self):
        """! Get the dict mapping script names to script paths."""
        return self.scripts

    def exists(self, name):
        """! Test if job script exists in dict.
        @return True if job name is key in job dict"""
        return name in self.scripts
158
159
160class Job(object):
161 """!
162 Primary class to run HPS jobs from a Python script.
163
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
167 """
168
169
# NOTE(review): listing line 170 is missing from this doxygen export; its content cannot be recovered here.
# The triple-quoted string below is a bare expression with no runtime effect. It lists
# attribute names meant to be set from the [Job] config section: _configure() passes this
# object to JobConfig.config(), which sets every option of that section as an attribute.
# The list is not enforced, because config() is called without allowed_names.
171 """
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
174 'delete_existing',
175 'delete_rundir',
176 'dry_run',
177 'ignore_return_codes',
178 'check_output_files',
179 'enable_file_chaining',
180 'enable_env_config']
181 """
182
183 # Prefix to indicate ptag in job param file.
184 PTAG_PREFIX = 'ptag:'
185
186 def __init__(self, args=sys.argv, **kwargs):
# Create a job with default settings. The attributes below may later be overridden by
# parse_args(), by the job params (_load_params) and by the [Job] config section (_configure).
# @param args arguments handed to argparse in parse_args(); note that the default sys.argv
#        still contains the program name, whereas the __main__ entry point passes sys.argv[2:].
# @param kwargs accepted but not used in the visible code.
# @raise Exception if HPSMC_DIR is not set in the environment
187
188 if 'HPSMC_DIR' not in os.environ:
189 raise Exception('HPSMC_DIR is not set in the environ.')
190 self.hpsmc_dir = os.environ['HPSMC_DIR']
191
192
193 self.args = args
194
# NOTE(review): listing line 195 is missing; the generated member index places job_config
# ("Job configuration") there, presumably self.job_config = JobConfig() -- confirm.
196
197 self.description = "HPS MC Job"
198
199 self.job_id = None
200
201 self.param_file = None
202
203 self.components = []
204
205 self.rundir = os.getcwd()
206
207 self.params = {}
208
209 self.output_dir = os.getcwd()
210
211 self.input_files = {}
212
213 self.output_files = {}
214
215 self.ptags = {}
216
217 self.component_out = sys.stdout
218
219 self.component_err = sys.stderr
220
221 self.script = None
222
223 self.job_steps = None
224
# NOTE(review): listing line 225 is missing; the member index places hps_fieldmaps_dir
# ("fieldmaps dir") there. _config_fieldmap_dir() treats None as "not configured".
226
227
# The flags below can also be set from the [Job] config section.
# NOTE(review): listing lines 228-229 (enable_copy_output_files, enable_copy_input_files),
# 233-234 (ignore_return_codes, check_output_files) and 236 (enable_file_chaining) are
# missing from this export; their default values cannot be read from this listing.
230 self.delete_existing = False
231 self.delete_rundir = False
232 self.dry_run = False
235 self.check_commands = False
237 self.enable_env_config = False
238
239 def add(self, component):
240 """!
241 Public method for adding components to the job.
242 """
243 if isinstance(component, Sequence) and not isinstance(component, str):
244 self.components.extend(component)
245 else:
246 self.components.append(component)
247
248 def set_parameters(self, params):
249 """!
250 Add parameters to the job, overriding values if they exist already.
251
252 This method can be used in job scripts to define default values.
253 """
254 for k, v in params.items():
255 if k in self.params:
256 logger.debug("Setting new value '%s' for parameter '%s' with existing value '%s'."
257 % (str(v), str(k), params[k]))
258 self.params[k] = v
259
260 def parse_args(self):
261 """!
262 Configure the job from command line arguments.
263 """
264
265 parser = argparse.ArgumentParser(description=self.description)
266 parser.add_argument("-c", "--config-file", help="Config file locations", action='append')
267 parser.add_argument("-d", "--run-dir", nargs='?', help="Job run dir")
268 parser.add_argument("-o", "--out", nargs='?', help="File for component stdout (default prints to console)")
269 parser.add_argument("-e", "--err", nargs='?', help="File for component stderr (default prints to console)")
270 parser.add_argument("-s", "--job-steps", type=int, default=None, help="Job steps to run (single number)")
271 parser.add_argument("-i", "--job-id", type=int, help="Job ID from JSON job store", default=None)
272 parser.add_argument("script", nargs='?', help="Path or name of job script")
273 parser.add_argument("params", nargs='?', help="Job param file in JSON format")
274
275
# NOTE(review): listing lines 276-280 are missing from this doxygen export; any statements
# there (e.g. further add_argument calls) are not visible.
281
282 cl = parser.parse_args(self.args)
283
284 # Read in job configuration files
285 if cl.config_file:
286 config_files = list(map(os.path.abspath, cl.config_file))
287 logger.info("Reading additional config from: {}".format(config_files))
288 self.job_config.parser.read(config_files)
289
290 # Set file for stdout from components
291 if cl.out:
292 out_file = cl.out
293 if not os.path.isabs(out_file):
294 out_file = os.path.abspath(out_file)
295 logger.info('Job output will be written to: %s' % out_file)
# NOTE(review): the file is stored on self.out, but _execute() and _cleanup() use
# self.component_out. So -o appears to have no effect on component output, and this file
# is never closed -- confirm whether self.component_out was intended.
296 self.out = open(out_file, 'w')
297
298 # Set file for stderr from components
299 if cl.err:
300 err_file = cl.err
301 if not os.path.isabs(err_file):
302 err_file = os.path.abspath(err_file)
303 logger.info('Job error will be written to: %s' % err_file)
# NOTE(review): same mismatch as above: self.err vs self.component_err.
304 self.err = open(err_file, 'w')
305
306 if cl.run_dir:
307 self.rundir = os.path.abspath(cl.run_dir)
308
309 self.job_steps = cl.job_steps
310 if self.job_steps is not None and self.job_steps < 1:
311 raise Exception("Invalid job steps argument (must be > 0): {}".format(self.job_steps))
312
313 if cl.script:
314
315 self.script = cl.script
316 else:
317 raise Exception('Missing required script name or location.')
318
319 # Params are actually optional as some job scripts might not need them.
320 if cl.params:
321 self.param_file = os.path.abspath(cl.params)
322 params = {}
# NOTE(review): a job ID of 0 is falsy, so "-i 0" falls through to the single-job branch.
323 if cl.job_id:
324 # Load data from a job store containing multiple jobs.
325 self.job_id = cl.job_id
326 logger.debug("Loading job with ID %d from job store '%s'" % (self.job_id, self.param_file))
327 jobstore = JobStore(self.param_file)
328 if jobstore.has_job_id(self.job_id):
329 params = jobstore.get_job(self.job_id)
330 else:
331 raise Exception("No job id %d was found in the job store '%s'" % (self.job_id, self.param_file))
332 else:
333 # Load data from a JSON file with a single job definition.
334 logger.info('Loading job parameters from file: %s' % self.param_file)
# NOTE(review): the file object is not closed explicitly (relies on garbage collection).
335 params = json.loads(open(self.param_file, 'r').read())
# A job store file is iterated as a sequence of job dicts (see JobStore.load), so a
# non-dict here is taken to mean -i was omitted for a job store.
336 if not isinstance(params, dict):
337 raise Exception('Job ID must be provided when running from a job store.')
338
339 self._load_params(params)
340
341 def _load_params(self, params):
342 """!
343 Load the job parameters from JSON data.
344 """
345
346 self.set_parameters(params)
347
348 # logger.info(json.dumps(self.params, indent=4, sort_keys=False))
349
350 if 'output_dir' in self.params:
351 self.output_dir = self.params['output_dir']
352 if not os.path.isabs(self.output_dir):
353 self.output_dir = os.path.abspath(self.output_dir)
354 logger.debug("Changed output dir to abs path: %s" % self.output_dir)
355
356 if 'job_id' in self.params:
357 self.job_id = self.params['job_id']
358
359 if 'input_files' in self.params:
360 self.input_files = self.params['input_files']
361
362 if 'output_files' in self.params:
363 self.output_files = self.params['output_files']
364
366 """!
367 Prepare dictionary of input files.
368
369 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
370 """
371 input_files_dict = {}
372 for file_key, file_name in self.input_files.items():
373 if 'https' in file_key:
374 logger.info("Downloading input file from: %s" % file_key)
375 file_name_path = self.rundir + "/" + file_name
376 # \todo FIXME: We need to make sure wget is installed locally during the build or use a python lib like requests.
377 subprocess.check_output(['wget', '-q', '-O', file_name_path, file_key])
378 input_files_dict.update({file_name: file_name})
379 else:
380 input_files_dict.update({file_key: file_name})
381 self.input_files = input_files_dict
382
383 def _initialize(self):
384 """!
385 Perform basic initialization before the job script is loaded.
386 """
387
388 if not os.path.isabs(self.rundir):
389 self.rundir = os.path.abspath(self.rundir)
390 logger.info('Changed run dir to abs path: %s' % self.rundir)
391 # raise Exception('The run dir is not an absolute path: %s' % self.rundir)
392
393 # Set run dir if running inside LSF
394 if "LSB_JOBID" in os.environ:
395 self.rundir = os.path.join("/scratch", getpass.getuser(), os.environ["LSB_JOBID"])
396 logger.info('Set run dir for LSF: %s' % self.rundir)
397 self.delete_rundir = True
398
399 # Create run dir if it does not exist
400 if not os.path.exists(self.rundir):
401 logger.info('Creating run dir: %s' % self.rundir)
402 os.makedirs(self.rundir)
403
404 def _configure(self):
405 """!
406 Configure job class and components.
407 """
408
409 # Configure job class
# Sets every option of the [Job] config section (the section name defaults to the class
# name) as an attribute on this job; a missing section only logs a warning.
410 self.job_config.config(self, require_section=False)
411
412 # Configure the location of the fieldmap files
# NOTE(review): listing line 413 is missing from this export; presumably it calls
# self._config_fieldmap_dir(), which matches the comment above -- confirm.
414
415 # Configure each of the job components
416 for component in self.components:
417
418 # Configure logging for the component.
419 component.config_logging(self.job_config.parser)
420
421 # Configure the component from job configuration.
422 component.config(self.job_config.parser)
423
424
425 if self.enable_env_config:
426 # Configure from env vars, if enabled.
427 component.config_from_environ()
428
429 # Check that the config is acceptable.
430 component.check_config()
431
432 def _load_script(self):
433 """!
434 Load the job script.
435 """
436 # This might be okay if the user is manually adding components to a job for testing
437 # without the command line interface. If no components are added before the job is
438 # run, then this will be caught later.
439 if self.script is None:
440 logger.warning("No job script was provided!")
441 return
442
443 if not self.script.endswith('.py'):
444 # Script is a name.
445 script_db = JobScriptDatabase()
446 if not script_db.exists(self.script):
447 raise Exception("The script name is not valid: %s" % self.script)
448 script_path = script_db.get_script_path(self.script)
449 logger.debug("Found script '%s' from name '%s'" % (script_path, self.script))
450 else:
451 # Script is a path.
452 script_path = self.script
453
454 if not os.path.exists(script_path):
455 raise Exception('Job script does not exist: %s' % script_path)
456
457 logger.info('Loading job script: %s' % script_path)
458
459 exec(compile(open(script_path, "rb").read(), script_path, 'exec'), {'job': self})
460
461 def run(self):
462 """!
463 This is the primary execution method for running the job.
464 """
# Order of work: _initialize, _load_script, _configure, _set_input_files, _set_parameters,
# _setup, input staging (copy or symlink), _execute, output copying, _cleanup.
465
466 logger.info('Job ID: ' + str(self.job_id))
467 logger.info('Description: %s' % self.description)
468
469 # Print config to the log
470 logger.info(str(self.job_config))
471
472 # Initialize after CL parameters were parsed.
473 self._initialize()
474
475 # Load the job components from the script
476 self._load_script()
477
478 if not len(self.components):
479 raise Exception("Job has no components to execute.")
480
481 # Print list of job components
482 logger.info("Job components loaded: %s" % ([c.name for c in self.components]))
483
484 # Print job parameters.
485 if len(self.params) > 0:
486 logger.info("Job parameters loaded: %s" % str(self.params))
487 else:
488 logger.info("No job parameters were specified!")
489
490 # This will configure the Job class and its components by copying
491 # information into them from loaded config files.
492 self._configure()
493
494 # This (re)sets the input correctly
495 self._set_input_files()
496
497 # Set component parameters from job JSON file.
498 self._set_parameters()
499
500 # Perform component setup to prepare for execution.
501 # May use config and parameters that were set from above.
502 self._setup()
503
504 if not self.dry_run:
# NOTE(review): listing line 505 is missing from this export; the comments and the else
# branch below suggest "if self.enable_copy_input_files:" -- confirm.
506 # Copy input files to the run dir.
507 self._copy_input_files()
508 else:
509 # Symlink input files if copying is disabled.
# NOTE(review): listing line 510 is missing; presumably self._symlink_input_files() -- confirm.
511
512 # Save job start time
513 start_time = time.time()
514
515 # Execute the job.
516 self._execute()
517
518 # Copy the output files to the output dir if enabled and not in dry run.
519 if not self.dry_run:
520
521 # Print job timer info
522 stop_time = time.time()
523 elapsed = stop_time - start_time
524 logger.info("Job execution took {} seconds".format(round(elapsed, 4)))
525
526 # Copy by file path or ptag
# NOTE(review): listing line 527 is missing; given the else-branch message below,
# presumably "if self.enable_copy_output_files:" -- confirm.
528
529 self._copy_output_files()
530 else:
531 logger.warning('Copy output files is disabled!')
532
533 # Perform job cleanup.
534 self._cleanup()
535
536 logger.info('Successfully finished running job: %s' % self.description)
537
538 def _execute(self):
539 """!
540 Execute all components in job.
541
542 If dry_run is set to True, the components will not be exectuted,
543 list of components will be put out instead.
544 """
545 if not self.dry_run:
546
547 for component in self.components:
548
549 logger.info("Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
550 # logger.info("Component IO: {} -> {}".format(str(component.input_files(), component.output_files())))
551
552 # Print header to stdout
553 self.component_out.write('================ Component: %s ================\n' % component.name)
554 self.component_out.flush()
555
556 # Print header to stderr if output is going to a file
557 if self.component_err != sys.stderr:
558 self.component_err.write('================ Component: %s ================\n' % component.name)
559 self.component_err.flush()
560
561 start = time.time()
562 returncode = component.execute(self.component_out, self.component_err)
563 end = time.time()
564 elapsed = end - start
565 logger.info("Execution of {} took {} second(s) with return code: {}"
566 .format(component.name, round(elapsed, 4), str(returncode)))
567
568 if not self.ignore_return_codes and returncode:
569 raise Exception("Non-zero return code %d from '%s'" % (returncode, component.name))
570
571 if self.check_output_files:
572 for outputfile in component.output_files():
573 if not os.path.isfile(outputfile):
574 raise Exception("Output file '%s' is missing after execution." % outputfile)
575 else:
576 # Dry run mode. Just print component command but do not execute it.
577 logger.info("Dry run enabled. Components will NOT be executed!")
578 for component in self.components:
579 logger.info("'%s' with args: %s (DRY RUN)" % (component.name, ' '.join(component.cmd_args())))
580
581 def _setup(self):
582 """!
583 Necessary setup before job can be executed.
584 """
585
586 # Change to run dir
587 logger.info('Changing to run dir: %s' % self.rundir)
588 os.chdir(self.rundir)
589
590 # Create a symlink to the fieldmap directory
# NOTE(review): listing line 591 is missing from this export; presumably
# self._symlink_fieldmap_dir() -- confirm. That method creates ./fieldmap in the current
# working directory, which is the run dir after the chdir above.
592
593 # Limit components according to job steps
594 if self.job_steps is not None:
595 if self.job_steps > 0:
596 self.components = self.components[0:self.job_steps]
597 logger.info("Job is limited to first %d steps." % self.job_steps)
598
599 if self.enable_file_chaining:
# NOTE(review): listing line 600 is missing; presumably self._config_file_pipeline() -- confirm.
601
602 # Run setup methods of each component
603 for component in self.components:
604 logger.debug('Setting up component: %s' % (component.name))
605 component.rundir = self.rundir
606 component.setup()
607 if self.check_commands and not component.cmd_exists():
608 raise Exception("Command '%s' does not exist for '%s'." % (component.command, component.name))
609
611 """!
612 Pipe component outputs to inputs automatically.
613 """
614 for i in range(0, len(self.components)):
615 component = self.components[i]
616 logger.debug("Configuring file IO for component '%s' with order %d" % (component. name, i))
617 if i == 0:
618 logger.debug("Setting inputs on '%s' to: %s"
619 % (component.name, str(list(self.input_files.values()))))
620 if not len(component.inputs):
621 component.inputs = list(self.input_files.values())
622 elif i > -1:
623 logger.debug("Setting inputs on '%s' to: %s"
624 % (component.name, str(self.components[i - 1].output_files())))
625 if len(component.inputs) == 0:
626 component.inputs = self.components[i - 1].output_files()
627
629 """!
630 Push JSON job parameters to components.
631 """
632 for component in self.components:
633 component.set_parameters(self.params)
634
635 def _cleanup(self):
636 """!
637 Perform post-job cleanup.
638 """
639 for component in self.components:
640 logger.info('Running cleanup for component: %s' % str(component.name))
641 component.cleanup()
642 if self.delete_rundir:
643 logger.info('Deleting run dir: %s' % self.rundir)
644 if os.path.exists("%s/__swif_env__" % self.rundir):
645 for f in os.listdir(self.rundir):
646 if ('.log' not in f) and ('__swif_' not in f):
647 os.system('rm -r %s' % f)
648 else:
649 shutil.rmtree(self.rundir)
650 if self.component_out != sys.stdout:
651 try:
652 self.component_out.flush()
653 self.component_out.close()
654 except Exception as e:
655 logger.warn(e)
656
657 if self.component_err != sys.stderr:
658 try:
659 self.component_err.flush()
660 self.component_err.close()
661 except Exception as e:
662 logger.warn(e)
663
665 """!
666 Copy output files to output directory, handling ptags if necessary.
667 """
668
669 if not os.path.exists(self.output_dir):
670 logger.debug('Creating output dir: %s' % self.output_dir)
671 os.makedirs(self.output_dir, 0o755)
672
673 for src, dest in self.output_files.items():
674 src_file = src
675 if Job.is_ptag(src):
676 ptag_src = Job.get_ptag_from_src(src)
677 if ptag_src in list(self.ptags.keys()):
678 src_file = self.ptags[ptag_src]
679 logger.info("Resolved ptag: {} -> {}".format(ptag_src, src_file))
680 else:
681 raise Exception('Undefined ptag used in job params: %s' % ptag_src)
682 self._copy_output_file(src_file, dest)
683
684 def _copy_output_file(self, src, dest):
685 """!
686 Copy an output file from src to dest.
687 """
688
689 src_file = os.path.join(self.rundir, src)
690 dest_file = os.path.join(self.output_dir, dest)
691
692 # Create directory if not exists; this allows relative path segments
693 # in output file strings.
694 if not os.path.exists(os.path.dirname(dest_file)):
695 os.makedirs(os.path.dirname(dest_file), 0o755)
696
697 # Check if the file is already there and does not need copying (e.g. if running in local dir)
698 samefile = False
699 if os.path.exists(dest_file):
700 if os.path.samefile(src_file, dest_file):
701 samefile = True
702
703 # If target file already exists then see if it can be deleted; otherwise raise an error
704 if os.path.isfile(dest_file):
705 if self.delete_existing:
706 logger.debug('Deleting existing file: %s' % dest_file)
707 os.remove(dest_file)
708 else:
709 raise Exception('Output file already exists: %s' % dest_file)
710
711 # Copy the file to the destination dir if not already created by the job
712 logger.info("Copying '%s' to '%s'" % (src_file, dest_file))
713 if not samefile:
714 # shutil will throw and error if the copy obviously fails
715 shutil.copyfile(src_file, dest_file)
716 # take the time to double-check that the copy is identical to the original
717 # this catches any sneaky network-dropping related copy failures
718 if not filecmp.cmp(src_file, dest_file, shallow=False):
719 raise Exception("Copy from '%s' to '%s' failed." % (src_file, dest_file))
720 else:
721 logger.warning("Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
722
724 """!
725 Copy input files to the run dir.
726 """
727 for src, dest in self.input_files.items():
728 # if not os.path.isabs(src):
729
731 if os.path.dirname(dest):
732 raise Exception("The input file destination '%s' is not valid." % dest)
733 logger.info("Copying input file: %s -> %s" % (src, os.path.join(self.rundir, dest)))
734 if '/mss/' in src:
735 src = src.replace('/mss/', '/cache/')
736 if os.path.exists(os.path.join(self.rundir, dest)):
737 logger.info("The input file '%s' already exists at destination '%s'" % (dest, self.rundir))
738 os.chmod(os.path.join(self.rundir, dest), 0o666)
739 else:
740 shutil.copyfile(src, os.path.join(self.rundir, dest))
741 os.chmod(dest, 0o666)
742
744 """!
745 Symlink input files.
746 """
747 for src, dest in self.input_files.items():
748 if not os.path.isabs(src):
749 raise Exception('The input source file is not an absolute path: %s' % src)
750 if os.path.dirname(dest):
751 raise Exception('The input file destination is not valid: %s' % dest)
752 logger.debug("Symlinking input '%s' to '%s'" % (src, os.path.join(self.rundir, dest)))
753 os.symlink(src, os.path.join(self.rundir, dest))
754
755 def ptag(self, tag, filename):
756 """!
757 Map a key to an output file name so a user can reference it in their job params.
758 """
759 if tag not in list(self.ptags.keys()):
760 self.ptags[tag] = filename
761 logger.info("Added ptag %s -> %s" % (tag, filename))
762 else:
763 raise Exception('The ptag already exists: %s' % tag)
764
765 @staticmethod
766 def is_ptag(src):
767 return src.startswith(Job.PTAG_PREFIX)
768
769 @staticmethod
771 if src.startswith(Job.PTAG_PREFIX):
772 return src[len(Job.PTAG_PREFIX):]
773 else:
774 raise Exception('File src is not a ptag: %s' % src)
775
776 def resolve_output_src(self, src):
777 if Job.is_ptag(src):
778 return self.ptags[Job.get_ptag_from_src(src)]
779 else:
780 return src
781
783 """!
784 Set fieldmap dir to install location if not provided in config
785 """
786 if self.hps_fieldmaps_dir is None:
787 self.hps_fieldmaps_dir = "{}/share/fieldmap".format(self.hpsmc_dir)
788 if not os.path.isdir(self.hps_fieldmaps_dir):
789 raise Exception("The fieldmaps dir does not exist: {}".format(self.hps_fieldmaps_dir))
790 logger.debug("Using fieldmap dir from install: {}".format(self.hps_fieldmaps_dir))
791 else:
792 logger.debug("Using fieldmap dir from config: {}".format(self.hps_fieldmaps_dir))
793
795 """!
796 Symlink to the fieldmap directory
797 """
798 fieldmap_symlink = pathlib.Path(os.getcwd(), "fieldmap")
799 if not fieldmap_symlink.exists():
800 logger.debug("Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
801 os.symlink(self.hps_fieldmaps_dir, "fieldmap")
802 else:
803 if fieldmap_symlink.is_dir() or os.path.islink(fieldmap_symlink):
804 logger.debug("Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
805 else:
806 raise Exception("A file called 'fieldmap' exists but it is not a symlink or directory!")
807
808
## Sub-commands accepted by the command line entry point, mapped to their descriptions.
cmds = dict(
    run='Run a job script',
    script='Show list of available job scripts (provide script name for detailed info)',
    component='Show list of available components (provide component name for detailed info)',
)
813
814
def print_usage():
    """! Print command line usage, listing every sub-command in cmds with its description."""
    usage_lines = ["Usage: job.py [command] [args]", " command:"]
    usage_lines.extend(" %s: %s" % (name, descr) for name, descr in cmds.items())
    for line in usage_lines:
        print(line)
820
821
822if __name__ == '__main__':
# Command line entry point: job.py <command> [args]. The command is checked against cmds:
# 'run' parses the remaining arguments and runs a Job; 'script' and 'component' print help.
823 if len(sys.argv) > 1:
824 cmd = sys.argv[1]
825 if cmd not in list(cmds.keys()):
# NOTE(review): listing line 826 is missing from this export; the member index shows a
# print_usage() helper, presumably called here before raising -- confirm.
827 raise Exception('The job command is not valid: %s' % cmd)
828 args = sys.argv[2:]
829 if cmd == 'run':
830 job = Job(args)
831 job.parse_args()
832 job.run()
833 elif cmd == 'script':
834 if len(sys.argv) > 2:
835 script = sys.argv[2]
836 from hpsmc.help import print_job_script
837 print_job_script(script)
838 else:
839 scriptdb = JobScriptDatabase()
840 print("AVAILABLE JOB SCRIPTS: ")
841 for name in sorted(scriptdb.get_script_names()):
842 print(' %s: %s' % (name, scriptdb.get_script_path(name)))
843 elif cmd == 'component':
844 if len(sys.argv) > 2:
845 from hpsmc.help import print_component
846 component_name = sys.argv[2]
847 print_component(component_name)
848 else:
849 from hpsmc.help import print_components
850 print_components()
851
852 else:
# NOTE(review): the body of this else (listing line 853 onward) is cut off in this export;
# presumably print_usage() when no command is given -- confirm.
Wrapper for accessing config information from parser.
Definition job.py:29
__str__(self)
Definition job.py:35
__init__(self)
Definition job.py:32
config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
Definition job.py:44
Database of job scripts.
Definition job.py:125
exists(self, name)
Test if job script exists in dict.
Definition job.py:154
get_scripts(self)
Get the dict mapping script names to script paths.
Definition job.py:150
scripts
dict mapping job script names to their file paths
Definition job.py:135
get_script_path(self, name)
Get path to job script from job name.
Definition job.py:140
get_script_names(self)
Get list of all script names.
Definition job.py:146
Simple JSON based store of job data.
Definition job.py:73
__init__(self, path=None)
Definition job.py:78
load(self, json_store)
Load raw JSON data into this job store.
Definition job.py:86
has_job_id(self, job_id)
Return true if the job ID exists in the store.
Definition job.py:120
get_job_ids(self)
Get a sorted list of job IDs.
Definition job.py:116
get_job_data(self)
Get the raw dict containing all the job data.
Definition job.py:112
get_job(self, job_id)
Get a job by its job ID.
Definition job.py:104
Primary class to run HPS jobs from a Python script.
Definition job.py:160
hps_fieldmaps_dir
fieldmaps dir
Definition job.py:225
run(self)
This is the primary execution method for running the job.
Definition job.py:461
parse_args(self)
Configure the job from command line arguments.
Definition job.py:260
_copy_output_file(self, src, dest)
Copy an output file from src to dest.
Definition job.py:684
component_err
output for component error messages
Definition job.py:219
_copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
Definition job.py:664
enable_file_chaining
Definition job.py:236
description
short description of job, should be overridden by the job script
Definition job.py:197
input_files
dict of input files
Definition job.py:211
_config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
Definition job.py:782
components
list of components in job
Definition job.py:203
add(self, component)
Public method for adding components to the job.
Definition job.py:239
resolve_output_src(self, src)
Definition job.py:776
job_config
Job configuration.
Definition job.py:195
params
dict of parameters
Definition job.py:207
job_id
job ID
Definition job.py:199
job_steps
job steps
Definition job.py:223
_config_file_pipeline(self)
Pipe component outputs to inputs automatically.
Definition job.py:610
_symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
Definition job.py:794
param_file
path to parameter file
Definition job.py:201
script
script containing component initializations
Definition job.py:221
_load_script(self)
Load the job script.
Definition job.py:432
_copy_input_files(self)
Copy input files to the run dir.
Definition job.py:723
_cleanup(self)
Perform post-job cleanup.
Definition job.py:635
rundir
rundir is current working directory
Definition job.py:205
_setup(self)
Necessary setup before job can be executed.
Definition job.py:581
_initialize(self)
Perform basic initialization before the job script is loaded.
Definition job.py:383
get_ptag_from_src(src)
Definition job.py:770
output_files
dict of output files
Definition job.py:213
set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
Definition job.py:248
__init__(self, args=sys.argv, **kwargs)
Definition job.py:186
args
(passed) job arguments
Definition job.py:193
component_out
output for component printouts
Definition job.py:217
check_output_files
Definition job.py:234
_load_params(self, params)
Load the job parameters from JSON data.
Definition job.py:341
enable_env_config
Definition job.py:237
ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
Definition job.py:755
_execute(self)
Execute all components in job.
Definition job.py:538
delete_existing
Definition job.py:230
_set_parameters(self)
Push JSON job parameters to components.
Definition job.py:628
_configure(self)
Configure job class and components.
Definition job.py:404
enable_copy_output_files
These attributes can all be set in the config file.
Definition job.py:228
is_ptag(src)
Definition job.py:766
_symlink_input_files(self)
Symlink input files.
Definition job.py:743
output_dir
output_dir is current working directory
Definition job.py:209
_set_input_files(self)
Prepare dictionary of input files.
Definition job.py:365
ignore_return_codes
Definition job.py:233
enable_copy_input_files
Definition job.py:229
ptags
dict with keys to output filenames
Definition job.py:215
Utility script for printing help about component classes.
Definition help.py:1
print_usage()
Definition job.py:815