// Per-instance settings and compile-time tuning constants of the enclosing
// class (the class header lies outside this excerpt).
// The non-static members below are `const` with no default initializer, so
// they have to be set in a constructor's member-initializer list.
55 const u_int16_t dataId;
59 const u_int32_t eventSrcId;
// Read by SendThreadState's constructor to size its socket vectors and
// range-checked (limit 128) in sanityChecks().
62 const size_t numSendSockets;
// NOTE(review): presumably the socket send-buffer size in bytes — confirm
// where it is applied.
65 const int sndSocketBufSize;
// Size argument handed to the eventQueue constructor.
75 static constexpr size_t QSIZE{2047};
// NOTE(review): presumably the number of completion-queue entries handled per
// batch — the consumer is not visible here; confirm.
78 static constexpr unsigned cqeBatchSize{100};
// A one-millisecond duration; its point of use is not part of this excerpt.
81 static constexpr boost::chrono::milliseconds sleepTime{1};
// Element type of the send queue. Only the callback member is visible in this
// excerpt; the remaining fields and the closing brace are omitted.
84 struct EventQueueItem {
// Plain function pointer taking a single boost::any argument.
90 void (*callback)(boost::any);
// Lock-free queue of raw EventQueueItem pointers, constructed with QSIZE.
// NOTE(review): the queue stores raw pointers, so item ownership/lifetime is
// handled elsewhere — confirm who allocates and frees the items.
95 boost::lockfree::queue<EventQueueItem*> eventQueue{QSIZE};
// Everything in this section is compiled only when LIBURING_AVAILABLE is
// defined (the matching #endif is outside this excerpt).
97#ifdef LIBURING_AVAILABLE
// io_uring instances; the teardown code iterates over all of them.
98 std::vector<struct io_uring> rings;
// NOTE(review): presumably one mutex per entry of `rings` — confirm the
// sizes are kept in step.
99 std::vector<boost::mutex> ringMtxs;
// NOTE(review): presumably the number of entries requested for each ring —
// confirm at the io_uring initialisation call.
103 const size_t uringSize = 1000;
// The two members below appear to belong to a struct whose header is omitted
// from this excerpt: a pointer to a message header and a completion callback
// taking a boost::any argument.
109 struct msghdr* msghdr;
110 void (*callback)(boost::any);
// Statistics and bookkeeping state. Several of these members belong to nested
// structs whose headers are omitted from this excerpt (the first two fields
// are read through eventStatsBuffer elements in eventRate()).
// Timestamp of a sync, in nanoseconds going by the name and type.
118 UnixTimeNano_t lastSyncTimeNanos;
// Event count accumulated for one buffer entry; summed by eventRate().
120 EventNum_t eventsSinceLastSync;
// Ring of SendStats records consumed by eventRate().
123 boost::circular_buffer<SendStats> eventStatsBuffer;
// Atomically updated counters for the sync interval in progress, both
// starting at zero.
125 boost::atomic<UnixTimeNano_t> currentSyncStartNano{0};
126 boost::atomic<EventNum_t> eventsInCurrentSync{0};
// NOTE(review): presumably the event number used when the caller does not
// supply one — confirm in the send path.
129 boost::atomic<EventNum_t> userEventNum{0};
// Random engine and a [0, 255] distribution; together they supply the low
// byte in addClockEntropy().
132 boost::random::ranlux24_base ranlux;
134 boost::random::uniform_int_distribution<> lsbDist{0, 255};
// NOTE(review): plain (non-atomic) index, presumably selecting the next send
// socket — confirm it is only touched from one thread.
137 size_t roundRobinIndex{0};
// Atomic counters and last-error slots; ReportedStats copies fields with
// these same names out of an AtomicStats object.
145 std::atomic<u_int64_t> msgCnt{0};
147 std::atomic<u_int64_t> errCnt{0};
149 std::atomic<int> lastErrno{0};
151 std::atomic<E2SARErrorc> lastE2SARError{E2SARErrorc::NoError};
// Separate statistics blocks for the sync path and the send path.
154 AtomicStats syncStats;
155 AtomicStats sendStats;
// State owned by the sync thread. Only part of the struct is visible in this
// excerpt (several members, the constructor body and the closing brace are
// omitted).
160 struct SyncThreadState {
// Thread handle; joined during teardown of the enclosing object.
163 boost::thread threadObj;
// Period in milliseconds. The in-class default of 100 is replaced by the
// constructor's initializer; sanityChecks() rejects values above 10000.
165 const u_int16_t period_ms{100};
// NOTE(review): presumably selects a connected socket over per-call
// addressing — confirm in the socket setup code.
167 const bool connectSocket{
true};
// Accessors that extract the IPv4 or IPv6 alternative from syncAddrStruct.
// boost::get on a variant reference throws if the other alternative is held.
170#define GET_V4_SYNC_STRUCT(sas) boost::get<sockaddr_in>(sas)
171#define GET_V6_SYNC_STRUCT(sas) boost::get<sockaddr_in6>(sas)
// Destination address, stored as either an IPv4 or an IPv6 socket address.
172 boost::variant<sockaddr_in, sockaddr_in6> syncAddrStruct;
// Constructor: takes the owning Segmenter, the sync period in milliseconds
// and an optional connect flag that defaults to true.
178 inline SyncThreadState(
Segmenter &s, u_int16_t time_period_ms,
bool cnct=
true):
180 period_ms{time_period_ms},
// Helpers returning result<int>; their definitions are not in this excerpt.
185 result<int> _close();
// Takes a pointer to the sync header to transmit.
186 result<int> _send(
SyncHdr *hdr);
// Grants the nested struct access to the enclosing class's private members.
191 friend struct SyncThreadState;
// The single sync-thread state instance held by the enclosing class.
193 SyncThreadState syncThreadState;
// State owned by the send thread. Only part of the struct is visible in this
// excerpt (several members, braces and the closing of the struct are omitted).
199 struct SendThreadState {
// Thread handle; joined during teardown of the enclosing object.
202 boost::thread threadObj;
// In-class default is true; the constructor overrides it with `cnct`.
206 const bool connectSocket{
true};
// Interface name, empty by default; returned by getIntf().
213 std::string iface{
""};
// Accessors for element i of a socket vector: index 0 is the file
// descriptor, 1 the local address, 2 the remote address.
218#define GET_FD(sas, i) boost::get<0>(sas[i])
219#define GET_LOCAL_SEND_STRUCT(sas,i) boost::get<1>(sas[i])
220#define GET_REMOTE_SEND_STRUCT(sas, i) boost::get<2>(sas[i])
// One (fd, local, remote) tuple per send socket, for IPv4 and IPv6. The
// constructor sizes both vectors to s.numSendSockets.
221 std::vector<boost::tuple<int, sockaddr_in, sockaddr_in>> socketFd4;
222 std::vector<boost::tuple<int, sockaddr_in6, sockaddr_in6>> socketFd6;
// Thread-local random engine plus two distributions over the 16-bit range:
// randDist covers [0, 65535], portDist covers [10000, 65535].
227 boost::random::ranlux24_base ranlux;
228 boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits<u_int16_t>::max()};
230 boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits<u_int16_t>::max()};
// Constructor: takes the owning Segmenter, the thread index, the IPv6 flag,
// the MTU and an optional connect flag that defaults to true.
232 inline SendThreadState(
Segmenter &s,
int idx,
bool v6, u_int16_t mtu,
bool cnct=
true):
233 seg{s}, threadIndex{idx}, connectSocket{cnct}, useV6{v6}, mtu{mtu},
// Maximum payload is the MTU minus the total header length.
// NOTE(review): this is computed before sanityChecks() can reject
// mtu <= TOTAL_HDR_LEN — confirm the intermediate value is harmless.
234 maxPldLen{mtu - TOTAL_HDR_LEN}, socketFd4(s.numSendSockets),
// The engine is first seeded here with the current time in seconds...
235 socketFd6(s.numSendSockets), ranlux{
static_cast<u_int32_t
>(std::time(0))}
// ...and then re-seeded in the body with nanoseconds since the epoch, which
// makes the seconds-based seed above redundant.
// NOTE(review): seed() receives a 64-bit count — confirm the engine's seed
// type truncation is acceptable.
238 auto nowT = boost::chrono::system_clock::now();
239 ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
// Helpers returning result<int>; their definitions are not in this excerpt.
245 result<int> _close();
247 result<int> _waitAndCloseFd(
int fd);
// Sends one event buffer of `bytes` bytes with the given event number, data
// id, entropy and round-robin index. The callback and its argument are
// optional.
// NOTE(review): `boost::any cbArg = nullptr` yields an any holding a
// std::nullptr_t, not an empty any — code testing cbArg.empty() would see
// false. Confirm how the default is consumed.
249 result<int> _send(u_int8_t *event,
size_t bytes, EventNum_t altEventNum, u_int16_t dataId,
250 u_int16_t entropy,
size_t roundRobinIndex,
251 void (*callback)(boost::any) =
nullptr, boost::any cbArg =
nullptr);
// Grants the nested struct access to the enclosing class's private members.
255 friend struct SendThreadState;
// The single send-thread state instance held by the enclosing class.
257 SendThreadState sendThreadState;
// Fixed at one; only a single sendThreadState member is declared in the class.
258 const size_t numSendThreads{1};
// Immutable list of CPU core ids supplied at construction.
// NOTE(review): presumably used for thread affinity — confirm at the point
// of use.
263 const std::vector<int> cpuCoreList;
// Completion-queue thread support, compiled only when LIBURING_AVAILABLE is
// defined. The struct body and the matching #endif are only partly visible.
265#ifdef LIBURING_AVAILABLE
268 struct CQEThreadState {
// Thread handle; joined during teardown of the enclosing object.
271 boost::thread threadObj;
// Constructor: stores a reference to the owning Segmenter.
273 inline CQEThreadState(
Segmenter &s): seg{s}
// Grants the nested struct access to the enclosing class's private members.
279 friend struct CQEThreadState;
281 CQEThreadState cqeThreadState;
// Mutex/condition-variable pair; the teardown code calls notify_all() on the
// condition variable before joining threads.
284 boost::mutex cqeThreadMtx;
286 boost::condition_variable cqeThreadCond;
// A 200-microsecond duration.
// NOTE(review): presumably the timeout for waiting on completions — confirm.
290 static constexpr boost::chrono::microseconds cqeWaitTime{200};
// NOTE(review): unit is not stated in the visible code (plain unsigned) —
// confirm whether this is milliseconds or microseconds.
293 static constexpr unsigned pollWaitTime{2000};
// Atomic counter of sends in flight, starting at zero.
295 boost::atomic<u_int32_t> outstandingSends{0};
// NOTE(review): the data this mutex guards is not visible in this excerpt.
299 boost::mutex sendThreadMtx;
// NOTE(review): presumably the minimum number of entropy bits required from
// the clock — the consumer is not visible in this excerpt; confirm.
306#define MIN_CLOCK_ENTROPY 6
// Validates constructor-supplied settings and throws on out-of-range values.
// Only the conditions and two of the throw statements are visible here; the
// statements for the sync-period and upper-MTU checks are omitted.
312 inline void sanityChecks()
// At most 128 send sockets are allowed.
314 if (numSendSockets > 128)
315 throw E2SARException(
"Too many sending sockets threads requested, limit 128");
// Sync period above 10000 ms is rejected.
317 if (syncThreadState.period_ms > 10000)
// MTU above 9000 is rejected.
320 if (sendThreadState.mtu > 9000)
// The MTU must leave room for at least one payload byte after the headers.
// NOTE(review): this branch throws E2SARErrorInfo whereas the socket-count
// check throws E2SARException; a caller catching only E2SARException would
// miss it unless the types are related — confirm which is intended.
329 if (sendThreadState.mtu <= TOTAL_HDR_LEN)
330 throw E2SARErrorInfo{E2SARErrorc::SocketError,
"Insufficient MTU length to accommodate headers"};
// Stop flag, initially false.
// NOTE(review): declared as a plain bool rather than an atomic; if worker
// threads poll it while another thread sets it, that is a data race —
// confirm how it is accessed.
334 bool threadsStop{
false};
// Builds a plain snapshot from an AtomicStats object by copying its four
// fields: message count, error count, last errno and last E2SAR error code.
// NOTE(review): if the source fields are std::atomic, each initializer is a
// separate load, so the snapshot is not taken atomically as a whole.
351 ReportedStats(
const AtomicStats &as): msgCnt{as.msgCnt}, errCnt{as.errCnt},
352 lastErrno{as.lastErrno}, lastE2SARError{as.lastE2SARError}
// Option fields of a flags struct (its header is omitted; going by the return
// type of getFromINI below it is SegmenterFlags). Only some fields are visible.
380 bool connectedSocket;
// Sync period in milliseconds and the number of periods.
// NOTE(review): presumably syncPeriods is the rate-averaging window length —
// confirm.
383 u_int16_t syncPeriodMs;
384 u_int16_t syncPeriods;
386 size_t numSendSockets;
387 int sndSocketBufSize;
// Default constructor (start of the initializer list is omitted). Visible
// defaults: control plane enabled, 1000 ms warm-up, 1000 ms sync period,
// 2 sync periods, MTU 1500, 4 send sockets, a 3 MiB (1024*1024*3) socket
// send buffer, rateGbps -1.0 and multiPort disabled.
392 useCP{
true}, warmUpMs{1000}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
393 numSendSockets{4}, sndSocketBufSize{1024*1024*3}, rateGbps{-1.0}, multiPort{
false} {}
// Factory that builds the flags from the INI file at `iniFile`. It is
// noexcept and reports failure through the result<> return value.
398 static result<SegmenterFlags>
getFromINI(
const std::string &iniFile)
noexcept;
// Parameter of a declaration whose remaining lines are omitted from this
// excerpt (presumably a Segmenter constructor — confirm).
411 std::vector<int> cpuCoreList,
// Teardown sequence; the enclosing function header is not visible
// (presumably the destructor — confirm). The #endif lines are also omitted.
441#ifdef LIBURING_AVAILABLE
// Wake any thread waiting on the CQE condition variable so it can exit.
444 cqeThreadCond.notify_all();
// Wait for the sync and send threads to finish.
// NOTE(review): join() on a thread that was never started or was already
// joined needs a joinable() guard — confirm one exists in the omitted lines.
448 syncThreadState.threadObj.join();
449 sendThreadState.threadObj.join();
450#ifdef LIBURING_AVAILABLE
// Wait for the completion-queue thread, then release every ring: first
// unregister its files, then tear the queue down.
453 cqeThreadState.threadObj.join();
454 for (
size_t i = 0; i < rings.size(); ++i)
456 io_uring_unregister_files(&rings[i]);
458 io_uring_queue_exit(&rings[i]);
// Sends `bytes` bytes starting at `event`. Event number, data id and entropy
// all default to 0. Declared noexcept; the outcome is returned as
// result<int>.
// NOTE(review): a 0 default presumably means "use the instance's own value"
// — confirm in the definition.
481 result<
int>
sendEvent(u_int8_t *event,
size_t bytes, EventNum_t _eventNumber=0LL,
482 u_int16_t _dataId=0, u_int16_t _entropy=0) noexcept;
// Tail of a second declaration whose name and leading parameters are omitted
// from this excerpt. It takes the same optional event number, data id and
// entropy plus an optional callback with its argument, and is noexcept.
// NOTE(review): `boost::any cbArg = nullptr` produces a non-empty any that
// holds a std::nullptr_t.
496 EventNum_t _eventNum=0LL, u_int16_t _dataId = 0, u_int16_t entropy=0,
497 void (*callback)(boost::any) =
nullptr,
498 boost::any cbArg =
nullptr) noexcept;
// Returns a copy of the interface name held in sendThreadState.
519 inline const std::string
getIntf() const noexcept
521 return sendThreadState.iface;
// Returns the MTU held in sendThreadState. The top-level const on the
// by-value return type has no effect for callers.
527 inline const u_int16_t
getMTU() const noexcept
529 return sendThreadState.mtu;
// Body of a getter whose signature is omitted from this excerpt; it returns
// the maximum payload length computed in SendThreadState's constructor.
537 return sendThreadState.maxPldLen;
// Requests worker-thread shutdown. The body is not part of this excerpt.
// NOTE(review): presumably sets threadsStop — confirm.
542 inline void stopThreads()
// Computes the average event rate in events per second over the records held
// in eventStatsBuffer, measured up to `currentTimeNanos`. Several lines of the
// body (braces, the empty-buffer return value, any guards) are omitted from
// this excerpt.
552 inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos)
// No records yet: handled specially (the returned value is not visible here).
555 if (eventStatsBuffer.size() == 0)
557 EventNum_t eventTotal{0LL};
// Sum the per-record event counts; `auto el` copies each record.
559 for(
auto el: eventStatsBuffer)
562 eventTotal += el.eventsSinceLastSync;
// Elapsed nanoseconds since the first record in the buffer.
564 auto timeDiff = currentTimeNanos -
565 eventStatsBuffer.begin()->lastSyncTimeNanos;
// events * 1e9 / elapsed_ns gives events per second, rounded to nearest.
// NOTE(review): the cast to float keeps only about 24 bits of mantissa, so
// large products lose precision, and the integer product itself can overflow
// for very large totals. Also confirm that timeDiff == 0 is guarded in the
// omitted lines.
570 return std::round(
static_cast<float>(eventTotal*1000000000UL)/timeDiff);
// Populates a sync header for the timestamp `tnano` using a clock-derived
// event number. Braces and possibly other lines of the body are omitted from
// this excerpt.
575 inline void fillSyncHdr(SyncHdr *hdr, UnixTimeNano_t tnano)
// Fixed reported rate of 1,000,000 per second, which matches an event number
// that advances once per microsecond (see below).
577 EventRate_t reportedRate{1000000};
579 auto nowT = boost::chrono::system_clock::now();
// Event number = microseconds since the system_clock epoch.
581 EventNum_t reportedEventNum = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
// Write source id, event number, rate and timestamp into the header.
582 hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano);
// Replaces the low 8 bits of `clockSample` with a random value in [0, 255]
// drawn from lsbDist, leaving the upper bits untouched. Each call advances
// the state of the `ranlux` engine.
// NOTE(review): the engine is a plain member; confirm this is only called
// from one thread.
592 inline int_least64_t addClockEntropy(int_least64_t clockSample)
// ~0xFF is the int -256, which sign-extends to a 64-bit mask that clears
// only the lowest byte.
594 return (clockSample & ~0xFF) | lsbDist(ranlux);