HPS-MC
 
Loading...
Searching...
No Matches
job_template.py
Go to the documentation of this file.
1"""! @package job_template
2Expand a Jinja job template into a full list of jobs in JSON format."""
3
4import sys
5import os
6import itertools
7import copy
8import json
9import argparse
10import math
11import uuid as _uuid
12import subprocess
13import re
14
15from jinja2 import Template, Environment, FileSystemLoader
16
17
def basename(path):
    """! Filter returning the file name stripped of its directory and extension."""
    stem, _ext = os.path.splitext(os.path.basename(path))
    return stem
21
22
def extension(path):
    """! Filter returning the file extension (including the leading dot) from a path string."""
    _stem, ext = os.path.splitext(path)
    return ext
26
27
def dirname(path):
    """! Filter returning the directory portion of a path string."""
    head, _tail = os.path.split(path)
    return head
31
32
def pad(num, npad=4):
    """! Filter to pad a number to a fixed width.

    The width spec is npad itself formatted with '02' (e.g. npad=4 gives the
    spec '04', i.e. zero-pad to width 4).
    """
    spec = format(npad, '02')
    return format(num, spec)
36
37
def uuid():
    """! Function to get a short (8 hex character) uuid for use within a template."""
    # first 8 chars of a uuid4 string are its first 8 hex digits (hyphen is at index 8)
    return _uuid.uuid4().hex[:8]
41
42
def lcio_dumpevent_runnumber(path):
    """! Filter to get a run number by inspecting first event in slcio file.
    @param path path to the slcio file
    @return run number of the first event as an int
    @throws ValueError if no run number is found in the dump output
    @throws subprocess.CalledProcessError if dumpevent exits non-zero
    """
    event_dump = subprocess.run(
        ["dumpevent", path, "1"],  # dump the first event from the file
        check=True,  # throw exception if returns non-0 exit code
        stdout=subprocess.PIPE,  # keep the output in the object rather than printing it out
        stderr=subprocess.PIPE
    )
    # search output for run number
    match = re.search(r'run:\s*(\d+)', event_dump.stdout.decode('utf-8'))
    if not match:
        raise ValueError(f'Unable to find run number from dump of first event in {path}')
    # group 0 is the entire match, group 1 is what is in the parentheses above
    return int(match.group(1))
57
58
def filenum(path):
    """! Filter to get the trailing number of a file name.

    Extracts the number between the last underscore and the extension.
    For example 'file_name_is_3.root' yields 3 (raises ValueError if the
    trailing piece is not an integer).
    """
    # strip the directory and extension, leaving just the bare file name
    stem = os.path.splitext(os.path.basename(path))[0]
    # take everything after the last underscore and convert it to an int
    return int(stem.rsplit('_', 1)[-1])
71
72# def pwd():
73# return os.getcwd()
74
75
class JobData(object):
    """! Very simple key-value object for storing data for each job."""

    def __init__(self):
        # integer job ID assigned during job creation
        self.job_id = 0
        # dict of input files for this job, keyed by input name
        self.input_files = {}
        # dict of template parameters for this job
        self.params = {}

    def set(self, name, value):
        """! Set an arbitrary attribute on this job."""
        setattr(self, name, value)

    def set_param(self, name, value):
        """! Store a template parameter for this job."""
        self.params[name] = value
89
90
class MaxJobsException(Exception):
    """! Exception raised when the maximum number of jobs is reached."""

    def __init__(self, max_jobs):
        # build the message first, then delegate to the Exception base class
        message = "Reached max jobs: {}".format(max_jobs)
        super().__init__(message)
96
97
class JobTemplate(object):
    """! Template engine for transforming input job template into JSON job store.

    Accepts a set of iteration variables of which all combinations will be turned into jobs.
    Also accepts lists of input files with a unique key from which one or more can be read
    per job.
    The user's template should be a JSON dict with jinja2 markup.
    """
