2Primary class for running and managing HPSMC jobs defined by a set of components.
19from collections.abc
import Sequence
20from os.path
import expanduser
23from hpsmc
import global_config
26logger = logging.getLogger(
'hpsmc.job')
30 """! Wrapper for accessing config information from parser."""
33 self.
parser = copy.copy(global_config)
36 parser_lines = [
'Job configuration:']
37 for section
in self.
parser.sections():
38 parser_lines.append(
"[" + section +
"]")
39 for i, v
in self.
parser.items(section):
40 parser_lines.append(
"%s=%s" % (i, v))
41 parser_lines.append(
'')
42 return '\n'.join(parser_lines)
44 def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True):
45 """! Push config into an object by setting an attribute.
46 @param obj object config is pushed to
47 @param section name of the config section to read (the class name of obj is used if none is given)
48 @param required_names list of names of required configurations
49 @param allowed_names list of allowed config names
52 section = obj.__class__.__name__
53 if self.
parser.has_section(section):
55 for req
in required_names:
56 if req
not in dict(self.
parser.items(section)):
57 raise Exception(
"Missing required config '%s'" % req)
59 for name, value
in self.
parser.items(section):
60 if len(allowed_names)
and name
not in allowed_names:
61 raise Exception(
"Config name '%s' is not allowed for '%s'" % (name, section))
62 setattr(obj, name, convert_config_value(value))
68 raise Exception(
"Missing required config section '%s'" % section)
70 logger.warning(
'Config section not found: %s' % section)
75 Simple JSON based store of job data.
84 logger.warning(
'Path was not provided to job store - no jobs loaded!')
def load(self, json_store):
    """! Load raw JSON data into this job store.

    The file must contain a JSON list of job dicts. Each dict is stored in
    self.data under its 'job_id' converted to int.

    @param json_store path to the JSON file containing raw job data
    @throws Exception if the file does not exist, if the top-level JSON value is not a list,
            if an entry is not a dict or has no 'job_id', or if a job ID occurs more than once
    """
    if not os.path.exists(json_store):
        raise Exception('JSON job store does not exist: {}'.format(json_store))
    with open(json_store, 'r') as f:
        json_data = json.load(f)
    # A single-job parameter file (a dict) would otherwise be iterated by key
    # and fail later with a confusing TypeError, so reject it up front.
    if not isinstance(json_data, list):
        raise Exception(
            'JSON job store must contain a list of jobs: {}'.format(json_store))
    for job_data in json_data:
        # 'in' on a str is a substring test, so make sure the entry is a dict
        # before looking for the key.
        if not isinstance(job_data, dict):
            raise Exception(
                'Job data is not a JSON object: {}'.format(job_data))
        if 'job_id' not in job_data:
            raise Exception("Job data is missing a job id")
        job_id = int(job_data['job_id'])
        if job_id in self.data:
            raise Exception(f"Duplicate job id: {job_id}")
        self.data[job_id] = job_data
        logger.debug("Loaded job %s data:\n %s", job_id, job_data)
    logger.info("Successfully loaded %d jobs from: %s", len(self.data), json_store)
105 """! Get a job by its job ID.
109 raise Exception(f
"Job ID does not exist: {job_id}")
110 return self.
data[int(job_id)]
113 """! Get the raw dict containing all the job data."""
117 """! Get a sorted list of job IDs."""
118 return sorted(self.
data.keys())
121 """! Return true if the job ID exists in the store."""
122 return int(job_id)
in list(self.
data.keys())
126 """! Database of job scripts.
130 if 'HPSMC_DIR' not in os.environ:
131 raise Exception(
'HPSMC_DIR is not set in the environ.')
132 hps_mc_dir = os.environ[
'HPSMC_DIR']
133 script_dir = os.path.join(hps_mc_dir,
'lib',
'python',
'jobs')
136 for f
in glob.glob(os.path.join(script_dir,
'*_job.py')):
137 script_name = os.path.splitext(os.path.basename(f))[0].replace(
'_job',
'')
141 """! Get path to job script from job name.
143 @return path to job script"""
147 """! Get list of all script names."""
148 return list(self.
scripts.keys())
151 """! Get dict containing paths to scripts sorted by script names."""
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
162 Primary class to run HPS jobs from a Python script.
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
178 'ignore_return_codes',
179 'check_output_files',
180 'enable_file_chaining',
185 PTAG_PREFIX =
'ptag:'
189 if 'HPSMC_DIR' not in os.environ:
190 raise Exception(
'HPSMC_DIR is not set in the environ.')
243 Public method for adding components to the job.
245 if isinstance(component, Sequence)
and not isinstance(component, str):
252 Add parameters to the job, overriding values if they exist already.
254 This method can be used in job scripts to define default values.
256 for k, v
in params.items():
258 logger.debug(
"Setting new value '%s' for parameter '%s' with existing value '%s'."
259 % (str(v), str(k), params[k]))
264 Configure the job from command line arguments.
267 parser = argparse.ArgumentParser(description=self.
description)
268 parser.add_argument(
"-c",
"--config-file", help=
"Config file locations", action=
'append')
269 parser.add_argument(
"-d",
"--run-dir", nargs=
'?', help=
"Job run dir")
270 parser.add_argument(
"-o",
"--out", nargs=
'?', help=
"File for component stdout (default prints to console)")
271 parser.add_argument(
"-e",
"--err", nargs=
'?', help=
"File for component stderr (default prints to console)")
272 parser.add_argument(
"-s",
"--job-steps", type=int, default=
None, help=
"Job steps to run (single number)")
273 parser.add_argument(
"-i",
"--job-id", type=int, help=
"Job ID from JSON job store", default=
None)
274 parser.add_argument(
"script", nargs=
'?', help=
"Path or name of job script")
275 parser.add_argument(
"params", nargs=
'?', help=
"Job param file in JSON format")
284 cl = parser.parse_args(self.
args)
288 config_files = list(map(os.path.abspath, cl.config_file))
289 logger.info(
"Reading additional config from: {}".format(config_files))
295 if not os.path.isabs(out_file):
296 out_file = os.path.abspath(out_file)
297 logger.info(
'Job output will be written to: %s' % out_file)
303 if not os.path.isabs(err_file):
304 err_file = os.path.abspath(err_file)
305 logger.info(
'Job error will be written to: %s' % err_file)
309 self.
rundir = os.path.abspath(cl.run_dir)
313 raise Exception(
"Invalid job steps argument (must be > 0): {}".format(self.
job_steps))
319 raise Exception(
'Missing required script name or location.')
325 if cl.job_id
is not None:
328 logger.debug(
"Loading job with ID %d from job store '%s'" % (self.
job_id, self.
param_file))
330 if jobstore.has_job_id(self.
job_id):
331 params = jobstore.get_job(self.
job_id)
333 raise Exception(
"No job id %d was found in the job store '%s'" % (self.
job_id, self.
param_file))
336 logger.info(
'Loading job parameters from file: %s' % self.
param_file)
337 params = json.loads(open(self.
param_file,
'r').read())
338 if isinstance(params, list):
340 if not isinstance(params, dict):
341 raise Exception(
'Job ID must be provided when running from a job store.')
347 Load the job parameters from JSON data.
354 if 'output_dir' in self.
params:
358 logger.debug(
"Changed output dir to abs path: %s" % self.
output_dir)
360 if 'job_id' in self.
params:
363 if 'input_files' in self.
params:
366 if 'output_files' in self.
params:
371 Prepare dictionary of input files.
373 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
375 input_files_dict = {}
376 for file_key, file_name
in self.
input_files.items():
377 if 'https' in file_key:
378 logger.info(
"Downloading input file from: %s" % file_key)
379 file_name_path = self.
rundir +
"/" + file_name
381 subprocess.check_output([
'wget',
'-q',
'-O', file_name_path, file_key])
382 input_files_dict.update({file_name: file_name})
384 input_files_dict.update({file_key: file_name})
389 Perform basic initialization before the job script is loaded.
392 if not os.path.isabs(self.
rundir):
394 logger.info(
'Changed run dir to abs path: %s' % self.
rundir)
398 if "LSB_JOBID" in os.environ:
399 self.
rundir = os.path.join(
"/scratch", getpass.getuser(), os.environ[
"LSB_JOBID"])
400 logger.info(
'Set run dir for LSF: %s' % self.
rundir)
404 if not os.path.exists(self.
rundir):
405 logger.info(
'Creating run dir: %s' % self.
rundir)
410 Configure job class and components.
414 self.
job_config.config(self, require_section=
False)
423 component.config_logging(self.
job_config.parser)
431 component.config_from_environ()
434 component.check_config()
444 logger.warning(
"No job script was provided!")
447 if not self.
script.endswith(
'.py'):
450 if not script_db.exists(self.
script):
451 raise Exception(
"The script name is not valid: %s" % self.
script)
452 script_path = script_db.get_script_path(self.
script)
453 logger.debug(
"Found script '%s' from name '%s'" % (script_path, self.
script))
458 if not os.path.exists(script_path):
459 raise Exception(
'Job script does not exist: %s' % script_path)
461 logger.info(
'Loading job script: %s' % script_path)
463 exec(compile(open(script_path,
"rb").read(), script_path,
'exec'), {
'job': self})
467 This is the primary execution method for running the job.
470 logger.info(
'Job ID: ' + str(self.
job_id))
483 raise Exception(
"Job has no components to execute.")
486 logger.info(
"Job components loaded: %s" % ([c.name
for c
in self.
components]))
490 logger.info(
"Job parameters loaded: %s" % str(self.
params))
492 logger.info(
"No job parameters were specified!")
517 start_time = time.time()
526 stop_time = time.time()
527 elapsed = stop_time - start_time
528 logger.info(
"Job execution took {} seconds".format(round(elapsed, 4)))
535 logger.warning(
'Copy output files is disabled!')
541 logger.info(
'Successfully finished running job: %s' % self.
description)
545 Execute all components in job.
547 If dry_run is set to True, the components will not be executed;
548 the components and their arguments are logged instead.
554 logger.info(
"Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
558 self.
component_out.write(
'================ Component: %s ================\n' % component.name)
563 self.
component_err.write(
'================ Component: %s ================\n' % component.name)
569 elapsed = end - start
570 logger.info(
"Execution of {} took {} second(s) with return code: {}"
571 .format(component.name, round(elapsed, 4), str(returncode)))
574 raise Exception(
"Non-zero return code %d from '%s'" % (returncode, component.name))
577 for outputfile
in component.output_files():
578 if not os.path.isfile(outputfile):
579 raise Exception(
"Output file '%s' is missing after execution." % outputfile)
582 logger.info(
"Dry run enabled. Components will NOT be executed!")
584 logger.info(
"'%s' with args: %s (DRY RUN)" % (component.name,
' '.join(component.cmd_args())))
588 Necessary setup before job can be executed.
592 logger.info(
'Changing to run dir: %s' % self.
rundir)
602 logger.info(
"Job is limited to first %d steps." % self.
job_steps)
609 logger.debug(
'Setting up component: %s' % (component.name))
610 component.rundir = self.
rundir
613 raise Exception(
"Command '%s' does not exist for '%s'." % (component.command, component.name))
617 Pipe component outputs to inputs automatically.
621 logger.debug(
"Configuring file IO for component '%s' with order %d" % (component. name, i))
623 logger.debug(
"Setting inputs on '%s' to: %s"
624 % (component.name, str(list(self.
input_files.values()))))
625 if not len(component.inputs):
628 logger.debug(
"Setting inputs on '%s' to: %s"
630 if len(component.inputs) == 0:
635 Push JSON job parameters to components.
638 component.set_parameters(self.
params)
642 Perform post-job cleanup.
645 logger.info(
'Running cleanup for component: %s' % str(component.name))
648 logger.info(
'Deleting run dir: %s' % self.
rundir)
649 if os.path.exists(
"%s/__swif_env__" % self.
rundir):
650 for f
in os.listdir(self.
rundir):
651 if (
'.log' not in f)
and (
'__swif_' not in f):
652 os.system(
'rm -r %s' % f)
654 shutil.rmtree(self.
rundir)
659 except Exception
as e:
666 except Exception
as e:
671 Copy output files to output directory, handling ptags if necessary.
675 logger.debug(
'Creating output dir: %s' % self.
output_dir)
681 ptag_src = Job.get_ptag_from_src(src)
682 if ptag_src
in list(self.
ptags.keys()):
683 src_file = self.
ptags[ptag_src]
684 logger.info(
"Resolved ptag: {} -> {}".format(ptag_src, src_file))
686 raise Exception(
'Undefined ptag used in job params: %s' % ptag_src)
691 Copy an output file from src to dest.
694 src_file = os.path.join(self.
rundir, src)
695 dest_file = os.path.join(self.
output_dir, dest)
699 if not os.path.exists(os.path.dirname(dest_file)):
700 os.makedirs(os.path.dirname(dest_file), 0o755)
704 if os.path.exists(dest_file):
705 if os.path.samefile(src_file, dest_file):
709 if os.path.isfile(dest_file):
711 logger.debug(
'Deleting existing file: %s' % dest_file)
714 raise Exception(
'Output file already exists: %s' % dest_file)
717 logger.info(
"Copying '%s' to '%s'" % (src_file, dest_file))
720 shutil.copyfile(src_file, dest_file)
723 if not filecmp.cmp(src_file, dest_file, shallow=
False):
724 raise Exception(
"Copy from '%s' to '%s' failed." % (src_file, dest_file))
726 logger.warning(
"Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
730 Copy input files to the run dir.
736 if os.path.dirname(dest):
737 raise Exception(
"The input file destination '%s' is not valid." % dest)
738 logger.info(
"Copying input file: %s -> %s" % (src, os.path.join(self.
rundir, dest)))
740 src = src.replace(
'/mss/',
'/cache/')
741 if os.path.exists(os.path.join(self.
rundir, dest)):
742 logger.info(
"The input file '%s' already exists at destination '%s'" % (dest, self.
rundir))
743 os.chmod(os.path.join(self.
rundir, dest), 0o666)
745 shutil.copyfile(src, os.path.join(self.
rundir, dest))
746 os.chmod(dest, 0o666)
753 if not os.path.isabs(src):
754 raise Exception(
'The input source file is not an absolute path: %s' % src)
755 if os.path.dirname(dest):
756 raise Exception(
'The input file destination is not valid: %s' % dest)
757 logger.debug(
"Symlinking input '%s' to '%s'" % (src, os.path.join(self.
rundir, dest)))
758 os.symlink(src, os.path.join(self.
rundir, dest))
762 Map a key to an output file name so a user can reference it in their job params.
764 if tag
not in list(self.
ptags.keys()):
765 self.
ptags[tag] = filename
766 logger.info(
"Added ptag %s -> %s" % (tag, filename))
768 raise Exception(
'The ptag already exists: %s' % tag)
772 return src.startswith(Job.PTAG_PREFIX)
776 if src.startswith(Job.PTAG_PREFIX):
777 return src[len(Job.PTAG_PREFIX):]
779 raise Exception(
'File src is not a ptag: %s' % src)
783 return self.
ptags[Job.get_ptag_from_src(src)]
789 Set fieldmap dir to install location if not provided in config
794 raise Exception(
"The fieldmaps dir does not exist: {}".format(self.
hps_fieldmaps_dir))
795 logger.debug(
"Using fieldmap dir from install: {}".format(self.
hps_fieldmaps_dir))
797 logger.debug(
"Using fieldmap dir from config: {}".format(self.
hps_fieldmaps_dir))
801 Symlink to the fieldmap directory
803 fieldmap_symlink = pathlib.Path(os.getcwd(),
"fieldmap")
804 if not fieldmap_symlink.exists():
805 logger.debug(
"Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
808 if fieldmap_symlink.is_dir()
or os.path.islink(fieldmap_symlink):
809 logger.debug(
"Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
811 raise Exception(
"A file called 'fieldmap' exists but it is not a symlink or directory!")
815 'run':
'Run a job script',
816 'script':
'Show list of available job scripts (provide script name for detailed info)',
817 'component':
'Show list of available components (provide component name for detailed info)'}
821 print(
"Usage: job.py [command] [args]")
823 for name, descr
in cmds.items():
824 print(
" %s: %s" % (name, descr))
827if __name__ ==
'__main__':
828 if len(sys.argv) > 1:
830 if cmd
not in list(cmds.keys()):
832 raise Exception(
'The job command is not valid: %s' % cmd)
838 elif cmd ==
'script':
839 if len(sys.argv) > 2:
842 print_job_script(script)
845 print(
"AVAILABLE JOB SCRIPTS: ")
846 for name
in sorted(scriptdb.get_script_names()):
847 print(
' %s: %s' % (name, scriptdb.get_script_path(name)))
848 elif cmd ==
'component':
849 if len(sys.argv) > 2:
851 component_name = sys.argv[2]
852 print_component(component_name)
Wrapper for accessing config information from parser.
config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
exists(self, name)
Test if job script exists in dict.
get_scripts(self)
Get dict containing paths to scripts sorted by script names.
scripts
dict of paths to job scripts sorted by name
get_script_path(self, name)
Get path to job script from job name.
get_script_names(self)
Get list of all script names.
Simple JSON based store of job data.
__init__(self, path=None)
load(self, json_store)
Load raw JSON data into this job store.
has_job_id(self, job_id)
Return true if the job ID exists in the store.
get_job_ids(self)
Get a sorted list of job IDs.
get_job_data(self)
Get the raw dict containing all the job data.
get_job(self, job_id)
Get a job by its job ID.
Primary class to run HPS jobs from a Python script.
hps_fieldmaps_dir
fieldmaps dir
run(self)
This is the primary execution method for running the job.
parse_args(self)
Configure the job from command line arguments.
_copy_output_file(self, src, dest)
Copy an output file from src to dest.
component_err
output for component error messages
_copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
description
short description of job, should be overridden by the job script
input_files
dict of input files
_config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
components
list of components in job
add(self, component)
Public method for adding components to the job.
resolve_output_src(self, src)
job_config
Job configuration.
_config_file_pipeline(self)
Pipe component outputs to inputs automatically.
_symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
param_file
path to parameter file
script
script containing component initializations
_load_script(self)
Load the job script.
_copy_input_files(self)
Copy input files to the run dir.
_cleanup(self)
Perform post-job cleanup.
rundir
rundir is current working directory
_setup(self)
Necessary setup before job can be executed.
_initialize(self)
Perform basic initialization before the job script is loaded.
output_files
dict of output files
set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
__init__(self, args=sys.argv, **kwargs)
args
(passed) job arguments
component_out
output for component printouts
_load_params(self, params)
Load the job parameters from JSON data.
ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
_execute(self)
Execute all components in job.
_set_parameters(self)
Push JSON job parameters to components.
_configure(self)
Configure job class and components.
enable_copy_output_files
These attributes can all be set in the config file.
_symlink_input_files(self)
Symlink input files.
output_dir
output_dir is current working directory
_set_input_files(self)
Prepare dictionary of input files.
ptags
dict with keys to output filenames
Utility script for printing help about component classes.