2Primary class for running and managing HPSMC jobs defined by a set of components.
19from collections.abc
import Sequence
20from os.path
import expanduser
23from hpsmc
import global_config
26logger = logging.getLogger(
'hpsmc.job')
30 """! Wrapper for accessing config information from parser."""
33 self.
parser = copy.copy(global_config)
36 parser_lines = [
'Job configuration:']
37 for section
in self.
parser.sections():
38 parser_lines.append(
"[" + section +
"]")
39 for i, v
in self.
parser.items(section):
40 parser_lines.append(
"%s=%s" % (i, v))
41 parser_lines.append(
'')
42 return '\n'.join(parser_lines)
def config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True):
    """! Push config into an object by setting an attribute.

    Every option found in the section is passed through convert_config_value()
    and stored on the object as an attribute with the option's name. All names
    are validated before the first attribute is set, so a failed validation
    leaves the object untouched.

    @param obj  object config is pushed to
    @param section  name of the config section to read; when not given, the class name of obj is used
    @param required_names  list of option names that must be present in the section
    @param allowed_names  list of allowed option names (an empty list allows any name)
    @param require_section  if True, a missing section raises an exception; otherwise only a warning is logged
    """
    # NOTE: the list defaults are shared between calls, but they are only read
    # here and never mutated, so the signature is kept as-is for callers.
    if not section:
        section = obj.__class__.__name__

    if not self.parser.has_section(section):
        if require_section:
            raise Exception("Missing required config section '%s'" % section)
        logger.warning('Config section not found: %s' % section)
        return

    # Read the section once instead of once per required name.
    items = list(self.parser.items(section))
    present = {name for name, _ in items}

    # Check that required settings are there.
    for req in required_names:
        if req not in present:
            raise Exception("Missing required config '%s'" % req)

    # Reject disallowed names up front so the object is never half-configured.
    if allowed_names:
        for name, _ in items:
            if name not in allowed_names:
                raise Exception("Config name '%s' is not allowed for '%s'" % (name, section))

    # Push each config item into the object by setting an attribute.
    for name, value in items:
        setattr(obj, name, convert_config_value(value))
75 Simple JSON based store of job data.
84 logger.warning(
'Path was not provided to job store - no jobs loaded!')
def load(self, json_store):
    """! Load raw JSON data into this job store.

    The file must contain a JSON list of job objects, each with a 'job_id'
    entry. Every job is stored in self.data under its integer job ID.

    @param json_store  json file containing raw job data
    """
    if not os.path.exists(json_store):
        raise Exception('JSON job store does not exist: {}'.format(json_store))

    with open(json_store, 'r') as f:
        json_data = json.load(f)

    # A top-level dict (a single job's parameters) would be iterated by key
    # and fail with a confusing error further down, so reject it explicitly.
    if not isinstance(json_data, list):
        raise Exception('JSON job store must contain a list of jobs: {}'.format(json_store))

    for job_data in json_data:
        if not isinstance(job_data, dict) or 'job_id' not in job_data:
            raise Exception("Job data is missing a job id")
        job_id = int(job_data['job_id'])
        if job_id in self.data:
            raise Exception(f"Duplicate job id: {job_id}")
        self.data[job_id] = job_data
        logger.debug("Loaded job {} data:\n {}".format(job_id, job_data))

    logger.info(f"Successfully loaded {len(self.data)} jobs from: {json_store}")