106
    def __init__(self, template_file=None, output_file='jobs.json'):
        """! Create the template engine.
        @param template_file path to the jinja2 job template in JSON format
        @param output_file path of the JSON job store to write
        """
        # template file from which parameters are read
        self.template_file = template_file
        # jinja2 environment loading templates relative to the current directory
        self.env = Environment(loader=FileSystemLoader('.'))
        # register the module-level filter functions for use inside templates
        self.env.filters['basename'] = basename
        self.env.filters['pad'] = pad
        self.env.filters['uuid'] = uuid
        self.env.filters['extension'] = extension
        self.env.filters['dirname'] = dirname
        self.env.filters['lcio_dumpevent_runnumber'] = lcio_dumpevent_runnumber
        self.env.filters['filenum'] = filenum

        # dict of input files: name -> (file list, nreads)
        self.input_files = {}

        # dict of iteration variables: name -> list of values
        self.itervars = {}

        # name of output file
        self.output_file = output_file
126
127 def add_input_files(self, key, file_list, nreads=1):
128 """! Add new input files to dict of input files.
129 @param key key under which new input files are added
130 @param file_list list of new input files to be added
131 @param nreads nbr of times the input files are read \todo check if this is correct
132 """
133 if key in self.input_files:
134 raise Exception('Input file key already exists: %s' % key)
135 self.input_files[key] = (file_list, nreads)
136
137 def add_itervar(self, name, vals):
138 """! Add new iteration variable to dict of iteration variables.
139 @param name name of new variable
140 @param vals list of values for iteration variable
141 """
142 if name in self.itervars:
143 raise Exception('The iter var already exists: %s' % name)
144 self.itervars[name] = vals
145
146 def add_itervars(self, iter_dict):
147 """! Add several iter variables at once.
148 @param iter_dict new dict of iteration variables to be added
149 """
150 for k, v in iter_dict.items():
151 self.add_itervar(k, v)
152
153 def add_itervars_json(self, json_file):
154 """! Add iter variables from json file.
155 @param json_file name of json file
156 """
157 self.add_itervars(json.load(json_file))
158
159 def get_itervars(self):
160 """!
161 Return all combinations of the iteration variables.
162 """
163 var_list = []
164 var_names = []
165 if self.itervars:
166 var_names.extend(sorted(self.itervars.keys()))
167 for k in sorted(self.itervars.keys()):
168 var_list.append(self.itervars[k])
169 prod = itertools.product(*var_list)
170 return var_names, list(prod)
171
172 def run(self):
173 """!
174 Generate the JSON jobs from processing the template and write to file.
175 """
176 self.template = self.env.get_template(self.template_file)
177 self.template.globals['uuid'] = uuid
178 jobs = []
179 for job in self._create_jobs():
180 job_vars = {'job': job,
181 'job_id': job.job_id,
182 'sequence': job.sequence,
183 'input_files': job.input_files}
184 for k, v in job.params.items():
185 if k in job_vars:
186 raise Exception("Illegal variable name: {}".format(k))
187 job_vars[k] = v
188 s = self.template.render(job_vars)
189 job_json = json.loads(s)
190 job_json['job_id'] = job.job_id
191
192 jobs.append(job_json)
193 with open(self.output_file, 'w') as f:
194 json.dump(jobs, f, indent=4)
195 print('Wrote %d jobs to: %s' % (len(jobs), self.output_file))
196
198 """!
199 Get the maximum number of iterations based on file input parameters.
200 """
201 max_iter = -1
202 for input_name in list(self.input_files.keys()):
203 nreads = self.input_files[input_name][1]
204 flist = self.input_files[input_name][0]
205 n_iter = int(math.floor(len(flist) / nreads))
206 if n_iter > max_iter:
207 max_iter = n_iter
208 return max_iter
209
210 def _create_jobs(self):
211
212 jobs = []
213
214 var_names, var_vals = self.get_itervars()
215 nvars = len(var_names)
216
217 job_id = self.job_id_start
218
219 max_iter = self._get_max_iterations()
220 if max_iter < 1:
221 max_iter = self.repeat
222 else:
223 max_iter = max_iter * self.repeat
224
225 njobs = 0
226 try:
227 for var_index in range(len(var_vals)):
228 jobdata = JobData()
229 for j in range(nvars):
230 jobdata.set_param(var_names[j], var_vals[var_index][j])
231 input_files = copy.deepcopy(self.input_files)
232 for r in range(max_iter):
233 jobdata.set('job_id', job_id)
234 jobdata.set('sequence', r)
235 if (len(input_files.keys())):
236 for input_name in list(input_files.keys()):
237 job_input_files = []
238 nreads = input_files[input_name][1]
239 for iread in range(nreads):
240 input_file = input_files[input_name][0].pop(0)
241 job_input_files.append(input_file)
242 jobdata.input_files[input_name] = job_input_files
243 jobdata_copy = copy.deepcopy(jobdata)
244 jobs.append(jobdata_copy)
245 job_id += 1
246 njobs += 1
247 if njobs >= self.max_jobs:
249 except MaxJobsException as mje:
250 print(mje)
251
252 return jobs
253
254 def _read_input_file_list(self, input_file_list):
255 """! Read the input file list from arg parsing."""
256 for f in input_file_list:
257 name = f[0]
258 if name in list(self.input_files.keys()):
259 raise Exception('Duplicate input file list name: %s' % name)
260 input_file = f[1]
261 nreads = int(f[2])
262 input_file_list = []
263 with open(input_file, 'r') as f:
264 lines = f.readlines()
265 for line in lines:
266 if len(line.strip()):
267 input_file_list.append(line.strip())
268 if not len(input_file_list):
269 raise Exception('Failed to read any input files from file: %s' % input_file)
270 self.input_files[name] = (input_file_list, nreads)
271
272 def parse_args(self):
273 """! Parse arguments for template engine."""
274
275 parser = argparse.ArgumentParser(description="Create a JSON job store from a jinja2 template")
276 parser.add_argument("-j", "--job-start", nargs="?", type=int, help="Starting job ID", default=0)
277 parser.add_argument("-a", "--var-file", help="Variables in JSON format for iteration")
278 parser.add_argument("-i", "--input-file-list", action='append', nargs=3,
279 metavar=('NAME', 'FILE', 'NREADS'), help="Unique name of input file list, path on disk, number of files to read per job")
280 parser.add_argument("-r", "--repeat", type=int, help="Number of times to repeat job parameters", default=1)
281 parser.add_argument("-m", "--max-jobs", type=int, help="Max number of jobs to generate", default=sys.maxsize)
282 parser.add_argument("template_file", help="Job template in JSON format with jinja2 markup")
283 parser.add_argument("output_file", help="Output file containing the generated JSON job store")
284
285 cl = parser.parse_args()
286
287 self.job_id_start = cl.job_start
288
289 self.repeat = cl.repeat
290
291 self.max_jobs = cl.max_jobs
292
293 self.template_file = cl.template_file
294 if not os.path.isfile(self.template_file):
295 raise Exception('The template file does not exist: %s' % self.json_template_file)
296
297 self.output_file = cl.output_file
298
299 self.input_files = {}
300 if cl.input_file_list is not None:
301 self._read_input_file_list(cl.input_file_list)
302
303 if cl.var_file:
304 var_file = cl.var_file
305 if not os.path.exists(var_file):
306 raise Exception('The var file does not exist: %s' % var_file)
307 with open(var_file, 'r') as f:
308 self.add_itervars(json.load(f))
309
310
if __name__ == '__main__':
    # build the engine, configure it from the command line, and generate jobs
    engine = JobTemplate()
    engine.parse_args()
    engine.run()
