Compare commits

...

2 Commits

Author SHA1 Message Date
Vitaliy Filippov 249a233b37 WIP another experiment - "smart" iothreads
Test / test_snapshot_chain_ec (push) Successful in 2m55s Details
Test / test_rebalance_verify_imm (push) Successful in 3m55s Details
Test / test_root_node (push) Successful in 11s Details
Test / test_rebalance_verify (push) Successful in 4m32s Details
Test / test_switch_primary (push) Successful in 37s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 2m53s Details
Test / test_write (push) Successful in 46s Details
Test / test_write_no_same (push) Successful in 17s Details
Test / test_write_xor (push) Successful in 1m55s Details
Test / test_rebalance_verify_ec (push) Successful in 5m59s Details
Test / test_heal_pg_size_2 (push) Successful in 3m58s Details
Test / test_heal_ec (push) Successful in 3m51s Details
Test / test_heal_csum_32k_dmj (push) Successful in 5m47s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m10s Details
Test / test_heal_csum_4k_dmj (push) Successful in 6m50s Details
Test / test_osd_tags (push) Successful in 19s Details
Test / test_enospc (push) Successful in 1m11s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m1s Details
Test / test_enospc_xor (push) Successful in 1m11s Details
Test / test_heal_csum_32k (push) Failing after 10m14s Details
Test / test_enospc_imm (push) Successful in 46s Details
Test / test_enospc_imm_xor (push) Successful in 1m3s Details
Test / test_scrub (push) Successful in 28s Details
Test / test_scrub_zero_osd_2 (push) Successful in 28s Details
Test / test_scrub_xor (push) Successful in 34s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 39s Details
Test / test_scrub_pg_size_3 (push) Successful in 49s Details
Test / test_scrub_ec (push) Successful in 25s Details
Test / test_nfs (push) Successful in 16s Details
Test / test_heal_csum_4k (push) Successful in 8m55s Details
2024-07-03 11:05:45 +03:00
Vitaliy Filippov d07e072212 Change bool wr to event mask in epoll_manager 2024-07-01 00:30:59 +03:00
18 changed files with 770 additions and 183 deletions

View File

