HPS-MC
 
prepare_merge_jobs.py
1"""! @package prepare_merge_jobs
2Scan directories for ROOT files and prepare merge job configurations.
3
4This module scans run directories (e.g., hps_014185), collects ROOT files,
5batches them into groups of up to 20 files, and generates input file lists
6for use with hps-mc-job-template.
7"""
8
9import os
10import sys
11import glob
12import argparse
13import json
14from pathlib import Path
15
16
18 """! Prepare merge jobs by scanning directories for ROOT files."""
19
20 def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
21 file_pattern="*.root", run_pattern="hps_*", max_depth=3, path_filter=None):
22 """! Initialize the merge job preparation.
23
24 @param parent_dir Parent directory containing run subdirectories
25 @param output_prefix Prefix for output file lists and job configs
26 @param max_files_per_job Maximum number of ROOT files per merge job
27 @param file_pattern Glob pattern for files to merge (default: *.root)
28 @param run_pattern Glob pattern for run directories (default: hps_*)
29 @param max_depth Maximum depth to search for ROOT files if not found at top level (default: 3)
30 @param path_filter String that must appear somewhere in the full file path (default: None)
31 """
32 self.parent_dir = Path(parent_dir).resolve()
33 self.output_prefix = output_prefix
34 self.max_files_per_job = max_files_per_job
35 self.file_pattern = file_pattern
36 self.run_pattern = run_pattern
37 self.max_depth = max_depth
38 self.path_filter = path_filter
39
40 if not self.parent_dir.is_dir():
41 raise ValueError(f"Parent directory does not exist: {self.parent_dir}")
42
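    # A minimal construction sketch (path and option values are hypothetical,
    # not taken from this repository):
    #
    #   prep = MergeJobPreparation("/data/hps/runs", output_prefix="pass5_merge",
    #                              max_files_per_job=10, path_filter="pass5")
    #
    # The constructor only records the settings and checks that parent_dir
    # exists; no scanning happens until scan_directories() or run() is called.
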
    def _find_files_recursive(self, directory, current_depth=0):
        """! Recursively search for files matching pattern up to max_depth.

        @param directory Directory to search in
        @param current_depth Current recursion depth
        @return List of file paths found
        """
        # First, check for files at current level
        root_files = sorted(directory.glob(self.file_pattern))
        if root_files:
            return root_files

        # If no files found and we haven't exceeded max depth, search subdirectories
        if current_depth < self.max_depth:
            all_files = []
            for subdir in sorted(directory.iterdir()):
                if subdir.is_dir():
                    files = self._find_files_recursive(subdir, current_depth + 1)
                    all_files.extend(files)
            return all_files

        return []

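    # Search behavior, illustrated with a hypothetical layout and the default
    # pattern "*.root" (max_depth=3):
    #
    #   hps_014185/
    #     pass5/
    #       tuples/run_014185_0.root   <- found at depth 2
    #       tuples/run_014185_1.root
    #
    # Because the run directory itself has no matching files, the search
    # descends level by level and stops, within each branch, at the first
    # level that yields matches.
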
67 """! Scan parent directory for run directories and ROOT files.
68
69 If no ROOT files are found directly in a run directory, searches
70 recursively up to max_depth levels deep.
71
72 @return Dictionary mapping run names to lists of ROOT file paths
73 """
74 run_files = {}
75
76 # Find all run directories matching the pattern
77 run_dirs = sorted(self.parent_dir.glob(self.run_pattern))
78
79 if not run_dirs:
80 # If no directories match the pattern, try scanning deeper
81 print(f"No directories matching '{self.run_pattern}' found in {self.parent_dir}")
82 print(f"Searching recursively up to {self.max_depth} levels deep...")
83
84 # Search the parent directory itself
85 root_files = self._find_files_recursive(self.parent_dir, current_depth=0)
86 if root_files:
87 # Group files by their parent directory name
88 files_by_parent = {}
89 for f in root_files:
90 parent_name = f.parent.name
91 if parent_name not in files_by_parent:
92 files_by_parent[parent_name] = []
93 files_by_parent[parent_name].append(str(f))
94
95 for parent_name, files in sorted(files_by_parent.items()):
96 run_files[parent_name] = sorted(files)
97 print(f" {parent_name}: {len(files)} files")
98 else:
99 print(f"Found {len(run_dirs)} run directories")
100
101 # Scan each run directory for ROOT files
102 for run_dir in run_dirs:
103 if not run_dir.is_dir():
104 continue
105
106 run_name = run_dir.name
107
108 # First try direct glob, then recursive search if needed
109 root_files = sorted(run_dir.glob(self.file_pattern))
110
111 if not root_files:
112 # Search deeper
113 root_files = self._find_files_recursive(run_dir, current_depth=0)
114 if root_files:
115 print(f" {run_name}: {len(root_files)} files (found in subdirectories)")
116 else:
117 print(f" {run_name}: {len(root_files)} files")
118
119 if root_files:
120 run_files[run_name] = [str(f) for f in root_files]
121
122 # Apply path filter if specified
123 if self.path_filter:
124 filtered_run_files = {}
125 total_before = sum(len(files) for files in run_files.values())
126 for run_name, files in run_files.items():
127 filtered = [f for f in files if self.path_filter in f]
128 if filtered:
129 filtered_run_files[run_name] = filtered
130 total_after = sum(len(files) for files in filtered_run_files.values())
131 print(f"\nPath filter '{self.path_filter}': {total_before} -> {total_after} files")
132 run_files = filtered_run_files
133
134 return run_files
135
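    # Shape of the returned mapping (hypothetical paths):
    #
    #   {
    #       "hps_014185": ["/data/runs/hps_014185/f_0.root",
    #                      "/data/runs/hps_014185/f_1.root"],
    #       "hps_014186": ["/data/runs/hps_014186/f_0.root"],
    #   }
    #
    # Keys are run directory names (or parent directory names when the
    # recursive fallback is used); values are sorted lists of absolute file
    # paths as strings.
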
    def create_batches(self, run_files):
        """! Create batches of files for merge jobs.

        If a run has more than max_files_per_job files, it will be split into
        multiple batches.

        @param run_files Dictionary mapping run names to lists of file paths
        @return List of batch dictionaries with metadata
        """
        batches = []
        batch_id = 0

        for run_name, files in run_files.items():
            # Split files into batches of max_files_per_job
            for i in range(0, len(files), self.max_files_per_job):
                batch_files = files[i:i + self.max_files_per_job]

                batch_info = {
                    'batch_id': batch_id,
                    'run_name': run_name,
                    'batch_num': i // self.max_files_per_job,
                    'total_batches': (len(files) + self.max_files_per_job - 1) // self.max_files_per_job,
                    'files': batch_files,
                    'num_files': len(batch_files)
                }

                batches.append(batch_info)
                batch_id += 1

        return batches

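    # Batching arithmetic, using hypothetical counts: with max_files_per_job=20,
    # a run holding 45 files is split into ceil(45 / 20) = 3 batches of
    # 20, 20 and 5 files. batch_num runs 0..2 within that run, while batch_id
    # keeps counting across all runs, so it is unique in the returned list.
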
    def write_input_file_lists(self, batches, single_file=False):
        """! Write input file lists for each batch.

        Creates either a single file list or separate files per batch.

        @param batches List of batch dictionaries
        @param single_file If True, write all files to one list; if False, one list per batch
        @return List of file paths written or single file path if single_file=True
        """
        if single_file:
            output_file = f"{self.output_prefix}_input_files.txt"
            with open(output_file, 'w') as f:
                for batch in batches:
                    for file_path in batch['files']:
                        f.write(f"{file_path}\n")

            total_files = sum(batch['num_files'] for batch in batches)
            print(f"\nWrote {total_files} file paths to: {output_file}")
            return output_file
        else:
            # Write separate file for each batch
            file_lists = []
            for batch in batches:
                batch_file = f"{self.output_prefix}_batch{batch['batch_id']:03d}_files.txt"
                with open(batch_file, 'w') as f:
                    for file_path in batch['files']:
                        f.write(f"{file_path}\n")
                file_lists.append(batch_file)

            print(f"\nWrote {len(batches)} separate input file lists:")
            for i, file_list in enumerate(file_lists):
                print(f" Batch {i}: {file_list} ({batches[i]['num_files']} files)")

            return file_lists

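    # With the default output_prefix ("merge_jobs") and three batches, the
    # per-batch mode writes merge_jobs_batch000_files.txt,
    # merge_jobs_batch001_files.txt and merge_jobs_batch002_files.txt, one
    # input path per line; single_file=True writes everything to
    # merge_jobs_input_files.txt instead.
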
    def write_batch_metadata(self, batches, output_file=None):
        """! Write batch metadata to a JSON file.

        This provides information about how files were grouped into batches,
        useful for generating appropriate output file names.

        @param batches List of batch dictionaries
        @param output_file Path to output file (default: {output_prefix}_batches.json)
        @return Path to the written file
        """
        if output_file is None:
            output_file = f"{self.output_prefix}_batches.json"

        with open(output_file, 'w') as f:
            json.dump(batches, f, indent=2)

        print(f"Wrote batch metadata to: {output_file}")

        return output_file

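    # One entry of the metadata JSON looks like this (hypothetical values):
    #
    #   {
    #     "batch_id": 0,
    #     "run_name": "hps_014185",
    #     "batch_num": 0,
    #     "total_batches": 3,
    #     "files": ["/data/runs/hps_014185/f_0.root", "..."],
    #     "num_files": 20
    #   }
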
    def generate_iteration_vars(self, batches, output_file=None):
        """! Generate iteration variables JSON for hps-mc-job-template.

        Since the template system creates Cartesian products of iteration variables,
        we create a single "batch_index" variable that can be used to index into
        the batch metadata.

        Note: For merge jobs, it's often simpler to NOT use iteration variables
        and instead use the -r (repeat) option with file path parsing in templates.

        @param batches List of batch dictionaries
        @param output_file Path to output file (default: {output_prefix}_vars.json)
        @return Path to the written file
        """
        if output_file is None:
            output_file = f"{self.output_prefix}_vars.json"

        # Create a single iteration variable to avoid Cartesian product issues
        # Users can use this with the batch metadata file if needed
        vars_dict = {
            'batch_index': list(range(len(batches)))
        }

        with open(output_file, 'w') as f:
            json.dump(vars_dict, f, indent=2)

        print(f"Wrote iteration variables to: {output_file}")
        print(f"Note: Contains single batch_index variable to avoid Cartesian products")

        return output_file

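    # For three batches the resulting vars file is simply:
    #
    #   {
    #     "batch_index": [0, 1, 2]
    #   }
    #
    # which avoids the Cartesian-product blow-up that multiple iteration
    # variables would cause in hps-mc-job-template.
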
    def run(self, write_vars=True, write_metadata=True, separate_lists=True):
        """! Run the full preparation workflow.

        @param write_vars Write iteration variables JSON file
        @param write_metadata Write batch metadata JSON file
        @param separate_lists Write separate input file list per batch
        @return Dictionary with paths to generated files and batch info
        """
        print(f"Scanning parent directory: {self.parent_dir}")
        print(f"Run pattern: {self.run_pattern}")
        print(f"File pattern: {self.file_pattern}")
        print(f"Max files per job: {self.max_files_per_job}")
        print(f"Max search depth: {self.max_depth}")
        if self.path_filter:
            print(f"Path filter: {self.path_filter}")
        print()

        # Scan directories
        run_files = self.scan_directories()

        if not run_files:
            print("\nNo files found. Exiting.")
            return None

        # Create batches
        batches = self.create_batches(run_files)

        print(f"\nCreated {len(batches)} batches:")
        for batch in batches:
            suffix = ""
            if batch['total_batches'] > 1:
                suffix = f" (batch {batch['batch_num'] + 1}/{batch['total_batches']})"
            print(f" Batch {batch['batch_id']}: {batch['run_name']}{suffix} - {batch['num_files']} files")

        # Write file lists
        file_lists = self.write_input_file_lists(batches, single_file=not separate_lists)

        result = {
            'file_lists': file_lists if separate_lists else [file_lists],
            'num_batches': len(batches),
            'batches': batches,
            'separate_lists': separate_lists
        }

        if write_metadata:
            metadata_file = self.write_batch_metadata(batches)
            result['metadata_file'] = metadata_file

        if write_vars:
            vars_file = self.generate_iteration_vars(batches)
            result['vars_file'] = vars_file

        return result


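# A minimal programmatic sketch of the same workflow that the CLI below
# drives (the directory path is hypothetical):
#
#   prep = MergeJobPreparation("/data/hps/runs", max_files_per_job=20)
#   result = prep.run(write_vars=True, write_metadata=True, separate_lists=True)
#   if result:
#       print(result['num_batches'], result['file_lists'])
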
def main():
    """! Command-line interface for merge job preparation."""

    parser = argparse.ArgumentParser(
        description="Scan directories for ROOT files and prepare merge job configurations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan directory and prepare job files
  %(prog)s /path/to/runs

  # Use custom output prefix
  %(prog)s /path/to/runs -o my_merge

  # Change max files per job
  %(prog)s /path/to/runs -n 10

  # Custom file and directory patterns
  %(prog)s /path/to/runs -f "*_recon.root" -r "run_*"

  # Search deeper for nested ROOT files (up to 5 levels)
  %(prog)s /path/to/runs -d 5

  # Search with wildcard pattern when files are in subdirectories
  %(prog)s /path/to/runs -r "ap*" -d 3

  # Filter files by path substring (e.g., only include files with "pass5" in path)
  %(prog)s /path/to/runs -F "pass5"

  # Skip generating vars file (only create input file list)
  %(prog)s /path/to/runs --no-vars
        """
    )

    parser.add_argument(
        'parent_dir',
        help='Parent directory containing run subdirectories'
    )

    parser.add_argument(
        '-o', '--output-prefix',
        default='merge_jobs',
        help='Prefix for output files (default: merge_jobs)'
    )

    parser.add_argument(
        '-n', '--max-files',
        type=int,
        default=20,
        help='Maximum number of files per merge job (default: 20)'
    )

    parser.add_argument(
        '-f', '--file-pattern',
        default='*.root',
        help='Glob pattern for files to merge (default: *.root)'
    )

    parser.add_argument(
        '-r', '--run-pattern',
        default='hps_*',
        help='Glob pattern for run directories (default: hps_*)'
    )

    parser.add_argument(
        '-d', '--max-depth',
        type=int,
        default=3,
        help='Maximum depth to search for ROOT files if not found at top level (default: 3)'
    )

    parser.add_argument(
        '-F', '--path-filter',
        default=None,
        help='Only include files whose full path contains this string'
    )

    parser.add_argument(
        '--no-vars',
        action='store_true',
        help='Do not generate iteration variables JSON file'
    )

    parser.add_argument(
        '--no-metadata',
        action='store_true',
        help='Do not generate batch metadata JSON file'
    )

    parser.add_argument(
        '--single-list',
        action='store_true',
        help='Write all files to a single input list instead of separate lists per batch (default: separate lists)'
    )

    args = parser.parse_args()

    try:
        prep = MergeJobPreparation(
            parent_dir=args.parent_dir,
            output_prefix=args.output_prefix,
            max_files_per_job=args.max_files,
            file_pattern=args.file_pattern,
            run_pattern=args.run_pattern,
            max_depth=args.max_depth,
            path_filter=args.path_filter
        )

        result = prep.run(
            write_vars=not args.no_vars,
            write_metadata=not args.no_metadata,
            separate_lists=not args.single_list
        )

        if result:
            print("\n" + "="*60)
            print("Preparation complete!")
            print("="*60)
            print(f"\nGenerated files:")

            if result['separate_lists']:
                print(f" - {len(result['file_lists'])} separate input file lists:")
                for file_list in result['file_lists']:
                    print(f"   {file_list}")
            else:
                print(f" - Input file list: {result['file_lists'][0]}")

            if 'vars_file' in result:
                print(f" - Iteration vars: {result['vars_file']}")
            if 'metadata_file' in result:
                print(f" - Batch metadata: {result['metadata_file']}")

            print(f"\nNext steps:")
            print(f" 1. Create/use the job template: merge_root.json.tmpl")
            print(f" 2. Generate jobs for each batch:")
            print()

            if result['separate_lists']:
                print(f" # Process each batch separately (recommended)")
                print(f" for batch_file in {args.output_prefix}_batch*_files.txt; do")
                print(f"   batch_num=$(echo $batch_file | grep -oP 'batch\\K[0-9]+')")
                print(f"   hps-mc-job-template \\")
                print(f"     -j $batch_num \\")
                print(f"     -i root_files $batch_file $(cat $batch_file | wc -l) \\")
                print(f"     merge_root.json.tmpl \\")
                print(f"     {args.output_prefix}_batch${{batch_num}}_jobs.json")
                print(f" done")
                print()
                print(f" # Or combine all into one jobs file:")
                print(f" cat {args.output_prefix}_batch*_jobs.json | jq -s 'add' > {args.output_prefix}_all_jobs.json")
            else:
                print(f" hps-mc-job-template \\")
                print(f"   -i root_files {result['file_lists'][0]} {args.max_files} \\")
                print(f"   merge_root.json.tmpl \\")
                print(f"   {args.output_prefix}_jobs.json")

            return 0
        else:
            return 1

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == '__main__':
    sys.exit(main())
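As a usage illustration only (not part of the module), the batch metadata JSON written by write_batch_metadata() can be read back to derive per-batch output names; the file name below assumes the default output_prefix, and the naming scheme is hypothetical:

    import json

    # Load the batch metadata produced by a previous run of this script.
    with open('merge_jobs_batches.json') as f:
        batches = json.load(f)

    # Derive one merged-output name per batch, e.g. hps_014185_merge_000.root.
    for batch in batches:
        out_name = f"{batch['run_name']}_merge_{batch['batch_num']:03d}.root"
        print(batch['batch_id'], out_name, batch['num_files'])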
MergeJobPreparation
Prepare merge jobs by scanning directories for ROOT files.

__init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20, file_pattern="*.root", run_pattern="hps_*", max_depth=3, path_filter=None)
Initialize the merge job preparation.

_find_files_recursive(self, directory, current_depth=0)
Recursively search for files matching pattern up to max_depth.

scan_directories(self)
Scan parent directory for run directories and ROOT files.

create_batches(self, run_files)
Create batches of files for merge jobs.

write_input_file_lists(self, batches, single_file=False)
Write input file lists for each batch.

write_batch_metadata(self, batches, output_file=None)
Write batch metadata to a JSON file.

generate_iteration_vars(self, batches, output_file=None)
Generate iteration variables JSON for hps-mc-job-template.

run(self, write_vars=True, write_metadata=True, separate_lists=True)
Run the full preparation workflow.

main()
Command-line interface for merge job preparation.