E2SAR 0.2.0
Loading...
Searching...
No Matches
e2sarCP.hpp
1#ifndef E2SARCPHPP
2#define E2SARCPHPP
3#include <vector>
4#include <memory>
5#include <boost/asio.hpp>
6#include <boost/tuple/tuple.hpp>
7
8#include <grpc/grpc.h>
9#include <grpcpp/channel.h>
10#include <grpcpp/client_context.h>
11#include <grpcpp/create_channel.h>
12#include <grpcpp/security/credentials.h>
13
14#include <google/protobuf/util/time_util.h>
15#include <boost/date_time/posix_time/posix_time.hpp>
16
17#include "grpc/loadbalancer.grpc.pb.h"
18
19#include "e2sarUtil.hpp"
20
21using grpc::Channel;
22using grpc::ClientContext;
23using grpc::ClientReader;
24using grpc::ClientReaderWriter;
25using grpc::ClientWriter;
26using grpc::Status;
27
28using loadbalancer::LoadBalancer;
29
30using loadbalancer::LoadBalancerStatusReply;
31using loadbalancer::WorkerStatus;
32using loadbalancer::OverviewReply;
33
34using google::protobuf::Timestamp;
35
36// default reservation duration for a load balancer in hours
37#define DEFAULT_LB_RESERVE_DURATION 24
38
39// change to '1' to test older versions of UDPLBd where token
40// was sent as a parameter in the body. The new way is to send
41// it in the authorization header as bearer token
42#ifndef TOKEN_IN_BODY
43#define TOKEN_IN_BODY 0
44#endif
45
46/***
47 * Control Plane definitions for E2SAR
48 */
49
50namespace e2sar
51{
62 using TimeUntil = google::protobuf::Timestamp;
63
68 {
69 std::string name;
70 float fillPercent;
71 float controlSignal;
72 u_int32_t slotsAssigned;
73 google::protobuf::Timestamp lastUpdated;
74
75 LBWorkerStatus(const std::string &n, float fp, float cs, u_int32_t sa, google::protobuf::Timestamp lu) : name{n}, fillPercent{fp}, controlSignal{cs}, slotsAssigned{sa}, lastUpdated{lu} {}
76 };
77
81 struct LBStatus
82 {
83 google::protobuf::Timestamp timestamp;
84 u_int64_t currentEpoch;
85 u_int64_t currentPredictedEventNumber;
86 std::vector<WorkerStatus> workers;
87 std::vector<std::string> senderAddresses;
88 google::protobuf::Timestamp expiresAt;
89
93 LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector<WorkerStatus> &w, std::vector<std::string> &sa, google::protobuf::Timestamp exp):
94 timestamp{ts}, currentEpoch{ce}, currentPredictedEventNumber{penum}, expiresAt{exp}
95 {
96 workers = std::move(w);
97 senderAddresses = std::move(sa);
98 }
99 LBStatus() {}
100 };
101
106 {
107 std::string name; // name passed in reserveLB
108 std::string lbid; // load balancer id
109 std::pair<ip::address, u_int16_t> syncAddressAndPort;
110 ip::address dataIPv4;
111 ip::address dataIPv6;
112 u_int32_t fpgaLBId;
113 LBStatus status; // same as in lbstatus call
114
115 OverviewEntry() {}
116 };
117 using OverviewMessage = std::vector<OverviewEntry>;
118
120 {
121 private:
122 EjfatURI _cpuri;
123 std::string addr_string;
124 std::unique_ptr<LoadBalancer::Stub> _stub;
125 std::shared_ptr<grpc::Channel> _channel;
126
127 protected:
128 public:
140 LBManager(const EjfatURI &cpuri, bool validateServer = true, bool useHostAddress = false,
141 grpc::SslCredentialsOptions opts = grpc::SslCredentialsOptions()) : _cpuri(cpuri)
142 {
143
144 auto cp_host_r = cpuri.get_cpHost();
145
146 // using host address automatically disables cert validation
147 if (useHostAddress)
148 validateServer = false;
149
150 if (!useHostAddress && !cp_host_r.has_error())
151 {
152 // try hostname
153 auto cp_host_v = cp_host_r.value();
154 addr_string = cp_host_v.first + ":"s + std::to_string(cp_host_v.second);
155 }
156 else
157 {
158 // try address
159 auto cp_addr_r = cpuri.get_cpAddr();
160 if (cp_addr_r.has_error())
161 throw E2SARException("Unable to initialize LBManager due to missing CP address in URI");
162 auto cp_addr_v = cp_addr_r.value();
163 if (cp_addr_v.first.is_v4())
164 addr_string = "ipv4:///" + cp_addr_v.first.to_string() + ":" + std::to_string(cp_addr_v.second);
165 else
166 addr_string = "ipv6:///[" + cp_addr_v.first.to_string() + "]:" + std::to_string(cp_addr_v.second);
167 }
168
169 if (cpuri.get_useTls())
170 {
171 grpc::experimental::TlsChannelCredentialsOptions topts;
172 if (!validateServer)
173 {
174 // disable most of server certificate validation
175 std::shared_ptr<grpc::experimental::NoOpCertificateVerifier> verifier = std::make_shared<grpc::experimental::NoOpCertificateVerifier>();
176 topts.set_verify_server_certs(false);
177 topts.set_check_call_host(false);
178 topts.set_certificate_verifier(verifier);
179 _channel = grpc::CreateChannel(addr_string, grpc::experimental::TlsCredentials(topts));
180 }
181 else
182 {
183 // use provided SSL options
184 _channel = grpc::CreateChannel(addr_string, grpc::SslCredentials(opts));
185 }
186 }
187 else
188 {
189 _channel = grpc::CreateChannel(addr_string, grpc::InsecureChannelCredentials());
190 }
191 _stub = LoadBalancer::NewStub(_channel);
192 }
193
203 result<u_int32_t> reserveLB(const std::string &lb_name,
204 const TimeUntil &until,
205 const std::vector<std::string> &senders) noexcept;
206
218 result<u_int32_t> reserveLB(const std::string &lb_name,
219 const boost::posix_time::time_duration &duration,
220 const std::vector<std::string> &senders) noexcept;
221
231 result<u_int32_t> reserveLB(const std::string &lb_name,
232 const double &durationSeconds,
233 const std::vector<std::string> &senders) noexcept;
234
244 result<int> getLB(const std::string &lbid) noexcept;
249 result<int> getLB() noexcept;
250
261 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus(const std::string &lbid) noexcept;
262
272 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus() noexcept;
273
281 result<std::unique_ptr<OverviewReply>> overview() noexcept;
282
288 result<int> addSenders(const std::vector<std::string>& senders) noexcept;
289
294 result<int> removeSenders(const std::vector<std::string>& senders) noexcept;
295
300 result<int> addSenderSelf(bool v6=false) noexcept;
301
306 result<int> removeSenderSelf(bool v6=false) noexcept;
307
316 static inline std::vector<WorkerStatus> get_WorkerStatusVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
317 {
318 std::vector<WorkerStatus> ret(rep->workers_size());
319
320 size_t j{0};
321 for (auto i = rep->workers().begin(); i != rep->workers().end(); ++i, j++)
322 {
323 ret[j].CopyFrom(*i);
324 }
325 return ret;
326 }
327
336 static inline std::vector<WorkerStatus> get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
337 {
338 std::vector<WorkerStatus> ret(rep.workers_size());
339
340 size_t j{0};
341 for (auto i = rep.workers().begin(); i != rep.workers().end(); ++i, j++)
342 {
343 ret[j].CopyFrom(*i);
344 }
345 return ret;
346 }
347
354 static inline std::vector<std::string> get_SenderAddressVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
355 {
356 std::vector<std::string> ret(rep->senderaddresses_size());
357
358 size_t j{0};
359 for (auto i = rep->senderaddresses().begin(); i != rep->senderaddresses().end(); ++i, j++)
360 {
361 ret[j] = *i;
362 }
363 return ret;
364 }
365
372 static inline std::vector<std::string> get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
373 {
374 std::vector<std::string> ret(rep.senderaddresses_size());
375
376 size_t j{0};
377 for (auto i = rep.senderaddresses().begin(); i != rep.senderaddresses().end(); ++i, j++)
378 {
379 ret[j] = *i;
380 }
381 return ret;
382 }
383
385 static inline const std::unique_ptr<LBStatus> asLBStatus(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
386 {
387 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
388 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
389 std::unique_ptr<LBStatus> pret = std::make_unique<LBStatus>(rep->timestamp(), rep->currentepoch(), rep->currentpredictedeventnumber(),
390 workers, addresses, rep->expiresat());
391 return pret;
392 }
393
395 static inline const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
396 {
397 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
398 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
399 return LBStatus(rep.timestamp(), rep.currentepoch(), rep.currentpredictedeventnumber(),
400 workers, addresses, rep.expiresat());
401 }
402
404 static inline const OverviewMessage asOverviewMessage(std::unique_ptr<OverviewReply> &rep) noexcept
405 {
406 size_t j{0};
407 OverviewMessage om(rep.get()->loadbalancers_size());
408 for (auto i = rep->loadbalancers().begin(); i != rep->loadbalancers().end(); ++i, j++)
409 {
410 om[j].name = i->name();
411 om[j].lbid = i->reservation().lbid();
412 om[j].syncAddressAndPort.first = ip::make_address(i->reservation().syncipaddress());
413 om[j].syncAddressAndPort.second = i->reservation().syncudpport();
414 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
415 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
416 om[j].fpgaLBId = i->reservation().fpgalbid();
417 om[j].status = asLBStatus(i->status());
418 }
419 return om;
420 }
421
423 static inline const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
424 {
425 size_t j{0};
426 OverviewMessage om(rep.loadbalancers_size());
427 for (auto i = rep.loadbalancers().begin(); i != rep.loadbalancers().end(); ++i, j++)
428 {
429 om[j].name = i->name();
430 om[j].lbid = i->reservation().lbid();
431 om[j].syncAddressAndPort.first = ip::make_address(i->reservation().syncipaddress());
432 om[j].syncAddressAndPort.second = i->reservation().syncudpport();
433 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
434 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
435 om[j].fpgaLBId = i->reservation().fpgalbid();
436 om[j].status = asLBStatus(i->status());
437 }
438 return om;
439 }
440
447 result<int> freeLB(const std::string &lbid) noexcept;
453 result<int> freeLB() noexcept;
454
471 result<int> registerWorker(const std::string &node_name, std::pair<ip::address, u_int16_t> node_ip_port, float weight, u_int16_t source_count, float min_factor, float max_factor) noexcept;
472
491 result<int> registerWorkerSelf(const std::string &node_name, u_int16_t node_port, float weight, u_int16_t source_count, float min_factor, float max_factor, bool v6=false) noexcept;
492
498 result<int> deregisterWorker() noexcept;
499
508 result<int> sendState(float fill_percent, float control_signal, bool is_ready) noexcept;
509
520 result<int> sendState(float fill_percent, float control_signal, bool is_ready, const Timestamp &ts) noexcept;
521
527 result<boost::tuple<std::string, std::string, std::string>> version() noexcept;
528
532 inline const EjfatURI &get_URI() const noexcept { return _cpuri; }
533
543 static inline result<grpc::SslCredentialsOptions> makeSslOptions(const std::string &pem_root_certs,
544 const std::string &pem_private_key,
545 const std::string &pem_cert_chain) noexcept
546 {
547 return grpc::SslCredentialsOptions{std::move(pem_root_certs),
548 std::move(pem_private_key),
549 std::move(pem_cert_chain)};
550 }
551
561 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
562 std::string_view pem_root_certs,
563 std::string_view pem_private_key,
564 std::string_view pem_cert_chain) noexcept;
565
570 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
571 std::string_view pem_root_certs) noexcept;
572
579 inline std::string get_AddrString() {
580 return addr_string;
581 }
582 };
583
/**
 * Map the maximum number of sources a backend will see to the PortRange
 * value that is necessary to provide to the control plane when registering.
 *
 * The result is the smallest exponent r such that 2^r >= source_count,
 * clamped to the interval [0, 14]: counts below 2 yield 0 and counts above
 * 16384 yield 14.
 *
 * @param source_count maximum number of sources
 * @return PortRange value in [0, 14]
 */
