E2SAR 0.3.0
Loading...
Searching...
No Matches
e2sarDPSegmenter.hpp
1#ifndef E2SARDSEGMENTERPHPP
2#define E2SARDSEGMENTERPHPP
3
4#include <sys/types.h>
5#include <sys/socket.h>
6
7#ifdef LIBURING_AVAILABLE
8#include <liburing.h>
9#endif
10
11#include <boost/asio.hpp>
12#include <boost/lockfree/queue.hpp>
13#include <boost/pool/pool.hpp>
14#include <boost/pool/object_pool.hpp>
15#include <boost/thread.hpp>
16#include <boost/tuple/tuple.hpp>
17#include <boost/tuple/tuple_io.hpp>
18#include <boost/circular_buffer.hpp>
19#include <boost/any.hpp>
20#include <boost/asio/ip/udp.hpp>
21#include <boost/variant.hpp>
22#include <boost/random.hpp>
23#include <boost/chrono.hpp>
24
25#include <atomic>
26
27#include "e2sar.hpp"
28#include "e2sarUtil.hpp"
29#include "e2sarHeaders.hpp"
30#include "e2sarNetUtil.hpp"
31#include "portable_endian.h"
32
33/***
34 * Dataplane definitions for E2SAR Segmenter
35*/
36
37namespace e2sar
38{
39 /*
40 The Segmenter class knows how to break up the provided
41 events into segments consumable by the hardware loadbalancer.
42 It relies on header structures to segment into UDP packets and
43 follows other LB rules while doing it.
44
45 It runs on or next to the source of events.
46 */
48 {
49 friend class Reassembler;
50 private:
51 EjfatURI dpuri;
52 // unique identifier of the originating segmentation
53 // point (e.g. a DAQ), carried in RE header (could be persistent
54 // as set here, or specified per event buffer)
55 const u_int16_t dataId;
56 // unique identifier of an individual LB packet transmitting
57 // host/daq, 32-bit to accommodate IP addresses more easily
58 // carried in Sync header
59 const u_int32_t eventSrcId;
60
61 // number of send sockets we will be using (to help randomize LAG ports on FPGAs)
62 const size_t numSendSockets;
63
64 // send socket buffer size for setsockop
65 const int sndSocketBufSize;
66
67 // send rate (ignore if negative)
68 const float rateGbps;
69 // used to avoid floating point comparisons, set to false if rateGbps <= 0
70 const bool rateLimit;
71 // use smoothing rate shaping, i.e. only in sendmsg wait after every call
72 // WARNING: Incompatible with optimizations that send entire batches of
73 // frames to the kernel, i.e. sendMmsg and io_uring
74 const bool smooth;
75 // use multiple destination ports (for back-to-back testing only)
76 const bool multiPort;
77 // which LB header version are we using
78 const u_int8_t lbHdrVersion;
79
80 // Max size of internal queue holding events to be sent.
81 static constexpr size_t QSIZE{2047};
82
83 // size of CQE batch we peek
84 static constexpr unsigned cqeBatchSize{100};
85
86 // how long data send thread spends sleeping
87 static constexpr boost::chrono::milliseconds sleepTime{1};
88
89 // Structure to hold each send-queue item
        // Structure to hold each send-queue item.
        // NOTE(review): 'event' is a non-owning pointer to the caller's buffer;
        // presumably the buffer must stay valid until the send thread has
        // transmitted it and invoked 'callback' (if any) — confirm against
        // addToSendQueue() semantics in the .cpp.
        struct EventQueueItem {
            uint32_t bytes;                // length of the event buffer in bytes
            EventNum_t eventNum;           // event number reported in the RE header
            u_int16_t dataId;              // per-buffer data source id (RE header)
            u_int8_t *event;               // pointer to event payload (not owned)
            u_int16_t entropy;             // optional per event entropy
            void (*callback)(boost::any);  // optional completion callback
            boost::any cbArg;              // opaque argument passed to the callback
        };
99
100 // Fast, lock-free, wait-free queue (supports multiple producers/consumers)
101 boost::lockfree::queue<EventQueueItem*, boost::lockfree::fixed_sized<true>> eventQueue{QSIZE};
102
103#ifdef LIBURING_AVAILABLE
104 std::vector<struct io_uring> rings;
105 std::vector<boost::mutex> ringMtxs;
106
107 // each ring has to have a predefined size - we want to
108 // put at least 2*eventSize/bufferSize entries onto it
109 const size_t uringSize = 1000;
110
111 // we need to be able to call the callback from the CQE thread
112 // instead of send thread when liburing optimization is turned on
        // Per-SQE user data: we need to be able to call the callback from the
        // CQE (reaper) thread instead of the send thread when the liburing
        // optimization is turned on.
        struct SQEUserData
        {
            struct msghdr* msghdr;          // msghdr of the submitted sendmsg SQE
            void (*callback)(boost::any);   // user completion callback (may be nullptr)
            boost::any cbArg;               // opaque argument handed to the callback
        };
119#endif
120
121 // structure that maintains send stats
        // Structure that maintains send stats; entries are accumulated in
        // eventStatsBuffer and averaged by eventRate() to report a rate to
        // the control plane.
        struct SendStats {
            // Last time a sync message sent to CP in nanosec since epoch.
            UnixTimeNano_t lastSyncTimeNanos;
            // Number of events sent since last sync message sent to CP.
            EventNum_t eventsSinceLastSync;
        };
128 // event metadata fifo to keep track of stats
129 boost::circular_buffer<SendStats> eventStatsBuffer;
130 // keep these atomic, as they are accessed by Sync, Send (and maybe main) thread
131 boost::atomic<UnixTimeNano_t> currentSyncStartNano{0};
132 boost::atomic<EventNum_t> eventsInCurrentSync{0};
133
134 // currently user-assigned or sequential event number at enqueuing and reported in RE header
135 boost::atomic<EventNum_t> userEventNum{0};
136
137 // fast random number generator
138 boost::random::ranlux24_base ranlux;
139 // to get better entropy in usec clock samples (if needed)
140 boost::random::uniform_int_distribution<> lsbDist{0, 255};
141
142 // we RR through these FDs for send thread _send
143 size_t roundRobinIndex{0};
144
        // Atomic per-thread counters; snapshotted into ReportedStats by the
        // getSyncStats()/getSendStats() accessors. Atomics are used because
        // the counters are written by the worker threads and read from the
        // caller's thread.
        struct AtomicStats {
            // sync messages sent
            std::atomic<u_int64_t> msgCnt{0};
            // sync errors seen on send
            std::atomic<u_int64_t> errCnt{0};
            // last error code
            std::atomic<int> lastErrno{0};
            // last e2sar error
            std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
        };
159 // independent stats for each thread
160 AtomicStats syncStats;
161 AtomicStats sendStats;
162
        // State of the sync thread, which periodically sends SyncHdr messages
        // to the control plane (implementation in the .cpp).
        struct SyncThreadState {
            // owner object
            Segmenter &seg;
            boost::thread threadObj;
            // period in ms
            const u_int16_t period_ms{100};
            // connect socket flag (usually true)
            const bool connectSocket{true};
            // sockaddr_in[6] union (use boost::get<sockaddr_in> or
            // boost::get<sockaddr_in6> to get to the appropriate structure)
#define GET_V4_SYNC_STRUCT(sas) boost::get<sockaddr_in>(sas)
#define GET_V6_SYNC_STRUCT(sas) boost::get<sockaddr_in6>(sas)
            boost::variant<sockaddr_in, sockaddr_in6> syncAddrStruct;
            // flag that tells us we are v4 or v6
            bool isV6{false};
            // UDP sockets
            int socketFd{0};

            inline SyncThreadState(Segmenter &s, u_int16_t time_period_ms, bool cnct=true):
                seg{s},
                period_ms{time_period_ms},
                connectSocket{cnct}
            {}

            // open the sync UDP socket (v4/v6 per the sync address)
            result<int> _open();
            // close the sync socket
            result<int> _close();
            // send a single sync header to the control plane
            result<int> _send(SyncHdr *hdr);
            // thread loop; stops when seg.syncThreadStop is set (see stopThreads())
            void _threadBody();
        };
197 friend struct SyncThreadState;
198
199 SyncThreadState syncThreadState;
200
        // State of a send thread, which fragments events into UDP frames and
        // transmits them over one of numSendSockets sockets.
        struct SendThreadState {
            // owner object
            Segmenter &seg;
            boost::thread threadObj;
            // thread index (to help pick core)
            int threadIndex;
            // connect socket flag (usually true)
            const bool connectSocket{true};

            // flags
            const bool useV6;
            const bool ticksAsREEventNum;

            // transmit parameters
            size_t mtu{0}; // must accommodate typical IP, UDP, LB+RE headers and payload; not a const because we may change it
            std::string iface{""}; // outgoing interface - we may set it if possible
            size_t maxPldLen; // not a const because mtu is not a const

            // UDP sockets and matching sockaddr structures (local and remote)
            // <socket fd, local address, remote address>
#define GET_FD(sas, i) boost::get<0>(sas[i])
#define GET_LOCAL_SEND_STRUCT(sas,i) boost::get<1>(sas[i])
#define GET_REMOTE_SEND_STRUCT(sas, i) boost::get<2>(sas[i])
            std::vector<boost::tuple<int, sockaddr_in, sockaddr_in>> socketFd4;
            std::vector<boost::tuple<int, sockaddr_in6, sockaddr_in6>> socketFd6;

            // fast random number generator to create entropy values for events
            // this entropy value is held the same for all packets of a given
            // event guaranteeing the same destination UDP port for all of them
            boost::random::ranlux24_base ranlux;
            boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits<u_int16_t>::max()};
            // to get random port numbers we skip low numbered privileged ports
            boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits<u_int16_t>::max()};

            // NOTE(review): ranlux is seeded with time(0) in the init list and
            // immediately re-seeded with a nanosecond timestamp in the body;
            // the first seeding is redundant but harmless.
            // NOTE(review): if mtu <= getTotalHeaderLength(v6), maxPldLen
            // wraps (size_t underflow) here, before sanityChecks() can reject
            // the MTU — confirm construction order in the .cpp.
            inline SendThreadState(Segmenter &s, int idx, bool v6, u_int16_t mtu, bool tasreenum, bool cnct=true):
                seg{s}, threadIndex{idx}, connectSocket{cnct}, useV6{v6}, ticksAsREEventNum{tasreenum}, mtu{mtu},
                maxPldLen{mtu - getTotalHeaderLength(v6)}, socketFd4(s.numSendSockets),
                socketFd6(s.numSendSockets),
                ranlux{static_cast<u_int32_t>(std::time(0))}
            {
                // this way every segmenter send thread has a unique PRNG sequence
                auto nowT = boost::chrono::system_clock::now();
                ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
            }

            // open v4/v6 sockets
            result<int> _open();
            // close sockets
            result<int> _close();
            // close a given socket, wait that it has sent all the data (in Linux)
            result<int> _waitAndCloseFd(int fd);
            // fragment and send the event
            result<int> _send(u_int8_t *event, size_t bytes, EventNum_t altEventNum, u_int16_t dataId,
                u_int16_t entropy, size_t roundRobinIndex, int64_t interFrameSleepUsec = 0,
                void (*callback)(boost::any) = nullptr, boost::any cbArg = nullptr);
            // thread loop
            void _threadBody();
#ifdef LIBURING_AVAILABLE
            // reap CQEs if LIBURING is used
            void _reap(size_t roundRobinIndex);
#endif
        };
