6#include <boost/url.hpp>
7#include <boost/algorithm/string.hpp>
8#include <boost/asio.hpp>
9#include <boost/chrono.hpp>
10#include <boost/thread.hpp>
11#include <boost/log/trivial.hpp>
12#include <boost/log/common.hpp>
13#include <boost/log/sinks.hpp>
14#include <boost/log/sources/logger.hpp>
15#include <boost/log/expressions.hpp>
16#include <boost/log/attributes.hpp>
17#include <boost/log/support/date_time.hpp>
18#include <boost/core/null_deleter.hpp>
19#include <boost/shared_ptr.hpp>
21#include "grpc/loadbalancer.grpc.pb.h"
23#include "e2sarError.hpp"
25using namespace boost::asio;
26using namespace std::string_literals;
27using namespace boost::log;
// Logging convenience macros. Their expansions reference `sink`, `lg` and
// `trivial::`, all of which must be visible where the macro is used.

// Flush the logging sink.
#define BOOST_LOG_FLUSH() sink->flush()

// Multi-line logging: START opens a brace scope holding a string stream, LOG
// yields that stream for `<<` insertion, and STOP writes the accumulated text
// as a single info-level record, closes the scope and flushes the sink.
// NOTE(review): PREF is not token-pasted, so the stream is always named
// PREF_ostr whatever argument is passed - confirm whether PREF##_ostr was meant.
#define BOOST_MLL_START(PREF) { std::ostringstream PREF_ostr;
#define BOOST_MLL_LOG(PREF) PREF_ostr
#define BOOST_MLL_STOP(PREF) BOOST_LOG_SEV(lg, trivial::info) << PREF_ostr.str(); } BOOST_LOG_FLUSH();

// Single-record logging at a fixed severity.
#define BOOST_LOG_INFO() BOOST_LOG_SEV(lg, trivial::info)
#define BOOST_LOG_WARN() BOOST_LOG_SEV(lg, trivial::warning)
#define BOOST_LOG_ERR() BOOST_LOG_SEV(lg, trivial::error)
43 typedef sinks::asynchronous_sink<sinks::text_ostream_backend> text_sink;
44 extern boost::shared_ptr<text_sink> sink;
45 extern sources::severity_logger_mt<trivial::severity_level> lg;
47 const u_int16_t DATAPLANE_PORT = 19522;
/**
 * Token categories. `load_balancer` is an alias of `admin` and `reservation`
 * is an alias of `instance`; `END` is a sentinel one past the last real value.
 */
enum class TokenType : u_int16_t {
    all = 0,
    admin = 1,
    load_balancer = admin,
    instance = 2,
    reservation = instance,
    session = 3,
    END
};

/**
 * Convert a token type to the integer value of its enumerator, for use as an
 * array index.
 * @param tt token type
 * @return the enumerator's underlying value as size_t
 */
inline static constexpr size_t ttAsIdx(TokenType tt)
{
    return static_cast<size_t>(tt);
}

/**
 * Upper-case display name of a token type.
 * @param tt token type
 * @return "ALL", "LOAD_BALANCER", "RESERVATION" or "SESSION"; "UNKNOWN" for
 *         any other value (including END)
 */
inline static const std::string toString(TokenType tt)
{
    switch (tt)
    {
    case TokenType::all:
        return "ALL";
    case TokenType::admin:
        return "LOAD_BALANCER";
    case TokenType::instance:
        return "RESERVATION";
    case TokenType::session:
        return "SESSION";
    default:
        return "UNKNOWN";
    }
}

/** Numeric value of the END sentinel, i.e. the number of index slots. */
static const size_t tokenTypeCardinality = static_cast<size_t>(TokenType::END);
/**
 * Permission levels attached to a token. `END` is a sentinel one past the
 * last real value.
 */
enum class TokenPermission : u_int16_t {
    _read_only_,
    _register_,
    _reserve_,
    _update_,
    END
};

/**
 * Upper-case display name of a token permission.
 * @param tt token permission
 * @return "READ", "REGISTER", "RESERVE" or "UPDATE"; "UNKNOWN" for any other
 *         value (including END)
 */
inline static const std::string toString(TokenPermission tt)
{
    switch (tt)
    {
    case TokenPermission::_read_only_:
        return "READ";
    case TokenPermission::_register_:
        return "REGISTER";
    case TokenPermission::_reserve_:
        return "RESERVE";
    case TokenPermission::_update_:
        return "UPDATE";
    default:
        return "UNKNOWN";
    }
}

