| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | // multithreaded test of an iguana algorithm | ||
| 2 | |||
| 3 | #include <hipo4/reader.h> | ||
| 4 | #include <iguana/algorithms/AlgorithmSequence.h> | ||
| 5 | |||
| 6 | ✗ | inline int TestMultithreading( | |
| 7 | std::string const command, | ||
| 8 | std::string const algo_name, | ||
| 9 | std::vector<std::string> const prerequisite_algos, | ||
| 10 | std::vector<std::string> const bank_names, | ||
| 11 | std::string const data_file, | ||
| 12 | int const num_events, | ||
| 13 | int const num_threads, | ||
| 14 | std::string const concurrency_model, | ||
| 15 | bool const vary_run, | ||
| 16 | bool const verbose) | ||
| 17 | { | ||
| 18 | |||
| 19 | ✗ | iguana::Logger log("test", verbose ? iguana::Logger::Level::trace : iguana::Logger::Level::info); | |
| 20 | |||
| 21 | // check arguments | ||
| 22 | ✗ | if(algo_name.empty() || bank_names.empty()) { | |
| 23 | ✗ | log.Error("need algorithm name and banks"); | |
| 24 | ✗ | return 1; | |
| 25 | } | ||
| 26 | ✗ | if(data_file.empty()) { | |
| 27 | ✗ | log.Error("need a data file for command {:?}", command); | |
| 28 | ✗ | return 1; | |
| 29 | } | ||
| 30 | |||
| 31 | // set the concurrency model | ||
| 32 | ✗ | if(!concurrency_model.empty()) | |
| 33 | ✗ | iguana::GlobalConcurrencyModel = concurrency_model; | |
| 34 | |||
| 35 | // find the 'RUN::config' bank, if any | ||
| 36 | ✗ | std::optional<hipo::banklist::size_type> run_config_bank_idx{}; | |
| 37 | ✗ | if(vary_run) { | |
| 38 | ✗ | for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) { | |
| 39 | ✗ | if(bank_names.at(idx) == "RUN::config") { | |
| 40 | run_config_bank_idx = idx; | ||
| 41 | break; | ||
| 42 | } | ||
| 43 | } | ||
| 44 | } | ||
| 45 | |||
| 46 | // number of events per thread | ||
| 47 | int const default_frame_size = 50; | ||
| 48 | ✗ | int num_events_per_thread = (int)std::round((double)num_events / num_threads); | |
| 49 | ✗ | int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size; | |
| 50 | ✗ | int num_frames_per_thread = num_events > 0 ? (int)std::ceil((double)num_events_per_thread / num_events_per_frame) : 0; | |
| 51 | ✗ | int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads; | |
| 52 | ✗ | log.Info("num_events_per_thread = {}", num_events_per_thread); | |
| 53 | ✗ | log.Info("num_events_per_frame = {}", num_events_per_frame); | |
| 54 | ✗ | log.Info("num_frames_per_thread = {}", num_frames_per_thread); | |
| 55 | ✗ | if(num_events > 0) { | |
| 56 | ✗ | log.Info("=> will actually process num_events = {}", num_events_actual); | |
| 57 | ✗ | if(num_events != num_events_actual) | |
| 58 | ✗ | log.Warn("argument's num_events ({}) differs from the actual num_events that will be processed ({})", | |
| 59 | num_events, num_events_actual); | ||
| 60 | } | ||
| 61 | else { | ||
| 62 | ✗ | log.Info("=> will actually process num_events = ALL OF THEM"); | |
| 63 | } | ||
| 64 | |||
| 65 | // start the stream | ||
| 66 | ✗ | hipo::readerstream stream; | |
| 67 | stream.open(data_file.c_str()); | ||
| 68 | |||
| 69 | // define the worker function | ||
| 70 | ✗ | auto ftn = [&stream, | |
| 71 | algo_name, | ||
| 72 | prerequisite_algos, | ||
| 73 | bank_names, | ||
| 74 | vary_run, | ||
| 75 | verbose, | ||
| 76 | num_events_per_thread, | ||
| 77 | num_events_per_frame, | ||
| 78 | run_config_bank_idx](int order) { | ||
| 79 | // fill a frame | ||
| 80 | std::vector<hipo::event> events; | ||
| 81 | ✗ | for(int i = 0; i < num_events_per_frame; i++) | |
| 82 | ✗ | events.push_back(hipo::event()); | |
| 83 | |||
| 84 | // bank list | ||
| 85 | hipo::banklist banks; | ||
| 86 | ✗ | for(auto const& bank_name : bank_names) | |
| 87 | ✗ | banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()), 48)); | |
| 88 | |||
| 89 | // define the algorithm | ||
| 90 | ✗ | iguana::AlgorithmSequence seq; | |
| 91 | ✗ | for(auto const& prerequisite_algo : prerequisite_algos) | |
| 92 | ✗ | seq.Add(prerequisite_algo); | |
| 93 | ✗ | seq.Add(algo_name); | |
| 94 | ✗ | seq.SetName("TEST thread " + std::to_string(order)); | |
| 95 | ✗ | seq.PrintSequence(); | |
| 96 | ✗ | seq.SetOption(algo_name, "log", verbose ? "trace" : "info"); | |
| 97 | |||
| 98 | // start the algorithm | ||
| 99 | ✗ | seq.Start(banks); | |
| 100 | |||
| 101 | // loop over frames | ||
| 102 | int nProcessed = 0; | ||
| 103 | ✗ | while(nProcessed < num_events_per_thread || num_events_per_thread == 0) { | |
| 104 | ✗ | stream.pull(events); | |
| 105 | |||
| 106 | // loop over events in this frame | ||
| 107 | int nNonEmpty = 0; | ||
| 108 | ✗ | for(auto& event : events) { | |
| 109 | ✗ | if(event.getSize() > 16) { | |
| 110 | ✗ | nNonEmpty++; | |
| 111 | ✗ | nProcessed++; | |
| 112 | |||
| 113 | // read the banks | ||
| 114 | ✗ | for(auto& bank : banks) | |
| 115 | ✗ | event.read(bank); | |
| 116 | |||
| 117 | // occasionally vary the run number; so far, algorithms with data-dependent configuration | ||
| 118 | // parameters have dependence on run number, so this variation aims to improve thread | ||
| 119 | // sanitizer test coverage | ||
| 120 | ✗ | if(vary_run && run_config_bank_idx.has_value()) { | |
| 121 | // === rapid variation === | ||
| 122 | /* | ||
| 123 | banks[run_config_bank_idx.value()].putInt("run", 0, std::rand() % 20000); | ||
| 124 | */ | ||
| 125 | // === slower variation === | ||
| 126 | ///* | ||
| 127 | ✗ | if(std::rand() % 10 == 0) { // randomly increase or decrease the run number | |
| 128 | ✗ | auto runnum = banks[run_config_bank_idx.value()].getInt("run", 0); | |
| 129 | ✗ | runnum += (std::rand() % 2 == 0) ? 1000 : -1000; | |
| 130 | runnum = std::max(runnum, 0); // prevent negative run number | ||
| 131 | ✗ | banks[run_config_bank_idx.value()].putInt("run", 0, runnum); | |
| 132 | } | ||
| 133 | ✗ | else if(std::rand() % 10 == 1) { | |
| 134 | ✗ | banks[run_config_bank_idx.value()].putInt("run", 0, 1); // set the runnum to '1' | |
| 135 | } | ||
| 136 | //*/ | ||
| 137 | } | ||
| 138 | |||
| 139 | // run the iguana algorithm | ||
| 140 | ✗ | seq.Run(banks); | |
| 141 | } | ||
| 142 | } | ||
| 143 | ✗ | if(nNonEmpty == 0) | |
| 144 | break; | ||
| 145 | } | ||
| 146 | |||
| 147 | // stop the algorithm | ||
| 148 | ✗ | seq.Stop(); | |
| 149 | |||
| 150 | ✗ | seq.GetLog()->Info("nProcessed = {}", nProcessed); | |
| 151 | ✗ | return nProcessed; | |
| 152 | ✗ | }; | |
| 153 | |||
| 154 | // run | ||
| 155 | ✗ | stream.run(ftn, num_threads); | |
| 156 | return 0; | ||
| 157 | ✗ | } | |
| 158 |