HPS-MC
 
prepare_merge_jobs.py
"""! @package prepare_merge_jobs
Scan directories for ROOT files and prepare merge job configurations.

This module scans run directories (e.g., hps_014185), collects ROOT files,
batches them into groups of up to 20 files, and generates input file lists
for use with hps-mc-job-template.
"""

import os
import sys
import glob
import argparse
import json
from pathlib import Path


class MergeJobPreparation:
    """! Prepare merge jobs by scanning directories for ROOT files."""

    def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
                 file_pattern="*.root", run_pattern="hps_*"):
        """! Initialize the merge job preparation.

        @param parent_dir Parent directory containing run subdirectories
        @param output_prefix Prefix for output file lists and job configs
        @param max_files_per_job Maximum number of ROOT files per merge job
        @param file_pattern Glob pattern for files to merge (default: *.root)
        @param run_pattern Glob pattern for run directories (default: hps_*)
        """
        self.parent_dir = Path(parent_dir).resolve()
        self.output_prefix = output_prefix
        self.max_files_per_job = max_files_per_job
        self.file_pattern = file_pattern
        self.run_pattern = run_pattern

        if not self.parent_dir.is_dir():
            raise ValueError(f"Parent directory does not exist: {self.parent_dir}")

    def scan_directories(self):
        """! Scan parent directory for run directories and ROOT files.

        @return Dictionary mapping run names to lists of ROOT file paths
        """
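        # Illustrative shape of the returned mapping (run names taken from the
        # module docstring, paths hypothetical):
        #   {'hps_014185': ['/data/runs/hps_014185/a.root', '/data/runs/hps_014185/b.root'],
        #    'hps_014186': ['/data/runs/hps_014186/a.root']}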
        run_files = {}

        # Find all run directories matching the pattern
        run_dirs = sorted(self.parent_dir.glob(self.run_pattern))

        if not run_dirs:
            print(f"Warning: No directories matching '{self.run_pattern}' found in {self.parent_dir}")
            return run_files

        print(f"Found {len(run_dirs)} run directories")

        # Scan each run directory for ROOT files
        for run_dir in run_dirs:
            if not run_dir.is_dir():
                continue

            run_name = run_dir.name
            root_files = sorted(run_dir.glob(self.file_pattern))

            if root_files:
                run_files[run_name] = [str(f) for f in root_files]
                print(f" {run_name}: {len(root_files)} files")

        return run_files

    def create_batches(self, run_files):
        """! Create batches of files for merge jobs.

        If a run has more than max_files_per_job files, it will be split into
        multiple batches.

        @param run_files Dictionary mapping run names to lists of file paths
        @return List of batch dictionaries with metadata
        """
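        # Illustrative example: with max_files_per_job=20, a run with 45 files
        # yields 3 batches of 20, 20 and 5 files; total_batches below uses
        # ceiling division, (45 + 20 - 1) // 20 == 3, and batch_num runs 0, 1, 2.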
        batches = []
        batch_id = 0

        for run_name, files in run_files.items():
            # Split files into batches of max_files_per_job
            for i in range(0, len(files), self.max_files_per_job):
                batch_files = files[i:i + self.max_files_per_job]

                batch_info = {
                    'batch_id': batch_id,
                    'run_name': run_name,
                    'batch_num': i // self.max_files_per_job,
                    'total_batches': (len(files) + self.max_files_per_job - 1) // self.max_files_per_job,
                    'files': batch_files,
                    'num_files': len(batch_files)
                }

                batches.append(batch_info)
                batch_id += 1

        return batches

    def write_input_file_lists(self, batches, single_file=False):
        """! Write input file lists for each batch.

        Creates either a single file list or separate files per batch.

        @param batches List of batch dictionaries
        @param single_file If True, write all files to one list; if False, one list per batch
        @return List of file paths written or single file path if single_file=True
        """
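        # With the default prefix, per-batch lists are named merge_jobs_batch000_files.txt,
        # merge_jobs_batch001_files.txt, ... (zero-padded batch_id); the single-file
        # variant writes merge_jobs_input_files.txt.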
        if single_file:
            output_file = f"{self.output_prefix}_input_files.txt"
            with open(output_file, 'w') as f:
                for batch in batches:
                    for file_path in batch['files']:
                        f.write(f"{file_path}\n")

            total_files = sum(batch['num_files'] for batch in batches)
            print(f"\nWrote {total_files} file paths to: {output_file}")
            return output_file
        else:
            # Write separate file for each batch
            file_lists = []
            for batch in batches:
                batch_file = f"{self.output_prefix}_batch{batch['batch_id']:03d}_files.txt"
                with open(batch_file, 'w') as f:
                    for file_path in batch['files']:
                        f.write(f"{file_path}\n")
                file_lists.append(batch_file)

            print(f"\nWrote {len(batches)} separate input file lists:")
            for i, file_list in enumerate(file_lists):
                print(f" Batch {i}: {file_list} ({batches[i]['num_files']} files)")

            return file_lists

    def write_batch_metadata(self, batches, output_file=None):
        """! Write batch metadata to a JSON file.

        This provides information about how files were grouped into batches,
        useful for generating appropriate output file names.

        @param batches List of batch dictionaries
        @param output_file Path to output file (default: {output_prefix}_batches.json)
        @return Path to the written file
        """
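        # The JSON is simply the list of batch_info dicts built by create_batches,
        # e.g. [{"batch_id": 0, "run_name": "hps_014185", "batch_num": 0,
        #        "total_batches": 1, "files": [...], "num_files": 20}, ...]
        # (values here are illustrative).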
        if output_file is None:
            output_file = f"{self.output_prefix}_batches.json"

        with open(output_file, 'w') as f:
            json.dump(batches, f, indent=2)

        print(f"Wrote batch metadata to: {output_file}")

        return output_file

    def generate_iteration_vars(self, batches, output_file=None):
        """! Generate iteration variables JSON for hps-mc-job-template.

        Since the template system creates Cartesian products of iteration variables,
        we create a single "batch_index" variable that can be used to index into
        the batch metadata.

        Note: For merge jobs, it's often simpler to NOT use iteration variables
        and instead use the -r (repeat) option with file path parsing in templates.

        @param batches List of batch dictionaries
        @param output_file Path to output file (default: {output_prefix}_vars.json)
        @return Path to the written file
        """
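        # For example, three batches produce a file containing: {"batch_index": [0, 1, 2]}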
        if output_file is None:
            output_file = f"{self.output_prefix}_vars.json"

        # Create a single iteration variable to avoid Cartesian product issues
        # Users can use this with the batch metadata file if needed
        vars_dict = {
            'batch_index': list(range(len(batches)))
        }

        with open(output_file, 'w') as f:
            json.dump(vars_dict, f, indent=2)

        print(f"Wrote iteration variables to: {output_file}")
        print(f"Note: Contains single batch_index variable to avoid Cartesian products")

        return output_file

    def run(self, write_vars=True, write_metadata=True, separate_lists=True):
        """! Run the full preparation workflow.

        @param write_vars Write iteration variables JSON file
        @param write_metadata Write batch metadata JSON file
        @param separate_lists Write separate input file list per batch
        @return Dictionary with paths to generated files and batch info
        """
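        # The returned dict has keys 'file_lists', 'num_batches', 'batches' and
        # 'separate_lists', plus 'metadata_file' / 'vars_file' when those outputs
        # are written; None is returned if no files were found.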
        print(f"Scanning parent directory: {self.parent_dir}")
        print(f"Run pattern: {self.run_pattern}")
        print(f"File pattern: {self.file_pattern}")
        print(f"Max files per job: {self.max_files_per_job}")
        print()

        # Scan directories
        run_files = self.scan_directories()

        if not run_files:
            print("\nNo files found. Exiting.")
            return None

        # Create batches
        batches = self.create_batches(run_files)

        print(f"\nCreated {len(batches)} batches:")
        for batch in batches:
            suffix = ""
            if batch['total_batches'] > 1:
                suffix = f" (batch {batch['batch_num'] + 1}/{batch['total_batches']})"
            print(f" Batch {batch['batch_id']}: {batch['run_name']}{suffix} - {batch['num_files']} files")

        # Write file lists
        file_lists = self.write_input_file_lists(batches, single_file=not separate_lists)

        result = {
            'file_lists': file_lists if separate_lists else [file_lists],
            'num_batches': len(batches),
            'batches': batches,
            'separate_lists': separate_lists
        }

        if write_metadata:
            metadata_file = self.write_batch_metadata(batches)
            result['metadata_file'] = metadata_file

        if write_vars:
            vars_file = self.generate_iteration_vars(batches)
            result['vars_file'] = vars_file

        return result


def main():
    """! Command-line interface for merge job preparation."""

    parser = argparse.ArgumentParser(
        description="Scan directories for ROOT files and prepare merge job configurations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan directory and prepare job files
  %(prog)s /path/to/runs

  # Use custom output prefix
  %(prog)s /path/to/runs -o my_merge

  # Change max files per job
  %(prog)s /path/to/runs -n 10

  # Custom file and directory patterns
  %(prog)s /path/to/runs -f "*_recon.root" -r "run_*"

  # Skip generating vars file (only create input file list)
  %(prog)s /path/to/runs --no-vars
"""
    )

    parser.add_argument(
        'parent_dir',
        help='Parent directory containing run subdirectories'
    )

    parser.add_argument(
        '-o', '--output-prefix',
        default='merge_jobs',
        help='Prefix for output files (default: merge_jobs)'
    )

    parser.add_argument(
        '-n', '--max-files',
        type=int,
        default=20,
        help='Maximum number of files per merge job (default: 20)'
    )

    parser.add_argument(
        '-f', '--file-pattern',
        default='*.root',
        help='Glob pattern for files to merge (default: *.root)'
    )

    parser.add_argument(
        '-r', '--run-pattern',
        default='hps_*',
        help='Glob pattern for run directories (default: hps_*)'
    )

    parser.add_argument(
        '--no-vars',
        action='store_true',
        help='Do not generate iteration variables JSON file'
    )

    parser.add_argument(
        '--no-metadata',
        action='store_true',
        help='Do not generate batch metadata JSON file'
    )

    parser.add_argument(
        '--single-list',
        action='store_true',
        help='Write all files to a single input list instead of separate lists per batch (default: separate lists)'
    )

    args = parser.parse_args()

    try:
        prep = MergeJobPreparation(
            parent_dir=args.parent_dir,
            output_prefix=args.output_prefix,
            max_files_per_job=args.max_files,
            file_pattern=args.file_pattern,
            run_pattern=args.run_pattern
        )

        result = prep.run(
            write_vars=not args.no_vars,
            write_metadata=not args.no_metadata,
            separate_lists=not args.single_list
        )

        if result:
            print("\n" + "="*60)
            print("Preparation complete!")
            print("="*60)
            print(f"\nGenerated files:")

            if result['separate_lists']:
                print(f" - {len(result['file_lists'])} separate input file lists:")
                for file_list in result['file_lists']:
                    print(f"   {file_list}")
            else:
                print(f" - Input file list: {result['file_lists'][0]}")

            if 'vars_file' in result:
                print(f" - Iteration vars: {result['vars_file']}")
            if 'metadata_file' in result:
                print(f" - Batch metadata: {result['metadata_file']}")

            print(f"\nNext steps:")
            print(f" 1. Create/use the job template: merge_root.json.tmpl")
            print(f" 2. Generate jobs for each batch:")
            print()

            if result['separate_lists']:
                print(f"    # Process each batch separately (recommended)")
                print(f"    for batch_file in {args.output_prefix}_batch*_files.txt; do")
                print(f"        batch_num=$(echo $batch_file | grep -oP 'batch\\K[0-9]+')")
                print(f"        hps-mc-job-template \\")
                print(f"            -j $batch_num \\")
                print(f"            -i root_files $batch_file $(cat $batch_file | wc -l) \\")
                print(f"            merge_root.json.tmpl \\")
                print(f"            {args.output_prefix}_batch${{batch_num}}_jobs.json")
                print(f"    done")
                print()
                print(f"    # Or combine all into one jobs file:")
                print(f"    cat {args.output_prefix}_batch*_jobs.json | jq -s 'add' > {args.output_prefix}_all_jobs.json")
            else:
                print(f"    hps-mc-job-template \\")
                print(f"        -i root_files {result['file_lists'][0]} {args.max_files} \\")
                print(f"        merge_root.json.tmpl \\")
                print(f"        {args.output_prefix}_jobs.json")

            return 0
        else:
            return 1

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == '__main__':
    sys.exit(main())
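
For orientation, here is a minimal sketch of driving the class programmatically instead of through the command line; the import path assumes the script is importable as prepare_merge_jobs, and the directory path and prefix are illustrative:

    # Sketch: batch ROOT files under /data/runs into per-batch input lists.
    from prepare_merge_jobs import MergeJobPreparation

    prep = MergeJobPreparation("/data/runs", output_prefix="my_merge", max_files_per_job=10)
    result = prep.run(write_vars=False, write_metadata=True, separate_lists=True)
    if result:
        # One input list per batch, ready to feed to hps-mc-job-template.
        for file_list in result['file_lists']:
            print(file_list)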