267 friend struct SendThreadState;
268
269 SendThreadState sendThreadState;
270 const size_t numSendThreads{1};
271 // list of cores we can use to run threads
272 // can be longer than the number of threads
273 // thread at index i uses core cpuCoreList[i]
274 // we don't check cores are unique
275 const std::vector<int> cpuCoreList;
276
277#ifdef LIBURING_AVAILABLE
278 // this is the sleep time for kernel thread in poll mode
279 // it is in milliseconds
280 static constexpr unsigned pollWaitTime{2000};
281 // atomic counter of outstanging sends
282 boost::atomic<u_int32_t> outstandingSends{0};
283#endif
284
285 // lock with send thread
286 boost::mutex sendThreadMtx;
287 // condition variable for send thread
288 //boost::condition_variable sendThreadCond;
289 // warm up period in MS between sync thread starting and data being allowed to be sent
290 u_int16_t warmUpMs;
291 // use control plane (can be disabled for debugging)
292 bool useCP;
293#define MIN_CLOCK_ENTROPY 6
294 bool addEntropy;
295
299 inline void sanityChecks()
300 {
301 if (numSendSockets > 128)
302 throw E2SARException("Too many sending sockets threads requested, limit 128");
303
304 if (syncThreadState.period_ms > 10000)
305 throw E2SARException("Sync period too long, limit 10s");
306
307 if (sendThreadState.mtu > 9000)
308 throw E2SARException("MTU set too long, limit 9000");
309
310 if (useCP and not dpuri.has_syncAddr())
311 throw E2SARException("Sync address not present in the URI");
312
313 if (not dpuri.has_dataAddr())
314 throw E2SARException("Data address is not present in the URI");
315
316 if (sendThreadState.mtu <= getTotalHeaderLength(sendThreadState.useV6))
317 throw E2SARErrorInfo{E2SARErrorc::SocketError, "Insufficient MTU length to accommodate headers"};
318 }
319
        // tell the send thread(s) to stop; set by stopThreads()
        // NOTE(review): plain bools written by stopThreads() and presumably read
        // by the worker thread loops without synchronization — other shared
        // counters in this class are kept atomic; consider boost::atomic<bool>
        // here too — confirm against the thread bodies in the .cpp
        bool threadsStop{false};
        // tell the sync thread to stop; set by stopThreads() after senders join
        bool syncThreadStop{false};
