forked from vitalif/vitastor
Rework HTTP client to use keepalive, move getifaddr_list to addr_util
parent
c3304bce27
commit
5473d5b4a2
|
@ -84,7 +84,8 @@ const etcd_tree = {
|
||||||
osd_ping_timeout: 5, // seconds. min: 1
|
osd_ping_timeout: 5, // seconds. min: 1
|
||||||
up_wait_retry_interval: 500, // ms. min: 50
|
up_wait_retry_interval: 500, // ms. min: 50
|
||||||
// osd
|
// osd
|
||||||
etcd_report_interval: 5,
|
etcd_report_interval: 5, // seconds
|
||||||
|
etcd_keepalive_interval: 10, // seconds, default is etcd_report_interval*2
|
||||||
run_primary: true,
|
run_primary: true,
|
||||||
osd_network: null, // "192.168.7.0/24" or an array of masks
|
osd_network: null, // "192.168.7.0/24" or an array of masks
|
||||||
bind_address: "0.0.0.0",
|
bind_address: "0.0.0.0",
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
|
#include <net/if.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <ifaddrs.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
|
||||||
|
@ -58,3 +61,128 @@ std::string addr_to_string(const sockaddr &addr)
|
||||||
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
||||||
return std::string(peer_str)+":"+std::to_string(port);
|
return std::string(peer_str)+":"+std::to_string(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool cidr_match(const in_addr &addr, const in_addr &net, uint8_t bits)
|
||||||
|
{
|
||||||
|
if (bits == 0)
|
||||||
|
{
|
||||||
|
// C99 6.5.7 (3): u32 << 32 is undefined behaviour
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return !((addr.s_addr ^ net.s_addr) & htonl(0xFFFFFFFFu << (32 - bits)));
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
|
||||||
|
{
|
||||||
|
const uint32_t *a = address.s6_addr32;
|
||||||
|
const uint32_t *n = network.s6_addr32;
|
||||||
|
int bits_whole, bits_incomplete;
|
||||||
|
bits_whole = bits >> 5; // number of whole u32
|
||||||
|
bits_incomplete = bits & 0x1F; // number of bits in incomplete u32
|
||||||
|
if (bits_whole && memcmp(a, n, bits_whole << 2))
|
||||||
|
return false;
|
||||||
|
if (bits_incomplete)
|
||||||
|
{
|
||||||
|
uint32_t mask = htonl((0xFFFFFFFFu) << (32 - bits_incomplete));
|
||||||
|
if ((a[bits_whole] ^ n[bits_whole]) & mask)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct addr_mask_t
|
||||||
|
{
|
||||||
|
sa_family_t family;
|
||||||
|
in_addr ipv4;
|
||||||
|
in6_addr ipv6;
|
||||||
|
uint8_t bits;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool include_v6)
|
||||||
|
{
|
||||||
|
std::vector<addr_mask_t> masks;
|
||||||
|
for (auto mask: mask_cfg)
|
||||||
|
{
|
||||||
|
unsigned bits = 0;
|
||||||
|
int p = mask.find('/');
|
||||||
|
if (p != std::string::npos)
|
||||||
|
{
|
||||||
|
char null_byte = 0;
|
||||||
|
if (sscanf(mask.c_str()+p+1, "%u%c", &bits, &null_byte) != 1 || bits > 128)
|
||||||
|
{
|
||||||
|
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
||||||
|
}
|
||||||
|
mask = mask.substr(0, p);
|
||||||
|
}
|
||||||
|
in_addr ipv4;
|
||||||
|
in6_addr ipv6;
|
||||||
|
if (inet_pton(AF_INET, mask.c_str(), &ipv4) == 1)
|
||||||
|
{
|
||||||
|
if (bits > 32)
|
||||||
|
{
|
||||||
|
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
||||||
|
}
|
||||||
|
masks.push_back((addr_mask_t){ .family = AF_INET, .ipv4 = ipv4, .bits = (uint8_t)bits });
|
||||||
|
}
|
||||||
|
else if (include_v6 && inet_pton(AF_INET6, mask.c_str(), &ipv6) == 1)
|
||||||
|
{
|
||||||
|
masks.push_back((addr_mask_t){ .family = AF_INET6, .ipv6 = ipv6, .bits = (uint8_t)bits });
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::vector<std::string> addresses;
|
||||||
|
ifaddrs *list, *ifa;
|
||||||
|
if (getifaddrs(&list) == -1)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno));
|
||||||
|
}
|
||||||
|
for (ifa = list; ifa != NULL; ifa = ifa->ifa_next)
|
||||||
|
{
|
||||||
|
if (!ifa->ifa_addr)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int family = ifa->ifa_addr->sa_family;
|
||||||
|
if ((family == AF_INET || family == AF_INET6 && include_v6) &&
|
||||||
|
(ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
|
||||||
|
{
|
||||||
|
void *addr_ptr;
|
||||||
|
if (family == AF_INET)
|
||||||
|
{
|
||||||
|
addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
|
||||||
|
}
|
||||||
|
if (masks.size() > 0)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < masks.size(); i++)
|
||||||
|
{
|
||||||
|
if (masks[i].family == family && (family == AF_INET
|
||||||
|
? cidr_match(*(in_addr*)addr_ptr, masks[i].ipv4, masks[i].bits)
|
||||||
|
: cidr6_match(*(in6_addr*)addr_ptr, masks[i].ipv6, masks[i].bits)))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (i >= masks.size())
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
char addr[INET6_ADDRSTRLEN];
|
||||||
|
if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN))
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
||||||
|
}
|
||||||
|
addresses.push_back(std::string(addr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
freeifaddrs(list);
|
||||||
|
return addresses;
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
|
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
|
||||||
std::string addr_to_string(const sockaddr &addr);
|
std::string addr_to_string(const sockaddr &addr);
|
||||||
|
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#include "pg_states.h"
|
#include "pg_states.h"
|
||||||
#include "etcd_state_client.h"
|
#include "etcd_state_client.h"
|
||||||
#ifndef __MOCK__
|
#ifndef __MOCK__
|
||||||
|
#include "addr_util.h"
|
||||||
#include "http_client.h"
|
#include "http_client.h"
|
||||||
#include "base64.h"
|
#include "base64.h"
|
||||||
#endif
|
#endif
|
||||||
|
@ -25,9 +26,14 @@ etcd_state_client_t::~etcd_state_client_t()
|
||||||
#ifndef __MOCK__
|
#ifndef __MOCK__
|
||||||
if (etcd_watch_ws)
|
if (etcd_watch_ws)
|
||||||
{
|
{
|
||||||
etcd_watch_ws->close();
|
http_close(etcd_watch_ws);
|
||||||
etcd_watch_ws = NULL;
|
etcd_watch_ws = NULL;
|
||||||
}
|
}
|
||||||
|
if (keepalive_client)
|
||||||
|
{
|
||||||
|
http_close(keepalive_client);
|
||||||
|
keepalive_client = NULL;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,14 +80,26 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
|
||||||
"Host: "+etcd_address+"\r\n"
|
"Host: "+etcd_address+"\r\n"
|
||||||
"Content-Type: application/json\r\n"
|
"Content-Type: application/json\r\n"
|
||||||
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
||||||
"Connection: close\r\n"
|
"Connection: keep-alive\r\n"
|
||||||
|
"Keep-Alive: timeout="+std::to_string(etcd_keepalive_interval)+"\r\n"
|
||||||
"\r\n"+req;
|
"\r\n"+req;
|
||||||
http_request_json(tfd, etcd_address, req, timeout, [this, cur_addr = selected_etcd_address, callback](std::string err, json11::Json data)
|
auto cb = [this, cur_addr = selected_etcd_address, callback](const http_response_t *response)
|
||||||
{
|
{
|
||||||
if (err != "" && cur_addr == selected_etcd_address)
|
std::string err;
|
||||||
selected_etcd_address = "";
|
json11::Json data;
|
||||||
|
response->parse_json_response(err, data);
|
||||||
|
if (err != "")
|
||||||
|
{
|
||||||
|
if (cur_addr == selected_etcd_address)
|
||||||
|
selected_etcd_address = "";
|
||||||
|
}
|
||||||
callback(err, data);
|
callback(err, data);
|
||||||
});
|
};
|
||||||
|
if (!keepalive_client)
|
||||||
|
{
|
||||||
|
keepalive_client = http_init(tfd);
|
||||||
|
}
|
||||||
|
http_request(keepalive_client, etcd_address, req, { .timeout = timeout, .keepalive = true }, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd_state_client_t::add_etcd_url(std::string addr)
|
void etcd_state_client_t::add_etcd_url(std::string addr)
|
||||||
|
@ -155,6 +173,13 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
|
||||||
this->etcd_prefix = "/"+this->etcd_prefix;
|
this->etcd_prefix = "/"+this->etcd_prefix;
|
||||||
}
|
}
|
||||||
this->log_level = config["log_level"].int64_value();
|
this->log_level = config["log_level"].int64_value();
|
||||||
|
this->etcd_keepalive_interval = config["etcd_keepalive_interval"].uint64_value();
|
||||||
|
if (this->etcd_keepalive_interval <= 0)
|
||||||
|
{
|
||||||
|
this->etcd_keepalive_interval = config["etcd_report_interval"].uint64_value() * 2;
|
||||||
|
if (this->etcd_keepalive_interval <= 0)
|
||||||
|
this->etcd_keepalive_interval = 10;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd_state_client_t::pick_next_etcd()
|
void etcd_state_client_t::pick_next_etcd()
|
||||||
|
@ -200,7 +225,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
ws_alive = 1;
|
ws_alive = 1;
|
||||||
if (etcd_watch_ws)
|
if (etcd_watch_ws)
|
||||||
{
|
{
|
||||||
etcd_watch_ws->close();
|
http_close(etcd_watch_ws);
|
||||||
etcd_watch_ws = NULL;
|
etcd_watch_ws = NULL;
|
||||||
}
|
}
|
||||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT,
|
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT,
|
||||||
|
@ -232,7 +257,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Revisions before %lu were compacted by etcd, reloading state\n",
|
fprintf(stderr, "Revisions before %lu were compacted by etcd, reloading state\n",
|
||||||
data["result"]["compact_revision"].uint64_value());
|
data["result"]["compact_revision"].uint64_value());
|
||||||
etcd_watch_ws->close();
|
http_close(etcd_watch_ws);
|
||||||
etcd_watch_ws = NULL;
|
etcd_watch_ws = NULL;
|
||||||
etcd_watch_revision = 0;
|
etcd_watch_revision = 0;
|
||||||
on_reload_hook();
|
on_reload_hook();
|
||||||
|
@ -286,6 +311,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{
|
{
|
||||||
selected_etcd_address = "";
|
selected_etcd_address = "";
|
||||||
}
|
}
|
||||||
|
http_close(etcd_watch_ws);
|
||||||
etcd_watch_ws = NULL;
|
etcd_watch_ws = NULL;
|
||||||
if (etcd_watches_initialised == 0)
|
if (etcd_watches_initialised == 0)
|
||||||
{
|
{
|
||||||
|
@ -302,7 +328,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "create_request", json11::Json::object {
|
{ "create_request", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/config/") },
|
{ "key", base64_encode(etcd_prefix+"/config/") },
|
||||||
{ "range_end", base64_encode(etcd_prefix+"/config0") },
|
{ "range_end", base64_encode(etcd_prefix+"/config0") },
|
||||||
|
@ -311,7 +337,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{ "progress_notify", true },
|
{ "progress_notify", true },
|
||||||
} }
|
} }
|
||||||
}).dump());
|
}).dump());
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "create_request", json11::Json::object {
|
{ "create_request", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
|
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
|
||||||
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
|
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
|
||||||
|
@ -320,7 +346,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{ "progress_notify", true },
|
{ "progress_notify", true },
|
||||||
} }
|
} }
|
||||||
}).dump());
|
}).dump());
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "create_request", json11::Json::object {
|
{ "create_request", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
||||||
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
||||||
|
@ -329,7 +355,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
{ "progress_notify", true },
|
{ "progress_notify", true },
|
||||||
} }
|
} }
|
||||||
}).dump());
|
}).dump());
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "create_request", json11::Json::object {
|
{ "create_request", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
||||||
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
||||||
|
@ -348,14 +374,14 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||||
}
|
}
|
||||||
else if (!ws_alive)
|
else if (!ws_alive)
|
||||||
{
|
{
|
||||||
etcd_watch_ws->close();
|
http_close(etcd_watch_ws);
|
||||||
etcd_watch_ws = NULL;
|
etcd_watch_ws = NULL;
|
||||||
start_etcd_watcher();
|
start_etcd_watcher();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ws_alive = 0;
|
ws_alive = 0;
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "progress_request", json11::Json::object { } }
|
{ "progress_request", json11::Json::object { } }
|
||||||
}).dump());
|
}).dump());
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ struct inode_watch_t
|
||||||
inode_config_t cfg;
|
inode_config_t cfg;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct websocket_t;
|
struct http_co_t;
|
||||||
|
|
||||||
struct etcd_state_client_t
|
struct etcd_state_client_t
|
||||||
{
|
{
|
||||||
|
@ -82,10 +82,11 @@ protected:
|
||||||
std::string selected_etcd_address;
|
std::string selected_etcd_address;
|
||||||
std::vector<std::string> addresses_to_try;
|
std::vector<std::string> addresses_to_try;
|
||||||
std::vector<inode_watch_t*> watches;
|
std::vector<inode_watch_t*> watches;
|
||||||
websocket_t *etcd_watch_ws = NULL;
|
http_co_t *etcd_watch_ws = NULL, *keepalive_client = NULL;
|
||||||
int ws_keepalive_timer = -1;
|
int ws_keepalive_timer = -1;
|
||||||
int ws_alive = 0;
|
int ws_alive = 0;
|
||||||
uint64_t bs_block_size = DEFAULT_BLOCK_SIZE;
|
uint64_t bs_block_size = DEFAULT_BLOCK_SIZE;
|
||||||
|
int etcd_keepalive_interval = 10;
|
||||||
void add_etcd_url(std::string);
|
void add_etcd_url(std::string);
|
||||||
void pick_next_etcd();
|
void pick_next_etcd();
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -4,9 +4,7 @@
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
|
||||||
#include <net/if.h>
|
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <ifaddrs.h>
|
|
||||||
|
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
@ -25,11 +23,12 @@
|
||||||
static std::string trim(const std::string & in);
|
static std::string trim(const std::string & in);
|
||||||
static std::string ws_format_frame(int type, uint64_t size);
|
static std::string ws_format_frame(int type, uint64_t size);
|
||||||
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
|
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
|
||||||
|
static void parse_http_headers(std::string & res, http_response_t *parsed);
|
||||||
|
|
||||||
// FIXME: Use keepalive
|
|
||||||
struct http_co_t
|
struct http_co_t
|
||||||
{
|
{
|
||||||
timerfd_manager_t *tfd;
|
timerfd_manager_t *tfd;
|
||||||
|
std::function<void(const http_response_t*)> response_callback;
|
||||||
|
|
||||||
int request_timeout = 0;
|
int request_timeout = 0;
|
||||||
std::string host;
|
std::string host;
|
||||||
|
@ -37,11 +36,12 @@ struct http_co_t
|
||||||
std::string ws_outbox;
|
std::string ws_outbox;
|
||||||
std::string response;
|
std::string response;
|
||||||
bool want_streaming;
|
bool want_streaming;
|
||||||
|
bool keepalive;
|
||||||
|
|
||||||
http_response_t parsed;
|
std::vector<std::function<void()>> keepalive_queue;
|
||||||
uint64_t target_response_size = 0;
|
|
||||||
|
|
||||||
int state = 0;
|
int state = 0;
|
||||||
|
std::string connected_host;
|
||||||
int peer_fd = -1;
|
int peer_fd = -1;
|
||||||
int timeout_id = -1;
|
int timeout_id = -1;
|
||||||
int epoll_events = 0;
|
int epoll_events = 0;
|
||||||
|
@ -49,10 +49,8 @@ struct http_co_t
|
||||||
std::vector<char> rbuf;
|
std::vector<char> rbuf;
|
||||||
iovec read_iov, send_iov;
|
iovec read_iov, send_iov;
|
||||||
msghdr read_msg = { 0 }, send_msg = { 0 };
|
msghdr read_msg = { 0 }, send_msg = { 0 };
|
||||||
|
http_response_t parsed;
|
||||||
std::function<void(const http_response_t*)> callback;
|
uint64_t target_response_size = 0;
|
||||||
|
|
||||||
websocket_t ws;
|
|
||||||
|
|
||||||
int onstack = 0;
|
int onstack = 0;
|
||||||
bool ended = false;
|
bool ended = false;
|
||||||
|
@ -62,65 +60,39 @@ struct http_co_t
|
||||||
inline void stackout() { onstack--; if (!onstack && ended) end(); }
|
inline void stackout() { onstack--; if (!onstack && ended) end(); }
|
||||||
inline void end() { ended = true; if (!onstack) { delete this; } }
|
inline void end() { ended = true; if (!onstack) { delete this; } }
|
||||||
void start_connection();
|
void start_connection();
|
||||||
|
void close_connection();
|
||||||
void handle_events();
|
void handle_events();
|
||||||
void handle_connect_result();
|
void handle_connect_result();
|
||||||
void submit_read();
|
void submit_read();
|
||||||
void submit_send();
|
void submit_send();
|
||||||
bool handle_read();
|
bool handle_read();
|
||||||
|
void fill_parsed_response();
|
||||||
void post_message(int type, const std::string & msg);
|
void post_message(int type, const std::string & msg);
|
||||||
|
void send_request(const std::string & host, const std::string & request,
|
||||||
|
const http_options_t & options, std::function<void(const http_response_t *response)> response_callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define HTTP_CO_CLOSED 0
|
||||||
#define HTTP_CO_CONNECTING 1
|
#define HTTP_CO_CONNECTING 1
|
||||||
#define HTTP_CO_SENDING_REQUEST 2
|
#define HTTP_CO_SENDING_REQUEST 2
|
||||||
#define HTTP_CO_REQUEST_SENT 3
|
#define HTTP_CO_REQUEST_SENT 3
|
||||||
#define HTTP_CO_HEADERS_RECEIVED 4
|
#define HTTP_CO_HEADERS_RECEIVED 4
|
||||||
#define HTTP_CO_WEBSOCKET 5
|
#define HTTP_CO_WEBSOCKET 5
|
||||||
#define HTTP_CO_CHUNKED 6
|
#define HTTP_CO_CHUNKED 6
|
||||||
|
#define HTTP_CO_KEEPALIVE 7
|
||||||
|
|
||||||
#define DEFAULT_TIMEOUT 5000
|
#define DEFAULT_TIMEOUT 5000
|
||||||
|
|
||||||
void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
|
http_co_t *http_init(timerfd_manager_t *tfd)
|
||||||
const http_options_t & options, std::function<void(const http_response_t *response)> callback)
|
|
||||||
{
|
{
|
||||||
http_co_t *handler = new http_co_t();
|
http_co_t *handler = new http_co_t();
|
||||||
handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout);
|
|
||||||
handler->want_streaming = options.want_streaming;
|
|
||||||
handler->tfd = tfd;
|
handler->tfd = tfd;
|
||||||
handler->host = host;
|
handler->state = HTTP_CO_CLOSED;
|
||||||
handler->request = request;
|
return handler;
|
||||||
handler->callback = callback;
|
|
||||||
handler->ws.co = handler;
|
|
||||||
handler->start_connection();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_request_json(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
|
http_co_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path,
|
||||||
int timeout, std::function<void(std::string, json11::Json r)> callback)
|
int timeout, std::function<void(const http_response_t *msg)> response_callback)
|
||||||
{
|
|
||||||
http_request(tfd, host, request, { .timeout = timeout }, [callback](const http_response_t* res)
|
|
||||||
{
|
|
||||||
if (res->error_code != 0)
|
|
||||||
{
|
|
||||||
callback("Error code: "+std::to_string(res->error_code)+" ("+std::string(strerror(res->error_code))+")", json11::Json());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (res->status_code != 200)
|
|
||||||
{
|
|
||||||
callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+trim(res->body), json11::Json());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
std::string json_err;
|
|
||||||
json11::Json data = json11::Json::parse(res->body, json_err);
|
|
||||||
if (json_err != "")
|
|
||||||
{
|
|
||||||
callback("Bad JSON: "+json_err+" (response: "+trim(res->body)+")", json11::Json());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callback(std::string(), data);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path,
|
|
||||||
int timeout, std::function<void(const http_response_t *msg)> callback)
|
|
||||||
{
|
{
|
||||||
std::string request = "GET "+path+" HTTP/1.1\r\n"
|
std::string request = "GET "+path+" HTTP/1.1\r\n"
|
||||||
"Host: "+host+"\r\n"
|
"Host: "+host+"\r\n"
|
||||||
|
@ -130,40 +102,145 @@ websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, co
|
||||||
"Sec-WebSocket-Version: 13\r\n"
|
"Sec-WebSocket-Version: 13\r\n"
|
||||||
"\r\n";
|
"\r\n";
|
||||||
http_co_t *handler = new http_co_t();
|
http_co_t *handler = new http_co_t();
|
||||||
|
handler->tfd = tfd;
|
||||||
|
handler->state = HTTP_CO_CLOSED;
|
||||||
|
handler->host = host;
|
||||||
handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout);
|
handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout);
|
||||||
handler->want_streaming = false;
|
handler->want_streaming = false;
|
||||||
handler->tfd = tfd;
|
handler->keepalive = false;
|
||||||
handler->host = host;
|
|
||||||
handler->request = request;
|
handler->request = request;
|
||||||
handler->callback = callback;
|
handler->response_callback = response_callback;
|
||||||
handler->ws.co = handler;
|
|
||||||
handler->start_connection();
|
handler->start_connection();
|
||||||
return &handler->ws;
|
return handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_t::post_message(int type, const std::string & msg)
|
void http_request(http_co_t *handler, const std::string & host, const std::string & request,
|
||||||
|
const http_options_t & options, std::function<void(const http_response_t *response)> response_callback)
|
||||||
{
|
{
|
||||||
co->post_message(type, msg);
|
handler->send_request(host, request, options, response_callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
void websocket_t::close()
|
void http_co_t::send_request(const std::string & host, const std::string & request,
|
||||||
|
const http_options_t & options, std::function<void(const http_response_t *response)> response_callback)
|
||||||
{
|
{
|
||||||
co->end();
|
stackin();
|
||||||
|
if (state == HTTP_CO_WEBSOCKET)
|
||||||
|
{
|
||||||
|
stackout();
|
||||||
|
throw std::runtime_error("Attempt to send HTTP request into a websocket or chunked stream");
|
||||||
|
}
|
||||||
|
else if (state != HTTP_CO_KEEPALIVE && state != HTTP_CO_CLOSED)
|
||||||
|
{
|
||||||
|
keepalive_queue.push_back([this, host, request, options, response_callback]()
|
||||||
|
{
|
||||||
|
this->send_request(host, request, options, response_callback);
|
||||||
|
});
|
||||||
|
stackout();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout);
|
||||||
|
this->want_streaming = options.want_streaming;
|
||||||
|
this->keepalive = options.keepalive;
|
||||||
|
this->host = host;
|
||||||
|
this->request = request;
|
||||||
|
this->response = "";
|
||||||
|
this->sent = 0;
|
||||||
|
this->response_callback = response_callback;
|
||||||
|
if (state == HTTP_CO_KEEPALIVE && connected_host != host)
|
||||||
|
{
|
||||||
|
close_connection();
|
||||||
|
}
|
||||||
|
if (request_timeout > 0)
|
||||||
|
{
|
||||||
|
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
|
||||||
|
{
|
||||||
|
stackin();
|
||||||
|
close_connection();
|
||||||
|
parsed = { .error = "HTTP request timed out" };
|
||||||
|
this->response_callback(&parsed);
|
||||||
|
this->response_callback = NULL;
|
||||||
|
stackout();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (state == HTTP_CO_KEEPALIVE)
|
||||||
|
{
|
||||||
|
state = HTTP_CO_SENDING_REQUEST;
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
start_connection();
|
||||||
|
}
|
||||||
|
stackout();
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_post_message(http_co_t *handler, int type, const std::string & msg)
|
||||||
|
{
|
||||||
|
handler->post_message(type, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::post_message(int type, const std::string & msg)
|
||||||
|
{
|
||||||
|
stackin();
|
||||||
|
if (state == HTTP_CO_WEBSOCKET)
|
||||||
|
{
|
||||||
|
request += ws_format_frame(type, msg.size());
|
||||||
|
request += msg;
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
else if (state == HTTP_CO_KEEPALIVE || state == HTTP_CO_CHUNKED)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("Attempt to send websocket message on a regular HTTP connection");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ws_outbox += ws_format_frame(type, msg.size());
|
||||||
|
ws_outbox += msg;
|
||||||
|
}
|
||||||
|
stackout();
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_close(http_co_t *handler)
|
||||||
|
{
|
||||||
|
handler->end();
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_response_t::parse_json_response(std::string & error, json11::Json & r) const
|
||||||
|
{
|
||||||
|
if (this->error != "")
|
||||||
|
{
|
||||||
|
error = this->error;
|
||||||
|
r = json11::Json();
|
||||||
|
}
|
||||||
|
else if (status_code != 200)
|
||||||
|
{
|
||||||
|
error = "HTTP "+std::to_string(status_code)+" "+status_line+" body: "+trim(body);
|
||||||
|
r = json11::Json();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::string json_err;
|
||||||
|
json11::Json data = json11::Json::parse(body, json_err);
|
||||||
|
if (json_err != "")
|
||||||
|
{
|
||||||
|
error = "Bad JSON: "+json_err+" (response: "+trim(body)+")";
|
||||||
|
r = json11::Json();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
error = "";
|
||||||
|
r = data;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
http_co_t::~http_co_t()
|
http_co_t::~http_co_t()
|
||||||
{
|
{
|
||||||
if (timeout_id >= 0)
|
close_connection();
|
||||||
{
|
}
|
||||||
tfd->clear_timer(timeout_id);
|
|
||||||
timeout_id = -1;
|
void http_co_t::fill_parsed_response()
|
||||||
}
|
{
|
||||||
if (peer_fd >= 0)
|
|
||||||
{
|
|
||||||
tfd->set_fd_handler(peer_fd, false, NULL);
|
|
||||||
close(peer_fd);
|
|
||||||
peer_fd = -1;
|
|
||||||
}
|
|
||||||
if (parsed.headers["transfer-encoding"] == "chunked")
|
if (parsed.headers["transfer-encoding"] == "chunked")
|
||||||
{
|
{
|
||||||
int prev = 0, pos = 0;
|
int prev = 0, pos = 0;
|
||||||
|
@ -178,8 +255,24 @@ http_co_t::~http_co_t()
|
||||||
{
|
{
|
||||||
std::swap(parsed.body, response);
|
std::swap(parsed.body, response);
|
||||||
}
|
}
|
||||||
parsed.eof = true;
|
}
|
||||||
callback(&parsed);
|
|
||||||
|
void http_co_t::close_connection()
|
||||||
|
{
|
||||||
|
if (timeout_id >= 0)
|
||||||
|
{
|
||||||
|
tfd->clear_timer(timeout_id);
|
||||||
|
timeout_id = -1;
|
||||||
|
}
|
||||||
|
if (peer_fd >= 0)
|
||||||
|
{
|
||||||
|
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||||
|
close(peer_fd);
|
||||||
|
peer_fd = -1;
|
||||||
|
}
|
||||||
|
state = HTTP_CO_CLOSED;
|
||||||
|
connected_host = "";
|
||||||
|
response = "";
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_co_t::start_connection()
|
void http_co_t::start_connection()
|
||||||
|
@ -188,39 +281,31 @@ void http_co_t::start_connection()
|
||||||
struct sockaddr addr;
|
struct sockaddr addr;
|
||||||
if (!string_to_addr(host.c_str(), 1, 80, &addr))
|
if (!string_to_addr(host.c_str(), 1, 80, &addr))
|
||||||
{
|
{
|
||||||
parsed.error_code = ENXIO;
|
parsed = { .error = "Invalid address: "+host };
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
stackout();
|
stackout();
|
||||||
end();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
|
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
|
||||||
if (peer_fd < 0)
|
if (peer_fd < 0)
|
||||||
{
|
{
|
||||||
parsed.error_code = errno;
|
parsed = { .error = std::string("socket: ")+strerror(errno) };
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
stackout();
|
stackout();
|
||||||
end();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
|
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
if (request_timeout > 0)
|
|
||||||
{
|
|
||||||
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
|
|
||||||
{
|
|
||||||
if (response.length() == 0)
|
|
||||||
{
|
|
||||||
parsed.error_code = ETIME;
|
|
||||||
}
|
|
||||||
end();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
epoll_events = 0;
|
epoll_events = 0;
|
||||||
// Finally call connect
|
// Finally call connect
|
||||||
int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
|
int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
|
||||||
if (r < 0 && errno != EINPROGRESS)
|
if (r < 0 && errno != EINPROGRESS)
|
||||||
{
|
{
|
||||||
parsed.error_code = errno;
|
parsed = { .error = std::string("connect: ")+strerror(errno) };
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
stackout();
|
stackout();
|
||||||
end();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||||
|
@ -228,6 +313,7 @@ void http_co_t::start_connection()
|
||||||
this->epoll_events |= epoll_events;
|
this->epoll_events |= epoll_events;
|
||||||
handle_events();
|
handle_events();
|
||||||
});
|
});
|
||||||
|
connected_host = host;
|
||||||
state = HTTP_CO_CONNECTING;
|
state = HTTP_CO_CONNECTING;
|
||||||
stackout();
|
stackout();
|
||||||
}
|
}
|
||||||
|
@ -250,7 +336,14 @@ void http_co_t::handle_events()
|
||||||
}
|
}
|
||||||
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
|
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
|
||||||
{
|
{
|
||||||
end();
|
close_connection();
|
||||||
|
if (response_callback != NULL)
|
||||||
|
{
|
||||||
|
parsed.eof = true;
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
|
parsed = {};
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -269,9 +362,11 @@ void http_co_t::handle_connect_result()
|
||||||
}
|
}
|
||||||
if (result != 0)
|
if (result != 0)
|
||||||
{
|
{
|
||||||
parsed.error_code = result;
|
close_connection();
|
||||||
|
parsed = { .error = std::string("connect: ")+strerror(result) };
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
stackout();
|
stackout();
|
||||||
end();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int one = 1;
|
int one = 1;
|
||||||
|
@ -286,6 +381,49 @@ void http_co_t::handle_connect_result()
|
||||||
stackout();
|
stackout();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void http_co_t::submit_send()
|
||||||
|
{
|
||||||
|
stackin();
|
||||||
|
int res;
|
||||||
|
again:
|
||||||
|
if (sent < request.size())
|
||||||
|
{
|
||||||
|
send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
|
||||||
|
send_msg.msg_iov = &send_iov;
|
||||||
|
send_msg.msg_iovlen = 1;
|
||||||
|
res = sendmsg(peer_fd, &send_msg, MSG_NOSIGNAL);
|
||||||
|
if (res < 0)
|
||||||
|
{
|
||||||
|
res = -errno;
|
||||||
|
}
|
||||||
|
if (res == -EAGAIN)
|
||||||
|
{
|
||||||
|
res = 0;
|
||||||
|
}
|
||||||
|
else if (res < 0)
|
||||||
|
{
|
||||||
|
stackout();
|
||||||
|
end();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sent += res;
|
||||||
|
if (state == HTTP_CO_SENDING_REQUEST)
|
||||||
|
{
|
||||||
|
if (sent >= request.size())
|
||||||
|
state = HTTP_CO_REQUEST_SENT;
|
||||||
|
else
|
||||||
|
goto again;
|
||||||
|
}
|
||||||
|
else if (state == HTTP_CO_WEBSOCKET)
|
||||||
|
{
|
||||||
|
request = request.substr(sent);
|
||||||
|
sent = 0;
|
||||||
|
goto again;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stackout();
|
||||||
|
}
|
||||||
|
|
||||||
void http_co_t::submit_read()
|
void http_co_t::submit_read()
|
||||||
{
|
{
|
||||||
stackin();
|
stackin();
|
||||||
|
@ -321,51 +459,6 @@ void http_co_t::submit_read()
|
||||||
stackout();
|
stackout();
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_co_t::submit_send()
|
|
||||||
{
|
|
||||||
stackin();
|
|
||||||
int res;
|
|
||||||
again:
|
|
||||||
if (sent < request.size())
|
|
||||||
{
|
|
||||||
send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
|
|
||||||
send_msg.msg_iov = &send_iov;
|
|
||||||
send_msg.msg_iovlen = 1;
|
|
||||||
res = sendmsg(peer_fd, &send_msg, MSG_NOSIGNAL);
|
|
||||||
if (res < 0)
|
|
||||||
{
|
|
||||||
res = -errno;
|
|
||||||
}
|
|
||||||
if (res == -EAGAIN)
|
|
||||||
{
|
|
||||||
res = 0;
|
|
||||||
}
|
|
||||||
else if (res < 0)
|
|
||||||
{
|
|
||||||
stackout();
|
|
||||||
end();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sent += res;
|
|
||||||
if (state == HTTP_CO_SENDING_REQUEST)
|
|
||||||
{
|
|
||||||
if (sent >= request.size())
|
|
||||||
{
|
|
||||||
state = HTTP_CO_REQUEST_SENT;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
goto again;
|
|
||||||
}
|
|
||||||
else if (state == HTTP_CO_WEBSOCKET)
|
|
||||||
{
|
|
||||||
request = request.substr(sent);
|
|
||||||
sent = 0;
|
|
||||||
goto again;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stackout();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool http_co_t::handle_read()
|
bool http_co_t::handle_read()
|
||||||
{
|
{
|
||||||
stackin();
|
stackin();
|
||||||
|
@ -376,6 +469,7 @@ bool http_co_t::handle_read()
|
||||||
{
|
{
|
||||||
if (timeout_id >= 0)
|
if (timeout_id >= 0)
|
||||||
{
|
{
|
||||||
|
// Timeout is cleared when headers are received
|
||||||
tfd->clear_timer(timeout_id);
|
tfd->clear_timer(timeout_id);
|
||||||
timeout_id = -1;
|
timeout_id = -1;
|
||||||
}
|
}
|
||||||
|
@ -403,20 +497,27 @@ bool http_co_t::handle_read()
|
||||||
if (!target_response_size)
|
if (!target_response_size)
|
||||||
{
|
{
|
||||||
// Sorry, unsupported response
|
// Sorry, unsupported response
|
||||||
|
close_connection();
|
||||||
|
parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" };
|
||||||
|
response_callback(&parsed);
|
||||||
|
response_callback = NULL;
|
||||||
stackout();
|
stackout();
|
||||||
end();
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
keepalive = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size)
|
if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size)
|
||||||
{
|
{
|
||||||
stackout();
|
fill_parsed_response();
|
||||||
end();
|
response_callback(&parsed);
|
||||||
return false;
|
parsed.eof = true;
|
||||||
}
|
}
|
||||||
if (state == HTTP_CO_CHUNKED && response.size() > 0)
|
else if (state == HTTP_CO_CHUNKED && response.size() > 0)
|
||||||
{
|
{
|
||||||
int prev = 0, pos = 0;
|
int prev = 0, pos = 0;
|
||||||
while ((pos = response.find("\r\n", prev)) >= prev)
|
while ((pos = response.find("\r\n", prev)) >= prev)
|
||||||
|
@ -439,55 +540,49 @@ bool http_co_t::handle_read()
|
||||||
{
|
{
|
||||||
response = response.substr(prev);
|
response = response.substr(prev);
|
||||||
}
|
}
|
||||||
if (parsed.eof)
|
if (want_streaming)
|
||||||
{
|
{
|
||||||
stackout();
|
// Streaming response
|
||||||
end();
|
response_callback(&parsed);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (want_streaming && parsed.body.size() > 0)
|
|
||||||
{
|
|
||||||
if (!ended)
|
|
||||||
{
|
|
||||||
// Don't deliver additional events after close()
|
|
||||||
callback(&parsed);
|
|
||||||
}
|
|
||||||
parsed.body = "";
|
parsed.body = "";
|
||||||
}
|
}
|
||||||
|
if (parsed.eof && !want_streaming)
|
||||||
|
{
|
||||||
|
// Normal response
|
||||||
|
response_callback(&parsed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
|
else if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
|
||||||
{
|
{
|
||||||
while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body))
|
while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body))
|
||||||
{
|
{
|
||||||
if (!ended)
|
response_callback(&parsed);
|
||||||
{
|
|
||||||
// Don't deliver additional events after close()
|
|
||||||
callback(&parsed);
|
|
||||||
}
|
|
||||||
parsed.body = "";
|
parsed.body = "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (parsed.eof)
|
||||||
|
{
|
||||||
|
response_callback = NULL;
|
||||||
|
parsed = {};
|
||||||
|
if (!keepalive)
|
||||||
|
{
|
||||||
|
close_connection();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state = HTTP_CO_KEEPALIVE;
|
||||||
|
if (keepalive_queue.size() > 0)
|
||||||
|
{
|
||||||
|
auto next = keepalive_queue[0];
|
||||||
|
keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1);
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
stackout();
|
stackout();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_co_t::post_message(int type, const std::string & msg)
|
|
||||||
{
|
|
||||||
stackin();
|
|
||||||
if (state == HTTP_CO_WEBSOCKET)
|
|
||||||
{
|
|
||||||
request += ws_format_frame(type, msg.size());
|
|
||||||
request += msg;
|
|
||||||
submit_send();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ws_outbox += ws_format_frame(type, msg.size());
|
|
||||||
ws_outbox += msg;
|
|
||||||
}
|
|
||||||
stackout();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t stoull_full(const std::string & str, int base)
|
uint64_t stoull_full(const std::string & str, int base)
|
||||||
{
|
{
|
||||||
if (isspace(str[0]))
|
if (isspace(str[0]))
|
||||||
|
@ -503,7 +598,7 @@ uint64_t stoull_full(const std::string & str, int base)
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
void parse_http_headers(std::string & res, http_response_t *parsed)
|
static void parse_http_headers(std::string & res, http_response_t *parsed)
|
||||||
{
|
{
|
||||||
int pos = res.find("\r\n");
|
int pos = res.find("\r\n");
|
||||||
pos = pos < 0 ? res.length() : pos+2;
|
pos = pos < 0 ? res.length() : pos+2;
|
||||||
|
@ -625,136 +720,6 @@ static bool ws_parse_frame(std::string & buf, int & type, std::string & res)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool cidr_match(const in_addr &addr, const in_addr &net, uint8_t bits)
|
|
||||||
{
|
|
||||||
if (bits == 0)
|
|
||||||
{
|
|
||||||
// C99 6.5.7 (3): u32 << 32 is undefined behaviour
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return !((addr.s_addr ^ net.s_addr) & htonl(0xFFFFFFFFu << (32 - bits)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits)
|
|
||||||
{
|
|
||||||
const uint32_t *a = address.s6_addr32;
|
|
||||||
const uint32_t *n = network.s6_addr32;
|
|
||||||
int bits_whole, bits_incomplete;
|
|
||||||
bits_whole = bits >> 5; // number of whole u32
|
|
||||||
bits_incomplete = bits & 0x1F; // number of bits in incomplete u32
|
|
||||||
if (bits_whole && memcmp(a, n, bits_whole << 2))
|
|
||||||
return false;
|
|
||||||
if (bits_incomplete)
|
|
||||||
{
|
|
||||||
uint32_t mask = htonl((0xFFFFFFFFu) << (32 - bits_incomplete));
|
|
||||||
if ((a[bits_whole] ^ n[bits_whole]) & mask)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct addr_mask_t
|
|
||||||
{
|
|
||||||
sa_family_t family;
|
|
||||||
in_addr ipv4;
|
|
||||||
in6_addr ipv6;
|
|
||||||
uint8_t bits;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::vector<std::string> getifaddr_list(json11::Json mask_cfg, bool include_v6)
|
|
||||||
{
|
|
||||||
std::vector<addr_mask_t> masks;
|
|
||||||
if (mask_cfg.is_string())
|
|
||||||
{
|
|
||||||
mask_cfg = json11::Json::array{ mask_cfg };
|
|
||||||
}
|
|
||||||
for (auto mask_json: mask_cfg.array_items())
|
|
||||||
{
|
|
||||||
std::string mask = mask_json.string_value();
|
|
||||||
unsigned bits = 0;
|
|
||||||
int p = mask.find('/');
|
|
||||||
if (p != std::string::npos)
|
|
||||||
{
|
|
||||||
char null_byte = 0;
|
|
||||||
if (sscanf(mask.c_str()+p+1, "%u%c", &bits, &null_byte) != 1 || bits > 128)
|
|
||||||
{
|
|
||||||
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
|
||||||
}
|
|
||||||
mask = mask.substr(0, p);
|
|
||||||
}
|
|
||||||
in_addr ipv4;
|
|
||||||
in6_addr ipv6;
|
|
||||||
if (inet_pton(AF_INET, mask.c_str(), &ipv4) == 1)
|
|
||||||
{
|
|
||||||
if (bits > 32)
|
|
||||||
{
|
|
||||||
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
|
||||||
}
|
|
||||||
masks.push_back((addr_mask_t){ .family = AF_INET, .ipv4 = ipv4, .bits = (uint8_t)bits });
|
|
||||||
}
|
|
||||||
else if (include_v6 && inet_pton(AF_INET6, mask.c_str(), &ipv6) == 1)
|
|
||||||
{
|
|
||||||
masks.push_back((addr_mask_t){ .family = AF_INET6, .ipv6 = ipv6, .bits = (uint8_t)bits });
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
std::vector<std::string> addresses;
|
|
||||||
ifaddrs *list, *ifa;
|
|
||||||
if (getifaddrs(&list) == -1)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno));
|
|
||||||
}
|
|
||||||
for (ifa = list; ifa != NULL; ifa = ifa->ifa_next)
|
|
||||||
{
|
|
||||||
if (!ifa->ifa_addr)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int family = ifa->ifa_addr->sa_family;
|
|
||||||
if ((family == AF_INET || family == AF_INET6 && include_v6) &&
|
|
||||||
(ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
|
|
||||||
{
|
|
||||||
void *addr_ptr;
|
|
||||||
if (family == AF_INET)
|
|
||||||
{
|
|
||||||
addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
|
|
||||||
}
|
|
||||||
if (masks.size() > 0)
|
|
||||||
{
|
|
||||||
int i;
|
|
||||||
for (i = 0; i < masks.size(); i++)
|
|
||||||
{
|
|
||||||
if (masks[i].family == family && (family == AF_INET
|
|
||||||
? cidr_match(*(in_addr*)addr_ptr, masks[i].ipv4, masks[i].bits)
|
|
||||||
: cidr6_match(*(in6_addr*)addr_ptr, masks[i].ipv6, masks[i].bits)))
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i >= masks.size())
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
char addr[INET6_ADDRSTRLEN];
|
|
||||||
if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN))
|
|
||||||
{
|
|
||||||
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
|
||||||
}
|
|
||||||
addresses.push_back(std::string(addr));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
freeifaddrs(list);
|
|
||||||
return addresses;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string strtolower(const std::string & in)
|
std::string strtolower(const std::string & in)
|
||||||
{
|
{
|
||||||
std::string s = in;
|
std::string s = in;
|
||||||
|
|
|
@ -21,41 +21,34 @@ struct http_options_t
|
||||||
{
|
{
|
||||||
int timeout;
|
int timeout;
|
||||||
bool want_streaming;
|
bool want_streaming;
|
||||||
|
bool keepalive;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct http_response_t
|
struct http_response_t
|
||||||
{
|
{
|
||||||
|
std::string error;
|
||||||
|
|
||||||
bool eof = false;
|
bool eof = false;
|
||||||
int error_code = 0;
|
|
||||||
int status_code = 0;
|
int status_code = 0;
|
||||||
std::string status_line;
|
std::string status_line;
|
||||||
std::map<std::string, std::string> headers;
|
std::map<std::string, std::string> headers;
|
||||||
int ws_msg_type = -1;
|
int ws_msg_type = -1;
|
||||||
std::string body;
|
std::string body;
|
||||||
|
|
||||||
|
void parse_json_response(std::string & error, json11::Json & r) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Opened websocket or keepalive HTTP connection
|
||||||
struct http_co_t;
|
struct http_co_t;
|
||||||
|
|
||||||
struct websocket_t
|
http_co_t* http_init(timerfd_manager_t *tfd);
|
||||||
{
|
http_co_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path,
|
||||||
http_co_t *co;
|
int timeout, std::function<void(const http_response_t *msg)> on_message);
|
||||||
void post_message(int type, const std::string & msg);
|
void http_request(http_co_t *handler, const std::string & host, const std::string & request,
|
||||||
void close();
|
const http_options_t & options, std::function<void(const http_response_t *response)> response_callback);
|
||||||
};
|
void http_post_message(http_co_t *handler, int type, const std::string & msg);
|
||||||
|
void http_close(http_co_t *co);
|
||||||
void parse_http_headers(std::string & res, http_response_t *parsed);
|
|
||||||
|
|
||||||
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true);
|
|
||||||
|
|
||||||
|
// Utils
|
||||||
uint64_t stoull_full(const std::string & str, int base = 10);
|
uint64_t stoull_full(const std::string & str, int base = 10);
|
||||||
|
|
||||||
std::string strtolower(const std::string & in);
|
std::string strtolower(const std::string & in);
|
||||||
|
|
||||||
void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
|
|
||||||
const http_options_t & options, std::function<void(const http_response_t *response)> callback);
|
|
||||||
|
|
||||||
void http_request_json(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
|
|
||||||
int timeout, std::function<void(std::string, json11::Json r)> callback);
|
|
||||||
|
|
||||||
websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path,
|
|
||||||
int timeout, std::function<void(const http_response_t *msg)> callback);
|
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
#include "etcd_state_client.h"
|
#include "etcd_state_client.h"
|
||||||
#include "http_client.h"
|
#include "http_client.h"
|
||||||
#include "osd_rmw.h"
|
#include "osd_rmw.h"
|
||||||
|
#include "addr_util.h"
|
||||||
|
|
||||||
// Startup sequence:
|
// Startup sequence:
|
||||||
// Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state
|
// Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state
|
||||||
|
|
Loading…
Reference in New Issue