Iguana 0.0.0
Implementation Guardian of Analysis Algorithms
Loading...
Searching...
No Matches
TestMultithreading.h
1// multithreaded test of an iguana algorithm
2
3#include <hipo4/reader.h>
4#include <iguana/algorithms/AlgorithmSequence.h>
5
6inline int TestMultithreading(
7 std::string const command,
8 std::string const algo_name,
9 std::vector<std::string> const prerequisite_algos,
10 std::vector<std::string> const bank_names,
11 std::string const data_file,
12 int const num_events,
13 int const num_threads,
14 std::string const concurrency_model,
15 bool const vary_run,
16 bool const verbose)
17{
18
19 iguana::Logger log("test", verbose ? iguana::Logger::Level::trace : iguana::Logger::Level::info);
20
21 // check arguments
22 if(algo_name.empty() || bank_names.empty()) {
23 log.Error("need algorithm name and banks");
24 return 1;
25 }
26 if(data_file.empty()) {
27 log.Error("need a data file for command {:?}", command);
28 return 1;
29 }
30
31 // set the concurrency model
32 if(!concurrency_model.empty())
33 iguana::GlobalConcurrencyModel = concurrency_model;
34
35 // find the 'RUN::config' bank, if any
36 std::optional<hipo::banklist::size_type> run_config_bank_idx{};
37 if(vary_run) {
38 for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) {
39 if(bank_names.at(idx) == "RUN::config") {
40 run_config_bank_idx = idx;
41 break;
42 }
43 }
44 }
45
46 // number of events per thread
47 int const default_frame_size = 50;
48 int num_events_per_thread = (int) std::round((double) num_events / num_threads);
49 int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size;
50 int num_frames_per_thread = num_events > 0 ? (int) std::ceil((double) num_events_per_thread / num_events_per_frame) : 0;
51 int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads;
52 log.Info("num_events_per_thread = {}", num_events_per_thread);
53 log.Info("num_events_per_frame = {}", num_events_per_frame );
54 log.Info("num_frames_per_thread = {}", num_frames_per_thread);
55 if(num_events > 0) {
56 log.Info("=> will actually process num_events = {}", num_events_actual);
57 if(num_events != num_events_actual)
58 log.Warn("argument's num_events ({}) differs from the actual num_events that will be processed ({})",
59 num_events, num_events_actual);
60 } else {
61 log.Info("=> will actually process num_events = ALL OF THEM");
62 }
63
64 // start the stream
65 hipo::readerstream stream;
66 stream.open(data_file.c_str());
67
68 // define the worker function
69 auto ftn = [
70 &stream,
71 algo_name,
72 prerequisite_algos,
73 bank_names,
74 vary_run,
75 verbose,
76 num_events_per_thread,
77 num_events_per_frame,
78 run_config_bank_idx
79 ](int order) {
80
81 // fill a frame
82 std::vector<hipo::event> events;
83 for(int i = 0; i < num_events_per_frame; i++)
84 events.push_back(hipo::event());
85
86 // bank list
87 hipo::banklist banks;
88 for(auto const& bank_name : bank_names)
89 banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()),48));
90
91 // define the algorithm
93 for(auto const& prerequisite_algo : prerequisite_algos)
94 seq.Add(prerequisite_algo);
95 seq.Add(algo_name);
96 seq.SetName("TEST thread " + std::to_string(order));
97 seq.PrintSequence();
98 seq.SetOption(algo_name, "log", verbose ? "trace" : "info");
99
100 // start the algorithm
101 seq.Start(banks);
102
103 // loop over frames
104 int nProcessed = 0;
105 while(nProcessed < num_events_per_thread || num_events_per_thread == 0) {
106 stream.pull(events);
107
108 // loop over events in this frame
109 int nNonEmpty = 0;
110 for(auto& event : events) {
111 if(event.getSize() > 16) {
112 nNonEmpty++;
113 nProcessed++;
114
115 // read the banks
116 for(auto& bank : banks)
117 event.read(bank);
118
119 // occasionally vary the run number; so far, algorithms with data-dependent configuration
120 // parameters have dependence on run number, so this variation aims to improve thread
121 // sanitizer test coverage
122 if(vary_run && run_config_bank_idx.has_value()) {
123 // === rapid variation ===
124 /*
125 banks[run_config_bank_idx.value()].putInt("run", 0, std::rand() % 20000);
126 */
127 // === slower variation ===
129 if(std::rand() % 10 == 0) { // randomly increase or decrease the run number
130 auto runnum = banks[run_config_bank_idx.value()].getInt("run", 0);
131 runnum += (std::rand() % 2 == 0) ? 1000 : -1000;
132 runnum = std::max(runnum, 0); // prevent negative run number
133 banks[run_config_bank_idx.value()].putInt("run", 0, runnum);
134 }
135 else if(std::rand() % 10 == 1) {
136 banks[run_config_bank_idx.value()].putInt("run", 0, 1); // set the runnum to '1'
137 }
138 //*/
139 }
140
141 // run the iguana algorithm
142 seq.Run(banks);
143 }
144 }
145 if(nNonEmpty == 0)
146 break;
147 }
148
149 // stop the algorithm
150 seq.Stop();
151
152 seq.GetLog()->Info("nProcessed = {}", nProcessed);
153 return nProcessed;
154 };
155
156 // run
157 stream.run(ftn, num_threads);
158 return 0;
159}
User-level class for running a sequence of algorithms.
void SetOption(std::string const &algo_name, std::string const &key, const OPTION_TYPE val)
void Stop() override
Finalize this algorithm after all events are processed.
void Run(hipo::banklist &banks) const override
Run this algorithm for an event.
void Add(std::string const &class_name, std::string const &instance_name="")
void Start(hipo::banklist &banks) override
Initialize this algorithm before any events are processed, with the intent to process banks
void SetName(std::string_view name)
void PrintSequence(Logger::Level level=Logger::info) const
Simple logger service.
Definition Logger.h:31
std::unique_ptr< Logger > & GetLog()
GlobalParam< std::string > GlobalConcurrencyModel
The concurrency model, for running certain algorithms in a thread-safe way.