// Per-event record: event number, data id, event buffer handle, byte counters
// and fragment count (see the constructors below). Pointers to these records
// are what travels through the reassembler's eventQueue.
61 struct EventQueueItem {
// Steady-clock timestamp; initFromHeader() assigns steady_clock::now() to it.
62 boost::chrono::steady_clock::time_point firstSegment;
70 EventQueueItem(): numFragments{0}, bytes{0}, curBytes{0},
71 eventNum{0},
event{
nullptr}, dataId{0} {}
75 EventQueueItem& operator=(
const EventQueueItem &i) =
delete;
77 EventQueueItem(
const EventQueueItem &i): firstSegment{i.firstSegment},
78 numFragments{i.numFragments},
79 bytes{i.bytes}, curBytes{i.curBytes},
80 eventNum{i.eventNum},
event{i.event}, dataId{i.dataId} {}
// Constructs an item from a reassembly header: delegates to the default
// constructor first, then calls initFromHeader(rehdr).
// NOTE(review): single-argument constructor is not `explicit`, so REHdr*
// converts implicitly to EventQueueItem -- confirm that is intended.
84 EventQueueItem(
REHdr *rehdr): EventQueueItem()
86 this->initFromHeader(rehdr);
// Initializes this item from the reassembly header `rehdr`.
// One visible effect: firstSegment is stamped with the current
// boost::chrono::steady_clock time.
89 inline void initFromHeader(
REHdr *rehdr)
97 firstSegment = boost::chrono::steady_clock::now();
// Event counters (std::atomic, all starting at 0). logLostEvent() increments
// enqueueLoss and reassemblyLoss -- presumably one or the other depending on
// its enqueLoss flag; confirm. ReportedStats snapshots all three.
103 std::atomic<EventNum_t> enqueueLoss{0};
104 std::atomic<EventNum_t> reassemblyLoss{0};
105 std::atomic<EventNum_t> eventSuccess{0};
// Last errno value recorded; 0 until something is stored.
107 std::atomic<int> lastErrno{0};
// Error counter; name suggests gRPC call failures -- TODO confirm against writers.
109 std::atomic<int> grpcErrCnt{0};
// Error counter; name suggests data-path errors -- TODO confirm against writers.
111 std::atomic<int> dataErrCnt{0};
// Last E2SAR error code recorded; starts as E2SARErrorc::NoError.
113 std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
// Lock-free queue of pointers to heap-allocated
// <event number, data id, fragment count> tuples describing lost events.
// logLostEvent() pushes onto it and get_LostEvent() pops from it. The
// constructor argument (20) is the initial node capacity.
116 boost::lockfree::queue<boost::tuple<EventNum_t, u_int16_t, size_t>*> lostEventsQueue{20};
// Two vectors read together by index in get_FDStats(): a fragment count
// and a port number per slot. get_FDStats() skips slots whose port is 0.
118 std::vector<size_t> fragmentsPerFd;
119 std::vector<u_int16_t> portPerFd;
// Statistics for this reassembler; updated by the receive threads (see
// RecvThreadState::logLostEvent) and read by the get_* accessors.
121 AtomicStats recvStats;
// Initial node capacity passed to eventQueue's constructor.
124 static const size_t QSIZE{1000};
// Lock-free queue of raw EventQueueItem pointers: enqueue() pushes a
// heap-allocated copy, dequeue() pops one.
125 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
// Atomic counter, initially 0.
// NOTE(review): presumably tracks the number of items in eventQueue -- confirm where it is updated.
126 std::atomic<size_t> eventQueueDepth{0};
// Puts an event on eventQueue: allocates a new EventQueueItem as a copy of
// *item, pushes the raw pointer onto the lock-free queue, and wakes all
// waiters on recvThreadCond.
// NOTE(review): declared noexcept, but `new` may throw std::bad_alloc, which
// would call std::terminate -- confirm this is acceptable.
// NOTE(review): confirm newItem is deleted on the path where push() returns false.
130 inline int enqueue(
const std::shared_ptr<EventQueueItem> &item)
noexcept
135 auto newItem =
new EventQueueItem(*item.get());
136 if (eventQueue.push(newItem))
144 recvThreadCond.notify_all();
// Pops one EventQueueItem pointer from eventQueue. `item` starts out as
// nullptr and `a` captures the boolean result of pop().
// NOTE(review): presumably returns nullptr when the queue is empty, and the
// caller takes ownership of the returned heap item -- confirm.
149 inline EventQueueItem* dequeue()
noexcept
151 EventQueueItem* item{
nullptr};
152 auto a = eventQueue.pop(item);
// Const configuration values, fixed for the lifetime of the object.
// NOTE(review): names suggest PID-control settings (epoch length in ms, set
// point, lower/upper bound factors) -- confirm against the constructor.
162 const u_int32_t epochMs;
163 const float setPoint;
168 const float min_factor;
169 const float max_factor;
// Timestamp of a PID sample; initialized from the PIDSample constructor's `st` argument.
171 UnixTimeMicro_t sampleTime;
175 PIDSample(UnixTimeMicro_t st,
float er,
float intg):
176 sampleTime{st}, error{er}, integral{intg} {}
// Fixed-capacity ring buffer of PIDSample entries.
178 boost::circular_buffer<PIDSample> pidSampleBuffer;
// Flag, initially false; tested together with useCP elsewhere in this class.
// NOTE(review): presumably set once registerWorker() succeeds -- confirm.
181 bool registeredWorker{
false};
// State for the "GC" thread; holds the boost::thread handle.
188 struct GCThreadState {
190 boost::thread threadObj;
// The single GC thread state instance; its threadObj is joined during teardown.
198 GCThreadState gcThreadState;
// State owned by one receive thread.
204 struct RecvThreadState {
// Handle of the thread itself.
207 boost::thread threadObj;
// timeval whose tv_usec the constructor sets to 10000.
// NOTE(review): presumably the timeout used when waiting on sockets -- confirm.
210 struct timeval sleep_tv;
// UDP ports handed to this thread at construction, and a vector of socket
// descriptors (presumably one per port -- confirm).
213 std::vector<int> udpPorts;
214 std::vector<int> sockets;
// Events not yet completed, keyed by the (event number, data id) pair and
// holding shared EventQueueItem records; uses the pair_hash / pair_equal functors.
227 boost::unordered_map<std::pair<EventNum_t, u_int16_t>, std::shared_ptr<EventQueueItem>,
pair_hash,
pair_equal> eventsInProgress;
// Mutex named for eventsInProgress.
// NOTE(review): presumably guards eventsInProgress against concurrent access -- confirm which threads take it.
229 boost::mutex evtsInProgressMutex;
// (event number, data id) pairs already recorded as lost; logLostEvent()
// checks and inserts into this set.
231 boost::container::flat_set<std::pair<EventNum_t, u_int16_t>> lostEvents;
// CPU core list copied from the constructor's `ccl` argument.
234 std::vector<int> cpuCoreList;
// Binds the owning Reassembler (`reas`), stores the UDP port list and the
// CPU core list, and sets sleep_tv.tv_usec to 10000 microseconds (10 ms).
// NOTE(review): `uports` is taken by rvalue reference, but a named rvalue
// reference is an lvalue, so udpPorts{uports} copies the vector;
// udpPorts{std::move(uports)} would move it instead.
237 inline RecvThreadState(
Reassembler &r, std::vector<int> &&uports,
238 const std::vector<int> &ccl):
239 reas{r}, udpPorts{uports}, cpuCoreList{ccl}
242 sleep_tv.tv_usec = 10000;
// Destructor of the per-thread receive state.
245 inline ~RecvThreadState()
// Declared here, defined elsewhere; returns result<int>.
// NOTE(review): presumably closes this thread's sockets -- confirm against the definition.
253 result<int> _close();
// Records a lost event.
//   item      - the event being reported; its eventNum, dataId and
//               numFragments are read.
//   enqueLoss - presumably selects which loss counter (enqueueLoss vs
//               reassemblyLoss) is incremented -- confirm.
// The (eventNum, dataId) pair is looked up in lostEvents and inserted, and a
// heap-allocated <eventNum, dataId, numFragments> tuple is pushed onto
// recvStats.lostEventsQueue.
// NOTE(review): the return value of lostEventsQueue.push() is ignored; if the
// push fails the tuple is leaked -- confirm whether that can happen here.
260 inline void logLostEvent(std::shared_ptr<EventQueueItem> item,
bool enqueLoss)
262 std::pair<EventNum_t, u_int16_t> evt(item->eventNum, item->dataId);
// Already-reported check (presumably an early exit -- confirm).
264 if (lostEvents.contains(evt))
267 lostEvents.insert(evt);
// Tuple is allocated with new; the consumer that pops it must delete it.
269 boost::tuple<EventNum_t, u_int16_t, size_t> *evtPtr =
270 new boost::tuple<EventNum_t, u_int16_t, size_t>(evt.first, evt.second, item->numFragments);
271 reas.recvStats.lostEventsQueue.push(evtPtr);
274 reas.recvStats.enqueueLoss++;
276 reas.recvStats.reassemblyLoss++;
// RecvThreadState is a friend so it can reach private members such as recvStats.
279 friend struct RecvThreadState;
// One RecvThreadState per receive thread; the teardown code iterates this list.
280 std::list<RecvThreadState> recvThreadState;
// CPU core list for the reassembler (const).
283 const std::vector<int> cpuCoreList;
// First receive port; ports dataPort .. dataPort + numRecvPorts - 1 are used.
285 const u_int16_t dataPort;
// Number of receive threads and number of receive ports (both const).
287 const size_t numRecvThreads;
288 const size_t numRecvPorts;
// threadsToPorts[j] lists the ports assigned to thread j; filled by assignPortsToThreads().
289 std::vector<std::list<int>> threadsToPorts;
// NOTE(review): name suggests incoming packets carry the load-balancer header -- confirm.
290 const bool withLBHeader;
// Event timeout in milliseconds; sanityChecks() rejects values above 5000.
291 const int eventTimeout_ms;
// Wait timeout in milliseconds, fixed at 10.
292 const int recvWaitTimeout_ms{10};
// NOTE(review): presumably the receive buffer size requested on the sockets -- confirm.
294 const int rcvSocketBufSize;
// Mutex and condition variable; enqueue() calls notify_all() on
// recvThreadCond after pushing an event.
297 boost::mutex recvThreadMtx;
299 boost::condition_variable recvThreadCond;
// Distributes the numRecvPorts ports starting at dataPort over the receive
// threads. The outer loop runs until the port index i reaches numRecvPorts;
// each inner pass walks thread indices j = 0 .. numRecvThreads-1 while
// advancing i, so port dataPort + i is appended to threadsToPorts[j] in
// round-robin order.
// NOTE(review): assumes threadsToPorts already has numRecvThreads entries and
// numRecvThreads > 0 (otherwise the outer loop would never advance) -- confirm.
306 inline void assignPortsToThreads()
309 for(
size_t i=0; i<numRecvPorts;)
311 for(
size_t j=0; i<numRecvPorts && j<numRecvThreads; i++, j++)
313 threadsToPorts[j].push_back(dataPort + i);
// State of the send-state thread.
321 struct SendStateThreadState {
// Handle of the thread itself.
324 boost::thread threadObj;
// Period in milliseconds (const, set by the constructor).
// NOTE(review): presumably the interval between state reports -- confirm.
326 const u_int16_t period_ms;
// Binds the owning Reassembler and stores the period.
331 inline SendStateThreadState(
Reassembler &r, u_int16_t period_ms):
332 reas{r}, period_ms{period_ms}
338 friend struct sendStateThreadState;
339 SendStateThreadState sendStateThreadState;
// Stop flag, initially false.
// NOTE(review): plain bool -- if worker threads poll this while another
// thread sets it without a lock, that is a data race; confirm or consider
// std::atomic<bool>.
342 bool threadsStop{
false};
// Validates configuration values and throws E2SARException on the first
// violation:
//   - more than 128 receive threads
//   - more than 2 << 13 (= 16384 = 2^14) receive ports
//   - event timeout above 5000 ms
//   - base receive port in the privileged range (message says < 1024)
// NOTE(review): the receive-ports message contains the typo "reqiuested".
347 inline void sanityChecks()
349 if (numRecvThreads > 128)
350 throw E2SARException(
"Too many reassembly threads requested, limit 128");
// 2 << 13 == 16384, matching the 2^14 quoted in the message.
352 if (numRecvPorts > (2 << 13))
353 throw E2SARException(
"Too many receive ports reqiuested, limit 2^14");
355 if (eventTimeout_ms > 5000)
356 throw E2SARException(
"Event timeout exception unreasonably long, limit 5s");
359 throw E2SARException(
"Base receive port in the privileged range (<1024)");
// Plain (non-atomic) copies of the AtomicStats counters with the same names.
379 EventNum_t enqueueLoss;
380 EventNum_t reassemblyLoss;
381 EventNum_t eventSuccess;
// Snapshot constructor: loads each atomic field of `as` into the matching
// plain field. The fields are read one at a time, so the snapshot is not
// taken atomically as a whole.
388 ReportedStats(
const AtomicStats &as): enqueueLoss{as.enqueueLoss},
389 reassemblyLoss{as.reassemblyLoss}, eventSuccess{as.eventSuccess},
390 lastErrno{as.lastErrno}, grpcErrCnt{as.grpcErrCnt}, dataErrCnt{as.dataErrCnt},
391 lastE2SARError{as.lastE2SARError}
// Gain/set-point values; all default to 0.
// NOTE(review): names suggest PID controller gains -- confirm.
425 float Ki, Kp, Kd, setPoint;
// Defaults to 3 MiB (1024*1024*3).
430 int rcvSocketBufSize;
// Default to 1.0, 0.5 and 2.0 respectively.
431 float weight, min_factor, max_factor;
// Defaults set below: period_ms 100, validateCert true, Ki/Kp/Kd/setPoint 0,
// epoch_ms 1000, portRange -1, withLBHeader false, eventTimeout_ms 500,
// rcvSocketBufSize 3 MiB, weight 1.0, min_factor 0.5, max_factor 2.0.
433 period_ms{100}, validateCert{
true}, Ki{0.}, Kp{0.}, Kd{0.}, setPoint{0.},
434 epoch_ms{1000}, portRange{-1}, withLBHeader{
false}, eventTimeout_ms{500},
435 rcvSocketBufSize{1024*1024*3}, weight{1.0}, min_factor{0.5}, max_factor{2.0} {}
440 static result<Reassembler::ReassemblerFlags>
getFromINI(
const std::string &iniFile)
noexcept;
455 std::vector<int> cpuCoreList,
483 std::vector<int> cpuCoreList,
497 size_t numRecvThreads = 1,
505 if (useCP && registeredWorker)
509 recvThreadCond.notify_all();
513 sendStateThreadState.threadObj.join();
515 for(
auto i = recvThreadState.begin(); i != recvThreadState.end(); ++i)
518 gcThreadState.threadObj.join();
528 result<int>
registerWorker(
const std::string &node_name)
noexcept;
553 result<
int>
getEvent(uint8_t **event,
size_t *bytes, EventNum_t* eventNum, uint16_t *dataId) noexcept;
564 result<
int>
recvEvent(uint8_t **event,
size_t *bytes, EventNum_t* eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept;
// Pops one entry from recvStats.lostEventsQueue. Each entry is a
// <event number, data id, fragment count> tuple that logLostEvent()
// allocated with new.
// Returns an E2SARErrorc::NotFound error when nothing could be popped.
// NOTE(review): the popped pointer was heap-allocated by the producer;
// confirm it is deleted after its value is copied into the result.
585 inline result<boost::tuple<EventNum_t, u_int16_t, size_t>>
get_LostEvent() noexcept
587 boost::tuple<EventNum_t, u_int16_t, size_t> *res =
nullptr;
588 if (recvStats.lostEventsQueue.pop(res))
595 return E2SARErrorInfo{E2SARErrorc::NotFound,
"Lost event queue is empty"};
// Builds a list of (port, fragment count) pairs from recvStats.portPerFd and
// recvStats.fragmentsPerFd, skipping slots whose port is 0.
// Returns an E2SARErrorc::LogicError error in the case described by its
// message: the method is only meant to be called after the threads have
// been stopped.
// NOTE(review): `i` indexes portPerFd alongside the range-for over
// fragmentsPerFd; confirm it is advanced once per iteration and that both
// vectors have the same length.
603 inline result<std::list<std::pair<u_int16_t, size_t>>>
get_FDStats() noexcept
606 return E2SARErrorInfo{E2SARErrorc::LogicError,
"This method should only be called after the threads have been stopped."};
609 std::list<std::pair<u_int16_t, size_t>> ret;
610 for(
auto count: recvStats.fragmentsPerFd)
612 if (recvStats.portPerFd[i] != 0)
613 ret.push_back(std::make_pair<>(recvStats.portPerFd[i], count));
624 return numRecvThreads;
632 return std::make_pair(dataPort, dataPort + numRecvPorts - 1);