| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | // multithreaded test of an iguana algorithm | ||
| 2 | |||
| 3 | #include <hipo4/reader.h> | ||
| 4 | #include <iguana/algorithms/AlgorithmSequence.h> | ||
| 5 | |||
| 6 | ✗ | inline int TestMultithreading( | |
| 7 | std::string const command, | ||
| 8 | std::string const algo_name, | ||
| 9 | std::vector<std::string> const prerequisite_algos, | ||
| 10 | std::vector<std::string> const bank_names, | ||
| 11 | std::string const data_file, | ||
| 12 | int const num_events, | ||
| 13 | int const num_threads, | ||
| 14 | std::string const concurrency_model, | ||
| 15 | bool const vary_run, | ||
| 16 | std::string const log_level) | ||
| 17 | { | ||
| 18 | |||
| 19 | ✗ | iguana::Logger log("test"); | |
| 20 | ✗ | log.SetLevel(log_level); | |
| 21 | |||
| 22 | // check arguments | ||
| 23 | ✗ | if(algo_name.empty() || bank_names.empty()) { | |
| 24 | ✗ | log.Error("need algorithm name and banks"); | |
| 25 | ✗ | return 1; | |
| 26 | } | ||
| 27 | ✗ | if(data_file.empty()) { | |
| 28 | ✗ | log.Error("need a data file for command {:?}", command); | |
| 29 | ✗ | return 1; | |
| 30 | } | ||
| 31 | |||
| 32 | // set the concurrency model | ||
| 33 | ✗ | if(!concurrency_model.empty()) | |
| 34 | ✗ | iguana::GlobalConcurrencyModel = concurrency_model; | |
| 35 | |||
| 36 | // find the 'RUN::config' bank, if any | ||
| 37 | ✗ | std::optional<hipo::banklist::size_type> run_config_bank_idx{}; | |
| 38 | ✗ | if(vary_run) { | |
| 39 | ✗ | for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) { | |
| 40 | ✗ | if(bank_names.at(idx) == "RUN::config") { | |
| 41 | run_config_bank_idx = idx; | ||
| 42 | break; | ||
| 43 | } | ||
| 44 | } | ||
| 45 | } | ||
| 46 | |||
| 47 | // number of events per thread | ||
| 48 | int const default_frame_size = 50; | ||
| 49 | ✗ | int num_events_per_thread = (int)std::round((double)num_events / num_threads); | |
| 50 | ✗ | int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size; | |
| 51 | ✗ | int num_frames_per_thread = num_events > 0 ? (int)std::ceil((double)num_events_per_thread / num_events_per_frame) : 0; | |
| 52 | ✗ | int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads; | |
| 53 | ✗ | log.Info("num_events_per_thread = {}", num_events_per_thread); | |
| 54 | ✗ | log.Info("num_events_per_frame = {}", num_events_per_frame); | |
| 55 | ✗ | log.Info("num_frames_per_thread = {}", num_frames_per_thread); | |
| 56 | ✗ | if(num_events > 0) { | |
| 57 | ✗ | log.Info("=> will actually process num_events = {}", num_events_actual); | |
| 58 | ✗ | if(num_events != num_events_actual) | |
| 59 | ✗ | log.Warn("argument's num_events ({}) differs from the actual num_events that will be processed ({})", | |
| 60 | num_events, num_events_actual); | ||
| 61 | } | ||
| 62 | else { | ||
| 63 | ✗ | log.Info("=> will actually process num_events = ALL OF THEM"); | |
| 64 | } | ||
| 65 | |||
| 66 | // start the stream | ||
| 67 | ✗ | hipo::readerstream stream; | |
| 68 | stream.open(data_file.c_str()); | ||
| 69 | |||
| 70 | // define the worker function | ||
| 71 | ✗ | auto ftn = [&stream, | |
| 72 | algo_name, | ||
| 73 | prerequisite_algos, | ||
| 74 | bank_names, | ||
| 75 | vary_run, | ||
| 76 | log_level, | ||
| 77 | num_events_per_thread, | ||
| 78 | num_events_per_frame, | ||
| 79 | run_config_bank_idx](int order) { | ||
| 80 | // fill a frame | ||
| 81 | std::vector<hipo::event> events; | ||
| 82 | ✗ | for(int i = 0; i < num_events_per_frame; i++) | |
| 83 | ✗ | events.push_back(hipo::event()); | |
| 84 | |||
| 85 | // bank list | ||
| 86 | hipo::banklist banks; | ||
| 87 | ✗ | for(auto const& bank_name : bank_names) | |
| 88 | ✗ | banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()), 48)); | |
| 89 | |||
| 90 | // define the algorithm | ||
| 91 | ✗ | iguana::AlgorithmSequence seq; | |
| 92 | ✗ | for(auto const& prerequisite_algo : prerequisite_algos) | |
| 93 | ✗ | seq.Add(prerequisite_algo); | |
| 94 | ✗ | seq.Add(algo_name); | |
| 95 | ✗ | seq.SetName("TEST thread " + std::to_string(order)); | |
| 96 | ✗ | seq.PrintSequence(); | |
| 97 | ✗ | seq.SetOption(algo_name, "log", log_level); | |
| 98 | |||
| 99 | // start the algorithm | ||
| 100 | ✗ | seq.Start(banks); | |
| 101 | |||
| 102 | // loop over frames | ||
| 103 | int nProcessed = 0; | ||
| 104 | ✗ | while(nProcessed < num_events_per_thread || num_events_per_thread == 0) { | |
| 105 | ✗ | stream.pull(events); | |
| 106 | |||
| 107 | // loop over events in this frame | ||
| 108 | int nNonEmpty = 0; | ||
| 109 | ✗ | for(auto& event : events) { | |
| 110 | ✗ | if(event.getSize() > 16) { | |
| 111 | ✗ | nNonEmpty++; | |
| 112 | ✗ | nProcessed++; | |
| 113 | |||
| 114 | // read the banks | ||
| 115 | ✗ | for(auto& bank : banks) | |
| 116 | ✗ | event.read(bank); | |
| 117 | |||
| 118 | // occasionally vary the run number; so far, algorithms with data-dependent configuration | ||
| 119 | // parameters have dependence on run number, so this variation aims to improve thread | ||
| 120 | // sanitizer test coverage | ||
| 121 | ✗ | if(vary_run && run_config_bank_idx.has_value()) { | |
| 122 | // === rapid variation === | ||
| 123 | /* | ||
| 124 | banks[run_config_bank_idx.value()].putInt("run", 0, std::rand() % 20000); | ||
| 125 | */ | ||
| 126 | // === slower variation === | ||
| 127 | ///* | ||
| 128 | ✗ | if(std::rand() % 10 == 0) { // randomly increase or decrease the run number | |
| 129 | ✗ | auto runnum = banks[run_config_bank_idx.value()].getInt("run", 0); | |
| 130 | ✗ | runnum += (std::rand() % 2 == 0) ? 1000 : -1000; | |
| 131 | runnum = std::max(runnum, 0); // prevent negative run number | ||
| 132 | ✗ | banks[run_config_bank_idx.value()].putInt("run", 0, runnum); | |
| 133 | } | ||
| 134 | ✗ | else if(std::rand() % 10 == 1) { | |
| 135 | ✗ | banks[run_config_bank_idx.value()].putInt("run", 0, 1); // set the runnum to '1' | |
| 136 | } | ||
| 137 | //*/ | ||
| 138 | } | ||
| 139 | |||
| 140 | // run the iguana algorithm | ||
| 141 | ✗ | seq.Run(banks); | |
| 142 | } | ||
| 143 | } | ||
| 144 | ✗ | if(nNonEmpty == 0) | |
| 145 | break; | ||
| 146 | } | ||
| 147 | |||
| 148 | // stop the algorithm | ||
| 149 | ✗ | seq.Stop(); | |
| 150 | |||
| 151 | ✗ | seq.GetLog()->Info("nProcessed = {}", nProcessed); | |
| 152 | ✗ | return nProcessed; | |
| 153 | ✗ | }; | |
| 154 | |||
| 155 | // run | ||
| 156 | ✗ | stream.run(ftn, num_threads); | |
| 157 | return 0; | ||
| 158 | ✗ | } | |
| 159 |