HPS-MC
job_template.py
Go to the documentation of this file.
1 """! @package job_template
2 Expand a Jinja job template into a full list of jobs in JSON format."""
3 
4 import sys
5 import os
6 import itertools
7 import copy
8 import json
9 import argparse
10 import math
11 import uuid as _uuid
12 import subprocess
13 import re
14 
15 from jinja2 import Template, Environment, FileSystemLoader
16 
17 
def basename(path):
    """! Filter returning the file name of a path without directory or extension."""
    file_name = os.path.basename(path)
    stem, _ext = os.path.splitext(file_name)
    return stem
21 
22 
def extension(path):
    """! Filter returning the extension of a path, including the leading dot."""
    _root, ext = os.path.splitext(path)
    return ext
26 
27 
def dirname(path):
    """! Filter returning the directory part of a path."""
    head, _tail = os.path.split(path)
    return head
31 
32 
def pad(num, npad=4):
    """! Filter to zero-pad a number to a fixed width.
    @param num number to format
    @param npad minimum width of the result (default 4)
    @return string form of num, left-padded with zeros up to npad characters
    """
    # Build the format spec as '0<width>' explicitly.  The previous nested
    # format(npad, '02') produced e.g. '10' for npad=10, which is a plain
    # width of 10 and therefore padded with spaces instead of zeros.
    return format(num, '0{}'.format(npad))
36 
37 
def uuid():
    """! Return a short random identifier (8 hex characters) for use within a template."""
    # The first 8 hex digits are exactly the text before the first dash of
    # the canonical string form of the UUID.
    return _uuid.uuid4().hex[:8]
41 
42 
def lcio_dumpevent_runnumber(path):
    """! Filter to get a run number by inspecting first event in slcio file.

    Runs the external 'dumpevent' program on the file and parses the run
    number out of its standard output.

    @param path path of the slcio file passed to 'dumpevent'
    @return run number as an int

    Raises subprocess.CalledProcessError if 'dumpevent' exits with a non-zero
    code and ValueError if no run number is found in its output.
    """
    event_dump = subprocess.run(
        ["dumpevent", path, "1"],  # dump the first event from the file
        check=True,  # throw exception if returns non-0 exit code
        stdout=subprocess.PIPE,  # keep the output in the object rather than printing it out
        stderr=subprocess.PIPE
    )
    # search output for run number
    match = re.search(r'run:\s*(\d+)', event_dump.stdout.decode('utf-8'))
    if not match:
        raise ValueError(f'Unable to find run number from dump of first event in {path}')
    # group 0 is the entire match, group 1 is what is in the parentheses above
    return int(match.group(1))
57 
58 
def filenum(path):
    """! Filter to get the trailing number of a file.

    Extracts the text between the last underscore and the extension and
    converts it to an integer.  For example 'file_name_is_42.root' returns 42.
    int() raises ValueError if that text is not an integer.
    """
    # strip directory and extension (same computation as the basename filter)
    stem = os.path.splitext(os.path.basename(path))[0]
    # keep only what follows the last '_' (the whole stem if there is none)
    _head, _sep, tail = stem.rpartition('_')
    return int(tail)
71 
72 # def pwd():
73 # return os.getcwd()
74 
75 
class JobData(object):
    """! Very simple key-value object for storing data for each job."""

    def __init__(self):
        # input files of the job, keyed by input file list name
        self.input_files = {}
        # job parameters, keyed by iteration variable name
        self.params = {}
        # ID of the job
        self.job_id = 0

    def set(self, name, value):
        """! Set an arbitrary attribute on the job.
        @param name attribute name
        @param value attribute value
        """
        setattr(self, name, value)

    def set_param(self, name, value):
        """! Set a single job parameter.
        @param name parameter name
        @param value parameter value
        """
        self.params[name] = value
89 
90 
class MaxJobsException(Exception):
    """! Raised when the maximum number of jobs has been reached."""

    def __init__(self, max_jobs):
        message = f"Reached max jobs: {max_jobs}"
        super().__init__(message)
