E2SAR 0.2.0
Loading...
Searching...
No Matches
e2sarDPReassembler.hpp
1#ifndef E2SARDREASSEMBLERPHPP
2#define E2SARDREASSEMBLERPHPP
3
4#include <boost/asio.hpp>
5#include <boost/lockfree/queue.hpp>
6#include <boost/pool/pool.hpp>
7#include <boost/pool/object_pool.hpp>
8#include <boost/unordered_map.hpp>
9#include <boost/thread.hpp>
10#include <boost/tuple/tuple.hpp>
11#include <boost/tuple/tuple_io.hpp>
12#include <boost/circular_buffer.hpp>
13#include <boost/any.hpp>
14#include <boost/asio/ip/udp.hpp>
15#include <boost/variant.hpp>
16#include <boost/heap/priority_queue.hpp>
17#include <boost/container/flat_set.hpp>
18
19#include <sys/select.h>
20
21#ifdef NETLINK_AVAILABLE
22#include <linux/rtnetlink.h>
23#endif
24
25#include <atomic>
26
27#include "e2sarError.hpp"
28#include "e2sarUtil.hpp"
29#include "e2sarHeaders.hpp"
30#include "e2sarNetUtil.hpp"
31#include "e2sarCP.hpp"
32#include "portable_endian.h"
33
34/***
35 * Dataplane definitions for E2SAR Reassembler
36*/
37
38namespace e2sar
39{
40 const size_t RECV_BUFFER_SIZE{9000};
41 /*
42 The Reassembler class knows how to reassemble the events back. It relies
43 on the RE header structure to reassemble the event, because the LB portion
44 of LBRE header is stripped off by the load balancer.
45
46 It runs on or next to the worker performing event processing.
47 */
49 {
50
51 friend class Segmenter;
52 private:
53 EjfatURI dpuri;
54 LBManager lbman;
55
// Segmenter queue state - we use lockfree queue
// and an associated atomic variable that reflects
// how many event entries are in it.
59
60 // Structure to hold each recv-queue item
        // Structure to hold each recv-queue item: reassembly metadata plus the
        // buffer into which fragments of one event are accumulated.
        struct EventQueueItem {
            boost::chrono::steady_clock::time_point firstSegment; // when first segment arrived
            size_t numFragments; // how many fragments received (in and out of order)
            size_t bytes; // total length
            size_t curBytes; // current bytes accumulated (could be scattered across fragments)
            EventNum_t eventNum;
            u_int8_t *event;  // event data buffer; allocated in initFromHeader(), freed by the user
            u_int16_t dataId;

            EventQueueItem(): numFragments{0}, bytes{0}, curBytes{0},
                eventNum{0}, event{nullptr}, dataId{0} {}

            // deliberately does NOT free 'event' -- the buffer is handed to
            // the user, who deallocates it (see initFromHeader below)
            ~EventQueueItem() {}

            EventQueueItem& operator=(const EventQueueItem &i) = delete;

            // Shallow copy: both copies refer to the same 'event' buffer
            // (ownership ultimately passes to the user)
            EventQueueItem(const EventQueueItem &i): firstSegment{i.firstSegment},
                numFragments{i.numFragments},
                bytes{i.bytes}, curBytes{i.curBytes},
                eventNum{i.eventNum}, event{i.event}, dataId{i.dataId} {}

            // Construct from the first RE header seen for this event
            EventQueueItem(REHdr *rehdr): EventQueueItem()
            {
                this->initFromHeader(rehdr);
            }

            // Fill in length/ids from the RE header, allocate the event buffer
            // and timestamp the arrival of the first segment
            inline void initFromHeader(REHdr *rehdr)
            {
                bytes = rehdr->get_bufferLength();
                dataId = rehdr->get_dataId();
                eventNum = rehdr->get_eventNum();
                // user deallocates this, so we don't use a pool
                event = new u_int8_t[rehdr->get_bufferLength()];
                // set the timestamp
                firstSegment = boost::chrono::steady_clock::now();
            }
        };
100
        // stats block: counters shared between threads; all counters are
        // atomic so they can be read while the threads run (snapshotted
        // into ReportedStats by getStats())
        struct AtomicStats {
            std::atomic<EventNum_t> enqueueLoss{0}; // number of events received and lost on enqueue
            std::atomic<EventNum_t> reassemblyLoss{0}; // number of events lost in reassembly (missing segments)
            std::atomic<EventNum_t> eventSuccess{0}; // events successfully processed
            // last error code
            std::atomic<int> lastErrno{0};
            // gRPC error count
            std::atomic<int> grpcErrCnt{0};
            // data socket error count
            std::atomic<int> dataErrCnt{0};
            // last e2sar error
            std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
            // a limited queue to push lost event numbers to; entries are
            // heap-allocated <eventNum, dataId, numFragments> tuples,
            // popped (and freed) via get_LostEvent()
            //boost::lockfree::queue<std::pair<EventNum_t, u_int16_t>*> lostEventsQueue{20};
            boost::lockfree::queue<boost::tuple<EventNum_t, u_int16_t, size_t>*> lostEventsQueue{20};
            // this array is accessed by different threads using fd as an index (so no collisions)
            std::vector<size_t> fragmentsPerFd;
            std::vector<u_int16_t> portPerFd; // which port assigned to this FD - initialized at the start
        };
        AtomicStats recvStats;
122
123 // receive event queue definitions
124 static const size_t QSIZE{1000};
125 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
126 std::atomic<size_t> eventQueueDepth{0};
127
128 // push event on the common event queue
129 // return 1 if event is lost, 0 on success
130 inline int enqueue(const std::shared_ptr<EventQueueItem> &item) noexcept
131 {
132 int ret = 0;
133 // get rid of the shared object here
134 // lockfree queue uses atomic operations and cannot use shared_ptr type
135 auto newItem = new EventQueueItem(*item.get());
136 if (eventQueue.push(newItem))
137 eventQueueDepth++;
138 else
139 {
140 delete newItem; // the shared ptr object will be released by caller
141 ret = 1; // event lost, queue was full
142 }
143 // queue is lock free so we don't lock
144 recvThreadCond.notify_all();
145 return ret;
146 }
147
148 // pop event off the event queue
149 inline EventQueueItem* dequeue() noexcept
150 {
151 EventQueueItem* item{nullptr};
152 auto a = eventQueue.pop(item);
153 if (a)
154 {
155 eventQueueDepth--;
156 return item;
157 } else
158 return nullptr; // queue was empty
159 }
160
161 // PID-related parameters
162 const u_int32_t epochMs; // length of a schedule epoch - 1 sec
163 const float setPoint;
164 const float Kp; // PID proportional
165 const float Ki; // PID integral
166 const float Kd; // PID derivative
167 const float weight; // processing power factor
168 const float min_factor;
169 const float max_factor;
        // One PID controller sample: timestamp, error term and accumulated
        // integral; stored in pidSampleBuffer (a circular buffer)
        struct PIDSample {
            UnixTimeMicro_t sampleTime; // in usec since epoch
            float error;                // instantaneous error vs. setPoint
            float integral;             // accumulated integral term

            PIDSample(UnixTimeMicro_t st, float er, float intg):
                sampleTime{st}, error{er}, integral{intg} {}
        };
178 boost::circular_buffer<PIDSample> pidSampleBuffer;
179
180 // have we registered a worker
181 bool registeredWorker{false};
182
183
        // State for the garbage-collection thread that reclaims
        // partially assembled events that have lingered too long
        struct GCThreadState {
            Reassembler &reas;      // owner object
            boost::thread threadObj;

            GCThreadState(Reassembler &r): reas{r} {}

            // Go through the list of threads on the recv thread list and
            // do garbage collection on their partially assembled events
            void _threadBody();
        };
198 GCThreadState gcThreadState;
199
204 struct RecvThreadState {
205 // owner object
206 Reassembler &reas;
207 boost::thread threadObj;
208
209 // timers
210 struct timeval sleep_tv;
211
212 // UDP sockets
213 std::vector<int> udpPorts;
214 std::vector<int> sockets;
215 int maxFdPlusOne;
216 fd_set fdSet;
217
218 // object pool from which receive frames come from
219 // template parameter is allocator
220 //boost::pool<> recvBufferPool{RECV_BUFFER_SIZE};
221 // map from <event number, data id> to event queue item (shared pointer)
222 // for those items that are in assembly. Note that event
223 // is uniquely identified by <event number, data id> and
224 // so long as the entropy doesn't change while the event
225 // segments are transmitted, they are guarangeed to go
226 // to the same port
227 boost::unordered_map<std::pair<EventNum_t, u_int16_t>, std::shared_ptr<EventQueueItem>, pair_hash, pair_equal> eventsInProgress;
228 // mutex for guarding access to events in progress (recv thread, gc thread)
229 boost::mutex evtsInProgressMutex;
230 // thread local instance of events we lost
231 boost::container::flat_set<std::pair<EventNum_t, u_int16_t>> lostEvents;
232
233 // CPU core ids
234 std::vector<int> cpuCoreList;
235
236 // this constructor deliberately uses move semantics for uports
237 inline RecvThreadState(Reassembler &r, std::vector<int> &&uports,
238 const std::vector<int> &ccl):
239 reas{r}, udpPorts{uports}, cpuCoreList{ccl}
240 {
241 sleep_tv.tv_sec = 0;
242 sleep_tv.tv_usec = 10000; // 10 msec max
243 }
244
245 inline ~RecvThreadState()
246 {
247 //recvBufferPool.purge_memory();
248 }
249
250 // open v4/v6 sockets
251 result<int> _open();
252 // close sockets
253 result<int> _close();
254 // thread loop
255 void _threadBody();
256
257 // log a lost event and add to lost queue for external inspection
258 // boolean flag discriminates between enqueue losses (true)
259 // and reassembly losses (false)
260 inline void logLostEvent(std::shared_ptr<EventQueueItem> item, bool enqueLoss)
261 {
262 std::pair<EventNum_t, u_int16_t> evt(item->eventNum, item->dataId);
263
264 if (lostEvents.contains(evt))
265 return;
266 // this is thread-local
267 lostEvents.insert(evt);
268 // lockfree queue (only takes trivial types)
269 boost::tuple<EventNum_t, u_int16_t, size_t> *evtPtr =
270 new boost::tuple<EventNum_t, u_int16_t, size_t>(evt.first, evt.second, item->numFragments);
271 reas.recvStats.lostEventsQueue.push(evtPtr);
272 // this is atomic
273 if (enqueLoss)
274 reas.recvStats.enqueueLoss++;
275 else
276 reas.recvStats.reassemblyLoss++;
277 }
278 };
279 friend struct RecvThreadState;
280 std::list<RecvThreadState> recvThreadState;
281
282 // receive related parameters
283 const std::vector<int> cpuCoreList;
284 ip::address dataIP;
285 const u_int16_t dataPort;
286 const int portRange; // translates into 2^portRange - 1 ports we listen to
287 const size_t numRecvThreads;
288 const size_t numRecvPorts;
289 std::vector<std::list<int>> threadsToPorts;
290 const bool withLBHeader;
291 const int eventTimeout_ms; // how long we allow events to linger 'in progress' before we give up
292 const int recvWaitTimeout_ms{10}; // how long we wait on condition variable before we come up for air
293 // recv socket buffer size for setsockop
294 const int rcvSocketBufSize;
295
296 // lock with recv thread
297 boost::mutex recvThreadMtx;
298 // condition variable for recv thread queue
299 boost::condition_variable recvThreadCond;
300 // lock for the mutex
301 //thread_local boost::unique_lock<boost::mutex> condLock(recvThreadMtx, boost::defer_lock);
302
306 inline void assignPortsToThreads()
307 {
308 // O(numRecvPorts)
309 for(size_t i=0; i<numRecvPorts;)
310 {
311 for(size_t j=0; i<numRecvPorts && j<numRecvThreads; i++, j++)
312 {
313 threadsToPorts[j].push_back(dataPort + i);
314 }
315 }
316 }
317
321 struct SendStateThreadState {
322 // owner object
323 Reassembler &reas;
324 boost::thread threadObj;
325
326 const u_int16_t period_ms;
327
328 // UDP sockets
329 int socketFd{0};
330
331 inline SendStateThreadState(Reassembler &r, u_int16_t period_ms):
332 reas{r}, period_ms{period_ms}
333 {}
334
335 // thread loop. all important behavior is encapsulated inside LBManager
336 void _threadBody();
337 };
338 friend struct sendStateThreadState;
339 SendStateThreadState sendStateThreadState;
340 bool useCP; // for debugging we may not want to have CP running
341 // global thread stop signal
342 bool threadsStop{false};
343
347 inline void sanityChecks()
348 {
349 if (numRecvThreads > 128)
350 throw E2SARException("Too many reassembly threads requested, limit 128");
351
352 if (numRecvPorts > (2 << 13))
353 throw E2SARException("Too many receive ports reqiuested, limit 2^14");
354
355 if (eventTimeout_ms > 5000)
356 throw E2SARException("Event timeout exception unreasonably long, limit 5s");
357
358 if (dataPort < 1024)
359 throw E2SARException("Base receive port in the privileged range (<1024)");
360
361 if (portRange > 14)
362 throw E2SARException("Port range out of bounds: [0, 14]");
363
364 if (!dpuri.has_dataAddr())
365 throw E2SARException("Data address not present in the URI");
366 }
367 public:
            // plain-value snapshot of the atomic counters (see AtomicStats)
            EventNum_t enqueueLoss; // number of events received and lost on enqueue
            EventNum_t reassemblyLoss; // number of events lost in reassembly due to missing segments
            EventNum_t eventSuccess; // events successfully processed
            int lastErrno;     // last error code
            int grpcErrCnt;    // gRPC error count
            int dataErrCnt;    // data socket error count
            E2SARErrorc lastE2SARError; // last e2sar error

            // no default construction -- always snapshot from live stats
            ReportedStats() = delete;
            ReportedStats(const AtomicStats &as): enqueueLoss{as.enqueueLoss},
                reassemblyLoss{as.reassemblyLoss}, eventSuccess{as.eventSuccess},
                lastErrno{as.lastErrno}, grpcErrCnt{as.grpcErrCnt}, dataErrCnt{as.dataErrCnt},
                lastE2SARError{as.lastE2SARError}
            {}
        };
394
        {
            bool useCP;          // whether to talk to the control plane (false = debug, data only)
            bool useHostAddress; // presumably: listen on the host's own address -- TODO confirm against constructor
            u_int16_t period_ms; // control-plane state reporting period
            bool validateCert;   // presumably: validate the CP server certificate -- TODO confirm
            float Ki, Kp, Kd, setPoint; // PID integral/proportional/derivative gains and setpoint
            u_int32_t epoch_ms;  // length of a schedule epoch
            int portRange;       // 2^portRange ports; -1 default -- semantics set by constructor, confirm there
            bool withLBHeader;   // presumably: expect the LB header not to be stripped -- TODO confirm
            int eventTimeout_ms; // how long events may linger 'in progress' before GC gives up
            int rcvSocketBufSize; // recv socket buffer size for setsockopt
            float weight, min_factor, max_factor; // processing power factor and its clamp bounds
            // defaults
            ReassemblerFlags(): useCP{true}, useHostAddress{false},
                period_ms{100}, validateCert{true}, Ki{0.}, Kp{0.}, Kd{0.}, setPoint{0.},
                epoch_ms{1000}, portRange{-1}, withLBHeader{false}, eventTimeout_ms{500},
                rcvSocketBufSize{1024*1024*3}, weight{1.0}, min_factor{0.5}, max_factor{2.0} {}
            // populate a flags object from an INI file
            static result<Reassembler::ReassemblerFlags> getFromINI(const std::string &iniFile) noexcept;
        };
454 Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port,
455 std::vector<int> cpuCoreList,
456 const ReassemblerFlags &rflags = ReassemblerFlags());
466 Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port,
467 size_t numRecvThreads = 1, const ReassemblerFlags &rflags = ReassemblerFlags());
468
482 Reassembler(const EjfatURI &uri, u_int16_t starting_port,
483 std::vector<int> cpuCoreList,
484 const ReassemblerFlags &rflags = ReassemblerFlags(),
485 bool v6 = false);
496 Reassembler(const EjfatURI &uri, u_int16_t starting_port,
497 size_t numRecvThreads = 1,
498 const ReassemblerFlags &rflags = ReassemblerFlags(),
499 bool v6 = false);
500
501 Reassembler(const Reassembler &r) = delete;
502 Reassembler & operator=(const Reassembler &o) = delete;
        {
            // deregister from the control plane if we registered; the result
            // is deliberately ignored -- NOTE(review): nothing actionable in
            // a destructor, but consider logging failures
            if (useCP && registeredWorker)
                auto res = lbman.deregisterWorker();

            // signal all threads to exit, then wake any recv waiters
            stopThreads();
            recvThreadCond.notify_all();

            // wait to exit
            if (useCP)
                sendStateThreadState.threadObj.join();

            for(auto i = recvThreadState.begin(); i != recvThreadState.end(); ++i)
                i->threadObj.join();

            gcThreadState.threadObj.join();

            // pool memory is implicitly freed when pool goes out of scope
        }
