HPS-MC
 
collect_merge_stats.py
#!/usr/bin/env python3
"""
Collector script for MergeROOT statistics files.

This script aggregates merge statistics JSON files from multiple jobs
and produces a summary report.

Usage:
    hps-mc-collect-merge-stats <dir> [-o output.json] [-p pattern] [-q]
"""

import argparse
import glob
import json
import os
import sys

class MergeStatsCollector:
    """
    Collects and aggregates MergeROOT statistics from JSON files.
    """

    def __init__(self, search_dir, pattern="**/*_stats.json"):
        """
        Initialize the collector.

        Parameters
        ----------
        search_dir : str
            Directory to search for stats files
        pattern : str
            Glob pattern for finding stats files (default: **/*_stats.json)
        """
        self.search_dir = search_dir
        self.pattern = pattern
        self.stats_files = []
        self.job_stats = []
        self.summary = {}

    def find_stats_files(self):
        """
        Find all stats JSON files matching the pattern.

        Returns
        -------
        list
            List of paths to stats files found
        """
        search_pattern = os.path.join(self.search_dir, self.pattern)
        self.stats_files = sorted(glob.glob(search_pattern, recursive=True))
        return self.stats_files

    def collect(self):
        """
        Collect and aggregate statistics from all found files.

        Returns
        -------
        dict
            Aggregated statistics dictionary
        """
        if not self.stats_files:
            self.find_stats_files()

        self.job_stats = []
        successful_jobs = 0
        failed_jobs = 0
        total_input_files = 0
        tree_totals = {}

        for stats_file in self.stats_files:
            try:
                with open(stats_file, 'r') as f:
                    stats = json.load(f)

                self.job_stats.append({
                    "file": stats_file,
                    "stats": stats
                })

                # Track validation results
                if stats.get("validation_passed", False):
                    successful_jobs += 1
                else:
                    failed_jobs += 1

                # Count input files
                total_input_files += stats.get("num_input_files", 0)

                # Aggregate tree totals (use output events as source of truth)
                output_events = stats.get("output_events", {})
                for tree_name, count in output_events.items():
                    if tree_name not in tree_totals:
                        tree_totals[tree_name] = {"input": 0, "output": 0}
                    tree_totals[tree_name]["output"] += count

                # Sum up input events
                total_input_events = stats.get("total_input_events", {})
                for tree_name, count in total_input_events.items():
                    if tree_name not in tree_totals:
                        tree_totals[tree_name] = {"input": 0, "output": 0}
                    tree_totals[tree_name]["input"] += count

            except (json.JSONDecodeError, IOError) as e:
                print("WARNING: Could not read stats file %s: %s" % (stats_file, e),
                      file=sys.stderr)
                failed_jobs += 1
                continue

        # Build summary
        self.summary = {
            "summary": {
                "total_jobs": len(self.job_stats),
                "successful_jobs": successful_jobs,
                "failed_jobs": failed_jobs,
                "total_input_files": total_input_files,
                "all_validations_passed": failed_jobs == 0 and len(self.job_stats) > 0
            },
            "tree_totals": tree_totals,
            "jobs": [js["stats"] for js in self.job_stats]
        }

        return self.summary

    def write_summary(self, output_file):
        """
        Write the summary to a JSON file.

        Parameters
        ----------
        output_file : str
            Path to output JSON file
        """
        if not self.summary:
            self.collect()

        with open(output_file, 'w') as f:
            json.dump(self.summary, f, indent=2)

    def print_report(self, quiet=False):
        """
        Print a human-readable summary report.

        Parameters
        ----------
        quiet : bool
            If True, only print summary line
        """
        if not self.summary:
            self.collect()

        s = self.summary["summary"]

        if quiet:
            status = "PASS" if s["all_validations_passed"] else "FAIL"
            print("%s: %d jobs, %d input files" % (
                status, s["total_jobs"], s["total_input_files"]))
            return

        print()
        print("=" * 70)
        print("MergeROOT Statistics Summary")
        print("=" * 70)
        print()
        print("Search directory: %s" % self.search_dir)
        print("Stats files found: %d" % len(self.stats_files))
        print()
        print("-" * 70)
        print("Job Summary")
        print("-" * 70)
        print(" Total jobs: %d" % s["total_jobs"])
        print(" Successful jobs: %d" % s["successful_jobs"])
        print(" Failed jobs: %d" % s["failed_jobs"])
        print(" Total input files: %d" % s["total_input_files"])
        print()

        # Tree totals
        tree_totals = self.summary.get("tree_totals", {})
        if tree_totals:
            print("-" * 70)
            print("Event Counts by Tree")
            print("-" * 70)
            print("%-30s %15s %15s %10s" % ("Tree Name", "Input Events", "Output Events", "Status"))
            print("-" * 70)

            for tree_name, counts in sorted(tree_totals.items()):
                input_count = counts["input"]
                output_count = counts["output"]
                if input_count == output_count:
                    status = "PASS"
                else:
                    status = "FAIL"
                print("%-30s %15d %15d %10s" % (tree_name, input_count, output_count, status))

            print("-" * 70)
            print()

        # Final status
        if s["all_validations_passed"]:
            print("OVERALL STATUS: PASS")
        else:
            print("OVERALL STATUS: FAIL")

        print("=" * 70)
        print()


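# For reference (illustrative values): collect() builds, and write_summary()
# dumps as JSON, a dictionary shaped like
#
#     {
#         "summary": {
#             "total_jobs": 4,
#             "successful_jobs": 4,
#             "failed_jobs": 0,
#             "total_input_files": 40,
#             "all_validations_passed": true
#         },
#         "tree_totals": {"some_tree": {"input": 400000, "output": 400000}},
#         "jobs": [ ...the per-job stats dicts... ]
#     }

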
def main():
    """Main entry point for the collector script."""
    parser = argparse.ArgumentParser(
        description="Collect and aggregate MergeROOT statistics from JSON files.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    # Collect stats from current directory
    hps-mc-collect-merge-stats .

    # Collect stats and write summary to file
    hps-mc-collect-merge-stats /path/to/output -o summary.json

    # Use custom pattern
    hps-mc-collect-merge-stats . -p "**/merge_*_stats.json"

    # Quiet mode (single line output)
    hps-mc-collect-merge-stats . -q
"""
    )

    parser.add_argument(
        "directory",
        help="Directory to search for stats files"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output JSON file for aggregated summary"
    )
    parser.add_argument(
        "-p", "--pattern",
        default="**/*_stats.json",
        help="Glob pattern for stats files (default: **/*_stats.json)"
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Quiet mode - only print summary line"
    )

    args = parser.parse_args()

    # Validate directory
    if not os.path.isdir(args.directory):
        print("ERROR: Directory not found: %s" % args.directory, file=sys.stderr)
        sys.exit(1)

    # Create collector and run
    collector = MergeStatsCollector(args.directory, args.pattern)
    stats_files = collector.find_stats_files()

    if not stats_files:
        print("WARNING: No stats files found matching pattern '%s' in %s" % (
            args.pattern, args.directory), file=sys.stderr)
        sys.exit(0)

    # Collect and report
    collector.collect()
    collector.print_report(quiet=args.quiet)

    # Write output file if requested
    if args.output:
        collector.write_summary(args.output)
        if not args.quiet:
            print("Summary written to: %s" % args.output)

    # Exit with appropriate code
    if collector.summary["summary"]["all_validations_passed"]:
        sys.exit(0)
    else:
        sys.exit(1)


if __name__ == "__main__":
    main()
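
# Example (illustrative): the collector can also be driven from Python instead
# of the hps-mc-collect-merge-stats command line. Assuming this module is
# importable as collect_merge_stats (the actual package path inside HPS-MC may
# differ):
#
#     from collect_merge_stats import MergeStatsCollector
#
#     collector = MergeStatsCollector("/path/to/job/output", pattern="**/*_stats.json")
#     summary = collector.collect()            # find_stats_files() runs automatically if needed
#     collector.write_summary("summary.json")  # same layout as the -o output of the CLI
#     collector.print_report(quiet=True)       # single "PASS: ..." / "FAIL: ..." line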