2 Primary class for running and managing HPSMC jobs defined by a set of components.
19 from collections.abc
import Sequence
20 from os.path
import expanduser
23 from hpsmc
import global_config
26 logger = logging.getLogger(
'hpsmc.job')
30 """! Wrapper for accessing config information from parser."""
33 self.
parserparser = copy.copy(global_config)
36 parser_lines = [
'Job configuration:']
37 for section
in self.
parserparser.sections():
38 parser_lines.append(
"[" + section +
"]")
39 for i, v
in self.
parserparser.items(section):
40 parser_lines.append(
"%s=%s" % (i, v))
41 parser_lines.append(
'')
42 return '\n'.join(parser_lines)
44 def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True):
45 """! Push config into an object by setting an attribute.
46 @param obj object config is pushed to
47 @param section name of the config section to read (if not given, the class name of obj is used)
48 @param required_names list of names of required configurations
49 @param allowed_names list of allowed config names
52 section = obj.__class__.__name__
53 if self.
parserparser.has_section(section):
55 for req
in required_names:
56 if req
not in dict(self.
parserparser.items(section)):
57 raise Exception(
"Missing required config '%s'" % req)
59 for name, value
in self.
parserparser.items(section):
60 if len(allowed_names)
and name
not in allowed_names:
61 raise Exception(
"Config name '%s' is not allowed for '%s'" % (name, section))
68 raise Exception(
"Missing required config section '%s'" % section)
70 logger.warning(
'Config section not found: %s' % section)
75 Simple JSON based store of job data.
84 logger.warning(
'Path was not provided to job store - no jobs loaded!')
86 def load(self, json_store):
87 """! Load raw JSON data into this job store.
88 @param json_store json file containing raw job data
90 if not os.path.exists(json_store):
91 raise Exception(
'JSON job store does not exist: {}'.format(json_store))
92 with open(json_store,
'r')
as f:
93 json_data = json.loads(f.read())
94 for job_data
in json_data:
95 if 'job_id' not in job_data:
96 raise Exception(f
"Job data is missing a job id")
97 job_id = int(job_data[
'job_id'])
98 if job_id
in self.
datadata:
99 raise Exception(f
"Duplicate job id: {job_id}")
100 self.
datadata[job_id] = job_data
101 logger.debug(
"Loaded job {} data:\n {}".format(job_id, job_data))
102 logger.info(f
"Successfully loaded {len(self.data)} jobs from: {json_store}")
105 """! Get a job by its job ID.
109 raise Exception(f
"Job ID does not exist: {job_id}")
110 return self.
datadata[int(job_id)]
113 """! Get the raw dict containing all the job data."""
117 """! Get a sorted list of job IDs."""
118 return sorted(self.
datadata.keys())
121 """! Return true if the job ID exists in the store."""
122 return int(job_id)
in list(self.
datadata.keys())
126 """! Database of job scripts.
130 if 'HPSMC_DIR' not in os.environ:
131 raise Exception(
'HPSMC_DIR is not set in the environ.')
132 hps_mc_dir = os.environ[
'HPSMC_DIR']
133 script_dir = os.path.join(hps_mc_dir,
'lib',
'python',
'jobs')
136 for f
in glob.glob(os.path.join(script_dir,
'*_job.py')):
137 script_name = os.path.splitext(os.path.basename(f))[0].replace(
'_job',
'')
138 self.
scriptsscripts[script_name] = f
141 """! Get path to job script from job name.
143 @return path to job script"""
144 return self.
scriptsscripts[name]
147 """! Get list of all script names."""
148 return list(self.
scriptsscripts.keys())
151 """! Get dict mapping script names to the paths of their job scripts."""
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
157 return name
in self.
scriptsscripts
162 Primary class to run HPS jobs from a Python script.
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
177 'ignore_return_codes',
178 'check_output_files',
179 'enable_file_chaining',
184 PTAG_PREFIX =
'ptag:'
188 if 'HPSMC_DIR' not in os.environ:
189 raise Exception(
'HPSMC_DIR is not set in the environ.')
241 Public method for adding components to the job.
243 if isinstance(component, Sequence)
and not isinstance(component, str):
250 Add parameters to the job, overriding values if they exist already.
252 This method can be used in job scripts to define default values.
254 for k, v
in params.items():
255 if k
in self.
paramsparams:
256 logger.debug(
"Setting new value '%s' for parameter '%s' with existing value '%s'."
257 % (str(v), str(k), params[k]))
262 Configure the job from command line arguments.
265 parser = argparse.ArgumentParser(description=self.
descriptiondescription)
266 parser.add_argument(
"-c",
"--config-file", help=
"Config file locations", action=
'append')
267 parser.add_argument(
"-d",
"--run-dir", nargs=
'?', help=
"Job run dir")
268 parser.add_argument(
"-o",
"--out", nargs=
'?', help=
"File for component stdout (default prints to console)")
269 parser.add_argument(
"-e",
"--err", nargs=
'?', help=
"File for component stderr (default prints to console)")
270 parser.add_argument(
"-s",
"--job-steps", type=int, default=
None, help=
"Job steps to run (single number)")
271 parser.add_argument(
"-i",
"--job-id", type=int, help=
"Job ID from JSON job store", default=
None)
272 parser.add_argument(
"script", nargs=
'?', help=
"Path or name of job script")
273 parser.add_argument(
"params", nargs=
'?', help=
"Job param file in JSON format")
282 cl = parser.parse_args(self.
argsargs)
286 config_files = list(map(os.path.abspath, cl.config_file))
287 logger.info(
"Reading additional config from: {}".format(config_files))
288 self.
job_configjob_config.parser.read(config_files)
293 if not os.path.isabs(out_file):
294 out_file = os.path.abspath(out_file)
295 logger.info(
'Job output will be written to: %s' % out_file)
296 self.
outout = open(out_file,
'w')
301 if not os.path.isabs(err_file):
302 err_file = os.path.abspath(err_file)
303 logger.info(
'Job error will be written to: %s' % err_file)
304 self.
errerr = open(err_file,
'w')
307 self.
rundirrundir = os.path.abspath(cl.run_dir)
311 raise Exception(
"Invalid job steps argument (must be > 0): {}".format(self.
job_stepsjob_steps))
315 self.
scriptscript = cl.script
317 raise Exception(
'Missing required script name or location.')
321 self.
param_fileparam_file = os.path.abspath(cl.params)
325 self.
job_idjob_id = cl.job_id
326 logger.debug(
"Loading job with ID %d from job store '%s'" % (self.
job_idjob_id, self.
param_fileparam_file))
328 if jobstore.has_job_id(self.
job_idjob_id):
329 params = jobstore.get_job(self.
job_idjob_id)
331 raise Exception(
"No job id %d was found in the job store '%s'" % (self.
job_idjob_id, self.
param_fileparam_file))
334 logger.info(
'Loading job parameters from file: %s' % self.
param_fileparam_file)
335 params = json.loads(open(self.
param_fileparam_file,
'r').read())
336 if not isinstance(params, dict):
337 raise Exception(
'Job ID must be provided when running from a job store.')
343 Load the job parameters from JSON data.
350 if 'output_dir' in self.
paramsparams:
352 if not os.path.isabs(self.
output_diroutput_dir):
354 logger.debug(
"Changed output dir to abs path: %s" % self.
output_diroutput_dir)
356 if 'job_id' in self.
paramsparams:
359 if 'input_files' in self.
paramsparams:
362 if 'output_files' in self.
paramsparams:
367 Prepare dictionary of input files.
369 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
371 input_files_dict = {}
372 for file_key, file_name
in self.
input_filesinput_files.items():
373 if 'https' in file_key:
374 logger.info(
"Downloading input file from: %s" % file_key)
375 file_name_path = self.
rundirrundir +
"/" + file_name
377 subprocess.check_output([
'wget',
'-q',
'-O', file_name_path, file_key])
378 input_files_dict.update({file_name: file_name})
380 input_files_dict.update({file_key: file_name})
385 Perform basic initialization before the job script is loaded.
388 if not os.path.isabs(self.
rundirrundir):
390 logger.info(
'Changed run dir to abs path: %s' % self.
rundirrundir)
394 if "LSB_JOBID" in os.environ:
395 self.
rundirrundir = os.path.join(
"/scratch", getpass.getuser(), os.environ[
"LSB_JOBID"])
396 logger.info(
'Set run dir for LSF: %s' % self.
rundirrundir)
400 if not os.path.exists(self.
rundirrundir):
401 logger.info(
'Creating run dir: %s' % self.
rundirrundir)
402 os.makedirs(self.
rundirrundir)
406 Configure job class and components.
419 component.config_logging(self.
job_configjob_config.parser)
422 component.config(self.
job_configjob_config.parser)
427 component.config_from_environ()
430 component.check_config()
439 if self.
scriptscript
is None:
440 logger.warning(
"No job script was provided!")
443 if not self.
scriptscript.endswith(
'.py'):
446 if not script_db.exists(self.
scriptscript):
447 raise Exception(
"The script name is not valid: %s" % self.
scriptscript)
448 script_path = script_db.get_script_path(self.
scriptscript)
449 logger.debug(
"Found script '%s' from name '%s'" % (script_path, self.
scriptscript))
452 script_path = self.
scriptscript
454 if not os.path.exists(script_path):
455 raise Exception(
'Job script does not exist: %s' % script_path)
457 logger.info(
'Loading job script: %s' % script_path)
459 exec(compile(open(script_path,
"rb").read(), script_path,
'exec'), {
'job': self})
463 This is the primary execution method for running the job.
466 logger.info(
'Job ID: ' + str(self.
job_idjob_id))
467 logger.info(
'Description: %s' % self.
descriptiondescription)
479 raise Exception(
"Job has no components to execute.")
482 logger.info(
"Job components loaded: %s" % ([c.name
for c
in self.
componentscomponents]))
485 if len(self.
paramsparams) > 0:
486 logger.info(
"Job parameters loaded: %s" % str(self.
paramsparams))
488 logger.info(
"No job parameters were specified!")
513 start_time = time.time()
522 stop_time = time.time()
523 elapsed = stop_time - start_time
524 logger.info(
"Job execution took {} seconds".format(round(elapsed, 4)))
531 logger.warning(
'Copy output files is disabled!')
536 logger.info(
'Successfully finished running job: %s' % self.
descriptiondescription)
540 Execute all components in job.
542 If dry_run is set to True, the components will not be executed;
543 a list of the components is printed instead.
549 logger.info(
"Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
553 self.
component_outcomponent_out.write(
'================ Component: %s ================\n' % component.name)
558 self.
component_errcomponent_err.write(
'================ Component: %s ================\n' % component.name)
564 elapsed = end - start
565 logger.info(
"Execution of {} took {} second(s) with return code: {}"
566 .format(component.name, round(elapsed, 4), str(returncode)))
569 raise Exception(
"Non-zero return code %d from '%s'" % (returncode, component.name))
572 for outputfile
in component.output_files():
573 if not os.path.isfile(outputfile):
574 raise Exception(
"Output file '%s' is missing after execution." % outputfile)
577 logger.info(
"Dry run enabled. Components will NOT be executed!")
579 logger.info(
"'%s' with args: %s (DRY RUN)" % (component.name,
' '.join(component.cmd_args())))
583 Necessary setup before job can be executed.
587 logger.info(
'Changing to run dir: %s' % self.
rundirrundir)
588 os.chdir(self.
rundirrundir)
597 logger.info(
"Job is limited to first %d steps." % self.
job_stepsjob_steps)
604 logger.debug(
'Setting up component: %s' % (component.name))
605 component.rundir = self.
rundirrundir
607 if self.
check_commandscheck_commands
and not component.cmd_exists():
608 raise Exception(
"Command '%s' does not exist for '%s'." % (component.command, component.name))
612 Pipe component outputs to inputs automatically.
614 for i
in range(0, len(self.
componentscomponents)):
616 logger.debug(
"Configuring file IO for component '%s' with order %d" % (component. name, i))
618 logger.debug(
"Setting inputs on '%s' to: %s"
619 % (component.name, str(list(self.
input_filesinput_files.values()))))
620 if not len(component.inputs):
621 component.inputs = list(self.
input_filesinput_files.values())
623 logger.debug(
"Setting inputs on '%s' to: %s"
625 if len(component.inputs) == 0:
630 Push JSON job parameters to components.
633 component.set_parameters(self.
paramsparams)
637 Perform post-job cleanup.
640 logger.info(
'Running cleanup for component: %s' % str(component.name))
643 logger.info(
'Deleting run dir: %s' % self.
rundirrundir)
644 if os.path.exists(
"%s/__swif_env__" % self.
rundirrundir):
645 for f
in os.listdir(self.
rundirrundir):
646 if (
'.log' not in f)
and (
'__swif_' not in f):
647 os.system(
'rm -r %s' % f)
649 shutil.rmtree(self.
rundirrundir)
654 except Exception
as e:
661 except Exception
as e:
666 Copy output files to output directory, handling ptags if necessary.
669 if not os.path.exists(self.
output_diroutput_dir):
670 logger.debug(
'Creating output dir: %s' % self.
output_diroutput_dir)
676 ptag_src = Job.get_ptag_from_src(src)
677 if ptag_src
in list(self.
ptagsptags.keys()):
678 src_file = self.
ptagsptags[ptag_src]
679 logger.info(
"Resolved ptag: {} -> {}".format(ptag_src, src_file))
681 raise Exception(
'Undefined ptag used in job params: %s' % ptag_src)
686 Copy an output file from src to dest.
689 src_file = os.path.join(self.
rundirrundir, src)
690 dest_file = os.path.join(self.
output_diroutput_dir, dest)
694 if not os.path.exists(os.path.dirname(dest_file)):
695 os.makedirs(os.path.dirname(dest_file), 0o755)
699 if os.path.exists(dest_file):
700 if os.path.samefile(src_file, dest_file):
704 if os.path.isfile(dest_file):
706 logger.debug(
'Deleting existing file: %s' % dest_file)
709 raise Exception(
'Output file already exists: %s' % dest_file)
712 logger.info(
"Copying '%s' to '%s'" % (src_file, dest_file))
715 shutil.copyfile(src_file, dest_file)
718 if not filecmp.cmp(src_file, dest_file, shallow=
False):
719 raise Exception(
"Copy from '%s' to '%s' failed." % (src_file, dest_file))
721 logger.warning(
"Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
725 Copy input files to the run dir.
727 for src, dest
in self.
input_filesinput_files.items():
731 if os.path.dirname(dest):
732 raise Exception(
"The input file destination '%s' is not valid." % dest)
733 logger.info(
"Copying input file: %s -> %s" % (src, os.path.join(self.
rundirrundir, dest)))
735 src = src.replace(
'/mss/',
'/cache/')
736 if os.path.exists(os.path.join(self.
rundirrundir, dest)):
737 logger.info(
"The input file '%s' already exists at destination '%s'" % (dest, self.
rundirrundir))
738 os.chmod(os.path.join(self.
rundirrundir, dest), 0o666)
740 shutil.copyfile(src, os.path.join(self.
rundirrundir, dest))
741 os.chmod(dest, 0o666)
747 for src, dest
in self.
input_filesinput_files.items():
748 if not os.path.isabs(src):
749 raise Exception(
'The input source file is not an absolute path: %s' % src)
750 if os.path.dirname(dest):
751 raise Exception(
'The input file destination is not valid: %s' % dest)
752 logger.debug(
"Symlinking input '%s' to '%s'" % (src, os.path.join(self.
rundirrundir, dest)))
753 os.symlink(src, os.path.join(self.
rundirrundir, dest))
757 Map a key to an output file name so a user can reference it in their job params.
759 if tag
not in list(self.
ptagsptags.keys()):
760 self.
ptagsptags[tag] = filename
761 logger.info(
"Added ptag %s -> %s" % (tag, filename))
763 raise Exception(
'The ptag already exists: %s' % tag)
767 return src.startswith(Job.PTAG_PREFIX)
771 if src.startswith(Job.PTAG_PREFIX):
772 return src[len(Job.PTAG_PREFIX):]
774 raise Exception(
'File src is not a ptag: %s' % src)
778 return self.
ptagsptags[Job.get_ptag_from_src(src)]
784 Set fieldmap dir to install location if not provided in config
789 raise Exception(
"The fieldmaps dir does not exist: {}".format(self.
hps_fieldmaps_dirhps_fieldmaps_dir))
790 logger.debug(
"Using fieldmap dir from install: {}".format(self.
hps_fieldmaps_dirhps_fieldmaps_dir))
792 logger.debug(
"Using fieldmap dir from config: {}".format(self.
hps_fieldmaps_dirhps_fieldmaps_dir))
796 Symlink to the fieldmap directory
798 fieldmap_symlink = pathlib.Path(os.getcwd(),
"fieldmap")
799 if not fieldmap_symlink.exists():
800 logger.debug(
"Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
803 if fieldmap_symlink.is_dir()
or os.path.islink(fieldmap_symlink):
804 logger.debug(
"Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
806 raise Exception(
"A file called 'fieldmap' exists but it is not a symlink or directory!")
810 'run':
'Run a job script',
811 'script':
'Show list of available job scripts (provide script name for detailed info)',
812 'component':
'Show list of available components (provide component name for detailed info)'}
816 print(
"Usage: job.py [command] [args]")
818 for name, descr
in cmds.items():
819 print(
" %s: %s" % (name, descr))
822 if __name__ ==
'__main__':
823 if len(sys.argv) > 1:
825 if cmd
not in list(cmds.keys()):
827 raise Exception(
'The job command is not valid: %s' % cmd)
833 elif cmd ==
'script':
834 if len(sys.argv) > 2:
840 print(
"AVAILABLE JOB SCRIPTS: ")
841 for name
in sorted(scriptdb.get_script_names()):
842 print(
' %s: %s' % (name, scriptdb.get_script_path(name)))
843 elif cmd ==
'component':
844 if len(sys.argv) > 2:
846 component_name = sys.argv[2]
Wrapper for accessing config information from parser.
def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
def get_script_path(self, name)
Get path to job script from job name.
def get_script_names(self)
Get list of all script names.
scripts
dict mapping job script names to the paths of the job scripts
def exists(self, name)
Test if job script exists in dict.
def get_scripts(self)
Get dict mapping script names to the paths of their job scripts.
Simple JSON based store of job data.
def __init__(self, path=None)
def load(self, json_store)
Load raw JSON data into this job store.
def get_job_data(self)
Get the raw dict containing all the job data.
def get_job(self, job_id)
Get a job by its job ID.
def has_job_id(self, job_id)
Return true if the job ID exists in the store.
def get_job_ids(self)
Get a sorted list of job IDs.
Primary class to run HPS jobs from a Python script.
def _configure(self)
Configure job class and components.
hps_fieldmaps_dir
fieldmaps dir
def _copy_input_files(self)
Copy input files to the run dir.
component_err
output for component error messages
def _config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
description
short description of job, should be overridden by the job script
def _cleanup(self)
Perform post-job cleanup.
input_files
dict of input files
def _config_file_pipeline(self)
Pipe component outputs to inputs automatically.
components
list of components in job
job_config
Job configuration.
def _load_script(self)
Load the job script.
param_file
path to parameter file
def _copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
script
script containing component initializations
def add(self, component)
Public method for adding components to the job.
rundir
rundir is current working directory
def get_ptag_from_src(src)
def _set_input_files(self)
Prepare dictionary of input files.
def _setup(self)
Necessary setup before job can be executed.
output_files
dict of output files
args
(passed) job arguments
def _copy_output_file(self, src, dest)
Copy an output file from src to dest.
def _initialize(self)
Perform basic initialization before the job script is loaded.
def ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
component_out
output for component printouts
def resolve_output_src(self, src)
def __init__(self, args=sys.argv, **kwargs)
def parse_args(self)
Configure the job from command line arguments.
def _execute(self)
Execute all components in job.
def _load_params(self, params)
Load the job parameters from JSON data.
def run(self)
This is the primary execution method for running the job.
def _set_parameters(self)
Push JSON job parameters to components.
def _symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
def _symlink_input_files(self)
Symlink input files.
def set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
enable_copy_output_files
These attributes can all be set in the config file.
output_dir
output_dir is current working directory
ptags
dict with keys to output filenames
def convert_config_value(val)
Convert config value to Python readable value.
Utility script for printing help about component classes.
def print_component(v)
Accepts Component class and prints info about it.
def print_job_script(script_path)
def print_components()
Print info for all Component classes.