5#include <boost/asio.hpp>
6#include <boost/tuple/tuple.hpp>
9#include <grpcpp/channel.h>
10#include <grpcpp/client_context.h>
11#include <grpcpp/create_channel.h>
12#include <grpcpp/security/credentials.h>
14#include <google/protobuf/util/time_util.h>
15#include <boost/date_time/posix_time/posix_time.hpp>
17#include "grpc/loadbalancer.grpc.pb.h"
19#include "e2sarUtil.hpp"
22using grpc::ClientContext;
23using grpc::ClientReader;
24using grpc::ClientReaderWriter;
25using grpc::ClientWriter;
28using loadbalancer::LoadBalancer;
30using loadbalancer::LoadBalancerStatusReply;
31using loadbalancer::WorkerStatus;
32using loadbalancer::OverviewReply;
34using google::protobuf::Timestamp;
37#define DEFAULT_LB_RESERVE_DURATION 24
43#define TOKEN_IN_BODY 0
72 u_int32_t slotsAssigned;
73 google::protobuf::Timestamp lastUpdated;
/// Member-wise constructor for LBWorkerStatus.
/// @param n  copied into member name
/// @param fp copied into member fillPercent
/// @param cs copied into member controlSignal
/// @param sa copied into member slotsAssigned
/// @param lu protobuf Timestamp, taken by value and copied into member lastUpdated
/// The constructor body is empty; all work happens in the initializer list.
75 LBWorkerStatus(
const std::string &n,
float fp,
float cs, u_int32_t sa, google::protobuf::Timestamp lu) : name{n}, fillPercent{fp}, controlSignal{cs}, slotsAssigned{sa}, lastUpdated{lu} {}
83 google::protobuf::Timestamp timestamp;
84 u_int64_t currentEpoch;
85 u_int64_t currentPredictedEventNumber;
86 std::vector<WorkerStatus> workers;
87 std::vector<std::string> senderAddresses;
88 google::protobuf::Timestamp expiresAt;
/// Constructor for LBStatus.
/// @param ts    copied into timestamp
/// @param ce    copied into currentEpoch
/// @param penum copied into currentPredictedEventNumber
/// @param w     worker vector, taken by NON-const reference and moved into workers
/// @param sa    sender-address vector, taken by NON-const reference and moved into senderAddresses
/// @param exp   copied into expiresAt
/// NOTE: because w and sa are moved from, the caller's vectors are left in a
/// valid but unspecified state after this call; callers must not rely on their contents.
93 LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector<WorkerStatus> &w, std::vector<std::string> &sa, google::protobuf::Timestamp exp):
94 timestamp{ts}, currentEpoch{ce}, currentPredictedEventNumber{penum}, expiresAt{exp}
// The two vectors are move-assigned in the body rather than in the initializer list.
96 workers = std::move(w);
97 senderAddresses = std::move(sa);
109 std::pair<ip::address, u_int16_t> syncAddressAndPort;
110 ip::address dataIPv4;
111 ip::address dataIPv6;
117 using OverviewMessage = std::vector<OverviewEntry>;
123 std::string addr_string;
124 std::unique_ptr<LoadBalancer::Stub> _stub;
125 std::shared_ptr<grpc::Channel> _channel;
// LBManager constructor (fragment): last parameter of the signature plus the
// parts of the body present in this listing. It builds addr_string from the
// control-plane URI, creates the gRPC channel and then the LoadBalancer stub.
// Several lines (braces and the conditions selecting between branches) are
// not visible here, so the notes below describe only the statements shown.
141 grpc::SslCredentialsOptions opts = grpc::SslCredentialsOptions()) : _cpuri(cpuri)
// Server validation is switched off here; any condition guarding this
// assignment is not visible in this listing - TODO confirm.
148 validateServer =
false;
// Host-name path: only when the caller did not ask for the host address and
// cp_host_r holds a value (cp_host_r's initialization is not visible;
// presumably cpuri.get_cpHost() - confirm). Target becomes host:port.
150 if (!useHostAddress && !cp_host_r.has_error())
153 auto cp_host_v = cp_host_r.value();
154 addr_string = cp_host_v.first +
":"s + std::to_string(cp_host_v.second);
// Address path: take the control-plane IP address and port from the URI.
// A URI without a CP address makes construction fail with E2SARException.
159 auto cp_addr_r = cpuri.get_cpAddr();
160 if (cp_addr_r.has_error())
161 throw E2SARException(
"Unable to initialize LBManager due to missing CP address in URI");
162 auto cp_addr_v = cp_addr_r.value();
// IPv4 addresses use the ipv4 target scheme with a plain addr:port.
163 if (cp_addr_v.first.is_v4())
164 addr_string =
"ipv4:///" + cp_addr_v.first.to_string() +
":" + std::to_string(cp_addr_v.second);
// Alternative form using the ipv6 target scheme with the address in square
// brackets (presumably the else branch of the is_v4 test - the else itself
// is not visible).
166 addr_string =
"ipv6:///[" + cp_addr_v.first.to_string() +
"]:" + std::to_string(cp_addr_v.second);
// TLS channel with server verification disabled: a no-op certificate
// verifier is installed, server-cert verification and call-host checking are
// both turned off. The condition selecting this branch is not visible.
171 grpc::experimental::TlsChannelCredentialsOptions topts;
175 std::shared_ptr<grpc::experimental::NoOpCertificateVerifier> verifier = std::make_shared<grpc::experimental::NoOpCertificateVerifier>();
176 topts.set_verify_server_certs(false);
177 topts.set_check_call_host(false);
178 topts.set_certificate_verifier(verifier);
179 _channel = grpc::CreateChannel(addr_string, grpc::experimental::TlsCredentials(topts));
// SSL channel built from the caller-supplied opts.
184 _channel = grpc::CreateChannel(addr_string, grpc::SslCredentials(opts));
// Plain-text channel without transport security.
189 _channel = grpc::CreateChannel(addr_string, grpc::InsecureChannelCredentials());
// The stub used for RPCs is created on whichever channel was selected above.
191 _stub = LoadBalancer::NewStub(_channel);
// reserveLB overload set (declarations only; implementations are outside this
// listing). All three take a load balancer name and a list of sender strings,
// return result<u_int32_t> and are noexcept, so errors are presumably reported
// through the result rather than by throwing - confirm in the implementation.
// The meaning of the u_int32_t payload is not visible here.