105 """! Get a job by its job ID.
109 raise Exception(f
"Job ID does not exist: {job_id}")
110 return self.
data[int(job_id)]
113 """! Get the raw dict containing all the job data."""
117 """! Get a sorted list of job IDs."""
118 return sorted(self.
data.keys())
121 """! Return true if the job ID exists in the store."""
122 return int(job_id)
in list(self.
data.keys())
126 """! Database of job scripts.
130 if 'HPSMC_DIR' not in os.environ:
131 raise Exception(
'HPSMC_DIR is not set in the environ.')
132 hps_mc_dir = os.environ[
'HPSMC_DIR']
133 script_dir = os.path.join(hps_mc_dir,
'lib',
'python',
'jobs')
136 for f
in glob.glob(os.path.join(script_dir,
'*_job.py')):
137 script_name = os.path.splitext(os.path.basename(f))[0].replace(
'_job',
'')
141 """! Get path to job script from job name.
143 @return path to job script"""
147 """! Get list of all script names."""
148 return list(self.
scripts.keys())
151 """! Get dict containing paths to scripts sorted by script names."""
155 """! Test if job script exists in dict.
156 @return True if job name is key in job dict"""
162 Primary class to run HPS jobs from a Python script.
164 Jobs are run by executing a series of components
165 which are configured using a config file with parameters
166 provided by a JSON job file.
172 _config_names = ['enable_copy_output_files',
173 'enable_copy_input_files',
177 'ignore_return_codes',
178 'check_output_files',
179 'enable_file_chaining',
184 PTAG_PREFIX =
'ptag:'
188 if 'HPSMC_DIR' not in os.environ:
189 raise Exception(
'HPSMC_DIR is not set in the environ.')
241 Public method for adding components to the job.
243 if isinstance(component, Sequence)
and not isinstance(component, str):
250 Add parameters to the job, overriding values if they exist already.
252 This method can be used in job scripts to define default values.
254 for k, v
in params.items():
256 logger.debug(
"Setting new value '%s' for parameter '%s' with existing value '%s'."
257 % (str(v), str(k), params[k]))
262 Configure the job from command line arguments.
265 parser = argparse.ArgumentParser(description=self.
description)
266 parser.add_argument(
"-c",
"--config-file", help=
"Config file locations", action=
'append')
267 parser.add_argument(
"-d",
"--run-dir", nargs=
'?', help=
"Job run dir")
268 parser.add_argument(
"-o",
"--out", nargs=
'?', help=
"File for component stdout (default prints to console)")
269 parser.add_argument(
"-e",
"--err", nargs=
'?', help=
"File for component stderr (default prints to console)")
270 parser.add_argument(
"-s",
"--job-steps", type=int, default=
None, help=
"Job steps to run (single number)")
271 parser.add_argument(
"-i",
"--job-id", type=int, help=
"Job ID from JSON job store", default=
None)
272 parser.add_argument(
"script", nargs=
'?', help=
"Path or name of job script")
273 parser.add_argument(
"params", nargs=
'?', help=
"Job param file in JSON format")
282 cl = parser.parse_args(self.
args)
286 config_files = list(map(os.path.abspath, cl.config_file))
287 logger.info(
"Reading additional config from: {}".format(config_files))
293 if not os.path.isabs(out_file):
294 out_file = os.path.abspath(out_file)
295 logger.info(
'Job output will be written to: %s' % out_file)
296 self.
out = open(out_file,
'w')
301 if not os.path.isabs(err_file):
302 err_file = os.path.abspath(err_file)
303 logger.info(
'Job error will be written to: %s' % err_file)
304 self.
err = open(err_file,
'w')
307 self.
rundir = os.path.abspath(cl.run_dir)
311 raise Exception(
"Invalid job steps argument (must be > 0): {}".format(self.
job_steps))
317 raise Exception(
'Missing required script name or location.')
326 logger.debug(
"Loading job with ID %d from job store '%s'" % (self.
job_id, self.
param_file))
328 if jobstore.has_job_id(self.
job_id):
329 params = jobstore.get_job(self.
job_id)
331 raise Exception(
"No job id %d was found in the job store '%s'" % (self.
job_id, self.
param_file))
334 logger.info(
'Loading job parameters from file: %s' % self.
param_file)
335 params = json.loads(open(self.
param_file,
'r').read())
336 if not isinstance(params, dict):
337 raise Exception(
'Job ID must be provided when running from a job store.')
343 Load the job parameters from JSON data.
350 if 'output_dir' in self.
params:
354 logger.debug(
"Changed output dir to abs path: %s" % self.
output_dir)
356 if 'job_id' in self.
params:
359 if 'input_files' in self.
params:
362 if 'output_files' in self.
params:
367 Prepare dictionary of input files.
369 If a link to a download location is given as input, the file is downloaded into the run directory before the file name is added to the input_files dict. If a regular file is provided, it is added to the dict without any additional action.
371 input_files_dict = {}
372 for file_key, file_name
in self.
input_files.items():
373 if 'https' in file_key:
374 logger.info(
"Downloading input file from: %s" % file_key)
375 file_name_path = self.
rundir +
"/" + file_name
377 subprocess.check_output([
'wget',
'-q',
'-O', file_name_path, file_key])
378 input_files_dict.update({file_name: file_name})
380 input_files_dict.update({file_key: file_name})
385 Perform basic initialization before the job script is loaded.
388 if not os.path.isabs(self.
rundir):
390 logger.info(
'Changed run dir to abs path: %s' % self.
rundir)
394 if "LSB_JOBID" in os.environ:
395 self.
rundir = os.path.join(
"/scratch", getpass.getuser(), os.environ[
"LSB_JOBID"])
396 logger.info(
'Set run dir for LSF: %s' % self.
rundir)
400 if not os.path.exists(self.
rundir):
401 logger.info(
'Creating run dir: %s' % self.
rundir)
406 Configure job class and components.
410 self.
job_config.config(self, require_section=
False)
419 component.config_logging(self.
job_config.parser)
427 component.config_from_environ()
430 component.check_config()
440 logger.warning(
"No job script was provided!")
443 if not self.
script.endswith(
'.py'):
446 if not script_db.exists(self.
script):
447 raise Exception(
"The script name is not valid: %s" % self.
script)
448 script_path = script_db.get_script_path(self.
script)
449 logger.debug(
"Found script '%s' from name '%s'" % (script_path, self.
script))
454 if not os.path.exists(script_path):
455 raise Exception(
'Job script does not exist: %s' % script_path)
457 logger.info(
'Loading job script: %s' % script_path)
459 exec(compile(open(script_path,
"rb").read(), script_path,
'exec'), {
'job': self})
463 This is the primary execution method for running the job.
466 logger.info(
'Job ID: ' + str(self.
job_id))
479 raise Exception(
"Job has no components to execute.")
482 logger.info(
"Job components loaded: %s" % ([c.name
for c
in self.
components]))
486 logger.info(
"Job parameters loaded: %s" % str(self.
params))
488 logger.info(
"No job parameters were specified!")
513 start_time = time.time()
522 stop_time = time.time()
523 elapsed = stop_time - start_time
524 logger.info(
"Job execution took {} seconds".format(round(elapsed, 4)))
531 logger.warning(
'Copy output files is disabled!')
536 logger.info(
'Successfully finished running job: %s' % self.
description)
540 Execute all components in job.
542 If dry_run is set to True, the components will not be executed;
543 a list of the components will be logged instead.
549 logger.info(
"Executing '%s' with command: %s" % (component.name, component.cmd_line_str()))
553 self.
component_out.write(
'================ Component: %s ================\n' % component.name)
558 self.
component_err.write(
'================ Component: %s ================\n' % component.name)
564 elapsed = end - start
565 logger.info(
"Execution of {} took {} second(s) with return code: {}"
566 .format(component.name, round(elapsed, 4), str(returncode)))
569 raise Exception(
"Non-zero return code %d from '%s'" % (returncode, component.name))
572 for outputfile
in component.output_files():
573 if not os.path.isfile(outputfile):
574 raise Exception(
"Output file '%s' is missing after execution." % outputfile)
577 logger.info(
"Dry run enabled. Components will NOT be executed!")
579 logger.info(
"'%s' with args: %s (DRY RUN)" % (component.name,
' '.join(component.cmd_args())))
583 Necessary setup before job can be executed.
587 logger.info(
'Changing to run dir: %s' % self.
rundir)
597 logger.info(
"Job is limited to first %d steps." % self.
job_steps)
604 logger.debug(
'Setting up component: %s' % (component.name))
605 component.rundir = self.
rundir
608 raise Exception(
"Command '%s' does not exist for '%s'." % (component.command, component.name))
612 Pipe component outputs to inputs automatically.
616 logger.debug(
"Configuring file IO for component '%s' with order %d" % (component. name, i))
618 logger.debug(
"Setting inputs on '%s' to: %s"
619 % (component.name, str(list(self.
input_files.values()))))
620 if not len(component.inputs):
623 logger.debug(
"Setting inputs on '%s' to: %s"
625 if len(component.inputs) == 0:
630 Push JSON job parameters to components.
633 component.set_parameters(self.
params)
637 Perform post-job cleanup.
640 logger.info(
'Running cleanup for component: %s' % str(component.name))
643 logger.info(
'Deleting run dir: %s' % self.
rundir)
644 if os.path.exists(
"%s/__swif_env__" % self.
rundir):
645 for f
in os.listdir(self.
rundir):
646 if (
'.log' not in f)
and (
'__swif_' not in f):
647 os.system(
'rm -r %s' % f)
649 shutil.rmtree(self.
rundir)
654 except Exception
as e:
661 except Exception
as e:
666 Copy output files to output directory, handling ptags if necessary.
670 logger.debug(
'Creating output dir: %s' % self.
output_dir)
676 ptag_src = Job.get_ptag_from_src(src)
677 if ptag_src
in list(self.
ptags.keys()):
678 src_file = self.
ptags[ptag_src]
679 logger.info(
"Resolved ptag: {} -> {}".format(ptag_src, src_file))
681 raise Exception(
'Undefined ptag used in job params: %s' % ptag_src)
686 Copy an output file from src to dest.
689 src_file = os.path.join(self.
rundir, src)
690 dest_file = os.path.join(self.
output_dir, dest)
694 if not os.path.exists(os.path.dirname(dest_file)):
695 os.makedirs(os.path.dirname(dest_file), 0o755)
699 if os.path.exists(dest_file):
700 if os.path.samefile(src_file, dest_file):
704 if os.path.isfile(dest_file):
706 logger.debug(
'Deleting existing file: %s' % dest_file)
709 raise Exception(
'Output file already exists: %s' % dest_file)
712 logger.info(
"Copying '%s' to '%s'" % (src_file, dest_file))
715 shutil.copyfile(src_file, dest_file)
718 if not filecmp.cmp(src_file, dest_file, shallow=
False):
719 raise Exception(
"Copy from '%s' to '%s' failed." % (src_file, dest_file))
721 logger.warning(
"Skipping copy of '%s' to '%s' because they are the same file!" % (src_file, dest_file))
725 Copy input files to the run dir.
731 if os.path.dirname(dest):
732 raise Exception(
"The input file destination '%s' is not valid." % dest)
733 logger.info(
"Copying input file: %s -> %s" % (src, os.path.join(self.
rundir, dest)))
735 src = src.replace(
'/mss/',
'/cache/')
736 if os.path.exists(os.path.join(self.
rundir, dest)):
737 logger.info(
"The input file '%s' already exists at destination '%s'" % (dest, self.
rundir))
738 os.chmod(os.path.join(self.
rundir, dest), 0o666)
740 shutil.copyfile(src, os.path.join(self.
rundir, dest))
741 os.chmod(dest, 0o666)
748 if not os.path.isabs(src):
749 raise Exception(
'The input source file is not an absolute path: %s' % src)
750 if os.path.dirname(dest):
751 raise Exception(
'The input file destination is not valid: %s' % dest)
752 logger.debug(
"Symlinking input '%s' to '%s'" % (src, os.path.join(self.
rundir, dest)))
753 os.symlink(src, os.path.join(self.
rundir, dest))
757 Map a key to an output file name so a user can reference it in their job params.
759 if tag
not in list(self.
ptags.keys()):
760 self.
ptags[tag] = filename
761 logger.info(
"Added ptag %s -> %s" % (tag, filename))
763 raise Exception(
'The ptag already exists: %s' % tag)
767 return src.startswith(Job.PTAG_PREFIX)
771 if src.startswith(Job.PTAG_PREFIX):
772 return src[len(Job.PTAG_PREFIX):]
774 raise Exception(
'File src is not a ptag: %s' % src)
778 return self.
ptags[Job.get_ptag_from_src(src)]
784 Set fieldmap dir to install location if not provided in config
789 raise Exception(
"The fieldmaps dir does not exist: {}".format(self.
hps_fieldmaps_dir))
790 logger.debug(
"Using fieldmap dir from install: {}".format(self.
hps_fieldmaps_dir))
792 logger.debug(
"Using fieldmap dir from config: {}".format(self.
hps_fieldmaps_dir))
796 Symlink to the fieldmap directory
798 fieldmap_symlink = pathlib.Path(os.getcwd(),
"fieldmap")
799 if not fieldmap_symlink.exists():
800 logger.debug(
"Creating symlink to fieldmap directory: {}".format(fieldmap_symlink))
803 if fieldmap_symlink.is_dir()
or os.path.islink(fieldmap_symlink):
804 logger.debug(
"Fieldmap symlink or directory already exists: {}".format(fieldmap_symlink))
806 raise Exception(
"A file called 'fieldmap' exists but it is not a symlink or directory!")
810 'run':
'Run a job script',
811 'script':
'Show list of available job scripts (provide script name for detailed info)',
812 'component':
'Show list of available components (provide component name for detailed info)'}
816 print(
"Usage: job.py [command] [args]")
818 for name, descr
in cmds.items():
819 print(
" %s: %s" % (name, descr))
822if __name__ ==
'__main__':
823 if len(sys.argv) > 1:
825 if cmd
not in list(cmds.keys()):
827 raise Exception(
'The job command is not valid: %s' % cmd)
833 elif cmd ==
'script':
834 if len(sys.argv) > 2:
837 print_job_script(script)
840 print(
"AVAILABLE JOB SCRIPTS: ")
841 for name
in sorted(scriptdb.get_script_names()):
842 print(
' %s: %s' % (name, scriptdb.get_script_path(name)))
843 elif cmd ==
'component':
844 if len(sys.argv) > 2:
846 component_name = sys.argv[2]
847 print_component(component_name)
Wrapper for accessing config information from parser.
config(self, obj, section=None, required_names=[], allowed_names=[], require_section=True)
Push config into an object by setting an attribute.
exists(self, name)
Test if job script exists in dict.
get_scripts(self)
Get dict containing paths to scripts sorted by script names.
scripts
dict of paths to job scripts sorted by name
get_script_path(self, name)
Get path to job script from job name.
get_script_names(self)
Get list of all script names.
Simple JSON based store of job data.
__init__(self, path=None)
load(self, json_store)
Load raw JSON data into this job store.
has_job_id(self, job_id)
Return true if the job ID exists in the store.
get_job_ids(self)
Get a sorted list of job IDs.
get_job_data(self)
Get the raw dict containing all the job data.
get_job(self, job_id)
Get a job by its job ID.
Primary class to run HPS jobs from a Python script.
hps_fieldmaps_dir
fieldmaps dir
run(self)
This is the primary execution method for running the job.
parse_args(self)
Configure the job from command line arguments.
_copy_output_file(self, src, dest)
Copy an output file from src to dest.
component_err
output for component error messages
_copy_output_files(self)
Copy output files to output directory, handling ptags if necessary.
description
short description of job, should be overridden by the job script
input_files
dict of input files
_config_fieldmap_dir(self)
Set fieldmap dir to install location if not provided in config.
components
list of components in job
add(self, component)
Public method for adding components to the job.
resolve_output_src(self, src)
job_config
Job configuration.
_config_file_pipeline(self)
Pipe component outputs to inputs automatically.
_symlink_fieldmap_dir(self)
Symlink to the fieldmap directory.
param_file
path to parameter file
script
script containing component initializations
_load_script(self)
Load the job script.
_copy_input_files(self)
Copy input files to the run dir.
_cleanup(self)
Perform post-job cleanup.
rundir
rundir is current working directory
_setup(self)
Necessary setup before job can be executed.
_initialize(self)
Perform basic initialization before the job script is loaded.
output_files
dict of output files
set_parameters(self, params)
Add parameters to the job, overriding values if they exist already.
__init__(self, args=sys.argv, **kwargs)
args
(passed) job arguments
component_out
output for component printouts
_load_params(self, params)
Load the job parameters from JSON data.
ptag(self, tag, filename)
Map a key to an output file name so a user can reference it in their job params.
_execute(self)
Execute all components in job.
_set_parameters(self)
Push JSON job parameters to components.
_configure(self)
Configure job class and components.
enable_copy_output_files
These attributes can all be set in the config file.
_symlink_input_files(self)
Symlink input files.
output_dir
output_dir is current working directory
_set_input_files(self)
Prepare dictionary of input files.
ptags
dict with keys to output filenames
Utility script for printing help about component classes.