static inline int get_PortRange(int source_count) noexcept
{
    constexpr int maxRange = 14; // 2^14 == 16384 sources

    int range = 0;
    // grow the exponent until 2^range covers source_count or the cap is hit
    while (range < maxRange && (1 << range) < source_count)
    {
        ++range;
    }
    return range;
}
618}
619#endif
Definition e2sarUtil.hpp:31
bool get_useTls() const
Definition e2sarUtil.hpp:98
const result< std::pair< std::string, u_int16_t > > get_cpHost() const
Definition e2sarUtil.hpp:210
Definition e2sarCP.hpp:120
static std::vector< std::string > get_SenderAddressVector(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:354
LBManager(const EjfatURI &cpuri, bool validateServer=true, bool useHostAddress=false, grpc::SslCredentialsOptions opts=grpc::SslCredentialsOptions())
Definition e2sarCP.hpp:140
static std::vector< WorkerStatus > get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:336
std::string get_AddrString()
Definition e2sarCP.hpp:579
static std::vector< std::string > get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:372
static const OverviewMessage asOverviewMessage(std::unique_ptr< OverviewReply > &rep) noexcept
Definition e2sarCP.hpp:404
static const std::unique_ptr< LBStatus > asLBStatus(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:385
static const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
Definition e2sarCP.hpp:423
static result< grpc::SslCredentialsOptions > makeSslOptions(const std::string &pem_root_certs, const std::string &pem_private_key, const std::string &pem_cert_chain) noexcept
Definition e2sarCP.hpp:543
static const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:395
Definition e2sar.hpp:11
google::protobuf::Timestamp TimeUntil
Definition e2sarCP.hpp:62
Definition e2sarCP.hpp:82
LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector< WorkerStatus > &w, std::vector< std::string > &sa, google::protobuf::Timestamp exp)
Definition e2sarCP.hpp:93
Definition e2sarCP.hpp:68
Definition e2sarCP.hpp:106