@ -271,7 +271,7 @@ void http_co_t::close_connection()
}
if (peer_fd >= 0)
{
tfd->set_fd_handler(peer_fd, false, NULL);
tfd->set_fd_handler(peer_fd, 0, NULL);
close(peer_fd);
peer_fd = -1;
}
@ -314,7 +314,7 @@ void http_co_t::start_connection()
stackout();
return;
}
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
tfd->set_fd_handler(peer_fd, EPOLLIN|EPOLLOUT, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
handle_events();
@ -372,7 +372,7 @@ void http_co_t::handle_connect_result()
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
handle_events();

View File

@ -15,6 +15,207 @@
#include "msgr_rdma.h"
#endif
#include <sys/poll.h>
#include <sys/eventfd.h>
static uint64_t one = 1;
msgr_iothread_t::msgr_iothread_t()
{
ring = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
epmgr = new epoll_manager_t(ring);
submit_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
if (submit_eventfd < 0)
{
throw std::runtime_error(std::string("failed to create eventfd: ")+strerror(errno));
}
epmgr->tfd->set_fd_handler(submit_eventfd, EPOLLIN, [this](int fd, int epoll_events)
{
// Reset eventfd counter
uint64_t ctr = 0;
int r = read(submit_eventfd, &ctr, 8);
if (r < 0 && errno != EAGAIN && errno != EINTR)
{
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
}
ring->wakeup();
});
consumer.loop = [this]()
{
read_requests();
send_replies();
ring->submit();
};
ring->register_consumer(&consumer);
thread = new std::thread(&msgr_iothread_t::run, this);
}
msgr_iothread_t::~msgr_iothread_t()
{
stop();
delete thread;
delete epmgr;
delete ring;
}
void msgr_iothread_t::stop()
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
stopped = true;
write(submit_eventfd, &one, sizeof(one));
mu.unlock();
thread->join();
ring->unregister_consumer(&consumer);
close(submit_eventfd);
}
void msgr_iothread_t::add_client(osd_client_t *cl)
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
assert(!clients[cl->peer_fd]);
clients[cl->peer_fd] = cl;
epmgr->tfd->set_fd_handler(cl->peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
{
// FIXME: Slight copypaste (see handle_peer_epoll)
if (epoll_events & EPOLLIN)
{
auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end())
{
auto cl = cl_it->second;
cl->mu.lock();
cl->read_ready++;
if (cl->read_ready == 1)
{
read_ready_clients.push_back(peer_fd);
ring->wakeup();
}
cl->mu.unlock();
}
}
});
mu.unlock();
}
void msgr_iothread_t::remove_client(osd_client_t *cl)
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
auto cl_it = clients.find(cl->peer_fd);
if (cl_it != clients.end() && cl_it->second == cl)
{
clients.erase(cl->peer_fd);
epmgr->tfd->set_fd_handler(cl->peer_fd, 0, NULL);
}
mu.unlock();
}
void msgr_iothread_t::wakeup_out(int peer_fd, ring_loop_t *outer_ring)
{
write_ready_mu.lock();
if (!write_ready_clients.size())
{
io_uring_sqe* sqe = outer_ring->get_sqe();
if (!sqe)
{
write(submit_eventfd, &one, sizeof(one));
}
else
{
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [](ring_data_t*){};
my_uring_prep_write(sqe, submit_eventfd, &one, sizeof(one), 0);
}
}
write_ready_clients.push_back(peer_fd);
write_ready_mu.unlock();
}
void msgr_iothread_t::read_requests()
{
// FIXME: Slight copypaste (see messenger_t::read_requests)
auto to_recv = std::move(read_ready_clients);
for (int i = 0; i < to_recv.size(); i++)
{
int peer_fd = to_recv[i];
auto cl_it = clients.find(peer_fd);
if (cl_it == clients.end())
{
continue;
}
osd_client_t *cl = cl_it->second;
cl->mu.lock();
auto ok = cl->try_recv(ring, false);
cl->mu.unlock();
if (!ok)
{
read_ready_clients.insert(read_ready_clients.end(), to_recv.begin()+i, to_recv.end());
break;
}
}
}
void msgr_iothread_t::send_replies()
{
if (stopped)
{
return;
}
write_ready_mu.lock();
auto to_send = std::move(write_ready_clients);
write_ready_mu.unlock();
for (int i = 0; i < to_send.size(); i++)
{
auto cl_it = clients.find(to_send[i]);
if (cl_it == clients.end())
{
continue;
}
auto cl = cl_it->second;
cl->mu.lock();
auto ok = cl->try_send(ring, false/*, lock*/);
cl->mu.unlock();
if (!ok)
{
// ring is full (rare but what if...)
write_ready_mu.lock();
write_ready_clients.insert(write_ready_clients.end(), to_send.begin()+i, to_send.end());
write_ready_mu.unlock();
break;
}
}
}
void msgr_iothread_t::run()
{
while (true)
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
ring->loop();
mu.unlock();
ring->wait();
}
}
void osd_messenger_t::init()
{
#ifdef WITH_RDMA
@ -35,7 +236,7 @@ void osd_messenger_t::init()
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
tfd->set_fd_handler(rdma_context->channel->fd, EPOLLIN, [this](int notify_fd, int epoll_events)
{
handle_rdma_events();
});
@ -43,6 +244,44 @@ void osd_messenger_t::init()
}
}
#endif
if (ringloop && iothread_count > 0)
{
for (int i = 0; i < iothread_count; i++)
{
auto iot = new msgr_iothread_t();
iothreads.push_back(iot);
}
immediates_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
if (immediates_eventfd < 0)
{
throw std::runtime_error(std::string("failed to create set_immediate eventfd: ")+strerror(errno));
}
tfd->set_fd_handler(immediates_eventfd, EPOLLIN, [this](int peer_fd, int epoll_events)
{
// Reset eventfd counter
uint64_t ctr = 0;
int r = read(immediates_eventfd, &ctr, 8);
if (r < 0 && errno != EAGAIN && errno != EINTR)
{
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
}
while (true)
{
immediates_mu.lock();
auto to_run = std::move(immediates);
immediates_mu.unlock();
if (!to_run.size())
{
break;
}
for (auto & cb: to_run)
{
cb();
}
}
ringloop->wakeup();
});
}
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
auto cl_it = clients.begin();
@ -120,6 +359,12 @@ void osd_messenger_t::init()
osd_messenger_t::~osd_messenger_t()
{
if (immediates_eventfd >= 0)
{
tfd->set_fd_handler(immediates_eventfd, 0, NULL);
close(immediates_eventfd);
immediates_eventfd = -1;
}
if (keepalive_timer_id >= 0)
{
tfd->clear_timer(keepalive_timer_id);
@ -129,6 +374,14 @@ osd_messenger_t::~osd_messenger_t()
{
stop_client(clients.begin()->first, true, true);
}
if (iothreads.size())
{
for (auto iot: iothreads)
{
delete iot;
}
iothreads.clear();
}
#ifdef WITH_RDMA
if (rdma_context)
{
@ -165,6 +418,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_max_msg = 129*1024;
this->rdma_odp = config["rdma_odp"].bool_value();
#endif
if (!osd_num)
this->iothread_count = config["client_iothread_count"].is_null() ? 4 : (uint32_t)config["client_iothread_count"].uint64_value();
else
this->iothread_count = (uint32_t)config["osd_iothread_count"].uint64_value();
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
this->receive_buffer_size = 65536;
@ -255,6 +512,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
{
fprintf(stderr, "Connecting to OSD %ju at %s:%d (client %d)\n", peer_osd, peer_host, peer_port, peer_fd);
}
clients[peer_fd]->msgr = this;
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = peer_port;
clients[peer_fd]->peer_fd = peer_fd;
@ -262,7 +520,8 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
clients[peer_fd]->connect_timeout_id = -1;
clients[peer_fd]->osd_num = peer_osd;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
tfd->set_fd_handler(peer_fd, EPOLLIN|EPOLLOUT, [this](int peer_fd, int epoll_events)
{
// Either OUT (connected) or HUP
handle_connect_epoll(peer_fd);
@ -303,7 +562,11 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
cl->peer_state = PEER_CONNECTED;
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
if (iothreads.size())
{
iothreads[peer_fd % iothreads.size()]->add_client(cl);
}
tfd->set_fd_handler(peer_fd, iothreads.size() ? 0 : EPOLLIN, [this](int peer_fd, int epoll_events)
{
handle_peer_epoll(peer_fd, epoll_events);
});
@ -487,7 +750,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
fprintf(stderr, "Connected to OSD %ju using RDMA\n", cl->osd_num);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
tfd->set_fd_handler(cl->peer_fd, 0, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
@ -522,13 +785,19 @@ void osd_messenger_t::accept_connections(int listen_fd)
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t();
clients[peer_fd]->msgr = this;
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
// Add FD to epoll
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
if (iothreads.size())
{
iothreads[peer_fd % iothreads.size()]->add_client(clients[peer_fd]);
}
tfd->set_fd_handler(peer_fd, iothreads.size() ? 0 : EPOLLIN, [this](int peer_fd, int epoll_events)
{
handle_peer_epoll(peer_fd, epoll_events);
});

View File

@ -11,6 +11,7 @@
#include <map>
#include <deque>
#include <vector>
#include <mutex>
#include "malloc_or_die.h"
#include "json11/json11.hpp"
@ -45,8 +46,13 @@ struct msgr_rdma_connection_t;
struct msgr_rdma_context_t;
#endif
struct osd_messenger_t;
struct osd_client_t
{
std::mutex mu;
osd_messenger_t *msgr = NULL;
int refs = 0;
sockaddr_storage peer_addr;
@ -59,6 +65,7 @@ struct osd_client_t
osd_num_t osd_num = 0;
void *in_buf = NULL;
uint32_t receive_buffer_size = 0;
#ifdef WITH_RDMA
msgr_rdma_connection_t *rdma_conn = NULL;
@ -89,6 +96,17 @@ struct osd_client_t
std::vector<msgr_sendp_t> outbox, next_outbox;
~osd_client_t();
bool try_send(ring_loop_t *ringloop, bool use_sync_send_recv);
int handle_send(int result);
bool try_recv(ring_loop_t *ringloop, bool use_sync_send_recv);
int handle_read(int result);
bool handle_read_buffer(void *curbuf, int remain);
bool handle_finished_read();
void handle_op_hdr();
bool handle_reply_hdr();
void handle_reply_ready(osd_op_t *op);
};
struct osd_wanted_peer_t
@ -111,6 +129,53 @@ struct osd_op_stats_t
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
};
#ifdef __MOCK__
class msgr_iothread_t;
#else
#include <thread>
#include "epoll_manager.h"
class msgr_iothread_t
{
protected:
ring_loop_t *ring = NULL;
epoll_manager_t *epmgr = NULL;
ring_consumer_t consumer;
int submit_eventfd = -1;
bool stopped = false;
std::mutex mu;
std::map<int, osd_client_t*> clients;
std::vector<int> read_ready_clients;
std::mutex write_ready_mu;
std::vector<int> write_ready_clients;
std::thread *thread = NULL;
void run();
void read_requests();
void send_replies();
public:
void handle_client_read(osd_client_t *cl, int res);
void handle_client_send(osd_client_t *cl, int res);
msgr_iothread_t();
~msgr_iothread_t();
void add_client(osd_client_t *cl);
void remove_client(osd_client_t *cl);
void wakeup_out(int peer_fd, ring_loop_t *outer_ring);
void stop();
};
#endif
struct osd_messenger_t
{
protected:
@ -123,6 +188,7 @@ protected:
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
int iothread_count = 0;
#ifdef WITH_RDMA
bool use_rdma = true;
@ -134,10 +200,13 @@ protected:
bool rdma_odp = false;
#endif
std::vector<msgr_iothread_t*> iothreads;
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
int immediates_eventfd = -1;
std::mutex immediates_mu;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
std::vector<std::function<void()>> set_immediate;
std::vector<std::function<void()>> immediates;
public:
timerfd_manager_t *tfd;
@ -155,10 +224,13 @@ public:
void parse_config(const json11::Json & config);
void connect_peer(uint64_t osd_num, json11::Json peer_state);
void stop_client(int peer_fd, bool force = false, bool force_delete = false);
void stop_client_from_iothread(osd_client_t *cl);
void outbox_push(osd_op_t *cur_op);
std::function<void(osd_op_t*)> exec_op;
std::function<void(osd_num_t)> repeer_pgs;
std::function<bool(osd_client_t*, json11::Json)> check_config_hook;
void handle_client_read(osd_client_t *cl, int res);
void handle_client_send(osd_client_t *cl, int res);
void read_requests();
void send_replies();
void accept_connections(int listen_fd);
@ -178,6 +250,9 @@ public:
void inc_op_stats(osd_op_stats_t & stats, uint64_t opcode, timespec & tv_begin, timespec & tv_end, uint64_t len);
void measure_exec(osd_op_t *cur_op);
void set_immediate(std::function<void()> cb);
void set_immediate_or_run(std::function<void()> cb);
protected:
void try_connect_peer(uint64_t osd_num);
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
@ -188,15 +263,7 @@ protected:
void cancel_osd_ops(osd_client_t *cl);
void cancel_op(osd_op_t *op);
bool try_send(osd_client_t *cl);
void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);
void handle_reply_ready(osd_op_t *op);
void handle_immediates();
#ifdef WITH_RDMA
void try_send_rdma(osd_client_t *cl);
@ -205,4 +272,6 @@ protected:
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
#endif
friend struct osd_client_t;
};

View File

@ -603,7 +603,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send)
{
rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
if (!cl->handle_read_buffer(rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
{
// handle_read_buffer may stop the client
continue;
@ -666,9 +666,5 @@ void osd_messenger_t::handle_rdma_events()
}
}
} while (event_count > 0);
for (auto cb: set_immediate)
{
cb();
}
set_immediate.clear();
handle_immediates();
}

View File

@ -1,6 +1,7 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <unistd.h>
#include "messenger.h"
void osd_messenger_t::read_requests()
@ -9,63 +10,119 @@ void osd_messenger_t::read_requests()
{
int peer_fd = read_ready_clients[i];
osd_client_t *cl = clients[peer_fd];
if (cl->read_msg.msg_iovlen)
if (!cl->try_recv(ringloop, use_sync_send_recv))
{
continue;
}
if (cl->read_remaining < receive_buffer_size)
{
cl->read_iov.iov_base = cl->in_buf;
cl->read_iov.iov_len = receive_buffer_size;
cl->read_msg.msg_iov = &cl->read_iov;
cl->read_msg.msg_iovlen = 1;
}
else
{
cl->read_iov.iov_base = 0;
cl->read_iov.iov_len = cl->read_remaining;
cl->read_msg.msg_iov = cl->recv_list.get_iovec();
cl->read_msg.msg_iovlen = cl->recv_list.get_size();
}
cl->refs++;
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
cl->read_msg.msg_iovlen = 0;
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
}
else
{
int result = recvmsg(peer_fd, &cl->read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, cl);
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
}
read_ready_clients.clear();
if (!iothreads.size())
{
handle_immediates();
}
}
bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
bool osd_client_t::try_recv(ring_loop_t *ringloop, bool use_sync_send_recv)
{
bool ret = false;
auto cl = this;
if (cl->read_msg.msg_iovlen)
{
return true;
}
if (cl->read_remaining < cl->receive_buffer_size)
{
cl->read_iov.iov_base = cl->in_buf;
cl->read_iov.iov_len = cl->receive_buffer_size;
cl->read_msg.msg_iov = &cl->read_iov;
cl->read_msg.msg_iovlen = 1;
}
else
{
cl->read_iov.iov_base = 0;
cl->read_iov.iov_len = cl->read_remaining;
cl->read_msg.msg_iov = cl->recv_list.get_iovec();
cl->read_msg.msg_iovlen = cl->recv_list.get_size();
}
cl->refs++;
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
cl->read_msg.msg_iovlen = 0;
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (msgr->iothreads.size())
{
data->callback = [this](ring_data_t *data) { msgr->iothreads[peer_fd % msgr->iothreads.size()]->handle_client_read(this, data->res); };
}
else
{
data->callback = [this](ring_data_t *data) { msgr->handle_client_read(this, data->res); };
}
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
}
else
{
int result = recvmsg(peer_fd, &cl->read_msg, 0);
if (result < 0)
{
result = -errno;
}
msgr->handle_client_read(this, result);
}
return true;
}
void osd_messenger_t::handle_client_read(osd_client_t *cl, int res)
{
res = cl->handle_read(res);
if (res == -ENOENT)
{
if (!cl->refs)
delete cl;
}
else if (res == -EIO)
{
stop_client(cl->peer_fd);
}
else if (res == -EAGAIN)
{
read_ready_clients.push_back(cl->peer_fd);
}
}
void msgr_iothread_t::handle_client_read(osd_client_t *cl, int res)
{
cl->mu.lock();
res = cl->handle_read(res);
if (res == -ENOENT)
{
if (!cl->refs)
cl->msgr->set_immediate([cl]() { delete cl; });
}
cl->mu.unlock();
if (res == -EIO)
{
cl->msgr->stop_client_from_iothread(cl);
}
else if (res == -EAGAIN)
{
read_ready_clients.push_back(cl->peer_fd);
ring->wakeup();
}
}
int osd_client_t::handle_read(int result)
{
auto cl = this;
cl->read_msg.msg_iovlen = 0;
cl->refs--;
if (cl->peer_state == PEER_STOPPED)
{
if (cl->refs <= 0)
{
delete cl;
}
return false;
return -ENOENT;
}
if (result <= 0 && result != -EAGAIN && result != -EINTR)
{
@ -74,27 +131,14 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{
fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
}
stop_client(cl->peer_fd);
return false;
}
if (result == -EAGAIN || result == -EINTR || result < cl->read_iov.iov_len)
{
cl->read_ready--;
if (cl->read_ready > 0)
read_ready_clients.push_back(cl->peer_fd);
}
else
{
read_ready_clients.push_back(cl->peer_fd);
return -EIO;
}
int expected = cl->read_iov.iov_len;
if (result > 0)
{
if (cl->read_iov.iov_base == cl->in_buf)
{
if (!handle_read_buffer(cl, cl->in_buf, result))
{
goto fin;
}
handle_read_buffer(cl->in_buf, result);
}
else
{
@ -103,28 +147,25 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
cl->recv_list.eat(result);
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
handle_finished_read();
}
}
if (result >= cl->read_iov.iov_len)
{
ret = true;
}
}
fin:
for (auto cb: set_immediate)
if (result == -EAGAIN || result == -EINTR || result < expected)
{
cb();
cl->read_ready--;
assert(cl->read_ready >= 0);
}
set_immediate.clear();
return ret;
if (cl->read_ready > 0)
{
return -EAGAIN;
}
return 0;
}
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
bool osd_client_t::handle_read_buffer(void *curbuf, int remain)
{
auto cl = this;
// Compose operation(s) from the buffer
while (remain > 0)
{
@ -160,7 +201,7 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
if (!handle_finished_read())
{
return false;
}
@ -169,19 +210,20 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
return true;
}
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
bool osd_client_t::handle_finished_read()
{
auto cl = this;
cl->recv_list.reset();
if (cl->read_state == CL_READ_HDR)
{
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
return handle_reply_hdr(cl);
return handle_reply_hdr();
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
handle_op_hdr(cl);
handle_op_hdr();
else
{
fprintf(stderr, "Received garbage: magic=%jx id=%ju opcode=%jx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
stop_client(cl->peer_fd);
msgr->stop_client_from_iothread(cl);
return false;
}
}
@ -189,7 +231,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{
// Operation is ready
cl->received_ops.push_back(cl->read_op);
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
msgr->set_immediate([msgr = this->msgr, op = cl->read_op, cl]() { msgr->exec_op(op); });
cl->read_op = NULL;
cl->read_state = 0;
}
@ -207,8 +249,9 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
return true;
}
void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
void osd_client_t::handle_op_hdr()
{
auto cl = this;
osd_op_t *cur_op = cl->read_op;
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
{
@ -285,20 +328,21 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
{
// Operation is ready
cl->received_ops.push_back(cur_op);
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
msgr->set_immediate([msgr = this->msgr, cur_op, cl]() { msgr->exec_op(cur_op); });
cl->read_op = NULL;
cl->read_state = 0;
}
}
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
bool osd_client_t::handle_reply_hdr()
{
auto cl = this;
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
if (req_it == cl->sent_ops.end())
{
// Command out of sync. Drop connection
fprintf(stderr, "Client %d command out of sync: id %ju\n", cl->peer_fd, cl->read_op->req.hdr.id);
stop_client(cl->peer_fd);
msgr->stop_client_from_iothread(cl);
return false;
}
osd_op_t *op = req_it->second;
@ -315,7 +359,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %jd+%u\n",
cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
cl->sent_ops[op->req.hdr.id] = op;
stop_client(cl->peer_fd);
msgr->stop_client_from_iothread(cl);
return false;
}
if (bmp_len > 0)
@ -391,24 +435,92 @@ reuse:
return true;
}
void osd_messenger_t::handle_reply_ready(osd_op_t *op)
void osd_client_t::handle_reply_ready(osd_op_t *op)
{
// Measure subop latency
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
stats.subop_stat_count[op->req.hdr.opcode]++;
if (!stats.subop_stat_count[op->req.hdr.opcode])
msgr->set_immediate([msgr = this->msgr, op, cl = this]()
{
// Measure subop latency
auto & stats = msgr->stats;
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
stats.subop_stat_count[op->req.hdr.opcode]++;
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
}
stats.subop_stat_sum[op->req.hdr.opcode] += (
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
);
set_immediate.push_back([op]()
{
if (!stats.subop_stat_count[op->req.hdr.opcode])
{
stats.subop_stat_count[op->req.hdr.opcode]++;
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
}
stats.subop_stat_sum[op->req.hdr.opcode] += (
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
);
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
});
}
static uint64_t one = 1;
void osd_messenger_t::set_immediate(std::function<void()> cb/*, ring_loop_t *ringloop*/)
{
if (!iothreads.size())
{
immediates.push_back(cb);
return;
}
immediates_mu.lock();
bool wakeup_main_thread = !immediates.size();
immediates.push_back(cb);
immediates_mu.unlock();
if (wakeup_main_thread)
{
// io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
// if (!sqe)
// {
write(immediates_eventfd, &one, sizeof(one));
// FIXME: Can't use ringloop here, oops
// }
// else
// {
// ring_data_t* data = ((ring_data_t*)sqe->user_data);
// data->callback = [](ring_data_t*){};
// my_uring_prep_write(sqe, immediates_eventfd, &one, sizeof(one), 0);
// }
}
}
void osd_messenger_t::set_immediate_or_run(std::function<void()> cb/*, ring_loop_t *ringloop*/)
{
if (!iothreads.size())
{
cb();
return;
}
immediates_mu.lock();
bool wakeup_main_thread = !immediates.size();
immediates.push_back(cb);
immediates_mu.unlock();
if (wakeup_main_thread)
{
// io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
// if (!sqe)
// {
write(immediates_eventfd, &one, sizeof(one));
// FIXME: Can't use ringloop here, oops
// }
// else
// {
// ring_data_t* data = ((ring_data_t*)sqe->user_data);
// data->callback = [](ring_data_t*){};
// my_uring_prep_write(sqe, immediates_eventfd, &one, sizeof(one), 0);
// }
}
}
void osd_messenger_t::handle_immediates()
{
auto to_run = std::move(immediates);
for (auto & cb: to_run)
{
cb();
}
}

View File

@ -15,10 +15,17 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
}
else
else if (cur_op->op_type == OSD_OP_IN)
{
measure_exec(cur_op);
}
if (iothreads.size())
{
cl->mu.lock();
}
if (cur_op->op_type == OSD_OP_IN)
{
// Check that operation actually belongs to this client
// FIXME: Review if this is still needed
bool found = false;
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
{
@ -32,6 +39,10 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
if (!found)
{
delete cur_op;
if (iothreads.size())
{
cl->mu.unlock();
}
return;
}
}
@ -39,7 +50,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
if (cur_op->op_type == OSD_OP_IN)
{
measure_exec(cur_op);
to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
}
else
@ -108,21 +118,36 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
#ifdef WITH_RDMA
if (cl->peer_state == PEER_RDMA)
{
if (iothreads.size())
{
cl->mu.unlock();
}
try_send_rdma(cl);
return;
}
#endif
if (!ringloop)
if (iothreads.size())
{
int should_wakeup = !cl->write_msg.msg_iovlen && !cl->write_state;
cl->write_state = CL_WRITE_READY;
cl->mu.unlock();
if (should_wakeup)
{
auto iot = iothreads[cl->peer_fd % iothreads.size()];
iot->wakeup_out(cl->peer_fd, ringloop);
}
}
else if (!ringloop)
{
// FIXME: It's worse because it doesn't allow batching
while (cl->outbox.size())
{
try_send(cl);
cl->try_send(NULL, true);
}
}
else
{
if ((cl->write_msg.msg_iovlen > 0 || !try_send(cl)) && (cl->write_state == 0))
if ((cl->write_msg.msg_iovlen > 0 || !cl->try_send(ringloop, use_sync_send_recv)) && (cl->write_state == 0))
{
cl->write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd);
@ -180,8 +205,9 @@ void osd_messenger_t::measure_exec(osd_op_t *cur_op)
}
}
bool osd_messenger_t::try_send(osd_client_t *cl)
bool osd_client_t::try_send(ring_loop_t *ringloop, bool use_sync_send_recv)
{
auto cl = this;
int peer_fd = cl->peer_fd;
if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
{
@ -198,7 +224,14 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->refs++;
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
if (msgr->iothreads.size())
{
data->callback = [this](ring_data_t *data) { msgr->iothreads[this->peer_fd % msgr->iothreads.size()]->handle_client_send(this, data->res); };
}
else
{
data->callback = [this](ring_data_t *data) { msgr->handle_client_send(this, data->res); };
}
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
}
else
@ -211,18 +244,68 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
{
result = -errno;
}
handle_send(result, cl);
msgr->handle_client_send(this, result);
}
return true;
}
void osd_messenger_t::handle_client_send(osd_client_t *cl, int res)
{
res = cl->handle_send(res);
if (res == -ENOENT)
{
if (!cl->refs)
delete cl;
}
else if (res == -EIO)
{
stop_client(cl->peer_fd);
}
else if (res == -EAGAIN)
{
write_ready_clients.push_back(cl->peer_fd);
}
}
void msgr_iothread_t::handle_client_send(osd_client_t *cl, int res)
{
cl->mu.lock();
res = cl->handle_send(res);
if (res == -ENOENT)
{
if (!cl->refs)
cl->msgr->set_immediate([cl]() { delete cl; });
}
cl->mu.unlock();
if (res == -EIO)
{
cl->msgr->stop_client_from_iothread(cl);
}
else if (res == -EAGAIN)
{
write_ready_mu.lock();
write_ready_clients.push_back(cl->peer_fd);
write_ready_mu.unlock();
ring->wakeup();
}
}
void osd_messenger_t::send_replies()
{
if (iothreads.size())
{
return;
}
for (int i = 0; i < write_ready_clients.size(); i++)
{
int peer_fd = write_ready_clients[i];
auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end() && !try_send(cl_it->second))
if (cl_it == clients.end())
{
continue;
}
auto cl = cl_it->second;
if (!cl->try_send(ringloop, use_sync_send_recv))
{
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
@ -231,24 +314,20 @@ void osd_messenger_t::send_replies()
write_ready_clients.clear();
}
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
int osd_client_t::handle_send(int result)
{
auto cl = this;
cl->write_msg.msg_iovlen = 0;
cl->refs--;
if (cl->peer_state == PEER_STOPPED)
{
if (cl->refs <= 0)
{
delete cl;
}
return;
return -ENOENT;
}
if (result < 0 && result != -EAGAIN && result != -EINTR)
{
// this is a client socket, so don't panic. just disconnect it
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
stop_client(cl->peer_fd);
return;
return -EIO;
}
if (result >= 0)
{
@ -261,7 +340,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[done].op;
msgr->set_immediate_or_run([op = cl->outbox[done].op] { delete op; });
}
result -= iov.iov_len;
done++;
@ -291,26 +370,35 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
// FIXME: Do something better than just forgetting the FD
// FIXME: Ignore pings during RDMA state transition
if (log_level > 0)
{
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
msgr->set_immediate_or_run([cl = this, msgr = this->msgr, peer_fd = this->peer_fd]()
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
auto cl_it = msgr->clients.find(peer_fd);
if (cl_it == msgr->clients.end() || cl_it->second != cl)
{
handle_peer_epoll(peer_fd, epoll_events);
return;
}
if (msgr->log_level > 0)
{
fprintf(stderr, "Successfully connected with client %d using RDMA\n", peer_fd);
}
msgr->tfd->set_fd_handler(peer_fd, 0, [msgr](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
msgr->handle_peer_epoll(peer_fd, epoll_events);
}
});
// Add the initial receive request
msgr->try_recv_rdma(cl);
});
// Add the initial receive request
try_recv_rdma(cl);
}
#endif
}
if (cl->write_state != 0)
{
write_ready_clients.push_back(cl->peer_fd);
return -EAGAIN;
}
return 0;
}

