From 35481925b1eeb2b14f8d5ced3830c611886da523 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 25 Apr 2020 01:29:31 +0300 Subject: [PATCH] Implement very simple HTTP streaming to handle etcd watches --- osd.h | 4 +- osd_http.cpp | 108 ++++++++++++++++++++++++++++++++++----------------- osd_http.h | 6 ++- 3 files changed, 79 insertions(+), 39 deletions(-) diff --git a/osd.h b/osd.h index 22e8dbc7..4ea718a9 100644 --- a/osd.h +++ b/osd.h @@ -189,6 +189,8 @@ struct osd_wanted_peer_t int address_index; }; +struct http_response_t; + class osd_t { friend struct http_co_t; @@ -295,7 +297,7 @@ class osd_t void send_replies(); void handle_send(ring_data_t *data, int peer_fd); void outbox_push(osd_client_t & cl, osd_op_t *op); - void http_request(std::string host, std::string request, std::function callback); + void http_request(std::string host, std::string request, bool streaming, std::function callback); void http_request_json(std::string host, std::string request, std::function callback); // peer handling (primary OSD logic) diff --git a/osd_http.cpp b/osd_http.cpp index 83d705f5..55095d9a 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -6,8 +6,8 @@ #include -#include "osd_http.h" #include "osd.h" +#include "osd_http.h" static int extract_port(std::string & host) { @@ -67,28 +67,32 @@ struct http_co_t std::string request; std::string response; std::vector rbuf; + bool streaming; + + bool headers_received = false; + http_response_t parsed; int st = 0; int peer_fd = -1; int timeout_id = -1; int epoll_events = 0; - int code = 0; - int sent = 0, received = 0; + int sent = 0; iovec iov; msghdr msg = { 0 }; int cqe_res = 0; - std::function callback; + std::function callback; std::function epoll_handler; ~http_co_t(); void resume(); }; -void osd_t::http_request(std::string host, std::string request, std::function callback) +void osd_t::http_request(std::string host, std::string request, bool streaming, std::function callback) { http_co_t *handler = new http_co_t(); handler->osd = this; + handler->streaming = streaming; handler->host = host; handler->request = request; handler->callback = callback; @@ -103,14 +107,13 @@ void osd_t::http_request(std::string host, std::string request, std::function callback) { - http_request(host, request, [this, callback](int err, std::string txt) + http_request(host, request, false, [this, callback](const http_response_t* res) { - if (err != 0) + if (res->error_code != 0) { - callback("Error code: "+std::to_string(err)+" ("+std::string(strerror(err))+")", json11::Json()); + callback("Error code: "+std::to_string(res->error_code)+" ("+std::string(strerror(res->error_code))+")", json11::Json()); return; } - std::unique_ptr res(parse_http_response(txt)); if (res->status_code != 200) { callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+res->body, json11::Json()); @@ -120,16 +123,15 @@ void osd_t::http_request_json(std::string host, std::string request, json11::Json data = json11::Json::parse(res->body, json_err); if (json_err != "") { - callback("Bad JSON: "+json_err+" (response: "+(res->body == "" ? txt : res->body)+")", json11::Json()); + callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json()); return; } callback(std::string(), data); }); } -http_response_t *parse_http_response(std::string res) +void parse_headers(std::string & res, http_response_t *parsed) { - http_response_t *parsed = new http_response_t(); int pos = res.find("\r\n"); pos = pos < 0 ? res.length() : pos+2; std::string status_line = res.substr(0, pos); @@ -139,6 +141,7 @@ http_response_t *parse_http_response(std::string res) if (status_text) { parsed->status_line = status_text; + // %ms = allocate a buffer free(status_text); status_text = NULL; } @@ -147,20 +150,7 @@ http_response_t *parse_http_response(std::string res) { if (pos == prev) { - if (parsed->headers["transfer-encoding"] == "chunked") - { - prev = pos+2; - while ((pos = res.find("\r\n", prev)) >= prev) - { - uint64_t len = strtoull(res.c_str()+prev, NULL, 16); - parsed->body += res.substr(pos+2, len); - prev = pos+2+len+2; - } - } - else - { - parsed->body = res.substr(pos+2); - } + res = res.substr(pos+2); break; } std::string header = res.substr(prev, pos-prev); @@ -177,7 +167,6 @@ http_response_t *parse_http_response(std::string res) } prev = pos+2; } - return parsed; } http_co_t::~http_co_t() @@ -187,7 +176,22 @@ http_co_t::~http_co_t() osd->tfd->clear_timer(timeout_id); timeout_id = -1; } - callback(code, response); + if (parsed.headers["transfer-encoding"] == "chunked") + { + int prev = 0, pos = 0; + while ((pos = response.find("\r\n", prev)) >= prev) + { + uint64_t len = strtoull(response.c_str()+prev, NULL, 16); + parsed.body += response.substr(pos+2, len); + prev = pos+2+len+2; + } + } + else + { + std::swap(parsed.body, response); + } + parsed.eof = true; + callback(&parsed); if (peer_fd >= 0) { osd->epoll_handlers.erase(peer_fd); @@ -206,7 +210,7 @@ void http_co_t::resume() int r; if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) { - code = ENXIO; + parsed.error_code = ENXIO; delete this; return; } @@ -215,7 +219,7 @@ void http_co_t::resume() peer_fd = socket(AF_INET, SOCK_STREAM, 0); if (peer_fd < 0) { - code = errno; + parsed.error_code = errno; delete this; return; } @@ -226,7 +230,7 @@ void http_co_t::resume() { if (response.length() == 0) { - code = EIO; + parsed.error_code = EIO; } delete this; }); @@ -234,7 +238,7 @@ void http_co_t::resume() r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { - code = errno; + parsed.error_code = errno; delete this; return; } @@ -245,7 +249,7 @@ void http_co_t::resume() ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) { - code = errno; + parsed.error_code = errno; delete this; return; } @@ -265,7 +269,7 @@ void http_co_t::resume() } if (result != 0) { - code = result; + parsed.error_code = result; delete this; return; } @@ -277,7 +281,7 @@ void http_co_t::resume() ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) { - code = errno; + parsed.error_code = errno; delete this; return; } @@ -377,7 +381,39 @@ void http_co_t::resume() return; } response += std::string(rbuf.data(), cqe_res); - received += cqe_res; + if (!headers_received) + { + int pos = response.find("\r\n\r\n"); + if (pos >= 0) + { + headers_received = true; + parse_headers(response, &parsed); + streaming = streaming && parsed.headers["transfer-encoding"] == "chunked"; + } + } + if (streaming && headers_received && response.size() > 0) + { + int prev = 0, pos = 0; + while ((pos = response.find("\r\n", prev)) >= prev) + { + uint64_t len = strtoull(response.c_str()+prev, NULL, 16); + if (response.size() < pos+2+len+2) + { + break; + } + parsed.body += response.substr(pos+2, len); + prev = pos+2+len+2; + } + if (prev > 0) + { + response = response.substr(prev); + } + if (parsed.body.size() > 0) + { + callback(&parsed); + parsed.body = ""; + } + } st = 5; resume(); return; diff --git a/osd_http.h b/osd_http.h index 7dbeedbd..38534f5b 100644 --- a/osd_http.h +++ b/osd_http.h @@ -6,12 +6,14 @@ struct http_response_t { - int status_code; + bool eof = false; + int error_code = 0; + int status_code = 0; std::string status_line; std::map headers; std::string body; }; -http_response_t *parse_http_response(std::string res); +void parse_headers(std::string & res, http_response_t *parsed); std::vector getifaddr_list(bool include_v6 = false); uint64_t stoull_full(const std::string & str, int base = 10);