55 const u_int16_t dataId;
59 const u_int32_t eventSrcId;
62 const size_t numSendSockets;
65 const int sndSocketBufSize;
78 const u_int8_t lbHdrVersion;
81 static constexpr size_t QSIZE{2047};
84 static constexpr unsigned cqeBatchSize{100};
87 static constexpr boost::chrono::milliseconds sleepTime{1};
90 struct EventQueueItem {
96 void (*callback)(boost::any);
101 boost::lockfree::queue<EventQueueItem*, boost::lockfree::fixed_sized<true>> eventQueue{QSIZE};
103#ifdef LIBURING_AVAILABLE
104 std::vector<struct io_uring> rings;
105 std::vector<boost::mutex> ringMtxs;
109 const size_t uringSize = 1000;
115 struct msghdr* msghdr;
116 void (*callback)(boost::any);
124 UnixTimeNano_t lastSyncTimeNanos;
126 EventNum_t eventsSinceLastSync;
129 boost::circular_buffer<SendStats> eventStatsBuffer;
131 boost::atomic<UnixTimeNano_t> currentSyncStartNano{0};
132 boost::atomic<EventNum_t> eventsInCurrentSync{0};
135 boost::atomic<EventNum_t> userEventNum{0};
138 boost::random::ranlux24_base ranlux;
140 boost::random::uniform_int_distribution<> lsbDist{0, 255};
143 size_t roundRobinIndex{0};
151 std::atomic<u_int64_t> msgCnt{0};
153 std::atomic<u_int64_t> errCnt{0};
155 std::atomic<int> lastErrno{0};
157 std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
160 AtomicStats syncStats;
161 AtomicStats sendStats;
166 struct SyncThreadState {
169 boost::thread threadObj;
171 const u_int16_t period_ms{100};
173 const bool connectSocket{
true};
176#define GET_V4_SYNC_STRUCT(sas) boost::get<sockaddr_in>(sas)
177#define GET_V6_SYNC_STRUCT(sas) boost::get<sockaddr_in6>(sas)
178 boost::variant<sockaddr_in, sockaddr_in6> syncAddrStruct;
184 inline SyncThreadState(
Segmenter &s, u_int16_t time_period_ms,
bool cnct=
true):
186 period_ms{time_period_ms},
191 result<int> _close();
192 result<int> _send(
SyncHdr *hdr);
197 friend struct SyncThreadState;
199 SyncThreadState syncThreadState;
205 struct SendThreadState {
208 boost::thread threadObj;
212 const bool connectSocket{
true};
216 const bool ticksAsREEventNum;
220 std::string iface{
""};
225#define GET_FD(sas, i) boost::get<0>(sas[i])
226#define GET_LOCAL_SEND_STRUCT(sas,i) boost::get<1>(sas[i])
227#define GET_REMOTE_SEND_STRUCT(sas, i) boost::get<2>(sas[i])
228 std::vector<boost::tuple<int, sockaddr_in, sockaddr_in>> socketFd4;
229 std::vector<boost::tuple<int, sockaddr_in6, sockaddr_in6>> socketFd6;
234 boost::random::ranlux24_base ranlux;
235 boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits<u_int16_t>::max()};
237 boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits<u_int16_t>::max()};
239 inline SendThreadState(
Segmenter &s,
int idx,
bool v6, u_int16_t mtu,
bool tasreenum,
bool cnct=
true):
240 seg{s}, threadIndex{idx}, connectSocket{cnct}, useV6{v6}, ticksAsREEventNum{tasreenum}, mtu{mtu},
241 maxPldLen{mtu - getTotalHeaderLength(v6)}, socketFd4(s.numSendSockets),
242 socketFd6(s.numSendSockets),
243 ranlux{
static_cast<u_int32_t
>(std::time(0))}
246 auto nowT = boost::chrono::system_clock::now();
247 ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
253 result<int> _close();
255 result<int> _waitAndCloseFd(
int fd);
257 result<int> _send(u_int8_t *event,
size_t bytes, EventNum_t altEventNum, u_int16_t dataId,
258 u_int16_t entropy,
size_t roundRobinIndex, int64_t interFrameSleepUsec = 0,
259 void (*callback)(boost::any) =
nullptr, boost::any cbArg =
nullptr);
262#ifdef LIBURING_AVAILABLE
264 void _reap(
size_t roundRobinIndex);
267 friend struct SendThreadState;
269 SendThreadState sendThreadState;
270 const size_t numSendThreads{1};
275 const std::vector<int> cpuCoreList;
277#ifdef LIBURING_AVAILABLE
280 static constexpr unsigned pollWaitTime{2000};
282 boost::atomic<u_int32_t> outstandingSends{0};
286 boost::mutex sendThreadMtx;
293#define MIN_CLOCK_ENTROPY 6
299 inline void sanityChecks()
301 if (numSendSockets > 128)
302 throw E2SARException(
"Too many sending sockets threads requested, limit 128");
304 if (syncThreadState.period_ms > 10000)
307 if (sendThreadState.mtu > 9000)
316 if (sendThreadState.mtu <= getTotalHeaderLength(sendThreadState.useV6))
317 throw E2SARErrorInfo{E2SARErrorc::SocketError,
"Insufficient MTU length to accommodate headers"};
321 bool threadsStop{
false};
323 bool syncThreadStop{
false};
340 ReportedStats(
const AtomicStats &as): msgCnt{as.msgCnt}, errCnt{as.errCnt},
341 lastErrno{as.lastErrno}, lastE2SARError{as.lastE2SARError}
373 bool connectedSocket;
376 u_int16_t syncPeriodMs;
377 u_int16_t syncPeriods;
379 size_t numSendSockets;
380 int sndSocketBufSize;
384 bool ticksAsREEventNum;
385 u_int8_t lbHdrVersion;
388 useCP{
true}, warmUpMs{1000}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
389 numSendSockets{4}, sndSocketBufSize{1024*1024*3}, rateGbps{-1.0}, smooth{
false},
390 multiPort{
false}, ticksAsREEventNum{
false}, lbHdrVersion{lbhdrVersion2} {}
395 static result<SegmenterFlags>
getFromINI(
const std::string &iniFile)
noexcept;
408 std::vector<int> cpuCoreList,
439#ifdef LIBURING_AVAILABLE
442 for (
size_t i = 0; i < rings.size(); ++i)
444 io_uring_unregister_files(&rings[i]);
446 io_uring_queue_exit(&rings[i]);
469 result<
int>
sendEvent(u_int8_t *event,
size_t bytes, EventNum_t _eventNumber=0LL,
470 u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept;
484 EventNum_t _eventNum=0LL, u_int16_t _dataId = 0, u_int16_t entropy=0,
485 void (*callback)(boost::any) =
nullptr,
486 boost::any cbArg =
nullptr) noexcept;
507 inline const std::string
getIntf() const noexcept
509 return sendThreadState.iface;
517 return sendThreadState.mtu;
525 return sendThreadState.maxPldLen;
533 return sendThreadState.useV6;
538 inline void stopThreads()
543 while (not eventQueue.empty()) {}
548 sendThreadState.threadObj.join();
550 syncThreadStop =
true;
551 syncThreadState.threadObj.join();
560 inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos)
563 if (eventStatsBuffer.size() == 0)
565 EventNum_t eventTotal{0LL};
567 for(
auto el: eventStatsBuffer)
570 eventTotal += el.eventsSinceLastSync;
572 auto timeDiff = currentTimeNanos -
573 eventStatsBuffer.begin()->lastSyncTimeNanos;
578 return std::round(
static_cast<float>(eventTotal*1000000000UL)/timeDiff);
583 inline void fillSyncHdr(SyncHdr *hdr, UnixTimeNano_t tnano)
585 EventRate_t reportedRate{1000000};
587 auto nowT = boost::chrono::system_clock::now();
589 EventNum_t reportedEventNum = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
590 hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano);
600 inline int_least64_t addClockEntropy(int_least64_t clockSample)
602 return (clockSample & ~0xFF) | lsbDist(ranlux);