forked from vitalif/vitastor
Compare commits
2 Commits
master
...
msgr-iothr
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 118a1cd521 | |
Vitaliy Filippov | 6da49d38fe |
|
@ -15,6 +15,106 @@
|
||||||
#include "msgr_rdma.h"
|
#include "msgr_rdma.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <sys/poll.h>
|
||||||
|
|
||||||
|
msgr_iothread_t::msgr_iothread_t():
|
||||||
|
ring(RINGLOOP_DEFAULT_SIZE),
|
||||||
|
thread(&msgr_iothread_t::run, this)
|
||||||
|
{
|
||||||
|
eventfd = ring.register_eventfd();
|
||||||
|
if (eventfd < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("failed to register eventfd: ") + strerror(-eventfd));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msgr_iothread_t::~msgr_iothread_t()
|
||||||
|
{
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
void msgr_iothread_t::add_sqe(io_uring_sqe & sqe)
|
||||||
|
{
|
||||||
|
mu.lock();
|
||||||
|
queue.push_back((iothread_sqe_t){ .sqe = sqe, .data = std::move(*(ring_data_t*)sqe.user_data) });
|
||||||
|
if (queue.size() == 1)
|
||||||
|
{
|
||||||
|
cond.notify_all();
|
||||||
|
}
|
||||||
|
mu.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void msgr_iothread_t::stop()
|
||||||
|
{
|
||||||
|
mu.lock();
|
||||||
|
if (stopped)
|
||||||
|
{
|
||||||
|
mu.unlock();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
stopped = true;
|
||||||
|
if (outer_loop_data)
|
||||||
|
{
|
||||||
|
outer_loop_data->callback = [](ring_data_t*){};
|
||||||
|
}
|
||||||
|
cond.notify_all();
|
||||||
|
close(eventfd);
|
||||||
|
mu.unlock();
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void msgr_iothread_t::add_to_ringloop(ring_loop_t *outer_loop)
|
||||||
|
{
|
||||||
|
assert(!this->outer_loop || this->outer_loop == outer_loop);
|
||||||
|
io_uring_sqe *sqe = outer_loop->get_sqe();
|
||||||
|
assert(sqe != NULL);
|
||||||
|
this->outer_loop = outer_loop;
|
||||||
|
this->outer_loop_data = ((ring_data_t*)sqe->user_data);
|
||||||
|
my_uring_prep_poll_add(sqe, eventfd, POLLIN);
|
||||||
|
outer_loop_data->callback = [this](ring_data_t *data)
|
||||||
|
{
|
||||||
|
if (data->res < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error(std::string("eventfd poll failed: ") + strerror(-data->res));
|
||||||
|
}
|
||||||
|
outer_loop_data = NULL;
|
||||||
|
if (stopped)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
add_to_ringloop(this->outer_loop);
|
||||||
|
ring.loop();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
void msgr_iothread_t::run()
|
||||||
|
{
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(mu);
|
||||||
|
while (!stopped && !queue.size())
|
||||||
|
cond.wait(lk);
|
||||||
|
if (stopped)
|
||||||
|
return;
|
||||||
|
int i = 0;
|
||||||
|
for (; i < queue.size(); i++)
|
||||||
|
{
|
||||||
|
io_uring_sqe *sqe = ring.get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
break;
|
||||||
|
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
|
*data = std::move(queue[i].data);
|
||||||
|
*sqe = queue[i].sqe;
|
||||||
|
sqe->user_data = (uint64_t)data;
|
||||||
|
}
|
||||||
|
queue.erase(queue.begin(), queue.begin()+i);
|
||||||
|
}
|
||||||
|
// We only want to offload sendmsg/recvmsg. Callbacks will be called in main thread
|
||||||
|
ring.submit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void osd_messenger_t::init()
|
void osd_messenger_t::init()
|
||||||
{
|
{
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
|
@ -43,6 +143,15 @@ void osd_messenger_t::init()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
if (ringloop && iothread_count > 0)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < iothread_count; i++)
|
||||||
|
{
|
||||||
|
auto iot = new msgr_iothread_t();
|
||||||
|
iothreads.push_back(iot);
|
||||||
|
iot->add_to_ringloop(ringloop);
|
||||||
|
}
|
||||||
|
}
|
||||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||||
{
|
{
|
||||||
auto cl_it = clients.begin();
|
auto cl_it = clients.begin();
|
||||||
|
@ -129,6 +238,14 @@ osd_messenger_t::~osd_messenger_t()
|
||||||
{
|
{
|
||||||
stop_client(clients.begin()->first, true, true);
|
stop_client(clients.begin()->first, true, true);
|
||||||
}
|
}
|
||||||
|
if (iothreads.size())
|
||||||
|
{
|
||||||
|
for (auto iot: iothreads)
|
||||||
|
{
|
||||||
|
delete iot;
|
||||||
|
}
|
||||||
|
iothreads.clear();
|
||||||
|
}
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
if (rdma_context)
|
||||||
{
|
{
|
||||||
|
|
|
@ -111,6 +111,44 @@ struct osd_op_stats_t
|
||||||
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
|
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
#ifdef __MOCK__
|
||||||
|
class msgr_iothread_t;
|
||||||
|
#else
|
||||||
|
struct iothread_sqe_t
|
||||||
|
{
|
||||||
|
io_uring_sqe sqe;
|
||||||
|
ring_data_t data;
|
||||||
|
};
|
||||||
|
|
||||||
|
class msgr_iothread_t
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
ring_loop_t ring;
|
||||||
|
ring_loop_t *outer_loop = NULL;
|
||||||
|
ring_data_t *outer_loop_data = NULL;
|
||||||
|
int eventfd = -1;
|
||||||
|
bool stopped = false;
|
||||||
|
std::mutex mu;
|
||||||
|
std::condition_variable cond;
|
||||||
|
std::vector<iothread_sqe_t> queue;
|
||||||
|
std::thread thread;
|
||||||
|
|
||||||
|
void run();
|
||||||
|
public:
|
||||||
|
|
||||||
|
msgr_iothread_t();
|
||||||
|
~msgr_iothread_t();
|
||||||
|
|
||||||
|
void add_sqe(io_uring_sqe & sqe);
|
||||||
|
void stop();
|
||||||
|
void add_to_ringloop(ring_loop_t *outer_loop);
|
||||||
|
};
|
||||||
|
#endif
|
||||||
|
|
||||||
struct osd_messenger_t
|
struct osd_messenger_t
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
|
@ -123,6 +161,7 @@ protected:
|
||||||
int osd_ping_timeout = 0;
|
int osd_ping_timeout = 0;
|
||||||
int log_level = 0;
|
int log_level = 0;
|
||||||
bool use_sync_send_recv = false;
|
bool use_sync_send_recv = false;
|
||||||
|
int iothread_count = 4;
|
||||||
|
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
bool use_rdma = true;
|
bool use_rdma = true;
|
||||||
|
@ -134,6 +173,7 @@ protected:
|
||||||
bool rdma_odp = false;
|
bool rdma_odp = false;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
std::vector<msgr_iothread_t*> iothreads;
|
||||||
std::vector<int> read_ready_clients;
|
std::vector<int> read_ready_clients;
|
||||||
std::vector<int> write_ready_clients;
|
std::vector<int> write_ready_clients;
|
||||||
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
||||||
|
|
|
@ -30,7 +30,11 @@ void osd_messenger_t::read_requests()
|
||||||
cl->refs++;
|
cl->refs++;
|
||||||
if (ringloop && !use_sync_send_recv)
|
if (ringloop && !use_sync_send_recv)
|
||||||
{
|
{
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
|
||||||
|
io_uring_sqe sqe_local;
|
||||||
|
ring_data_t data_local;
|
||||||
|
sqe_local.user_data = (uint64_t)&data_local;
|
||||||
|
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
cl->read_msg.msg_iovlen = 0;
|
cl->read_msg.msg_iovlen = 0;
|
||||||
|
@ -40,6 +44,10 @@ void osd_messenger_t::read_requests()
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||||
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
|
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
|
||||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
||||||
|
if (iothread)
|
||||||
|
{
|
||||||
|
iothread->add_sqe(sqe_local);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -189,7 +189,11 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||||
}
|
}
|
||||||
if (ringloop && !use_sync_send_recv)
|
if (ringloop && !use_sync_send_recv)
|
||||||
{
|
{
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
auto iothread = iothreads.size() ? iothreads[peer_fd % iothreads.size()] : NULL;
|
||||||
|
io_uring_sqe sqe_local;
|
||||||
|
ring_data_t data_local;
|
||||||
|
sqe_local.user_data = (uint64_t)&data_local;
|
||||||
|
io_uring_sqe* sqe = (iothread ? &sqe_local : ringloop->get_sqe());
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
@ -200,6 +204,10 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
||||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
||||||
|
if (iothread)
|
||||||
|
{
|
||||||
|
iothread->add_sqe(sqe_local);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -79,6 +79,7 @@ void ring_loop_t::loop()
|
||||||
struct io_uring_cqe *cqe;
|
struct io_uring_cqe *cqe;
|
||||||
while (!io_uring_peek_cqe(&ring, &cqe))
|
while (!io_uring_peek_cqe(&ring, &cqe))
|
||||||
{
|
{
|
||||||
|
mu.lock();
|
||||||
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
|
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
|
||||||
if (d->callback)
|
if (d->callback)
|
||||||
{
|
{
|
||||||
|
@ -90,12 +91,14 @@ void ring_loop_t::loop()
|
||||||
dl.res = cqe->res;
|
dl.res = cqe->res;
|
||||||
dl.callback.swap(d->callback);
|
dl.callback.swap(d->callback);
|
||||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||||
|
mu.unlock();
|
||||||
dl.callback(&dl);
|
dl.callback(&dl);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
fprintf(stderr, "Warning: empty callback in SQE\n");
|
fprintf(stderr, "Warning: empty callback in SQE\n");
|
||||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||||
|
mu.unlock();
|
||||||
}
|
}
|
||||||
io_uring_cqe_seen(&ring, cqe);
|
io_uring_cqe_seen(&ring, cqe);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
#define RINGLOOP_DEFAULT_SIZE 1024
|
#define RINGLOOP_DEFAULT_SIZE 1024
|
||||||
|
|
||||||
|
@ -124,6 +125,7 @@ class ring_loop_t
|
||||||
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
|
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
|
||||||
std::vector<ring_consumer_t*> consumers;
|
std::vector<ring_consumer_t*> consumers;
|
||||||
struct ring_data_t *ring_datas;
|
struct ring_data_t *ring_datas;
|
||||||
|
std::mutex mu;
|
||||||
int *free_ring_data;
|
int *free_ring_data;
|
||||||
unsigned free_ring_data_ptr;
|
unsigned free_ring_data_ptr;
|
||||||
bool loop_again;
|
bool loop_again;
|
||||||
|
@ -138,12 +140,17 @@ public:
|
||||||
|
|
||||||
inline struct io_uring_sqe* get_sqe()
|
inline struct io_uring_sqe* get_sqe()
|
||||||
{
|
{
|
||||||
|
mu.lock();
|
||||||
if (free_ring_data_ptr == 0)
|
if (free_ring_data_ptr == 0)
|
||||||
|
{
|
||||||
|
mu.unlock();
|
||||||
return NULL;
|
return NULL;
|
||||||
|
}
|
||||||
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
|
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
|
||||||
assert(sqe);
|
assert(sqe);
|
||||||
*sqe = { 0 };
|
*sqe = { 0 };
|
||||||
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
||||||
|
mu.unlock();
|
||||||
return sqe;
|
return sqe;
|
||||||
}
|
}
|
||||||
inline void set_immediate(const std::function<void()> cb)
|
inline void set_immediate(const std::function<void()> cb)
|
||||||
|
|
Loading…
Reference in New Issue