324 public:
334 u_int64_t msgCnt;
335 u_int64_t errCnt;
336 int lastErrno;
337 E2SARErrorc lastE2SARError;
338
339 ReportedStats() = delete;
340 ReportedStats(const AtomicStats &as): msgCnt{as.msgCnt}, errCnt{as.errCnt},
341 lastErrno{as.lastErrno}, lastE2SARError{as.lastE2SARError}
342 {}
343 };
344
371 {
372 bool dpV6;
373 bool connectedSocket;
374 bool useCP;
375 u_int16_t warmUpMs;
376 u_int16_t syncPeriodMs;
377 u_int16_t syncPeriods;
378 u_int16_t mtu;
379 size_t numSendSockets;
380 int sndSocketBufSize;
381 float rateGbps;
382 bool smooth;
383 bool multiPort;
384 bool ticksAsREEventNum;
385 u_int8_t lbHdrVersion;
386
387 SegmenterFlags(): dpV6{false}, connectedSocket{true},
388 useCP{true}, warmUpMs{1000}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
389 numSendSockets{4}, sndSocketBufSize{1024*1024*3}, rateGbps{-1.0}, smooth{false},
390 multiPort{false}, ticksAsREEventNum{false}, lbHdrVersion{lbhdrVersion2} {}
395 static result<SegmenterFlags> getFromINI(const std::string &iniFile) noexcept;
396 };
407 Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId,
408 std::vector<int> cpuCoreList,
409 const SegmenterFlags &sflags=SegmenterFlags());
410
420 Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId,
421 const SegmenterFlags &sflags=SegmenterFlags());
422
426 Segmenter(const Segmenter &s) = delete;
430 Segmenter & operator=(const Segmenter &o) = delete;
431
436 {
437 stopThreads();
438
439#ifdef LIBURING_AVAILABLE
440 if (Optimizations::isSelected(Optimizations::Code::liburing_send))
441 {
442 for (size_t i = 0; i < rings.size(); ++i)
443 {
444 io_uring_unregister_files(&rings[i]);
445 // deallocate the ring
446 io_uring_queue_exit(&rings[i]);
447 }
448 }
449#endif
450 // pool memory is implicitly freed when pool goes out of scope
451 }
452
458 result<int> openAndStart() noexcept;
459
469 result<int> sendEvent(u_int8_t *event, size_t bytes, EventNum_t _eventNumber=0LL,
470 u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept;
471
483 result<int> addToSendQueue(u_int8_t *event, size_t bytes,
484 EventNum_t _eventNum=0LL, u_int16_t _dataId = 0, u_int16_t entropy=0,
485 void (*callback)(boost::any) = nullptr,
486 boost::any cbArg = nullptr) noexcept;
487
491 inline const ReportedStats getSyncStats() const noexcept
492 {
493 return ReportedStats(syncStats);
494 }
495
499 inline const ReportedStats getSendStats() const noexcept
500 {
501 return ReportedStats(sendStats);
502 }
503
507 inline const std::string getIntf() const noexcept
508 {
509 return sendThreadState.iface;
510 }
511
515 inline u_int16_t getMTU() const noexcept
516 {
517 return sendThreadState.mtu;
518 }
519
523 inline size_t getMaxPldLen() const noexcept
524 {
525 return sendThreadState.maxPldLen;
526 }
527
531 inline bool isUsingIPv6() const noexcept
532 {
533 return sendThreadState.useV6;
534 }
535 /*
536 * Tell threads to stop
537 */
538 inline void stopThreads()
539 {
540 if (not threadsStop)
541 {
542 // wait until queue empties
543 while (not eventQueue.empty()) {}
544
545 // tell sending threads to stop and
546 // wait till they are done
547 threadsStop = true;
548 sendThreadState.threadObj.join();
549 // now we can stop the sync thread
550 syncThreadStop = true;
551 syncThreadState.threadObj.join();
552 }
553 }
554 private:
555 // Calculate the average event rate from circular buffer
556 // note that locking lives outside this function, as needed.
557 // NOTE: this is only useful to sync messages if sequential
558 // event IDs are used. When usec timestamp is used for LB event numbers
559 // the event is constant 1 MHz
560 inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos)
561 {
562 // no rate to report
563 if (eventStatsBuffer.size() == 0)
564 return 1;
565 EventNum_t eventTotal{0LL};
566 // walk the circular buffer
567 for(auto el: eventStatsBuffer)
568 {
569 // add up the events
570 eventTotal += el.eventsSinceLastSync;
571 }
572 auto timeDiff = currentTimeNanos -
573 eventStatsBuffer.begin()->lastSyncTimeNanos;
574
575 // convert to Hz and return
576 //return (eventTotal*1000000000UL)/timeDiff;
577 // this uses floating point but is more accurate at low rates
578 return std::round(static_cast<float>(eventTotal*1000000000UL)/timeDiff);
579 }
580
581 // doesn't require locking as it looks at only srcId in segmenter, which
582 // never changes past initialization
        // Fill in a sync header to be sent to the control plane.
        // Doesn't require locking as it looks at only srcId in segmenter, which
        // never changes past initialization.
        // NOTE: the reported rate is the constant 1 MHz of the microsecond
        // clock and the reported event number is the current usec timestamp
        // (see the eventRate() note about sequential event IDs).
        inline void fillSyncHdr(SyncHdr *hdr, UnixTimeNano_t tnano)
        {
            EventRate_t reportedRate{1000000};
            // figure out what event number would be at this moment, don't worry about its entropy
            auto nowT = boost::chrono::system_clock::now();
            // Convert the time point to microseconds since the epoch
            EventNum_t reportedEventNum = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
            hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano);
        }
