1"""! Tools that can be used in HPSMC jobs."""
10from subprocess
import PIPE
18 Run the SLIC Geant4 simulation.
20 Optional parameters are: **nevents**, **macros**, **run_number**, **disable_particle_table** \n
21 Required parameters are: **detector** \n
22 Required configurations are: **slic_dir**, **detector_dir**
38 self, name=
"slic", command=
"slic", output_ext=
".slcio", **kwargs
43 Setup command arguments.
44 @return list of arguments
47 raise Exception(
"No inputs given for SLIC.")
62 args.extend([
"-m",
"run_number.mac"])
66 if os.path.exists(tbl):
67 args.extend([
"-P", tbl])
69 raise Exception(
"SLIC particle.tbl does not exist: %s" % tbl)
74 if macro ==
"run_number.mac":
75 raise Exception(
"Macro name '%s' is not allowed." % macro)
76 if not os.path.isabs(macro):
77 raise Exception(
"Macro '%s' is not an absolute path." % macro)
78 args.extend([
"-m", macro])
85 """! Return path to detector file."""
89 """! Return path to particle table."""
90 return os.path.join(self.
slic_dir,
"share",
"particle.tbl")
93 """! Configure SLIC component."""
99 raise Exception(
"Failed to find valid detector_dir")
101 "Using detector_dir from install: {}".format(self.
detector_dir)
105 """! Setup SLIC component."""
106 if not os.path.exists(self.
slic_dir):
107 raise Exception(
"slic_dir does not exist: %s" % self.
slic_dir)
111 raise Exception(
"SLIC setup script does not exist: %s" % self.
name)
114 run_number_cmd =
"/lcio/runNumber %d" % self.
run_number
115 run_number_mac = open(
"run_number.mac",
"w")
116 run_number_mac.write(run_number_cmd)
117 run_number_mac.close()
121 Return list of optional parameters.
123 Optional parameters are: **nevents**, **macros**, **run_number**, **disable_particle_table**
124 @return list of optional parameters
126 return [
"nevents",
"macros",
"run_number",
"disable_particle_table"]
130 Return list of required parameters.
132 Required parameters are: **detector**
133 @return list of required parameters
139 Return list of required configurations.
141 Required configurations are: **slic_dir**, **detector_dir**
142 @return list of required configurations
144 return [
"slic_dir",
"detector_dir"]
148 Execute SLIC component.
150 Component is executed by creating command line input
151 from command and command arguments.
152 @return return code of process
155 cl =
'bash -c ". %s && %s %s"' % (
162 proc = subprocess.Popen(cl, shell=
True, stdout=log_out, stderr=log_err)
166 return proc.returncode
171 Copy the SQLite database file to the desired location.
176 Initialize SQLiteProc to copy the SQLite file.
184 "Setting SQLite local copy source file from config: %s"
195 Component.__init__(self, name=
"sqlite_file_copy", **kwargs)
199 Return dummy command arguments to satisfy the parent class.
201 cmd_args = [
"(no-command-needed)"]
203 if not all(isinstance(arg, str)
for arg
in cmd_args):
204 raise ValueError(
"All arguments must be strings.")
210 Execute the file copy operation.
217 f
"Copying file from {self.source_file} to {self.destination_file}"
222 self.
logger.info(f
"Successfully copied file to {self.destination_file}")
226 except Exception
as e:
227 self.
logger.error(f
"Error during file copy: {e}")
233 Run the hps-java JobManager class.
235 Input files have slcio format.
237 Required parameters are: **steering_files** \n
238 Optional parameters are: **detector**, **run_number**, **defs**, **nevents**
270 if "overlay_file" in kwargs:
279 description=
"HPS Java Job Manager",
288 "Append token for '%s' automatically set to '%s' from steering key."
293 """! Configure JobManager component."""
297 if os.getenv(
"HPS_JAVA_BIN_JAR",
None)
is not None:
300 "Set HPS_JAVA_BIN_JAR from environment: {}".format(
306 "hps_java_bin_jar not set in environment or config file!"
309 if os.getenv(
"CONDITIONS_URL",
None)
is not None:
312 "Set CONDITIONS_URL from environment: {}".format(
319 Return list of required configurations.
321 Required configurations are: **hps_java_bin_jar**
322 @return list of required configurations.
324 return [
"hps_java_bin_jar"]
327 """! Setup JobManager component."""
329 raise Exception(
"No inputs provided to hps-java.")
339 Setup command arguments.
340 @return list of arguments
345 self.
logger.debug(
"Setting java_args from config: %s" % self.
java_args)
366 self.
logger.debug(
"Setting conditions_password from config (not shown)")
392 args.append(
"outputFile=" + os.path.splitext(self.
output_files()[0])[0])
395 for k, v
in self.
defs.items():
397 args.append(k +
"=" + str(v))
402 "Steering does not exist at '%s' so assuming it is a resource."
408 "Steering looks like a file but is not an abs path: %s"
419 args.append(input_file)
423 args.append(
"overlayFile=" + os.path.splitext(self.
overlay_file)[0])
429 Return list of required parameters.
431 Required parameters are: **steering_files**
432 @return list of required parameters
434 return [
"steering_files"]
438 Return list of optional parameters.
440 Optional parameters are: **detector**, **run_number**, **defs**, **nevents**
441 @return list of optional parameters
443 return [
"detector",
"run_number",
"defs",
"nevents"]
448 Run the make_mini_dst command on the input file.
450 Required parameters are: **input_file**
451 Required configs are: **minidst_install_dir**
456 Initialize ProcessMiniDst with default input file and the command to run.
461 Component.__init__(self, name=
'make_mini_dst',
462 command=
'make_mini_dst',
463 description=
'Create the MiniDST ROOT file',
468 """! Setup the MiniDST component."""
471 raise Exception(
"No input files provided to make_mini_dst.")
478 Return list of required parameters.
480 Required parameters are only the standard "input_files".
481 @return list of required parameters
487 Return list of optional parameters.
489 There are currently no optional parameters.
490 @return list of optional parameters
496 Return list of required configs.
498 Required configs are: **minidst_install_dir**
499 @return list of required configs
501 return [
"minidst_install_dir"]
504 """! Adjust names of output files."""
508 print(f
"Set outputs to: {self.outputs}")
514 Setup command arguments for make_mini_dst.
515 @return list of arguments
519 print(
"===== Make MiniDST with input files: ", end=
" ")
521 print(f
"{self.input_files()[i]}", end=
", ")
522 print(f
" ==> {self.output_files()}")
534 Run the hpstr analysis tool.
536 Required parameters are: **config_files** \n
537 Optional parameters are: **year**, **is_data**, **nevents**, **tracking** \n
538 Required configs are: **hpstr_install_dir**, **hpstr_base**
541 def __init__(self, cfg=None, is_data=0, year=None, tracking=None, **kwargs):
554 Component.__init__(self, name=
"hpstr", command=
"hpstr", **kwargs)
557 """! Setup HPSTR component."""
572 if len(os.path.dirname(config_file)):
574 if os.path.isabs(config_file):
579 "The config has a directory but is not an abs path: %s" % self.
cfg
584 self.
hpstr_base,
"processors",
"config", config_file
589 if os.path.splitext(self.
input_files()[0])[1] ==
".root":
597 Return list of required parameters.
599 Required parameters are: **config_files**
600 @return list of required parameters
602 return [
"config_files"]
606 Return list of optional parameters.
608 Optional parameters are: **year**, **is_data**, **nevents**, **tracking**
609 @return list of optional parameters
611 return [
"year",
"is_data",
"nevents",
"tracking"]
615 Return list of required configs.
617 Required configs are: **hpstr_install_dir**, **hpstr_base**
618 @return list of required configs
620 return [
"hpstr_install_dir",
"hpstr_base"]
624 Setup command arguments.
625 @return list of arguments
638 if self.
year is not None:
639 args.extend([
"-y", str(self.
year)])
641 args.extend([
"-w", str(self.
tracking)])
645 """! Adjust names of output files."""
648 return [
"%s.root" % f]
655 """! Execute HPSTR component."""
657 cl =
'bash -c ". %s && %s %s"' % (
664 proc = subprocess.Popen(cl, shell=
True, stdout=log_out, stderr=log_err)
668 return proc.returncode
676 Generic class for StdHep tools.
684 "lhe_tridents_displacetime",
685 "lhe_tridents_displaceuni",
693 Component.__init__(self, name=name, command=
"stdhep_" + name, **kwargs)
697 Setup command arguments.
698 @return list of arguments
702 if self.
name in StdHepTool.seed_names:
708 raise Exception(
"Too many outputs specified for StdHepTool.")
710 raise Exception(
"No outputs specified for StdHepTool.")
713 for i
in self.
inputs[::-1]:
716 raise Exception(
"No inputs specified for StdHepTool.")
723 Transform StdHep events into beam coordinates.
725 Optional parameters are: **beam_sigma_x**, **beam_sigma_y**, **beam_rot_x**,
726 **beam_rot_y**, **beam_rot_z**, **target_x**, **target_y**, **target_z**
748 StdHepTool.__init__(self, name=
"beam_coords", append_tok=
"rot", **kwargs)
752 Setup command arguments.
753 @return list of arguments
755 args = StdHepTool.cmd_args(self)
770 args.extend([
"-X", str(self.
target_x)])
772 args.extend([
"-Y", str(self.
target_y)])
774 args.extend([
"-Z", str(self.
target_z)])
780 Return list of optional parameters.
782 Optional parameters are: **beam_sigma_x**, **beam_sigma_y**, **beam_rot_x**,
783 **beam_rot_y**, **beam_rot_z**, **target_x**, **target_y**, **target_z**
784 @return list of optional parameters
800 Randomly sample StdHep events into a new file.
802 Optional parameters are: **nevents**, **mu**
806 StdHepTool.__init__(self, name=
"random_sample", append_tok=
"sampled", **kwargs)
812 Setup command arguments.
813 @return list of arguments
817 if self.
name in StdHepTool.seed_names:
820 args.extend([
"-N", str(1)])
825 if self.
mu is not None:
826 args.extend([
"-m", str(self.
mu)])
830 args.insert(0, os.path.splitext(self.
output_files()[0])[0])
832 raise Exception(
"Too many outputs specified for RandomSample.")
834 raise Exception(
"No outputs specified for RandomSample.")
837 for i
in self.
inputs[::-1]:
840 raise Exception(
"No inputs were provided.")
846 Return list of optional parameters.
848 Optional parameters are: **nevents**, **mu**
849 @return list of optional parameters
851 return [
"nevents",
"mu"]
854 """! Execute RandomSample component"""
855 returncode = Component.execute(self, log_out, log_err)
858 src =
"%s_1.stdhep" % os.path.splitext(self.
output_files()[0])[0]
859 dest =
"%s.stdhep" % os.path.splitext(self.
output_files()[0])[0]
860 self.
logger.debug(
"Moving '%s' to '%s'" % (src, dest))
861 shutil.move(src, dest)
868 Convert LHE files to StdHep.
874 StdHepTool.__init__(self,
876 output_ext=
'.stdhep',
881 Setup command arguments.
882 @return list of arguments
884 args = StdHepTool.cmd_args(self)
890 Convert LHE files to StdHep, displacing the time by given ctau.
892 Optional parameters are: **ctau**
899 self, name=
"lhe_tridents_displacetime", output_ext=
".stdhep", **kwargs
904 Setup command arguments.
905 @return list of arguments
907 args = StdHepTool.cmd_args(self)
908 if self.
ctau is not None:
909 args.extend([
"-l", str(self.
ctau)])
914 Return list of optional parameters.
916 Optional parameters are: **ctau**
917 @return list of optional parameters
924 Convert LHE files to StdHep, displacing the time by given ctau.
926 Optional parameters are: **ctau**
933 self, name=
"lhe_tridents_displaceuni", output_ext=
".stdhep", **kwargs
938 Setup command arguments.
939 @return list of arguments
941 args = StdHepTool.cmd_args(self)
942 if self.
ctau is not None:
943 args.extend([
"-l", str(self.
ctau)])
948 Return list of optional parameters.
950 Optional parameters are: **ctau**
951 @return list of optional parameters
958 Add mother particles for physics samples.
962 StdHepTool.__init__(self, name=
"add_mother", append_tok=
"mom", **kwargs)
966 """! Add full truth mother particles for physics samples"""
970 self,
"add_mother_full_truth", append_tok=
"mom_full_truth", **kwargs
974 "Must have 2 input files: a stdhep file and a lhe file in order"
979 raise Exception(
"The first input file must be a stdhep file")
983 raise Exception(
"The second input file must be a lhe file")
987 Setup command arguments.
988 @return list of arguments
995 Merge StdHep files, applying poisson sampling.
997 Required parameters are: **target_thickness**, **num_electrons**
1008 StdHepTool.__init__(self, name=
"merge_poisson", append_tok=
"sampled", **kwargs)
1011 """! Setup MergePoisson component."""
1015 raise Exception(
"Cross section is missing.")
1016 self.
logger.info(
"mu is %f", self.
mu)
1020 Return list of required parameters.
1022 Required parameters are: **target_thickness**, **num_electrons**
1023 @return list of required parameters
1025 return [
"target_thickness",
"num_electrons"]
1029 Setup command arguments.
1030 @return list of arguments
1033 if self.
name in StdHepTool.seed_names:
1040 args.insert(0, os.path.splitext(self.
output_files()[0])[0])
1042 raise Exception(
"Too many outputs specified for MergePoisson.")
1044 raise Exception(
"No outputs specified for MergePoisson.")
1047 for i
in self.
inputs[::-1]:
1050 raise Exception(
"No inputs were provided.")
1055 """! Execute MergePoisson component."""
1056 returncode = Component.execute(self, log_out, log_err)
1059 src =
"%s_1.stdhep" % os.path.splitext(self.
output_files()[0])[0]
1060 dest =
"%s.stdhep" % os.path.splitext(self.
output_files()[0])[0]
1061 self.
logger.debug(
"Moving '%s' to '%s'" % (src, dest))
1062 shutil.move(src, dest)
1071 Optional parameters are: none \n
1072 Required parameters are: none
1076 StdHepTool.__init__(self, name=
"merge_files", **kwargs)
1080 Return list of optional parameters.
1082 Optional parameters are: none
1083 @return list of optional parameters
1089 Return list of required parameters.
1091 Required parameters are: none
1092 @return list of required parameters
1099 Count number of events in a StdHep file.
1104 self, name=
"stdhep_count", command=
"stdhep_count.sh", **kwargs
1109 Setup command arguments.
1110 @return list of arguments
1116 """! Execute StdHepCount component."""
1119 proc = subprocess.Popen(cl, stdout=PIPE)
1120 (output, err) = proc.communicate()
1122 nevents = int(output.split()[1])
1123 print(
"StdHep file '%s' has %d events." % (self.
input_files()[0], nevents))
1125 return proc.returncode
1130 Generic base class for Java based tools.
1140 Component.__init__(self, name,
"java", **kwargs)
1144 Return list of required config.
1146 Required config are: **hps_java_bin_jar**
1147 @return list of required config
1149 return [
"hps_java_bin_jar"]
1153 Setup command arguments.
1154 @return list of arguments
1158 self.
logger.debug(
"Setting java_args from config: %s" + self.
java_args)
1176 Convert EVIO events to LCIO using the hps-java EvioToLcio command line tool.
1178 Input files have evio format (format used by DAQ system).
1180 Required parameters are: **detector**, **steering_files** \n
1181 Optional parameters are: **run_number**, **skip_events**, **nevents**, **event_print_interval**
1198 name=
"evio_to_lcio",
1199 java_class=
"org.hps.evio.EvioToLcio",
1200 output_ext=
".slcio",
1206 Return list of required parameters.
1208 Required parameters are: **detector**, **steering_files**
1209 @return list of required parameters
1211 return [
"detector",
"steering_files"]
1215 Return list of optional parameters.
1217 Optional parameters are: **run_number**, **skip_events**, **nevents**, **event_print_interval**
1218 @return list of optional parameters
1220 return [
"run_number",
"skip_events",
"nevents",
"event_print_interval"]
1223 """! Setup EvioToLcio component."""
1233 Setup command arguments.
1234 @return list of arguments
1236 args = JavaTool.cmd_args(self)
1238 raise Exception(
"No output files were provided.")
1240 args.append(
"-DoutputFile=%s" % os.path.splitext(output_file)[0])
1250 "Steering does not exist at '%s' so assuming it is a resource."
1256 "Steering looks like a file but is not an abs path: %s"
1267 args.append(inputfile)
1277 Space MC events and apply energy filters to process before readout.
1279 Optional parameters are: **filter_ecal_hit_ecut**, **filter_event_interval**,
1280 **filter_nevents_read**, **filter_nevents_write**, **filter_no_cuts** \n
1281 Required config are: **hps_java_bin_jar**
1285 if "filter_no_cuts" in kwargs:
1291 if "filter_ecal_pairs" in kwargs:
1296 if "filter_ecal_hit_ecut" in kwargs:
1303 if "filter_event_interval" in kwargs:
1309 if "filter_nevents_read" in kwargs:
1315 if "filter_nevents_write" in kwargs:
1325 name=
"filter_bunches",
1326 java_class=
"org.hps.util.FilterMCBunches",
1332 """! Configure FilterBunches component."""
1335 if os.getenv(
"HPS_JAVA_BIN_JAR",
None)
is not None:
1338 "Set HPS_JAVA_BIN_JAR from environment: {}".format(
1345 Setup command arguments.
1346 @return list of arguments
1348 args = JavaTool.cmd_args(self)
1371 Return list of optional parameters.
1373 Optional parameters are: **filter_ecal_hit_ecut**, **filter_event_interval**,
1374 **filter_nevents_read**, **filter_nevents_write**, **filter_no_cuts** \n
1375 @return list of optional parameters
1378 "filter_ecal_hit_ecut",
1379 "filter_event_interval",
1380 "filter_nevents_read",
1381 "filter_nevents_write",
1387 Return list of required config.
1389 Required config are: **hps_java_bin_jar**
1390 @return list of required config
1392 return [
"hps_java_bin_jar"]
1397 Apply hodo-hit filter and space MC events to process before readout.
1399 The nevents parameter is not settable from JSON in this class. It should
1400 be supplied as an init argument in the job script if it needs to be
1401 customized (the default nevents and event_interval used to apply spacing
1402 should usually not need to be changed by the user). \n
1404 Optional parameters are: **num_hodo_hits**, **event_interval**
1408 if "num_hodo_hits" in kwargs:
1413 if "event_interval" in kwargs:
1420 name=
"filter_events",
1421 java_class=
"org.hps.util.ExtractEventsWithHitAtHodoEcal",
1428 Setup command arguments.
1429 @return list of arguments
1431 args = JavaTool.cmd_args(self)
1447 Return list of optional parameters.
1449 Optional parameters are: **num_hodo_hits**, **event_interval**
1450 @return list of optional parameters
1452 return [
"num_hodo_hits",
"event_interval"]
1457 Unzip the input files to outputs.
1461 Component.__init__(self, name=
"unzip", command=
"gunzip", **kwargs)
1464 """! Return list of output files."""
1467 return [os.path.splitext(i)[0]
for i
in self.
input_files()]
1470 """! Execute Unzip component."""
1474 with gzip.open(inputfile,
"rb")
as in_file, open(
1477 shutil.copyfileobj(in_file, out_file)
1478 self.
logger.debug(
"Unzipped '%s' to '%s'" % (inputfile, outputfile))
1484 Dump LCIO event information.
1486 Required parameters are: none \n
1487 Required config are: **lcio_dir**
1493 Component.__init__(self, name=
"lcio_dump_event", command=
"dumpevent", **kwargs)
1495 if "event_num" in kwargs:
1501 """! Configure LCIODumpEvent component."""
1507 """! Setup LCIODumpEvent component."""
1512 Setup command arguments.
1513 @return list of arguments
1516 raise Exception(
"Missing required inputs for LCIODumpEvent.")
1524 Return list of required config.
1526 Required config are: **lcio_dir**
1527 @return list of required config
1533 Return list of required parameters.
1535 Required parameters are: none
1536 @return list of required parameters
1543 Count events in an LHE file.
1546 def __init__(self, minevents=0, fail_on_underflow=False, **kwargs):
1548 Component.__init__(self, name=
"lhe_count", **kwargs)
1551 """! Setup LHECount component."""
1553 raise Exception(
"Missing at least one input file.")
1557 Check if command exists.
1558 @return True if command exists
1563 """! Execute LHECount component."""
1565 with gzip.open(i,
"rb")
as in_file:
1566 lines = in_file.readlines()
1570 if "<event>" in line:
1573 print(
"LHE file '%s' has %d events." % (i, nevents))
1576 msg =
"LHE file '%s' does not contain the minimum %d events." % (
1580 if self.fail_on_underflow:
1581 raise Exception(msg)
1589 Tar files into an archive.
1593 Component.__init__(self, name=
"tar_files", **kwargs)
1597 Check if command exists.
1598 @return True if command exists
1603 """! Execute TarFiles component."""
1604 self.
logger.debug(
"Opening '%s' for writing ..." % self.
outputs[0])
1605 tar = tarfile.open(self.
outputs[0],
"w")
1607 self.
logger.debug(
"Adding '%s' to archive" % i)
1616 Move input files to new locations.
1620 Component.__init__(self, name=
"move_files", **kwargs)
1624 Check if command exists.
1625 @return True if command exists
1630 """! Execute MoveFiles component."""
1632 raise Exception(
"Input and output lists are not the same length!")
1636 self.
logger.info(
"Moving %s -> %s" % (src, dest))
1637 shutil.move(src, dest)
1643 Generic component for LCIO tools.
1645 Required parameters are: none \n
1646 Required config are: **lcio_bin_jar**
1652 Component.__init__(self, name, command=
"java", **kwargs)
1655 """! Configure LCIOTool component."""
1662 Setup command arguments.
1663 @return list of arguments
1666 raise Exception(
"Name required to write cmd args for LCIOTool.")
1671 Return list of required config.
1673 Required config are: **lcio_bin_jar**
1674 @return list of required config
1676 return [
"lcio_bin_jar"]
1680 Return list of required parameters.
1682 Required parameters are: none
1683 @return list of required parameters
1690 Concatenate LCIO files together.
1694 LCIOTool.__init__(self, name=
"concat", **kwargs)
1698 Setup command arguments.
1699 @return list of arguments
1701 args = LCIOTool.cmd_args(self)
1703 raise Exception(
"Missing at least one input file.")
1705 raise Exception(
"Missing an output file.")
1707 args.extend([
"-f", i])
1708 args.extend([
"-o", self.
outputs[0]])
1714 Count events in LCIO files.
1716 Required parameters are: none \n
1717 Optional parameters are: none
1721 LCIOTool.__init__(self, name=
"count", **kwargs)
1725 Setup command arguments.
1726 @return list of arguments
1728 args = LCIOTool.cmd_args(self)
1730 raise Exception(
"Missing an input file.")
1736 Return list of required parameters.
1738 Required parameters are: none
1739 @return list of required parameters
1745 Return list of optional parameters.
1747 Optional parameters are: none
1748 @return list of optional parameters
1759 LCIOTool.__init__(self, name=
"merge", **kwargs)
1763 Setup command arguments.
1764 @return list of arguments
1766 args = LCIOTool.cmd_args(self)
1768 raise Exception(
"Missing at least one input file.")
1770 raise Exception(
"Missing an output file.")
1772 args.extend([
"-f", i])
1773 args.extend([
"-o", self.
outputs[0]])
1780MergeROOT tool for hps-mc
1781Merges ROOT files using hadd with validation
1787 Merge ROOT files using hadd with event count validation.
1789 This component uses ROOT's hadd utility to merge multiple ROOT files
1790 into a single output file, and validates that all events are preserved.
1795 Initialize MergeROOT component.
1800 List of input ROOT files to merge
1802 List containing the output merged ROOT file name
1803 force : bool, optional
1804 Force overwrite of output file (default: True)
1805 compression : int, optional
1806 Compression level for output file (0-9, default: None uses hadd default)
1807 validate : bool, optional
1808 Validate event counts after merge (default: True)
1809 write_stats : bool, optional
1810 Write JSON stats file after merge (default: True when validate=True)
1811 job_id : int, optional
1812 Job ID to include in stats output
1814 Component.__init__(self, **kwargs)
1821 if not hasattr(self,
"force"):
1825 if not hasattr(self,
"compression"):
1829 if not hasattr(self,
"validate"):
1833 if not hasattr(self,
"write_stats"):
1837 if not hasattr(self,
"job_id"):
1849 Build command line arguments for hadd.
1854 List of command arguments
1857 sys.stderr.write(
"MergeROOT DEBUG: cmd_args() called\n")
1858 sys.stderr.write(
" self.force=%s, self.compression=%s\n" % (self.
force, self.
compression))
1877 sys.stderr.write(
"MergeROOT DEBUG: ERROR - No output file specified!\n")
1879 raise RuntimeError(
"MergeROOT: No output file specified")
1885 sys.stderr.write(
"MergeROOT DEBUG: ERROR - No input files specified!\n")
1887 raise RuntimeError(
"MergeROOT: No input files specified")
1889 sys.stderr.write(
"MergeROOT DEBUG: cmd_args() returning: %s\n" % args)
1895 Scan a ROOT file and extract TTree event counts.
1901 log_out : file, optional
1902 Log file for output (used to report multiple key cycles)
1907 Dictionary mapping tree names to entry counts
1913 "MergeROOT: PyROOT is required for validation but not available"
1920 root_file = ROOT.TFile.Open(filename,
"READ")
1921 if not root_file
or root_file.IsZombie():
1922 raise RuntimeError(
"MergeROOT: Cannot open ROOT file: %s" % filename)
1925 for key
in root_file.GetListOfKeys():
1929 if obj.InheritsFrom(
"TTree"):
1930 tree_name = obj.GetName()
1931 cycle = key.GetCycle()
1932 num_entries = obj.GetEntries()
1934 if tree_name
not in tree_cycles:
1935 tree_cycles[tree_name] = []
1936 tree_cycles[tree_name].append((cycle, num_entries))
1941 for tree_name, cycles
in tree_cycles.items():
1944 cycles.sort(key=
lambda x: x[0], reverse=
True)
1945 highest_cycle, highest_entries = cycles[0]
1947 log_out.write(
" WARNING: Multiple key cycles found for tree '%s':\n" % tree_name)
1948 for cyc, ent
in cycles:
1949 marker =
" <-- using" if cyc == highest_cycle
else ""
1950 log_out.write(
" Cycle %d: %d entries%s\n" % (cyc, ent, marker))
1951 tree_counts[tree_name] = highest_entries
1953 tree_counts[tree_name] = cycles[0][1]
1959 Scan all input files and store tree event counts.
1966 log_out.write(
"\n" +
"=" * 70 +
"\n")
1967 log_out.write(
"MergeROOT: Scanning input files for TTrees\n")
1968 log_out.write(
"=" * 70 +
"\n")
1971 if not os.path.exists(input_file):
1972 raise RuntimeError(
"MergeROOT: Input file not found: %s" % input_file)
1974 log_out.write(
"\nScanning: %s\n" % input_file)
1978 log_out.write(
" WARNING: No TTrees found in this file\n")
1980 for tree_name, count
in tree_counts.items():
1981 log_out.write(
" Tree '%s': %d events\n" % (tree_name, count))
1985 log_out.write(
"\n" +
"=" * 70 +
"\n")
1990 Scan output file and store tree event counts.
1999 log_out.write(
"\n" +
"=" * 70 +
"\n")
2000 log_out.write(
"MergeROOT: Scanning output file for TTrees\n")
2001 log_out.write(
"=" * 70 +
"\n")
2002 log_out.write(
"\nScanning: %s\n" % output_file)
2007 log_out.write(
" WARNING: No TTrees found in output file\n")
2010 log_out.write(
" Tree '%s': %d events\n" % (tree_name, count))
2012 log_out.write(
"\n" +
"=" * 70 +
"\n")
2017 Validate that event counts match between input and output files.
2027 True if validation passes, False otherwise
2029 log_out.write(
"\n" +
"=" * 70 +
"\n")
2030 log_out.write(
"MergeROOT: Validating merge results\n")
2031 log_out.write(
"=" * 70 +
"\n\n")
2034 total_input_counts = {}
2037 for tree_name, count
in tree_counts.items():
2038 if tree_name
not in total_input_counts:
2039 total_input_counts[tree_name] = 0
2040 total_input_counts[tree_name] += count
2045 if not total_input_counts:
2046 log_out.write(
"WARNING: No TTrees found in input files\n")
2049 log_out.write(
"Event count validation:\n")
2050 log_out.write(
"-" * 70 +
"\n")
2052 "%-30s %15s %15s %10s\n"
2053 % (
"Tree Name",
"Input Events",
"Output Events",
"Status")
2055 log_out.write(
"-" * 70 +
"\n")
2057 for tree_name, input_count
in sorted(total_input_counts.items()):
2060 if output_count == input_count:
2067 "%-30s %15d %15d %10s\n"
2068 % (tree_name, input_count, output_count, status)
2073 total_input_counts.keys()
2076 log_out.write(
"\nWARNING: Output contains trees not found in inputs:\n")
2077 for tree_name
in extra_trees:
2079 " - %s: %d events\n"
2083 log_out.write(
"-" * 70 +
"\n")
2086 log_out.write(
"\n✓ VALIDATION PASSED: All event counts match!\n")
2088 log_out.write(
"\n✗ VALIDATION FAILED: Event count mismatch detected!\n")
2090 log_out.write(
"=" * 70 +
"\n\n")
2097 Print a summary of the merge operation.
2104 log_out.write(
"\n" +
"=" * 70 +
"\n")
2105 log_out.write(
"MergeROOT: Summary\n")
2106 log_out.write(
"=" * 70 +
"\n")
2107 log_out.write(
"Input files: %d\n" % len(self.
inputs))
2110 log_out.write(
" %d. %s\n" % (i, input_file))
2114 "Compression level: %s\n"
2120 log_out.write(
"\nTotal events in merged file:\n")
2122 log_out.write(
" %-30s: %d events\n" % (tree_name, count))
2124 log_out.write(
"=" * 70 +
"\n")
2129 Get the stats JSON filename based on the output ROOT filename.
2134 Path to stats JSON file (e.g., 'merged_X_job1.root' -> 'merged_X_job1_stats.json')
2139 base, _ = os.path.splitext(output_file)
2140 return base +
"_stats.json"
2144 Write merge statistics to a JSON file.
2150 validation_passed : bool
2151 Whether the validation passed
2154 if stats_file
is None:
2155 log_out.write(
"WARNING: Cannot determine stats filename, skipping stats output\n")
2158 log_out.write(
"\n" +
"=" * 70 +
"\n")
2159 log_out.write(
"MergeROOT: Writing stats to %s\n" % stats_file)
2160 log_out.write(
"=" * 70 +
"\n")
2163 total_input_events = {}
2165 for tree_name, count
in tree_counts.items():
2166 if tree_name
not in total_input_events:
2167 total_input_events[tree_name] = 0
2168 total_input_events[tree_name] += count
2171 input_files_list = []
2174 input_files_list.append({
2176 "events": tree_counts
2184 "input_files": input_files_list,
2185 "total_input_events": total_input_events,
2186 "validation_passed": validation_passed,
2191 with open(stats_file,
'w')
as f:
2192 json.dump(stats, f, indent=2)
2194 log_out.write(
"Stats written successfully\n")
2195 log_out.write(
"=" * 70 +
"\n")
2200 Execute MergeROOT component using hadd.
2212 Return code from hadd command
2215 log_out.write(
"\n" +
"=" * 70 +
"\n")
2216 log_out.write(
"MergeROOT: DEBUG - Entering execute()\n")
2217 log_out.write(
"=" * 70 +
"\n")
2218 log_out.write(
"DEBUG: self.command = %s\n" % self.
command)
2219 log_out.write(
"DEBUG: self.inputs = %s\n" % self.
inputs)
2220 log_out.write(
"DEBUG: self.outputs = %s\n" % self.
outputs)
2221 log_out.write(
"DEBUG: self.force = %s\n" % self.
force)
2222 log_out.write(
"DEBUG: self.compression = %s\n" % self.
compression)
2223 log_out.write(
"DEBUG: self.validate = %s\n" % self.
validate)
2227 log_out.write(
"\nDEBUG: Checking if hadd command exists...\n")
2230 raise RuntimeError(
"MergeROOT: hadd command not found in PATH")
2231 log_out.write(
"DEBUG: hadd command found\n")
2235 log_out.write(
"\nDEBUG: Checking input files exist...\n")
2238 log_out.write(
"DEBUG: Checking: %s\n" % input_file)
2240 if not os.path.exists(input_file):
2241 raise RuntimeError(
"MergeROOT: Input file not found: %s" % input_file)
2242 log_out.write(
"DEBUG: -> exists (size: %d bytes)\n" % os.path.getsize(input_file))
2246 log_out.write(
"\nDEBUG: Validation enabled = %s\n" % self.
validate)
2250 log_out.write(
"DEBUG: Starting input file scan...\n")
2253 log_out.write(
"DEBUG: Input file scan complete\n")
2255 except Exception
as e:
2256 log_out.write(
"\nWARNING: Could not scan input files: %s\n" % str(e))
2257 log_out.write(
"Proceeding with merge without validation.\n")
2261 log_out.write(
"\nDEBUG: Building command arguments...\n")
2264 log_out.write(
"DEBUG: cmd_args() returned: %s\n" % self.
cmd_args())
2268 log_out.write(
"\n" +
"=" * 70 +
"\n")
2269 log_out.write(
"MergeROOT: Executing hadd\n")
2270 log_out.write(
"=" * 70 +
"\n")
2271 log_out.write(
"Command: %s\n" %
" ".join(cmd))
2272 log_out.write(
"=" * 70 +
"\n\n")
2276 log_out.write(
"DEBUG: About to call subprocess.Popen...\n")
2278 proc = subprocess.Popen(cmd, stdout=log_out, stderr=log_err)
2279 log_out.write(
"DEBUG: Popen returned, PID = %s\n" % proc.pid)
2281 log_out.write(
"DEBUG: Waiting for process to complete...\n")
2284 log_out.write(
"DEBUG: Process completed, returncode = %d\n" % proc.returncode)
2288 if proc.returncode != 0:
2289 log_out.write(
"DEBUG: hadd FAILED with return code %d\n" % proc.returncode)
2292 "MergeROOT: hadd failed with return code %d" % proc.returncode
2296 log_out.write(
"DEBUG: Checking if output file exists: %s\n" % self.
outputs[0])
2300 "MergeROOT: Output file was not created: %s" % self.
outputs[0]
2302 log_out.write(
"DEBUG: Output file exists, size = %d bytes\n" % os.path.getsize(self.
outputs[0]))
2305 log_out.write(
"\n✓ hadd completed successfully\n")
2309 log_out.write(
"\nDEBUG: Post-merge validation check, self.validate = %s\n" % self.
validate)
2311 validation_passed =
True
2314 log_out.write(
"DEBUG: Starting output file scan...\n")
2317 log_out.write(
"DEBUG: Output file scan complete\n")
2319 log_out.write(
"DEBUG: Starting merge validation...\n")
2323 log_out.write(
"DEBUG: Merge validation complete, passed = %s\n" % validation_passed)
2326 if not validation_passed:
2327 raise RuntimeError(
"MergeROOT: Event count validation failed!")
2329 except Exception
as e:
2330 log_out.write(
"\nERROR during validation: %s\n" % str(e))
2335 log_out.write(
"\nDEBUG: write_stats = %s\n" % self.
write_stats)
2340 except Exception
as e:
2341 log_out.write(
"\nWARNING: Could not write stats JSON: %s\n" % str(e))
2345 log_out.write(
"\nDEBUG: Printing summary...\n")
2349 log_out.write(
"\nDEBUG: MergeROOT.execute() returning %d\n" % proc.returncode)
2351 return proc.returncode
2355 Return list of output files.
2360 List containing the merged output ROOT file and optionally the stats JSON
2365 if stats_file
and stats_file
not in files:
2366 files.append(stats_file)
2371 Return list of required configuration parameters.
2376 List of required config parameters (empty for MergeROOT)
Base class for components in a job.
output_files(self)
Return a list of output files created by this component.
config_from_environ(self)
Configure component from environment variables which are just upper case versions of the required con...
cmd_exists(self)
Check if the component's assigned command exists.
cmd_args(self)
Return the command arguments of this component.
input_files(self)
Get a list of input files for this component.
Miscellaneous math functions.