1"""! @package prepare_merge_jobs
2Scan directories for ROOT files and prepare merge job configurations.
4This module scans run directories (e.g., hps_014185), collects ROOT files,
5batches them into groups of up to 20 files, and generates input file lists
6for use with hps-mc-job-template.
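
# The layout assumed below is a parent directory holding one subdirectory per
# run, each containing the ROOT files to merge. A minimal sketch (directory
# and file names are illustrative only, not taken from a real dataset):
#
#   /path/to/runs/
#     hps_014185/  part_0001.root  part_0002.root  ...
#     hps_014186/  part_0001.root  ...
#
# Files sitting deeper than the run directory are still picked up by the
# recursive search (see _find_files_recursive below).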
import argparse
import json
import sys
from pathlib import Path
18 """! Prepare merge jobs by scanning directories for ROOT files."""

    def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
                 file_pattern="*.root", run_pattern="hps_*", max_depth=3, path_filter=None):
        """! Initialize the merge job preparation.

        @param parent_dir Parent directory containing run subdirectories
        @param output_prefix Prefix for output file lists and job configs
        @param max_files_per_job Maximum number of ROOT files per merge job
        @param file_pattern Glob pattern for files to merge (default: *.root)
        @param run_pattern Glob pattern for run directories (default: hps_*)
        @param max_depth Maximum depth to search for ROOT files if not found at top level (default: 3)
        @param path_filter String that must appear somewhere in the full file path (default: None)
        """
        self.parent_dir = Path(parent_dir)
        self.output_prefix = output_prefix
        self.max_files_per_job = max_files_per_job
        self.file_pattern = file_pattern
        self.run_pattern = run_pattern
        self.max_depth = max_depth
        self.path_filter = path_filter

        if not self.parent_dir.exists():
            raise ValueError(f"Parent directory does not exist: {self.parent_dir}")
44 """! Recursively search for files matching pattern up to max_depth.
46 @param directory Directory to search in
47 @param current_depth Current recursion depth
48 @return List of file paths found
        all_files = sorted(directory.glob(self.file_pattern))

        for subdir in sorted(directory.iterdir()):
            if subdir.is_dir() and current_depth < self.max_depth:
                # descend one level and collect matches from the subdirectory
                files = self._find_files_recursive(subdir, current_depth + 1)
                all_files.extend(files)

        return all_files
67 """! Scan parent directory for run directories and ROOT files.
69 If no ROOT files are found directly in a run directory, searches
70 recursively up to max_depth levels deep.
72 @return Dictionary mapping run names to lists of ROOT file paths
        run_dirs = sorted(self.parent_dir.glob(self.run_pattern))
        if not run_dirs:
            print(f"No directories matching '{self.run_pattern}' found in {self.parent_dir}")
            print(f"Searching recursively up to {self.max_depth} levels deep...")
            all_files = self._find_files_recursive(self.parent_dir)
            # group the files that were found by their immediate parent directory
            files_by_parent = {}
            for f in all_files:
                parent_name = f.parent.name
                if parent_name not in files_by_parent:
                    files_by_parent[parent_name] = []
                files_by_parent[parent_name].append(str(f))

            for parent_name, files in sorted(files_by_parent.items()):
                run_files[parent_name] = sorted(files)
                print(f"  {parent_name}: {len(files)} files")
            return run_files

        print(f"Found {len(run_dirs)} run directories")

        for run_dir in run_dirs:
            if not run_dir.is_dir():
                continue

            run_name = run_dir.name
            root_files = sorted(run_dir.glob(self.file_pattern))

            # fall back to a recursive search when nothing sits at the top level
            if not root_files:
                root_files = self._find_files_recursive(run_dir)
                if root_files:
                    print(f"  {run_name}: {len(root_files)} files (found in subdirectories)")
            else:
                print(f"  {run_name}: {len(root_files)} files")

            run_files[run_name] = [str(f) for f in root_files]

        # optionally keep only files whose full path contains the filter string
        if self.path_filter:
            filtered_run_files = {}
            total_before = sum(len(files) for files in run_files.values())
            for run_name, files in run_files.items():
                filtered = [f for f in files if self.path_filter in f]
                filtered_run_files[run_name] = filtered
            total_after = sum(len(files) for files in filtered_run_files.values())
            print(f"\nPath filter '{self.path_filter}': {total_before} -> {total_after} files")
            run_files = filtered_run_files

        return run_files
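
    # For orientation: scan_directories() returns a plain dict keyed by run
    # directory name. The run names and paths below are illustrative only:
    #
    #   {
    #       'hps_014185': ['/path/to/runs/hps_014185/a.root',
    #                      '/path/to/runs/hps_014185/b.root'],
    #       'hps_014186': ['/path/to/runs/hps_014186/a.root'],
    #   }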
137 """! Create batches of files for merge jobs.
139 If a run has more than max_files_per_job files, it will be split into
142 @param run_files Dictionary mapping run names to lists of file paths
143 @return List of batch dictionaries with metadata
        batches = []
        batch_id = 0
        for run_name, files in run_files.items():
            # split the run's file list into chunks of at most max_files_per_job files
            chunks = [files[i:i + self.max_files_per_job]
                      for i in range(0, len(files), self.max_files_per_job)]
            for batch_num, batch_files in enumerate(chunks):
                batch_info = {
                    'batch_id': batch_id,
                    'run_name': run_name,
                    'batch_num': batch_num,
                    'total_batches': len(chunks),
                    'files': batch_files,
                    'num_files': len(batch_files)
                }
                batches.append(batch_info)
                batch_id += 1

        return batches
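
    # Worked example of the batching arithmetic (numbers are illustrative):
    # a run with 45 files and max_files_per_job=20 yields three batches of
    # 20, 20 and 5 files, with batch_num 0..2 and total_batches=3 for each,
    # while batch_id keeps counting across runs so it stays unique globally.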
168 """! Write input file lists for each batch.
170 Creates either a single file list or separate files per batch.
172 @param batches List of batch dictionaries
173 @param single_file If True, write all files to one list; if False, one list per batch
174 @return List of file paths written or single file path if single_file=True
        if single_file:
            output_file = f"{self.output_prefix}_input_files.txt"
            with open(output_file, 'w') as f:
                for batch in batches:
                    for file_path in batch['files']:
                        f.write(f"{file_path}\n")

            total_files = sum(batch['num_files'] for batch in batches)
            print(f"\nWrote {total_files} file paths to: {output_file}")
            return output_file

        file_lists = []
        for batch in batches:
            batch_file = f"{self.output_prefix}_batch{batch['batch_id']:03d}_files.txt"
            with open(batch_file, 'w') as f:
                for file_path in batch['files']:
                    f.write(f"{file_path}\n")
            file_lists.append(batch_file)

        print(f"\nWrote {len(batches)} separate input file lists:")
        for i, file_list in enumerate(file_lists):
            print(f"  Batch {i}: {file_list} ({batches[i]['num_files']} files)")

        return file_lists
203 """! Write batch metadata to a JSON file.
205 This provides information about how files were grouped into batches,
206 useful for generating appropriate output file names.
208 @param batches List of batch dictionaries
209 @param output_file Path to output file (default: {output_prefix}_batches.json)
210 @return Path to the written file
        if output_file is None:
            output_file = f"{self.output_prefix}_batches.json"

        with open(output_file, 'w') as f:
            json.dump(batches, f, indent=2)

        print(f"Wrote batch metadata to: {output_file}")
        return output_file
223 """! Generate iteration variables JSON for hps-mc-job-template.
225 Since the template system creates Cartesian products of iteration variables,
226 we create a single "batch_index" variable that can be used to index into
229 Note: For merge jobs, it's often simpler to NOT use iteration variables
230 and instead use the -r (repeat) option with file path parsing in templates.
232 @param batches List of batch dictionaries
233 @param output_file Path to output file (default: {output_prefix}_vars.json)
234 @return Path to the written file
        if output_file is None:
            output_file = f"{self.output_prefix}_vars.json"

        vars_dict = {
            'batch_index': list(range(len(batches)))
        }

        with open(output_file, 'w') as f:
            json.dump(vars_dict, f, indent=2)

        print(f"Wrote iteration variables to: {output_file}")
        print("Note: Contains single batch_index variable to avoid Cartesian products")
        return output_file

    def run(self, write_vars=True, write_metadata=True, separate_lists=True):
        """! Run the full preparation workflow.

        @param write_vars Write iteration variables JSON file
        @param write_metadata Write batch metadata JSON file
        @param separate_lists Write separate input file list per batch
        @return Dictionary with paths to generated files and batch info
        """
        print(f"Scanning parent directory: {self.parent_dir}")
        print(f"Run pattern: {self.run_pattern}")
        print(f"File pattern: {self.file_pattern}")
        print(f"Max files per job: {self.max_files_per_job}")
        print(f"Max search depth: {self.max_depth}")
        if self.path_filter:
            print(f"Path filter: {self.path_filter}")

        run_files = self.scan_directories()
        if not run_files:
            print("\nNo files found. Exiting.")
            return None

        batches = self.create_batches(run_files)
        print(f"\nCreated {len(batches)} batches:")
        for batch in batches:
            suffix = ""
            if batch['total_batches'] > 1:
                suffix = f" (batch {batch['batch_num'] + 1}/{batch['total_batches']})"
            print(f"  Batch {batch['batch_id']}: {batch['run_name']}{suffix} - {batch['num_files']} files")

        file_lists = self.write_input_file_lists(batches, single_file=not separate_lists)

        result = {
            'file_lists': file_lists if separate_lists else [file_lists],
            'num_batches': len(batches),
            'separate_lists': separate_lists
        }

        if write_metadata:
            metadata_file = self.write_batch_metadata(batches)
            result['metadata_file'] = metadata_file

        if write_vars:
            vars_file = self.generate_iteration_vars(batches)
            result['vars_file'] = vars_file

        return result
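
# Taken together, a default invocation of the workflow leaves behind (names
# assume the default "merge_jobs" prefix): one merge_jobs_batchNNN_files.txt
# list per batch, merge_jobs_batches.json with the batch metadata, and
# merge_jobs_vars.json with the batch_index iteration variable.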
309 """! Command-line interface for merge job preparation."""
311 parser = argparse.ArgumentParser(
312 description=
"Scan directories for ROOT files and prepare merge job configurations",
313 formatter_class=argparse.RawDescriptionHelpFormatter,
316 # Scan directory and prepare job files
317 %(prog)s /path/to/runs
319 # Use custom output prefix
320 %(prog)s /path/to/runs -o my_merge
322 # Change max files per job
323 %(prog)s /path/to/runs -n 10
325 # Custom file and directory patterns
326 %(prog)s /path/to/runs -f "*_recon.root" -r "run_*"
328 # Search deeper for nested ROOT files (up to 5 levels)
329 %(prog)s /path/to/runs -d 5
331 # Search with wildcard pattern when files are in subdirectories
332 %(prog)s /path/to/runs -r "ap*" -d 3
334 # Filter files by path substring (e.g., only include files with "pass5" in path)
335 %(prog)s /path/to/runs -F "pass5"
337 # Skip generating vars file (only create input file list)
338 %(prog)s /path/to/runs --no-vars
344 help=
'Parent directory containing run subdirectories'
348 '-o',
'--output-prefix',
349 default=
'merge_jobs',
350 help=
'Prefix for output files (default: merge_jobs)'
357 help=
'Maximum number of files per merge job (default: 20)'
361 '-f',
'--file-pattern',
363 help=
'Glob pattern for files to merge (default: *.root)'
367 '-r',
'--run-pattern',
369 help=
'Glob pattern for run directories (default: hps_*)'
376 help=
'Maximum depth to search for ROOT files if not found at top level (default: 3)'
380 '-F',
'--path-filter',
382 help=
'Only include files whose full path contains this string'
388 help=
'Do not generate iteration variables JSON file'
394 help=
'Do not generate batch metadata JSON file'
400 help=
'Write all files to a single input list instead of separate lists per batch (default: separate lists)'
403 args = parser.parse_args()

        parent_dir=args.parent_dir,
        output_prefix=args.output_prefix,
        max_files_per_job=args.max_files,
        file_pattern=args.file_pattern,
        run_pattern=args.run_pattern,
        max_depth=args.max_depth,
        path_filter=args.path_filter

        write_vars=not args.no_vars,
        write_metadata=not args.no_metadata,
        separate_lists=not args.single_list

    print("Preparation complete!")

    print("\nGenerated files:")
    if result['separate_lists']:
        print(f"  - {len(result['file_lists'])} separate input file lists:")
        for file_list in result['file_lists']:
            print(f"      {file_list}")
    else:
        print(f"  - Input file list: {result['file_lists'][0]}")

    if 'vars_file' in result:
        print(f"  - Iteration vars: {result['vars_file']}")
    if 'metadata_file' in result:
        print(f"  - Batch metadata: {result['metadata_file']}")

    print("\nNext steps:")
    print("  1. Create/use the job template: merge_root.json.tmpl")
    print("  2. Generate jobs for each batch:")

    if result['separate_lists']:
        print("     # Process each batch separately (recommended)")
        print(f"     for batch_file in {args.output_prefix}_batch*_files.txt; do")
        print("       batch_num=$(echo $batch_file | grep -oP 'batch\\K[0-9]+')")
        print("       hps-mc-job-template \\")
        print("         -j $batch_num \\")
        print("         -i root_files $batch_file $(cat $batch_file | wc -l) \\")
        print("         merge_root.json.tmpl \\")
        print(f"         {args.output_prefix}_batch${{batch_num}}_jobs.json")
        print("     done")

        print("     # Or combine all into one jobs file:")
        print(f"     cat {args.output_prefix}_batch*_jobs.json | jq -s 'add' > {args.output_prefix}_all_jobs.json")
    else:
        print("     hps-mc-job-template \\")
        print(f"       -i root_files {result['file_lists'][0]} {args.max_files} \\")
        print("       merge_root.json.tmpl \\")
        print(f"       {args.output_prefix}_jobs.json")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)


if __name__ == '__main__':
    main()