Line |
Branch |
Exec |
Source |
1 |
|
|
// multithreaded test of an iguana algorithm |
2 |
|
|
|
3 |
|
|
#include <hipo4/reader.h> |
4 |
|
|
#include <iguana/algorithms/AlgorithmSequence.h> |
5 |
|
|
|
6 |
|
✗ |
inline int TestMultithreading( |
7 |
|
|
std::string const command, |
8 |
|
|
std::string const algo_name, |
9 |
|
|
std::vector<std::string> const prerequisite_algos, |
10 |
|
|
std::vector<std::string> const bank_names, |
11 |
|
|
std::string const data_file, |
12 |
|
|
int const num_events, |
13 |
|
|
int const num_threads, |
14 |
|
|
std::string const concurrency_model, |
15 |
|
|
bool const vary_run, |
16 |
|
|
bool const verbose) |
17 |
|
|
{ |
18 |
|
|
|
19 |
|
✗ |
iguana::Logger log("test", verbose ? iguana::Logger::Level::trace : iguana::Logger::Level::info); |
20 |
|
|
|
21 |
|
|
// check arguments |
22 |
|
✗ |
if(algo_name.empty() || bank_names.empty()) { |
23 |
|
✗ |
log.Error("need algorithm name and banks"); |
24 |
|
✗ |
return 1; |
25 |
|
|
} |
26 |
|
✗ |
if(data_file.empty()) { |
27 |
|
✗ |
log.Error("need a data file for command {:?}", command); |
28 |
|
✗ |
return 1; |
29 |
|
|
} |
30 |
|
|
|
31 |
|
|
// set the concurrency model |
32 |
|
✗ |
if(!concurrency_model.empty()) |
33 |
|
✗ |
iguana::GlobalConcurrencyModel = concurrency_model; |
34 |
|
|
|
35 |
|
|
// find the 'RUN::config' bank, if any |
36 |
|
✗ |
std::optional<hipo::banklist::size_type> run_config_bank_idx{}; |
37 |
|
✗ |
if(vary_run) { |
38 |
|
✗ |
for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) { |
39 |
|
✗ |
if(bank_names.at(idx) == "RUN::config") { |
40 |
|
✗ |
run_config_bank_idx = idx; |
41 |
|
✗ |
break; |
42 |
|
|
} |
43 |
|
|
} |
44 |
|
|
} |
45 |
|
|
|
46 |
|
|
// number of events per thread |
47 |
|
✗ |
int const default_frame_size = 50; |
48 |
|
✗ |
int num_events_per_thread = (int) std::round((double) num_events / num_threads); |
49 |
|
✗ |
int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size; |
50 |
|
✗ |
int num_frames_per_thread = num_events > 0 ? (int) std::ceil((double) num_events_per_thread / num_events_per_frame) : 0; |
51 |
|
✗ |
int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads; |
52 |
|
✗ |
log.Info("num_events_per_thread = {}", num_events_per_thread); |
53 |
|
✗ |
log.Info("num_events_per_frame = {}", num_events_per_frame ); |
54 |
|
✗ |
log.Info("num_frames_per_thread = {}", num_frames_per_thread); |
55 |
|
✗ |
if(num_events > 0) { |
56 |
|
✗ |
log.Info("=> will actually process num_events = {}", num_events_actual); |
57 |
|
✗ |
if(num_events != num_events_actual) |
58 |
|
✗ |
log.Warn("argument's num_events ({}) differs from the actual num_events that will be processed ({})", |
59 |
|
|
num_events, num_events_actual); |
60 |
|
|
} else { |
61 |
|
✗ |
log.Info("=> will actually process num_events = ALL OF THEM"); |
62 |
|
|
} |
63 |
|
|
|
64 |
|
|
// start the stream |
65 |
|
✗ |
hipo::readerstream stream; |
66 |
|
|
stream.open(data_file.c_str()); |
67 |
|
|
|
68 |
|
|
// define the worker function |
69 |
|
✗ |
auto ftn = [ |
70 |
|
|
&stream, |
71 |
|
|
algo_name, |
72 |
|
|
prerequisite_algos, |
73 |
|
|
bank_names, |
74 |
|
|
vary_run, |
75 |
|
|
verbose, |
76 |
|
|
num_events_per_thread, |
77 |
|
|
num_events_per_frame, |
78 |
|
|
run_config_bank_idx |
79 |
|
|
](int order) { |
80 |
|
|
|
81 |
|
|
// fill a frame |
82 |
|
|
std::vector<hipo::event> events; |
83 |
|
✗ |
for(int i = 0; i < num_events_per_frame; i++) |
84 |
|
✗ |
events.push_back(hipo::event()); |
85 |
|
|
|
86 |
|
|
// bank list |
87 |
|
|
hipo::banklist banks; |
88 |
|
✗ |
for(auto const& bank_name : bank_names) |
89 |
|
✗ |
banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()),48)); |
90 |
|
|
|
91 |
|
|
// define the algorithm |
92 |
|
✗ |
iguana::AlgorithmSequence seq; |
93 |
|
✗ |
for(auto const& prerequisite_algo : prerequisite_algos) |
94 |
|
✗ |
seq.Add(prerequisite_algo); |
95 |
|
✗ |
seq.Add(algo_name); |
96 |
|
✗ |
seq.SetName("TEST thread " + std::to_string(order)); |
97 |
|
✗ |
seq.PrintSequence(); |
98 |
|
✗ |
seq.SetOption(algo_name, "log", verbose ? "trace" : "info"); |
99 |
|
|
|
100 |
|
|
// start the algorithm |
101 |
|
✗ |
seq.Start(banks); |
102 |
|
|
|
103 |
|
|
// loop over frames |
104 |
|
|
int nProcessed = 0; |
105 |
|
✗ |
while(nProcessed < num_events_per_thread || num_events_per_thread == 0) { |
106 |
|
✗ |
stream.pull(events); |
107 |
|
|
|
108 |
|
|
// loop over events in this frame |
109 |
|
|
int nNonEmpty = 0; |
110 |
|
✗ |
for(auto& event : events) { |
111 |
|
✗ |
if(event.getSize() > 16) { |
112 |
|
✗ |
nNonEmpty++; |
113 |
|
✗ |
nProcessed++; |
114 |
|
|
|
115 |
|
|
// read the banks |
116 |
|
✗ |
for(auto& bank : banks) |
117 |
|
✗ |
event.read(bank); |
118 |
|
|
|
119 |
|
|
// occasionally vary the run number; so far, algorithms with data-dependent configuration |
120 |
|
|
// parameters have dependence on run number, so this variation aims to improve thread |
121 |
|
|
// sanitizer test coverage |
122 |
|
✗ |
if(vary_run && run_config_bank_idx.has_value()) { |
123 |
|
|
// === rapid variation === |
124 |
|
|
/* |
125 |
|
|
banks[run_config_bank_idx.value()].putInt("run", 0, std::rand() % 20000); |
126 |
|
|
*/ |
127 |
|
|
// === slower variation === |
128 |
|
|
///* |
129 |
|
✗ |
if(std::rand() % 10 == 0) { // randomly increase or decrease the run number |
130 |
|
✗ |
auto runnum = banks[run_config_bank_idx.value()].getInt("run", 0); |
131 |
|
✗ |
runnum += (std::rand() % 2 == 0) ? 1000 : -1000; |
132 |
|
✗ |
runnum = std::max(runnum, 0); // prevent negative run number |
133 |
|
✗ |
banks[run_config_bank_idx.value()].putInt("run", 0, runnum); |
134 |
|
|
} |
135 |
|
✗ |
else if(std::rand() % 10 == 1) { |
136 |
|
✗ |
banks[run_config_bank_idx.value()].putInt("run", 0, 1); // set the runnum to '1' |
137 |
|
|
} |
138 |
|
|
//*/ |
139 |
|
|
} |
140 |
|
|
|
141 |
|
|
// run the iguana algorithm |
142 |
|
✗ |
seq.Run(banks); |
143 |
|
|
} |
144 |
|
|
} |
145 |
|
✗ |
if(nNonEmpty == 0) |
146 |
|
|
break; |
147 |
|
|
} |
148 |
|
|
|
149 |
|
|
// stop the algorithm |
150 |
|
✗ |
seq.Stop(); |
151 |
|
|
|
152 |
|
✗ |
seq.GetLog()->Info("nProcessed = {}", nProcessed); |
153 |
|
✗ |
return nProcessed; |
154 |
|
✗ |
}; |
155 |
|
|
|
156 |
|
|
// run |
157 |
|
✗ |
stream.run(ftn, num_threads); |
158 |
|
|
return 0; |
159 |
|
✗ |
} |
160 |
|
|
|