E2SAR 0.2.0
Loading...
Searching...
No Matches
e2sarDPSegmenter.hpp
1#ifndef E2SARDSEGMENTERPHPP
2#define E2SARDSEGMENTERPHPP
3
4#include <sys/types.h>
5#include <sys/socket.h>
6
7#ifdef LIBURING_AVAILABLE
8#include <liburing.h>
9#endif
10
11#include <boost/asio.hpp>
12#include <boost/lockfree/queue.hpp>
13#include <boost/pool/pool.hpp>
14#include <boost/pool/object_pool.hpp>
15#include <boost/thread.hpp>
16#include <boost/tuple/tuple.hpp>
17#include <boost/tuple/tuple_io.hpp>
18#include <boost/circular_buffer.hpp>
19#include <boost/any.hpp>
20#include <boost/asio/ip/udp.hpp>
21#include <boost/variant.hpp>
22#include <boost/random.hpp>
23#include <boost/chrono.hpp>
24
25#include <atomic>
26
27#include "e2sar.hpp"
28#include "e2sarUtil.hpp"
29#include "e2sarHeaders.hpp"
30#include "e2sarNetUtil.hpp"
31#include "portable_endian.h"
32
33/***
34 * Dataplane definitions for E2SAR Segmenter
35*/
36
37namespace e2sar
38{
39 /*
40 The Segmenter class knows how to break up the provided
41 events into segments consumable by the hardware loadbalancer.
42 It relies on header structures to segment into UDP packets and
43 follows other LB rules while doing it.
44
45 It runs on or next to the source of events.
46 */
48 {
49 friend class Reassembler;
50 private:
51 EjfatURI dpuri;
52 // unique identifier of the originating segmentation
53 // point (e.g. a DAQ), carried in RE header (could be persistent
54 // as set here, or specified per event buffer)
55 const u_int16_t dataId;
56 // unique identifier of an individual LB packet transmitting
57 // host/daq, 32-bit to accommodate IP addresses more easily
58 // carried in Sync header
59 const u_int32_t eventSrcId;
60
61 // number of send sockets we will be using (to help randomize LAG ports on FPGAs)
62 const size_t numSendSockets;
63
64 // send socket buffer size for setsockop
65 const int sndSocketBufSize;
66
67 // send rate (ignore if negative)
68 const float rateGbps;
69 // used to avoid floating point comparisons, set to false if rateGbps <= 0
70 const bool rateLimit;
71 // use multiple destination ports (for back-to-back testing only)
72 const bool multiPort;
73
74 // Max size of internal queue holding events to be sent.
75 static constexpr size_t QSIZE{2047};
76
77 // size of CQE batch we peek
78 static constexpr unsigned cqeBatchSize{100};
79
80 // how long data send thread spends sleeping
81 static constexpr boost::chrono::milliseconds sleepTime{1};
82
83 // Structure to hold each send-queue item
        // Structure to hold each send-queue item. Note that the queue stores
        // only the raw event pointer - the buffer is not copied, so it must
        // remain valid until the send thread has transmitted it.
        struct EventQueueItem {
            uint32_t bytes;        // length of the event buffer in bytes
            EventNum_t eventNum;   // event number to report in the RE header
            u_int16_t dataId;      // per-event data id carried in the RE header
            u_int8_t *event;       // pointer to the event buffer (not owned)
            u_int16_t entropy;     // optional per event entropy
            void (*callback)(boost::any);  // optional callback invoked after the event is sent
            boost::any cbArg;      // opaque argument passed to the callback
        };
93
94 // Fast, lock-free, wait-free queue (supports multiple producers/consumers)
95 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
96
97#ifdef LIBURING_AVAILABLE
98 std::vector<struct io_uring> rings;
99 std::vector<boost::mutex> ringMtxs;
100
101 // each ring has to have a predefined size - we want to
102 // put at least 2*eventSize/bufferSize entries onto it
103 const size_t uringSize = 1000;
104
105 // we need to be able to call the callback from the CQE thread
106 // instead of send thread when liburing optimization is turned on
        // we need to be able to call the callback from the CQE thread
        // instead of send thread when liburing optimization is turned on;
        // this is attached as user data to each submitted SQE
        struct SQEUserData
        {
            struct msghdr* msghdr;          // msghdr associated with the submitted SQE
            void (*callback)(boost::any);   // optional user callback to run on completion
            boost::any cbArg;               // opaque argument for the callback
        };
113#endif
114
115 // structure that maintains send stats
        // structure that maintains send stats; entries are accumulated in
        // eventStatsBuffer and used by eventRate() to compute an average rate
        struct SendStats {
            // Last time a sync message sent to CP in nanosec since epoch.
            UnixTimeNano_t lastSyncTimeNanos;
            // Number of events sent since last sync message sent to CP.
            EventNum_t eventsSinceLastSync;
        };
122 // event metadata fifo to keep track of stats
123 boost::circular_buffer<SendStats> eventStatsBuffer;
124 // keep these atomic, as they are accessed by Sync, Send (and maybe main) thread
125 boost::atomic<UnixTimeNano_t> currentSyncStartNano{0};
126 boost::atomic<EventNum_t> eventsInCurrentSync{0};
127
128 // currently user-assigned or sequential event number at enqueuing and reported in RE header
129 boost::atomic<EventNum_t> userEventNum{0};
130
131 // fast random number generator
132 boost::random::ranlux24_base ranlux;
133 // to get better entropy in usec clock samples (if needed)
134 boost::random::uniform_int_distribution<> lsbDist{0, 255};
135
136 // we RR through these FDs for send thread _send
137 size_t roundRobinIndex{0};
138
        // Thread-safe statistics counters; one instance per worker thread
        // (sync and send). Snapshots are taken via ReportedStats, which
        // copies these atomics into plain fields.
        struct AtomicStats {
            // sync messages sent
            std::atomic<u_int64_t> msgCnt{0};
            // sync errors seen on send
            std::atomic<u_int64_t> errCnt{0};
            // last error code (errno)
            std::atomic<int> lastErrno{0};
            // last e2sar error
            std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
        };
153 // independent stats for each thread
154 AtomicStats syncStats;
155 AtomicStats sendStats;
156
        // State owned by the sync thread, which periodically sends sync
        // messages (SyncHdr) to the control plane over UDP.
        struct SyncThreadState {
            // owner object
            Segmenter &seg;
            boost::thread threadObj;
            // period in ms
            const u_int16_t period_ms{100};
            // connect socket flag (usually true)
            const bool connectSocket{true};
            // sockaddr_in[6] union (use boost::get<sockaddr_in> or
            // boost::get<sockaddr_in6> to get to the appropriate structure)
#define GET_V4_SYNC_STRUCT(sas) boost::get<sockaddr_in>(sas)
#define GET_V6_SYNC_STRUCT(sas) boost::get<sockaddr_in6>(sas)
            boost::variant<sockaddr_in, sockaddr_in6> syncAddrStruct;
            // flag that tells us we are v4 or v6
            bool isV6{false};
            // UDP sockets
            // NOTE(review): 0 is a valid file descriptor; presumably _open()
            // assigns the real fd before any use - confirm in the .cpp
            int socketFd{0};

            inline SyncThreadState(Segmenter &s, u_int16_t time_period_ms, bool cnct=true):
                seg{s},
                period_ms{time_period_ms},
                connectSocket{cnct}
            {}

            // open the sync socket
            result<int> _open();
            // close the sync socket
            result<int> _close();
            // send a single sync header to the control plane
            result<int> _send(SyncHdr *hdr);
            // thread loop
            void _threadBody();


        };
191 friend struct SyncThreadState;
192
193 SyncThreadState syncThreadState;
194
        // State owned by a send thread: the sockets it round-robins over,
        // MTU/payload parameters, and the PRNG used for per-event entropy.
        struct SendThreadState {
            // owner object
            Segmenter &seg;
            boost::thread threadObj;
            // thread index (to help pick core)
            int threadIndex;
            // connect socket flag (usually true)
            const bool connectSocket{true};

            // flags
            const bool useV6;

            // transmit parameters
            size_t mtu{0}; // must accommodate typical IP, UDP, LB+RE headers and payload; not a const because we may change it
            std::string iface{""}; // outgoing interface - we may set it if possible
            size_t maxPldLen; // not a const because mtu is not a const

            // UDP sockets and matching sockaddr structures (local and remote)
            // <socket fd, local address, remote address>
#define GET_FD(sas, i) boost::get<0>(sas[i])
#define GET_LOCAL_SEND_STRUCT(sas,i) boost::get<1>(sas[i])
#define GET_REMOTE_SEND_STRUCT(sas, i) boost::get<2>(sas[i])
            std::vector<boost::tuple<int, sockaddr_in, sockaddr_in>> socketFd4;
            std::vector<boost::tuple<int, sockaddr_in6, sockaddr_in6>> socketFd6;

            // fast random number generator to create entropy values for events
            // this entropy value is held the same for all packets of a given
            // event guaranteeing the same destination UDP port for all of them
            boost::random::ranlux24_base ranlux;
            boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits<u_int16_t>::max()};
            // to get random port numbers we skip low numbered privileged ports
            boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits<u_int16_t>::max()};

            // NOTE(review): maxPldLen is computed here, before sanityChecks()
            // validates mtu > TOTAL_HDR_LEN, so for a too-small mtu the
            // unsigned subtraction wraps (sanityChecks throws afterwards, but
            // the wrapped value exists briefly). Also the time(0) seed in the
            // init list is immediately overwritten by the nanosecond reseed
            // in the body.
            inline SendThreadState(Segmenter &s, int idx, bool v6, u_int16_t mtu, bool cnct=true):
                seg{s}, threadIndex{idx}, connectSocket{cnct}, useV6{v6}, mtu{mtu},
                maxPldLen{mtu - TOTAL_HDR_LEN}, socketFd4(s.numSendSockets),
                socketFd6(s.numSendSockets), ranlux{static_cast<u_int32_t>(std::time(0))}
            {
                // this way every segmenter send thread has a unique PRNG sequence
                auto nowT = boost::chrono::system_clock::now();
                ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
            }

            // open v4/v6 sockets
            result<int> _open();
            // close sockets
            result<int> _close();
            // close a given socket, wait that it has sent all the data (in Linux)
            result<int> _waitAndCloseFd(int fd);
            // fragment and send the event
            result<int> _send(u_int8_t *event, size_t bytes, EventNum_t altEventNum, u_int16_t dataId,
                u_int16_t entropy, size_t roundRobinIndex,
                void (*callback)(boost::any) = nullptr, boost::any cbArg = nullptr);
            // thread loop
            void _threadBody();
        };