522
528 result<int> registerWorker(const std::string &node_name) noexcept;
529
534 result<int> deregisterWorker() noexcept;
535
542 result<int> openAndStart() noexcept;
543
553 result<int> getEvent(uint8_t **event, size_t *bytes, EventNum_t* eventNum, uint16_t *dataId) noexcept;
554
564 result<int> recvEvent(uint8_t **event, size_t *bytes, EventNum_t* eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept;
565
        // Snapshot the atomic counters into a plain copyable struct
        inline const ReportedStats getStats() const noexcept
        {
            return ReportedStats(recvStats);
        }
580
585 inline result<boost::tuple<EventNum_t, u_int16_t, size_t>> get_LostEvent() noexcept
586 {
587 boost::tuple<EventNum_t, u_int16_t, size_t> *res = nullptr;
588 if (recvStats.lostEventsQueue.pop(res))
589 {
590 auto ret{*res};
591 delete res;
592 return ret;
593 }
594 else
595 return E2SARErrorInfo{E2SARErrorc::NotFound, "Lost event queue is empty"};
596 }
597
603 inline result<std::list<std::pair<u_int16_t, size_t>>> get_FDStats() noexcept
604 {
605 if (not threadsStop)
606 return E2SARErrorInfo{E2SARErrorc::LogicError, "This method should only be called after the threads have been stopped."};
607
608 int i{0};
609 std::list<std::pair<u_int16_t, size_t>> ret;
610 for(auto count: recvStats.fragmentsPerFd)
611 {
612 if (recvStats.portPerFd[i] != 0)
613 ret.push_back(std::make_pair<>(recvStats.portPerFd[i], count));
614 ++i;
615 }
616 return ret;
617 }
618
        /// @return the number of receive threads in use
        inline const size_t get_numRecvThreads() const noexcept
        {
            return numRecvThreads;
        }

        /// @return inclusive <first, last> pair of UDP ports we listen on
        inline const std::pair<int, int> get_recvPorts() const noexcept
        {
            return std::make_pair(dataPort, dataPort + numRecvPorts - 1);
        }

        /// @return the configured port range exponent
        inline const int get_portRange() const noexcept
        {
            return portRange;
        }

        /// @return the data IP address we receive on
        inline const ip::address get_dataIP() const noexcept
        {
            return dataIP;
        }
        {
            // cooperative stop: worker threads poll this flag
            // NOTE(review): plain bool written here and read by the threads;
            // as a one-way latch this is likely benign, but std::atomic<bool>
            // would be safer -- confirm against the thread bodies
            threadsStop = true;
        }
658 protected:
659 private:
660
661
662
663 };
664}
665#endif
Definition e2sarError.hpp:61
Definition e2sarUtil.hpp:31
const bool has_dataAddr() const
Definition e2sarUtil.hpp:231
Definition e2sarCP.hpp:120
result< int > deregisterWorker() noexcept
Definition e2sarCP.cpp:474
Definition e2sarDPReassembler.hpp:49
result< std::list< std::pair< u_int16_t, size_t > > > get_FDStats() noexcept
Definition e2sarDPReassembler.hpp:603
result< int > registerWorker(const std::string &node_name) noexcept
Definition e2sarDPReassembler.cpp:568
result< int > recvEvent(uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept
Definition e2sarDPReassembler.cpp:608
const ReportedStats getStats() const noexcept
Definition e2sarDPReassembler.hpp:576
result< int > getEvent(uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept
Definition e2sarDPReassembler.cpp:591
result< boost::tuple< EventNum_t, u_int16_t, size_t > > get_LostEvent() noexcept
Definition e2sarDPReassembler.hpp:585
void stopThreads()
Definition e2sarDPReassembler.hpp:654
const int get_portRange() const noexcept
Definition e2sarDPReassembler.hpp:639
const ip::address get_dataIP() const noexcept
Definition e2sarDPReassembler.hpp:647
const size_t get_numRecvThreads() const noexcept
Definition e2sarDPReassembler.hpp:622
Reassembler(const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags())
Definition e2sarDPReassembler.cpp:37
const std::pair< int, int > get_recvPorts() const noexcept
Definition e2sarDPReassembler.hpp:630
result< int > openAndStart() noexcept
Definition e2sarDPReassembler.cpp:180
result< int > deregisterWorker() noexcept
Definition e2sarDPReassembler.cpp:579
Definition e2sarDPSegmenter.hpp:48
Definition e2sar.hpp:11
E2SARErrorc
Definition e2sarError.hpp:24
Definition e2sarError.hpp:41
Definition e2sarHeaders.hpp:22
EventNum_t get_eventNum() const
Definition e2sarHeaders.hpp:43
u_int16_t get_dataId() const
Definition e2sarHeaders.hpp:67
u_int32_t get_bufferLength() const
Definition e2sarHeaders.hpp:51
Definition e2sarDPReassembler.hpp:420
static result< Reassembler::ReassemblerFlags > getFromINI(const std::string &iniFile) noexcept
Definition e2sarDPReassembler.cpp:643
Definition e2sarDPReassembler.hpp:378
Definition e2sarUtil.hpp:474
Definition e2sarUtil.hpp:465