1
0
Fork 0

Compare commits

...

2 Commits

6 changed files with 185 additions and 2 deletions

View File

@ -15,6 +15,106 @@
#include "msgr_rdma.h"
#endif
#include <sys/poll.h>
msgr_iothread_t::msgr_iothread_t():
ring(RINGLOOP_DEFAULT_SIZE),
thread(&msgr_iothread_t::run, this)
{
eventfd = ring.register_eventfd();
if (eventfd < 0)
{
throw std::runtime_error(std::string("failed to register eventfd: ") + strerror(-eventfd));
}
}
msgr_iothread_t::~msgr_iothread_t()
{
stop();
}
void msgr_iothread_t::add_sqe(io_uring_sqe & sqe)
{
mu.lock();
queue.push_back((iothread_sqe_t){ .sqe = sqe, .data = std::move(*(ring_data_t*)sqe.user_data) });
if (queue.size() == 1)
{
cond.notify_all();
}
mu.unlock();
}
void msgr_iothread_t::stop()
{
mu.lock();
if (stopped)
{
mu.unlock();
return;
}
stopped = true;
if (outer_loop_data)
{
outer_loop_data->callback = [](ring_data_t*){};
}
cond.notify_all();
close(eventfd);
mu.unlock();
thread.join();
}
void msgr_iothread_t::add_to_ringloop(ring_loop_t *outer_loop)
{
assert(!this->outer_loop || this->outer_loop == outer_loop);
io_uring_sqe *sqe = outer_loop->get_sqe();
assert(sqe != NULL);
this->outer_loop = outer_loop;
this->outer_loop_data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, eventfd, POLLIN);
outer_loop_data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("eventfd poll failed: ") + strerror(-data->res));
}
outer_loop_data = NULL;
if (stopped)
{
return;
}
add_to_ringloop(this->outer_loop);
ring.loop();
};
}
void msgr_iothread_t::run()
{
while (true)
{
{
std::unique_lock<std::mutex> lk(mu);
while (!stopped && !queue.size())
cond.wait(lk);
if (stopped)
return;
int i = 0;
for (; i < queue.size(); i++)
{
io_uring_sqe *sqe = ring.get_sqe();
if (!sqe)
break;
ring_data_t *data = ((ring_data_t*)sqe->user_data);
*data = std::move(queue[i].data);
*sqe = queue[i].sqe;
sqe->user_data = (uint64_t)data;
}
queue.erase(queue.begin(), queue.begin()+i);
}
// We only want to offload sendmsg/recvmsg. Callbacks will be called in main thread
ring.submit();
}
}
void osd_messenger_t::init()
{
#ifdef WITH_RDMA
@ -43,6 +143,15 @@ void osd_messenger_t::init()
}
}
#endif
if (ringloop && iothread_count > 0)
{
for (int i = 0; i < iothread_count; i++)
{
auto iot = new msgr_iothread_t();
iothreads.push_back(iot);
iot->add_to_ringloop(ringloop);
}
}
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
auto cl_it = clients.begin();
@ -129,6 +238,14 @@ osd_messenger_t::~osd_messenger_t()
{
stop_client(clients.begin()->first, true, true);
}
if (iothreads.size())
{
for (auto iot: iothreads)
{
delete iot;
}
iothreads.clear();
}
#ifdef WITH_RDMA
if (rdma_context)
{

View File

@ -111,6 +111,44 @@ struct osd_op_stats_t
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
};
#include <mutex>
#include <condition_variable>
#include <thread>
#ifdef __MOCK__
class msgr_iothread_t;
#else
struct iothread_sqe_t
{
io_uring_sqe sqe;
ring_data_t data;
};
class msgr_iothread_t
{
protected:
ring_loop_t ring;
ring_loop_t *outer_loop = NULL;
ring_data_t *outer_loop_data = NULL;
int eventfd = -1;
bool stopped = false;
std::mutex mu;
std::condition_variable cond;
std::vector<iothread_sqe_t> queue;
std::thread thread;
void run();
public:
msgr_iothread_t();
~msgr_iothread_t();
void add_sqe(io_uring_sqe & sqe);
void stop();
void add_to_ringloop(ring_loop_t *outer_loop);
};
#endif
struct osd_messenger_t
{
protected:
@ -123,6 +161,7 @@ protected:
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
int iothread_count = 4;
#ifdef WITH_RDMA
bool use_rdma = true;
@ -134,6 +173,7 @@ protected:
bool rdma_odp = false;
#endif
std::vector<msgr_iothread_t*> iothreads;
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)

View File

@ -30,7 +30,11 @@ void osd_messenger_t::read_requests()
cl->refs++;
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
io_uring_sqe sqe_local;
ring_data_t data_local;
sqe_local.user_data = (uint64_t)&data_local;
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
if (!sqe)
{
cl->read_msg.msg_iovlen = 0;
@ -40,6 +44,10 @@ void osd_messenger_t::read_requests()
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
if (iothread)
{
iothread->add_sqe(sqe_local);
}
}
else
{

View File

@ -189,7 +189,11 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
}
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
io_uring_sqe sqe_local;
ring_data_t data_local;
sqe_local.user_data = (uint64_t)&data_local;
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
if (!sqe)
{
return false;
@ -200,6 +204,10 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
if (iothread)
{
iothread->add_sqe(sqe_local);
}
}
else
{

View File

@ -79,6 +79,7 @@ void ring_loop_t::loop()
struct io_uring_cqe *cqe;
while (!io_uring_peek_cqe(&ring, &cqe))
{
mu.lock();
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
if (d->callback)
{
@ -90,12 +91,14 @@ void ring_loop_t::loop()
dl.res = cqe->res;
dl.callback.swap(d->callback);
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
mu.unlock();
dl.callback(&dl);
}
else
{
fprintf(stderr, "Warning: empty callback in SQE\n");
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
mu.unlock();
}
io_uring_cqe_seen(&ring, cqe);
}

View File

@ -14,6 +14,7 @@
#include <string>
#include <functional>
#include <vector>
#include <mutex>
#define RINGLOOP_DEFAULT_SIZE 1024
@ -124,6 +125,7 @@ class ring_loop_t
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
std::vector<ring_consumer_t*> consumers;
struct ring_data_t *ring_datas;
std::mutex mu;
int *free_ring_data;
unsigned free_ring_data_ptr;
bool loop_again;
@ -138,12 +140,17 @@ public:
inline struct io_uring_sqe* get_sqe()
{
mu.lock();
if (free_ring_data_ptr == 0)
{
mu.unlock();
return NULL;
}
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
assert(sqe);
*sqe = { 0 };
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
mu.unlock();
return sqe;
}
inline void set_immediate(const std::function<void()> cb)