E2SAR 0.3.0
Loading...
Searching...
No Matches
e2sarCP.hpp
1#ifndef E2SARCPHPP
2#define E2SARCPHPP
3#include <vector>
4#include <memory>
5#include <boost/asio.hpp>
6#include <boost/tuple/tuple.hpp>
7
8#include <grpc/grpc.h>
9#include <grpcpp/channel.h>
10#include <grpcpp/client_context.h>
11#include <grpcpp/create_channel.h>
12#include <grpcpp/security/credentials.h>
13
14#include <google/protobuf/util/time_util.h>
15
16#include <boost/date_time/posix_time/posix_time.hpp>
17
18#include "grpc/loadbalancer.grpc.pb.h"
19
20#include "e2sarUtil.hpp"
21
22using grpc::Channel;
23using grpc::ClientContext;
24using grpc::ClientReader;
25using grpc::ClientReaderWriter;
26using grpc::ClientWriter;
27using grpc::Status;
28
29using loadbalancer::LoadBalancer;
30
31using loadbalancer::LoadBalancerStatusReply;
32using loadbalancer::WorkerStatus;
33using loadbalancer::OverviewReply;
34
35using google::protobuf::Timestamp;
36
37// default reservation duration for a load balancer in hours
38#define DEFAULT_LB_RESERVE_DURATION 24
39
40/***
41 * Control Plane definitions for E2SAR
42 */
43
44namespace e2sar
45{
56 using TimeUntil = google::protobuf::Timestamp;
57
68 struct WorkerStats {
69 int64_t total_events_recv, total_events_reassembled, total_events_reassembly_err, total_events_dequeued,
70 total_event_enqueue_err, total_bytes_recv, total_packets_recv;
71
72 WorkerStats(): total_events_recv{0}, total_events_reassembled{0}, total_events_reassembly_err{0},
73 total_events_dequeued{0}, total_event_enqueue_err{0}, total_bytes_recv{0}, total_packets_recv{0} {
74 ;
75 }
76 };
77
81 struct LBStatus
82 {
83 google::protobuf::Timestamp timestamp;
84 u_int64_t currentEpoch;
85 u_int64_t currentPredictedEventNumber;
86 std::vector<WorkerStatus> workers;
87 std::vector<std::string> senderAddresses;
88 google::protobuf::Timestamp expiresAt;
89
93 LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector<WorkerStatus> &w, std::vector<std::string> &sa, google::protobuf::Timestamp exp):
94 timestamp{ts}, currentEpoch{ce}, currentPredictedEventNumber{penum}, expiresAt{exp}
95 {
96 workers = std::move(w);
97 senderAddresses = std::move(sa);
98 }
99 LBStatus() {}
100 };
101
106 {
107 std::string name; // name passed in reserveLB
108 std::string lbid; // load balancer id
109 std::pair<ip::address, u_int16_t> syncIPv4AndPort;
110 std::pair<ip::address, u_int16_t> syncIPv6AndPort;
111 ip::address dataIPv4;
112 ip::address dataIPv6;
113 u_int32_t fpgaLBId;
114 u_int32_t dataMinPort;
115 u_int32_t dataMaxPort;
116 LBStatus status; // same as in lbstatus call
117
118 OverviewEntry() {}
119 };
120 using OverviewMessage = std::vector<OverviewEntry>;
121
125 struct FloatSample {
126 int64_t timestamp_ms; // milliseconds since epoch
127 float value;
128
129 FloatSample(int64_t ts, float v) : timestamp_ms{ts}, value{v} {}
130 FloatSample() : timestamp_ms{0}, value{0.0f} {}
131 };
132
137 int64_t timestamp_ms; // milliseconds since epoch
138 int64_t value;
139
140 IntegerSample(int64_t ts, int64_t v) : timestamp_ms{ts}, value{v} {}
141 IntegerSample() : timestamp_ms{0}, value{0} {}
142 };
143
148 std::string path;
149 std::string unit;
150 std::variant<std::vector<FloatSample>, std::vector<IntegerSample>> timeseries;
151
152 TimeseriesData(const std::string& p, const std::string &u, std::vector<FloatSample>& fs):
153 path{p}, unit{u}, timeseries{std::move(fs)} {}
154 TimeseriesData(const std::string& p, const std::string &u, std::vector<IntegerSample>& is):
155 path{p}, unit{u}, timeseries{std::move(is)} {}
156 };
157
158 // a collection of timeseries with a common since timestamp
160 int64_t since_ms;
161 std::vector<TimeseriesData> td; // possibly multiple vectors of timeseries data returned by the query
162
163 TimeseriesResult(int64_t _ts, std::vector<TimeseriesData> &_td): since_ms{_ts}, td{std::move(_td)} {}
164
165 // be careful this is not a true copy constructor as it uses move to move the timeseries
166 TimeseriesResult(TimeseriesResult&& _tsr): since_ms{_tsr.since_ms}, td{std::move(_tsr.td)} {}
167 };
168
174 EjfatURI::TokenType resourceType;
175 std::string resourceId; // optional, can be empty
176 EjfatURI::TokenPermission permission;
177
179 resourceType{EjfatURI::TokenType::all},
180 resourceId{""},
181 permission{EjfatURI::TokenPermission::_read_only_} {}
182 TokenPermission(EjfatURI::TokenType rt, const std::string& rid, EjfatURI::TokenPermission pt):
183 resourceType{rt}, resourceId{rid}, permission{pt} {}
184 };
185
191 std::string name;
192 std::vector<TokenPermission> permissions;
193 std::string created_at;
194 uint32_t id;
195
196 TokenDetails(): name{""}, created_at{""}, id{0} {}
197 };
198
203 using TokenSelector = std::variant<uint32_t, std::string>;
204
206 {
207 private:
208 EjfatURI _cpuri;
209 std::string addr_string;
210 std::unique_ptr<LoadBalancer::Stub> _stub;
211 std::shared_ptr<grpc::Channel> _channel;
212
213 protected:
214 public:
226 LBManager(const EjfatURI &cpuri, bool validateServer = true, bool useHostAddress = false,
227 grpc::SslCredentialsOptions opts = grpc::SslCredentialsOptions()) : _cpuri(cpuri)
228 {
229
230 auto cp_host_r = cpuri.get_cpHost();
231
232 // using host address automatically disables cert validation
233 if (useHostAddress)
234 validateServer = false;
235
236 if (!useHostAddress && !cp_host_r.has_error())
237 {
238 // try hostname
239 auto cp_host_v = cp_host_r.value();
240 addr_string = cp_host_v.first + ":"s + std::to_string(cp_host_v.second);
241 }
242 else
243 {
244 // try address
245 auto cp_addr_r = cpuri.get_cpAddr();
246 if (cp_addr_r.has_error())
247 throw E2SARException("Unable to initialize LBManager due to missing CP address in URI");
248 auto cp_addr_v = cp_addr_r.value();
249 if (cp_addr_v.first.is_v4())
250 addr_string = "ipv4:///" + cp_addr_v.first.to_string() + ":" + std::to_string(cp_addr_v.second);
251 else
252 addr_string = "ipv6:///[" + cp_addr_v.first.to_string() + "]:" + std::to_string(cp_addr_v.second);
253 }
254
255 if (cpuri.get_useTls())
256 {
257 grpc::experimental::TlsChannelCredentialsOptions topts;
258 if (!validateServer)
259 {
260 // disable most of server certificate validation
261 std::shared_ptr<grpc::experimental::NoOpCertificateVerifier> verifier = std::make_shared<grpc::experimental::NoOpCertificateVerifier>();
262 topts.set_verify_server_certs(false);
263 topts.set_check_call_host(false);
264 topts.set_certificate_verifier(verifier);
265 _channel = grpc::CreateChannel(addr_string, grpc::experimental::TlsCredentials(topts));
266 }
267 else
268 {
269 // use provided SSL options
270 _channel = grpc::CreateChannel(addr_string, grpc::SslCredentials(opts));
271 }
272 }
273 else
274 {
275 _channel = grpc::CreateChannel(addr_string, grpc::InsecureChannelCredentials());
276 }
277 _stub = LoadBalancer::NewStub(_channel);
278 }
279
290 result<u_int32_t> reserveLB(const std::string &lb_name,
291 const TimeUntil &until,
292 const std::vector<std::string> &senders,
293 int ip_family=loadbalancer::IpFamily::DUAL_STACK) noexcept;
294
307 result<u_int32_t> reserveLB(const std::string &lb_name,
308 const boost::posix_time::time_duration &duration,
309 const std::vector<std::string> &senders,
310 int ip_family=loadbalancer::IpFamily::DUAL_STACK) noexcept;
311
322 result<u_int32_t> reserveLB(const std::string &lb_name,
323 const double &durationSeconds,
324 const std::vector<std::string> &senders,
325 int ip_family=loadbalancer::IpFamily::DUAL_STACK) noexcept;
326
336 result<int> getLB(const std::string &lbid) noexcept;
341 result<int> getLB() noexcept;
342
353 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus(const std::string &lbid) noexcept;
354
364 result<std::unique_ptr<LoadBalancerStatusReply>> getLBStatus() noexcept;
365
373 result<std::unique_ptr<OverviewReply>> overview() noexcept;
374
380 result<int> addSenders(const std::vector<std::string>& senders) noexcept;
381
386 result<int> removeSenders(const std::vector<std::string>& senders) noexcept;
387
392 result<int> addSenderSelf(bool v6=false) noexcept;
393
398 result<int> removeSenderSelf(bool v6=false) noexcept;
399
408 result<std::string> createToken(
409 const std::string &name,
410 const std::vector<TokenPermission> &permissions) noexcept;
411
419 result<TokenDetails> listTokenPermissions(const TokenSelector &target) noexcept;
420
428 result<std::vector<TokenDetails>> listChildTokens(const TokenSelector &target) noexcept;
429
437 result<int> revokeToken(const TokenSelector &target) noexcept;
438
447 result<TimeseriesResult> timeseries(const std::string &path, const Timestamp &since) noexcept;
448
457 static inline std::vector<WorkerStatus> get_WorkerStatusVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
458 {
459 std::vector<WorkerStatus> ret(rep->workers_size());
460
461 size_t j{0};
462 for (auto i = rep->workers().begin(); i != rep->workers().end(); ++i, j++)
463 {
464 ret[j].CopyFrom(*i);
465 }
466 return ret;
467 }
468
477 static inline std::vector<WorkerStatus> get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
478 {
479 std::vector<WorkerStatus> ret(rep.workers_size());
480
481 size_t j{0};
482 for (auto i = rep.workers().begin(); i != rep.workers().end(); ++i, j++)
483 {
484 ret[j].CopyFrom(*i);
485 }
486 return ret;
487 }
488
495 static inline std::vector<std::string> get_SenderAddressVector(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
496 {
497 std::vector<std::string> ret(rep->senderaddresses_size());
498
499 size_t j{0};
500 for (auto i = rep->senderaddresses().begin(); i != rep->senderaddresses().end(); ++i, j++)
501 {
502 ret[j] = *i;
503 }
504 return ret;
505 }
506
513 static inline std::vector<std::string> get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
514 {
515 std::vector<std::string> ret(rep.senderaddresses_size());
516
517 size_t j{0};
518 for (auto i = rep.senderaddresses().begin(); i != rep.senderaddresses().end(); ++i, j++)
519 {
520 ret[j] = *i;
521 }
522 return ret;
523 }
524
526 static inline const std::unique_ptr<LBStatus> asLBStatus(std::unique_ptr<LoadBalancerStatusReply> &rep) noexcept
527 {
528 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
529 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
530 std::unique_ptr<LBStatus> pret = std::make_unique<LBStatus>(rep->timestamp(), rep->currentepoch(), rep->currentpredictedeventnumber(),
531 workers, addresses, rep->expiresat());
532 return pret;
533 }
534
536 static inline const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
537 {
538 std::vector<std::string> addresses{get_SenderAddressVector(rep)};
539 std::vector<WorkerStatus> workers{get_WorkerStatusVector(rep)};
540 return LBStatus(rep.timestamp(), rep.currentepoch(), rep.currentpredictedeventnumber(),
541 workers, addresses, rep.expiresat());
542 }
543
545 static inline const OverviewMessage asOverviewMessage(std::unique_ptr<OverviewReply> &rep) noexcept
546 {
547 size_t j{0};
548 OverviewMessage om(rep.get()->loadbalancers_size());
549 for (auto i = rep->loadbalancers().begin(); i != rep->loadbalancers().end(); ++i, j++)
550 {
551 om[j].name = i->name();
552 om[j].lbid = i->reservation().lbid();
553 om[j].syncIPv4AndPort.first = ip::make_address(i->reservation().syncipv4address());
554 om[j].syncIPv4AndPort.second = i->reservation().syncudpport();
555 om[j].syncIPv6AndPort.first = ip::make_address(i->reservation().syncipv6address());
556 om[j].syncIPv6AndPort.second = i->reservation().syncudpport();
557 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
558 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
559 om[j].fpgaLBId = i->reservation().fpgalbid();
560 om[j].dataMinPort = i->reservation().dataminport();
561 om[j].dataMaxPort = i->reservation().datamaxport();
562 om[j].status = asLBStatus(i->status());
563 }
564 return om;
565 }
566
568 static inline const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
569 {
570 size_t j{0};
571 OverviewMessage om(rep.loadbalancers_size());
572 for (auto i = rep.loadbalancers().begin(); i != rep.loadbalancers().end(); ++i, j++)
573 {
574 om[j].name = i->name();
575 om[j].lbid = i->reservation().lbid();
576 om[j].syncIPv4AndPort.first = ip::make_address(i->reservation().syncipv4address());
577 om[j].syncIPv4AndPort.second = i->reservation().syncudpport();
578 om[j].syncIPv6AndPort.first = ip::make_address(i->reservation().syncipv6address());
579 om[j].syncIPv6AndPort.second = i->reservation().syncudpport();
580 om[j].dataIPv4 = ip::make_address(i->reservation().dataipv4address());
581 om[j].dataIPv6 = ip::make_address(i->reservation().dataipv6address());
582 om[j].fpgaLBId = i->reservation().fpgalbid();
583 om[j].dataMinPort = i->reservation().dataminport();
584 om[j].dataMaxPort = i->reservation().datamaxport();
585 om[j].status = asLBStatus(i->status());
586 }
587 return om;
588 }
589
596 result<int> freeLB(const std::string &lbid) noexcept;
602 result<int> freeLB() noexcept;
603
621 result<int> registerWorker(const std::string &node_name, std::pair<ip::address, u_int16_t> node_ip_port, float weight, u_int16_t source_count,
622 float min_factor, float max_factor, bool keep_lb_header=false) noexcept;
623
643 result<int> registerWorkerSelf(const std::string &node_name, u_int16_t node_port, float weight, u_int16_t source_count, float min_factor, float max_factor,
644 bool v6=false, bool keep_lb_header=false) noexcept;
645
651 result<int> deregisterWorker() noexcept;
652
663 result<int> sendState(float fill_percent, float control_signal, bool is_ready,
664 const WorkerStats &stats) noexcept;
665
677 result<int> sendState(float fill_percent, float control_signal, bool is_ready, const Timestamp &ts,
678 const WorkerStats &stats) noexcept;
679
688 result<int> sendState(float fill_percent, float control_signal, bool is_ready) noexcept;
689
700 result<int> sendState(float fill_percent, float control_signal, bool is_ready, const Timestamp &ts) noexcept;
701
707 result<boost::tuple<std::string, std::string, std::string>> version() noexcept;
708
712 inline const EjfatURI &get_URI() const noexcept { return _cpuri; }
713
723 static inline result<grpc::SslCredentialsOptions> makeSslOptions(const std::string &pem_root_certs,
724 const std::string &pem_private_key,
725 const std::string &pem_cert_chain) noexcept
726 {
727 return grpc::SslCredentialsOptions{std::move(pem_root_certs),
728 std::move(pem_private_key),
729 std::move(pem_cert_chain)};
730 }
731
741 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
742 std::string_view pem_root_certs,
743 std::string_view pem_private_key,
744 std::string_view pem_cert_chain) noexcept;
745
750 static result<grpc::SslCredentialsOptions> makeSslOptionsFromFiles(
751 std::string_view pem_root_certs) noexcept;
752
759 inline std::string get_AddrString() {
760 return addr_string;
761 }
762
763 };
764
772 static inline int get_PortRange(int source_count) noexcept
773 {
774 // Based on the proto file enum for the load balancer, seen below,
775 // map the max # of sources a backend will see to the PortRange value.
776 // This is necessay to provide the control plane when registering.
777
778 // Handle edge cases
779 if (source_count < 2)
780 {
781 return 0;
782 }
783 else if (source_count > 16384)
784 {
785 return 14;
786 }
787
788 int maxCount = 2;
789 int iteration = 1;
790
791 while (source_count > maxCount)
792 {
793 iteration++;
794 maxCount <<= 1;
795 }
796
797 return iteration;
798 }
799}
800#endif
Definition e2sarUtil.hpp:56
bool get_useTls() const
Definition e2sarUtil.hpp:152
const result< std::pair< std::string, u_int16_t > > get_cpHost() const
Definition e2sarUtil.hpp:273
Definition e2sarCP.hpp:206
static std::vector< std::string > get_SenderAddressVector(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:495
LBManager(const EjfatURI &cpuri, bool validateServer=true, bool useHostAddress=false, grpc::SslCredentialsOptions opts=grpc::SslCredentialsOptions())
Definition e2sarCP.hpp:226
static std::vector< WorkerStatus > get_WorkerStatusVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:477
std::string get_AddrString()
Definition e2sarCP.hpp:759
static std::vector< std::string > get_SenderAddressVector(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:513
static const OverviewMessage asOverviewMessage(std::unique_ptr< OverviewReply > &rep) noexcept
Definition e2sarCP.hpp:545
static const std::unique_ptr< LBStatus > asLBStatus(std::unique_ptr< LoadBalancerStatusReply > &rep) noexcept
Definition e2sarCP.hpp:526
static const OverviewMessage asOverviewMessage(const OverviewReply &rep) noexcept
Definition e2sarCP.hpp:568
static result< grpc::SslCredentialsOptions > makeSslOptions(const std::string &pem_root_certs, const std::string &pem_private_key, const std::string &pem_cert_chain) noexcept
Definition e2sarCP.hpp:723
static const LBStatus asLBStatus(const LoadBalancerStatusReply &rep) noexcept
Definition e2sarCP.hpp:536
Definition e2sar.hpp:11
google::protobuf::Timestamp TimeUntil
Definition e2sarCP.hpp:56
std::variant< uint32_t, std::string > TokenSelector
Definition e2sarCP.hpp:203
Definition e2sarCP.hpp:125
Definition e2sarCP.hpp:136
Definition e2sarCP.hpp:82
LBStatus(google::protobuf::Timestamp ts, u_int64_t ce, u_int64_t penum, std::vector< WorkerStatus > &w, std::vector< std::string > &sa, google::protobuf::Timestamp exp)
Definition e2sarCP.hpp:93
Definition e2sarCP.hpp:106
Definition e2sarCP.hpp:147
Definition e2sarCP.hpp:159
Definition e2sarCP.hpp:190
Definition e2sarCP.hpp:173
Definition e2sarCP.hpp:68