|
| Reassembler (const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags()) |
|
| Reassembler (const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, size_t numRecvThreads=1, const ReassemblerFlags &rflags=ReassemblerFlags()) |
|
| Reassembler (const EjfatURI &uri, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags(), bool v6=false) |
|
| Reassembler (const EjfatURI &uri, u_int16_t starting_port, size_t numRecvThreads=1, const ReassemblerFlags &rflags=ReassemblerFlags(), bool v6=false) |
|
| Reassembler (const Reassembler &r)=delete |
|
Reassembler & | operator= (const Reassembler &o)=delete |
|
result< int > | registerWorker (const std::string &node_name) noexcept |
|
result< int > | deregisterWorker () noexcept |
|
result< int > | openAndStart () noexcept |
|
result< int > | getEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept |
|
result< int > | recvEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept |
|
const ReportedStats | getStats () const noexcept |
|
result< boost::tuple< EventNum_t, u_int16_t, size_t > > | get_LostEvent () noexcept |
|
result< std::list< std::pair< u_int16_t, size_t > > > | get_FDStats () noexcept |
|
const size_t | get_numRecvThreads () const noexcept |
|
const std::pair< int, int > | get_recvPorts () const noexcept |
|
const int | get_portRange () const noexcept |
|
const ip::address | get_dataIP () const noexcept |
|
void | stopThreads () |
|
|
class | Segmenter |
|
struct | RecvThreadState |
|
struct | sendStateThreadState |
|
◆ Reassembler() [1/4]
Create a reassembler object to run receive on a specific set of CPU cores. We assume you picked the CPU core list by studying CPU-to-NUMA affinity for the receiver NIC on the target system. The number of started receive threads will match the number of cores on the list. The affinity of the started receive threads will be set to these cores.
- Parameters
-
uri | - EjfatURI with lb_id and instance token, so we can register a worker and then SendState |
data_ip | - IP address (v4 or v6) on which we are listening |
starting_port | - starting port number on which we are listening |
cpuCoreList | - list of core identifiers to be used for receive threads |
rflags | - optional ReassemblerFlags structure with additional flags |
◆ Reassembler() [2/4]
Create a reassembler object to run on a specified number of receive threads without taking into account thread-to-CPU and CPU-to-NUMA affinity.
- Parameters
-
uri | - EjfatURI with lb_id and instance token, so we can register a worker and then SendState |
data_ip | - IP address (v4 or v6) on which we are listening |
starting_port | - starting port number on which we are listening |
numRecvThreads | - number of threads |
rflags | - optional ReassemblerFlags structure with additional flags |
◆ Reassembler() [3/4]
Create a reassembler object to run receive on a specific set of CPU cores. We assume you picked the CPU core list by studying CPU-to-NUMA affinity for the receiver NIC on the target system. The number of started receive threads will match the number of cores on the list. The affinity of the started receive threads will be set to these cores. This constructor attempts to auto-detect the outgoing IP address.
- Parameters
-
uri | - EjfatURI with lb_id and instance token, so we can register a worker and then SendState |
starting_port | - starting port number on which we are listening |
cpuCoreList | - list of core identifiers to be used for receive threads |
rflags | - optional ReassemblerFlags structure with additional flags |
v6 | - use IPv6 dataplane |
◆ Reassembler() [4/4]
Create a reassembler object to run on a specified number of receive threads without taking into account thread-to-CPU and CPU-to-NUMA affinity. This constructor attempts to auto-detect the outgoing IP address.
- Parameters
-
uri | - EjfatURI with lb_id and instance token, so we can register a worker and then SendState |
starting_port | - starting port number on which we are listening |
numRecvThreads | - number of threads |
rflags | - optional ReassemblerFlags structure with additional flags |
v6 | - use IPv6 dataplane |
◆ deregisterWorker()
result< int > e2sar::Reassembler::deregisterWorker |
( |
| ) |
|
|
noexcept |
Deregister this worker
- Returns
- - 0 on success or an error condition
◆ get_dataIP()
const ip::address e2sar::Reassembler::get_dataIP |
( |
| ) |
const |
|
inline noexcept |
Get the data IP address to be used for communicating to the dataplane
◆ get_FDStats()
result< std::list< std::pair< u_int16_t, size_t > > > e2sar::Reassembler::get_FDStats |
( |
| ) |
|
|
inline noexcept |
Get per-port/per-fd statistics of received fragments. Only call this after the threads have been stopped; otherwise an error is returned.
- Returns
- - list of pairs <port, number of received fragments> or error
◆ get_LostEvent()
result< boost::tuple< EventNum_t, u_int16_t, size_t > > e2sar::Reassembler::get_LostEvent |
( |
| ) |
|
|
inline noexcept |
Try to pop an event number of a lost event from the queue that stores them
- Returns
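- result with either a tuple beginning with (eventNumber, dataId) followed by a size_t value (see the return type; its meaning is not documented here), or E2SARErrorc::NotFound if the queue is empty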
◆ get_numRecvThreads()
const size_t e2sar::Reassembler::get_numRecvThreads |
( |
| ) |
const |
|
inline noexcept |
◆ get_portRange()
const int e2sar::Reassembler::get_portRange |
( |
| ) |
const |
|
inline noexcept |
Get the port range that will be communicated to CP (this is either specified explicitly as part of ReassemblerFlags or computed from the number of cores or threads requested)
◆ get_recvPorts()
const std::pair< int, int > e2sar::Reassembler::get_recvPorts |
( |
| ) |
const |
|
inline noexcept |
Get the ports this reassembler is listening on, returned as a pair <start port, end port>
◆ getEvent()
result< int > e2sar::Reassembler::getEvent |
( |
uint8_t ** | event, |
|
|
size_t * | bytes, |
|
|
EventNum_t * | eventNum, |
|
|
uint16_t * | dataId ) |
|
noexcept |
A non-blocking call to get an assembled event off a reassembled event queue
- Parameters
-
event | - event buffer |
bytes | - size of the event in the buffer |
eventNum | - the assembled event number |
dataId | - dataId from the reassembly header identifying the DAQ |
- Returns
- - result structure, check has_error() method or value() which is 0 on success and -1 if the queue was empty.
◆ getStats()
Get a struct representing all the stats:
- EventNum_t enqueueLoss; // number of events received and lost on enqueue
- EventNum_t reassemblyLoss; // number of events lost in reassembly due to missing segments
- EventNum_t eventSuccess; // events successfully processed
- int lastErrno;
- int grpcErrCnt;
- int dataErrCnt;
- E2SARErrorc lastE2SARError;
◆ openAndStart()
result< int > e2sar::Reassembler::openAndStart |
( |
| ) |
|
|
noexcept |
Open sockets and start the threads - this marks the moment from which we are listening for incoming packets, assembling them into event buffers and putting them into the queue.
- Returns
- - 0 on success, otherwise error condition
◆ recvEvent()
result< int > e2sar::Reassembler::recvEvent |
( |
uint8_t ** | event, |
|
|
size_t * | bytes, |
|
|
EventNum_t * | eventNum, |
|
|
uint16_t * | dataId, |
|
|
u_int64_t | wait_ms = 0 ) |
|
noexcept |
Blocking variant of getEvent() with same parameter semantics
- Parameters
-
event | - event buffer |
bytes | - size of the event in the buffer |
eventNum | - the assembled event number |
dataId | - dataId from the reassembly header identifying the DAQ |
wait_ms | - how long to block, in milliseconds, before giving up; defaults to 0, which means block forever |
- Returns
- - result structure, check has_error() method or value() which is 0 on success; the call exits with -1 when stopThreads() is invoked.
◆ registerWorker()
result< int > e2sar::Reassembler::registerWorker |
( |
const std::string & | node_name | ) |
|
|
noexcept |
Register a worker with the control plane
- Parameters
-
node_name | - name of this node (any unique string) |
- Returns
- - 0 on success or an error condition
◆ stopThreads()
void e2sar::Reassembler::stopThreads |
( |
| ) |
|
|
inline |
Tell threads to stop. Also causes recvEvent to exit with (-1)
The documentation for this class was generated from the following files: