forked from vitalif/vitastor
Compare commits
2 Commits
master
...
msgr-iothr
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 118a1cd521 | |
Vitaliy Filippov | 6da49d38fe |
|
@ -15,6 +15,106 @@
|
|||
#include "msgr_rdma.h"
|
||||
#endif
|
||||
|
||||
#include <sys/poll.h>
|
||||
|
||||
msgr_iothread_t::msgr_iothread_t():
|
||||
ring(RINGLOOP_DEFAULT_SIZE),
|
||||
thread(&msgr_iothread_t::run, this)
|
||||
{
|
||||
eventfd = ring.register_eventfd();
|
||||
if (eventfd < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("failed to register eventfd: ") + strerror(-eventfd));
|
||||
}
|
||||
}
|
||||
|
||||
msgr_iothread_t::~msgr_iothread_t()
|
||||
{
|
||||
stop();
|
||||
}
|
||||
|
||||
void msgr_iothread_t::add_sqe(io_uring_sqe & sqe)
|
||||
{
|
||||
mu.lock();
|
||||
queue.push_back((iothread_sqe_t){ .sqe = sqe, .data = std::move(*(ring_data_t*)sqe.user_data) });
|
||||
if (queue.size() == 1)
|
||||
{
|
||||
cond.notify_all();
|
||||
}
|
||||
mu.unlock();
|
||||
}
|
||||
|
||||
void msgr_iothread_t::stop()
|
||||
{
|
||||
mu.lock();
|
||||
if (stopped)
|
||||
{
|
||||
mu.unlock();
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
if (outer_loop_data)
|
||||
{
|
||||
outer_loop_data->callback = [](ring_data_t*){};
|
||||
}
|
||||
cond.notify_all();
|
||||
close(eventfd);
|
||||
mu.unlock();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
void msgr_iothread_t::add_to_ringloop(ring_loop_t *outer_loop)
|
||||
{
|
||||
assert(!this->outer_loop || this->outer_loop == outer_loop);
|
||||
io_uring_sqe *sqe = outer_loop->get_sqe();
|
||||
assert(sqe != NULL);
|
||||
this->outer_loop = outer_loop;
|
||||
this->outer_loop_data = ((ring_data_t*)sqe->user_data);
|
||||
my_uring_prep_poll_add(sqe, eventfd, POLLIN);
|
||||
outer_loop_data->callback = [this](ring_data_t *data)
|
||||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("eventfd poll failed: ") + strerror(-data->res));
|
||||
}
|
||||
outer_loop_data = NULL;
|
||||
if (stopped)
|
||||
{
|
||||
return;
|
||||
}
|
||||
add_to_ringloop(this->outer_loop);
|
||||
ring.loop();
|
||||
};
|
||||
}
|
||||
|
||||
void msgr_iothread_t::run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mu);
|
||||
while (!stopped && !queue.size())
|
||||
cond.wait(lk);
|
||||
if (stopped)
|
||||
return;
|
||||
int i = 0;
|
||||
for (; i < queue.size(); i++)
|
||||
{
|
||||
io_uring_sqe *sqe = ring.get_sqe();
|
||||
if (!sqe)
|
||||
break;
|
||||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||
*data = std::move(queue[i].data);
|
||||
*sqe = queue[i].sqe;
|
||||
sqe->user_data = (uint64_t)data;
|
||||
}
|
||||
queue.erase(queue.begin(), queue.begin()+i);
|
||||
}
|
||||
// We only want to offload sendmsg/recvmsg. Callbacks will be called in main thread
|
||||
ring.submit();
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::init()
|
||||
{
|
||||
#ifdef WITH_RDMA
|
||||
|
@ -43,6 +143,15 @@ void osd_messenger_t::init()
|
|||
}
|
||||
}
|
||||
#endif
|
||||
if (ringloop && iothread_count > 0)
|
||||
{
|
||||
for (int i = 0; i < iothread_count; i++)
|
||||
{
|
||||
auto iot = new msgr_iothread_t();
|
||||
iothreads.push_back(iot);
|
||||
iot->add_to_ringloop(ringloop);
|
||||
}
|
||||
}
|
||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||
{
|
||||
auto cl_it = clients.begin();
|
||||
|
@ -129,6 +238,14 @@ osd_messenger_t::~osd_messenger_t()
|
|||
{
|
||||
stop_client(clients.begin()->first, true, true);
|
||||
}
|
||||
if (iothreads.size())
|
||||
{
|
||||
for (auto iot: iothreads)
|
||||
{
|
||||
delete iot;
|
||||
}
|
||||
iothreads.clear();
|
||||
}
|
||||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
|
|
|
@ -111,6 +111,44 @@ struct osd_op_stats_t
|
|||
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
|
||||
};
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <thread>
|
||||
|
||||
#ifdef __MOCK__
|
||||
class msgr_iothread_t;
|
||||
#else
|
||||
struct iothread_sqe_t
|
||||
{
|
||||
io_uring_sqe sqe;
|
||||
ring_data_t data;
|
||||
};
|
||||
|
||||
class msgr_iothread_t
|
||||
{
|
||||
protected:
|
||||
ring_loop_t ring;
|
||||
ring_loop_t *outer_loop = NULL;
|
||||
ring_data_t *outer_loop_data = NULL;
|
||||
int eventfd = -1;
|
||||
bool stopped = false;
|
||||
std::mutex mu;
|
||||
std::condition_variable cond;
|
||||
std::vector<iothread_sqe_t> queue;
|
||||
std::thread thread;
|
||||
|
||||
void run();
|
||||
public:
|
||||
|
||||
msgr_iothread_t();
|
||||
~msgr_iothread_t();
|
||||
|
||||
void add_sqe(io_uring_sqe & sqe);
|
||||
void stop();
|
||||
void add_to_ringloop(ring_loop_t *outer_loop);
|
||||
};
|
||||
#endif
|
||||
|
||||
struct osd_messenger_t
|
||||
{
|
||||
protected:
|
||||
|
@ -123,6 +161,7 @@ protected:
|
|||
int osd_ping_timeout = 0;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
int iothread_count = 4;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool use_rdma = true;
|
||||
|
@ -134,6 +173,7 @@ protected:
|
|||
bool rdma_odp = false;
|
||||
#endif
|
||||
|
||||
std::vector<msgr_iothread_t*> iothreads;
|
||||
std::vector<int> read_ready_clients;
|
||||
std::vector<int> write_ready_clients;
|
||||
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
||||
|
|
|
@ -30,7 +30,11 @@ void osd_messenger_t::read_requests()
|
|||
cl->refs++;
|
||||
if (ringloop && !use_sync_send_recv)
|
||||
{
|
||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
|
||||
io_uring_sqe sqe_local;
|
||||
ring_data_t data_local;
|
||||
sqe_local.user_data = (uint64_t)&data_local;
|
||||
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
|
||||
if (!sqe)
|
||||
{
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
|
@ -40,6 +44,10 @@ void osd_messenger_t::read_requests()
|
|||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
|
||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
||||
if (iothread)
|
||||
{
|
||||
iothread->add_sqe(sqe_local);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -189,7 +189,11 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
}
|
||||
if (ringloop && !use_sync_send_recv)
|
||||
{
|
||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
|
||||
io_uring_sqe sqe_local;
|
||||
ring_data_t data_local;
|
||||
sqe_local.user_data = (uint64_t)&data_local;
|
||||
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
|
||||
if (!sqe)
|
||||
{
|
||||
return false;
|
||||
|
@ -200,6 +204,10 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
||||
if (iothread)
|
||||
{
|
||||
iothread->add_sqe(sqe_local);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -79,6 +79,7 @@ void ring_loop_t::loop()
|
|||
struct io_uring_cqe *cqe;
|
||||
while (!io_uring_peek_cqe(&ring, &cqe))
|
||||
{
|
||||
mu.lock();
|
||||
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
|
||||
if (d->callback)
|
||||
{
|
||||
|
@ -90,12 +91,14 @@ void ring_loop_t::loop()
|
|||
dl.res = cqe->res;
|
||||
dl.callback.swap(d->callback);
|
||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||
mu.unlock();
|
||||
dl.callback(&dl);
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Warning: empty callback in SQE\n");
|
||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||
mu.unlock();
|
||||
}
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include <string>
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
|
||||
#define RINGLOOP_DEFAULT_SIZE 1024
|
||||
|
||||
|
@ -124,6 +125,7 @@ class ring_loop_t
|
|||
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
|
||||
std::vector<ring_consumer_t*> consumers;
|
||||
struct ring_data_t *ring_datas;
|
||||
std::mutex mu;
|
||||
int *free_ring_data;
|
||||
unsigned free_ring_data_ptr;
|
||||
bool loop_again;
|
||||
|
@ -138,12 +140,17 @@ public:
|
|||
|
||||
inline struct io_uring_sqe* get_sqe()
|
||||
{
|
||||
mu.lock();
|
||||
if (free_ring_data_ptr == 0)
|
||||
{
|
||||
mu.unlock();
|
||||
return NULL;
|
||||
}
|
||||
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
|
||||
assert(sqe);
|
||||
*sqe = { 0 };
|
||||
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
||||
mu.unlock();
|
||||
return sqe;
|
||||
}
|
||||
inline void set_immediate(const std::function<void()> cb)
|
||||
|
|
Loading…
Reference in New Issue