592
600 inline int_least64_t addClockEntropy(int_least64_t clockSample)
601 {
602 return (clockSample & ~0xFF) | lsbDist(ranlux);
603 }
604 };
605}
606#endif
Definition e2sarError.hpp:62
Definition e2sarUtil.hpp:56
const bool has_syncAddr() const
Definition e2sarUtil.hpp:300
const bool has_dataAddr() const
Definition e2sarUtil.hpp:294
static const bool isSelected(Code o) noexcept
Definition e2sarUtil.cpp:151
Definition e2sarDPReassembler.hpp:49
Definition e2sarDPSegmenter.hpp:48
result< int > addToSendQueue(u_int8_t *event, size_t bytes, EventNum_t _eventNum=0LL, u_int16_t _dataId=0, u_int16_t entropy=0, void(*callback)(boost::any)=nullptr, boost::any cbArg=nullptr) noexcept
Definition e2sarDPSegmenter.cpp:920
~Segmenter()
Definition e2sarDPSegmenter.hpp:435
bool isUsingIPv6() const noexcept
Definition e2sarDPSegmenter.hpp:531
const ReportedStats getSyncStats() const noexcept
Definition e2sarDPSegmenter.hpp:491
result< int > openAndStart() noexcept
Definition e2sarDPSegmenter.cpp:160
const std::string getIntf() const noexcept
Definition e2sarDPSegmenter.hpp:507
Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId, std::vector< int > cpuCoreList, const SegmenterFlags &sflags=SegmenterFlags())
Definition e2sarDPSegmenter.cpp:26
result< int > sendEvent(u_int8_t *event, size_t bytes, EventNum_t _eventNumber=0LL, u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept
Definition e2sarDPSegmenter.cpp:901
size_t getMaxPldLen() const noexcept
Definition e2sarDPSegmenter.hpp:523
Segmenter(const Segmenter &s)=delete
u_int16_t getMTU() const noexcept
Definition e2sarDPSegmenter.hpp:515
Segmenter & operator=(const Segmenter &o)=delete
const ReportedStats getSendStats() const noexcept
Definition e2sarDPSegmenter.hpp:499
Definition e2sar.hpp:11
E2SARErrorc
Definition e2sarError.hpp:24
Definition e2sarError.hpp:42
Definition e2sarDPSegmenter.hpp:333
Definition e2sarDPSegmenter.hpp:371
static result< SegmenterFlags > getFromINI(const std::string &iniFile) noexcept
Definition e2sarDPSegmenter.cpp:950
Definition e2sarHeaders.hpp:324