61 struct EventQueueItem {
62 boost::chrono::steady_clock::time_point firstSegment;
70 EventQueueItem(): numFragments{0}, bytes{0}, curBytes{0},
71 eventNum{0},
event{
nullptr}, dataId{0} {}
75 EventQueueItem& operator=(
const EventQueueItem &i) =
delete;
77 EventQueueItem(
const EventQueueItem &i): firstSegment{i.firstSegment},
78 numFragments{i.numFragments},
79 bytes{i.bytes}, curBytes{i.curBytes},
80 eventNum{i.eventNum},
event{i.event}, dataId{i.dataId} {}
84 EventQueueItem(
REHdr *rehdr): EventQueueItem()
86 this->initFromHeader(rehdr);
89 inline void initFromHeader(
REHdr *rehdr)
97 firstSegment = boost::chrono::steady_clock::now();
103 std::atomic<EventNum_t> enqueueLoss{0};
104 std::atomic<EventNum_t> reassemblyLoss{0};
105 std::atomic<EventNum_t> eventSuccess{0};
106 std::atomic<size_t> totalBytesReceived{0};
107 std::atomic<size_t> totalPacketsReceived{0};
108 std::atomic<size_t> badHeaderDiscards{0};
110 std::atomic<int> lastErrno{0};
112 std::atomic<int> grpcErrCnt{0};
114 std::atomic<int> dataErrCnt{0};
116 std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
118 boost::lockfree::queue<boost::tuple<EventNum_t, u_int16_t, size_t>*, boost::lockfree::fixed_sized<false>> lostEventsQueue{0};
120 std::vector<size_t> fragmentsPerFd;
121 std::vector<u_int16_t> portPerFd;
123 AtomicStats recvStats;
126 static const size_t QSIZE{1000};
127 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
128 std::atomic<size_t> eventQueueDepth{0};
132 inline int enqueue(
const std::shared_ptr<EventQueueItem> &item)
noexcept
137 auto newItem =
new EventQueueItem(*item.get());
138 if (eventQueue.push(newItem))
146 recvThreadCond.notify_all();
151 inline EventQueueItem* dequeue()
noexcept
153 EventQueueItem* item{
nullptr};
154 auto a = eventQueue.pop(item);
164 const u_int32_t epochMs;
165 const float setPoint;
170 const float min_factor;
171 const float max_factor;
173 UnixTimeMicro_t sampleTime;
177 PIDSample(UnixTimeMicro_t st,
float er,
float intg):
178 sampleTime{st}, error{er}, integral{intg} {}
180 boost::circular_buffer<PIDSample> pidSampleBuffer;
183 bool registeredWorker{
false};
190 struct GCThreadState {
192 boost::thread threadObj;
200 GCThreadState gcThreadState;
206 struct RecvThreadState {
209 boost::thread threadObj;
212 struct timeval sleep_tv;
215 std::vector<int> udpPorts;
216 std::vector<int> sockets;
229 boost::unordered_map<std::pair<EventNum_t, u_int16_t>, std::shared_ptr<EventQueueItem>,
pair_hash,
pair_equal> eventsInProgress;
231 boost::mutex evtsInProgressMutex;
233 boost::container::flat_set<std::pair<EventNum_t, u_int16_t>> lostEvents;
236 std::vector<int> cpuCoreList;
239 inline RecvThreadState(
Reassembler &r, std::vector<int> &&uports,
240 const std::vector<int> &ccl):
241 reas{r}, udpPorts{uports}, cpuCoreList{ccl}
244 sleep_tv.tv_usec = 10000;
247 inline ~RecvThreadState()
255 result<int> _close();
262 inline void logLostEvent(std::shared_ptr<EventQueueItem> item,
bool enqueLoss)
264 std::pair<EventNum_t, u_int16_t> evt(item->eventNum, item->dataId);
266 if (lostEvents.contains(evt))
269 lostEvents.insert(evt);
271 boost::tuple<EventNum_t, u_int16_t, size_t> *evtPtr =
272 new boost::tuple<EventNum_t, u_int16_t, size_t>(evt.first, evt.second, item->numFragments);
273 reas.recvStats.lostEventsQueue.push(evtPtr);
276 reas.recvStats.enqueueLoss++;
278 reas.recvStats.reassemblyLoss++;
281 friend struct RecvThreadState;
282 std::list<RecvThreadState> recvThreadState;
285 const std::vector<int> cpuCoreList;
287 const u_int16_t dataPort;
289 const size_t numRecvThreads;
290 const size_t numRecvPorts;
291 std::vector<std::list<int>> threadsToPorts;
292 const bool withLBHeader;
293 const int eventTimeout_ms;
294 const int recvWaitTimeout_ms{10};
296 const int rcvSocketBufSize;
299 boost::mutex recvThreadMtx;
301 boost::condition_variable recvThreadCond;
308 inline void assignPortsToThreads()
311 for(
size_t i=0; i<numRecvPorts;)
313 for(
size_t j=0; i<numRecvPorts && j<numRecvThreads; i++, j++)
315 threadsToPorts[j].push_back(dataPort + i);
323 struct SendStateThreadState {
326 boost::thread threadObj;
328 const u_int16_t period_ms;
333 inline SendStateThreadState(
Reassembler &r, u_int16_t period_ms):
334 reas{r}, period_ms{period_ms}
340 friend struct sendStateThreadState;
341 SendStateThreadState sendStateThreadState;
345 bool threadsStop{
false};
350 inline void sanityChecks()
352 if (numRecvThreads > 128)
353 throw E2SARException(
"Too many reassembly threads requested, limit 128");
355 if (numRecvPorts > (2 << 13))
356 throw E2SARException(
"Too many receive ports reqiuested, limit 2^14");
358 if (eventTimeout_ms > 10000)
359 throw E2SARException(
"Event timeout exception unreasonably long, limit 10s");
362 throw E2SARException(
"Base receive port in the privileged range (<1024)");
384 EventNum_t enqueueLoss;
385 EventNum_t reassemblyLoss;
386 EventNum_t eventSuccess;
391 size_t totalPackets, totalBytes, badHeaderDiscards;
394 ReportedStats(
const AtomicStats &as): enqueueLoss{as.enqueueLoss},
395 reassemblyLoss{as.reassemblyLoss}, eventSuccess{as.eventSuccess},
396 lastErrno{as.lastErrno}, grpcErrCnt{as.grpcErrCnt}, dataErrCnt{as.dataErrCnt},
397 lastE2SARError{as.lastE2SARError}, totalPackets{as.totalPacketsReceived},
398 totalBytes{as.totalBytesReceived}, badHeaderDiscards{as.badHeaderDiscards}
432 float Ki, Kp, Kd, setPoint;
437 int rcvSocketBufSize;
438 float weight, min_factor, max_factor;
441 period_ms{100}, validateCert{
true}, Ki{0.}, Kp{0.}, Kd{0.}, setPoint{0.},
442 epoch_ms{1000}, portRange{-1}, withLBHeader{
false}, eventTimeout_ms{500},
443 rcvSocketBufSize{1024*1024*3}, weight{1.0}, min_factor{0.5}, max_factor{2.0},
444 reportStats{
false} {}
449 static result<Reassembler::ReassemblerFlags>
getFromINI(
const std::string &iniFile)
noexcept;
464 std::vector<int> cpuCoreList,
492 std::vector<int> cpuCoreList,
506 size_t numRecvThreads = 1,
514 if (useCP && registeredWorker)
527 result<int>
registerWorker(
const std::string &node_name)
noexcept;
552 result<
int>
getEvent(uint8_t **event,
size_t *bytes, EventNum_t* eventNum, uint16_t *dataId) noexcept;
563 result<
int>
recvEvent(uint8_t **event,
size_t *bytes, EventNum_t* eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept;
584 inline result<boost::tuple<EventNum_t, u_int16_t, size_t>>
get_LostEvent() noexcept
586 boost::tuple<EventNum_t, u_int16_t, size_t> *res =
nullptr;
587 if (recvStats.lostEventsQueue.pop(res))
594 return E2SARErrorInfo{E2SARErrorc::NotFound,
"Lost event queue is empty"};
602 inline result<std::list<std::pair<u_int16_t, size_t>>>
get_FDStats() noexcept
605 return E2SARErrorInfo{E2SARErrorc::LogicError,
"This method should only be called after the threads have been stopped."};
608 std::list<std::pair<u_int16_t, size_t>> ret;
609 for(
auto count: recvStats.fragmentsPerFd)
611 if (recvStats.portPerFd[i] != 0)
612 ret.push_back(std::make_pair<>(recvStats.portPerFd[i], count));
623 return numRecvThreads;
631 return std::make_pair(dataPort, dataPort + numRecvPorts - 1);
659 recvThreadCond.notify_all();
663 sendStateThreadState.threadObj.join();
665 for(
auto i = recvThreadState.begin(); i != recvThreadState.end(); ++i)
669 EventQueueItem* item{
nullptr};
672 a = eventQueue.pop(item);
675 if (item->event !=
nullptr)
676 delete[] item->event;
681 gcThreadState.threadObj.join();
685 boost::tuple<EventNum_t, u_int16_t, size_t>* evtPtr{
nullptr};
686 while (recvStats.lostEventsQueue.pop(evtPtr))