5#include <boost/asio.hpp>
6#include <boost/tuple/tuple.hpp>
9#include <grpcpp/channel.h>
10#include <grpcpp/client_context.h>
11#include <grpcpp/create_channel.h>
12#include <grpcpp/security/credentials.h>
14#include <google/protobuf/util/time_util.h>
16#include <boost/date_time/posix_time/posix_time.hpp>
18#include "grpc/loadbalancer.grpc.pb.h"
20#include "e2sarUtil.hpp"
23using grpc::ClientContext;
24using grpc::ClientReader;
25using grpc::ClientReaderWriter;
26using grpc::ClientWriter;
29using loadbalancer::LoadBalancer;
31using loadbalancer::LoadBalancerStatusReply;
32using loadbalancer::WorkerStatus;
33using loadbalancer::OverviewReply;
35using google::protobuf::Timestamp;
38#define DEFAULT_LB_RESERVE_DURATION 24
// Seven signed 64-bit counters declared in a single statement. Going by the names they
// hold running totals for events, bytes and packets; the enclosing type's header is not
// part of this listing.
69 int64_t total_events_recv, total_events_reassembled, total_events_reassembly_err, total_events_dequeued,
70 total_event_enqueue_err, total_bytes_recv, total_packets_recv;
// Default constructor: brace-initializes each of the seven counters above to 0.
72 WorkerStats(): total_events_recv{0}, total_events_reassembled{0}, total_events_reassembly_err{0},
73 total_events_dequeued{0}, total_event_enqueue_err{0}, total_bytes_recv{0}, total_packets_recv{0} {
// Data members of LBStatus. asLBStatus() further down fills them from a
// LoadBalancerStatusReply (timestamp(), currentepoch(), currentpredictedeventnumber(),
// the worker and sender-address vectors, expiresat()).
83 google::protobuf::Timestamp timestamp;
84 u_int64_t currentEpoch;
85 u_int64_t currentPredictedEventNumber;
86 std::vector<WorkerStatus> workers;
87 std::vector<std::string> senderAddresses;
88 google::protobuf::Timestamp expiresAt;
// Constructor. ts, ce, penum and exp are copied in the initializer list.
// w and sa are taken by NON-const lvalue reference and moved from in the body, so the
// caller's vectors are left in a moved-from state after this call.
93 LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector<WorkerStatus> &w, std::vector<std::string> &sa, google::protobuf::Timestamp exp):
94 timestamp{ts}, currentEpoch{ce}, currentPredictedEventNumber{penum}, expiresAt{exp}
// Steal the contents of the caller's vectors instead of copying them.
96 workers = std::move(w);
97 senderAddresses = std::move(sa);
// Address/port members of one overview entry. asOverviewMessage() populates them from
// the reservation() sub-message of each load balancer in an OverviewReply.
// Sync endpoint as (address, port) pairs, one per address family. asOverviewMessage()
// stores the same syncudpport() value in both pairs.
109 std::pair<ip::address, u_int16_t> syncIPv4AndPort;
110 std::pair<ip::address, u_int16_t> syncIPv6AndPort;
// Data-plane addresses, one per address family.
111 ip::address dataIPv4;
112 ip::address dataIPv6;
// Data port bounds, filled from dataminport() / datamaxport().
// NOTE(review): whether the range is inclusive is not visible here - confirm.
114 u_int32_t dataMinPort;
115 u_int32_t dataMaxPort;
// An overview message is simply a vector of OverviewEntry, one element per load balancer.
120 using OverviewMessage = std::vector<OverviewEntry>;
// Sample timestamp; the _ms suffix suggests milliseconds (epoch not visible here - confirm).
126 int64_t timestamp_ms;
// Constructor: stores ts in timestamp_ms and v in the `value` member (whose declaration
// is not part of this listing).
129 FloatSample(int64_t ts,
float v) : timestamp_ms{ts}, value{v} {}
// Sample timestamp; the _ms suffix suggests milliseconds (epoch not visible here - confirm).
137 int64_t timestamp_ms;
// Constructor: stores ts in timestamp_ms and the 64-bit integer v in the `value` member
// (whose declaration is not part of this listing).
140 IntegerSample(int64_t ts, int64_t v) : timestamp_ms{ts}, value{v} {}
// The series itself: holds either a vector of FloatSample or a vector of IntegerSample.
150 std::variant<std::vector<FloatSample>, std::vector<IntegerSample>> timeseries;
// Constructor for a floating-point series. p and u are copied into path and unit.
// fs is a NON-const lvalue reference and is moved from: the caller's vector is left in
// a moved-from state.
152 TimeseriesData(
const std::string& p,
const std::string &u, std::vector<FloatSample>& fs):
153 path{p}, unit{u}, timeseries{std::move(fs)} {}
// Constructor for an integer series; same copy/move behavior as the overload above.
154 TimeseriesData(
const std::string& p,
const std::string &u, std::vector<IntegerSample>& is):
155 path{p}, unit{u}, timeseries{std::move(is)} {}
// The individual series carried by this result.
161 std::vector<TimeseriesData> td;
// Constructor: stores _ts in since_ms (declaration not part of this listing) and moves
// from _td, which is a NON-const lvalue reference - the caller's vector is left moved-from.
163 TimeseriesResult(int64_t _ts, std::vector<TimeseriesData> &_td): since_ms{_ts}, td{std::move(_td)} {}
// One permission record: a resource type, a resource id string, and a permission level.
// The two enum types are declared in EjfatURI (presumably in e2sarUtil.hpp, included above).
174 EjfatURI::TokenType resourceType;
175 std::string resourceId;
176 EjfatURI::TokenPermission permission;
// Initializer-list fragment of a constructor whose head is not part of this listing
// (presumably the default constructor): resource type `all`, permission `_read_only_`.
179 resourceType{EjfatURI::TokenType::all},
181 permission{EjfatURI::TokenPermission::_read_only_} {}
// Fully specified constructor: copies the three arguments into the matching members.
182 TokenPermission(EjfatURI::TokenType rt,
const std::string& rid, EjfatURI::TokenPermission pt):
183 resourceType{rt}, resourceId{rid}, permission{pt} {}
// Permissions attached to a token (see TokenPermission).
192 std::vector<TokenPermission> permissions;
// Creation time kept as a string. NOTE(review): the format is not visible here - confirm.
193 std::string created_at;
// gRPC target string handed to grpc::CreateChannel() below.
209 std::string addr_string;
// Client stub for the LoadBalancer service, created from _channel at the end of the constructor.
210 std::unique_ptr<LoadBalancer::Stub> _stub;
// The gRPC channel the stub is bound to.
211 std::shared_ptr<grpc::Channel> _channel;
// Tail of the LBManager constructor signature (per the Doxygen summary the full form is
// LBManager(const EjfatURI &cpuri, bool validateServer=true, bool useHostAddress=false,
// grpc::SslCredentialsOptions opts=...)). Throws E2SARException if no usable control
// plane address can be read from the URI.
// NOTE: several lines of the body (braces, else keywords, some conditions) are missing
// from this listing, so the exact branch structure below is partly inferred.
227 grpc::SslCredentialsOptions opts = grpc::SslCredentialsOptions()) : _cpuri(cpuri)
// Server validation is forced off here; the guarding condition is not visible
// (presumably "TLS is not in use" - confirm).
234 validateServer =
false;
// Preferred path: the URI carries a host name and the caller did not request the
// numeric address, so the target is "<host>:<port>".
236 if (!useHostAddress && !cp_host_r.has_error())
239 auto cp_host_v = cp_host_r.value();
240 addr_string = cp_host_v.first +
":"s + std::to_string(cp_host_v.second);
// Fallback path: use the numeric control plane address; without one the object cannot
// be constructed.
245 auto cp_addr_r = cpuri.get_cpAddr();
246 if (cp_addr_r.has_error())
247 throw E2SARException(
"Unable to initialize LBManager due to missing CP address in URI");
248 auto cp_addr_v = cp_addr_r.value();
// gRPC "ipv4:" / "ipv6:" target forms; the IPv6 literal is wrapped in brackets so the
// trailing ":<port>" stays unambiguous.
249 if (cp_addr_v.first.is_v4())
250 addr_string =
"ipv4:///" + cp_addr_v.first.to_string() +
":" + std::to_string(cp_addr_v.second);
252 addr_string =
"ipv6:///[" + cp_addr_v.first.to_string() +
"]:" + std::to_string(cp_addr_v.second);
// Channel variant 1: TLS with certificate checking disabled (no-op verifier, no server
// cert verification, no call-host check). This encrypts traffic but does NOT
// authenticate the server. Presumably selected when TLS is on and validateServer is
// false - confirm against the missing condition lines.
257 grpc::experimental::TlsChannelCredentialsOptions topts;
261 std::shared_ptr<grpc::experimental::NoOpCertificateVerifier> verifier = std::make_shared<grpc::experimental::NoOpCertificateVerifier>();
262 topts.set_verify_server_certs(false);
263 topts.set_check_call_host(false);
264 topts.set_certificate_verifier(verifier);
265 _channel = grpc::CreateChannel(addr_string, grpc::experimental::TlsCredentials(topts));
// Channel variant 2: standard SSL credentials built from the caller-supplied opts.
270 _channel = grpc::CreateChannel(addr_string, grpc::SslCredentials(opts));
// Channel variant 3: plaintext channel (no TLS).
275 _channel = grpc::CreateChannel(addr_string, grpc::InsecureChannelCredentials());
// Whichever channel was created, bind the service stub to it.
277 _stub = LoadBalancer::NewStub(_channel);
// Three reserveLB overloads. Declarations only: the definitions are not part of this
// listing. Each takes a load balancer name and a list of sender strings, returns
// result<u_int32_t>, is noexcept, and defaults ip_family to
// loadbalancer::IpFamily::DUAL_STACK.
// Overload without a duration argument. NOTE(review): presumably falls back to
// DEFAULT_LB_RESERVE_DURATION (defined as 24 near the top of the file) - confirm the unit.
290 result<u_int32_t> reserveLB(
const std::string &lb_name,
292 const std::vector<std::string> &senders,
293 int ip_family=loadbalancer::IpFamily::DUAL_STACK)
noexcept;
// Overload taking the reservation duration as a boost::posix_time::time_duration.
307 result<u_int32_t> reserveLB(
const std::string &lb_name,
308 const boost::posix_time::time_duration &duration,
309 const std::vector<std::string> &senders,
310 int ip_family=loadbalancer::IpFamily::DUAL_STACK)
noexcept;
// Overload taking the reservation duration as a double, in seconds per the parameter name.
322 result<u_int32_t> reserveLB(
const std::string &lb_name,
323 const double &durationSeconds,
324 const std::vector<std::string> &senders,
325 int ip_family=loadbalancer::IpFamily::DUAL_STACK)
noexcept;
// getLB overloads (declarations only). Both return result<int> and are noexcept.
// Variant taking an explicit load balancer id string.
336 result<int> getLB(
const std::string &lbid)
noexcept;
// Variant without arguments. NOTE(review): presumably uses the id stored in the
// manager's URI - confirm in the implementation.
341 result<int> getLB() noexcept;
// Status queries (declarations only). On success the result owns the raw protobuf reply
// through a std::unique_ptr; asLBStatus() / asOverviewMessage() below convert such
// replies into the plain structs declared earlier.
// Status of the load balancer identified by lbid.
353 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus(const std::
string &lbid) noexcept;
// Status without an explicit id. NOTE(review): presumably the id comes from the
// manager's URI - confirm.
364 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus() noexcept;
// Overview query returning an OverviewReply.
373 result<std::unique_ptr<OverviewReply>> overview() noexcept;
// Sender list management (declarations only). All four return result<int> and are noexcept.
// Add the given sender strings.
380 result<
int> addSenders(const std::vector<std::
string>& senders) noexcept;
// Remove the given sender strings.
386 result<
int> removeSenders(const std::vector<std::
string>& senders) noexcept;
// "Self" variants take only an address-family flag (v6, default false).
// NOTE(review): presumably they determine the local host's own address - confirm.
392 result<
int> addSenderSelf(
bool v6=false) noexcept;
398 result<
int> removeSenderSelf(
bool v6=false) noexcept;
// Token management (declarations only; all noexcept).
// Create a token with the given name and permission list; the result carries a
// std::string (presumably the new token - confirm).
408 result<std::
string> createToken(
409 const std::
string &name,
410 const std::vector<TokenPermission> &permissions) noexcept;
// The remaining calls select a token through a TokenSelector, which the Doxygen summary
// at the end of this listing shows as std::variant<uint32_t, std::string>.
// Returns the details of the selected token.
419 result<TokenDetails> listTokenPermissions(const TokenSelector &target) noexcept;
// Returns details for the child tokens of the selected token.
428 result<std::vector<TokenDetails>> listChildTokens(const TokenSelector &target) noexcept;
// Revokes the selected token.
437 result<
int> revokeToken(const TokenSelector &target) noexcept;
// Timeseries query (declaration only; noexcept). Takes a path string and a protobuf
// Timestamp `since`, and returns a TimeseriesResult on success.
// NOTE(review): the path syntax and whether `since` is inclusive are not visible here.
447 result<TimeseriesResult> timeseries(const std::
string &path, const Timestamp &since) noexcept;
// Copies the repeated `workers` field of a status reply into a std::vector<WorkerStatus>.
// Overload taking the reply through a unique_ptr. rep is dereferenced directly; no null
// check is visible in this listing.
457 static inline std::vector<WorkerStatus> get_WorkerStatusVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
// Pre-size the result to the number of workers so elements can be assigned by index.
459 std::vector<WorkerStatus> ret(rep->workers_size());
// Walk the repeated field with iterator i while advancing index j in step (the
// declaration of j and the loop body are not part of this listing).
462 for (
auto i = rep->workers().begin(); i != rep->workers().end(); ++i, j++)
// Second overload, taking the reply by const reference per the Doxygen summary; its
// signature line is missing from this listing. Same pre-size-then-iterate logic.
479 std::vector<WorkerStatus> ret(rep.workers_size());
482 for (
auto i = rep.workers().begin(); i != rep.workers().end(); ++i, j++)
// Bodies of the two get_SenderAddressVector overloads (their signature lines are missing
// from this listing): copy the repeated `senderaddresses` field of a status reply into a
// std::vector<std::string>.
// unique_ptr overload: pre-size the result, then iterate with iterator i and index j
// (declaration of j and the loop body are not shown).
497 std::vector<std::string> ret(rep->senderaddresses_size());
500 for (
auto i = rep->senderaddresses().begin(); i != rep->senderaddresses().end(); ++i, j++)
// const-reference overload: identical logic using '.' access.
515 std::vector<std::string> ret(rep.senderaddresses_size());
518 for (
auto i = rep.senderaddresses().begin(); i != rep.senderaddresses().end(); ++i, j++)
// Converts a protobuf LoadBalancerStatusReply into the plain LBStatus struct.
// Overload for a reply held in a unique_ptr; returns a heap-allocated LBStatus.
// NOTE(review): the top-level const on the by-value return type prevents move-assigning
// the result into an existing std::unique_ptr - confirm this is intended.
526 static inline const std::unique_ptr<LBStatus>
asLBStatus(std::unique_ptr<LoadBalancerStatusReply> &rep)
noexcept
// Materialize both vectors as named locals first: LBStatus's constructor takes them by
// non-const lvalue reference and moves from them.
528 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
529 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
530 std::unique_ptr<LBStatus> pret = std::make_unique<LBStatus>(rep->timestamp(), rep->currentepoch(), rep->currentpredictedeventnumber(),
531 workers, addresses, rep->expiresat());
// Second overload, taking the reply by const reference per the Doxygen summary; its
// signature line is missing from this listing. Same conversion, returned by value.
538 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
539 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
540 return LBStatus(rep.timestamp(), rep.currentepoch(), rep.currentpredictedeventnumber(),
541 workers, addresses, rep.expiresat());
// Converts an OverviewReply held in a unique_ptr into an OverviewMessage, with one
// OverviewEntry per load balancer in the reply.
// NOTE(review): the function is noexcept, yet the single-argument ip::make_address()
// used below reports parse failures by throwing (assuming `ip` aliases boost::asio::ip).
// An empty or malformed address string - e.g. the unused family of a single-stack
// reservation, if the server leaves it blank - would therefore end in std::terminate.
// Consider the error_code overload; confirm what the server actually sends.
545 static inline const OverviewMessage
asOverviewMessage(std::unique_ptr<OverviewReply> &rep)
noexcept
// Pre-size the result so entries can be filled in by index j.
548 OverviewMessage om(rep.get()->loadbalancers_size());
// Iterate the repeated field with i while advancing index j in step (the declaration of
// j and the loop braces are not part of this listing).
549 for (
auto i = rep->loadbalancers().begin(); i != rep->loadbalancers().end(); ++i, j++)
551 om[j].name = i->name();
552 om[j].lbid = i->reservation().lbid();
// The reservation exposes a single sync UDP port; it is stored for both address families.
553 om[j].syncIPv4AndPort.first = ip::make_address(i->reservation().syncipv4address());
554 om[j].syncIPv4AndPort.second = i->reservation().syncudpport();
555 om[j].syncIPv6AndPort.first = ip::make_address(i->reservation().syncipv6address());
556 om[j].syncIPv6AndPort.second = i->reservation().syncudpport();
557 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
558 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
559 om[j].fpgaLBId = i->reservation().fpgalbid();
560 om[j].dataMinPort = i->reservation().dataminport();
561 om[j].dataMaxPort = i->reservation().datamaxport();
// Convert the embedded status reply with asLBStatus().
562 om[j].status = asLBStatus(i->status());
// Body of the asOverviewMessage overload that, per the Doxygen summary, takes the
// OverviewReply by const reference; its signature line is missing from this listing.
// Builds one OverviewEntry per load balancer in the reply.
// NOTE(review): the Doxygen summary shows this overload as noexcept, yet the
// single-argument ip::make_address() used below reports parse failures by throwing
// (assuming `ip` aliases boost::asio::ip). An empty or malformed address string would
// therefore end in std::terminate. Consider the error_code overload; confirm what the
// server actually sends.
// Pre-size the result so entries can be filled in by index j.
571 OverviewMessage om(rep.loadbalancers_size());
// Iterate the repeated field with i while advancing index j in step (the declaration of
// j and the loop braces are not part of this listing).
572 for (
auto i = rep.loadbalancers().begin(); i != rep.loadbalancers().end(); ++i, j++)
574 om[j].name = i->name();
575 om[j].lbid = i->reservation().lbid();
// The reservation exposes a single sync UDP port; it is stored for both address families.
576 om[j].syncIPv4AndPort.first = ip::make_address(i->reservation().syncipv4address());
577 om[j].syncIPv4AndPort.second = i->reservation().syncudpport();
578 om[j].syncIPv6AndPort.first = ip::make_address(i->reservation().syncipv6address());
579 om[j].syncIPv6AndPort.second = i->reservation().syncudpport();
580 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
581 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
582 om[j].fpgaLBId = i->reservation().fpgalbid();
583 om[j].dataMinPort = i->reservation().dataminport();
584 om[j].dataMaxPort = i->reservation().datamaxport();
// Convert the embedded status reply with asLBStatus().
585 om[j].status = asLBStatus(i->status());
// freeLB overloads (declarations only). Both return result<int> and are noexcept.
// Variant taking an explicit load balancer id string.
596 result<int> freeLB(
const std::string &lbid)
noexcept;
// Variant without arguments. NOTE(review): presumably uses the id stored in the
// manager's URI - confirm.
602 result<int> freeLB() noexcept;
// Worker registration (declarations only). All return result<int> and are noexcept.
// Register a worker under node_name with an explicit (address, port) pair, a weight, a
// source count and min/max factors; keep_lb_header defaults to false.
// NOTE(review): the meaning and valid range of weight, source_count and the factors are
// not visible in this listing.
621 result<
int> registerWorker(const std::
string &node_name, std::pair<ip::address, u_int16_t> node_ip_port,
float weight, u_int16_t source_count,
622 float min_factor,
float max_factor,
bool keep_lb_header=false) noexcept;
// "Self" variant: takes only a port plus an address-family flag (v6, default false).
// NOTE(review): presumably the address is that of the local host - confirm.
643 result<
int> registerWorkerSelf(const std::
string &node_name, u_int16_t node_port,
float weight, u_int16_t source_count,
float min_factor,
float max_factor,
644 bool v6=false,
bool keep_lb_header=false) noexcept;
// Deregister the worker; takes no arguments.
651 result<
int> deregisterWorker() noexcept;
// sendState overloads (declarations only), each returning result<int>. Every overload
// takes a fill percentage, a control signal and a readiness flag; two additionally take
// an explicit protobuf Timestamp.
// NOTE: the first two declarations are cut off in this listing - their remaining
// parameters and closing ");" are missing - so their full signatures cannot be stated.
663 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready,
677 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready, const Timestamp &ts,
// Complete three-argument form, without a timestamp.
688 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready) noexcept;
// Complete form with a caller-supplied timestamp.
700 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready, const Timestamp &ts) noexcept;
// Version query (declaration only; noexcept). On success yields a boost::tuple of three
// strings. NOTE(review): what each of the three strings holds is not visible here.
707 result<boost::tuple<std::
string, std::
string, std::
string>> version() noexcept;
712 inline const
EjfatURI &get_URI() const noexcept {
return _cpuri; }
723 static inline result<grpc::SslCredentialsOptions>
makeSslOptions(
const std::string &pem_root_certs,
724 const std::string &pem_private_key,
725 const std::string &pem_cert_chain)
noexcept
727 return grpc::SslCredentialsOptions{std::move(pem_root_certs),
728 std::move(pem_private_key),
729 std::move(pem_cert_chain)};
// makeSslOptionsFromFiles overloads (declarations only; static, noexcept). Both return
// result<grpc::SslCredentialsOptions>.
// NOTE(review): given the function name the string_view arguments are presumably file
// paths, unlike makeSslOptions(), which takes the PEM text itself - confirm.
// Variant taking root certificates, private key and certificate chain.
741 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
742 std::string_view pem_root_certs,
743 std::string_view pem_private_key,
744 std::string_view pem_cert_chain)
noexcept;
// Variant taking only the root certificates.
750 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
751 std::string_view pem_root_certs)
noexcept;
// Maps a source count to an int "port range" value (static, noexcept).
// NOTE: only the branch conditions survive in this listing; the branch bodies, the
// declaration of maxCount and the return statements are missing, so the exact values
// returned cannot be stated from here.
772 static inline int get_PortRange(
int source_count)
noexcept
// Lower bound case: fewer than 2 sources.
779 if (source_count < 2)
// Upper bound case: more than 16384 sources.
783 else if (source_count > 16384)
// General case: loop for as long as source_count exceeds maxCount (presumably maxCount
// grows inside the loop body - confirm).
791 while (source_count > maxCount)
Definition e2sarUtil.hpp:56
bool get_useTls() const
Definition e2sarUtil.hpp:152
const result< std::pair< std::string, u_int16_t > > get_cpHost() const
Definition e2sarUtil.hpp:273
Definition e2sarCP.hpp:206
static std::vector< std::string > get_SenderAddressVector(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:495
LBManager(const EjfatURI &cpuri, bool validateServer=true, bool useHostAddress=false, grpc::SslCredentialsOptions opts=grpc::SslCredentialsOptions())
Definition e2sarCP.hpp:226
static std::vector< WorkerStatus > get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:477
std::string get_AddrString()
Definition e2sarCP.hpp:759
static std::vector< std::string > get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:513
static const OverviewMessage asOverviewMessage(std::unique_ptr< OverviewReply > &rep) noexcept
Definition e2sarCP.hpp:545
static const std::unique_ptr< LBStatus > asLBStatus(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:526
static const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
Definition e2sarCP.hpp:568
static result< grpc::SslCredentialsOptions > makeSslOptions(const std::string &pem_root_certs, const std::string &pem_private_key, const std::string &pem_cert_chain) noexcept
Definition e2sarCP.hpp:723
static const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:536
google::protobuf::Timestamp TimeUntil
Definition e2sarCP.hpp:56
std::variant< uint32_t, std::string > TokenSelector
Definition e2sarCP.hpp:203
Definition e2sarCP.hpp:125
Definition e2sarCP.hpp:136
Definition e2sarCP.hpp:82
LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector< WorkerStatus > &w, std::vector< std::string > &sa, google::protobuf::Timestamp exp)
Definition e2sarCP.hpp:93
Definition e2sarCP.hpp:106
Definition e2sarCP.hpp:147
Definition e2sarCP.hpp:159
Definition e2sarCP.hpp:190
Definition e2sarCP.hpp:173
Definition e2sarCP.hpp:68