E2SAR 0.2.0
e2sar::Reassembler Class Reference

Data Structures

struct  ReassemblerFlags
 
struct  ReportedStats
 

Public Member Functions

 Reassembler (const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags())
 
 Reassembler (const EjfatURI &uri, ip::address data_ip, u_int16_t starting_port, size_t numRecvThreads=1, const ReassemblerFlags &rflags=ReassemblerFlags())
 
 Reassembler (const EjfatURI &uri, u_int16_t starting_port, std::vector< int > cpuCoreList, const ReassemblerFlags &rflags=ReassemblerFlags(), bool v6=false)
 
 Reassembler (const EjfatURI &uri, u_int16_t starting_port, size_t numRecvThreads=1, const ReassemblerFlags &rflags=ReassemblerFlags(), bool v6=false)
 
 Reassembler (const Reassembler &r)=delete
 
Reassembler & operator= (const Reassembler &o)=delete
 
result< int > registerWorker (const std::string &node_name) noexcept
 
result< int > deregisterWorker () noexcept
 
result< int > openAndStart () noexcept
 
result< int > getEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId) noexcept
 
result< int > recvEvent (uint8_t **event, size_t *bytes, EventNum_t *eventNum, uint16_t *dataId, u_int64_t wait_ms=0) noexcept
 
const ReportedStats getStats () const noexcept
 
result< boost::tuple< EventNum_t, u_int16_t, size_t > > get_LostEvent () noexcept
 
result< std::list< std::pair< u_int16_t, size_t > > > get_FDStats () noexcept
 
const size_t get_numRecvThreads () const noexcept
 
const std::pair< int, int > get_recvPorts () const noexcept
 
const int get_portRange () const noexcept
 
const ip::address get_dataIP () const noexcept
 
void stopThreads ()
 

Friends

class Segmenter
 
struct RecvThreadState
 
struct sendStateThreadState
 

Constructor & Destructor Documentation

◆ Reassembler() [1/4]

e2sar::Reassembler::Reassembler ( const EjfatURI & uri,
ip::address data_ip,
u_int16_t starting_port,
std::vector< int > cpuCoreList,
const ReassemblerFlags & rflags = ReassemblerFlags() )

Create a reassembler object to run receive on a specific set of CPU cores. We assume you picked the CPU core list by studying CPU-to-NUMA affinity for the receiver NIC on the target system. The number of started receive threads will match the number of cores on the list. The affinity of the started receive threads will be set to these cores.

Parameters
uri- EjfatURI with lb_id and instance token, so we can register a worker and then SendState
data_ip- IP address (v4 or v6) on which we are listening
starting_port- starting port number on which we are listening
cpuCoreList- list of core identifiers to be used for receive threads
rflags- optional ReassemblerFlags structure with additional flags

◆ Reassembler() [2/4]

e2sar::Reassembler::Reassembler ( const EjfatURI & uri,
ip::address data_ip,
u_int16_t starting_port,
size_t numRecvThreads = 1,
const ReassemblerFlags & rflags = ReassemblerFlags() )

Create a reassembler object to run on a specified number of receive threads without taking into account thread-to-CPU and CPU-to-NUMA affinity.

Parameters
uri- EjfatURI with lb_id and instance token, so we can register a worker and then SendState
data_ip- IP address (v4 or v6) on which we are listening
starting_port- starting port number on which we are listening
numRecvThreads- number of threads
rflags- optional ReassemblerFlags structure with additional flags

◆ Reassembler() [3/4]

e2sar::Reassembler::Reassembler ( const EjfatURI & uri,
u_int16_t starting_port,
std::vector< int > cpuCoreList,
const ReassemblerFlags & rflags = ReassemblerFlags(),
bool v6 = false )

Create a reassembler object to run receive on a specific set of CPU cores. We assume you picked the CPU core list by studying CPU-to-NUMA affinity for the receiver NIC on the target system. The number of started receive threads will match the number of cores on the list. The affinity of the started receive threads will be set to these cores. This method attempts to auto-detect the outgoing IP address.

Parameters
uri- EjfatURI with lb_id and instance token, so we can register a worker and then SendState
starting_port- starting port number on which we are listening
cpuCoreList- list of core identifiers to be used for receive threads
rflags- optional ReassemblerFlags structure with additional flags
v6- use IPv6 dataplane

◆ Reassembler() [4/4]

e2sar::Reassembler::Reassembler ( const EjfatURI & uri,
u_int16_t starting_port,
size_t numRecvThreads = 1,
const ReassemblerFlags & rflags = ReassemblerFlags(),
bool v6 = false )

Create a reassembler object to run on a specified number of receive threads without taking into account thread-to-CPU and CPU-to-NUMA affinity. This method attempts to auto-detect the outgoing IP address.

Parameters
uri- EjfatURI with lb_id and instance token, so we can register a worker and then SendState
starting_port- starting port number on which we are listening
numRecvThreads- number of threads
rflags- optional ReassemblerFlags structure with additional flags
v6- use IPv6 dataplane

Member Function Documentation

◆ deregisterWorker()

result< int > e2sar::Reassembler::deregisterWorker ( )
noexcept

Deregister this worker

Returns
- 0 on success or an error condition

◆ get_dataIP()

const ip::address e2sar::Reassembler::get_dataIP ( ) const
inline noexcept

Get the data IP address to be used for communicating to the dataplane

◆ get_FDStats()

result< std::list< std::pair< u_int16_t, size_t > > > e2sar::Reassembler::get_FDStats ( )
inline noexcept

Get per-port/per-fd statistics on received fragments. Only call this after the threads have been stopped; otherwise an error is returned.

Returns
- list of pairs <port, number of received fragments> or error
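
To show how the returned list might be consumed, here is a minimal sketch that totals the fragments and finds the busiest port. It operates on a plain std::list of pairs rather than on the result object itself; the alias FdStats and the helpers totalFragments and busiestPort are hypothetical names, and uint16_t stands in for u_int16_t.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <utility>

// Hypothetical alias for the payload described above: <port, received fragments>.
using FdStats = std::list<std::pair<std::uint16_t, std::size_t>>;

// Sum the fragments received across all ports.
inline std::size_t totalFragments(const FdStats &stats) {
    std::size_t total = 0;
    for (const auto &entry : stats) {
        total += entry.second;
    }
    return total;
}

// Return the port with the most fragments, or 0 if nothing was received.
inline std::uint16_t busiestPort(const FdStats &stats) {
    std::uint16_t port = 0;
    std::size_t best = 0;
    for (const auto &entry : stats) {
        if (entry.second > best) {
            best = entry.second;
            port = entry.first;
        }
    }
    return port;
}
```

With the real class, the list would presumably be obtained from the value of the returned result after stopThreads() has been called and has_error() has been checked.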

◆ get_LostEvent()

result< boost::tuple< EventNum_t, u_int16_t, size_t > > e2sar::Reassembler::get_LostEvent ( )
inline noexcept

Try to pop an event number of a lost event from the queue that stores them

Returns
- result with either (eventNumber, dataId) or E2SARErrorc::NotFound if the queue is empty

◆ get_numRecvThreads()

const size_t e2sar::Reassembler::get_numRecvThreads ( ) const
inline noexcept

Get the number of threads this Reassembler is using

◆ get_portRange()

const int e2sar::Reassembler::get_portRange ( ) const
inline noexcept

Get the port range that will be communicated to CP (this is either specified explicitly as part of ReassemblerFlags or computed from the number of cores or threads requested)

◆ get_recvPorts()

const std::pair< int, int > e2sar::Reassembler::get_recvPorts ( ) const
inline noexcept

Get the ports this reassembler is listening on, returned as a pair <start port, end port>

◆ getEvent()

result< int > e2sar::Reassembler::getEvent ( uint8_t ** event,
size_t * bytes,
EventNum_t * eventNum,
uint16_t * dataId )
noexcept

A non-blocking call to get an assembled event off a reassembled event queue

Parameters
event- event buffer
bytes- size of the event in the buffer
eventNum- the assembled event number
dataId- dataId from the reassembly header identifying the DAQ
Returns
- result structure, check has_error() method or value() which is 0 on success and -1 if the queue was empty.
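
The return convention above (0 on success, -1 on an empty queue, or an error) suggests a simple polling loop. The sketch below is written as a template so it compiles without the E2SAR headers; MockResult and MockReassembler are hypothetical stand-ins that mimic only the documented signature, and EventNum_t is assumed to be a 64-bit unsigned integer. Buffer ownership is not stated in this reference, so the sketch never frees the event pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using EventNum_t = std::uint64_t;  // assumption: the real typedef may differ

// Hypothetical stand-in that mimics only has_error() and value().
struct MockResult {
    int v;
    bool has_error() const { return false; }
    int value() const { return v; }
};

// Hypothetical stand-in for a reassembler with a few queued events.
struct MockReassembler {
    std::vector<std::vector<std::uint8_t>> queued;  // the mock owns the buffers
    std::size_t next = 0;

    MockResult getEvent(std::uint8_t **event, std::size_t *bytes,
                        EventNum_t *eventNum, std::uint16_t *dataId) {
        if (next >= queued.size()) return MockResult{-1};  // queue is empty
        *event = queued[next].data();
        *bytes = queued[next].size();
        *eventNum = next;
        *dataId = 7;  // arbitrary value for the mock
        ++next;
        return MockResult{0};
    }
};

// Poll until the queue reports empty (-1) or an error; return total bytes seen.
template <typename R>
std::size_t drainEvents(R &reas) {
    std::size_t total = 0;
    for (;;) {
        std::uint8_t *event = nullptr;
        std::size_t bytes = 0;
        EventNum_t eventNum = 0;
        std::uint16_t dataId = 0;
        auto res = reas.getEvent(&event, &bytes, &eventNum, &dataId);
        // Check has_error() first; only then is value() inspected.
        if (res.has_error() || res.value() == -1) break;
        total += bytes;  // a real consumer would process the event here
    }
    return total;
}
```

Because the call is non-blocking, a real consumer would typically sleep or do other work between drains rather than spin.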

◆ getStats()

const ReportedStats e2sar::Reassembler::getStats ( ) const
inline noexcept

Get a struct representing all the stats:

  • EventNum_t enqueueLoss; // number of events received and lost on enqueue
  • EventNum_t reassemblyLoss; // number of events lost in reassembly due to missing segments
  • EventNum_t eventSuccess; // events successfully processed
  • int lastErrno;
  • int grpcErrCnt;
  • int dataErrCnt;
  • E2SARErrorc lastE2SARError;
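
One possible use of these counters is a rough loss fraction. The sketch below is not part of E2SAR: StatsView is a hypothetical mirror of the three event counters, EventNum_t is assumed to be a 64-bit unsigned integer, and the three counters are assumed to count disjoint sets of events.

```cpp
#include <cassert>
#include <cstdint>

using EventNum_t = std::uint64_t;  // assumption: the real typedef may differ

// Hypothetical mirror of the three event counters listed above.
struct StatsView {
    EventNum_t enqueueLoss;
    EventNum_t reassemblyLoss;
    EventNum_t eventSuccess;
};

// Fraction of events lost, assuming the three counters are disjoint.
inline double lossFraction(const StatsView &s) {
    const EventNum_t lost = s.enqueueLoss + s.reassemblyLoss;
    const EventNum_t total = lost + s.eventSuccess;
    if (total == 0) return 0.0;  // nothing seen yet
    return static_cast<double>(lost) / static_cast<double>(total);
}
```

With the real class, the fields would be copied from the struct returned by getStats().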

◆ openAndStart()

result< int > e2sar::Reassembler::openAndStart ( )
noexcept

Open sockets and start the threads - this marks the moment from which we are listening for incoming packets, assembling them into event buffers and putting them into the queue.

Returns
- 0 on success, otherwise error condition

◆ recvEvent()

result< int > e2sar::Reassembler::recvEvent ( uint8_t ** event,
size_t * bytes,
EventNum_t * eventNum,
uint16_t * dataId,
u_int64_t wait_ms = 0 )
noexcept

Blocking variant of getEvent() with same parameter semantics

Parameters
event- event buffer
bytes- size of the event in the buffer
eventNum- the assembled event number
dataId- dataId from the reassembly header identifying the DAQ
wait_ms- how long to block before giving up, defaults to 0 - forever
Returns
- result structure; check the has_error() method or value(), which is 0 on success (stopThreads() causes recvEvent to exit with -1)

◆ registerWorker()

result< int > e2sar::Reassembler::registerWorker ( const std::string & node_name)
noexcept

Register a worker with the control plane

Parameters
node_name- name of this node (any unique string)
Returns
- 0 on success or an error condition

◆ stopThreads()

void e2sar::Reassembler::stopThreads ( )
inline

Tell threads to stop. Also causes recvEvent to exit with (-1)
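
The member functions above can be combined into a receive loop. The sketch below assumes a call order (registerWorker, openAndStart, recvEvent loop, stopThreads, deregisterWorker) that this reference does not spell out. MockResult and MockReassembler are hypothetical stand-ins so the code compiles without the E2SAR headers, and runReceiver is a hypothetical helper name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

using EventNum_t = std::uint64_t;  // assumption: the real typedef may differ

// Hypothetical stand-in that mimics only has_error() and value().
struct MockResult {
    int v;
    bool has_error() const { return false; }
    int value() const { return v; }
};

// Hypothetical stand-in that records calls and serves three events.
struct MockReassembler {
    std::vector<std::string> calls;
    int eventsLeft = 3;
    std::uint8_t payload[4] = {0, 1, 2, 3};

    MockResult registerWorker(const std::string &) {
        calls.push_back("registerWorker");
        return MockResult{0};
    }
    MockResult openAndStart() {
        calls.push_back("openAndStart");
        return MockResult{0};
    }
    MockResult recvEvent(std::uint8_t **event, std::size_t *bytes,
                         EventNum_t *eventNum, std::uint16_t *dataId,
                         std::uint64_t wait_ms = 0) {
        (void)wait_ms;  // the mock never blocks
        if (eventsLeft == 0) return MockResult{-1};
        --eventsLeft;
        *event = payload;
        *bytes = sizeof(payload);
        *eventNum = 0;
        *dataId = 0;
        return MockResult{0};
    }
    void stopThreads() { calls.push_back("stopThreads"); }
    MockResult deregisterWorker() {
        calls.push_back("deregisterWorker");
        return MockResult{0};
    }
};

// Run the assumed lifecycle; return the number of events received, or -1.
template <typename R>
int runReceiver(R &reas, const std::string &nodeName, std::uint64_t waitMs) {
    if (reas.registerWorker(nodeName).has_error()) return -1;
    if (reas.openAndStart().has_error()) {
        (void)reas.deregisterWorker();
        return -1;
    }
    int received = 0;
    for (;;) {
        std::uint8_t *event = nullptr;
        std::size_t bytes = 0;
        EventNum_t eventNum = 0;
        std::uint16_t dataId = 0;
        auto res = reas.recvEvent(&event, &bytes, &eventNum, &dataId, waitMs);
        if (res.has_error() || res.value() != 0) break;  // error or -1
        ++received;  // a real consumer would process the event here
    }
    reas.stopThreads();
    (void)reas.deregisterWorker();
    return received;
}
```

In a real application stopThreads() would more likely be called from another thread or a signal handler, which is what makes a blocked recvEvent return -1.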


The documentation for this class was generated from the following files: