Compare commits

..

No commits in common. "8bfea6e7deb72c418caa40a227b77a0544493b63" and "ad30b1151933d88c187f4394c50e8ce1c18e8093" have entirely different histories.

6 changed files with 25 additions and 121 deletions

View File

@ -193,10 +193,6 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
bh_op_max *= 2; bh_op_max *= 2;
cluster_op_t **n = (cluster_op_t**)malloc_or_die(sizeof(cluster_op_t*) * bh_op_max); cluster_op_t **n = (cluster_op_t**)malloc_or_die(sizeof(cluster_op_t*) * bh_op_max);
memcpy(n, bh_ops, sizeof(cluster_op_t*) * bh_op_count); memcpy(n, bh_ops, sizeof(cluster_op_t*) * bh_op_count);
if (bh_ops != bh_ops_local)
{
free(bh_ops);
}
bh_ops = n; bh_ops = n;
} }
bh_ops[bh_op_count++] = next; bh_ops[bh_op_count++] = next;

View File

@ -23,24 +23,19 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop)
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); }); tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); });
if (ringloop) consumer.loop = [this]()
{ {
consumer.loop = [this]() if (pending)
{ handle_epoll_events();
if (pending) };
handle_uring_event(); ringloop->register_consumer(&consumer);
};
ringloop->register_consumer(&consumer); handle_epoll_events();
handle_uring_event();
}
} }
epoll_manager_t::~epoll_manager_t() epoll_manager_t::~epoll_manager_t()
{ {
if (ringloop) ringloop->unregister_consumer(&consumer);
{
ringloop->unregister_consumer(&consumer);
}
if (tfd) if (tfd)
{ {
delete tfd; delete tfd;
@ -49,11 +44,6 @@ epoll_manager_t::~epoll_manager_t()
close(epoll_fd); close(epoll_fd);
} }
int epoll_manager_t::get_fd()
{
return epoll_fd;
}
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler) void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
{ {
if (handler != NULL) if (handler != NULL)
@ -85,7 +75,7 @@ void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, in
} }
} }
void epoll_manager_t::handle_uring_event() void epoll_manager_t::handle_epoll_events()
{ {
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
@ -105,20 +95,14 @@ void epoll_manager_t::handle_uring_event()
{ {
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
} }
handle_uring_event(); handle_epoll_events();
}; };
ringloop->submit(); ringloop->submit();
handle_events(0);
}
void epoll_manager_t::handle_events(int timeout)
{
int nfds; int nfds;
epoll_event events[MAX_EPOLL_EVENTS]; epoll_event events[MAX_EPOLL_EVENTS];
do do
{ {
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, timeout); nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
timeout = 0;
for (int i = 0; i < nfds; i++) for (int i = 0; i < nfds; i++)
{ {
auto cb_it = epoll_handlers.find(events[i].data.fd); auto cb_it = epoll_handlers.find(events[i].data.fd);

View File

@ -15,14 +15,11 @@ class epoll_manager_t
ring_consumer_t consumer; ring_consumer_t consumer;
ring_loop_t *ringloop; ring_loop_t *ringloop;
std::map<int, std::function<void(int, int)>> epoll_handlers; std::map<int, std::function<void(int, int)>> epoll_handlers;
void handle_uring_event();
public: public:
epoll_manager_t(ring_loop_t *ringloop); epoll_manager_t(ring_loop_t *ringloop);
~epoll_manager_t(); ~epoll_manager_t();
int get_fd();
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler); void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler);
void handle_events(int timeout); void handle_epoll_events();
timerfd_manager_t *tfd; timerfd_manager_t *tfd;
}; };

View File

