Implement simple websocket client
parent
35481925b1
commit
268b497c0b
11
osd.h
11
osd.h
|
@ -191,10 +191,10 @@ struct osd_wanted_peer_t
|
||||||
|
|
||||||
struct http_response_t;
|
struct http_response_t;
|
||||||
|
|
||||||
|
struct websocket_t;
|
||||||
|
|
||||||
class osd_t
|
class osd_t
|
||||||
{
|
{
|
||||||
friend struct http_co_t;
|
|
||||||
|
|
||||||
// config
|
// config
|
||||||
|
|
||||||
blockstore_config_t config;
|
blockstore_config_t config;
|
||||||
|
@ -219,7 +219,7 @@ class osd_t
|
||||||
|
|
||||||
// peer OSDs
|
// peer OSDs
|
||||||
|
|
||||||
std::string etcd_lease_id;
|
std::string etcd_lease_id, etcd_watch_revision;
|
||||||
std::map<osd_num_t, json11::Json> peer_states;
|
std::map<osd_num_t, json11::Json> peer_states;
|
||||||
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
||||||
bool loading_peer_config = false;
|
bool loading_peer_config = false;
|
||||||
|
@ -267,6 +267,9 @@ class osd_t
|
||||||
uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
||||||
|
|
||||||
// cluster connection
|
// cluster connection
|
||||||
|
void http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback);
|
||||||
|
void http_request_json(std::string host, std::string request, std::function<void(std::string, json11::Json data)> callback);
|
||||||
|
websocket_t* open_websocket(std::string host, std::string path, std::function<void(const http_response_t *msg)> callback);
|
||||||
void etcd_call(std::string api, json11::Json payload, std::function<void(std::string, json11::Json)> callback);
|
void etcd_call(std::string api, json11::Json payload, std::function<void(std::string, json11::Json)> callback);
|
||||||
void etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
|
void etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
|
||||||
void parse_config(blockstore_config_t & config);
|
void parse_config(blockstore_config_t & config);
|
||||||
|
@ -297,8 +300,6 @@ class osd_t
|
||||||
void send_replies();
|
void send_replies();
|
||||||
void handle_send(ring_data_t *data, int peer_fd);
|
void handle_send(ring_data_t *data, int peer_fd);
|
||||||
void outbox_push(osd_client_t & cl, osd_op_t *op);
|
void outbox_push(osd_client_t & cl, osd_op_t *op);
|
||||||
void http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback);
|
|
||||||
void http_request_json(std::string host, std::string request, std::function<void(std::string, json11::Json data)> callback);
|
|
||||||
|
|
||||||
// peer handling (primary OSD logic)
|
// peer handling (primary OSD logic)
|
||||||
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
||||||
|
|
|
@ -235,10 +235,11 @@ void osd_t::load_global_config()
|
||||||
});
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (data["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
|
etcd_watch_revision = data["header"]["revision"].string_value();
|
||||||
|
if (data["kvs"].array_items().size() > 0)
|
||||||
{
|
{
|
||||||
std::string key = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["key"].string_value());
|
std::string key = base64_decode(data["kvs"][0]["key"].string_value());
|
||||||
std::string json_text = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["value"].string_value());
|
std::string json_text = base64_decode(data["kvs"][0]["value"].string_value());
|
||||||
std::string json_err;
|
std::string json_err;
|
||||||
json11::Json value = json11::Json::parse(json_text, json_err);
|
json11::Json value = json11::Json::parse(json_text, json_err);
|
||||||
if (json_err != "")
|
if (json_err != "")
|
||||||
|
@ -282,7 +283,7 @@ void osd_t::acquire_lease()
|
||||||
etcd_lease_id = data["ID"].string_value();
|
etcd_lease_id = data["ID"].string_value();
|
||||||
create_state();
|
create_state();
|
||||||
});
|
});
|
||||||
printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval);
|
printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval);
|
||||||
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
|
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
|
||||||
{
|
{
|
||||||
renew_lease();
|
renew_lease();
|
||||||
|
|
805
osd_http.cpp
805
osd_http.cpp
|
@ -7,101 +7,75 @@
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
|
|
||||||
#include "osd.h"
|
#include "osd.h"
|
||||||
|
#include "json11/json11.hpp"
|
||||||
#include "osd_http.h"
|
#include "osd_http.h"
|
||||||
|
|
||||||
static int extract_port(std::string & host)
|
#define READ_BUFFER_SIZE 9000
|
||||||
{
|
|
||||||
int port = 0;
|
|
||||||
int pos = 0;
|
|
||||||
if ((pos = host.find(':')) >= 0)
|
|
||||||
{
|
|
||||||
port = strtoull(host.c_str() + pos + 1, NULL, 10);
|
|
||||||
if (port >= 0x10000)
|
|
||||||
{
|
|
||||||
port = 0;
|
|
||||||
}
|
|
||||||
host = host.substr(0, pos);
|
|
||||||
}
|
|
||||||
return port;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::string> getifaddr_list(bool include_v6)
|
static int extract_port(std::string & host);
|
||||||
{
|
static std::string ws_format_frame(int type, uint64_t size);
|
||||||
std::vector<std::string> addresses;
|
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
|
||||||
ifaddrs *list, *ifa;
|
|
||||||
if (getifaddrs(&list) == -1)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno));
|
|
||||||
}
|
|
||||||
for (ifa = list; ifa != NULL; ifa = ifa->ifa_next)
|
|
||||||
{
|
|
||||||
if (!ifa->ifa_addr)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int family = ifa->ifa_addr->sa_family;
|
|
||||||
if ((family == AF_INET || family == AF_INET6 && include_v6) &&
|
|
||||||
(ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
|
|
||||||
{
|
|
||||||
void *addr_ptr;
|
|
||||||
if (family == AF_INET)
|
|
||||||
addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr;
|
|
||||||
else
|
|
||||||
addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
|
|
||||||
char addr[INET6_ADDRSTRLEN];
|
|
||||||
if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN))
|
|
||||||
{
|
|
||||||
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
|
||||||
}
|
|
||||||
addresses.push_back(std::string(addr));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
freeifaddrs(list);
|
|
||||||
return addresses;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct http_co_t
|
struct http_co_t
|
||||||
{
|
{
|
||||||
osd_t *osd;
|
ring_loop_t *ringloop;
|
||||||
|
timerfd_manager_t *tfd;
|
||||||
|
int epoll_fd;
|
||||||
|
std::map<int, std::function<void(int, int)>> *epoll_handlers;
|
||||||
|
|
||||||
|
int request_timeout = 0;
|
||||||
std::string host;
|
std::string host;
|
||||||
std::string request;
|
std::string request;
|
||||||
std::string response;
|
std::string response;
|
||||||
std::vector<char> rbuf;
|
bool want_streaming;
|
||||||
bool streaming;
|
|
||||||
|
|
||||||
bool headers_received = false;
|
|
||||||
http_response_t parsed;
|
http_response_t parsed;
|
||||||
|
|
||||||
int st = 0;
|
int state = 0;
|
||||||
int peer_fd = -1;
|
int peer_fd = -1;
|
||||||
int timeout_id = -1;
|
int timeout_id = -1;
|
||||||
int epoll_events = 0;
|
int epoll_events = 0;
|
||||||
|
ring_data_t *send_data = NULL, *read_data = NULL;
|
||||||
int sent = 0;
|
int sent = 0;
|
||||||
iovec iov;
|
std::vector<char> rbuf;
|
||||||
msghdr msg = { 0 };
|
iovec read_iov, send_iov;
|
||||||
int cqe_res = 0;
|
msghdr read_msg = { 0 }, send_msg = { 0 };
|
||||||
|
int waiting_read_sqe = 0, waiting_send_sqe = 0;
|
||||||
|
|
||||||
std::function<void(const http_response_t*)> callback;
|
std::function<void(const http_response_t*)> callback;
|
||||||
std::function<void(int, int)> epoll_handler;
|
|
||||||
|
websocket_t ws;
|
||||||
|
|
||||||
~http_co_t();
|
~http_co_t();
|
||||||
void resume();
|
void connect();
|
||||||
|
void handle_connect_result();
|
||||||
|
void submit_read();
|
||||||
|
void submit_send();
|
||||||
|
void handle_read();
|
||||||
|
void post_message(int type, const std::string & msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define HTTP_CO_CONNECTING 1
|
||||||
|
#define HTTP_CO_SENDING_REQUEST 2
|
||||||
|
#define HTTP_CO_REQUEST_SENT 3
|
||||||
|
#define HTTP_CO_HEADERS_RECEIVED 4
|
||||||
|
#define HTTP_CO_WEBSOCKET 5
|
||||||
|
#define HTTP_CO_STREAMING_CHUNKED 6
|
||||||
|
|
||||||
void osd_t::http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback)
|
void osd_t::http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback)
|
||||||
{
|
{
|
||||||
http_co_t *handler = new http_co_t();
|
http_co_t *handler = new http_co_t();
|
||||||
handler->osd = this;
|
handler->ringloop = ringloop;
|
||||||
handler->streaming = streaming;
|
handler->epoll_fd = epoll_fd;
|
||||||
|
handler->epoll_handlers = &epoll_handlers;
|
||||||
|
handler->request_timeout = http_request_timeout;
|
||||||
|
handler->tfd = tfd;
|
||||||
|
handler->want_streaming = streaming;
|
||||||
handler->host = host;
|
handler->host = host;
|
||||||
handler->request = request;
|
handler->request = request;
|
||||||
handler->callback = callback;
|
handler->callback = callback;
|
||||||
handler->epoll_handler = [this, handler](int peer_fd, int epoll_events)
|
handler->ws.co = handler;
|
||||||
{
|
handler->connect();
|
||||||
handler->epoll_events |= epoll_events;
|
|
||||||
handler->resume();
|
|
||||||
};
|
|
||||||
handler->resume();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::http_request_json(std::string host, std::string request,
|
void osd_t::http_request_json(std::string host, std::string request,
|
||||||
|
@ -130,7 +104,363 @@ void osd_t::http_request_json(std::string host, std::string request,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void parse_headers(std::string & res, http_response_t *parsed)
|
websocket_t* osd_t::open_websocket(std::string host, std::string path, std::function<void(const http_response_t *msg)> callback)
|
||||||
|
{
|
||||||
|
std::string request = "GET "+path+" HTTP/1.1\r\n"
|
||||||
|
"Host: "+host+"\r\n"
|
||||||
|
"Upgrade: websocket\r\n"
|
||||||
|
"Connection: upgrade\r\n"
|
||||||
|
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
|
||||||
|
"Sec-WebSocket-Version: 13\r\n"
|
||||||
|
"\r\n";
|
||||||
|
http_co_t *handler = new http_co_t();
|
||||||
|
handler->ringloop = ringloop;
|
||||||
|
handler->epoll_fd = epoll_fd;
|
||||||
|
handler->epoll_handlers = &epoll_handlers;
|
||||||
|
handler->request_timeout = http_request_timeout;
|
||||||
|
handler->tfd = tfd;
|
||||||
|
handler->want_streaming = false;
|
||||||
|
handler->host = host;
|
||||||
|
handler->request = request;
|
||||||
|
handler->callback = callback;
|
||||||
|
handler->ws.co = handler;
|
||||||
|
handler->connect();
|
||||||
|
return &handler->ws;
|
||||||
|
}
|
||||||
|
|
||||||
|
void websocket_t::post_message(int type, const std::string & msg)
|
||||||
|
{
|
||||||
|
co->post_message(type, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
http_co_t::~http_co_t()
|
||||||
|
{
|
||||||
|
if (timeout_id >= 0)
|
||||||
|
{
|
||||||
|
tfd->clear_timer(timeout_id);
|
||||||
|
timeout_id = -1;
|
||||||
|
}
|
||||||
|
if (read_data)
|
||||||
|
{
|
||||||
|
// Ignore CQE result
|
||||||
|
read_data->callback = [](ring_data_t *data) {};
|
||||||
|
}
|
||||||
|
else if (waiting_read_sqe)
|
||||||
|
{
|
||||||
|
ringloop->cancel_wait_sqe(waiting_read_sqe);
|
||||||
|
}
|
||||||
|
if (send_data)
|
||||||
|
{
|
||||||
|
// Ignore CQE result
|
||||||
|
send_data->callback = [](ring_data_t *data) {};
|
||||||
|
}
|
||||||
|
else if (waiting_send_sqe)
|
||||||
|
{
|
||||||
|
ringloop->cancel_wait_sqe(waiting_send_sqe);
|
||||||
|
}
|
||||||
|
if (peer_fd >= 0)
|
||||||
|
{
|
||||||
|
epoll_handlers->erase(peer_fd);
|
||||||
|
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL);
|
||||||
|
close(peer_fd);
|
||||||
|
peer_fd = -1;
|
||||||
|
}
|
||||||
|
if (parsed.headers["transfer-encoding"] == "chunked")
|
||||||
|
{
|
||||||
|
int prev = 0, pos = 0;
|
||||||
|
while ((pos = response.find("\r\n", prev)) >= prev)
|
||||||
|
{
|
||||||
|
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
|
||||||
|
parsed.body += response.substr(pos+2, len);
|
||||||
|
prev = pos+2+len+2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::swap(parsed.body, response);
|
||||||
|
}
|
||||||
|
parsed.eof = true;
|
||||||
|
callback(&parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::connect()
|
||||||
|
{
|
||||||
|
int port = extract_port(host);
|
||||||
|
struct sockaddr_in addr;
|
||||||
|
int r;
|
||||||
|
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
|
||||||
|
{
|
||||||
|
parsed.error_code = ENXIO;
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
addr.sin_family = AF_INET;
|
||||||
|
addr.sin_port = htons(port ? port : 80);
|
||||||
|
peer_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
if (peer_fd < 0)
|
||||||
|
{
|
||||||
|
parsed.error_code = errno;
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
if (request_timeout > 0)
|
||||||
|
{
|
||||||
|
timeout_id = tfd->set_timer(1000*request_timeout, false, [this](int timer_id)
|
||||||
|
{
|
||||||
|
if (response.length() == 0)
|
||||||
|
{
|
||||||
|
parsed.error_code = EIO;
|
||||||
|
}
|
||||||
|
delete this;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
(*epoll_handlers)[peer_fd] = [this](int peer_fd, int epoll_events)
|
||||||
|
{
|
||||||
|
this->epoll_events |= epoll_events;
|
||||||
|
if (state == HTTP_CO_CONNECTING)
|
||||||
|
{
|
||||||
|
handle_connect_result();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (epoll_events & EPOLLIN)
|
||||||
|
{
|
||||||
|
submit_read();
|
||||||
|
}
|
||||||
|
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
||||||
|
epoll_event ev;
|
||||||
|
ev.data.fd = peer_fd;
|
||||||
|
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||||
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
||||||
|
{
|
||||||
|
parsed.error_code = errno;
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
epoll_events = 0;
|
||||||
|
// Finally call connect
|
||||||
|
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
|
||||||
|
if (r < 0 && errno != EINPROGRESS)
|
||||||
|
{
|
||||||
|
parsed.error_code = errno;
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
state = HTTP_CO_CONNECTING;
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::handle_connect_result()
|
||||||
|
{
|
||||||
|
if (epoll_events & (EPOLLOUT | EPOLLERR))
|
||||||
|
{
|
||||||
|
int result = 0;
|
||||||
|
socklen_t result_len = sizeof(result);
|
||||||
|
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
|
||||||
|
{
|
||||||
|
result = errno;
|
||||||
|
}
|
||||||
|
if (result != 0)
|
||||||
|
{
|
||||||
|
parsed.error_code = result;
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int one = 1;
|
||||||
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
|
state = HTTP_CO_SENDING_REQUEST;
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::submit_read()
|
||||||
|
{
|
||||||
|
if (!read_data && !waiting_read_sqe)
|
||||||
|
{
|
||||||
|
if (rbuf.size() != READ_BUFFER_SIZE)
|
||||||
|
{
|
||||||
|
rbuf.resize(READ_BUFFER_SIZE);
|
||||||
|
}
|
||||||
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
waiting_read_sqe = ringloop->wait_sqe([this]() { waiting_read_sqe = 0; submit_read(); });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
read_data = ((ring_data_t*)sqe->user_data);
|
||||||
|
read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE };
|
||||||
|
read_msg.msg_iov = &read_iov;
|
||||||
|
read_msg.msg_iovlen = 1;
|
||||||
|
epoll_events = epoll_events & ~EPOLLIN;
|
||||||
|
read_data->callback = [this](ring_data_t *data)
|
||||||
|
{
|
||||||
|
read_data = NULL;
|
||||||
|
if (data->res == -EAGAIN)
|
||||||
|
{
|
||||||
|
data->res = 0;
|
||||||
|
}
|
||||||
|
if (data->res < 0)
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
response += std::string(rbuf.data(), data->res);
|
||||||
|
if (data->res == READ_BUFFER_SIZE)
|
||||||
|
{
|
||||||
|
submit_read();
|
||||||
|
}
|
||||||
|
handle_read();
|
||||||
|
if (data->res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR)))
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
my_uring_prep_recvmsg(sqe, peer_fd, &read_msg, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::submit_send()
|
||||||
|
{
|
||||||
|
if (sent < request.size() && !send_data && !waiting_send_sqe)
|
||||||
|
{
|
||||||
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
waiting_send_sqe = ringloop->wait_sqe([this]() { waiting_send_sqe = 0; submit_send(); });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
send_data = ((ring_data_t*)sqe->user_data);
|
||||||
|
send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
|
||||||
|
send_msg.msg_iov = &send_iov;
|
||||||
|
send_msg.msg_iovlen = 1;
|
||||||
|
send_data->callback = [this](ring_data_t *data)
|
||||||
|
{
|
||||||
|
send_data = NULL;
|
||||||
|
if (data->res == -EAGAIN)
|
||||||
|
{
|
||||||
|
data->res = 0;
|
||||||
|
}
|
||||||
|
else if (data->res < 0)
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sent += data->res;
|
||||||
|
if (state == HTTP_CO_SENDING_REQUEST)
|
||||||
|
{
|
||||||
|
if (sent >= request.size())
|
||||||
|
state = HTTP_CO_REQUEST_SENT;
|
||||||
|
else
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
else if (state == HTTP_CO_WEBSOCKET)
|
||||||
|
{
|
||||||
|
request = request.substr(sent);
|
||||||
|
sent = 0;
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
my_uring_prep_sendmsg(sqe, peer_fd, &send_msg, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::handle_read()
|
||||||
|
{
|
||||||
|
if (state == HTTP_CO_REQUEST_SENT)
|
||||||
|
{
|
||||||
|
int pos = response.find("\r\n\r\n");
|
||||||
|
if (pos >= 0)
|
||||||
|
{
|
||||||
|
if (timeout_id >= 0)
|
||||||
|
{
|
||||||
|
tfd->clear_timer(timeout_id);
|
||||||
|
timeout_id = -1;
|
||||||
|
}
|
||||||
|
state = HTTP_CO_HEADERS_RECEIVED;
|
||||||
|
parse_http_headers(response, &parsed);
|
||||||
|
if (parsed.status_code == 101 &&
|
||||||
|
parsed.headers.find("sec-websocket-accept") != parsed.headers.end() &&
|
||||||
|
parsed.headers["upgrade"] == "websocket" &&
|
||||||
|
parsed.headers["connection"] == "upgrade")
|
||||||
|
{
|
||||||
|
// Don't care about validating the key
|
||||||
|
state = HTTP_CO_WEBSOCKET;
|
||||||
|
request = "";
|
||||||
|
sent = 0;
|
||||||
|
}
|
||||||
|
else if (want_streaming && parsed.headers["transfer-encoding"] == "chunked")
|
||||||
|
{
|
||||||
|
state = HTTP_CO_STREAMING_CHUNKED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (state == HTTP_CO_STREAMING_CHUNKED && response.size() > 0)
|
||||||
|
{
|
||||||
|
int prev = 0, pos = 0;
|
||||||
|
while ((pos = response.find("\r\n", prev)) >= prev)
|
||||||
|
{
|
||||||
|
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
|
||||||
|
if (response.size() < pos+2+len+2)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
parsed.body += response.substr(pos+2, len);
|
||||||
|
prev = pos+2+len+2;
|
||||||
|
}
|
||||||
|
if (prev > 0)
|
||||||
|
{
|
||||||
|
response = response.substr(prev);
|
||||||
|
}
|
||||||
|
if (parsed.body.size() > 0)
|
||||||
|
{
|
||||||
|
callback(&parsed);
|
||||||
|
parsed.body = "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
|
||||||
|
{
|
||||||
|
while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body))
|
||||||
|
{
|
||||||
|
callback(&parsed);
|
||||||
|
parsed.body = "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void http_co_t::post_message(int type, const std::string & msg)
|
||||||
|
{
|
||||||
|
request += ws_format_frame(type, msg.size());
|
||||||
|
request += msg;
|
||||||
|
submit_send();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t stoull_full(const std::string & str, int base)
|
||||||
|
{
|
||||||
|
if (isspace(str[0]))
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
char *end = NULL;
|
||||||
|
uint64_t r = strtoull(str.c_str(), &end, base);
|
||||||
|
if (end != str.c_str()+str.length())
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
void parse_http_headers(std::string & res, http_response_t *parsed)
|
||||||
{
|
{
|
||||||
int pos = res.find("\r\n");
|
int pos = res.find("\r\n");
|
||||||
pos = pos < 0 ? res.length() : pos+2;
|
pos = pos < 0 ? res.length() : pos+2;
|
||||||
|
@ -169,268 +499,137 @@ void parse_headers(std::string & res, http_response_t *parsed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
http_co_t::~http_co_t()
|
static std::string ws_format_frame(int type, uint64_t size)
|
||||||
{
|
{
|
||||||
if (timeout_id >= 0)
|
// Always zero mask
|
||||||
|
std::string res;
|
||||||
|
int p = 0;
|
||||||
|
res.resize(2 + (size >= 126 ? 2 : 0) + (size >= 65536 ? 6 : 0) + /*mask*/4);
|
||||||
|
res[p++] = 0x80 | type;
|
||||||
|
if (size < 126)
|
||||||
|
res[p++] = size | /*mask*/0x80;
|
||||||
|
else if (size < 65536)
|
||||||
{
|
{
|
||||||
osd->tfd->clear_timer(timeout_id);
|
res[p++] = 126 | /*mask*/0x80;
|
||||||
timeout_id = -1;
|
res[p++] = (size >> 8) & 0xFF;
|
||||||
}
|
res[p++] = (size >> 0) & 0xFF;
|
||||||
if (parsed.headers["transfer-encoding"] == "chunked")
|
|
||||||
{
|
|
||||||
int prev = 0, pos = 0;
|
|
||||||
while ((pos = response.find("\r\n", prev)) >= prev)
|
|
||||||
{
|
|
||||||
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
|
|
||||||
parsed.body += response.substr(pos+2, len);
|
|
||||||
prev = pos+2+len+2;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::swap(parsed.body, response);
|
res[p++] = 127 | /*mask*/0x80;
|
||||||
}
|
res[p++] = (size >> 56) & 0xFF;
|
||||||
parsed.eof = true;
|
res[p++] = (size >> 48) & 0xFF;
|
||||||
callback(&parsed);
|
res[p++] = (size >> 40) & 0xFF;
|
||||||
if (peer_fd >= 0)
|
res[p++] = (size >> 32) & 0xFF;
|
||||||
{
|
res[p++] = (size >> 24) & 0xFF;
|
||||||
osd->epoll_handlers.erase(peer_fd);
|
res[p++] = (size >> 16) & 0xFF;
|
||||||
epoll_ctl(osd->epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL);
|
res[p++] = (size >> 8) & 0xFF;
|
||||||
close(peer_fd);
|
res[p++] = (size >> 0) & 0xFF;
|
||||||
peer_fd = -1;
|
|
||||||
}
|
}
|
||||||
|
res[p++] = 0;
|
||||||
|
res[p++] = 0;
|
||||||
|
res[p++] = 0;
|
||||||
|
res[p++] = 0;
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void http_co_t::resume()
|
static bool ws_parse_frame(std::string & buf, int & type, std::string & res)
|
||||||
{
|
{
|
||||||
if (st == 0)
|
uint64_t hdr = 2;
|
||||||
|
if (buf.size() < hdr)
|
||||||
{
|
{
|
||||||
int port = extract_port(host);
|
return false;
|
||||||
struct sockaddr_in addr;
|
|
||||||
int r;
|
|
||||||
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
|
|
||||||
{
|
|
||||||
parsed.error_code = ENXIO;
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
addr.sin_family = AF_INET;
|
type = buf[0] & ~0x80;
|
||||||
addr.sin_port = htons(port ? port : 80);
|
bool mask = buf[1] & 0x80;
|
||||||
peer_fd = socket(AF_INET, SOCK_STREAM, 0);
|
hdr += mask ? 4 : 0;
|
||||||
if (peer_fd < 0)
|
uint64_t len = (buf[1] & ~0x80);
|
||||||
|
if (len == 126)
|
||||||
{
|
{
|
||||||
parsed.error_code = errno;
|
hdr += 2;
|
||||||
delete this;
|
if (buf.size() < hdr)
|
||||||
return;
|
{
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
|
len = ((uint64_t)buf[2] << 8) | ((uint64_t)buf[3] << 0);
|
||||||
if (osd->http_request_timeout > 0)
|
|
||||||
{
|
|
||||||
timeout_id = osd->tfd->set_timer(1000*osd->http_request_timeout, false, [this](int timer_id)
|
|
||||||
{
|
|
||||||
if (response.length() == 0)
|
|
||||||
{
|
|
||||||
parsed.error_code = EIO;
|
|
||||||
}
|
}
|
||||||
delete this;
|
else if (len == 127)
|
||||||
});
|
|
||||||
}
|
|
||||||
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
|
|
||||||
if (r < 0 && errno != EINPROGRESS)
|
|
||||||
{
|
{
|
||||||
parsed.error_code = errno;
|
hdr += 8;
|
||||||
delete this;
|
if (buf.size() < hdr)
|
||||||
return;
|
|
||||||
}
|
|
||||||
osd->epoll_handlers[peer_fd] = epoll_handler;
|
|
||||||
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
|
||||||
epoll_event ev;
|
|
||||||
ev.data.fd = peer_fd;
|
|
||||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
|
||||||
if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
|
|
||||||
{
|
{
|
||||||
parsed.error_code = errno;
|
return false;
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
epoll_events = 0;
|
len = ((uint64_t)buf[2] << 56) |
|
||||||
st = 1;
|
((uint64_t)buf[3] << 48) |
|
||||||
return;
|
((uint64_t)buf[4] << 40) |
|
||||||
|
((uint64_t)buf[5] << 32) |
|
||||||
|
((uint64_t)buf[6] << 24) |
|
||||||
|
((uint64_t)buf[7] << 16) |
|
||||||
|
((uint64_t)buf[8] << 8) |
|
||||||
|
((uint64_t)buf[9] << 0);
|
||||||
}
|
}
|
||||||
if (st == 1)
|
if (buf.size() < hdr+len)
|
||||||
{
|
{
|
||||||
if (epoll_events & (EPOLLOUT | EPOLLERR))
|
return false;
|
||||||
{
|
|
||||||
int result = 0;
|
|
||||||
socklen_t result_len = sizeof(result);
|
|
||||||
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
|
|
||||||
{
|
|
||||||
result = errno;
|
|
||||||
}
|
}
|
||||||
if (result != 0)
|
if (mask)
|
||||||
{
|
{
|
||||||
parsed.error_code = result;
|
for (int i = 0; i < len; i++)
|
||||||
delete this;
|
buf[hdr+i] ^= buf[hdr-4+(i & 3)];
|
||||||
return;
|
|
||||||
}
|
|
||||||
int one = 1;
|
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
|
||||||
// Disable EPOLLOUT on this fd
|
|
||||||
epoll_event ev;
|
|
||||||
ev.data.fd = peer_fd;
|
|
||||||
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
|
|
||||||
if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
|
|
||||||
{
|
|
||||||
parsed.error_code = errno;
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
st = 2;
|
|
||||||
epoll_events = 0;
|
|
||||||
resume();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (epoll_events & EPOLLRDHUP)
|
|
||||||
{
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Write data
|
|
||||||
if (st == 2)
|
|
||||||
{
|
|
||||||
io_uring_sqe *sqe = osd->ringloop->get_sqe();
|
|
||||||
if (!sqe)
|
|
||||||
return;
|
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
|
||||||
iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
|
|
||||||
msg.msg_iov = &iov;
|
|
||||||
msg.msg_iovlen = 1;
|
|
||||||
data->callback = [this](ring_data_t *data)
|
|
||||||
{
|
|
||||||
st = 4;
|
|
||||||
cqe_res = data->res;
|
|
||||||
resume();
|
|
||||||
};
|
|
||||||
my_uring_prep_sendmsg(sqe, peer_fd, &msg, 0);
|
|
||||||
st = 3;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (st == 3)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (st == 4)
|
|
||||||
{
|
|
||||||
if (cqe_res < 0 && cqe_res != -EAGAIN)
|
|
||||||
{
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sent += cqe_res;
|
|
||||||
if (sent < request.size())
|
|
||||||
st = 2;
|
|
||||||
else
|
|
||||||
st = 5;
|
|
||||||
resume();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Read response
|
|
||||||
if (st == 5)
|
|
||||||
{
|
|
||||||
if (epoll_events & EPOLLIN)
|
|
||||||
{
|
|
||||||
if (rbuf.size() != 9000)
|
|
||||||
rbuf.resize(9000);
|
|
||||||
io_uring_sqe *sqe = osd->ringloop->get_sqe();
|
|
||||||
if (!sqe)
|
|
||||||
return;
|
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
|
||||||
iov = { .iov_base = rbuf.data(), .iov_len = 9000 };
|
|
||||||
msg.msg_iov = &iov;
|
|
||||||
msg.msg_iovlen = 1;
|
|
||||||
data->callback = [this](ring_data_t *data)
|
|
||||||
{
|
|
||||||
st = 7;
|
|
||||||
cqe_res = data->res;
|
|
||||||
resume();
|
|
||||||
};
|
|
||||||
my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0);
|
|
||||||
st = 6;
|
|
||||||
epoll_events = epoll_events & ~EPOLLIN;
|
|
||||||
}
|
|
||||||
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
|
|
||||||
{
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (st == 6)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (st == 7)
|
|
||||||
{
|
|
||||||
if (cqe_res < 0 && cqe_res != -EAGAIN)
|
|
||||||
{
|
|
||||||
delete this;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
response += std::string(rbuf.data(), cqe_res);
|
|
||||||
if (!headers_received)
|
|
||||||
{
|
|
||||||
int pos = response.find("\r\n\r\n");
|
|
||||||
if (pos >= 0)
|
|
||||||
{
|
|
||||||
headers_received = true;
|
|
||||||
parse_headers(response, &parsed);
|
|
||||||
streaming = streaming && parsed.headers["transfer-encoding"] == "chunked";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (streaming && headers_received && response.size() > 0)
|
|
||||||
{
|
|
||||||
int prev = 0, pos = 0;
|
|
||||||
while ((pos = response.find("\r\n", prev)) >= prev)
|
|
||||||
{
|
|
||||||
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
|
|
||||||
if (response.size() < pos+2+len+2)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
parsed.body += response.substr(pos+2, len);
|
|
||||||
prev = pos+2+len+2;
|
|
||||||
}
|
|
||||||
if (prev > 0)
|
|
||||||
{
|
|
||||||
response = response.substr(prev);
|
|
||||||
}
|
|
||||||
if (parsed.body.size() > 0)
|
|
||||||
{
|
|
||||||
callback(&parsed);
|
|
||||||
parsed.body = "";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
st = 5;
|
|
||||||
resume();
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
res += buf.substr(hdr, len);
|
||||||
|
buf = buf.substr(hdr+len);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t stoull_full(const std::string & str, int base)
|
std::vector<std::string> getifaddr_list(bool include_v6)
|
||||||
{
|
{
|
||||||
if (isspace(str[0]))
|
std::vector<std::string> addresses;
|
||||||
|
ifaddrs *list, *ifa;
|
||||||
|
if (getifaddrs(&list) == -1)
|
||||||
{
|
{
|
||||||
return 0;
|
throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
char *end = NULL;
|
for (ifa = list; ifa != NULL; ifa = ifa->ifa_next)
|
||||||
uint64_t r = strtoull(str.c_str(), &end, base);
|
|
||||||
if (end != str.c_str()+str.length())
|
|
||||||
{
|
{
|
||||||
return 0;
|
if (!ifa->ifa_addr)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
return r;
|
int family = ifa->ifa_addr->sa_family;
|
||||||
|
if ((family == AF_INET || family == AF_INET6 && include_v6) &&
|
||||||
|
(ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
|
||||||
|
{
|
||||||
|
void *addr_ptr;
|
||||||
|
if (family == AF_INET)
|
||||||
|
addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr;
|
||||||
|
else
|
||||||
|
addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
|
||||||
|
char addr[INET6_ADDRSTRLEN];
|
||||||
|
if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN))
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
|
||||||
|
}
|
||||||
|
addresses.push_back(std::string(addr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
freeifaddrs(list);
|
||||||
|
return addresses;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int extract_port(std::string & host)
|
||||||
|
{
|
||||||
|
int port = 0;
|
||||||
|
int pos = 0;
|
||||||
|
if ((pos = host.find(':')) >= 0)
|
||||||
|
{
|
||||||
|
port = strtoull(host.c_str() + pos + 1, NULL, 10);
|
||||||
|
if (port >= 0x10000)
|
||||||
|
{
|
||||||
|
port = 0;
|
||||||
|
}
|
||||||
|
host = host.substr(0, pos);
|
||||||
|
}
|
||||||
|
return port;
|
||||||
}
|
}
|
||||||
|
|
19
osd_http.h
19
osd_http.h
|
@ -2,7 +2,13 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include "json11/json11.hpp"
|
|
||||||
|
#define WS_CONTINUATION 0
|
||||||
|
#define WS_TEXT 1
|
||||||
|
#define WS_BINARY 2
|
||||||
|
#define WS_CLOSE 8
|
||||||
|
#define WS_PING 9
|
||||||
|
#define WS_PONG 10
|
||||||
|
|
||||||
struct http_response_t
|
struct http_response_t
|
||||||
{
|
{
|
||||||
|
@ -11,9 +17,18 @@ struct http_response_t
|
||||||
int status_code = 0;
|
int status_code = 0;
|
||||||
std::string status_line;
|
std::string status_line;
|
||||||
std::map<std::string, std::string> headers;
|
std::map<std::string, std::string> headers;
|
||||||
|
int ws_msg_type = -1;
|
||||||
std::string body;
|
std::string body;
|
||||||
};
|
};
|
||||||
|
|
||||||
void parse_headers(std::string & res, http_response_t *parsed);
|
struct http_co_t;
|
||||||
|
|
||||||
|
struct websocket_t
|
||||||
|
{
|
||||||
|
http_co_t *co;
|
||||||
|
void post_message(int type, const std::string & msg);
|
||||||
|
};
|
||||||
|
|
||||||
|
void parse_http_headers(std::string & res, http_response_t *parsed);
|
||||||
std::vector<std::string> getifaddr_list(bool include_v6 = false);
|
std::vector<std::string> getifaddr_list(bool include_v6 = false);
|
||||||
uint64_t stoull_full(const std::string & str, int base = 10);
|
uint64_t stoull_full(const std::string & str, int base = 10);
|
||||||
|
|
|
@ -18,6 +18,7 @@ ring_loop_t::ring_loop_t(int qd)
|
||||||
{
|
{
|
||||||
free_ring_data[i] = i;
|
free_ring_data[i] = i;
|
||||||
}
|
}
|
||||||
|
wait_sqe_id = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ring_loop_t::~ring_loop_t()
|
ring_loop_t::~ring_loop_t()
|
||||||
|
@ -64,6 +65,11 @@ void ring_loop_t::loop()
|
||||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||||
io_uring_cqe_seen(&ring, cqe);
|
io_uring_cqe_seen(&ring, cqe);
|
||||||
}
|
}
|
||||||
|
while (get_sqe_queue.size() > 0)
|
||||||
|
{
|
||||||
|
(get_sqe_queue[0].second)();
|
||||||
|
get_sqe_queue.erase(get_sqe_queue.begin());
|
||||||
|
}
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
loop_again = false;
|
loop_again = false;
|
||||||
|
|
17
ringloop.h
17
ringloop.h
|
@ -118,9 +118,11 @@ struct ring_consumer_t
|
||||||
|
|
||||||
class ring_loop_t
|
class ring_loop_t
|
||||||
{
|
{
|
||||||
|
std::vector<std::pair<int,std::function<void()>>> get_sqe_queue;
|
||||||
std::vector<ring_consumer_t*> consumers;
|
std::vector<ring_consumer_t*> consumers;
|
||||||
struct ring_data_t *ring_datas;
|
struct ring_data_t *ring_datas;
|
||||||
int *free_ring_data;
|
int *free_ring_data;
|
||||||
|
int wait_sqe_id;
|
||||||
unsigned free_ring_data_ptr;
|
unsigned free_ring_data_ptr;
|
||||||
bool loop_again;
|
bool loop_again;
|
||||||
struct io_uring ring;
|
struct io_uring ring;
|
||||||
|
@ -139,6 +141,21 @@ public:
|
||||||
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
||||||
return sqe;
|
return sqe;
|
||||||
}
|
}
|
||||||
|
inline int wait_sqe(std::function<void()> cb)
|
||||||
|
{
|
||||||
|
get_sqe_queue.push_back({ wait_sqe_id, cb });
|
||||||
|
return wait_sqe_id++;
|
||||||
|
}
|
||||||
|
inline void cancel_wait_sqe(int wait_id)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < get_sqe_queue.size(); i++)
|
||||||
|
{
|
||||||
|
if (get_sqe_queue[i].first == wait_id)
|
||||||
|
{
|
||||||
|
get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
inline int submit()
|
inline int submit()
|
||||||
{
|
{
|
||||||
return io_uring_submit(&ring);
|
return io_uring_submit(&ring);
|
||||||
|
|
Loading…
Reference in New Issue