E2SAR 0.3.0
Loading...
Searching...
No Matches
e2sarDPReassembler.hpp
1#ifndef E2SARDREASSEMBLERPHPP
2#define E2SARDREASSEMBLERPHPP
3
4#include <boost/asio.hpp>
5#include <boost/lockfree/queue.hpp>
6#include <boost/pool/pool.hpp>
7#include <boost/pool/object_pool.hpp>
8#include <boost/unordered_map.hpp>
9#include <boost/thread.hpp>
10#include <boost/tuple/tuple.hpp>
11#include <boost/tuple/tuple_io.hpp>
12#include <boost/circular_buffer.hpp>
13#include <boost/any.hpp>
14#include <boost/asio/ip/udp.hpp>
15#include <boost/variant.hpp>
16#include <boost/heap/priority_queue.hpp>
17#include <boost/container/flat_set.hpp>
18
19#include <sys/select.h>
20
21#ifdef NETLINK_AVAILABLE
22#include <linux/rtnetlink.h>
23#endif
24
25#include <atomic>
26
27#include "e2sarError.hpp"
28#include "e2sarUtil.hpp"
29#include "e2sarHeaders.hpp"
30#include "e2sarNetUtil.hpp"
31#include "e2sarCP.hpp"
32#include "portable_endian.h"
33
34/***
35 * Dataplane definitions for E2SAR Reassembler
36*/
37
38namespace e2sar
39{
40 const size_t RECV_BUFFER_SIZE{9000};
41 /*
42 The Reassembler class knows how to reassemble the events back. It relies
43 on the RE header structure to reassemble the event, because the LB portion
44 of LBRE header is stripped off by the load balancer.
45
46 It runs on or next to the worker performing event processing.
47 */
49 {
50
51 friend class Segmenter;
52 private:
53 EjfatURI dpuri;
54 LBManager lbman;
55
        // Reassembler event queue state - we use a lockfree queue
        // and an associated atomic variable that reflects
        // how many event entries are in it.
59
        // Structure to hold each recv-queue item
        // Tracks one event being reassembled from UDP fragments. The 'event'
        // buffer is allocated here, but ownership ultimately passes to the
        // consumer of the queue ("user deallocates" - see initFromHeader).
        struct EventQueueItem {
            boost::chrono::steady_clock::time_point firstSegment; // when first segment arrived
            size_t numFragments; // how many fragments received (in and out of order)
            size_t bytes; // total length
            size_t curBytes; // current bytes accumulated (could be scattered across fragments)
            EventNum_t eventNum; // event number from the RE header
            u_int8_t *event;     // reassembly buffer (raw on purpose - not freed by the destructor)
            u_int16_t dataId;    // data source id from the RE header

            EventQueueItem(): numFragments{0}, bytes{0}, curBytes{0},
                eventNum{0}, event{nullptr}, dataId{0} {}

            // deliberately does NOT delete[] 'event' - the buffer is handed
            // off to the user, who deallocates it
            ~EventQueueItem() {}

            EventQueueItem& operator=(const EventQueueItem &i) = delete;

            // Shallow copy: the 'event' pointer is copied, not duplicated.
            // This is how enqueue() transfers buffer ownership into the
            // lockfree event queue.
            EventQueueItem(const EventQueueItem &i): firstSegment{i.firstSegment},
                numFragments{i.numFragments},
                bytes{i.bytes}, curBytes{i.curBytes},
                eventNum{i.eventNum}, event{i.event}, dataId{i.dataId} {}

            // construct directly from the first RE header seen for this event
            EventQueueItem(REHdr *rehdr): EventQueueItem()
            {
                this->initFromHeader(rehdr);
            }

            // Fill in length/ids from an RE header and allocate the reassembly
            // buffer. NOTE(review): calling this twice on the same item would
            // leak the previous buffer - confirm callers invoke it only once.
            inline void initFromHeader(REHdr *rehdr)
            {
                bytes = rehdr->get_bufferLength();
                dataId = rehdr->get_dataId();
                eventNum = rehdr->get_eventNum();
                // user deallocates this, so we don't use a pool
                event = new u_int8_t[rehdr->get_bufferLength()];
                // set the timestamp
                firstSegment = boost::chrono::steady_clock::now();
            }
        };
100
        // stats block
        // All counters are atomics so receive threads can update them without
        // locking while getStats() takes a snapshot.
        struct AtomicStats {
            std::atomic<EventNum_t> enqueueLoss{0}; // number of events received and lost on enqueue
            std::atomic<EventNum_t> reassemblyLoss{0}; // number of events lost in reassembly (missing segments)
            std::atomic<EventNum_t> eventSuccess{0}; // events successfully processed
            std::atomic<size_t> totalBytesReceived{0};
            std::atomic<size_t> totalPacketsReceived{0};
            std::atomic<size_t> badHeaderDiscards{0}; // number of frames discarded due to failed header check
            // last error code
            std::atomic<int> lastErrno{0};
            // gRPC error count
            std::atomic<int> grpcErrCnt{0};
            // data socket error count
            std::atomic<int> dataErrCnt{0};
            // last e2sar error
            std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
            // unbounded lockfree queue of lost-event tuples <event num, data id, fragments>;
            // entries are heap-allocated in logLostEvent() and freed either by
            // get_LostEvent() or when stopThreads() drains the queue
            boost::lockfree::queue<boost::tuple<EventNum_t, u_int16_t, size_t>*, boost::lockfree::fixed_sized<false>> lostEventsQueue{0};
            // this array is accessed by different threads using fd as an index (so no collisions)
            std::vector<size_t> fragmentsPerFd;
            std::vector<u_int16_t> portPerFd; // which port assigned to this FD - initialized at the start
        };
123 AtomicStats recvStats;
124
125 // receive event queue definitions
126 static const size_t QSIZE{1000};
127 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
128 std::atomic<size_t> eventQueueDepth{0};
129
130 // push event on the common event queue
131 // return 1 if event is lost, 0 on success
132 inline int enqueue(const std::shared_ptr<EventQueueItem> &item) noexcept
133 {
134 int ret = 0;
135 // get rid of the shared object here
136 // lockfree queue uses atomic operations and cannot use shared_ptr type
137 auto newItem = new EventQueueItem(*item.get());
138 if (eventQueue.push(newItem))
139 eventQueueDepth++;
140 else
141 {
142 delete newItem; // the shared ptr object will be released by caller
143 ret = 1; // event lost, queue was full
144 }
145 // queue is lock free so we don't lock
146 recvThreadCond.notify_all();
147 return ret;
148 }
149
150 // pop event off the event queue
151 inline EventQueueItem* dequeue() noexcept
152 {
153 EventQueueItem* item{nullptr};
154 auto a = eventQueue.pop(item);
155 if (a)
156 {
157 eventQueueDepth--;
158 return item;
159 } else
160 return nullptr; // queue was empty
161 }
162
        // PID-related parameters
        // Gains and bounds for the control loop that computes the fill factor
        // reported to the control plane (loop itself lives in the .cpp).
        const u_int32_t epochMs; // length of a schedule epoch - 1 sec
        const float setPoint; // control-loop set point
        const float Kp; // PID proportional
        const float Ki; // PID integral
        const float Kd; // PID derivative
        const float weight; // processing power factor
        const float min_factor; // lower clamp on the control output - TODO confirm against sendState code
        const float max_factor; // upper clamp on the control output - TODO confirm against sendState code
        // one PID sample: timestamp plus the error and accumulated integral at that time
        struct PIDSample {
            UnixTimeMicro_t sampleTime; // in usec since epoch
            float error;    // error term at sampleTime
            float integral; // accumulated integral at sampleTime

            PIDSample(UnixTimeMicro_t st, float er, float intg):
                sampleTime{st}, error{er}, integral{intg} {}
        };
        // recent PID samples; circular buffer, so old samples fall off
        boost::circular_buffer<PIDSample> pidSampleBuffer;
181
182 // have we registered a worker
183 bool registeredWorker{false};
184
185
        // State for the garbage-collector thread, which walks the recv
        // threads' partially assembled events and cleans up stale ones.
        struct GCThreadState {
            Reassembler &reas;       // owner object
            boost::thread threadObj; // the GC thread itself (started elsewhere, joined in stopThreads)

            GCThreadState(Reassembler &r): reas{r} {}

            // Go through the list of threads on the recv thread list and
            // do garbage collection on their partially assembled events
            void _threadBody();
        };
200 GCThreadState gcThreadState;
201
206 struct RecvThreadState {
207 // owner object
208 Reassembler &reas;
209 boost::thread threadObj;
210
211 // timers
212 struct timeval sleep_tv;
213
214 // UDP sockets
215 std::vector<int> udpPorts;
216 std::vector<int> sockets;
217 int maxFdPlusOne;
218 fd_set fdSet;
219
220 // object pool from which receive frames come from
221 // template parameter is allocator
222 //boost::pool<> recvBufferPool{RECV_BUFFER_SIZE};
223 // map from <event number, data id> to event queue item (shared pointer)
224 // for those items that are in assembly. Note that event
225 // is uniquely identified by <event number, data id> and
226 // so long as the entropy doesn't change while the event
227 // segments are transmitted, they are guarangeed to go
228 // to the same port
229 boost::unordered_map<std::pair<EventNum_t, u_int16_t>, std::shared_ptr<EventQueueItem>, pair_hash, pair_equal> eventsInProgress;
230 // mutex for guarding access to events in progress (recv thread, gc thread)
231 boost::mutex evtsInProgressMutex;
232 // thread local instance of events we lost
233 boost::container::flat_set<std::pair<EventNum_t, u_int16_t>> lostEvents;
234
235 // CPU core ids
236 std::vector<int> cpuCoreList;
237
238 // this constructor deliberately uses move semantics for uports
239 inline RecvThreadState(Reassembler &r, std::vector<int> &&uports,
240 const std::vector<int> &ccl):
241 reas{r}, udpPorts{uports}, cpuCoreList{ccl}
242 {
243 sleep_tv.tv_sec = 0;
244 sleep_tv.tv_usec = 10000; // 10 msec max
245 }
246
247 inline ~RecvThreadState()
248 {
249 //recvBufferPool.purge_memory();
250 }
251
252 // open v4/v6 sockets
253 result<int> _open();
254 // close sockets
255 result<int> _close();
256 // thread loop
257 void _threadBody();
258
259 // log a lost event and add to lost queue for external inspection
260 // boolean flag discriminates between enqueue losses (true)
261 // and reassembly losses (false)
262 inline void logLostEvent(std::shared_ptr<EventQueueItem> item, bool enqueLoss)
263 {
264 std::pair<EventNum_t, u_int16_t> evt(item->eventNum, item->dataId);
265
266 if (lostEvents.contains(evt))
267 return;
268 // this is thread-local
269 lostEvents.insert(evt);
270 // lockfree queue (only takes trivial types)
271 boost::tuple<EventNum_t, u_int16_t, size_t> *evtPtr =
272 new boost::tuple<EventNum_t, u_int16_t, size_t>(evt.first, evt.second, item->numFragments);
273 reas.recvStats.lostEventsQueue.push(evtPtr);
274 // this is atomic
275 if (enqueLoss)
276 reas.recvStats.enqueueLoss++;
277 else
278 reas.recvStats.reassemblyLoss++;
279 }
280 };
281 friend struct RecvThreadState;
282 std::list<RecvThreadState> recvThreadState;
283
284 // receive related parameters
285 const std::vector<int> cpuCoreList;
286 ip::address dataIP;
287 const u_int16_t dataPort;
288 const int portRange; // translates into 2^portRange - 1 ports we listen to
289 const size_t numRecvThreads;
290 const size_t numRecvPorts;
291 std::vector<std::list<int>> threadsToPorts;
292 const bool withLBHeader;
293 const int eventTimeout_ms; // how long we allow events to linger 'in progress' before we give up
294 const int recvWaitTimeout_ms{10}; // how long we wait on condition variable before we come up for air
295 // recv socket buffer size for setsockop
296 const int rcvSocketBufSize;
297
298 // lock with recv thread
299 boost::mutex recvThreadMtx;
300 // condition variable for recv thread queue
301 boost::condition_variable recvThreadCond;
302 // lock for the mutex
303 //thread_local boost::unique_lock<boost::mutex> condLock(recvThreadMtx, boost::defer_lock);
304
308 inline void assignPortsToThreads()
309 {
310 // O(numRecvPorts)
311 for(size_t i=0; i<numRecvPorts;)
312 {
313 for(size_t j=0; i<numRecvPorts && j<numRecvThreads; i++, j++)
314 {
315 threadsToPorts[j].push_back(dataPort + i);
316 }
317 }
318 }
319
323 struct SendStateThreadState {
324 // owner object
325 Reassembler &reas;
326 boost::thread threadObj;
327
328 const u_int16_t period_ms;
329
330 // UDP sockets
331 int socketFd{0};
332
333 inline SendStateThreadState(Reassembler &r, u_int16_t period_ms):
334 reas{r}, period_ms{period_ms}
335 {}
336
337 // thread loop. all important behavior is encapsulated inside LBManager
338 void _threadBody();
339 };
340 friend struct sendStateThreadState;
341 SendStateThreadState sendStateThreadState;
342 bool useCP; // for debugging we may not want to have CP running
343 bool reportStats; // report worker stats in sendState thread (usually false)
344 // global thread stop signal
345 bool threadsStop{false};
346
350 inline void sanityChecks()
351 {
352 if (numRecvThreads > 128)
353 throw E2SARException("Too many reassembly threads requested, limit 128");
354
355 if (numRecvPorts > (2 << 13))
356 throw E2SARException("Too many receive ports reqiuested, limit 2^14");
357
358 if (eventTimeout_ms > 10000)
359 throw E2SARException("Event timeout exception unreasonably long, limit 10s");
360
361 if (dataPort < 1024)
362 throw E2SARException("Base receive port in the privileged range (<1024)");
363
364 if (portRange > 14)
365 throw E2SARException("Port range out of bounds: [0, 14]");
366
367 if (!dpuri.has_dataAddr())
368 throw E2SARException("Data address not present in the URI");
369 }
370 public:
            // plain (non-atomic) snapshot of AtomicStats, taken field-by-field
            // in the converting constructor below
            EventNum_t enqueueLoss; // number of events received and lost on enqueue
            EventNum_t reassemblyLoss; // number of events lost in reassembly due to missing segments
            EventNum_t eventSuccess; // events successfully processed
            int lastErrno;      // last errno observed on the data path
            int grpcErrCnt;     // gRPC error count
            int dataErrCnt;     // data socket error count
            E2SARErrorc lastE2SARError; // last e2sar error code
            size_t totalPackets, totalBytes, badHeaderDiscards;

            // only constructible as a snapshot of the live atomic counters
            ReportedStats() = delete;
            ReportedStats(const AtomicStats &as): enqueueLoss{as.enqueueLoss},
                reassemblyLoss{as.reassemblyLoss}, eventSuccess{as.eventSuccess},
                lastErrno{as.lastErrno}, grpcErrCnt{as.grpcErrCnt}, dataErrCnt{as.dataErrCnt},
                lastE2SARError{as.lastE2SARError}, totalPackets{as.totalPacketsReceived},
                totalBytes{as.totalBytesReceived}, badHeaderDiscards{as.badHeaderDiscards}
            {}
        };
401
        {
            bool useCP;          // talk to the control plane (default true)
            bool useHostAddress; // presumably: bind using the host's own address - TODO confirm against ctor
            u_int16_t period_ms; // sendState reporting period (default 100 ms)
            bool validateCert;   // validate the server certificate (default true)
            float Ki, Kp, Kd, setPoint; // PID gains and set point (all default 0)
            u_int32_t epoch_ms;  // schedule epoch length (default 1000 ms)
            int portRange;       // 2^portRange ports; -1 default - presumably "derive automatically", verify in ctor
            bool withLBHeader;   // assume LB header still present on incoming frames - TODO confirm
            int eventTimeout_ms; // how long events may linger 'in progress' (default 500 ms)
            int rcvSocketBufSize; // recv socket buffer size for setsockopt (default 3 MB)
            float weight, min_factor, max_factor; // processing power factor and control clamps
            bool reportStats;    // report worker stats in sendState thread (default false)
            ReassemblerFlags(): useCP{true}, useHostAddress{false},
                period_ms{100}, validateCert{true}, Ki{0.}, Kp{0.}, Kd{0.}, setPoint{0.},
                epoch_ms{1000}, portRange{-1}, withLBHeader{false}, eventTimeout_ms{500},
                rcvSocketBufSize{1024*1024*3}, weight{1.0}, min_factor{0.5}, max_factor{2.0},
                reportStats{false} {}
            // populate a flags object from an INI file
            static result<Reassembler::ReassemblerFlags> getFromINI(const std::string &iniFile) noexcept;
        };
463 Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port,
464 std::vector<int> cpuCoreList,
465 const ReassemblerFlags &rflags = ReassemblerFlags());
475 Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port,
476 size_t numRecvThreads = 1, const ReassemblerFlags &rflags = ReassemblerFlags());
477
491 Reassembler(const EjfatURI &uri, u_int16_t starting_port,
492 std::vector<int> cpuCoreList,
493 const ReassemblerFlags &rflags = ReassemblerFlags(),
494 bool v6 = false);
505 Reassembler(const EjfatURI &uri, u_int16_t starting_port,
506 size_t numRecvThreads = 1,
507 const ReassemblerFlags &rflags = ReassemblerFlags(),
508 bool v6 = false);
509
510 Reassembler(const Reassembler &r) = delete;
511 Reassembler & operator=(const Reassembler &o) = delete;
        {
            // best-effort deregistration from the control plane; the result
            // is deliberately ignored (destructors must not throw)
            if (useCP && registeredWorker)
                auto res = lbman.deregisterWorker();

            // stops and joins sendState, recv and GC threads, drains queues
            stopThreads();

            // pool memory is implicitly freed when pool goes out of scope
        }