@ -32,7 +32,6 @@
struct sec_data struct sec_data
{ {
vitastor_c *cli = NULL; vitastor_c *cli = NULL;
bool epoll_based = false;
void *watch = NULL; void *watch = NULL;
bool last_sync = false; bool last_sync = false;
/* The list of completed io_u structs. */ /* The list of completed io_u structs. */
@ -59,7 +58,6 @@ struct sec_options
int rdma_port_num = 0; int rdma_port_num = 0;
int rdma_gid_index = 0; int rdma_gid_index = 0;
int rdma_mtu = 0; int rdma_mtu = 0;
int no_io_uring = 0;
}; };
static struct fio_option options[] = { static struct fio_option options[] = {
@ -195,16 +193,6 @@ static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE, .category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME, .group = FIO_OPT_G_FILENAME,
}, },
{
.name = "no_io_uring",
.lname = "Disable io_uring",
.type = FIO_OPT_BOOL,
.off1 = offsetof(struct sec_options, no_io_uring),
.help = "Use epoll and plain sendmsg/recvmsg instead of io_uring (slower)",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{ {
.name = NULL, .name = NULL,
}, },
@ -293,17 +281,7 @@ static int sec_setup(struct thread_data *td)
opt_push(options, "log_level", std::to_string(o->cluster_log).c_str()); opt_push(options, "log_level", std::to_string(o->cluster_log).c_str());
// allow writeback caching if -direct is not set // allow writeback caching if -direct is not set
opt_push(options, "client_writeback_allowed", td->o.odirect ? "0" : "1"); opt_push(options, "client_writeback_allowed", td->o.odirect ? "0" : "1");
bsd->cli = o->no_io_uring ? NULL : vitastor_c_create_uring_json((const char**)options.data(), options.size()); bsd->cli = vitastor_c_create_uring_json((const char**)options.data(), options.size());
bsd->epoll_based = false;
if (!bsd->cli)
{
if (o->no_io_uring)
fprintf(stderr, "vitastor: io_uring disabled - I/O will be slower\n");
else
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno));
bsd->cli = vitastor_c_create_epoll_json((const char**)options.data(), options.size());
bsd->epoll_based = true;
}
for (auto opt: options) for (auto opt: options)
free(opt); free(opt);
options.clear(); options.clear();
@ -311,24 +289,12 @@ static int sec_setup(struct thread_data *td)
{ {
bsd->watch = NULL; bsd->watch = NULL;
vitastor_c_watch_inode(bsd->cli, o->image, watch_callback, bsd); vitastor_c_watch_inode(bsd->cli, o->image, watch_callback, bsd);
if (!bsd->epoll_based) while (true)
{ {
while (true) vitastor_c_uring_handle_events(bsd->cli);
{ if (bsd->watch)
vitastor_c_uring_handle_events(bsd->cli); break;
if (bsd->watch) vitastor_c_uring_wait_events(bsd->cli);
break;
vitastor_c_uring_wait_events(bsd->cli);
}
}
else
{
while (true)
{
if (bsd->watch)
break;
vitastor_c_epoll_handle_events(bsd->cli, 1000);
}
} }
td->files[0]->real_file_size = vitastor_c_inode_get_size(bsd->watch); td->files[0]->real_file_size = vitastor_c_inode_get_size(bsd->watch);
if (!vitastor_c_inode_get_num(bsd->watch) || if (!vitastor_c_inode_get_num(bsd->watch) ||
@ -471,24 +437,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
{ {
sec_data *bsd = (sec_data*)td->io_ops_data; sec_data *bsd = (sec_data*)td->io_ops_data;
if (!bsd->epoll_based) while (true)
{ {
while (true) vitastor_c_uring_handle_events(bsd->cli);
{ if (bsd->completed.size() >= min)
vitastor_c_uring_handle_events(bsd->cli); break;
if (bsd->completed.size() >= min) vitastor_c_uring_wait_events(bsd->cli);
break;
vitastor_c_uring_wait_events(bsd->cli);
}
}
else
{
while (true)
{
if (bsd->completed.size() >= min)
break;
vitastor_c_epoll_handle_events(bsd->cli, 1000);
}
} }
return bsd->completed.size(); return bsd->completed.size();
} }

View File

@ -186,26 +186,12 @@ vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
return self; return self;
} }
vitastor_c *vitastor_c_create_epoll_json(const char **options, int options_len)
{
json11::Json::object cfg;
for (int i = 0; i < options_len-1; i += 2)
{
cfg[options[i]] = std::string(options[i+1]);
}
json11::Json cfg_json(cfg);
vitastor_c *self = new vitastor_c;
self->epmgr = new epoll_manager_t(NULL);
self->cli = new cluster_client_t(NULL, self->epmgr->tfd, cfg_json);
return self;
}
void vitastor_c_destroy(vitastor_c *client) void vitastor_c_destroy(vitastor_c *client)
{ {
delete client->cli; delete client->cli;
if (client->epmgr) if (client->epmgr)
delete client->epmgr; delete client->epmgr;
else if (client->tfd) else
delete client->tfd; delete client->tfd;
if (client->ringloop) if (client->ringloop)
delete client->ringloop; delete client->ringloop;
@ -243,16 +229,6 @@ int vitastor_c_uring_has_work(vitastor_c *client)
return client->ringloop->has_work(); return client->ringloop->has_work();
} }
int vitastor_c_epoll_get_fd(vitastor_c *client)
{
return !client->ringloop && client->epmgr ? client->epmgr->get_fd() : -1;
}
void vitastor_c_epoll_handle_events(vitastor_c *client, int timeout)
{
return client->epmgr->handle_events(timeout);
}
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque) struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque)
{ {

View File

@ -7,7 +7,7 @@
#define VITASTOR_QEMU_PROXY_H #define VITASTOR_QEMU_PROXY_H
// C API wrapper version // C API wrapper version
#define VITASTOR_C_API_VERSION 3 #define VITASTOR_C_API_VERSION 2
#ifndef POOL_ID_BITS #ifndef POOL_ID_BITS
#define POOL_ID_BITS 16 #define POOL_ID_BITS 16
@ -40,7 +40,6 @@ vitastor_c *vitastor_c_create_qemu_uring(QEMUSetFDHandler *aio_set_fd_handler, v
vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_host, const char *etcd_prefix, vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_host, const char *etcd_prefix,
int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level); int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level);
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len); vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len);
vitastor_c *vitastor_c_create_epoll_json(const char **options, int options_len);
void vitastor_c_destroy(vitastor_c *client); void vitastor_c_destroy(vitastor_c *client);
int vitastor_c_is_ready(vitastor_c *client); int vitastor_c_is_ready(vitastor_c *client);
int vitastor_c_uring_register_eventfd(vitastor_c *client); int vitastor_c_uring_register_eventfd(vitastor_c *client);
@ -48,8 +47,6 @@ void vitastor_c_uring_wait_ready(vitastor_c *client);
void vitastor_c_uring_handle_events(vitastor_c *client); void vitastor_c_uring_handle_events(vitastor_c *client);
void vitastor_c_uring_wait_events(vitastor_c *client); void vitastor_c_uring_wait_events(vitastor_c *client);
int vitastor_c_uring_has_work(vitastor_c *client); int vitastor_c_uring_has_work(vitastor_c *client);
int vitastor_c_epoll_get_fd(vitastor_c *client);
void vitastor_c_epoll_handle_events(vitastor_c *client, int timeout);
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque); struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque);
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version, void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,