/** Numeric value of the END sentinel, i.e. the number of real permissions. */
static const size_t tokenPermissionCardinality = static_cast<size_t>(TokenPermission::END);
119 std::array<std::string, tokenTypeCardinality> tokensByType;
121 std::string sessionId;
124 ip::address dataAddrv4;
125 ip::address dataAddrv6;
127 ip::address syncAddr;
139 EjfatURI(
const std::string &uri, TokenType tt=TokenType::admin,
bool preferV6=
false);
158 inline void set_Token(
const std::string &t, TokenType tt)
160 tokensByType[ttAsIdx(tt)] = t;
166 tokensByType[ttAsIdx(TokenType::instance)] = t;
172 tokensByType[ttAsIdx(TokenType::session)] = t;
178 auto idx = ttAsIdx(TokenType::instance);
179 if (!tokensByType[idx].empty())
180 return tokensByType[idx];
182 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Instance token not available"s};
188 auto idx = ttAsIdx(TokenType::session);
189 if (!tokensByType[idx].empty())
190 return tokensByType[idx];
192 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Session token not available"s};
198 auto idx = ttAsIdx(TokenType::admin);
199 if (!tokensByType[idx].empty())
200 return tokensByType[idx];
202 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Admin token not available"s};
238 if (a.first.is_v4()) {
239 dataAddrv4 = a.first;
243 dataAddrv6 = a.first;
267 inline const result<std::pair<ip::address, u_int16_t>>
get_cpAddr()
const
269 return std::pair<ip::address, u_int16_t>(cpAddr, cpPort);
273 inline const result<std::pair<std::string, u_int16_t>>
get_cpHost()
const
276 return std::pair<std::string, u_int16_t>(cpHost, cpPort);
278 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Control plane hostname not available"s};
296 return haveDatav4 || haveDatav6;
306 inline const result<std::pair<ip::address, u_int16_t>>
get_dataAddrv4() const noexcept
309 return std::pair<ip::address, u_int16_t>(dataAddrv4, dataPort);
310 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Data plane address not available"s};
314 inline const result<std::pair<ip::address, u_int16_t>>
get_dataAddrv6() const noexcept
317 return std::pair<ip::address, u_int16_t>(dataAddrv6, dataPort);
318 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Data plane address not available"s};
322 inline const result<std::pair<ip::address, u_int16_t>>
get_syncAddr() const noexcept
325 return std::pair<ip::address, u_int16_t>(syncAddr, syncPort);
326 return E2SARErrorInfo{E2SARErrorc::ParameterNotAvailable,
"Sync address not available"s};
332 operator std::string()
const;
334 const std::string to_string(TokenType tt = TokenType::admin)
const;
341 static inline result<EjfatURI>
getFromEnv(
const std::string &envVar =
"EJFAT_URI"s,
342 TokenType tt=TokenType::admin,
bool preferV6=
false) noexcept
344 const char *envStr = std::getenv(envVar.c_str());
345 if (envStr !=
nullptr)
349 return EjfatURI(envStr, tt, preferV6);
353 return E2SARErrorInfo{E2SARErrorc::CaughtException,
"Unable to parse EJFAT_URI from environment variable: "s +
static_cast<std::string
>(e)};
356 return E2SARErrorInfo{E2SARErrorc::Undefined,
"Environment variable "s + envVar +
" not defined."s};
365 TokenType tt=TokenType::admin,
bool preferV6=
false) noexcept
369 return EjfatURI(uriStr, tt, preferV6);
373 return E2SARErrorInfo{E2SARErrorc::CaughtException,
"Unable to parse URI from string: "s +
static_cast<std::string
>(e)};
382 static inline result<EjfatURI>
getFromFile(
const std::string &fileName =
"/tmp/ejfat_uri"s,
383 TokenType tt=TokenType::admin,
bool preferV6=
false) noexcept
385 if (!fileName.empty())
387 std::ifstream file(fileName);
391 if (std::getline(file, uriLine))
396 return EjfatURI(uriLine, tt, preferV6);
400 return E2SARErrorInfo{E2SARErrorc::CaughtException,
"Unable to parse URI: "s +
static_cast<std::string
>(e)};
404 return E2SARErrorInfo{E2SARErrorc::Undefined,
"Unable to parse URI."s};
407 return E2SARErrorInfo{E2SARErrorc::NotFound,
"Unable to find file "s + fileName};
421 static inline const result<ip::address> string_to_ip(const std::
string &addr) noexcept
430 ip::make_address(addr.substr(1, addr.length() - 2));
436 return ip::make_address(addr.substr(1, addr.length() - 2));
439 return ip::make_address(addr);
441 catch (boost::system::system_error &e)
443 return E2SARErrorInfo{E2SARErrorc::ParameterError,
"Unable to convert IP address from "s + addr};
450 static inline const result<u_int16_t> string_to_port(
const std::string &port_string)
noexcept
454 u_int16_t port = std::stoi(port_string);
455 if (port < 1024 || port > 65535)
458 return E2SARErrorInfo{E2SARErrorc::OutOfRange,
"Port value "s + port_string +
" is out of range"s};
462 catch (
const std::exception &e)
464 return E2SARErrorInfo{E2SARErrorc::ParameterError,
"Unable to convert "s + port_string +
" to integer"s};
471 static inline const result<std::pair<ip::address, u_int16_t>> string_tuple_to_ip_and_port(
const std::string &t)
noexcept
474 auto const pos = t.find_last_of(
"]:");
477 if ((pos == std::string::npos) || (t[pos] ==
']'))
479 auto r1 = string_to_ip(t);
481 return std::pair<ip::address, u_int16_t>(r1.value(), 0);
483 return E2SARErrorInfo{E2SARErrorc::ParameterError,
"Unable to convert "s + t +
" to ip address and port"s};
487 auto r1 = string_to_ip(t.substr(0, pos));
488 auto r2 = string_to_port(t.substr(pos + 1));
490 return std::pair<ip::address, int>(r1.value(), r2.value());
491 return E2SARErrorInfo{E2SARErrorc::ParameterError,
"Unable to convert "s + t +
" to ip address and port"s};
500 static inline result<std::vector<ip::address>> resolveHost(
const std::string &host_name)
noexcept
503 std::vector<ip::address> addresses;
504 boost::asio::io_context io_context;
508 ip::udp::resolver resolver(io_context);
509 ip::udp::resolver::results_type results = resolver.resolve(host_name,
"443");
511 for(
auto i = results.begin(); i != results.end(); ++i)
513 ip::udp::endpoint endpoint = *i;
514 addresses.push_back(endpoint.address());
521 return E2SARErrorInfo{E2SARErrorc::NotFound,
"Unable to convert "s + host_name +
" to ip address"s};
527 std::size_t operator()(
const std::pair<u_int64_t, u_int16_t>& p)
const {
528 u_int64_t hash1 = p.first;
529 u_int64_t tmp = p.second;
530 u_int64_t hash2 = tmp | tmp << 16 | tmp << 32 | tmp << 48;
531 return hash1 ^ hash2;
536 bool operator()(
const std::pair<u_int64_t, u_int16_t>& lhs,
const std::pair<u_int64_t, u_int16_t>& rhs)
const {
537 return lhs.first == rhs.first && lhs.second == rhs.second;
549 static inline float clockEntropyTest(
int totalTests = 1000,
int sleepMs = 1)
551 std::vector<int_least64_t> points;
552 std::vector<int> bins(256, 0);
554 for (
int i = 0; i < totalTests; i++)
556 auto now = boost::chrono::system_clock::now();
557 auto nowUsec = boost::chrono::duration_cast<boost::chrono::microseconds>(now.time_since_epoch()).count();
558 bins[nowUsec & 0xff]++;
559 auto until = now + boost::chrono::milliseconds(sleepMs);
560 boost::this_thread::sleep_until(until);
565 for (
size_t i = 0; i < bins.size(); i++)
567 float prob =
static_cast<float>(bins[i])/(totalTests*1.0);
568 entropy += prob * std::log(prob);
572 entropy *= -1.0/std::log(2);
576 template<
typename Container>
577 std::string concatWithSeparator(
const Container& c,
const std::string& sep=
","s)
579 typename Container::const_iterator it = c.begin();
591 inline void busyWaitUsecs(
const boost::chrono::steady_clock::time_point &tp, int64_t usecs)
596 if (boost::chrono::duration_cast<boost::chrono::microseconds>(boost::chrono::high_resolution_clock::now()
597 - tp).count() > usecs)
602 using OptimizationsWord = u_int16_t;
619 inline static OptimizationsWord toWord(Code o)
621 return 1 <<
static_cast<int>(o);
623 inline static std::string toString(Code o)
627 case Code::none:
return "none"s;
628 case Code::sendmmsg:
return "sendmmsg";
629 case Code::liburing_recv:
return "liburing_recv";
630 case Code::liburing_send:
return "liburing_send";
635 inline static Code fromString(
const std::string& opt)
639 else if (opt ==
"sendmmsg"s)
640 return Code::sendmmsg;
641 else if (opt ==
"liburing_recv"s)
642 return Code::liburing_recv;
643 else if (opt ==
"liburing_send"s)
644 return Code::liburing_send;
645 return Code::unknown;
650 const static std::vector<std::string> availableAsStrings()
noexcept;
655 const static OptimizationsWord availableAsWord()
noexcept;
661 static result<int> select(std::vector<std::string>& opt)
noexcept;
667 static result <int> select(std::vector<Code> &opt)
noexcept;
672 const static std::vector<std::string> selectedAsStrings()
noexcept;
677 const static OptimizationsWord selectedAsWord()
noexcept;
682 const static std::vector<Code> selectedAsList()
noexcept;
687 const static bool isSelected(Code o)
noexcept;
690 static const std::vector<Optimizations::Code> available;
693 OptimizationsWord selected_optimizations;
Definition e2sarError.hpp:62
Definition e2sarUtil.hpp:56
EjfatURI(const std::string &uri, TokenType tt=TokenType::admin, bool preferV6=false)
Definition e2sarUtil.cpp:189
const result< std::pair< ip::address, u_int16_t > > get_syncAddr() const noexcept
Definition e2sarUtil.hpp:322
const result< std::string > get_SessionToken() const
Definition e2sarUtil.hpp:186
void set_lbName(const std::string &n)
Definition e2sarUtil.hpp:206
~EjfatURI()
Definition e2sarUtil.hpp:142
bool get_useTls() const
Definition e2sarUtil.hpp:152
const bool has_dataAddrv4() const
Definition e2sarUtil.hpp:282
void set_InstanceToken(const std::string &t)
Definition e2sarUtil.hpp:164
const result< std::pair< ip::address, u_int16_t > > get_dataAddrv6() const noexcept
Definition e2sarUtil.hpp:314
const std::string get_lbId() const
Definition e2sarUtil.hpp:255
const result< std::pair< std::string, u_int16_t > > get_cpHost() const
Definition e2sarUtil.hpp:273
static result< EjfatURI > getFromEnv(const std::string &envVar="EJFAT_URI"s, TokenType tt=TokenType::admin, bool preferV6=false) noexcept
Definition e2sarUtil.hpp:341
const bool has_syncAddr() const
Definition e2sarUtil.hpp:300
void set_sessionId(const std::string &i)
Definition e2sarUtil.hpp:218
const result< std::string > get_AdminToken() const
Definition e2sarUtil.hpp:196
void set_Token(const std::string &t, TokenType tt)
Definition e2sarUtil.hpp:158
const result< std::pair< ip::address, u_int16_t > > get_cpAddr() const
Definition e2sarUtil.hpp:267
const std::string get_sessionId() const
Definition e2sarUtil.hpp:261
void set_dataAddr(const std::pair< ip::address, u_int16_t > &a)
Definition e2sarUtil.hpp:236
const result< std::string > get_InstanceToken() const
Definition e2sarUtil.hpp:176
static result< EjfatURI > getFromFile(const std::string &fileName="/tmp/ejfat_uri"s, TokenType tt=TokenType::admin, bool preferV6=false) noexcept
Definition e2sarUtil.hpp:382
void set_SessionToken(const std::string &t)
Definition e2sarUtil.hpp:170
const bool has_dataAddrv6() const
Definition e2sarUtil.hpp:288
static result< EjfatURI > getFromString(const std::string &uriStr, TokenType tt=TokenType::admin, bool preferV6=false) noexcept
Definition e2sarUtil.hpp:364
result< std::vector< ip::address > > getDataplaneLocalAddresses(bool v6=false) noexcept
Definition e2sarUtil.cpp:386
const bool has_dataAddr() const
Definition e2sarUtil.hpp:294
const result< std::pair< ip::address, u_int16_t > > get_dataAddrv4() const noexcept
Definition e2sarUtil.hpp:306
void set_syncAddr(const std::pair< ip::address, u_int16_t > &a)
Definition e2sarUtil.hpp:226
const std::string get_lbName() const
Definition e2sarUtil.hpp:249
void set_lbId(const std::string &i)
Definition e2sarUtil.hpp:212
Definition e2sarUtil.hpp:607
std::string expandTilde(const std::string &path)
Definition e2sarUtil.cpp:418
void defineClogLogger()
Definition e2sarUtil.cpp:440
Definition e2sarError.hpp:42
Definition e2sarUtil.hpp:535
Definition e2sarUtil.hpp:526