1"""! @package prepare_merge_jobs
2Scan directories for ROOT files and prepare merge job configurations.
4This module scans run directories (e.g., hps_014185), collects ROOT files,
5batches them into groups of up to 20 files, and generates input file lists
6for use with hps-mc-job-template.

import argparse
import json
import sys
from pathlib import Path
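
# A minimal usage sketch, assuming this file is saved as prepare_merge_jobs.py
# and that the class below keeps the placeholder name MergeJobPreparer:
#
#     from prepare_merge_jobs import MergeJobPreparer
#
#     preparer = MergeJobPreparer('/path/to/runs', max_files_per_job=10)
#     result = preparer.run()
#     if result is not None:
#         print(f"Prepared {result['num_batches']} merge batches")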
18 """! Prepare merge jobs by scanning directories for ROOT files."""

    def __init__(self, parent_dir, output_prefix="merge_jobs", max_files_per_job=20,
                 file_pattern="*.root", run_pattern="hps_*"):
        """! Initialize the merge job preparation.

        @param parent_dir Parent directory containing run subdirectories
        @param output_prefix Prefix for output file lists and job configs
        @param max_files_per_job Maximum number of ROOT files per merge job
        @param file_pattern Glob pattern for files to merge (default: *.root)
        @param run_pattern Glob pattern for run directories (default: hps_*)
        """
        self.parent_dir = Path(parent_dir)
        self.output_prefix = output_prefix
        self.max_files_per_job = max_files_per_job
        self.file_pattern = file_pattern
        self.run_pattern = run_pattern

        if not self.parent_dir.is_dir():
            raise ValueError(f"Parent directory does not exist: {self.parent_dir}")
40 """! Scan parent directory for run directories and ROOT files.
42 @return Dictionary mapping run names to lists of ROOT file paths
50 print(f
"Warning: No directories matching '{self.run_pattern}' found in {self.parent_dir}")
53 print(f
"Found {len(run_dirs)} run directories")
56 for run_dir
in run_dirs:
57 if not run_dir.is_dir():
60 run_name = run_dir.name
64 run_files[run_name] = [str(f)
for f
in root_files]
65 print(f
" {run_name}: {len(root_files)} files")
70 """! Create batches of files for merge jobs.
72 If a run has more than max_files_per_job files, it will be split into
75 @param run_files Dictionary mapping run names to lists of file paths
76 @return List of batch dictionaries with metadata
81 for run_name, files
in run_files.items():
92 'num_files': len(batch_files)
95 batches.append(batch_info)
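
    # Each batch dictionary produced above has this shape (values illustrative):
    #
    #     {
    #         'batch_id': 3,            # global, zero-based index across all runs
    #         'run_name': 'hps_014185',
    #         'batch_num': 1,           # index of this batch within its run
    #         'total_batches': 2,       # number of batches for this run
    #         'files': ['/path/to/runs/hps_014185/f021.root', ...],
    #         'num_files': 20,
    #     }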
101 """! Write input file lists for each batch.
103 Creates either a single file list or separate files per batch.
105 @param batches List of batch dictionaries
106 @param single_file If True, write all files to one list; if False, one list per batch
107 @return List of file paths written or single file path if single_file=True
110 output_file = f
"{self.output_prefix}_input_files.txt"
111 with open(output_file,
'w')
as f:
112 for batch
in batches:
113 for file_path
in batch[
'files']:
114 f.write(f
"{file_path}\n")
116 total_files = sum(batch[
'num_files']
for batch
in batches)
117 print(f
"\nWrote {total_files} file paths to: {output_file}")
122 for batch
in batches:
123 batch_file = f
"{self.output_prefix}_batch{batch['batch_id']:03d}_files.txt"
124 with open(batch_file,
'w')
as f:
125 for file_path
in batch[
'files']:
126 f.write(f
"{file_path}\n")
127 file_lists.append(batch_file)
129 print(f
"\nWrote {len(batches)} separate input file lists:")
130 for i, file_list
in enumerate(file_lists):
131 print(f
" Batch {i}: {file_list} ({batches[i]['num_files']} files)")
136 """! Write batch metadata to a JSON file.
138 This provides information about how files were grouped into batches,
139 useful for generating appropriate output file names.
141 @param batches List of batch dictionaries
142 @param output_file Path to output file (default: {output_prefix}_batches.json)
143 @return Path to the written file
145 if output_file
is None:
146 output_file = f
"{self.output_prefix}_batches.json"
148 with open(output_file,
'w')
as f:
149 json.dump(batches, f, indent=2)
151 print(f
"Wrote batch metadata to: {output_file}")
156 """! Generate iteration variables JSON for hps-mc-job-template.
158 Since the template system creates Cartesian products of iteration variables,
159 we create a single "batch_index" variable that can be used to index into
162 Note: For merge jobs, it's often simpler to NOT use iteration variables
163 and instead use the -r (repeat) option with file path parsing in templates.
165 @param batches List of batch dictionaries
166 @param output_file Path to output file (default: {output_prefix}_vars.json)
167 @return Path to the written file
169 if output_file
is None:
170 output_file = f
"{self.output_prefix}_vars.json"
175 'batch_index': list(range(len(batches)))
178 with open(output_file,
'w')
as f:
179 json.dump(vars_dict, f, indent=2)
181 print(f
"Wrote iteration variables to: {output_file}")
182 print(f
"Note: Contains single batch_index variable to avoid Cartesian products")

    def run(self, write_vars=True, write_metadata=True, separate_lists=True):
        """! Run the full preparation workflow.

        @param write_vars Write iteration variables JSON file
        @param write_metadata Write batch metadata JSON file
        @param separate_lists Write separate input file list per batch
        @return Dictionary with paths to generated files and batch info
        """
        print(f"Scanning parent directory: {self.parent_dir}")
        print(f"Run pattern: {self.run_pattern}")
        print(f"File pattern: {self.file_pattern}")
        print(f"Max files per job: {self.max_files_per_job}")

        run_files = self.scan_directories()
        if not run_files:
            print("\nNo files found. Exiting.")
            return None

        batches = self.create_batches(run_files)
        print(f"\nCreated {len(batches)} batches:")
        for batch in batches:
            suffix = ""
            if batch['total_batches'] > 1:
                suffix = f" (batch {batch['batch_num'] + 1}/{batch['total_batches']})"
            print(f"  Batch {batch['batch_id']}: {batch['run_name']}{suffix} - {batch['num_files']} files")

        file_lists = self.write_input_file_lists(batches, single_file=not separate_lists)

        result = {
            'file_lists': file_lists if separate_lists else [file_lists],
            'num_batches': len(batches),
            'separate_lists': separate_lists
        }

        if write_metadata:
            metadata_file = self.write_batch_metadata(batches)
            result['metadata_file'] = metadata_file

        if write_vars:
            vars_file = self.generate_iteration_vars(batches)
            result['vars_file'] = vars_file

        return result
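
    # The dictionary returned by run() can drive further scripting, e.g.
    # (submit_merge_job is a hypothetical downstream helper):
    #
    #     result = preparer.run(write_vars=False)
    #     for path in result['file_lists']:
    #         submit_merge_job(path)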
239 """! Command-line interface for merge job preparation."""
241 parser = argparse.ArgumentParser(
242 description=
"Scan directories for ROOT files and prepare merge job configurations",
243 formatter_class=argparse.RawDescriptionHelpFormatter,
246 # Scan directory and prepare job files
247 %(prog)s /path/to/runs
249 # Use custom output prefix
250 %(prog)s /path/to/runs -o my_merge
252 # Change max files per job
253 %(prog)s /path/to/runs -n 10
255 # Custom file and directory patterns
256 %(prog)s /path/to/runs -f "*_recon.root" -r "run_*"
258 # Skip generating vars file (only create input file list)
259 %(prog)s /path/to/runs --no-vars
265 help=
'Parent directory containing run subdirectories'
269 '-o',
'--output-prefix',
270 default=
'merge_jobs',
271 help=
'Prefix for output files (default: merge_jobs)'
278 help=
'Maximum number of files per merge job (default: 20)'
282 '-f',
'--file-pattern',
284 help=
'Glob pattern for files to merge (default: *.root)'
288 '-r',
'--run-pattern',
290 help=
'Glob pattern for run directories (default: hps_*)'
296 help=
'Do not generate iteration variables JSON file'
302 help=
'Do not generate batch metadata JSON file'
308 help=
'Write all files to a single input list instead of separate lists per batch (default: separate lists)'
311 args = parser.parse_args()

    try:
        preparer = MergeJobPreparer(
            parent_dir=args.parent_dir,
            output_prefix=args.output_prefix,
            max_files_per_job=args.max_files,
            file_pattern=args.file_pattern,
            run_pattern=args.run_pattern
        )

        result = preparer.run(
            write_vars=not args.no_vars,
            write_metadata=not args.no_metadata,
            separate_lists=not args.single_list
        )
        if result is None:
            sys.exit(1)

        print("Preparation complete!")
        print("\nGenerated files:")
        if result['separate_lists']:
            print(f"  - {len(result['file_lists'])} separate input file lists:")
            for file_list in result['file_lists']:
                print(f"      {file_list}")
        else:
            print(f"  - Input file list: {result['file_lists'][0]}")
        if 'vars_file' in result:
            print(f"  - Iteration vars: {result['vars_file']}")
        if 'metadata_file' in result:
            print(f"  - Batch metadata: {result['metadata_file']}")

        print("\nNext steps:")
        print("  1. Create/use the job template: merge_root.json.tmpl")
        print("  2. Generate jobs for each batch:")
        if result['separate_lists']:
            print("     # Process each batch separately (recommended)")
            print(f"     for batch_file in {args.output_prefix}_batch*_files.txt; do")
            print("       batch_num=$(echo $batch_file | grep -oP 'batch\\K[0-9]+')")
            print("       hps-mc-job-template \\")
            print("         -j $batch_num \\")
            print("         -i root_files $batch_file $(cat $batch_file | wc -l) \\")
            print("         merge_root.json.tmpl \\")
            print(f"         {args.output_prefix}_batch${{batch_num}}_jobs.json")
            print("     done")
            print("     # Or combine all into one jobs file:")
            print(f"     cat {args.output_prefix}_batch*_jobs.json | jq -s 'add' > "
                  f"{args.output_prefix}_all_jobs.json")
        else:
            print("     hps-mc-job-template \\")
            print(f"       -i root_files {result['file_lists'][0]} {args.max_files} \\")
            print("       merge_root.json.tmpl \\")
            print(f"       {args.output_prefix}_jobs.json")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()