From 268b497c0b32d35160ce1db3d71b2dc9a65e9316 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 25 Apr 2020 23:11:50 +0300 Subject: [PATCH] Implement simple websocket client --- osd.h | 11 +- osd_cluster.cpp | 9 +- osd_http.cpp | 805 ++++++++++++++++++++++++++++++------------------ osd_http.h | 19 +- ringloop.cpp | 6 + ringloop.h | 17 + 6 files changed, 553 insertions(+), 314 deletions(-) diff --git a/osd.h b/osd.h index 4ea718a9..2030ed88 100644 --- a/osd.h +++ b/osd.h @@ -191,10 +191,10 @@ struct osd_wanted_peer_t struct http_response_t; +struct websocket_t; + class osd_t { - friend struct http_co_t; - // config blockstore_config_t config; @@ -219,7 +219,7 @@ class osd_t // peer OSDs - std::string etcd_lease_id; + std::string etcd_lease_id, etcd_watch_revision; std::map peer_states; std::map wanted_peers; bool loading_peer_config = false; @@ -267,6 +267,9 @@ class osd_t uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; // cluster connection + void http_request(std::string host, std::string request, bool streaming, std::function callback); + void http_request_json(std::string host, std::string request, std::function callback); + websocket_t* open_websocket(std::string host, std::string path, std::function callback); void etcd_call(std::string api, json11::Json payload, std::function callback); void etcd_txn(json11::Json txn, std::function callback); void parse_config(blockstore_config_t & config); @@ -297,8 +300,6 @@ class osd_t void send_replies(); void handle_send(ring_data_t *data, int peer_fd); void outbox_push(osd_client_t & cl, osd_op_t *op); - void http_request(std::string host, std::string request, bool streaming, std::function callback); - void http_request_json(std::string host, std::string request, std::function callback); // peer handling (primary OSD logic) void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index a804f441..33537317 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -235,10 +235,11 @@ void osd_t::load_global_config() }); return; } - if (data["responses"][0]["response_range"]["kvs"].array_items().size() > 0) + etcd_watch_revision = data["header"]["revision"].string_value(); + if (data["kvs"].array_items().size() > 0) { - std::string key = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["key"].string_value()); - std::string json_text = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["value"].string_value()); + std::string key = base64_decode(data["kvs"][0]["key"].string_value()); + std::string json_text = base64_decode(data["kvs"][0]["value"].string_value()); std::string json_err; json11::Json value = json11::Json::parse(json_text, json_err); if (json_err != "") @@ -282,7 +283,7 @@ void osd_t::acquire_lease() etcd_lease_id = data["ID"].string_value(); create_state(); }); - printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval); + printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval); tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) { renew_lease(); diff --git a/osd_http.cpp b/osd_http.cpp index 55095d9a..bb880f87 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -7,101 +7,75 @@ #include #include "osd.h" +#include "json11/json11.hpp" #include "osd_http.h" -static int extract_port(std::string & host) -{ - int port = 0; - int pos = 0; - if ((pos = host.find(':')) >= 0) - { - port = strtoull(host.c_str() + pos + 1, NULL, 10); - if (port >= 0x10000) - { - port = 0; - } - host = host.substr(0, pos); - } - return port; -} +#define READ_BUFFER_SIZE 9000 -std::vector getifaddr_list(bool include_v6) -{ - std::vector addresses; - ifaddrs *list, *ifa; - if (getifaddrs(&list) == -1) - { - throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); - } - for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) - { - if (!ifa->ifa_addr) - { - continue; - } - int family = ifa->ifa_addr->sa_family; - if ((family == AF_INET || family == AF_INET6 && include_v6) && - (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) - { - void *addr_ptr; - if (family == AF_INET) - addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; - else - addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; - char addr[INET6_ADDRSTRLEN]; - if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) - { - throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); - } - addresses.push_back(std::string(addr)); - } - } - freeifaddrs(list); - return addresses; -} +static int extract_port(std::string & host); +static std::string ws_format_frame(int type, uint64_t size); +static bool ws_parse_frame(std::string & buf, int & type, std::string & res); struct http_co_t { - osd_t *osd; + ring_loop_t *ringloop; + timerfd_manager_t *tfd; + int epoll_fd; + std::map> *epoll_handlers; + + int request_timeout = 0; std::string host; std::string request; std::string response; - std::vector rbuf; - bool streaming; + bool want_streaming; - bool headers_received = false; http_response_t parsed; - int st = 0; + int state = 0; int peer_fd = -1; int timeout_id = -1; int epoll_events = 0; + ring_data_t *send_data = NULL, *read_data = NULL; int sent = 0; - iovec iov; - msghdr msg = { 0 }; - int cqe_res = 0; + std::vector rbuf; + iovec read_iov, send_iov; + msghdr read_msg = { 0 }, send_msg = { 0 }; + int waiting_read_sqe = 0, waiting_send_sqe = 0; std::function callback; - std::function epoll_handler; + + websocket_t ws; ~http_co_t(); - void resume(); + void connect(); + void handle_connect_result(); + void submit_read(); + void submit_send(); + void handle_read(); + void post_message(int type, const std::string & msg); }; +#define HTTP_CO_CONNECTING 1 +#define HTTP_CO_SENDING_REQUEST 2 +#define HTTP_CO_REQUEST_SENT 3 +#define HTTP_CO_HEADERS_RECEIVED 4 +#define HTTP_CO_WEBSOCKET 5 +#define HTTP_CO_STREAMING_CHUNKED 6 + void osd_t::http_request(std::string host, std::string request, bool streaming, std::function callback) { http_co_t *handler = new http_co_t(); - handler->osd = this; - handler->streaming = streaming; + handler->ringloop = ringloop; + handler->epoll_fd = epoll_fd; + handler->epoll_handlers = &epoll_handlers; + handler->request_timeout = http_request_timeout; + handler->tfd = tfd; + handler->want_streaming = streaming; handler->host = host; handler->request = request; handler->callback = callback; - handler->epoll_handler = [this, handler](int peer_fd, int epoll_events) - { - handler->epoll_events |= epoll_events; - handler->resume(); - }; - handler->resume(); + handler->ws.co = handler; + handler->connect(); } void osd_t::http_request_json(std::string host, std::string request, @@ -130,7 +104,363 @@ void osd_t::http_request_json(std::string host, std::string request, }); } -void parse_headers(std::string & res, http_response_t *parsed) +websocket_t* osd_t::open_websocket(std::string host, std::string path, std::function callback) +{ + std::string request = "GET "+path+" HTTP/1.1\r\n" + "Host: "+host+"\r\n" + "Upgrade: websocket\r\n" + "Connection: upgrade\r\n" + "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n" + "Sec-WebSocket-Version: 13\r\n" + "\r\n"; + http_co_t *handler = new http_co_t(); + handler->ringloop = ringloop; + handler->epoll_fd = epoll_fd; + handler->epoll_handlers = &epoll_handlers; + handler->request_timeout = http_request_timeout; + handler->tfd = tfd; + handler->want_streaming = false; + handler->host = host; + handler->request = request; + handler->callback = callback; + handler->ws.co = handler; + handler->connect(); + return &handler->ws; +} + +void websocket_t::post_message(int type, const std::string & msg) +{ + co->post_message(type, msg); +} + +http_co_t::~http_co_t() +{ + if (timeout_id >= 0) + { + tfd->clear_timer(timeout_id); + timeout_id = -1; + } + if (read_data) + { + // Ignore CQE result + read_data->callback = [](ring_data_t *data) {}; + } + else if (waiting_read_sqe) + { + ringloop->cancel_wait_sqe(waiting_read_sqe); + } + if (send_data) + { + // Ignore CQE result + send_data->callback = [](ring_data_t *data) {}; + } + else if (waiting_send_sqe) + { + ringloop->cancel_wait_sqe(waiting_send_sqe); + } + if (peer_fd >= 0) + { + epoll_handlers->erase(peer_fd); + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); + close(peer_fd); + peer_fd = -1; + } + if (parsed.headers["transfer-encoding"] == "chunked") + { + int prev = 0, pos = 0; + while ((pos = response.find("\r\n", prev)) >= prev) + { + uint64_t len = strtoull(response.c_str()+prev, NULL, 16); + parsed.body += response.substr(pos+2, len); + prev = pos+2+len+2; + } + } + else + { + std::swap(parsed.body, response); + } + parsed.eof = true; + callback(&parsed); +} + +void http_co_t::connect() +{ + int port = extract_port(host); + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) + { + parsed.error_code = ENXIO; + delete this; + return; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(port ? port : 80); + peer_fd = socket(AF_INET, SOCK_STREAM, 0); + if (peer_fd < 0) + { + parsed.error_code = errno; + delete this; + return; + } + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + if (request_timeout > 0) + { + timeout_id = tfd->set_timer(1000*request_timeout, false, [this](int timer_id) + { + if (response.length() == 0) + { + parsed.error_code = EIO; + } + delete this; + }); + } + (*epoll_handlers)[peer_fd] = [this](int peer_fd, int epoll_events) + { + this->epoll_events |= epoll_events; + if (state == HTTP_CO_CONNECTING) + { + handle_connect_result(); + } + else + { + if (epoll_events & EPOLLIN) + { + submit_read(); + } + else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) + { + delete this; + } + } + }; + // Add FD to epoll (EPOLLOUT for tracking connect() result) + epoll_event ev; + ev.data.fd = peer_fd; + ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) + { + parsed.error_code = errno; + delete this; + return; + } + epoll_events = 0; + // Finally call connect + r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + if (r < 0 && errno != EINPROGRESS) + { + parsed.error_code = errno; + delete this; + return; + } + state = HTTP_CO_CONNECTING; +} + +void http_co_t::handle_connect_result() +{ + if (epoll_events & (EPOLLOUT | EPOLLERR)) + { + int result = 0; + socklen_t result_len = sizeof(result); + if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) + { + result = errno; + } + if (result != 0) + { + parsed.error_code = result; + delete this; + return; + } + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + state = HTTP_CO_SENDING_REQUEST; + submit_send(); + } + else + { + delete this; + } +} + +void http_co_t::submit_read() +{ + if (!read_data && !waiting_read_sqe) + { + if (rbuf.size() != READ_BUFFER_SIZE) + { + rbuf.resize(READ_BUFFER_SIZE); + } + io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) + { + waiting_read_sqe = ringloop->wait_sqe([this]() { waiting_read_sqe = 0; submit_read(); }); + return; + } + read_data = ((ring_data_t*)sqe->user_data); + read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE }; + read_msg.msg_iov = &read_iov; + read_msg.msg_iovlen = 1; + epoll_events = epoll_events & ~EPOLLIN; + read_data->callback = [this](ring_data_t *data) + { + read_data = NULL; + if (data->res == -EAGAIN) + { + data->res = 0; + } + if (data->res < 0) + { + delete this; + return; + } + response += std::string(rbuf.data(), data->res); + if (data->res == READ_BUFFER_SIZE) + { + submit_read(); + } + handle_read(); + if (data->res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR))) + { + delete this; + return; + } + }; + my_uring_prep_recvmsg(sqe, peer_fd, &read_msg, 0); + } +} + +void http_co_t::submit_send() +{ + if (sent < request.size() && !send_data && !waiting_send_sqe) + { + io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) + { + waiting_send_sqe = ringloop->wait_sqe([this]() { waiting_send_sqe = 0; submit_send(); }); + return; + } + send_data = ((ring_data_t*)sqe->user_data); + send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; + send_msg.msg_iov = &send_iov; + send_msg.msg_iovlen = 1; + send_data->callback = [this](ring_data_t *data) + { + send_data = NULL; + if (data->res == -EAGAIN) + { + data->res = 0; + } + else if (data->res < 0) + { + delete this; + return; + } + sent += data->res; + if (state == HTTP_CO_SENDING_REQUEST) + { + if (sent >= request.size()) + state = HTTP_CO_REQUEST_SENT; + else + submit_send(); + } + else if (state == HTTP_CO_WEBSOCKET) + { + request = request.substr(sent); + sent = 0; + submit_send(); + } + }; + my_uring_prep_sendmsg(sqe, peer_fd, &send_msg, 0); + } +} + +void http_co_t::handle_read() +{ + if (state == HTTP_CO_REQUEST_SENT) + { + int pos = response.find("\r\n\r\n"); + if (pos >= 0) + { + if (timeout_id >= 0) + { + tfd->clear_timer(timeout_id); + timeout_id = -1; + } + state = HTTP_CO_HEADERS_RECEIVED; + parse_http_headers(response, &parsed); + if (parsed.status_code == 101 && + parsed.headers.find("sec-websocket-accept") != parsed.headers.end() && + parsed.headers["upgrade"] == "websocket" && + parsed.headers["connection"] == "upgrade") + { + // Don't care about validating the key + state = HTTP_CO_WEBSOCKET; + request = ""; + sent = 0; + } + else if (want_streaming && parsed.headers["transfer-encoding"] == "chunked") + { + state = HTTP_CO_STREAMING_CHUNKED; + } + } + } + if (state == HTTP_CO_STREAMING_CHUNKED && response.size() > 0) + { + int prev = 0, pos = 0; + while ((pos = response.find("\r\n", prev)) >= prev) + { + uint64_t len = strtoull(response.c_str()+prev, NULL, 16); + if (response.size() < pos+2+len+2) + { + break; + } + parsed.body += response.substr(pos+2, len); + prev = pos+2+len+2; + } + if (prev > 0) + { + response = response.substr(prev); + } + if (parsed.body.size() > 0) + { + callback(&parsed); + parsed.body = ""; + } + } + if (state == HTTP_CO_WEBSOCKET && response.size() > 0) + { + while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body)) + { + callback(&parsed); + parsed.body = ""; + } + } +} + +void http_co_t::post_message(int type, const std::string & msg) +{ + request += ws_format_frame(type, msg.size()); + request += msg; + submit_send(); +} + +uint64_t stoull_full(const std::string & str, int base) +{ + if (isspace(str[0])) + { + return 0; + } + char *end = NULL; + uint64_t r = strtoull(str.c_str(), &end, base); + if (end != str.c_str()+str.length()) + { + return 0; + } + return r; +} + +void parse_http_headers(std::string & res, http_response_t *parsed) { int pos = res.find("\r\n"); pos = pos < 0 ? res.length() : pos+2; @@ -169,268 +499,137 @@ void parse_headers(std::string & res, http_response_t *parsed) } } -http_co_t::~http_co_t() +static std::string ws_format_frame(int type, uint64_t size) { - if (timeout_id >= 0) + // Always zero mask + std::string res; + int p = 0; + res.resize(2 + (size >= 126 ? 2 : 0) + (size >= 65536 ? 6 : 0) + /*mask*/4); + res[p++] = 0x80 | type; + if (size < 126) + res[p++] = size | /*mask*/0x80; + else if (size < 65536) { - osd->tfd->clear_timer(timeout_id); - timeout_id = -1; - } - if (parsed.headers["transfer-encoding"] == "chunked") - { - int prev = 0, pos = 0; - while ((pos = response.find("\r\n", prev)) >= prev) - { - uint64_t len = strtoull(response.c_str()+prev, NULL, 16); - parsed.body += response.substr(pos+2, len); - prev = pos+2+len+2; - } + res[p++] = 126 | /*mask*/0x80; + res[p++] = (size >> 8) & 0xFF; + res[p++] = (size >> 0) & 0xFF; } else { - std::swap(parsed.body, response); - } - parsed.eof = true; - callback(&parsed); - if (peer_fd >= 0) - { - osd->epoll_handlers.erase(peer_fd); - epoll_ctl(osd->epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL); - close(peer_fd); - peer_fd = -1; + res[p++] = 127 | /*mask*/0x80; + res[p++] = (size >> 56) & 0xFF; + res[p++] = (size >> 48) & 0xFF; + res[p++] = (size >> 40) & 0xFF; + res[p++] = (size >> 32) & 0xFF; + res[p++] = (size >> 24) & 0xFF; + res[p++] = (size >> 16) & 0xFF; + res[p++] = (size >> 8) & 0xFF; + res[p++] = (size >> 0) & 0xFF; } + res[p++] = 0; + res[p++] = 0; + res[p++] = 0; + res[p++] = 0; + return res; } -void http_co_t::resume() +static bool ws_parse_frame(std::string & buf, int & type, std::string & res) { - if (st == 0) + uint64_t hdr = 2; + if (buf.size() < hdr) { - int port = extract_port(host); - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) - { - parsed.error_code = ENXIO; - delete this; - return; - } - addr.sin_family = AF_INET; - addr.sin_port = htons(port ? port : 80); - peer_fd = socket(AF_INET, SOCK_STREAM, 0); - if (peer_fd < 0) - { - parsed.error_code = errno; - delete this; - return; - } - fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - if (osd->http_request_timeout > 0) - { - timeout_id = osd->tfd->set_timer(1000*osd->http_request_timeout, false, [this](int timer_id) - { - if (response.length() == 0) - { - parsed.error_code = EIO; - } - delete this; - }); - } - r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); - if (r < 0 && errno != EINPROGRESS) - { - parsed.error_code = errno; - delete this; - return; - } - osd->epoll_handlers[peer_fd] = epoll_handler; - // Add FD to epoll (EPOLLOUT for tracking connect() result) - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) - { - parsed.error_code = errno; - delete this; - return; - } - epoll_events = 0; - st = 1; - return; + return false; } - if (st == 1) + type = buf[0] & ~0x80; + bool mask = buf[1] & 0x80; + hdr += mask ? 4 : 0; + uint64_t len = (buf[1] & ~0x80); + if (len == 126) { - if (epoll_events & (EPOLLOUT | EPOLLERR)) + hdr += 2; + if (buf.size() < hdr) { - int result = 0; - socklen_t result_len = sizeof(result); - if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) - { - result = errno; - } - if (result != 0) - { - parsed.error_code = result; - delete this; - return; - } - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - // Disable EPOLLOUT on this fd - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) - { - parsed.error_code = errno; - delete this; - return; - } - st = 2; - epoll_events = 0; - resume(); - return; - } - else if (epoll_events & EPOLLRDHUP) - { - delete this; - return; - } - else - { - return; + return false; } + len = ((uint64_t)buf[2] << 8) | ((uint64_t)buf[3] << 0); } - // Write data - if (st == 2) + else if (len == 127) { - io_uring_sqe *sqe = osd->ringloop->get_sqe(); - if (!sqe) - return; - ring_data_t* data = ((ring_data_t*)sqe->user_data); - iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - data->callback = [this](ring_data_t *data) + hdr += 8; + if (buf.size() < hdr) { - st = 4; - cqe_res = data->res; - resume(); - }; - my_uring_prep_sendmsg(sqe, peer_fd, &msg, 0); - st = 3; - return; + return false; + } + len = ((uint64_t)buf[2] << 56) | + ((uint64_t)buf[3] << 48) | + ((uint64_t)buf[4] << 40) | + ((uint64_t)buf[5] << 32) | + ((uint64_t)buf[6] << 24) | + ((uint64_t)buf[7] << 16) | + ((uint64_t)buf[8] << 8) | + ((uint64_t)buf[9] << 0); } - if (st == 3) + if (buf.size() < hdr+len) { - return; + return false; } - if (st == 4) + if (mask) { - if (cqe_res < 0 && cqe_res != -EAGAIN) - { - delete this; - return; - } - sent += cqe_res; - if (sent < request.size()) - st = 2; - else - st = 5; - resume(); - return; - } - // Read response - if (st == 5) - { - if (epoll_events & EPOLLIN) - { - if (rbuf.size() != 9000) - rbuf.resize(9000); - io_uring_sqe *sqe = osd->ringloop->get_sqe(); - if (!sqe) - return; - ring_data_t* data = ((ring_data_t*)sqe->user_data); - iov = { .iov_base = rbuf.data(), .iov_len = 9000 }; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - data->callback = [this](ring_data_t *data) - { - st = 7; - cqe_res = data->res; - resume(); - }; - my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); - st = 6; - epoll_events = epoll_events & ~EPOLLIN; - } - else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) - { - delete this; - return; - } - } - if (st == 6) - { - return; - } - if (st == 7) - { - if (cqe_res < 0 && cqe_res != -EAGAIN) - { - delete this; - return; - } - response += std::string(rbuf.data(), cqe_res); - if (!headers_received) - { - int pos = response.find("\r\n\r\n"); - if (pos >= 0) - { - headers_received = true; - parse_headers(response, &parsed); - streaming = streaming && parsed.headers["transfer-encoding"] == "chunked"; - } - } - if (streaming && headers_received && response.size() > 0) - { - int prev = 0, pos = 0; - while ((pos = response.find("\r\n", prev)) >= prev) - { - uint64_t len = strtoull(response.c_str()+prev, NULL, 16); - if (response.size() < pos+2+len+2) - { - break; - } - parsed.body += response.substr(pos+2, len); - prev = pos+2+len+2; - } - if (prev > 0) - { - response = response.substr(prev); - } - if (parsed.body.size() > 0) - { - callback(&parsed); - parsed.body = ""; - } - } - st = 5; - resume(); - return; + for (int i = 0; i < len; i++) + buf[hdr+i] ^= buf[hdr-4+(i & 3)]; } + res += buf.substr(hdr, len); + buf = buf.substr(hdr+len); + return true; } -uint64_t stoull_full(const std::string & str, int base) +std::vector getifaddr_list(bool include_v6) { - if (isspace(str[0])) + std::vector addresses; + ifaddrs *list, *ifa; + if (getifaddrs(&list) == -1) { - return 0; + throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); } - char *end = NULL; - uint64_t r = strtoull(str.c_str(), &end, base); - if (end != str.c_str()+str.length()) + for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) { - return 0; + if (!ifa->ifa_addr) + { + continue; + } + int family = ifa->ifa_addr->sa_family; + if ((family == AF_INET || family == AF_INET6 && include_v6) && + (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) + { + void *addr_ptr; + if (family == AF_INET) + addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; + else + addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; + char addr[INET6_ADDRSTRLEN]; + if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) + { + throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); + } + addresses.push_back(std::string(addr)); + } } - return r; + freeifaddrs(list); + return addresses; +} + +static int extract_port(std::string & host) +{ + int port = 0; + int pos = 0; + if ((pos = host.find(':')) >= 0) + { + port = strtoull(host.c_str() + pos + 1, NULL, 10); + if (port >= 0x10000) + { + port = 0; + } + host = host.substr(0, pos); + } + return port; } diff --git a/osd_http.h b/osd_http.h index 38534f5b..71c51317 100644 --- a/osd_http.h +++ b/osd_http.h @@ -2,7 +2,13 @@ #include #include #include -#include "json11/json11.hpp" + +#define WS_CONTINUATION 0 +#define WS_TEXT 1 +#define WS_BINARY 2 +#define WS_CLOSE 8 +#define WS_PING 9 +#define WS_PONG 10 struct http_response_t { @@ -11,9 +17,18 @@ struct http_response_t int status_code = 0; std::string status_line; std::map headers; + int ws_msg_type = -1; std::string body; }; -void parse_headers(std::string & res, http_response_t *parsed); +struct http_co_t; + +struct websocket_t +{ + http_co_t *co; + void post_message(int type, const std::string & msg); +}; + +void parse_http_headers(std::string & res, http_response_t *parsed); std::vector getifaddr_list(bool include_v6 = false); uint64_t stoull_full(const std::string & str, int base = 10); diff --git a/ringloop.cpp b/ringloop.cpp index fcafdc4b..19754a72 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -18,6 +18,7 @@ ring_loop_t::ring_loop_t(int qd) { free_ring_data[i] = i; } + wait_sqe_id = 1; } ring_loop_t::~ring_loop_t() @@ -64,6 +65,11 @@ void ring_loop_t::loop() free_ring_data[free_ring_data_ptr++] = d - ring_datas; io_uring_cqe_seen(&ring, cqe); } + while (get_sqe_queue.size() > 0) + { + (get_sqe_queue[0].second)(); + get_sqe_queue.erase(get_sqe_queue.begin()); + } do { loop_again = false; diff --git a/ringloop.h b/ringloop.h index 86e75274..ed1b915b 100644 --- a/ringloop.h +++ b/ringloop.h @@ -118,9 +118,11 @@ struct ring_consumer_t class ring_loop_t { + std::vector>> get_sqe_queue; std::vector consumers; struct ring_data_t *ring_datas; int *free_ring_data; + int wait_sqe_id; unsigned free_ring_data_ptr; bool loop_again; struct io_uring ring; @@ -139,6 +141,21 @@ public: io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]); return sqe; } + inline int wait_sqe(std::function cb) + { + get_sqe_queue.push_back({ wait_sqe_id, cb }); + return wait_sqe_id++; + } + inline void cancel_wait_sqe(int wait_id) + { + for (int i = 0; i < get_sqe_queue.size(); i++) + { + if (get_sqe_queue[i].first == wait_id) + { + get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1); + } + } + } inline int submit() { return io_uring_submit(&ring);