255 friend struct SendThreadState;
256
257 SendThreadState sendThreadState;
258 const size_t numSendThreads{1};
259 // list of cores we can use to run threads
260 // can be longer than the number of threads
261 // thread at index i uses core cpuCoreList[i]
262 // we don't check cores are unique
263 const std::vector<int> cpuCoreList;
264
265#ifdef LIBURING_AVAILABLE
266 // this thread reaps completions off the uring
267 // and updates stat counters
268 struct CQEThreadState {
269 // owner object
270 Segmenter &seg;
271 boost::thread threadObj;
272
273 inline CQEThreadState(Segmenter &s): seg{s}
274 {
275 ;
276 }
277 void _threadBody();
278 };
279 friend struct CQEThreadState;
280
281 CQEThreadState cqeThreadState;
282
283 // between send and CQE reaping thread
284 boost::mutex cqeThreadMtx;
285 // condition variable for CQE thread waiting on new submissions
286 boost::condition_variable cqeThreadCond;
287 // wait time in ms before CQE thread checks if its time to stop
288 // we can't wait for too long - it only takes about 300usec
289 // to exhaust 256 SQEs using 1500 byte MTU at 10Gbps
290 static constexpr boost::chrono::microseconds cqeWaitTime{200};
291 // this is the sleep time for kernel thread in poll mode
292 // it is in milliseconds
293 static constexpr unsigned pollWaitTime{2000};
294 // atomic counter of outstanging sends
295 boost::atomic<u_int32_t> outstandingSends{0};
296#endif
297
298 // lock with send thread
299 boost::mutex sendThreadMtx;
300 // condition variable for send thread
301 //boost::condition_variable sendThreadCond;
302 // warm up period in MS between sync thread starting and data being allowed to be sent
303 u_int16_t warmUpMs;
304 // use control plane (can be disabled for debugging)
305 bool useCP;
306#define MIN_CLOCK_ENTROPY 6
307 bool addEntropy;
308
        // Validate configuration at construction time; throws on any
        // invalid combination of settings.
        // NOTE(review): the last check throws E2SARErrorInfo while every
        // other check throws E2SARException - callers catching only
        // E2SARException will miss it; confirm this asymmetry is intentional.
        inline void sanityChecks()
        {
            if (numSendSockets > 128)
                throw E2SARException("Too many sending sockets threads requested, limit 128");

            if (syncThreadState.period_ms > 10000)
                throw E2SARException("Sync period too long, limit 10s");

            if (sendThreadState.mtu > 9000)
                throw E2SARException("MTU set too long, limit 9000");

            if (useCP and not dpuri.has_syncAddr())
                throw E2SARException("Sync address not present in the URI");

            if (not dpuri.has_dataAddr())
                throw E2SARException("Data address is not present in the URI");

            // NOTE(review): sendThreadState.maxPldLen (mtu - TOTAL_HDR_LEN) is
            // computed in the SendThreadState constructor, i.e. before this
            // check runs - see note there.
            if (sendThreadState.mtu <= TOTAL_HDR_LEN)
                throw E2SARErrorInfo{E2SARErrorc::SocketError, "Insufficient MTU length to accommodate headers"};
        }