521
527 result<int> registerWorker(const std::string &node_name) noexcept;
528
533 result<int> deregisterWorker() noexcept;
534
541 result<int> openAndStart() noexcept;
542
552 result<int> getEvent(uint8_t **event, size_t *bytes, EventNum_t* eventNum, uint16_t *dataId) noexcept;
553
563 result<int> recvEvent(uint8_t **event, size_t *bytes, EventNum_t* eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept;
564
575 inline const ReportedStats getStats() const noexcept
576 {
577 return ReportedStats(recvStats);
578 }
579
584 inline result<boost::tuple<EventNum_t, u_int16_t, size_t>> get_LostEvent() noexcept
585 {
586 boost::tuple<EventNum_t, u_int16_t, size_t> *res = nullptr;
587 if (recvStats.lostEventsQueue.pop(res))
588 {
589 auto ret{*res};
590 delete res;
591 return ret;
592 }
593 else
594 return E2SARErrorInfo{E2SARErrorc::NotFound, "Lost event queue is empty"};
595 }
596
602 inline result<std::list<std::pair<u_int16_t, size_t>>> get_FDStats() noexcept
603 {
604 if (not threadsStop)
605 return E2SARErrorInfo{E2SARErrorc::LogicError, "This method should only be called after the threads have been stopped."};
606
607 int i{0};
608 std::list<std::pair<u_int16_t, size_t>> ret;
609 for(auto count: recvStats.fragmentsPerFd)
610 {
611 if (recvStats.portPerFd[i] != 0)
612 ret.push_back(std::make_pair<>(recvStats.portPerFd[i], count));
613 ++i;
614 }
615 return ret;
616 }
617
621 inline size_t get_numRecvThreads() const noexcept
622 {
623 return numRecvThreads;
624 }
625
629 inline const std::pair<int, int> get_recvPorts() const noexcept
630 {
631 return std::make_pair(dataPort, dataPort + numRecvPorts - 1);
632 }
633
638 inline int get_portRange() const noexcept
639 {
640 return portRange;
641 }
642
646 inline const ip::address get_dataIP() const noexcept
647 {
648 return dataIP;
649 }
        {
            if (not threadsStop)
            {
                // signal every thread (sendState, recv, GC) to wind down
                threadsStop = true;

                // wake recv threads blocked on the condition variable
                recvThreadCond.notify_all();

                // wait to exit
                if (useCP)
                    sendStateThreadState.threadObj.join();

                for(auto i = recvThreadState.begin(); i != recvThreadState.end(); ++i)
                    i->threadObj.join();

                // drain event queue
                EventQueueItem* item{nullptr};
                bool a{false};
                do {
                    a = eventQueue.pop(item);
                    if (a)
                    {
                        // these buffers were never handed to the user, so free them here
                        if (item->event != nullptr)
                            delete[] item->event;
                        delete item;
                    }
                } while (a);

                gcThreadState.threadObj.join();

                // drain lost events queue - tuples are heap-allocated in logLostEvent
                // and only freed by get_LostEvent(); if the caller never calls it they leak
                boost::tuple<EventNum_t, u_int16_t, size_t>* evtPtr{nullptr};
                while (recvStats.lostEventsQueue.pop(evtPtr))
                    delete evtPtr;
            }
        }
690 protected:
691 private:
692
693
694
695 };
696}
697#endif
Definition e2sarError.hpp:62
Definition e2sarUtil.hpp:56
const bool has_dataAddr() const
Definition e2sarUtil.hpp:294
Definition e2sarCP.hpp:206
result< int > deregisterWorker() noexcept
Definition e2sarCP.cpp:476
Definition e2sarDPReassembler.hpp:49
result< std::list< std::pair< u_int16_t, size_t > > > get_FDStats() noexcept
Definition e2sarDPReassembler.hpp:602
result< int > registerWorker(const std::string &node_name) noexcept
Definition e2sarDPReassembler.cpp:603
result< int > recvEvent(uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept
Definition e2sarDPReassembler.cpp:643
const ReportedStats getStats() const noexcept
Definition e2sarDPReassembler.hpp:575
result< int > getEvent(uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept
Definition e2sarDPReassembler.cpp:626
result< boost::tuple< EventNum_t, u_int16_t, size_t > > get_LostEvent() noexcept
Definition e2sarDPReassembler.hpp:584
void stopThreads()
Definition e2sarDPReassembler.hpp:653
const ip::address get_dataIP() const noexcept
Definition e2sarDPReassembler.hpp:646
Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags())
Definition e2sarDPReassembler.cpp:37
int get_portRange() const noexcept
Definition e2sarDPReassembler.hpp:638
const std::pair< int, int > get_recvPorts() const noexcept
Definition e2sarDPReassembler.hpp:629
result< int > openAndStart() noexcept
Definition e2sarDPReassembler.cpp:184
result< int > deregisterWorker() noexcept
Definition e2sarDPReassembler.cpp:614
size_t get_numRecvThreads() const noexcept
Definition e2sarDPReassembler.hpp:621
Definition e2sarDPSegmenter.hpp:48
Definition e2sar.hpp:11
E2SARErrorc
Definition e2sarError.hpp:24
Definition e2sarError.hpp:42
Definition e2sarHeaders.hpp:22
EventNum_t get_eventNum() const
Definition e2sarHeaders.hpp:43
u_int16_t get_dataId() const
Definition e2sarHeaders.hpp:67
u_int32_t get_bufferLength() const
Definition e2sarHeaders.hpp:51
Definition e2sarDPReassembler.hpp:427
static result< Reassembler::ReassemblerFlags > getFromINI(const std::string &iniFile) noexcept
Definition e2sarDPReassembler.cpp:678
Definition e2sarDPReassembler.hpp:383
Definition e2sarUtil.hpp:535
Definition e2sarUtil.hpp:526