Very simple key-value object for storing data for each job.
set(self, name, value)
set_param(self, name, value)
Template engine for transforming input job template into JSON job store.
_get_max_iterations(self)
Get the maximum number of iterations based on file input parameters.
run(self)
Generate the JSON jobs from processing the template and write to file.
get_itervars(self)
Return all combinations of the iteration variables.
__init__(self, template_file=None, output_file='jobs.json')
parse_args(self)
Parse arguments for template engine.
input_files
dict of input files
_read_input_file_list(self, input_file_list)
Read the input file list from arg parsing.
template_file
template file from which parameters are read
add_itervars_json(self, json_file)
Add iter variables from json file.
add_itervars(self, iter_dict)
Add several iter variables at once.
add_input_files(self, key, file_list, nreads=1)
Add new input files to dict of input files.
output_file
name of output file
itervars
dict of iteration variables
add_itervar(self, name, vals)
Add new iteration variable to dict of iteration variables.
Exception if max jobs are reached.
filenum(path)
Filter to get the trailing number of a file.
dirname(path)
Filter to get dir name from string.
extension(path)
Filter to get file extension from string.
pad(num, npad=4)
Filter to pad a number.
basename(path)
Filter to return a file base name stripped of dir and extension.
lcio_dumpevent_runnumber(path)
Filter to get a run number by inspecting first event in slcio file.