View File

@ -11,6 +11,7 @@
void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{
cl->mu.lock();
std::vector<osd_op_t*> cancel_ops;
cancel_ops.resize(cl->sent_ops.size());
int i = 0;
@ -20,6 +21,7 @@ void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
}
cl->sent_ops.clear();
cl->outbox.clear();
cl->mu.unlock();
for (auto op: cancel_ops)
{
cancel_op(op);
@ -53,8 +55,10 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
return;
}
osd_client_t *cl = it->second;
cl->mu.lock();
if (cl->peer_state == PEER_CONNECTING && !force || cl->peer_state == PEER_STOPPED)
{
cl->mu.unlock();
return;
}
if (log_level > 0)
@ -71,6 +75,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
// First set state to STOPPED so another stop_client() call doesn't try to free it again
cl->refs++;
cl->peer_state = PEER_STOPPED;
cl->mu.unlock();
if (cl->osd_num)
{
// ...and forget OSD peer
@ -78,7 +83,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
}
#ifndef __MOCK__
// Then remove FD from the eventloop so we don't accidentally read something
tfd->set_fd_handler(peer_fd, false, NULL);
tfd->set_fd_handler(peer_fd, 0, NULL);
if (iothreads.size())
{
iothreads[peer_fd % iothreads.size()]->remove_client(cl);
}
if (cl->connect_timeout_id >= 0)
{
tfd->clear_timer(cl->connect_timeout_id);
@ -108,17 +117,24 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
repeer_pgs(cl->osd_num);
}
// Then cancel all operations
cl->mu.lock();
if (cl->read_op)
{
if (!cl->read_op->callback)
auto op = cl->read_op;
cl->read_op = NULL;
cl->mu.unlock();
if (!op->callback)
{
delete cl->read_op;
delete op;
}
else
{
cancel_op(cl->read_op);
cancel_op(op);
}
cl->read_op = NULL;
}
else
{
cl->mu.unlock();
}
if (cl->osd_num)
{
@ -131,11 +147,32 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
{
clients.erase(it);
}
cl->mu.lock();
cl->refs--;
if (cl->refs <= 0 || force_delete)
{
cl->mu.unlock();
delete cl;
}
else
cl->mu.unlock();
}
void osd_messenger_t::stop_client_from_iothread(osd_client_t *cl)
{
if (!iothreads.size())
{
stop_client(cl->peer_fd);
return;
}
set_immediate([this, cl, peer_fd = cl->peer_fd]()
{
auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end() && cl_it->second == cl)
{
stop_client(peer_fd);
}
});
}
osd_client_t::~osd_client_t()