96 
97 
class JobTemplate(object):
    """! Template engine for transforming input job template into JSON job store.

    Accepts a set of iteration variables of which all combinations will be turned into jobs.
    Also accepts lists of input files with a unique key from which one or more can be read
    per job.
    The user's template should be a JSON dict with jinja2 markup.
    """

    def __init__(self, template_file=None, output_file='jobs.json'):
        """! Set up the jinja2 environment and the default settings.
        @param template_file path of the job template (can also be set by parse_args())
        @param output_file path of the JSON job store written by run()
        """
        # template file from which parameters are read
        self.template_file = template_file

        # jinja2 environment; templates are loaded relative to the current directory
        self.env = Environment(loader=FileSystemLoader('.'))
        self.env.filters['basename'] = basename
        self.env.filters['pad'] = pad
        # NOTE(review): uuid() takes no arguments, so applying it as a filter
        # would fail; it is also exposed as a template global in run().
        self.env.filters['uuid'] = uuid
        self.env.filters['extension'] = extension
        self.env.filters['dirname'] = dirname
        self.env.filters['lcio_dumpevent_runnumber'] = lcio_dumpevent_runnumber
        self.env.filters['filenum'] = filenum

        # ID given to the first generated job
        self.job_id_start = 0

        # dict of input files: key -> (list of files, number of files read per job)
        self.input_files = {}

        # dict of iteration variables: name -> list of values
        self.itervars = {}

        # name of output file
        self.output_file = output_file

        # Defaults matching the command line defaults, so that the engine can
        # be used programmatically without calling parse_args() first.
        # number of times each set of job parameters is repeated
        self.repeat = 1
        # maximum number of jobs to generate
        self.max_jobs = sys.maxsize

    def add_input_files(self, key, file_list, nreads=1):
        """! Add new input files to dict of input files.
        @param key key under which new input files are added
        @param file_list list of new input files to be added
        @param nreads number of files from the list that are read per job
        """
        if key in self.input_files:
            raise Exception('Input file key already exists: %s' % key)
        if nreads < 1:
            # a value of zero would otherwise surface later as a ZeroDivisionError
            raise Exception('Number of files to read per job must be at least 1: %s' % key)
        self.input_files[key] = (file_list, nreads)

    def add_itervar(self, name, vals):
        """! Add new iteration variable to dict of iteration variables.
        @param name name of new variable
        @param vals list of values for iteration variable
        """
        if name in self.itervars:
            raise Exception('The iter var already exists: %s' % name)
        self.itervars[name] = vals

    def add_itervars(self, iter_dict):
        """! Add several iter variables at once.
        @param iter_dict new dict of iteration variables to be added
        """
        for k, v in iter_dict.items():
            self.add_itervar(k, v)

    def add_itervars_json(self, json_file):
        """! Add iter variables from json file.
        @param json_file path of a JSON file, or an already open file object
        """
        if isinstance(json_file, (str, bytes, os.PathLike)):
            with open(json_file, 'r') as f:
                self.add_itervars(json.load(f))
        else:
            self.add_itervars(json.load(json_file))

    def get_itervars(self):
        """!
        Return all combinations of the iteration variables.
        @return tuple of (sorted variable names, list of value tuples in the same order)
        """
        var_names = sorted(self.itervars)
        var_list = [self.itervars[name] for name in var_names]
        # With no variables the product is a single empty tuple, which still
        # yields one (parameterless) combination.
        return var_names, list(itertools.product(*var_list))

    def run(self):
        """!
        Generate the JSON jobs from processing the template and write to file.
        """
        self.template = self.env.get_template(self.template_file)
        self.template.globals['uuid'] = uuid
        jobs = []
        for job in self._create_jobs():
            job_vars = {'job': job,
                        'job_id': job.job_id,
                        'sequence': job.sequence,
                        'input_files': job.input_files}
            for k, v in job.params.items():
                # iteration variables must not hide the built-in template variables
                if k in job_vars:
                    raise Exception("Illegal variable name: {}".format(k))
                job_vars[k] = v
            job_json = json.loads(self.template.render(job_vars))
            job_json['job_id'] = job.job_id
            jobs.append(job_json)
        with open(self.output_file, 'w') as f:
            json.dump(jobs, f, indent=4)
        print('Wrote %d jobs to: %s' % (len(jobs), self.output_file))

    def _get_max_iterations(self):
        """!
        Get the maximum number of iterations based on file input parameters.
        @return largest value of (number of files // files read per job) over all
                input file lists, or -1 if there are no input files
        """
        max_iter = -1
        for flist, nreads in self.input_files.values():
            max_iter = max(max_iter, len(flist) // nreads)
        return max_iter

    def _create_jobs(self):
        """! Create the JobData objects for all jobs.

        For every combination of iteration variables a sequence of jobs is
        created; each job takes the next 'nreads' files from every input file
        list.  Generation stops once max_jobs jobs have been created.

        Raises IndexError if an input file list runs out of files, which
        happens when the lists allow a different number of iterations or when
        repeat is greater than 1 together with input files.

        @return list of JobData objects
        """
        var_names, var_vals = self.get_itervars()

        max_iter = self._get_max_iterations()
        if max_iter < 1:
            max_iter = self.repeat
        else:
            max_iter = max_iter * self.repeat

        jobs = []
        job_id = self.job_id_start
        try:
            for values in var_vals:
                jobdata = JobData()
                for name, value in zip(var_names, values):
                    jobdata.set_param(name, value)
                # every combination starts reading each file list from the top
                offsets = dict.fromkeys(self.input_files, 0)
                for sequence in range(max_iter):
                    jobdata.set('job_id', job_id)
                    jobdata.set('sequence', sequence)
                    for input_name, (flist, nreads) in self.input_files.items():
                        start = offsets[input_name]
                        job_input_files = list(flist[start:start + nreads])
                        if len(job_input_files) < nreads:
                            raise IndexError(
                                "Input file list '%s' ran out of files at sequence %d"
                                % (input_name, sequence))
                        offsets[input_name] = start + nreads
                        jobdata.input_files[input_name] = job_input_files
                    # store a snapshot, jobdata itself is reused for the next job
                    jobs.append(copy.deepcopy(jobdata))
                    job_id += 1
                    if len(jobs) >= self.max_jobs:
                        # the exception is used to leave both loops at once
                        raise MaxJobsException(self.max_jobs)
        except MaxJobsException as mje:
            print(mje)

        return jobs

    def _read_input_file_list(self, input_file_list):
        """! Read the input file list from arg parsing.
        @param input_file_list list of (name, file, nreads) entries, where file is a
               text file containing one input file path per line
        """
        for entry in input_file_list:
            name = entry[0]
            if name in self.input_files:
                raise Exception('Duplicate input file list name: %s' % name)
            list_path = entry[1]
            nreads = int(entry[2])
            if nreads < 1:
                raise Exception('Number of files to read per job must be at least 1: %s' % name)
            with open(list_path, 'r') as list_file:
                # skip blank lines, strip surrounding whitespace from the rest
                file_paths = [line.strip() for line in list_file if line.strip()]
            if not file_paths:
                raise Exception('Failed to read any input files from file: %s' % list_path)
            self.input_files[name] = (file_paths, nreads)

    def parse_args(self):
        """! Parse arguments for template engine."""

        parser = argparse.ArgumentParser(description="Create a JSON job store from a jinja2 template")
        parser.add_argument("-j", "--job-start", nargs="?", type=int, help="Starting job ID", default=0)
        parser.add_argument("-a", "--var-file", help="Variables in JSON format for iteration")
        parser.add_argument("-i", "--input-file-list", action='append', nargs=3,
                            metavar=('NAME', 'FILE', 'NREADS'), help="Unique name of input file list, path on disk, number of files to read per job")
        parser.add_argument("-r", "--repeat", type=int, help="Number of times to repeat job parameters", default=1)
        parser.add_argument("-m", "--max-jobs", type=int, help="Max number of jobs to generate", default=sys.maxsize)
        parser.add_argument("template_file", help="Job template in JSON format with jinja2 markup")
        parser.add_argument("output_file", help="Output file containing the generated JSON job store")

        cl = parser.parse_args()

        # '-j' given without a value yields None because of nargs='?'
        self.job_id_start = cl.job_start if cl.job_start is not None else 0

        self.repeat = cl.repeat

        self.max_jobs = cl.max_jobs

        self.template_file = cl.template_file
        if not os.path.isfile(self.template_file):
            raise Exception('The template file does not exist: %s' % self.template_file)

        self.output_file = cl.output_file

        self.input_files = {}
        if cl.input_file_list is not None:
            self._read_input_file_list(cl.input_file_list)

        if cl.var_file:
            var_file = cl.var_file
            if not os.path.exists(var_file):
                raise Exception('The var file does not exist: %s' % var_file)
            with open(var_file, 'r') as f:
                self.add_itervars(json.load(f))
309 
310 
# Command-line entry point: configure the template engine from the command
# line arguments, then render all jobs and write the JSON job store.
if __name__ == '__main__':
    job_tmpl = JobTemplate()
    job_tmpl.parse_args()
    job_tmpl.run()
Very simple key-value object for storing data for each job.
Definition: job_template.py:76
def set_param(self, name, value)
Definition: job_template.py:87
def set(self, name, value)
Definition: job_template.py:84
Template engine for transforming input job template into JSON job store.
Definition: job_template.py:98
def _read_input_file_list(self, input_file_list)
Read the input file list from arg parsing.
def __init__(self, template_file=None, output_file='jobs.json')
def add_itervars(self, iter_dict)
Add several iter variables at once.
input_files
dict of input files
template_file
template file from which parameters are read
def add_itervar(self, name, vals)
Add new iteration variable to dict of iteration variables.
def add_itervars_json(self, json_file)
Add iter variables from json file.
def get_itervars(self)
Return all combinations of the iteration variables.
output_file
name of output file
def add_input_files(self, key, file_list, nreads=1)
Add new input files to dict of input files.
def parse_args(self)
Parse arguments for template engine.
itervars
dict of iteration variables
def run(self)
Generate the JSON jobs from processing the template and write to file.
def _get_max_iterations(self)
Get the maximum number of iterations based on file input parameters.
Exception if max jobs are reached.
Definition: job_template.py:91
def basename(path)
Filter to return a file base name stripped of dir and extension.
Definition: job_template.py:18
def uuid()
Function to get a uuid within a template.
Definition: job_template.py:38
def lcio_dumpevent_runnumber(path)
Filter to get a run number by inspecting first event in slcio file.
Definition: job_template.py:43
def pad(num, npad=4)
Filter to pad a number.
Definition: job_template.py:33
def dirname(path)
Filter to get dir name from string.
Definition: job_template.py:28
def extension(path)
Filter to get file extension from string.
Definition: job_template.py:23
def filenum(path)
Filter to get the trailing number of a file.
Definition: job_template.py:59