1"""! Tools that can be used in HPSMC jobs."""
10from subprocess
import PIPE
18 Run the SLIC Geant4 simulation.
20 Optional parameters are: **nevents**, **macros**, **run_number**, **disable_particle_table** \n
21 Required parameters are: **detector** \n
22 Required configurations are: **slic_dir**, **detector_dir**
38 self, name=
"slic", command=
"slic", output_ext=
".slcio", **kwargs
43 Setup command arguments.
44 @return list of arguments
47 raise Exception(
"No inputs given for SLIC.")
62 args.extend([
"-m",
"run_number.mac"])
66 if os.path.exists(tbl):
67 args.extend([
"-P", tbl])
69 raise Exception(
"SLIC particle.tbl does not exist: %s" % tbl)
74 if macro ==
"run_number.mac":
75 raise Exception(
"Macro name '%s' is not allowed." % macro)
76 if not os.path.isabs(macro):
77 raise Exception(
"Macro '%s' is not an absolute path." % macro)
78 args.extend([
"-m", macro])
85 """! Return path to detector file."""
89 """! Return path to particle table."""
90 return os.path.join(self.
slic_dir,
"share",
"particle.tbl")
93 """! Configure SLIC component."""
99 raise Exception(
"Failed to find valid detector_dir")
101 "Using detector_dir from install: {}".format(self.
detector_dir)
105 """! Setup SLIC component."""
106 if not os.path.exists(self.
slic_dir):
107 raise Exception(
"slic_dir does not exist: %s" % self.
slic_dir)
111 raise Exception(
"SLIC setup script does not exist: %s" % self.
namename)
114 run_number_cmd =
"/lcio/runNumber %d" % self.
run_number
115 run_number_mac = open(
"run_number.mac",
"w")
116 run_number_mac.write(run_number_cmd)
117 run_number_mac.close()
121 Return list of optional parameters.
123 Optional parameters are: **nevents**, **macros**, **run_number**
124 @return list of optional parameters
126 return [
"nevents",
"macros",
"run_number",
"disable_particle_table"]
130 Return list of required parameters.
132 Required parameters are: **detector**
133 @return list of required parameters
139 Return list of required configurations.
141 Required configurations are: **slic_dir**, **detector_dir**
142 @return list of required configurations
144 return [
"slic_dir",
"detector_dir"]
148 Execute SLIC component.
150 Component is executed by creating command line input
151 from command and command arguments.
152 @return return code of process
155 cl =
'bash -c ". %s && %s %s"' % (
162 proc = subprocess.Popen(cl, shell=
True, stdout=log_out, stderr=log_err)
166 return proc.returncode
171 Copy the SQLite database file to the desired location.
176 Initialize SQLiteProc to copy the SQLite file.
184 "Setting SQLite local copy source file from config: %s"
195 Component.__init__(self, name=
"sqlite_file_copy", **kwargs)
199 Return dummy command arguments to satisfy the parent class.
201 cmd_args = [
"(no-command-needed)"]
203 if not all(isinstance(arg, str)
for arg
in cmd_args):
204 raise ValueError(
"All arguments must be strings.")
210 Execute the file copy operation.
217 f
"Copying file from {self.source_file} to {self.destination_file}"
222 self.
logger.info(f
"Successfully copied file to {self.destination_file}")
226 except Exception
as e:
227 self.
logger.error(f
"Error during file copy: {e}")
233 Run the hps-java JobManager class.
235 Input files have slcio format.
237 Required parameters are: **steering_files** \n
238 Optional parameters are: **detector**, **run_number**, **defs**
270 if "overlay_file" in kwargs:
279 description=
"HPS Java Job Manager",
288 "Append token for '%s' automatically set to '%s' from steering key."
293 """! Configure JobManager component."""
297 if os.getenv(
"HPS_JAVA_BIN_JAR",
None)
is not None:
300 "Set HPS_JAVA_BIN_JAR from environment: {}".format(
306 "hps_java_bin_jar not set in environment or config file!"
309 if os.getenv(
"CONDITIONS_URL",
None)
is not None:
312 "Set CONDITIONS_URL from environment: {}".format(
319 Return list of required configurations.
321 Required configurations are: **hps_java_bin_jar**
322 @retun list of required configurations.
324 return [
"hps_java_bin_jar"]
327 """! Setup JobManager component."""
329 raise Exception(
"No inputs provided to hps-java.")
339 Setup command arguments.
340 @return list of arguments
345 self.
logger.debug(
"Setting java_args from config: %s" % self.
java_args)
366 self.
logger.debug(
"Setting conditions_password from config (not shown)")
392 args.append(
"outputFile=" + os.path.splitext(self.
output_files()[0])[0])
395 for k, v
in self.
defs.items():
397 args.append(k +
"=" + str(v))
402 "Steering does not exist at '%s' so assuming it is a resource."
408 "Steering looks like a file but is not an abs path: %s"
419 args.append(input_file)
423 args.append(
"overlayFile=" + os.path.splitext(self.
overlay_file)[0])
429 Return list of required parameters.
431 Required parameters are: **steering_files**
432 @return list of required parameters
434 return [
"steering_files"]
438 Return list of optional parameters.
440 Optional parameters are: **detector**, **run_number**, **defs**
441 @return list of optional parameters
443 return [
"detector",
"run_number",
"defs",
"nevents"]
448 Run the hpstr analysis tool.
450 Required parameters are: **config_files** \n
451 Optional parameters are: **year**, **is_data**, **nevents** \n
452 Required configs are: **hpstr_install_dir**, **hpstr_base**
455 def __init__(self, cfg=None, is_data=0, year=None, tracking=None, **kwargs):
468 Component.__init__(self, name=
"hpstr", command=
"hpstr", **kwargs)
471 """! Setup HPSTR component."""
486 if len(os.path.dirname(config_file)):
488 if os.path.isabs(config_file):
493 "The config has a directory but is not an abs path: %s" % self.
cfg
498 self.
hpstr_base,
"processors",
"config", config_file
503 if os.path.splitext(self.
input_files()[0])[1] ==
".root":
511 Return list of required parameters.
513 Required parameters are: **config_files**
514 @return list of required parameters
516 return [
"config_files"]
520 Return list of optional parameters.
522 Optional parameters are: **year**, **is_data**, **nevents**
523 @return list of optional parameters
525 return [
"year",
"is_data",
"nevents",
"tracking"]
529 Return list of required configs.
531 Required configs are: **hpstr_install_dir**, **hpstr_base**
532 @return list of required configs
534 return [
"hpstr_install_dir",
"hpstr_base"]
538 Setup command arguments.
539 @return list of arguments
552 if self.
year is not None:
553 args.extend([
"-y", str(self.
year)])
555 args.extend([
"-w", str(self.
tracking)])
559 """! Adjust names of output files."""
562 return [
"%s.root" % f]
569 """! Execute HPSTR component."""
571 cl =
'bash -c ". %s && %s %s"' % (
578 proc = subprocess.Popen(cl, shell=
True, stdout=log_out, stderr=log_err)
582 return proc.returncode
590 Generic class for StdHep tools.
598 "lhe_tridents_displacetime",
599 "lhe_tridents_displaceuni",
607 Component.__init__(self, name=name, command=
"stdhep_" + name, **kwargs)
611 Setup command arguments.
612 @return list of arguments
616 if self.
name in StdHepTool.seed_names:
622 raise Exception(
"Too many outputs specified for StdHepTool.")
624 raise Exception(
"No outputs specified for StdHepTool.")
627 for i
in self.
inputs[::-1]:
630 raise Exception(
"No inputs specified for StdHepTool.")
637 Transform StdHep events into beam coordinates.
639 Optional parameters are: **beam_sigma_x**, **beam_sigma_y**, **beam_rot_x**,
640 **beam_rot_y**, **beam_rot_z**, **target_x**, **target_y**, **target_z**
662 StdHepTool.__init__(self, name=
"beam_coords", append_tok=
"rot", **kwargs)
666 Setup command arguments.
667 @return list of arguments
669 args = StdHepTool.cmd_args(self)
684 args.extend([
"-X", str(self.
target_x)])
686 args.extend([
"-Y", str(self.
target_y)])
688 args.extend([
"-Z", str(self.
target_z)])
694 Return list of optional parameters.
696 Optional parameters are: **beam_sigma_x**, **beam_sigma_y**, **beam_rot_x**,
697 **beam_rot_y**, **beam_rot_z**, **target_x**, **target_y**, **target_z**
698 @return list of optional parameters
714 Randomly sample StdHep events into a new file.
716 Optional parameters are: **nevents**, **mu**
720 StdHepTool.__init__(self, name=
"random_sample", append_tok=
"sampled", **kwargs)
726 Setup command arguments.
727 @return list of arguments
731 if self.
name in StdHepTool.seed_names:
734 args.extend([
"-N", str(1)])
739 if self.
mu is not None:
740 args.extend([
"-m", str(self.
mu)])
744 args.insert(0, os.path.splitext(self.
output_files()[0])[0])
746 raise Exception(
"Too many outputs specified for RandomSample.")
748 raise Exception(
"No outputs specified for RandomSample.")
751 for i
in self.
inputs[::-1]:
754 raise Exception(
"No inputs were provided.")
760 Return list of optional parameters.
762 Optional parameters are: **nevents**, **mu**
763 @return list of optional parameters
765 return [
"nevents",
"mu"]
768 """! Execute RandomSample component"""
769 returncode = Component.execute(self, log_out, log_err)
772 src =
"%s_1.stdhep" % os.path.splitext(self.
output_files()[0])[0]
773 dest =
"%s.stdhep" % os.path.splitext(self.
output_files()[0])[0]
774 self.
logger.debug(
"Moving '%s' to '%s'" % (src, dest))
775 shutil.move(src, dest)
782 Convert LHE files to StdHep, displacing the time by given ctau.
784 Optional parameters are: **ctau**
791 self, name=
"lhe_tridents_displacetime", output_ext=
".stdhep", **kwargs
796 Setup command arguments.
797 @return list of arguments
799 args = StdHepTool.cmd_args(self)
800 if self.
ctau is not None:
801 args.extend([
"-l", str(self.
ctau)])
806 Return list of optional parameters.
808 Optional parameters are: **ctau**
809 @return list of optional parameters
816 Convert LHE files to StdHep, displacing the time by given ctau.
818 Optional parameters are: **ctau**
825 self, name=
"lhe_tridents_displaceuni", output_ext=
".stdhep", **kwargs
830 Setup command arguments.
831 @return list of arguments
833 args = StdHepTool.cmd_args(self)
834 if self.
ctau is not None:
835 args.extend([
"-l", str(self.
ctau)])
840 Return list of optional parameters.
842 Optional parameters are: **ctau**
843 @return list of optional parameters
850 Add mother particles for physics samples.
854 StdHepTool.__init__(self, name=
"add_mother", append_tok=
"mom", **kwargs)
858 """! Add full truth mother particles for physics samples"""
862 self,
"add_mother_full_truth", append_tok=
"mom_full_truth", **kwargs
866 "Must have 2 input files: a stdhep file and a lhe file in order"
871 raise Exception(
"The first input file must be a stdhep file")
875 raise Exception(
"The second input file must be a lhe file")
879 Setup command arguments.
880 @return list of arguments
887 Merge StdHep files, applying poisson sampling.
889 Required parameters are: **target_thickness**, **num_electrons**
900 StdHepTool.__init__(self, name=
"merge_poisson", append_tok=
"sampled", **kwargs)
903 """! Setup MergePoisson component."""
907 raise Exception(
"Cross section is missing.")
908 self.
logger.info(
"mu is %f", self.
mu)
912 Return list of required parameters.
914 Required parameters are: **target_thickness**, **num_electrons**
915 @return list of required parameters
917 return [
"target_thickness",
"num_electrons"]
921 Setup command arguments.
922 @return list of arguments
925 if self.
name in StdHepTool.seed_names:
932 args.insert(0, os.path.splitext(self.
output_files()[0])[0])
934 raise Exception(
"Too many outputs specified for MergePoisson.")
936 raise Exception(
"No outputs specified for MergePoisson.")
939 for i
in self.
inputs[::-1]:
942 raise Exception(
"No inputs were provided.")
947 """! Execute MergePoisson component."""
948 returncode = Component.execute(self, log_out, log_err)
951 src =
"%s_1.stdhep" % os.path.splitext(self.
output_files()[0])[0]
952 dest =
"%s.stdhep" % os.path.splitext(self.
output_files()[0])[0]
953 self.
logger.debug(
"Moving '%s' to '%s'" % (src, dest))
954 shutil.move(src, dest)
963 Optional parameters are: none \n
964 Required parameters are: none
968 StdHepTool.__init__(self, name=
"merge_files", **kwargs)
972 Return list of optional parameters.
974 Optional parameters are: none
975 @return list of optional parameters
981 Return list of required parameters.
983 Required parameters are: none
984 @return list of required parameters
991 Count number of events in a StdHep file.
996 self, name=
"stdhep_count", command=
"stdhep_count.sh", **kwargs
1001 Setup command arguments.
1002 @return list of arguments
1008 """! Execute StdHepCount component."""
1011 proc = subprocess.Popen(cl, stdout=PIPE)
1012 (output, err) = proc.communicate()
1014 nevents = int(output.split()[1])
1015 print(
"StdHep file '%s' has %d events." % (self.
input_files()[0], nevents))
1017 return proc.returncode
1022 Generic base class for Java based tools.
1032 Component.__init__(self, name,
"java", **kwargs)
1036 Return list of required config.
1038 Required config are: **hps_java_bin_jar**
1039 @return list of required config
1041 return [
"hps_java_bin_jar"]
1045 Setup command arguments.
1046 @return list of arguments
1050 self.
logger.debug(
"Setting java_args from config: %s" + self.
java_args)
1068 Convert EVIO events to LCIO using the hps-java EvioToLcio command line tool.
1070 Input files have evio format (format used by DAQ system).
1072 Required parameters are: **detector**, **steering_files** \n
1073 Optional parameters are: **run_number**, **skip_events**, **nevents**, **event_print_interval**
1090 name=
"evio_to_lcio",
1091 java_class=
"org.hps.evio.EvioToLcio",
1092 output_ext=
".slcio",
1098 Return list of required parameters.
1100 Required parameters are: **detector**, **steering_files**
1101 @return list of required parameters
1103 return [
"detector",
"steering_files"]
1107 Return list of optional parameters.
1109 Optional parameters are: **run_number**, **skip_events**, **nevents**, **event_print_interval**
1110 @return list of optional parameters
1112 return [
"run_number",
"skip_events",
"nevents",
"event_print_interval"]
1115 """! Setup EvioToLcio component."""
1125 Setup command arguments.
1126 @return list of arguments
1128 args = JavaTool.cmd_args(self)
1130 raise Exception(
"No output files were provided.")
1132 args.append(
"-DoutputFile=%s" % os.path.splitext(output_file)[0])
1142 "Steering does not exist at '%s' so assuming it is a resource."
1148 "Steering looks like a file but is not an abs path: %s"
1159 args.append(inputfile)
1169 Space MC events and apply energy filters to process before readout.
1171 Optional parameters are: **filter_ecal_hit_ecut**, **filter_event_interval**,
1172 **filter_nevents_read**, **filter_nevents_write**, **filter_no_cuts** \n
1173 Required config are: **hps_java_bin_jar**
1177 if "filter_no_cuts" in kwargs:
1183 if "filter_ecal_pairs" in kwargs:
1188 if "filter_ecal_hit_ecut" in kwargs:
1195 if "filter_event_interval" in kwargs:
1201 if "filter_nevents_read" in kwargs:
1207 if "filter_nevents_write" in kwargs:
1217 name=
"filter_bunches",
1218 java_class=
"org.hps.util.FilterMCBunches",
1224 """! Configure FilterBunches component."""
1227 if os.getenv(
"HPS_JAVA_BIN_JAR",
None)
is not None:
1230 "Set HPS_JAVA_BIN_JAR from environment: {}".format(
1237 Setup command arguments.
1238 @return list of arguments
1240 args = JavaTool.cmd_args(self)
1263 Return list of optional parameters.
1265 Optional parameters are: **filter_ecal_hit_ecut**, **filter_event_interval**,
1266 **filter_nevents_read**, **filter_nevents_write**, **filter_no_cuts** \n
1267 @return list of optional parameters
1270 "filter_ecal_hit_ecut",
1271 "filter_event_interval",
1272 "filter_nevents_read",
1273 "filter_nevents_write",
1279 Return list of required config.
1281 Required config are: **hps_java_bin_jar**
1282 @return list of required config
1284 return [
"hps_java_bin_jar"]
1289 Apply hodo-hit filter and space MC events to process before readout.
1291 The nevents parameter is not settable from JSON in this class. It should
1292 be supplied as an init argument in the job script if it needs to be
1293 customized (the default nevents and event_interval used to apply spacing
1294 should usually not need to be changed by the user). \n
1296 Optional parameters are: **num_hodo_hits**, **event_interval**
1300 if "num_hodo_hits" in kwargs:
1305 if "event_interval" in kwargs:
1312 name=
"filter_events",
1313 java_class=
"org.hps.util.ExtractEventsWithHitAtHodoEcal",
1320 Setup command arguments.
1321 @return list of arguments
1323 args = JavaTool.cmd_args(self)
1339 Return list of optional parameters.
1341 Optional parameters are: **num_hodo_hits**, **event_interval**
1342 @return list of optional parameters
1344 return [
"num_hodo_hits",
"event_interval"]
1349 Unzip the input files to outputs.
1353 Component.__init__(self, name=
"unzip", command=
"gunzip", **kwargs)
1356 """! Return list of output files."""
1359 return [os.path.splitext(i)[0]
for i
in self.
input_files()]
1362 """! Execute Unzip component."""
1366 with gzip.open(inputfile,
"rb")
as in_file, open(
1369 shutil.copyfileobj(in_file, out_file)
1370 self.
logger.debug(
"Unzipped '%s' to '%s'" % (inputfile, outputfile))
1376 Dump LCIO event information.
1378 Required parameters are: none \n
1379 Required config are: **lcio_dir**
1385 Component.__init__(self, name=
"lcio_dump_event", command=
"dumpevent", **kwargs)
1387 if "event_num" in kwargs:
1393 """! Configure LCIODumpEvent component."""
1399 """! Setup LCIODumpEvent component."""
1404 Setup command arguments.
1405 @return list of arguments
1408 raise Exception(
"Missing required inputs for LCIODumpEvent.")
1416 Return list of required config.
1418 Required config are: **lcio_dir**
1419 @return list of required config
1425 Return list of required parameters.
1427 Required parameters are: none
1428 @return list of required parameters
1435 Count events in an LHE file.
1438 def __init__(self, minevents=0, fail_on_underflow=False, **kwargs):
1440 Component.__init__(self, name=
"lhe_count", **kwargs)
1443 """! Setup LHECount component."""
1445 raise Exception(
"Missing at least one input file.")
1449 Check if command exists.
1450 @return True if command exists
1455 """! Execute LHECount component."""
1457 with gzip.open(i,
"rb")
as in_file:
1458 lines = in_file.readlines()
1462 if "<event>" in line:
1465 print(
"LHE file '%s' has %d events." % (i, nevents))
1468 msg =
"LHE file '%s' does not contain the minimum %d events." % (
1472 if self.fail_on_underflow:
1473 raise Exception(msg)
1481 Tar files into an archive.
1485 Component.__init__(self, name=
"tar_files", **kwargs)
1489 Check if command exists.
1490 @return True if command exists
1495 """! Execute TarFiles component."""
1496 self.
logger.debug(
"Opening '%s' for writing ..." % self.
outputs[0])
1497 tar = tarfile.open(self.
outputs[0],
"w")
1499 self.
logger.debug(
"Adding '%s' to archive" % i)
1508 Move input files to new locations.
1512 Component.__init__(self, name=
"move_files", **kwargs)
1516 Check if command exists.
1517 @return True if command exists
1522 """! Execute TarFiles component."""
1524 raise Exception(
"Input and output lists are not the same length!")
1528 self.
logger.info(
"Moving %s -> %s" % (src, dest))
1529 shutil.move(src, dest)
1535 Generic component for LCIO tools.
1537 Required parameters are: none \n
1538 Required config are: **lcio_bin_jar**
1544 Component.__init__(self, name, command=
"java", **kwargs)
1547 """! Configure LCIOTool component."""
1554 Setup command arguments.
1555 @return list of arguments
1558 raise Exception(
"Name required to write cmd args for LCIOTool.")
1563 Return list of required config.
1565 Required config are: **lcio_bin_jar**
1566 @return list of required config
1568 return [
"lcio_bin_jar"]
1572 Return list of required parameters.
1574 Required parameters are: none
1575 @return list of required parameters
1582 Concatenate LCIO files together.
1586 LCIOTool.__init__(self, name=
"concat", **kwargs)
1590 Setup command arguments.
1591 @return list of arguments
1593 args = LCIOTool.cmd_args(self)
1595 raise Exception(
"Missing at least one input file.")
1597 raise Exception(
"Missing an output file.")
1599 args.extend([
"-f", i])
1600 args.extend([
"-o", self.
outputs[0]])
1606 Count events in LCIO files.
1608 Required parameters are: none \n
1609 Optional parameters are: none
1613 LCIOTool.__init__(self, name=
"count", **kwargs)
1617 Setup command arguments.
1618 @return list of arguments
1620 args = LCIOTool.cmd_args(self)
1622 raise Exception(
"Missing an input file.")
1628 Return list of required parameters.
1630 Required parameters are: none
1631 @return list of required parameters
1637 Return list of optional parameters.
1639 Optional parameters are: none
1640 @return list of optional parameters
1651 LCIOTool.__init__(self, name=
"merge", **kwargs)
1655 Setup command arguments.
1656 @return list of arguments
1658 args = LCIOTool.cmd_args(self)
1660 raise Exception(
"Missing at least one input file.")
1662 raise Exception(
"Missing an output file.")
1664 args.extend([
"-f", i])
1665 args.extend([
"-o", self.
outputs[0]])
1672MergeROOT tool for hps-mc
1673Merges ROOT files using hadd with validation
1679 Merge ROOT files using hadd with event count validation.
1681 This component uses ROOT's hadd utility to merge multiple ROOT files
1682 into a single output file, and validates that all events are preserved.
1687 Initialize MergeROOT component.
1692 List of input ROOT files to merge
1694 List containing the output merged ROOT file name
1695 force : bool, optional
1696 Force overwrite of output file (default: True)
1697 compression : int, optional
1698 Compression level for output file (0-9, default: None uses hadd default)
1699 validate : bool, optional
1700 Validate event counts after merge (default: True)
1701 write_stats : bool, optional
1702 Write JSON stats file after merge (default: True when validate=True)
1703 job_id : int, optional
1704 Job ID to include in stats output
1706 Component.__init__(self, **kwargs)
1713 if not hasattr(self,
"force"):
1717 if not hasattr(self,
"compression"):
1721 if not hasattr(self,
"validate"):
1725 if not hasattr(self,
"write_stats"):
1729 if not hasattr(self,
"job_id"):
1741 Build command line arguments for hadd.
1746 List of command arguments
1749 sys.stderr.write(
"MergeROOT DEBUG: cmd_args() called\n")
1750 sys.stderr.write(
" self.force=%s, self.compression=%s\n" % (self.
force, self.
compression))
1769 sys.stderr.write(
"MergeROOT DEBUG: ERROR - No output file specified!\n")
1771 raise RuntimeError(
"MergeROOT: No output file specified")
1777 sys.stderr.write(
"MergeROOT DEBUG: ERROR - No input files specified!\n")
1779 raise RuntimeError(
"MergeROOT: No input files specified")
1781 sys.stderr.write(
"MergeROOT DEBUG: cmd_args() returning: %s\n" % args)
1787 Scan a ROOT file and extract TTree event counts.
1793 log_out : file, optional
1794 Log file for output (used to report multiple key cycles)
1799 Dictionary mapping tree names to entry counts
1805 "MergeROOT: PyROOT is required for validation but not available"
1812 root_file = ROOT.TFile.Open(filename,
"READ")
1813 if not root_file
or root_file.IsZombie():
1814 raise RuntimeError(
"MergeROOT: Cannot open ROOT file: %s" % filename)
1817 for key
in root_file.GetListOfKeys():
1821 if obj.InheritsFrom(
"TTree"):
1822 tree_name = obj.GetName()
1823 cycle = key.GetCycle()
1824 num_entries = obj.GetEntries()
1826 if tree_name
not in tree_cycles:
1827 tree_cycles[tree_name] = []
1828 tree_cycles[tree_name].append((cycle, num_entries))
1833 for tree_name, cycles
in tree_cycles.items():
1836 cycles.sort(key=
lambda x: x[0], reverse=
True)
1837 highest_cycle, highest_entries = cycles[0]
1839 log_out.write(
" WARNING: Multiple key cycles found for tree '%s':\n" % tree_name)
1840 for cyc, ent
in cycles:
1841 marker =
" <-- using" if cyc == highest_cycle
else ""
1842 log_out.write(
" Cycle %d: %d entries%s\n" % (cyc, ent, marker))
1843 tree_counts[tree_name] = highest_entries
1845 tree_counts[tree_name] = cycles[0][1]
1851 Scan all input files and store tree event counts.
1858 log_out.write(
"\n" +
"=" * 70 +
"\n")
1859 log_out.write(
"MergeROOT: Scanning input files for TTrees\n")
1860 log_out.write(
"=" * 70 +
"\n")
1863 if not os.path.exists(input_file):
1864 raise RuntimeError(
"MergeROOT: Input file not found: %s" % input_file)
1866 log_out.write(
"\nScanning: %s\n" % input_file)
1870 log_out.write(
" WARNING: No TTrees found in this file\n")
1872 for tree_name, count
in tree_counts.items():
1873 log_out.write(
" Tree '%s': %d events\n" % (tree_name, count))
1877 log_out.write(
"\n" +
"=" * 70 +
"\n")
1882 Scan output file and store tree event counts.
1891 log_out.write(
"\n" +
"=" * 70 +
"\n")
1892 log_out.write(
"MergeROOT: Scanning output file for TTrees\n")
1893 log_out.write(
"=" * 70 +
"\n")
1894 log_out.write(
"\nScanning: %s\n" % output_file)
1899 log_out.write(
" WARNING: No TTrees found in output file\n")
1902 log_out.write(
" Tree '%s': %d events\n" % (tree_name, count))
1904 log_out.write(
"\n" +
"=" * 70 +
"\n")
1909 Validate that event counts match between input and output files.
1919 True if validation passes, False otherwise
1921 log_out.write(
"\n" +
"=" * 70 +
"\n")
1922 log_out.write(
"MergeROOT: Validating merge results\n")
1923 log_out.write(
"=" * 70 +
"\n\n")
1926 total_input_counts = {}
1929 for tree_name, count
in tree_counts.items():
1930 if tree_name
not in total_input_counts:
1931 total_input_counts[tree_name] = 0
1932 total_input_counts[tree_name] += count
1937 if not total_input_counts:
1938 log_out.write(
"WARNING: No TTrees found in input files\n")
1941 log_out.write(
"Event count validation:\n")
1942 log_out.write(
"-" * 70 +
"\n")
1944 "%-30s %15s %15s %10s\n"
1945 % (
"Tree Name",
"Input Events",
"Output Events",
"Status")
1947 log_out.write(
"-" * 70 +
"\n")
1949 for tree_name, input_count
in sorted(total_input_counts.items()):
1952 if output_count == input_count:
1959 "%-30s %15d %15d %10s\n"
1960 % (tree_name, input_count, output_count, status)
1965 total_input_counts.keys()
1968 log_out.write(
"\nWARNING: Output contains trees not found in inputs:\n")
1969 for tree_name
in extra_trees:
1971 " - %s: %d events\n"
1975 log_out.write(
"-" * 70 +
"\n")
1978 log_out.write(
"\n✓ VALIDATION PASSED: All event counts match!\n")
1980 log_out.write(
"\n✗ VALIDATION FAILED: Event count mismatch detected!\n")
1982 log_out.write(
"=" * 70 +
"\n\n")
1989 Print a summary of the merge operation.
1996 log_out.write(
"\n" +
"=" * 70 +
"\n")
1997 log_out.write(
"MergeROOT: Summary\n")
1998 log_out.write(
"=" * 70 +
"\n")
1999 log_out.write(
"Input files: %d\n" % len(self.
inputsinputs))
2002 log_out.write(
" %d. %s\n" % (i, input_file))
2006 "Compression level: %s\n"
2012 log_out.write(
"\nTotal events in merged file:\n")
2014 log_out.write(
" %-30s: %d events\n" % (tree_name, count))
2016 log_out.write(
"=" * 70 +
"\n")
2021 Get the stats JSON filename based on the output ROOT filename.
2026 Path to stats JSON file (e.g., 'merged_X_job1.root' -> 'merged_X_job1_stats.json')
2031 base, _ = os.path.splitext(output_file)
2032 return base +
"_stats.json"
2036 Write merge statistics to a JSON file.
2042 validation_passed : bool
2043 Whether the validation passed
2046 if stats_file
is None:
2047 log_out.write(
"WARNING: Cannot determine stats filename, skipping stats output\n")
2050 log_out.write(
"\n" +
"=" * 70 +
"\n")
2051 log_out.write(
"MergeROOT: Writing stats to %s\n" % stats_file)
2052 log_out.write(
"=" * 70 +
"\n")
2055 total_input_events = {}
2057 for tree_name, count
in tree_counts.items():
2058 if tree_name
not in total_input_events:
2059 total_input_events[tree_name] = 0
2060 total_input_events[tree_name] += count
2063 input_files_list = []
2066 input_files_list.append({
2068 "events": tree_counts
2076 "input_files": input_files_list,
2077 "total_input_events": total_input_events,
2078 "validation_passed": validation_passed,
2083 with open(stats_file,
'w')
as f:
2084 json.dump(stats, f, indent=2)
2086 log_out.write(
"Stats written successfully\n")
2087 log_out.write(
"=" * 70 +
"\n")
2092 Execute MergeROOT component using hadd.
2104 Return code from hadd command
2107 log_out.write(
"\n" +
"=" * 70 +
"\n")
2108 log_out.write(
"MergeROOT: DEBUG - Entering execute()\n")
2109 log_out.write(
"=" * 70 +
"\n")
2110 log_out.write(
"DEBUG: self.command = %s\n" % self.
commandcommand)
2111 log_out.write(
"DEBUG: self.inputs = %s\n" % self.
inputsinputs)
2112 log_out.write(
"DEBUG: self.outputs = %s\n" % self.
outputsoutputs)
2113 log_out.write(
"DEBUG: self.force = %s\n" % self.
force)
2114 log_out.write(
"DEBUG: self.compression = %s\n" % self.
compression)
2115 log_out.write(
"DEBUG: self.validate = %s\n" % self.
validate)
2119 log_out.write(
"\nDEBUG: Checking if hadd command exists...\n")
2122 raise RuntimeError(
"MergeROOT: hadd command not found in PATH")
2123 log_out.write(
"DEBUG: hadd command found\n")
2127 log_out.write(
"\nDEBUG: Checking input files exist...\n")
2130 log_out.write(
"DEBUG: Checking: %s\n" % input_file)
2132 if not os.path.exists(input_file):
2133 raise RuntimeError(
"MergeROOT: Input file not found: %s" % input_file)
2134 log_out.write(
"DEBUG: -> exists (size: %d bytes)\n" % os.path.getsize(input_file))
2138 log_out.write(
"\nDEBUG: Validation enabled = %s\n" % self.
validate)
2142 log_out.write(
"DEBUG: Starting input file scan...\n")
2145 log_out.write(
"DEBUG: Input file scan complete\n")
2147 except Exception
as e:
2148 log_out.write(
"\nWARNING: Could not scan input files: %s\n" % str(e))
2149 log_out.write(
"Proceeding with merge without validation.\n")
2153 log_out.write(
"\nDEBUG: Building command arguments...\n")
2156 log_out.write(
"DEBUG: cmd_args() returned: %s\n" % self.
cmd_argscmd_args())
2160 log_out.write(
"\n" +
"=" * 70 +
"\n")
2161 log_out.write(
"MergeROOT: Executing hadd\n")
2162 log_out.write(
"=" * 70 +
"\n")
2163 log_out.write(
"Command: %s\n" %
" ".join(cmd))
2164 log_out.write(
"=" * 70 +
"\n\n")
2168 log_out.write(
"DEBUG: About to call subprocess.Popen...\n")
2170 proc = subprocess.Popen(cmd, stdout=log_out, stderr=log_err)
2171 log_out.write(
"DEBUG: Popen returned, PID = %s\n" % proc.pid)
2173 log_out.write(
"DEBUG: Waiting for process to complete...\n")
2176 log_out.write(
"DEBUG: Process completed, returncode = %d\n" % proc.returncode)
2180 if proc.returncode != 0:
2181 log_out.write(
"DEBUG: hadd FAILED with return code %d\n" % proc.returncode)
2184 "MergeROOT: hadd failed with return code %d" % proc.returncode
2188 log_out.write(
"DEBUG: Checking if output file exists: %s\n" % self.
outputsoutputs[0])
2192 "MergeROOT: Output file was not created: %s" % self.
outputsoutputs[0]
2194 log_out.write(
"DEBUG: Output file exists, size = %d bytes\n" % os.path.getsize(self.
outputsoutputs[0]))
2197 log_out.write(
"\n✓ hadd completed successfully\n")
2201 log_out.write(
"\nDEBUG: Post-merge validation check, self.validate = %s\n" % self.
validate)
2203 validation_passed =
True
2206 log_out.write(
"DEBUG: Starting output file scan...\n")
2209 log_out.write(
"DEBUG: Output file scan complete\n")
2211 log_out.write(
"DEBUG: Starting merge validation...\n")
2215 log_out.write(
"DEBUG: Merge validation complete, passed = %s\n" % validation_passed)
2218 if not validation_passed:
2219 raise RuntimeError(
"MergeROOT: Event count validation failed!")
2221 except Exception
as e:
2222 log_out.write(
"\nERROR during validation: %s\n" % str(e))
2227 log_out.write(
"\nDEBUG: write_stats = %s\n" % self.
write_stats)
2232 except Exception
as e:
2233 log_out.write(
"\nWARNING: Could not write stats JSON: %s\n" % str(e))
2237 log_out.write(
"\nDEBUG: Printing summary...\n")
2241 log_out.write(
"\nDEBUG: MergeROOT.execute() returning %d\n" % proc.returncode)
2243 return proc.returncode
2247 Return list of output files.
2252 List containing the merged output ROOT file and optionally the stats JSON
2257 if stats_file
and stats_file
not in files:
2258 files.append(stats_file)
2263 Return list of required configuration parameters.
2268 List of required config parameters (empty for MergeROOT)
Base class for components in a job.
output_files(self)
Return a list of output files created by this component.
config_from_environ(self)
Configure component from environment variables which are just upper case versions of the required con...
cmd_exists(self)
Check if the component's assigned command exists.
cmd_args(self)
Return the command arguments of this component.
input_files(self)
Get a list of input files for this component.
Miscellaneous math functions.