1 """! @package job_template
2 Expand a Jinja job template into a full list of jobs in JSON format."""
15 from jinja2 import Template, Environment, FileSystemLoader
19 """! Filter to return a file base name stripped of dir and extension."""
20 return os.path.splitext(os.path.basename(path))[0]
24 """! Filter to get file extension from string."""
25 return os.path.splitext(path)[1]
29 """! Filter to get dir name from string."""
30 return os.path.dirname(path)
34 """! Filter to pad a number."""
35 return format(num, format(npad, '02'))
39 """! Function to get a uuid within a template."""
40 return str(_uuid.uuid4())[:8]
44 """! Filter to get a run number by inspecting first event in slcio file."""
45 event_dump = subprocess.run(
46 ["dumpevent", path, "1"],
48 stdout=subprocess.PIPE,
49 stderr=subprocess.PIPE
52 match = re.search('run:\\s*(\\d+)', event_dump.stdout.decode('utf-8'))
54 raise ValueError(f'Unable to find run number from dump of first event in {path}')
56 return int(match.group(1))
60 """! Filter to get the trailing number of a file
62 This will extract the number between the last underscore and the extension.
63 For example 'file_name_is_number.root' will return 'number' if 'number' is
70 return int(filename.split('_')[-1])
77 """! Very simple key-value object for storing data for each job."""
84 def set(self, name, value):
85 setattr(self, name, value)
88 self.params[name] = value
92 """! Exception if max jobs are reached."""
95 super().__init__("Reached max jobs: {}".format(max_jobs))
99 """! Template engine for transforming input job template into JSON job store.
101 Accepts a set of iteration variables of which all combinations will be turned into jobs.
102 Also accepts lists of input files with a unique key from which one or more can be read
104 The user's template should be a JSON dict with jinja2 markup.
107 def __init__(self, template_file=None, output_file='jobs.json'):
110 self.env = Environment(loader=FileSystemLoader('.'))
111 self.env.filters['basename'] = basename
112 self.env.filters['pad'] = pad
113 self.env.filters['uuid'] = uuid
114 self.env.filters['extension'] = extension
115 self.env.filters['dirname'] = dirname
116 self.env.filters['lcio_dumpevent_runnumber'] = lcio_dumpevent_runnumber
117 self.env.filters['filenum'] = filenum
128 """! Add new input files to dict of input files.
129 @param key key under which new input files are added
130 @param file_list list of new input files to be added
131 @param nreads nbr of times the input files are read \todo check if this is correct
134 raise Exception('Input file key already exists: %s' % key)
135 self.input_files[key] = (file_list, nreads)
138 """! Add new iteration variable to dict of iteration variables.
139 @param name name of new variable
140 @param vals list of values for iteration variable
143 raise Exception('The iter var already exists: %s' % name)
147 """! Add several iter variables at once.
148 @param iter_dict new dict of iteration variables to be added
150 for k, v in iter_dict.items():
154 """! Add iter variables from json file.
155 @param json_file name of json file
161 Return all combinations of the iteration variables.
166 var_names.extend(sorted(self.itervars.keys()))
167 for k in sorted(self.itervars.keys()):
168 var_list.append(self.itervars[k])
169 prod = itertools.product(*var_list)
170 return var_names, list(prod)
174 Generate the JSON jobs from processing the template and write to file.
177 self.template.globals['uuid'] = uuid
180 job_vars = {'job': job,
181 'job_id': job.job_id,
182 'sequence': job.sequence,
183 'input_files': job.input_files}
184 for k, v in job.params.items():
186 raise Exception("Illegal variable name: {}".format(k))
188 s = self.template.render(job_vars)
189 job_json = json.loads(s)
190 job_json['job_id'] = job.job_id
192 jobs.append(job_json)
194 json.dump(jobs, f, indent=4)
195 print('Wrote %d jobs to: %s' % (len(jobs), self.output_file))
199 Get the maximum number of iterations based on file input parameters.
202 for input_name in list(self.input_files.keys()):
203 nreads = self.input_files[input_name][1]
205 n_iter = int(math.floor(len(flist) / nreads))
206 if n_iter > max_iter:
215 nvars = len(var_names)
221 max_iter = self.repeat
223 max_iter = max_iter * self.repeat
227 for var_index in range(len(var_vals)):
229 for j in range(nvars):
230 jobdata.set_param(var_names[j], var_vals[var_index][j])
231 input_files = copy.deepcopy(self.input_files)
232 for r in range(max_iter):
233 jobdata.set('job_id', job_id)
234 jobdata.set('sequence', r)
235 if (len(input_files.keys())):
236 for input_name in list(input_files.keys()):
238 nreads = input_files[input_name][1]
239 for iread in range(nreads):
240 input_file = input_files[input_name][0].pop(0)
241 job_input_files.append(input_file)
242 jobdata.input_files[input_name] = job_input_files
243 jobdata_copy = copy.deepcopy(jobdata)
244 jobs.append(jobdata_copy)
249 except MaxJobsException as mje:
255 """! Read the input file list from arg parsing."""
256 for f in input_file_list:
258 if name in list(self.input_files.keys()):
259 raise Exception('Duplicate input file list name: %s' % name)
263 with open(input_file, 'r') as f:
264 lines = f.readlines()
266 if len(line.strip()):
267 input_file_list.append(line.strip())
268 if not len(input_file_list):
269 raise Exception('Failed to read any input files from file: %s' % input_file)
270 self.input_files[name] = (input_file_list, nreads)
273 """! Parse arguments for template engine."""
275 parser = argparse.ArgumentParser(description="Create a JSON job store from a jinja2 template")
276 parser.add_argument("-j", "--job-start", nargs="?", type=int, help="Starting job ID", default=0)
277 parser.add_argument("-a", "--var-file", help="Variables in JSON format for iteration")
278 parser.add_argument("-i", "--input-file-list", action='append', nargs=3,
279 metavar=('NAME', 'FILE', 'NREADS'), help="Unique name of input file list, path on disk, number of files to read per job")
280 parser.add_argument("-r", "--repeat", type=int, help="Number of times to repeat job parameters", default=1)
281 parser.add_argument("-m", "--max-jobs", type=int, help="Max number of jobs to generate", default=sys.maxsize)
282 parser.add_argument("template_file", help="Job template in JSON format with jinja2 markup")
283 parser.add_argument("output_file", help="Output file containing the generated JSON job store")
285 cl = parser.parse_args()
295 raise Exception('The template file does not exist: %s' % self.json_template_file)
300 if cl.input_file_list is not None:
304 var_file = cl.var_file
305 if not os.path.exists(var_file):
306 raise Exception('The var file does not exist: %s' % var_file)
307 with open(var_file, 'r') as f:
311 if __name__ == '__main__':
313 job_tmpl.parse_args()
Very simple key-value object for storing data for each job.
def set_param(self, name, value)
def set(self, name, value)
Template engine for transforming input job template into JSON job store.
def _read_input_file_list(self, input_file_list)
Read the input file list from arg parsing.
def __init__(self, template_file=None, output_file='jobs.json')
def add_itervars(self, iter_dict)
Add several iter variables at once.
input_files
dict of input files
template_file
template file from which parameters are read
def add_itervar(self, name, vals)
Add new iteration variable to dict of iteration variables.
def add_itervars_json(self, json_file)
Add iter variables from json file.
def get_itervars(self)
Return all combinations of the iteration variables.
output_file
name of output file
def add_input_files(self, key, file_list, nreads=1)
Add new input files to dict of input files.
def parse_args(self)
Parse arguments for template engine.
itervars
dict of iteration variables
def run(self)
Generate the JSON jobs from processing the template and write to file.
job_id_start
start ID for jobs
def _get_max_iterations(self)
Get the maximum number of iterations based on file input parameters.
Exception if max jobs are reached.
def __init__(self, max_jobs)
def basename(path)
Filter to return a file base name stripped of dir and extension.
def uuid()
Function to get a uuid within a template.
def lcio_dumpevent_runnumber(path)
Filter to get a run number by inspecting first event in slcio file.
def pad(num, npad=4)
Filter to pad a number.
def dirname(path)
Filter to get dir name from string.
def extension(path)
Filter to get file extension from string.
def filenum(path)
Filter to get the trailing number of a file.