/// Overload without a duration argument.
/// NOTE(review): presumably falls back to DEFAULT_LB_RESERVE_DURATION - confirm.
/// @param lb_name name for the load balancer
/// @param senders sender strings (presumably IP addresses - confirm)
203 result<u_int32_t> reserveLB(
const std::string &lb_name,
205 const std::vector<std::string> &senders)
noexcept;
/// Overload taking the duration as a boost::posix_time::time_duration.
/// @param lb_name  name for the load balancer
/// @param duration requested duration
/// @param senders  sender strings
218 result<u_int32_t> reserveLB(
const std::string &lb_name,
219 const boost::posix_time::time_duration &duration,
220 const std::vector<std::string> &senders)
noexcept;
/// Overload taking the duration as a floating-point number of seconds.
/// @param lb_name         name for the load balancer
/// @param durationSeconds requested duration in seconds (per the parameter name)
/// @param senders         sender strings
231 result<u_int32_t> reserveLB(
const std::string &lb_name,
232 const double &durationSeconds,
233 const std::vector<std::string> &senders)
noexcept;
// getLB overloads (declarations only). Both return result<int> and are noexcept.

/// Variant taking an explicit load balancer id.
/// @param lbid load balancer id string
244 result<int> getLB(
const std::string &lbid)
noexcept;
/// Variant without arguments.
/// NOTE(review): presumably uses the LB id held in the stored URI - confirm.
249 result<int> getLB() noexcept;
// getLBStatus overloads (declarations only). On success the result carries a
// heap-allocated LoadBalancerStatusReply protobuf message owned by the caller.

/// Variant taking an explicit load balancer id.
/// @param lbid load balancer id string
261 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus(const std::
string &lbid) noexcept;
/// Variant without arguments.
/// NOTE(review): presumably uses the LB id held in the stored URI - confirm.
272 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus() noexcept;
/// Declaration only. Takes no arguments; on success the result carries a
/// caller-owned OverviewReply protobuf message. Never throws (noexcept).
281 result<std::unique_ptr<OverviewReply>> overview() noexcept;
/// Declaration only.
/// @param senders sender strings to add (presumably IP addresses - confirm)
/// @return result<int>; the meaning of the int payload is not visible here
288 result<
int> addSenders(const std::vector<std::
string>& senders) noexcept;
/// Declaration only.
/// @param senders sender strings to remove (presumably IP addresses - confirm)
/// @return result<int>; the meaning of the int payload is not visible here
294 result<
int> removeSenders(const std::vector<std::
string>& senders) noexcept;
/// Declaration only.
/// NOTE(review): presumably adds this host as a sender - confirm.
/// @param v6 defaults to false; presumably selects an IPv6 address - confirm
/// @return result<int>; the meaning of the int payload is not visible here
300 result<
int> addSenderSelf(
bool v6=false) noexcept;
/// Declaration only.
/// NOTE(review): presumably removes this host from the senders - confirm.
/// @param v6 defaults to false; presumably selects an IPv6 address - confirm
/// @return result<int>; the meaning of the int payload is not visible here
306 result<
int> removeSenderSelf(
bool v6=false) noexcept;
/// Extracts the worker entries of a status reply into a std::vector<WorkerStatus>.
/// @param rep owning pointer to the reply; it is dereferenced without a null
///            check in the visible lines, so it must not be empty
316 static inline std::vector<WorkerStatus> get_WorkerStatusVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
// The result is created with workers_size() default-constructed elements up front.
318 std::vector<WorkerStatus> ret(rep->workers_size());
// Iterator i walks the repeated workers field while index j advances in step
// (the declaration of j and the loop body are not visible in this listing).
321 for (
auto i = rep->workers().begin(); i != rep->workers().end(); ++i, j++)
338 std::vector<WorkerStatus> ret(rep.workers_size());
341 for (
auto i = rep.workers().begin(); i != rep.workers().end(); ++i, j++)
356 std::vector<std::string> ret(rep->senderaddresses_size());
359 for (
auto i = rep->senderaddresses().begin(); i != rep->senderaddresses().end(); ++i, j++)
374 std::vector<std::string> ret(rep.senderaddresses_size());
377 for (
auto i = rep.senderaddresses().begin(); i != rep.senderaddresses().end(); ++i, j++)
/// Converts a LoadBalancerStatusReply into a heap-allocated LBStatus.
/// @param rep owning pointer to the reply; dereferenced without a null check
/// NOTE(review): the top-level const on the by-value unique_ptr return type
/// prevents callers from move-assigning the result; consider dropping it.
385 static inline const std::unique_ptr<LBStatus>
asLBStatus(std::unique_ptr<LoadBalancerStatusReply> &rep)
noexcept
// Local copies of the sender addresses and worker statuses. They need to be
// named lvalues because the LBStatus constructor takes both vectors by
// non-const reference.
387 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
388 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
// Scalar fields are read straight from the protobuf accessors.
// (The return statement is not visible in this listing.)
389 std::unique_ptr<LBStatus> pret = std::make_unique<LBStatus>(rep->timestamp(), rep->currentepoch(), rep->currentpredictedeventnumber(),
390 workers, addresses, rep->expiresat());
397 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
398 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
399 return LBStatus(rep.timestamp(), rep.currentepoch(), rep.currentpredictedeventnumber(),
400 workers, addresses, rep.expiresat());
/// Converts an OverviewReply into an OverviewMessage (a vector of OverviewEntry),
/// one entry per element of the reply's loadbalancers field.
/// @param rep owning pointer to the reply; dereferenced without a null check
/// NOTE(review): assuming ip is boost::asio::ip, the single-argument
/// ip::make_address overload throws when the string is not a valid address
/// (an empty address field would be one such case). This function is noexcept,
/// so such a throw would end in std::terminate. Consider the error_code
/// overload of make_address.
404 static inline const OverviewMessage
asOverviewMessage(std::unique_ptr<OverviewReply> &rep)
noexcept
// The output is pre-sized so entries can be filled by index j
// (the declaration of j is not visible in this listing).
407 OverviewMessage om(rep.get()->loadbalancers_size());
408 for (
auto i = rep->loadbalancers().begin(); i != rep->loadbalancers().end(); ++i, j++)
// Name comes from the entry itself; ids, addresses and the sync port come
// from its reservation sub-message; status is converted with asLBStatus.
410 om[j].name = i->name();
411 om[j].lbid = i->reservation().lbid();
412 om[j].syncAddressAndPort.first = ip::make_address(i->reservation().syncipaddress());
413 om[j].syncAddressAndPort.second = i->reservation().syncudpport();
414 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
415 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
416 om[j].fpgaLBId = i->reservation().fpgalbid();
417 om[j].status = asLBStatus(i->status());
426 OverviewMessage om(rep.loadbalancers_size());
427 for (
auto i = rep.loadbalancers().begin(); i != rep.loadbalancers().end(); ++i, j++)
429 om[j].name = i->name();
430 om[j].lbid = i->reservation().lbid();
431 om[j].syncAddressAndPort.first = ip::make_address(i->reservation().syncipaddress());
432 om[j].syncAddressAndPort.second = i->reservation().syncudpport();
433 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
434 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
435 om[j].fpgaLBId = i->reservation().fpgalbid();
436 om[j].status = asLBStatus(i->status());
// freeLB overloads (declarations only). Both return result<int> and are noexcept.

