2Primary class for running and managing HPSMC jobs defined by a set of components.
19from collections.abc
import Sequence
20from os.path
import expanduser
23from hpsmc
import global_config
26logger = logging.getLogger(
'hpsmc.job')
30 """! Wrapper for accessing config information from parser."""
33 self.
parser = copy.copy(global_config)
36 parser_lines = [
'Job configuration:']
37 for section
in self.
parser.sections():
38 parser_lines.append(
"[" + section +
"]")
39 for i, v
in self.
parser.items(section):
40 parser_lines.append(
"%s=%s" % (i, v))
41 parser_lines.append(
'')
42 return '\n'.join(parser_lines)
44 def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True):
45 """! Push config into an object by setting an attribute.
46 @param obj object config is pushed to
47 @param section name of the config section to read (defaults to the class name of obj)
48 @param required_names list of names of required configurations
49 @param allowed_names list of allowed config names
52 section = obj.__class__.__name__
53 if self.
parser.has_section(section):
55 for req
in required_names:
56 if req
not in dict(self.
parser.items(section)):
57 raise Exception(
"Missing required config '%s'" % req)
59 for name, value
in self.
parser.items(section):
60 if len(allowed_names)
and name
not in allowed_names:
61 raise Exception(
"Config name '%s' is not allowed for '%s'" % (name, section))
62 setattr(obj, name, convert_config_value(value))
68 raise Exception(
"Missing required config section '%s'" % section)
70 logger.warning(
'Config section not found: %s' % section)
75 Simple JSON based store of job data.
84 logger.warning(
'Path was not provided to job store - no jobs loaded!')
86 def load(self, json_store):
87 """! Load raw JSON data into this job store.
88 @param json_store json file containing raw job data
90 if not os.path.exists(json_store):
91 raise Exception(
'JSON job store does not exist: {}'.format(json_store))
92 with open(json_store,
'r')
as f:
93 json_data = json.loads(f.read())
94 for job_data
in json_data:
95 if 'job_id' not in job_data:
96 raise Exception(f
"Job data is missing a job id")
97 job_id = int(job_data[
'job_id'])
98 if job_id
in self.
data:
99 raise Exception(f
"Duplicate job id: {job_id}")
100 self.
data[job_id] = job_data
101 logger.debug(
"Loaded job {} data:\n {}".format(job_id, job_data))
102 logger.info(f
"Successfully loaded {len(self.data)} jobs from: {json_store}")
105 """! Get a job by its job ID.
109 raise Exception(f
"Job ID does not exist: {job_id}")
110 return self.
data[int(job_id)]
113 """! Get the raw dict containing all the job data."""
117 """! Get a sorted list of job IDs."""
118 return sorted(self.
data.keys())
121 """! Return true if the job ID exists in the store."""
122 return int(job_id)
in list(self.
data.keys())
126 """! Database of job scripts.
130 if 'HPSMC_DIR' not in os.environ:
131 raise Exception(
'HPSMC_DIR is not set in the environ.')
132 hps_mc_dir = os.environ[
'HPSMC_DIR']
133 script_dir = os.path.join(hps_mc_dir,
'lib',
'python',
'jobs')
136 for f
in glob.glob(os.path.join(script_dir,
'*_job.py')):
137 script_name = os.path.splitext(os.path.basename(f))[0].replace(
'_job',
'')
141 """! Get path to job script from job name.
143 @return path to job script"""
147 """! Get list of all script names."""
148 return list(self.
scripts.keys())
151 """! Get dict containing paths to scripts sorted by script names."""
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
162 Primary class to run HPS jobs from a Python script.
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
178 'ignore_return_codes',
179 'check_output_files',
180 'enable_file_chaining',
185 PTAG_PREFIX =
'ptag:'
189 if 'HPSMC_DIR' not in os.environ:
190 raise Exception(
'HPSMC_DIR is not set in the environ.')
243 Public method for adding components to the job.
245 if isinstance(component, Sequence)
and not isinstance(component, str):
252 Add parameters to the job, overriding values if they exist already.
254 This method can be used in job scripts to define default values.
256 for k, v
in params.items():
258 logger.debug(
"Setting new value '%s' for parameter '%s' with existing value '%s'."
259 % (str(v), str(k), params[k]))
264 Configure the job from command line arguments.
267 parser = argparse.ArgumentParser(description=self.
description)
268 parser.add_argument(
"-c",
"--config-file", help=
"Config file locations", action=
'append')
269 parser.add_argument(
"-d",
"--run-dir", nargs=
'?', help=
"Job run dir")
270 parser.add_argument(
"-o",
"--out", nargs=
'?', help=
"File for component stdout (default prints to console)")
271 parser.add_argument(
"-e",
"--err", nargs=
'?', help=
"File for component stderr (default prints to console)")
272 parser.add_argument(
"-s",
"--job-steps", type=int, default=
None, help=
"Job steps to run (single number)")
273 parser.add_argument(
"-i",
"--job-id", type=int, help=
"Job ID from JSON job store", default=
None)
274 parser.add_argument(
"script", nargs=
'?', help=
"Path or name of job script")
275 parser.add_argument(
"params", nargs=
'?', help=
"Job param file in JSON format")
284 cl = parser.parse_args(self.
args)
288 config_files = list(map(os.path.abspath, cl.config_file))
289 logger.info(
"Reading additional config from: {}".format(config_files))
295 if not os.path.isabs(out_file):
296 out_file = os.path.abspath(out_file)
297 logger.info(
'Job output will be written to: %s' % out_file)
298 self.
out = open(out_file,
'w')
303 if not os.path.isabs(err_file):
304 err_file = os.path.abspath(err_file)
305 logger.info(
'Job error will be written to: %s' % err_file)
306 self.
err = open(err_file,
'w')
309 self.
rundir = os.path.abspath(cl.run_dir)
313 raise Exception(
"Invalid job steps argument (must be > 0): {}".format(self.
job_steps))
319 raise Exception(
'Missing required script name or location.')
328 logger.debug(
"Loading job with ID %d from job store '%s'" % (self.
job_id, self.
param_file))
330 if jobstore.has_job_id(self.
job_id):
331 params = jobstore.get_job(self.
job_id)
333 raise Exception(
"No job id %d was found in the job store '%s'" % (self.
job_id, self.
param_file))
336 logger.info(
'Loading job parameters from file: %s' % self.
param_file)
337 params = json.loads(open(self.
param_file,
'r').read())
338 if not isinstance(params, dict):
339 raise Exception(
'Job ID must be provided when running from a job store.')
345 Load the job parameters from JSON data.
352 if 'output_dir' in self.
params:
356 logger.debug(
"Changed output dir to abs path: %s" % self.
output_dir)
358 if 'job_id' in self.
params:
361 if 'input_files' in self.
params:
364 if 'output_files' in self.
params:
369 Prepare dictionary of input files.
371 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
373 input_files_dict = {}
374 for file_key, file_name
in self.
input_files.items():
375 if 'https' in file_key:
376 logger.info(
"Downloading input file from: %s" % file_key)
377 file_name_path = self.
rundir +
"/" + file_name
379 subprocess.check_output([
'wget',
'-q',
'-O', file_name_path, file_key])
380 input_files_dict.update({file_name: file_name})
382 input_files_dict.update({file_key: file_name})
387 Perform basic initialization before the job script is loaded.
390 if not os.path.isabs(self.
rundir):
392 logger.info(
'Changed run dir to abs path: %s' % self.
rundir)
396 if "LSB_JOBID" in os.environ:
397 self.
rundir = os.path.join(
"/scratch", getpass.getuser(), os.environ[
"LSB_JOBID"])
398 logger.info(
'Set run dir for LSF: %s' % self.
rundir)
402 if not os.path.exists(self.
rundir):
403 logger.info(
'Creating run dir: %s' % self.
rundir)
408 Configure job class and components.
412 self.
job_config.config(self, require_section=
False)
421 component.config_logging(self.
job_config.parser)
429 component.config_from_environ()
432 component.check_config()
442 logger.warning(
"No job script was provided!")
445 if not self.
script.endswith(
'.py'):
448 if not script_db.exists(self.
script):
449 raise Exception(
"The script name is not valid: %s" % self.
script)
450 script_path = script_db.get_script_path(self.
script)
451 logger.debug(
"Found script '%s' from name '%s'" % (script_path, self.
script))
456 if not os.path.exists(script_path):
457 raise Exception(
'Job script does not exist: %s' % script_path)
459 logger.info(
'Loading job script: %s' % script_path)
461 exec(compile(open(script_path,
"rb").read(), script_path,
'exec'), {
'job': self})
465 This is the primary execution method for running the job.
468 logger.info(
'Job ID: ' + str(self.
job_id))
481 raise Exception(
"Job has no components to execute.")
484 logger.info(
"Job components loaded: %s" % ([c.name
for c
in self.
components]))
488 logger.info(
"Job parameters loaded: %s" % str(self.
params))
490 logger.info(
"No job parameters were specified!")
515 start_time = time.time()
524 stop_time = time.time()
525 elapsed = stop_time - start_time
526 logger.info(
"Job execution took {} seconds".format(round(elapsed, 4)))
533 logger.warning(
'Copy output files is disabled!')
539 logger.info(
'Successfully finished running job: %s' % self.
description)
543 Execute all components in job.
545 If dry_run is set to True, the components will not be executed;
546 the list of components will be printed instead.
552 logger.info(
"Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
556 self.
component_out.write(
'================ Component: %s ================\n' % component.name)
561 self.
component_err.write(
'================ Component: %s ================\n' % component.name)
567 elapsed = end - start
568 logger.info(
"Execution of {} took {} second(s) with return code: {}"
569 .format(component.name, round(elapsed, 4), str(returncode)))
572 raise Exception(
"Non-zero return code %d from '%s'" % (returncode, component.name))
575 for outputfile
in component.output_files():
576 if not os.path.isfile(outputfile):
577 raise Exception(
"Output file '%s' is missing after execution." % outputfile)
580 logger.info(
"Dry run enabled. Components will NOT be executed!")
582 logger.info(
"'%s' with args: %s (DRY RUN)" % (component.name,
' '.join(component.cmd_args())))
586 Necessary setup before job can be executed.
590 logger.info(
'Changing to run dir: %s' % self.
rundir)
600 logger.info(
"Job is limited to first %d steps." % self.
job_steps)
607 logger.debug(
'Setting up component: %s' % (component.name))
608 component.rundir = self.
rundir
611 raise Exception(
"Command '%s' does not exist for '%s'." % (component.command, component.name))
615 Pipe component outputs to inputs automatically.
619 logger.debug(
"Configuring file IO for component '%s' with order %d" % (component. name, i))
621 logger.debug(
"Setting inputs on '%s' to: %s"
622 % (component.name, str(list(self.
input_files.values()))))
623 if not len(component.inputs):
626 logger.debug(
"Setting inputs on '%s' to: %s"
628 if len(component.inputs) == 0:
633 Push JSON job parameters to components.
636 component.set_parameters(self.
params)
640 Perform post-job cleanup.
643 logger.info(
'Running cleanup for component: %s' % str(component.name))
646 logger.info(
'Deleting run dir: %s' % self.
rundir)
647 if os.path.exists(
"%s/__swif_env__" % self.
rundir):
648 for f
in os.listdir(self.
rundir):
649 if (
'.log' not in f)
and (
'__swif_' not in f):
650 os.system(
'rm -r %s' % f)
652 shutil.rmtree(self.
rundir)
657 except Exception
as e:
664 except Exception
as e:
669 Copy output files to output directory, handling ptags if necessary.
673 logger.debug(
'Creating output dir: %s' % self.
output_dir)
679 ptag_src = Job.get_ptag_from_src(src)
680 if ptag_src
in list(self.
ptags.keys()):
681 src_file = self.
ptags[ptag_src]
682 logger.info(
"Resolved ptag: {} -> {}".format(ptag_src, src_file))
684 raise Exception(
'Undefined ptag used in job params: %s' % ptag_src)
689 Copy an output file from src to dest.
692 src_file = os.path.join(self.
rundir, src)
693 dest_file = os.path.join(self.
output_dir, dest)
697 if not os.path.exists(os.path.dirname(dest_file)):
698 os.makedirs(os.path.dirname(dest_file), 0o755)
702 if os.path.exists(dest_file):
703 if os.path.samefile(src_file, dest_file):
707 if os.path.isfile(dest_file):
709 logger.debug(
'Deleting existing file: %s' % dest_file)
712 raise Exception(
'Output file already exists: %s' % dest_file)
715 logger.info(
"Copying '%s' to '%s'" % (src_file, dest_file))
718 shutil.copyfile(src_file, dest_file)
721 if not filecmp.cmp(src_file, dest_file, shallow=
False):
722 raise Exception(
"Copy from '%s' to '%s' failed." % (src_file, dest_file))
724 logger.warning(
"Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
728 Copy input files to the run dir.
734 if os.path.dirname(dest):
735 raise Exception(
"The input file destination '%s' is not valid." % dest)
736 logger.info(
"Copying input file: %s -> %s" % (src, os.path.join(self.
rundir, dest)))
738 src = src.replace(
'/mss/',
'/cache/')
739 if os.path.exists(os.path.join(self.
rundir, dest)):
740 logger.info(
"The input file '%s' already exists at destination '%s'" % (dest, self.
rundir))
741 os.chmod(os.path.join(self.
rundir, dest), 0o666)
743 shutil.copyfile(src, os.path.join(self.
rundir, dest))
744 os.chmod(dest, 0o666)
751 if not os.path.isabs(src):
752 raise Exception(
'The input source file is not an absolute path: %s' % src)
753 if os.path.dirname(dest):
754 raise Exception(
'The input file destination is not valid: %s' % dest)
755 logger.debug(
"Symlinking input '%s' to '%s'" % (src, os.path.join(self.
rundir, dest)))
756 os.symlink(src, os.path.join(self.
rundir, dest))
760 Map a key to an output file name so a user can reference it in their job params.
762 if tag
not in list(self.
ptags.keys()):
763 self.
ptags[tag] = filename
764 logger.info(
"Added ptag %s -> %s" % (tag, filename))
766 raise Exception(
'The ptag already exists: %s' % tag)
770 return src.startswith(Job.PTAG_PREFIX)
774 if src.startswith(Job.PTAG_PREFIX):
775 return src[len(Job.PTAG_PREFIX):]
777 raise Exception(
'File src is not a ptag: %s' % src)
781 return self.
ptags[Job.get_ptag_from_src(src)]
787 Set fieldmap dir to install location if not provided in config
792 raise Exception(
"The fieldmaps dir does not exist: {}".format(self.
hps_fieldmaps_dir))
793 logger.debug(
"Using fieldmap dir from install: {}".format(self.
hps_fieldmaps_dir))
795 logger.debug(
"Using fieldmap dir from config: {}".format(self.
hps_fieldmaps_dir))
799 Symlink to the fieldmap directory
801 fieldmap_symlink = pathlib.Path(os.getcwd(),
"fieldmap")
802 if not fieldmap_symlink.exists():
803 logger.debug(
"Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
806 if fieldmap_symlink.is_dir()
or os.path.islink(fieldmap_symlink):
807 logger.debug(
"Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
809 raise Exception(
"A file called 'fieldmap' exists but it is not a symlink or directory!")
813 'run':
'Run a job script',
814 'script':
'Show list of available job scripts (provide script name for detailed info)',
815 'component':
'Show list of available components (provide component name for detailed info)'}
819 print(
"Usage: job.py [command] [args]")
821 for name, descr
in cmds.items():
822 print(
" %s: %s" % (name, descr))
825if __name__ ==
'__main__':
826 if len(sys.argv) > 1:
828 if cmd
not in list(cmds.keys()):
830 raise Exception(
'The job command is not valid: %s' % cmd)
836 elif cmd ==
'script':
837 if len(sys.argv) > 2:
840 print_job_script(script)
843 print(
"AVAILABLE JOB SCRIPTS: ")
844 for name
in sorted(scriptdb.get_script_names()):
845 print(
' %s: %s' % (name, scriptdb.get_script_path(name)))
846 elif cmd ==
'component':
847 if len(sys.argv) > 2:
849 component_name = sys.argv[2]
850 print_component(component_name)
Wrapper for accessing config information from parser.
config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
exists(self, name)
Test if job script exists in dict.
get_scripts(self)
Get dict containing paths to scripts sorted by script names.
scripts
dict of paths to job scripts sorted by name
get_script_path(self, name)
Get path to job script from job name.
get_script_names(self)
Get list of all script names.
Simple JSON based store of job data.
__init__(self, path=None)
load(self, json_store)
Load raw JSON data into this job store.
has_job_id(self, job_id)
Return true if the job ID exists in the store.
get_job_ids(self)
Get a sorted list of job IDs.
get_job_data(self)
Get the raw dict containing all the job data.
get_job(self, job_id)
Get a job by its job ID.
Primary class to run HPS jobs from a Python script.
hps_fieldmaps_dir
fieldmaps dir
run(self)
This is the primary execution method for running the job.
parse_args(self)
Configure the job from command line arguments.
_copy_output_file(self, src, dest)
Copy an output file from src to dest.
component_err
output for component error messages
_copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
description
short description of job, should be overridden by the job script
input_files
dict of input files
_config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
components
list of components in job
add(self, component)
Public method for adding components to the job.
resolve_output_src(self, src)
job_config
Job configuration.
_config_file_pipeline(self)
Pipe component outputs to inputs automatically.
_symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
param_file
path to parameter file
script
script containing component initializations
_load_script(self)
Load the job script.
_copy_input_files(self)
Copy input files to the run dir.
_cleanup(self)
Perform post-job cleanup.
rundir
rundir is current working directory
_setup(self)
Necessary setup before job can be executed.
_initialize(self)
Perform basic initialization before the job script is loaded.
output_files
dict of output files
set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
__init__(self, args=sys.argv, **kwargs)
args
(passed) job arguments
component_out
output for component printouts
_load_params(self, params)
Load the job parameters from JSON data.
ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
_execute(self)
Execute all components in job.
_set_parameters(self)
Push JSON job parameters to components.
_configure(self)
Configure job class and components.
enable_copy_output_files
These attributes can all be set in the config file.
_symlink_input_files(self)
Symlink input files.
output_dir
output_dir is current working directory
_set_input_files(self)
Prepare dictionary of input files.
ptags
dict with keys to output filenames
Utility script for printing help about component classes.