332
334 bool threadsStop{false};
335 public:
            u_int64_t msgCnt;             // messages sent
            u_int64_t errCnt;             // errors seen on send
            int lastErrno;                // last errno observed
            E2SARErrorc lastE2SARError;   // last e2sar error code

            // only constructible as a snapshot of the live counters
            ReportedStats() = delete;
            // snapshot constructor: copies the atomic counters into plain fields
            ReportedStats(const AtomicStats &as): msgCnt{as.msgCnt}, errCnt{as.errCnt},
                lastErrno{as.lastErrno}, lastE2SARError{as.lastE2SARError}
            {}
        };
355
        {
            bool dpV6;               // use IPv6 for the dataplane (default false)
            bool connectedSocket;    // use connected UDP sockets (default true)
            bool useCP;              // use the control plane; can be disabled for debugging (default true)
            u_int16_t warmUpMs;      // warm-up period in ms between sync start and data send (default 1000)
            u_int16_t syncPeriodMs;  // sync message period in ms (default 1000)
            u_int16_t syncPeriods;   // number of sync periods kept for rate averaging (default 2)
            u_int16_t mtu;           // MTU to use (default 1500)
            size_t numSendSockets;   // number of send sockets (default 4)
            int sndSocketBufSize;    // send socket buffer size for setsockopt (default 3MB)
            float rateGbps;          // send rate in Gbps; negative disables rate limiting (default -1.0)
            bool multiPort;          // use multiple destination ports, back-to-back testing only (default false)

            // defaults as described above
            SegmenterFlags(): dpV6{false}, connectedSocket{true},
                useCP{true}, warmUpMs{1000}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
                numSendSockets{4}, sndSocketBufSize{1024*1024*3}, rateGbps{-1.0}, multiPort{false} {}
            // populate flags from an INI file; returns an error result on failure
            static result<SegmenterFlags> getFromINI(const std::string &iniFile) noexcept;
        };
        /**
         * Create a segmenter with explicit CPU core pinning.
         * @param uri - EjfatURI carrying sync (and data) addresses
         * @param dataId - source data id, carried in the RE header
         * @param eventSrcId - source id of this sender, carried in the Sync header
         * @param cpuCoreList - cores for worker threads (thread i uses cpuCoreList[i])
         * @param sflags - tuning flags, see SegmenterFlags
         */
        Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId,
            std::vector<int> cpuCoreList,
            const SegmenterFlags &sflags=SegmenterFlags());

        /**
         * Create a segmenter without CPU core pinning; otherwise identical
         * to the constructor above.
         */
        Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId,
            const SegmenterFlags &sflags=SegmenterFlags());