View File

@ -655,7 +655,7 @@ help:
ringloop->register_consumer(&consumer);
// Add FD to epoll
bool stop = false;
epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events)
epmgr->tfd->set_fd_handler(sockfd[0], EPOLLIN, [this, &stop](int peer_fd, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{

View File

@ -185,7 +185,7 @@ void kv_cli_t::run()
fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK);
try
{
epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events)
epmgr->tfd->set_fd_handler(0, EPOLLIN, [this](int fd, int events)
{
if (events & EPOLLIN)
{
@ -193,7 +193,7 @@ void kv_cli_t::run()
}
if (events & EPOLLRDHUP)
{
epmgr->tfd->set_fd_handler(0, false, NULL);
epmgr->tfd->set_fd_handler(0, 0, NULL);
finished = true;
}
});

View File

@ -243,7 +243,7 @@ void nfs_proxy_t::run(json11::Json cfg)
// Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
epmgr->tfd->set_fd_handler(nfs_socket, EPOLLIN, [this](int nfs_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
@ -260,7 +260,7 @@ void nfs_proxy_t::run(json11::Json cfg)
// Create portmap socket and add it to epoll
int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int portmap_socket, int epoll_events)
epmgr->tfd->set_fd_handler(portmap_socket, EPOLLIN, [this](int portmap_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
@ -466,7 +466,7 @@ void nfs_proxy_t::do_accept(int listen_fd)
{
cli->proc_table.insert(fn);
}
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
epmgr->tfd->set_fd_handler(nfs_fd, EPOLLIN|EPOLLOUT, [cli](int nfs_fd, int epoll_events)
{
// Handle incoming event
if (epoll_events & EPOLLRDHUP)
@ -723,7 +723,7 @@ void nfs_client_t::stop()
stopped = true;
if (refs <= 0)
{
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
parent->epmgr->tfd->set_fd_handler(nfs_fd, 0, NULL);
close(nfs_fd);
delete this;
}

View File

@ -141,6 +141,14 @@ void osd_t::parse_config(bool init)
config = msgr.merge_configs(cli_config, file_config, etcd_global_config, etcd_osd_config);
if (config.find("log_level") == this->config.end())
config["log_level"] = 1;
if (init)
{
// OSD number
osd_num = config["osd_num"].uint64_value();
if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration");
msgr.osd_num = osd_num;
}
if (bs)
{
auto bs_cfg = json_to_bs(config);
@ -150,11 +158,6 @@ void osd_t::parse_config(bool init)
msgr.parse_config(config);
if (init)
{
// OSD number
osd_num = config["osd_num"].uint64_value();
if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration");
msgr.osd_num = osd_num;
// Vital Blockstore parameters
bs_block_size = config["block_size"].uint64_value();
if (!bs_block_size)
@ -361,7 +364,7 @@ void osd_t::bind_socket()
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
epmgr->set_fd_handler(listen_fd, EPOLLIN, [this](int fd, int events)
{
msgr.accept_connections(listen_fd);
});

View File

@ -247,6 +247,7 @@ resume_8:
finish:
if (cur_op->peer_fd)
{
// FIXME: Do it before executing sync
auto it = msgr.clients.find(cur_op->peer_fd);
if (it != msgr.clients.end())
it->second->dirty_pgs.clear();

View File

@ -43,7 +43,7 @@ int main(int narg, char *args[])
// Accept new connections
int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
epmgr->set_fd_handler(listen_fd, EPOLLIN, [listen_fd, msgr](int fd, int events)
{
msgr->accept_connections(listen_fd);
});

View File

@ -21,7 +21,7 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop)
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
}
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); });
tfd = new timerfd_manager_t([this](int fd, int events, std::function<void(int, int)> handler) { set_fd_handler(fd, events, handler); });
if (ringloop)
{
@ -54,14 +54,14 @@ int epoll_manager_t::get_fd()
return epoll_fd;
}
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
void epoll_manager_t::set_fd_handler(int fd, int events, std::function<void(int, int)> handler)
{
if (handler != NULL)
{
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
epoll_event ev;
ev.data.fd = fd;
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
ev.events = events | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{
if (errno == ENOENT)

View File

@ -3,6 +3,8 @@
#pragma once
#include <sys/epoll.h>
#include <map>
#include "ringloop.h"
@ -21,7 +23,7 @@ public:
epoll_manager_t(ring_loop_t *ringloop);
~epoll_manager_t();
int get_fd();
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler);
void set_fd_handler(int fd, int events, std::function<void(int, int)> handler);
void handle_events(int timeout);
timerfd_manager_t *tfd;

View File

@ -32,12 +32,22 @@ static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const s
my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
}
static inline void my_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
{
my_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
}
static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
{
my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
sqe->buf_index = buf_index;
}
static inline void my_uring_prep_write(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
{
my_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
}
static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
{
my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);

View File

@ -11,7 +11,7 @@
#include <stdexcept>
#include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
timerfd_manager_t::timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler)
{
this->set_fd_handler = set_fd_handler;
wait_state = 0;
@ -20,7 +20,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
{
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
}
set_fd_handler(timerfd, false, [this](int fd, int events)
set_fd_handler(timerfd, EPOLLIN, [this](int fd, int events)
{
handle_readable();
});
@ -28,7 +28,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
timerfd_manager_t::~timerfd_manager_t()
{
set_fd_handler(timerfd, false, NULL);
set_fd_handler(timerfd, 0, NULL);
close(timerfd);
}

View File

@ -30,9 +30,9 @@ class timerfd_manager_t
void trigger_nearest();
void handle_readable();
public:
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler);
~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
int set_timer_us(uint64_t micros, bool repeat, std::function<void(int)> callback);