Compare commits
2 Commits
master
...
msgr-iothr
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 249a233b37 | |
Vitaliy Filippov | d07e072212 |
|
@ -271,7 +271,7 @@ void http_co_t::close_connection()
|
|||
}
|
||||
if (peer_fd >= 0)
|
||||
{
|
||||
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(peer_fd, 0, NULL);
|
||||
close(peer_fd);
|
||||
peer_fd = -1;
|
||||
}
|
||||
|
@ -314,7 +314,7 @@ void http_co_t::start_connection()
|
|||
stackout();
|
||||
return;
|
||||
}
|
||||
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN|EPOLLOUT, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
this->epoll_events |= epoll_events;
|
||||
handle_events();
|
||||
|
@ -372,7 +372,7 @@ void http_co_t::handle_connect_result()
|
|||
}
|
||||
int one = 1;
|
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
this->epoll_events |= epoll_events;
|
||||
handle_events();
|
||||
|
|
|
@ -15,6 +15,207 @@
|
|||
#include "msgr_rdma.h"
|
||||
#endif
|
||||
|
||||
#include <sys/poll.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
static uint64_t one = 1;
|
||||
|
||||
// Sets up a private ring loop and epoll manager for this I/O thread, creates the
// eventfd used to wake the thread up, registers the ring consumer that drives
// read_requests()/send_replies(), and finally starts the worker thread.
// Throws std::runtime_error if the eventfd cannot be created.
msgr_iothread_t::msgr_iothread_t()
{
    ring = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
    epmgr = new epoll_manager_t(ring);
    submit_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
    if (submit_eventfd < 0)
    {
        // Remember errno before cleanup: the deletes below may overwrite it
        const int err = errno;
        // The destructor does not run when a constructor throws,
        // so release what has been allocated so far (same order as the destructor)
        delete epmgr;
        epmgr = NULL;
        delete ring;
        ring = NULL;
        throw std::runtime_error(std::string("failed to create eventfd: ")+strerror(err));
    }
    epmgr->tfd->set_fd_handler(submit_eventfd, EPOLLIN, [this](int fd, int epoll_events)
    {
        // Reset eventfd counter
        uint64_t ctr = 0;
        int r = read(submit_eventfd, &ctr, sizeof(ctr));
        if (r < 0 && errno != EAGAIN && errno != EINTR)
        {
            fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
        }
        ring->wakeup();
    });
    // One ring loop iteration: start pending receives, start pending sends, submit them
    consumer.loop = [this]()
    {
        read_requests();
        send_replies();
        ring->submit();
    };
    ring->register_consumer(&consumer);
    // Start the worker last, when everything it uses is already initialised
    thread = new std::thread(&msgr_iothread_t::run, this);
}
|
||||
|
||||
// Stops the worker (stop() joins the thread) and then frees the owned objects.
msgr_iothread_t::~msgr_iothread_t()
{
    this->stop();
    // Release in the same order as before: thread handle, epoll manager, ring loop
    delete this->thread;
    delete this->epmgr;
    delete this->ring;
}
|
||||
|
||||
void msgr_iothread_t::stop()
|
||||
{
|
||||
mu.lock();
|
||||
if (stopped)
|
||||
{
|
||||
mu.unlock();
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
write(submit_eventfd, &one, sizeof(one));
|
||||
mu.unlock();
|
||||
thread->join();
|
||||
ring->unregister_consumer(&consumer);
|
||||
close(submit_eventfd);
|
||||
}
|
||||
|
||||
// Hands a client over to this I/O thread: remembers it by peer_fd and subscribes
// to EPOLLIN on its socket through this thread's epoll manager.
// Does nothing if the thread is already stopped.
void msgr_iothread_t::add_client(osd_client_t *cl)
{
    std::lock_guard<std::mutex> lock(mu);
    if (stopped)
    {
        return;
    }
    // Look up without operator[] so that the assertion has no side effects
    auto existing = clients.find(cl->peer_fd);
    assert(existing == clients.end() || !existing->second);
    (void)existing;
    clients[cl->peer_fd] = cl;
    epmgr->tfd->set_fd_handler(cl->peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
    {
        // FIXME: Slight copypaste (see handle_peer_epoll)
        if (!(epoll_events & EPOLLIN))
        {
            return;
        }
        auto cl_it = clients.find(peer_fd);
        if (cl_it == clients.end())
        {
            return;
        }
        auto client = cl_it->second;
        std::lock_guard<std::mutex> client_lock(client->mu);
        client->read_ready++;
        if (client->read_ready == 1)
        {
            // First pending readiness event: queue the client and wake the ring
            read_ready_clients.push_back(peer_fd);
            ring->wakeup();
        }
    });
}
|
||||
|
||||
// Detaches a client from this I/O thread: forgets it and removes the epoll
// handler of its socket. Ignored when the thread is stopped or when peer_fd
// is registered for a different client object.
void msgr_iothread_t::remove_client(osd_client_t *cl)
{
    std::lock_guard<std::mutex> lock(mu);
    if (stopped)
    {
        return;
    }
    auto cl_it = clients.find(cl->peer_fd);
    if (cl_it != clients.end() && cl_it->second == cl)
    {
        // Erase through the iterator we already have instead of a second lookup
        clients.erase(cl_it);
        epmgr->tfd->set_fd_handler(cl->peer_fd, 0, NULL);
    }
}
|
||||
|
||||
void msgr_iothread_t::wakeup_out(int peer_fd, ring_loop_t *outer_ring)
|
||||
{
|
||||
write_ready_mu.lock();
|
||||
if (!write_ready_clients.size())
|
||||
{
|
||||
io_uring_sqe* sqe = outer_ring->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
write(submit_eventfd, &one, sizeof(one));
|
||||
}
|
||||
else
|
||||
{
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [](ring_data_t*){};
|
||||
my_uring_prep_write(sqe, submit_eventfd, &one, sizeof(one), 0);
|
||||
}
|
||||
}
|
||||
write_ready_clients.push_back(peer_fd);
|
||||
write_ready_mu.unlock();
|
||||
}
|
||||
|
||||
void msgr_iothread_t::read_requests()
|
||||
{
|
||||
// FIXME: Slight copypaste (see messenger_t::read_requests)
|
||||
auto to_recv = std::move(read_ready_clients);
|
||||
for (int i = 0; i < to_recv.size(); i++)
|
||||
{
|
||||
int peer_fd = to_recv[i];
|
||||
auto cl_it = clients.find(peer_fd);
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
osd_client_t *cl = cl_it->second;
|
||||
cl->mu.lock();
|
||||
auto ok = cl->try_recv(ring, false);
|
||||
cl->mu.unlock();
|
||||
if (!ok)
|
||||
{
|
||||
read_ready_clients.insert(read_ready_clients.end(), to_recv.begin()+i, to_recv.end());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void msgr_iothread_t::send_replies()
|
||||
{
|
||||
if (stopped)
|
||||
{
|
||||
return;
|
||||
}
|
||||
write_ready_mu.lock();
|
||||
auto to_send = std::move(write_ready_clients);
|
||||
write_ready_mu.unlock();
|
||||
for (int i = 0; i < to_send.size(); i++)
|
||||
{
|
||||
auto cl_it = clients.find(to_send[i]);
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto cl = cl_it->second;
|
||||
cl->mu.lock();
|
||||
auto ok = cl->try_send(ring, false/*, lock*/);
|
||||
cl->mu.unlock();
|
||||
if (!ok)
|
||||
{
|
||||
// ring is full (rare but what if...)
|
||||
write_ready_mu.lock();
|
||||
write_ready_clients.insert(write_ready_clients.end(), to_send.begin()+i, to_send.end());
|
||||
write_ready_mu.unlock();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void msgr_iothread_t::run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
mu.lock();
|
||||
if (stopped)
|
||||
{
|
||||
mu.unlock();
|
||||
return;
|
||||
}
|
||||
ring->loop();
|
||||
mu.unlock();
|
||||
ring->wait();
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::init()
|
||||
{
|
||||
#ifdef WITH_RDMA
|
||||
|
@ -35,7 +236,7 @@ void osd_messenger_t::init()
|
|||
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
|
||||
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
||||
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
|
||||
tfd->set_fd_handler(rdma_context->channel->fd, EPOLLIN, [this](int notify_fd, int epoll_events)
|
||||
{
|
||||
handle_rdma_events();
|
||||
});
|
||||
|
@ -43,6 +244,44 @@ void osd_messenger_t::init()
|
|||
}
|
||||
}
|
||||
#endif
|
||||
if (ringloop && iothread_count > 0)
|
||||
{
|
||||
for (int i = 0; i < iothread_count; i++)
|
||||
{
|
||||
auto iot = new msgr_iothread_t();
|
||||
iothreads.push_back(iot);
|
||||
}
|
||||
immediates_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
|
||||
if (immediates_eventfd < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("failed to create set_immediate eventfd: ")+strerror(errno));
|
||||
}
|
||||
tfd->set_fd_handler(immediates_eventfd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Reset eventfd counter
|
||||
uint64_t ctr = 0;
|
||||
int r = read(immediates_eventfd, &ctr, 8);
|
||||
if (r < 0 && errno != EAGAIN && errno != EINTR)
|
||||
{
|
||||
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
|
||||
}
|
||||
while (true)
|
||||
{
|
||||
immediates_mu.lock();
|
||||
auto to_run = std::move(immediates);
|
||||
immediates_mu.unlock();
|
||||
if (!to_run.size())
|
||||
{
|
||||
break;
|
||||
}
|
||||
for (auto & cb: to_run)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
}
|
||||
ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||
{
|
||||
auto cl_it = clients.begin();
|
||||
|
@ -120,6 +359,12 @@ void osd_messenger_t::init()
|
|||
|
||||
osd_messenger_t::~osd_messenger_t()
|
||||
{
|
||||
if (immediates_eventfd >= 0)
|
||||
{
|
||||
tfd->set_fd_handler(immediates_eventfd, 0, NULL);
|
||||
close(immediates_eventfd);
|
||||
immediates_eventfd = -1;
|
||||
}
|
||||
if (keepalive_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(keepalive_timer_id);
|
||||
|
@ -129,6 +374,14 @@ osd_messenger_t::~osd_messenger_t()
|
|||
{
|
||||
stop_client(clients.begin()->first, true, true);
|
||||
}
|
||||
if (iothreads.size())
|
||||
{
|
||||
for (auto iot: iothreads)
|
||||
{
|
||||
delete iot;
|
||||
}
|
||||
iothreads.clear();
|
||||
}
|
||||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
|
@ -165,6 +418,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
|||
this->rdma_max_msg = 129*1024;
|
||||
this->rdma_odp = config["rdma_odp"].bool_value();
|
||||
#endif
|
||||
if (!osd_num)
|
||||
this->iothread_count = config["client_iothread_count"].is_null() ? 4 : (uint32_t)config["client_iothread_count"].uint64_value();
|
||||
else
|
||||
this->iothread_count = (uint32_t)config["osd_iothread_count"].uint64_value();
|
||||
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
|
||||
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
|
||||
this->receive_buffer_size = 65536;
|
||||
|
@ -255,6 +512,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
|
|||
{
|
||||
fprintf(stderr, "Connecting to OSD %ju at %s:%d (client %d)\n", peer_osd, peer_host, peer_port, peer_fd);
|
||||
}
|
||||
clients[peer_fd]->msgr = this;
|
||||
clients[peer_fd]->peer_addr = addr;
|
||||
clients[peer_fd]->peer_port = peer_port;
|
||||
clients[peer_fd]->peer_fd = peer_fd;
|
||||
|
@ -262,7 +520,8 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
|
|||
clients[peer_fd]->connect_timeout_id = -1;
|
||||
clients[peer_fd]->osd_num = peer_osd;
|
||||
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
|
||||
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN|EPOLLOUT, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Either OUT (connected) or HUP
|
||||
handle_connect_epoll(peer_fd);
|
||||
|
@ -303,7 +562,11 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
|
|||
int one = 1;
|
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
cl->peer_state = PEER_CONNECTED;
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
if (iothreads.size())
|
||||
{
|
||||
iothreads[peer_fd % iothreads.size()]->add_client(cl);
|
||||
}
|
||||
tfd->set_fd_handler(peer_fd, iothreads.size() ? 0 : EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
});
|
||||
|
@ -487,7 +750,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
fprintf(stderr, "Connected to OSD %ju using RDMA\n", cl->osd_num);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(cl->peer_fd, 0, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
|
@ -522,13 +785,19 @@ void osd_messenger_t::accept_connections(int listen_fd)
|
|||
int one = 1;
|
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
clients[peer_fd] = new osd_client_t();
|
||||
clients[peer_fd]->msgr = this;
|
||||
clients[peer_fd]->peer_addr = addr;
|
||||
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
|
||||
clients[peer_fd]->peer_fd = peer_fd;
|
||||
clients[peer_fd]->peer_state = PEER_CONNECTED;
|
||||
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
|
||||
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
|
||||
// Add FD to epoll
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
if (iothreads.size())
|
||||
{
|
||||
iothreads[peer_fd % iothreads.size()]->add_client(clients[peer_fd]);
|
||||
}
|
||||
tfd->set_fd_handler(peer_fd, iothreads.size() ? 0 : EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
});
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include <map>
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
|
||||
#include "malloc_or_die.h"
|
||||
#include "json11/json11.hpp"
|
||||
|
@ -45,8 +46,13 @@ struct msgr_rdma_connection_t;
|
|||
struct msgr_rdma_context_t;
|
||||
#endif
|
||||
|
||||
struct osd_messenger_t;
|
||||
|
||||
struct osd_client_t
|
||||
{
|
||||
std::mutex mu;
|
||||
osd_messenger_t *msgr = NULL;
|
||||
|
||||
int refs = 0;
|
||||
|
||||
sockaddr_storage peer_addr;
|
||||
|
@ -59,6 +65,7 @@ struct osd_client_t
|
|||
osd_num_t osd_num = 0;
|
||||
|
||||
void *in_buf = NULL;
|
||||
uint32_t receive_buffer_size = 0;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
msgr_rdma_connection_t *rdma_conn = NULL;
|
||||
|
@ -89,6 +96,17 @@ struct osd_client_t
|
|||
std::vector<msgr_sendp_t> outbox, next_outbox;
|
||||
|
||||
~osd_client_t();
|
||||
|
||||
bool try_send(ring_loop_t *ringloop, bool use_sync_send_recv);
|
||||
int handle_send(int result);
|
||||
|
||||
bool try_recv(ring_loop_t *ringloop, bool use_sync_send_recv);
|
||||
int handle_read(int result);
|
||||
bool handle_read_buffer(void *curbuf, int remain);
|
||||
bool handle_finished_read();
|
||||
void handle_op_hdr();
|
||||
bool handle_reply_hdr();
|
||||
void handle_reply_ready(osd_op_t *op);
|
||||
};
|
||||
|
||||
struct osd_wanted_peer_t
|
||||
|
@ -111,6 +129,53 @@ struct osd_op_stats_t
|
|||
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
|
||||
};
|
||||
|
||||
#ifdef __MOCK__
|
||||
class msgr_iothread_t;
|
||||
#else
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "epoll_manager.h"
|
||||
|
||||
class msgr_iothread_t
|
||||
{
|
||||
protected:
|
||||
ring_loop_t *ring = NULL;
|
||||
epoll_manager_t *epmgr = NULL;
|
||||
ring_consumer_t consumer;
|
||||
int submit_eventfd = -1;
|
||||
bool stopped = false;
|
||||
std::mutex mu;
|
||||
std::map<int, osd_client_t*> clients;
|
||||
std::vector<int> read_ready_clients;
|
||||
std::mutex write_ready_mu;
|
||||
std::vector<int> write_ready_clients;
|
||||
std::thread *thread = NULL;
|
||||
|
||||
void run();
|
||||
|
||||
void read_requests();
|
||||
|
||||
void send_replies();
|
||||
|
||||
public:
|
||||
|
||||
void handle_client_read(osd_client_t *cl, int res);
|
||||
void handle_client_send(osd_client_t *cl, int res);
|
||||
|
||||
msgr_iothread_t();
|
||||
~msgr_iothread_t();
|
||||
|
||||
void add_client(osd_client_t *cl);
|
||||
|
||||
void remove_client(osd_client_t *cl);
|
||||
|
||||
void wakeup_out(int peer_fd, ring_loop_t *outer_ring);
|
||||
|
||||
void stop();
|
||||
};
|
||||
#endif
|
||||
|
||||
struct osd_messenger_t
|
||||
{
|
||||
protected:
|
||||
|
@ -123,6 +188,7 @@ protected:
|
|||
int osd_ping_timeout = 0;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
int iothread_count = 0;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool use_rdma = true;
|
||||
|
@ -134,10 +200,13 @@ protected:
|
|||
bool rdma_odp = false;
|
||||
#endif
|
||||
|
||||
std::vector<msgr_iothread_t*> iothreads;
|
||||
std::vector<int> read_ready_clients;
|
||||
std::vector<int> write_ready_clients;
|
||||
int immediates_eventfd = -1;
|
||||
std::mutex immediates_mu;
|
||||
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
||||
std::vector<std::function<void()>> set_immediate;
|
||||
std::vector<std::function<void()>> immediates;
|
||||
|
||||
public:
|
||||
timerfd_manager_t *tfd;
|
||||
|
@ -155,10 +224,13 @@ public:
|
|||
void parse_config(const json11::Json & config);
|
||||
void connect_peer(uint64_t osd_num, json11::Json peer_state);
|
||||
void stop_client(int peer_fd, bool force = false, bool force_delete = false);
|
||||
void stop_client_from_iothread(osd_client_t *cl);
|
||||
void outbox_push(osd_op_t *cur_op);
|
||||
std::function<void(osd_op_t*)> exec_op;
|
||||
std::function<void(osd_num_t)> repeer_pgs;
|
||||
std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
|
||||
void handle_client_read(osd_client_t *cl, int res);
|
||||
void handle_client_send(osd_client_t *cl, int res);
|
||||
void read_requests();
|
||||
void send_replies();
|
||||
void accept_connections(int listen_fd);
|
||||
|
@ -178,6 +250,9 @@ public:
|
|||
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
|
||||
void measure_exec(osd_op_t *cur_op);
|
||||
|
||||
void set_immediate(std::function<void()> cb);
|
||||
void set_immediate_or_run(std::function<void()> cb);
|
||||
|
||||
protected:
|
||||
void try_connect_peer(uint64_t osd_num);
|
||||
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
||||
|
@ -188,15 +263,7 @@ protected:
|
|||
void cancel_osd_ops(osd_client_t *cl);
|
||||
void cancel_op(osd_op_t *op);
|
||||
|
||||
bool try_send(osd_client_t *cl);
|
||||
void handle_send(int result, osd_client_t *cl);
|
||||
|
||||
bool handle_read(int result, osd_client_t *cl);
|
||||
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
|
||||
bool handle_finished_read(osd_client_t *cl);
|
||||
void handle_op_hdr(osd_client_t *cl);
|
||||
bool handle_reply_hdr(osd_client_t *cl);
|
||||
void handle_reply_ready(osd_op_t *op);
|
||||
void handle_immediates();
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
void try_send_rdma(osd_client_t *cl);
|
||||
|
@ -205,4 +272,6 @@ protected:
|
|||
bool try_recv_rdma(osd_client_t *cl);
|
||||
void handle_rdma_events();
|
||||
#endif
|
||||
|
||||
friend struct osd_client_t;
|
||||
};
|
||||
|
|
|
@ -603,7 +603,7 @@ void osd_messenger_t::handle_rdma_events()
|
|||
if (!is_send)
|
||||
{
|
||||
rc->cur_recv--;
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
if (!cl->handle_read_buffer(rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
{
|
||||
// handle_read_buffer may stop the client
|
||||
continue;
|
||||
|
@ -666,9 +666,5 @@ void osd_messenger_t::handle_rdma_events()
|
|||
}
|
||||
}
|
||||
} while (event_count > 0);
|
||||
for (auto cb: set_immediate)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
set_immediate.clear();
|
||||
handle_immediates();
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#include <unistd.h>
|
||||
#include "messenger.h"
|
||||
|
||||
void osd_messenger_t::read_requests()
|
||||
|
@ -9,63 +10,119 @@ void osd_messenger_t::read_requests()
|
|||
{
|
||||
int peer_fd = read_ready_clients[i];
|
||||
osd_client_t *cl = clients[peer_fd];
|
||||
if (cl->read_msg.msg_iovlen)
|
||||
if (!cl->try_recv(ringloop, use_sync_send_recv))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (cl->read_remaining < receive_buffer_size)
|
||||
{
|
||||
cl->read_iov.iov_base = cl->in_buf;
|
||||
cl->read_iov.iov_len = receive_buffer_size;
|
||||
cl->read_msg.msg_iov = &cl->read_iov;
|
||||
cl->read_msg.msg_iovlen = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->read_iov.iov_base = 0;
|
||||
cl->read_iov.iov_len = cl->read_remaining;
|
||||
cl->read_msg.msg_iov = cl->recv_list.get_iovec();
|
||||
cl->read_msg.msg_iovlen = cl->recv_list.get_size();
|
||||
}
|
||||
cl->refs++;
|
||||
if (ringloop && !use_sync_send_recv)
|
||||
{
|
||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||
return;
|
||||
}
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
|
||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
int result = recvmsg(peer_fd, &cl->read_msg, 0);
|
||||
if (result < 0)
|
||||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_read(result, cl);
|
||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
read_ready_clients.clear();
|
||||
if (!iothreads.size())
|
||||
{
|
||||
handle_immediates();
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
// Starts one receive on this client's socket unless one is already in progress.
//
// With a ring loop and use_sync_send_recv == false the receive is queued as an
// io_uring recvmsg and completed in handle_client_read() of either the owning
// I/O thread or the messenger. Otherwise recvmsg() is called synchronously and
// its result is passed to msgr->handle_client_read() right away.
//
// Returns false only when no SQE is available; true otherwise
// (including the case when a receive is already in progress).
bool osd_client_t::try_recv(ring_loop_t *ringloop, bool use_sync_send_recv)
{
    if (read_msg.msg_iovlen)
    {
        // Previous receive has not completed yet
        return true;
    }
    if (read_remaining < receive_buffer_size)
    {
        // Little or nothing expected - read into the intermediate buffer
        read_iov.iov_base = in_buf;
        read_iov.iov_len = receive_buffer_size;
        read_msg.msg_iov = &read_iov;
        read_msg.msg_iovlen = 1;
    }
    else
    {
        // Read directly into the prepared receive list
        read_iov.iov_base = 0;
        read_iov.iov_len = read_remaining;
        read_msg.msg_iov = recv_list.get_iovec();
        read_msg.msg_iovlen = recv_list.get_size();
    }
    if (ringloop && !use_sync_send_recv)
    {
        io_uring_sqe* sqe = ringloop->get_sqe();
        if (!sqe)
        {
            // Nothing was submitted, so no reference is taken:
            // handle_read() will never be called to release it
            read_msg.msg_iovlen = 0;
            return false;
        }
        // Reference is released by handle_read() on completion
        refs++;
        ring_data_t* data = ((ring_data_t*)sqe->user_data);
        if (msgr->iothreads.size())
        {
            data->callback = [this](ring_data_t *data) { msgr->iothreads[peer_fd % msgr->iothreads.size()]->handle_client_read(this, data->res); };
        }
        else
        {
            data->callback = [this](ring_data_t *data) { msgr->handle_client_read(this, data->res); };
        }
        my_uring_prep_recvmsg(sqe, peer_fd, &read_msg, 0);
    }
    else
    {
        // Reference is released by handle_read() inside handle_client_read()
        refs++;
        int result = recvmsg(peer_fd, &read_msg, 0);
        if (result < 0)
        {
            result = -errno;
        }
        msgr->handle_client_read(this, result);
    }
    return true;
}
|
||||
|
||||
// Receive completion for a client handled by the messenger itself:
// lets the client process the result and reacts to the returned status code.
void osd_messenger_t::handle_client_read(osd_client_t *cl, int res)
{
    const int status = cl->handle_read(res);
    switch (status)
    {
    case -ENOENT:
        // Free the client object once nothing references it anymore
        if (cl->refs == 0)
        {
            delete cl;
        }
        break;
    case -EIO:
        stop_client(cl->peer_fd);
        break;
    case -EAGAIN:
        // Schedule another receive for this client
        read_ready_clients.push_back(cl->peer_fd);
        break;
    default:
        break;
    }
}
|
||||
|
||||
// Receive completion for a client handled by this I/O thread.
// The result is processed under the client's mutex; follow-up actions
// (deferred deletion, stopping, re-queueing) are taken after it is released.
void msgr_iothread_t::handle_client_read(osd_client_t *cl, int res)
{
    bool destroy = false;
    {
        std::lock_guard<std::mutex> client_lock(cl->mu);
        res = cl->handle_read(res);
        destroy = (res == -ENOENT && !cl->refs);
    }
    if (destroy)
    {
        // Schedule the deletion only after cl->mu is released: the deferred callback
        // may run before this function returns, and the client's mutex
        // must not be touched (unlocked) after the object is freed
        cl->msgr->set_immediate([cl]() { delete cl; });
        return;
    }
    if (res == -EIO)
    {
        cl->msgr->stop_client_from_iothread(cl);
    }
    else if (res == -EAGAIN)
    {
        // Schedule another receive for this client
        read_ready_clients.push_back(cl->peer_fd);
        ring->wakeup();
    }
}
|
||||
|
||||
int osd_client_t::handle_read(int result)
|
||||
{
|
||||
auto cl = this;
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
cl->refs--;
|
||||
if (cl->peer_state == PEER_STOPPED)
|
||||
{
|
||||
if (cl->refs <= 0)
|
||||
{
|
||||
delete cl;
|
||||
}
|
||||
return false;
|
||||
return -ENOENT;
|
||||
}
|
||||
if (result <= 0 && result != -EAGAIN && result != -EINTR)
|
||||
{
|
||||
|
@ -74,27 +131,14 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
|||
{
|
||||
fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
|
||||
}
|
||||
stop_client(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
if (result == -EAGAIN || result == -EINTR || result < cl->read_iov.iov_len)
|
||||
{
|
||||
cl->read_ready--;
|
||||
if (cl->read_ready > 0)
|
||||
read_ready_clients.push_back(cl->peer_fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
read_ready_clients.push_back(cl->peer_fd);
|
||||
return -EIO;
|
||||
}
|
||||
int expected = cl->read_iov.iov_len;
|
||||
if (result > 0)
|
||||
{
|
||||
if (cl->read_iov.iov_base == cl->in_buf)
|
||||
{
|
||||
if (!handle_read_buffer(cl, cl->in_buf, result))
|
||||
{
|
||||
goto fin;
|
||||
}
|
||||
handle_read_buffer(cl->in_buf, result);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -103,28 +147,25 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
|||
cl->recv_list.eat(result);
|
||||
if (cl->recv_list.done >= cl->recv_list.count)
|
||||
{
|
||||
if (!handle_finished_read(cl))
|
||||
{
|
||||
goto fin;
|
||||
}
|
||||
handle_finished_read();
|
||||
}
|
||||
}
|
||||
if (result >= cl->read_iov.iov_len)
|
||||
{
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
fin:
|
||||
for (auto cb: set_immediate)
|
||||
if (result == -EAGAIN || result == -EINTR || result < expected)
|
||||
{
|
||||
cb();
|
||||
cl->read_ready--;
|
||||
assert(cl->read_ready >= 0);
|
||||
}
|
||||
set_immediate.clear();
|
||||
return ret;
|
||||
if (cl->read_ready > 0)
|
||||
{
|
||||
return -EAGAIN;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
|
||||
bool osd_client_t::handle_read_buffer(void *curbuf, int remain)
|
||||
{
|
||||
auto cl = this;
|
||||
// Compose operation(s) from the buffer
|
||||
while (remain > 0)
|
||||
{
|
||||
|
@ -160,7 +201,7 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
|
|||
}
|
||||
if (cl->recv_list.done >= cl->recv_list.count)
|
||||
{
|
||||
if (!handle_finished_read(cl))
|
||||
if (!handle_finished_read())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -169,19 +210,20 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
|
|||
return true;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||
bool osd_client_t::handle_finished_read()
|
||||
{
|
||||
auto cl = this;
|
||||
cl->recv_list.reset();
|
||||
if (cl->read_state == CL_READ_HDR)
|
||||
{
|
||||
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||
return handle_reply_hdr(cl);
|
||||
return handle_reply_hdr();
|
||||
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
|
||||
handle_op_hdr(cl);
|
||||
handle_op_hdr();
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Received garbage: magic=%jx id=%ju opcode=%jx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
|
||||
stop_client(cl->peer_fd);
|
||||
msgr->stop_client_from_iothread(cl);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -189,7 +231,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
|||
{
|
||||
// Operation is ready
|
||||
cl->received_ops.push_back(cl->read_op);
|
||||
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
|
||||
msgr->set_immediate([msgr = this->msgr, op = cl->read_op, cl]() { msgr->exec_op(op); });
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
|
@ -207,8 +249,9 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
|||
return true;
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||
void osd_client_t::handle_op_hdr()
|
||||
{
|
||||
auto cl = this;
|
||||
osd_op_t *cur_op = cl->read_op;
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
|
||||
{
|
||||
|
@ -285,20 +328,21 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
|||
{
|
||||
// Operation is ready
|
||||
cl->received_ops.push_back(cur_op);
|
||||
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
|
||||
msgr->set_immediate([msgr = this->msgr, cur_op, cl]() { msgr->exec_op(cur_op); });
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||
bool osd_client_t::handle_reply_hdr()
|
||||
{
|
||||
auto cl = this;
|
||||
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
|
||||
if (req_it == cl->sent_ops.end())
|
||||
{
|
||||
// Command out of sync. Drop connection
|
||||
fprintf(stderr, "Client %d command out of sync: id %ju\n", cl->peer_fd, cl->read_op->req.hdr.id);
|
||||
stop_client(cl->peer_fd);
|
||||
msgr->stop_client_from_iothread(cl);
|
||||
return false;
|
||||
}
|
||||
osd_op_t *op = req_it->second;
|
||||
|
@ -315,7 +359,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
|||
fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %jd+%u\n",
|
||||
cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
|
||||
cl->sent_ops[op->req.hdr.id] = op;
|
||||
stop_client(cl->peer_fd);
|
||||
msgr->stop_client_from_iothread(cl);
|
||||
return false;
|
||||
}
|
||||
if (bmp_len > 0)
|
||||
|
@ -391,24 +435,92 @@ reuse:
|
|||
return true;
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_reply_ready(osd_op_t *op)
|
||||
void osd_client_t::handle_reply_ready(osd_op_t *op)
|
||||
{
|
||||
// Measure subop latency
|
||||
timespec tv_end;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
||||
msgr->set_immediate([msgr = this->msgr, op, cl = this]()
|
||||
{
|
||||
// Measure subop latency
|
||||
auto & stats = msgr->stats;
|
||||
timespec tv_end;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
||||
}
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||
);
|
||||
set_immediate.push_back([op]()
|
||||
{
|
||||
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
||||
{
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
||||
}
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||
);
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||
});
|
||||
}
|
||||
|
||||
static uint64_t one = 1;
|
||||
|
||||
void osd_messenger_t::set_immediate(std::function<void()> cb/*, ring_loop_t *ringloop*/)
|
||||
{
|
||||
if (!iothreads.size())
|
||||
{
|
||||
immediates.push_back(cb);
|
||||
return;
|
||||
}
|
||||
immediates_mu.lock();
|
||||
bool wakeup_main_thread = !immediates.size();
|
||||
immediates.push_back(cb);
|
||||
immediates_mu.unlock();
|
||||
if (wakeup_main_thread)
|
||||
{
|
||||
// io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
|
||||
// if (!sqe)
|
||||
// {
|
||||
write(immediates_eventfd, &one, sizeof(one));
|
||||
// FIXME: Can't use ringloop here, oops
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
// data->callback = [](ring_data_t*){};
|
||||
// my_uring_prep_write(sqe, immediates_eventfd, &one, sizeof(one), 0);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::set_immediate_or_run(std::function<void()> cb/*, ring_loop_t *ringloop*/)
|
||||
{
|
||||
if (!iothreads.size())
|
||||
{
|
||||
cb();
|
||||
return;
|
||||
}
|
||||
immediates_mu.lock();
|
||||
bool wakeup_main_thread = !immediates.size();
|
||||
immediates.push_back(cb);
|
||||
immediates_mu.unlock();
|
||||
if (wakeup_main_thread)
|
||||
{
|
||||
// io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
|
||||
// if (!sqe)
|
||||
// {
|
||||
write(immediates_eventfd, &one, sizeof(one));
|
||||
// FIXME: Can't use ringloop here, oops
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
// data->callback = [](ring_data_t*){};
|
||||
// my_uring_prep_write(sqe, immediates_eventfd, &one, sizeof(one), 0);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_immediates()
|
||||
{
|
||||
auto to_run = std::move(immediates);
|
||||
for (auto & cb: to_run)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,10 +15,17 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
{
|
||||
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
|
||||
}
|
||||
else
|
||||
else if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
measure_exec(cur_op);
|
||||
}
|
||||
if (iothreads.size())
|
||||
{
|
||||
cl->mu.lock();
|
||||
}
|
||||
if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
// Check that operation actually belongs to this client
|
||||
// FIXME: Review if this is still needed
|
||||
bool found = false;
|
||||
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
|
||||
{
|
||||
|
@ -32,6 +39,10 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
if (!found)
|
||||
{
|
||||
delete cur_op;
|
||||
if (iothreads.size())
|
||||
{
|
||||
cl->mu.unlock();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -39,7 +50,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
|
||||
if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
measure_exec(cur_op);
|
||||
to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
|
||||
}
|
||||
else
|
||||
|
@ -108,21 +118,36 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
#ifdef WITH_RDMA
|
||||
if (cl->peer_state == PEER_RDMA)
|
||||
{
|
||||
if (iothreads.size())
|
||||
{
|
||||
cl->mu.unlock();
|
||||
}
|
||||
try_send_rdma(cl);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
if (!ringloop)
|
||||
if (iothreads.size())
|
||||
{
|
||||
int should_wakeup = !cl->write_msg.msg_iovlen && !cl->write_state;
|
||||
cl->write_state = CL_WRITE_READY;
|
||||
cl->mu.unlock();
|
||||
if (should_wakeup)
|
||||
{
|
||||
auto iot = iothreads[cl->peer_fd % iothreads.size()];
|
||||
iot->wakeup_out(cl->peer_fd, ringloop);
|
||||
}
|
||||
}
|
||||
else if (!ringloop)
|
||||
{
|
||||
// FIXME: It's worse because it doesn't allow batching
|
||||
while (cl->outbox.size())
|
||||
{
|
||||
try_send(cl);
|
||||
cl->try_send(NULL, true);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((cl->write_msg.msg_iovlen > 0 || !try_send(cl)) && (cl->write_state == 0))
|
||||
if ((cl->write_msg.msg_iovlen > 0 || !cl->try_send(ringloop, use_sync_send_recv)) && (cl->write_state == 0))
|
||||
{
|
||||
cl->write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
|
@ -180,8 +205,9 @@ void osd_messenger_t::measure_exec(osd_op_t *cur_op)
|
|||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||
bool osd_client_t::try_send(ring_loop_t *ringloop, bool use_sync_send_recv)
|
||||
{
|
||||
auto cl = this;
|
||||
int peer_fd = cl->peer_fd;
|
||||
if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
|
||||
{
|
||||
|
@ -198,7 +224,14 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
|
||||
cl->refs++;
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
||||
if (msgr->iothreads.size())
|
||||
{
|
||||
data->callback = [this](ring_data_t *data) { msgr->iothreads[this->peer_fd % msgr->iothreads.size()]->handle_client_send(this, data->res); };
|
||||
}
|
||||
else
|
||||
{
|
||||
data->callback = [this](ring_data_t *data) { msgr->handle_client_send(this, data->res); };
|
||||
}
|
||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
||||
}
|
||||
else
|
||||
|
@ -211,18 +244,68 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_send(result, cl);
|
||||
msgr->handle_client_send(this, result);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Completion handler for a client send: lets the client process the result
// via osd_client_t::handle_send() and reacts to the status code it returns.
//   -ENOENT - free the client if nothing references it anymore
//   -EIO    - stop (disconnect) the client
//   -EAGAIN - put the client into <write_ready_clients> again
// Any other value requires no action here.
void osd_messenger_t::handle_client_send(osd_client_t *cl, int res)
{
    res = cl->handle_send(res);
    if (res == -ENOENT)
    {
        // Same "no references left" test as the other places that free a client (refs <= 0)
        if (cl->refs <= 0)
        {
            delete cl;
        }
    }
    else if (res == -EIO)
    {
        stop_client(cl->peer_fd);
    }
    else if (res == -EAGAIN)
    {
        write_ready_clients.push_back(cl->peer_fd);
    }
}
|
||||
|
||||
// I/O thread variant of the send completion handler.
//
// osd_client_t::handle_send() is called under the client's mutex, then its
// status code is handled:
//   -ENOENT - queue deletion of the client through the messenger's set_immediate()
//             if nothing references it anymore
//   -EIO    - ask the messenger to stop the client (stop_client_from_iothread)
//   -EAGAIN - put the client into this thread's <write_ready_clients> and wake up its ring
void msgr_iothread_t::handle_client_send(osd_client_t *cl, int res)
{
    cl->mu.lock();
    res = cl->handle_send(res);
    // Decide under the lock, but act after unlocking (see below)
    const bool delete_client = (res == -ENOENT && cl->refs <= 0);
    cl->mu.unlock();
    if (delete_client)
    {
        // The deletion must be queued only after cl->mu is released: the queued
        // callback may run on another thread at any moment, and it must neither
        // destroy a locked mutex nor free <cl> before we unlock it.
        cl->msgr->set_immediate([cl]() { delete cl; });
        // <cl> must not be touched past this point
        return;
    }
    if (res == -EIO)
    {
        cl->msgr->stop_client_from_iothread(cl);
    }
    else if (res == -EAGAIN)
    {
        write_ready_mu.lock();
        write_ready_clients.push_back(cl->peer_fd);
        write_ready_mu.unlock();
        ring->wakeup();
    }
}
|
||||
|
||||
void osd_messenger_t::send_replies()
|
||||
{
|
||||
if (iothreads.size())
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < write_ready_clients.size(); i++)
|
||||
{
|
||||
int peer_fd = write_ready_clients[i];
|
||||
auto cl_it = clients.find(peer_fd);
|
||||
if (cl_it != clients.end() && !try_send(cl_it->second))
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto cl = cl_it->second;
|
||||
if (!cl->try_send(ringloop, use_sync_send_recv))
|
||||
{
|
||||
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
|
||||
return;
|
||||
|
@ -231,24 +314,20 @@ void osd_messenger_t::send_replies()
|
|||
write_ready_clients.clear();
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
int osd_client_t::handle_send(int result)
|
||||
{
|
||||
auto cl = this;
|
||||
cl->write_msg.msg_iovlen = 0;
|
||||
cl->refs--;
|
||||
if (cl->peer_state == PEER_STOPPED)
|
||||
{
|
||||
if (cl->refs <= 0)
|
||||
{
|
||||
delete cl;
|
||||
}
|
||||
return;
|
||||
return -ENOENT;
|
||||
}
|
||||
if (result < 0 && result != -EAGAIN && result != -EINTR)
|
||||
{
|
||||
// this is a client socket, so don't panic. just disconnect it
|
||||
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
|
||||
stop_client(cl->peer_fd);
|
||||
return;
|
||||
return -EIO;
|
||||
}
|
||||
if (result >= 0)
|
||||
{
|
||||
|
@ -261,7 +340,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
|
||||
{
|
||||
// Reply fully sent
|
||||
delete cl->outbox[done].op;
|
||||
msgr->set_immediate_or_run([op = cl->outbox[done].op] { delete op; });
|
||||
}
|
||||
result -= iov.iov_len;
|
||||
done++;
|
||||
|
@ -291,26 +370,35 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
{
|
||||
// FIXME: Do something better than just forgetting the FD
|
||||
// FIXME: Ignore pings during RDMA state transition
|
||||
if (log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
msgr->set_immediate_or_run([cl = this, msgr = this->msgr, peer_fd = this->peer_fd]()
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
auto cl_it = msgr->clients.find(peer_fd);
|
||||
if (cl_it == msgr->clients.end() || cl_it->second != cl)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
return;
|
||||
}
|
||||
if (msgr->log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "Successfully connected with client %d using RDMA\n", peer_fd);
|
||||
}
|
||||
msgr->tfd->set_fd_handler(peer_fd, 0, [msgr](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
msgr->handle_peer_epoll(peer_fd, epoll_events);
|
||||
}
|
||||
});
|
||||
// Add the initial receive request
|
||||
msgr->try_recv_rdma(cl);
|
||||
});
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
if (cl->write_state != 0)
|
||||
{
|
||||
write_ready_clients.push_back(cl->peer_fd);
|
||||
return -EAGAIN;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
|
||||
{
|
||||
cl->mu.lock();
|
||||
std::vector<osd_op_t*> cancel_ops;
|
||||
cancel_ops.resize(cl->sent_ops.size());
|
||||
int i = 0;
|
||||
|
@ -20,6 +21,7 @@ void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
|
|||
}
|
||||
cl->sent_ops.clear();
|
||||
cl->outbox.clear();
|
||||
cl->mu.unlock();
|
||||
for (auto op: cancel_ops)
|
||||
{
|
||||
cancel_op(op);
|
||||
|
@ -53,8 +55,10 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
|||
return;
|
||||
}
|
||||
osd_client_t *cl = it->second;
|
||||
cl->mu.lock();
|
||||
if (cl->peer_state == PEER_CONNECTING && !force || cl->peer_state == PEER_STOPPED)
|
||||
{
|
||||
cl->mu.unlock();
|
||||
return;
|
||||
}
|
||||
if (log_level > 0)
|
||||
|
@ -71,6 +75,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
|||
// First set state to STOPPED so another stop_client() call doesn't try to free it again
|
||||
cl->refs++;
|
||||
cl->peer_state = PEER_STOPPED;
|
||||
cl->mu.unlock();
|
||||
if (cl->osd_num)
|
||||
{
|
||||
// ...and forget OSD peer
|
||||
|
@ -78,7 +83,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
|||
}
|
||||
#ifndef __MOCK__
|
||||
// Then remove FD from the eventloop so we don't accidentally read something
|
||||
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(peer_fd, 0, NULL);
|
||||
if (iothreads.size())
|
||||
{
|
||||
iothreads[peer_fd % iothreads.size()]->remove_client(cl);
|
||||
}
|
||||
if (cl->connect_timeout_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(cl->connect_timeout_id);
|
||||
|
@ -108,17 +117,24 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
|||
repeer_pgs(cl->osd_num);
|
||||
}
|
||||
// Then cancel all operations
|
||||
cl->mu.lock();
|
||||
if (cl->read_op)
|
||||
{
|
||||
if (!cl->read_op->callback)
|
||||
auto op = cl->read_op;
|
||||
cl->read_op = NULL;
|
||||
cl->mu.unlock();
|
||||
if (!op->callback)
|
||||
{
|
||||
delete cl->read_op;
|
||||
delete op;
|
||||
}
|
||||
else
|
||||
{
|
||||
cancel_op(cl->read_op);
|
||||
cancel_op(op);
|
||||
}
|
||||
cl->read_op = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->mu.unlock();
|
||||
}
|
||||
if (cl->osd_num)
|
||||
{
|
||||
|
@ -131,11 +147,32 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
|||
{
|
||||
clients.erase(it);
|
||||
}
|
||||
cl->mu.lock();
|
||||
cl->refs--;
|
||||
if (cl->refs <= 0 || force_delete)
|
||||
{
|
||||
cl->mu.unlock();
|
||||
delete cl;
|
||||
}
|
||||
else
|
||||
cl->mu.unlock();
|
||||
}
|
||||
|
||||
// Stop client <cl> on request of code that may be running outside of the thread
// which owns <clients>.
//
// With I/O threads present the actual stop_client() call is deferred through
// set_immediate(). The deferred callback re-checks that <clients> still maps the
// same FD to the same client object and does nothing otherwise.
// Without I/O threads stop_client() is simply called in place.
void osd_messenger_t::stop_client_from_iothread(osd_client_t *cl)
{
    if (iothreads.size() > 0)
    {
        // Remember the FD now: only the copy is used when the callback runs
        const auto fd = cl->peer_fd;
        set_immediate([this, cl, fd]()
        {
            auto found = clients.find(fd);
            if (found == clients.end() || found->second != cl)
            {
                // The client is already gone or the FD now belongs to another client
                return;
            }
            stop_client(fd);
        });
        return;
    }
    stop_client(cl->peer_fd);
}
|
||||
|
||||
osd_client_t::~osd_client_t()
|
||||
|
|
|
@ -655,7 +655,7 @@ help:
|
|||
ringloop->register_consumer(&consumer);
|
||||
// Add FD to epoll
|
||||
bool stop = false;
|
||||
epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(sockfd[0], EPOLLIN, [this, &stop](int peer_fd, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
|
|
|
@ -185,7 +185,7 @@ void kv_cli_t::run()
|
|||
fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK);
|
||||
try
|
||||
{
|
||||
epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events)
|
||||
epmgr->tfd->set_fd_handler(0, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
if (events & EPOLLIN)
|
||||
{
|
||||
|
@ -193,7 +193,7 @@ void kv_cli_t::run()
|
|||
}
|
||||
if (events & EPOLLRDHUP)
|
||||
{
|
||||
epmgr->tfd->set_fd_handler(0, false, NULL);
|
||||
epmgr->tfd->set_fd_handler(0, 0, NULL);
|
||||
finished = true;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -243,7 +243,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
// Create NFS socket and add it to epoll
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
|
||||
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, EPOLLIN, [this](int nfs_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
|
@ -260,7 +260,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
// Create portmap socket and add it to epoll
|
||||
int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
|
||||
fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int portmap_socket, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(portmap_socket, EPOLLIN, [this](int portmap_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
|
@ -466,7 +466,7 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
|||
{
|
||||
cli->proc_table.insert(fn);
|
||||
}
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, EPOLLIN|EPOLLOUT, [cli](int nfs_fd, int epoll_events)
|
||||
{
|
||||
// Handle incoming event
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
|
@ -723,7 +723,7 @@ void nfs_client_t::stop()
|
|||
stopped = true;
|
||||
if (refs <= 0)
|
||||
{
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, 0, NULL);
|
||||
close(nfs_fd);
|
||||
delete this;
|
||||
}
|
||||
|
|
|
@ -141,6 +141,14 @@ void osd_t::parse_config(bool init)
|
|||
config = msgr.merge_configs(cli_config, file_config, etcd_global_config, etcd_osd_config);
|
||||
if (config.find("log_level") == this->config.end())
|
||||
config["log_level"] = 1;
|
||||
if (init)
|
||||
{
|
||||
// OSD number
|
||||
osd_num = config["osd_num"].uint64_value();
|
||||
if (!osd_num)
|
||||
throw std::runtime_error("osd_num is required in the configuration");
|
||||
msgr.osd_num = osd_num;
|
||||
}
|
||||
if (bs)
|
||||
{
|
||||
auto bs_cfg = json_to_bs(config);
|
||||
|
@ -150,11 +158,6 @@ void osd_t::parse_config(bool init)
|
|||
msgr.parse_config(config);
|
||||
if (init)
|
||||
{
|
||||
// OSD number
|
||||
osd_num = config["osd_num"].uint64_value();
|
||||
if (!osd_num)
|
||||
throw std::runtime_error("osd_num is required in the configuration");
|
||||
msgr.osd_num = osd_num;
|
||||
// Vital Blockstore parameters
|
||||
bs_block_size = config["block_size"].uint64_value();
|
||||
if (!bs_block_size)
|
||||
|
@ -361,7 +364,7 @@ void osd_t::bind_socket()
|
|||
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
|
||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
|
||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||
epmgr->set_fd_handler(listen_fd, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
msgr.accept_connections(listen_fd);
|
||||
});
|
||||
|
|
|
@ -247,6 +247,7 @@ resume_8:
|
|||
finish:
|
||||
if (cur_op->peer_fd)
|
||||
{
|
||||
// FIXME: Do it before executing sync
|
||||
auto it = msgr.clients.find(cur_op->peer_fd);
|
||||
if (it != msgr.clients.end())
|
||||
it->second->dirty_pgs.clear();
|
||||
|
|
|
@ -43,7 +43,7 @@ int main(int narg, char *args[])
|
|||
// Accept new connections
|
||||
int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
|
||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
|
||||
epmgr->set_fd_handler(listen_fd, EPOLLIN, [listen_fd, msgr](int fd, int events)
|
||||
{
|
||||
msgr->accept_connections(listen_fd);
|
||||
});
|
||||
|
|
|
@ -21,7 +21,7 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop)
|
|||
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
||||
}
|
||||
|
||||
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); });
|
||||
tfd = new timerfd_manager_t([this](int fd, int events, std::function<void(int, int)> handler) { set_fd_handler(fd, events, handler); });
|
||||
|
||||
if (ringloop)
|
||||
{
|
||||
|
@ -54,14 +54,14 @@ int epoll_manager_t::get_fd()
|
|||
return epoll_fd;
|
||||
}
|
||||
|
||||
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
|
||||
void epoll_manager_t::set_fd_handler(int fd, int events, std::function<void(int, int)> handler)
|
||||
{
|
||||
if (handler != NULL)
|
||||
{
|
||||
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
||||
epoll_event ev;
|
||||
ev.data.fd = fd;
|
||||
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||
ev.events = events | EPOLLRDHUP | EPOLLET;
|
||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#include <map>
|
||||
|
||||
#include "ringloop.h"
|
||||
|
@ -21,7 +23,7 @@ public:
|
|||
epoll_manager_t(ring_loop_t *ringloop);
|
||||
~epoll_manager_t();
|
||||
int get_fd();
|
||||
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler);
|
||||
void set_fd_handler(int fd, int events, std::function<void(int, int)> handler);
|
||||
void handle_events(int timeout);
|
||||
|
||||
timerfd_manager_t *tfd;
|
||||
|
|
|
@ -32,12 +32,22 @@ static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const s
|
|||
my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
|
||||
sqe->buf_index = buf_index;
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_write(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
#include <stdexcept>
|
||||
#include "timerfd_manager.h"
|
||||
|
||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
|
||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler)
|
||||
{
|
||||
this->set_fd_handler = set_fd_handler;
|
||||
wait_state = 0;
|
||||
|
@ -20,7 +20,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
|
|||
{
|
||||
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
||||
}
|
||||
set_fd_handler(timerfd, false, [this](int fd, int events)
|
||||
set_fd_handler(timerfd, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
handle_readable();
|
||||
});
|
||||
|
@ -28,7 +28,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
|
|||
|
||||
timerfd_manager_t::~timerfd_manager_t()
|
||||
{
|
||||
set_fd_handler(timerfd, false, NULL);
|
||||
set_fd_handler(timerfd, 0, NULL);
|
||||
close(timerfd);
|
||||
}
|
||||
|
||||
|
|
|
@ -30,9 +30,9 @@ class timerfd_manager_t
|
|||
void trigger_nearest();
|
||||
void handle_readable();
|
||||
public:
|
||||
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
|
||||
std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler;
|
||||
|
||||
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
|
||||
timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler);
|
||||
~timerfd_manager_t();
|
||||
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
||||
int set_timer_us(uint64_t micros, bool repeat, std::function<void(int)> callback);
|
||||
|
|
Loading…
Reference in New Issue