425
        /** not copyable - owns sockets and threads */
        Segmenter(const Segmenter &s) = delete;
        /** not copy-assignable */
        Segmenter & operator=(const Segmenter &o) = delete;
434
        {
            // signal all worker threads to exit
            stopThreads();
#ifdef LIBURING_AVAILABLE
            // release CQE thread one last time so it can quit in peace
            if (Optimizations::isSelected(Optimizations::Code::liburing_send))
                cqeThreadCond.notify_all();
#endif

            // wait to exit
            // NOTE(review): if openAndStart() was never called these thread
            // objects were never launched; this relies on boost::thread::join
            // being a no-op for not-a-thread objects - confirm
            syncThreadState.threadObj.join();
            sendThreadState.threadObj.join();
#ifdef LIBURING_AVAILABLE
            if (Optimizations::isSelected(Optimizations::Code::liburing_send))
            {
                cqeThreadState.threadObj.join();
                // tear down every ring after its consumers are gone
                for (size_t i = 0; i < rings.size(); ++i)
                {
                    io_uring_unregister_files(&rings[i]);
                    // deallocate the ring
                    io_uring_queue_exit(&rings[i]);
                }
            }
#endif
            // pool memory is implicitly freed when pool goes out of scope
        }
464
        /**
         * Open sockets and start the sync and send worker threads.
         * @return result carrying an error if sockets cannot be opened or threads started
         */
        result<int> openAndStart() noexcept;

        /**
         * Send an event from the calling thread.
         * @param event - pointer to the event buffer
         * @param bytes - buffer length in bytes
         * @param _eventNumber - optional event number (default 0 - presumably
         *  the internal counter is used then; confirm in e2sarDPSegmenter.cpp)
         * @param _dataId - optional per-event data id (default 0)
         * @param _entropy - optional entropy value (default 0)
         */
        result<int> sendEvent(u_int8_t *event, size_t bytes, EventNum_t _eventNumber=0LL,
            u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept;

        /**
         * Enqueue an event on the lock-free send queue for asynchronous
         * transmission by the send thread. The queue stores the raw pointer
         * (see EventQueueItem), so the buffer must stay valid until sent.
         * @param event - pointer to the event buffer
         * @param bytes - buffer length in bytes
         * @param _eventNum - optional event number (default 0)
         * @param _dataId - optional per-event data id (default 0)
         * @param entropy - optional entropy value (default 0)
         * @param callback - optional callback invoked after the send
         * @param cbArg - opaque argument handed to the callback
         */
        result<int> addToSendQueue(u_int8_t *event, size_t bytes,
            EventNum_t _eventNum=0LL, u_int16_t _dataId = 0, u_int16_t entropy=0,
            void (*callback)(boost::any) = nullptr,
            boost::any cbArg = nullptr) noexcept;
499
503 inline const ReportedStats getSyncStats() const noexcept
504 {
505 return ReportedStats(syncStats);
506 }
507
511 inline const ReportedStats getSendStats() const noexcept
512 {
513 return ReportedStats(sendStats);
514 }
515
519 inline const std::string getIntf() const noexcept
520 {
521 return sendThreadState.iface;
522 }
523
527 inline const u_int16_t getMTU() const noexcept
528 {
529 return sendThreadState.mtu;
530 }
531
535 inline const size_t getMaxPldLen() const noexcept
536 {
537 return sendThreadState.maxPldLen;
538 }
        /*
         * Tell worker threads to stop at their next check of the flag.
         * NOTE(review): threadsStop is a plain bool written here and read by
         * the worker threads without synchronization - consider
         * boost::atomic<bool>; confirm the current usage is benign.
         */
        inline void stopThreads()
        {
            threadsStop = true;
        }
546 private:
547 // Calculate the average event rate from circular buffer
548 // note that locking lives outside this function, as needed.
549 // NOTE: this is only useful to sync messages if sequential
550 // event IDs are used. When usec timestamp is used for LB event numbers
551 // the event is constant 1 MHz
552 inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos)
553 {
554 // no rate to report
555 if (eventStatsBuffer.size() == 0)
556 return 1;
557 EventNum_t eventTotal{0LL};
558 // walk the circular buffer
559 for(auto el: eventStatsBuffer)
560 {
561 // add up the events
562 eventTotal += el.eventsSinceLastSync;
563 }
564 auto timeDiff = currentTimeNanos -
565 eventStatsBuffer.begin()->lastSyncTimeNanos;
566
567 // convert to Hz and return
568 //return (eventTotal*1000000000UL)/timeDiff;
569 // this uses floating point but is more accurate at low rates
570 return std::round(static_cast<float>(eventTotal*1000000000UL)/timeDiff);
571 }
572
573 // doesn't require locking as it looks at only srcId in segmenter, which
574 // never changes past initialization
575 inline void fillSyncHdr(SyncHdr *hdr, UnixTimeNano_t tnano)
576 {
577 EventRate_t reportedRate{1000000};
578 // figure out what event number would be at this moment, don't worry about its entropy
579 auto nowT = boost::chrono::system_clock::now();
580 // Convert the time point to microseconds since the epoch
581 EventNum_t reportedEventNum = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
582 hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano);
583 }
584
592 inline int_least64_t addClockEntropy(int_least64_t clockSample)
593 {
594 return (clockSample & ~0xFF) | lsbDist(ranlux);
595 }
596 };
597}
598#endif
Definition e2sarError.hpp:61
Definition e2sarUtil.hpp:31
const bool has_syncAddr() const
Definition e2sarUtil.hpp:237
const bool has_dataAddr() const
Definition e2sarUtil.hpp:231
static const bool isSelected(Code o) noexcept
Definition e2sarUtil.cpp:150
Definition e2sarDPReassembler.hpp:49
Definition e2sarDPSegmenter.hpp:48
result< int > addToSendQueue(u_int8_t *event, size_t bytes, EventNum_t _eventNum=0LL, u_int16_t _dataId=0, u_int16_t entropy=0, void(*callback)(boost::any)=nullptr, boost::any cbArg=nullptr) noexcept
Definition e2sarDPSegmenter.cpp:913
~Segmenter()
Definition e2sarDPSegmenter.hpp:438
const ReportedStats getSyncStats() const noexcept
Definition e2sarDPSegmenter.hpp:503
result< int > openAndStart() noexcept
Definition e2sarDPSegmenter.cpp:155
const std::string getIntf() const noexcept
Definition e2sarDPSegmenter.hpp:519
const size_t getMaxPldLen() const noexcept
Definition e2sarDPSegmenter.hpp:535
Segmenter(const EjfatURI &uri, u_int16_t dataId, u_int32_t eventSrcId, std::vector< int > cpuCoreList, const SegmenterFlags &sflags=SegmenterFlags())
Definition e2sarDPSegmenter.cpp:25
result< int > sendEvent(u_int8_t *event, size_t bytes, EventNum_t _eventNumber=0LL, u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept
Definition e2sarDPSegmenter.cpp:894
const u_int16_t getMTU() const noexcept
Definition e2sarDPSegmenter.hpp:527
Segmenter(const Segmenter &s)=delete
Segmenter & operator=(const Segmenter &o)=delete
const ReportedStats getSendStats() const noexcept
Definition e2sarDPSegmenter.hpp:511
Definition e2sar.hpp:11
E2SARErrorc
Definition e2sarError.hpp:24
Definition e2sarError.hpp:41
Definition e2sarDPSegmenter.hpp:344
Definition e2sarDPSegmenter.hpp:378
static result< SegmenterFlags > getFromINI(const std::string &iniFile) noexcept
Definition e2sarDPSegmenter.cpp:941
Definition e2sarHeaders.hpp:173