/// Variant taking an explicit load balancer id.
/// @param lbid load balancer id string
447 result<int> freeLB(
const std::string &lbid)
noexcept;
/// Variant without arguments.
/// NOTE(review): presumably uses the LB id held in the stored URI - confirm.
453 result<int> freeLB() noexcept;
/// Declaration only; the implementation is outside this listing.
/// @param node_name    name of the worker node
/// @param node_ip_port IP address and port pair, passed by value
/// @param weight       float weight (semantics not visible here)
/// @param source_count 16-bit source count
/// @param min_factor   float (semantics not visible here)
/// @param max_factor   float (semantics not visible here)
/// @return result<int>; the meaning of the int payload is not visible here
471 result<
int> registerWorker(const std::
string &node_name, std::pair<ip::address, u_int16_t> node_ip_port,
float weight, u_int16_t source_count,
float min_factor,
float max_factor) noexcept;
/// Declaration only. Same parameters as registerWorker except that only a port
/// is supplied instead of an address/port pair.
/// NOTE(review): presumably the node address is determined by the
/// implementation, with v6 selecting the address family - confirm.
/// @param node_name    name of the worker node
/// @param node_port    port of the worker node
/// @param weight       float weight (semantics not visible here)
/// @param source_count 16-bit source count
/// @param min_factor   float (semantics not visible here)
/// @param max_factor   float (semantics not visible here)
/// @param v6           defaults to false
/// @return result<int>; the meaning of the int payload is not visible here
491 result<
int> registerWorkerSelf(const std::
string &node_name, u_int16_t node_port,
float weight, u_int16_t source_count,
float min_factor,
float max_factor,
bool v6=false) noexcept;
/// Declaration only. Takes no arguments, returns result<int>, never throws.
/// NOTE(review): presumably undoes a previous registerWorker call - confirm.
498 result<
int> deregisterWorker() noexcept;
// sendState overloads (declarations only). Both return result<int> and are noexcept.

/// Variant without a timestamp.
/// NOTE(review): presumably the implementation supplies the current time - confirm.
/// @param fill_percent   float (units/range not visible here)
/// @param control_signal float (semantics not visible here)
/// @param is_ready       readiness flag
508 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready) noexcept;
/// Variant with an explicit protobuf Timestamp.
/// @param fill_percent   float (units/range not visible here)
/// @param control_signal float (semantics not visible here)
/// @param is_ready       readiness flag
/// @param ts             timestamp to send with the state
520 result<
int> sendState(
float fill_percent,
float control_signal,
bool is_ready, const Timestamp &ts) noexcept;
/// Declaration only. On success the result carries a boost::tuple of three
/// strings; what each element means is not visible in this listing.
527 result<boost::tuple<std::
string, std::
string, std::
string>> version() noexcept;
/// Read-only accessor for the stored control-plane URI.
/// @return const reference to the member _cpuri; it refers to this object's
///         member, so it must not be used after the LBManager is destroyed
532 inline const
EjfatURI &get_URI() const noexcept {
return _cpuri; }
/// Builds grpc::SslCredentialsOptions from three strings, in the order root
/// certs, private key, cert chain.
/// @param pem_root_certs  root certificates (PEM text, per the parameter name)
/// @param pem_private_key private key (PEM text, per the parameter name)
/// @param pem_cert_chain  certificate chain (PEM text, per the parameter name)
/// @return the options wrapped in a result
/// NOTE(review): all three parameters are const references, so std::move
/// below cannot actually move them - the strings are copied. The calls are
/// harmless but misleading; either drop std::move or take the parameters by value.
543 static inline result<grpc::SslCredentialsOptions>
makeSslOptions(
const std::string &pem_root_certs,
544 const std::string &pem_private_key,
545 const std::string &pem_cert_chain)
noexcept
547 return grpc::SslCredentialsOptions{std::move(pem_root_certs),
548 std::move(pem_private_key),
549 std::move(pem_cert_chain)};
// makeSslOptionsFromFiles overloads (declarations only). Unlike makeSslOptions
// the arguments are string_views; per the function name they are presumably
// file paths whose contents are loaded - confirm against the implementation.

/// Variant taking root certs, private key and cert chain.
561 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
562 std::string_view pem_root_certs,
563 std::string_view pem_private_key,
564 std::string_view pem_cert_chain)
noexcept;
/// Variant taking only root certs.
570 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
571 std::string_view pem_root_certs)
noexcept;
/// Derives an int port-range value from a source count.
/// Only the control-flow skeleton is visible in this listing: the values
/// returned by each branch and the definition of maxCount are not shown.
/// @param source_count number of sources
591 static inline int get_PortRange(
int source_count)
noexcept
// Special case for fewer than two sources.
598 if (source_count < 2)
// Special case for more than 16384 sources.
602 else if (source_count > 16384)
// General case: loop until maxCount is at least source_count
// (how maxCount changes per iteration is not visible).
610 while (source_count > maxCount)
Definition e2sarUtil.hpp:31
bool get_useTls() const
Definition e2sarUtil.hpp:98
const result< std::pair< std::string, u_int16_t > > get_cpHost() const
Definition e2sarUtil.hpp:210
Definition e2sarCP.hpp:120
static std::vector< std::string > get_SenderAddressVector(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:354
LBManager(const EjfatURI &cpuri, bool validateServer=true, bool useHostAddress=false, grpc::SslCredentialsOptions opts=grpc::SslCredentialsOptions())
Definition e2sarCP.hpp:140
static std::vector< WorkerStatus > get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:336
std::string get_AddrString()
Definition e2sarCP.hpp:579
static std::vector< std::string > get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:372
static const OverviewMessage asOverviewMessage(std::unique_ptr< OverviewReply > &rep) noexcept
Definition e2sarCP.hpp:404
static const std::unique_ptr< LBStatus > asLBStatus(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:385
static const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
Definition e2sarCP.hpp:423
static result< grpc::SslCredentialsOptions > makeSslOptions(const std::string &pem_root_certs, const std::string &pem_private_key, const std::string &pem_cert_chain) noexcept
Definition e2sarCP.hpp:543
static const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:395
google::protobuf::Timestamp TimeUntil
Definition e2sarCP.hpp:62
Definition e2sarCP.hpp:82
LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector< WorkerStatus > &w, std::vector< std::string > &sa, google::protobuf::Timestamp exp)
Definition e2sarCP.hpp:93
Definition e2sarCP.hpp:68
Definition e2sarCP.hpp:106