Compare commits

...

1 Commits

7 changed files with 122 additions and 1 deletions

View File

@ -155,7 +155,9 @@ NAN_METHOD(NodeVitastor::Read)
self->Ref();
vitastor_c_read(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, &req->iov, 1, on_read_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
NodeVitastorRequest* NodeVitastor::get_write_request(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
@ -224,7 +226,9 @@ NAN_METHOD(NodeVitastor::Write)
req->iov_list.size() ? req->iov_list.data() : &req->iov,
req->iov_list.size() ? req->iov_list.size() : 1,
on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
NodeVitastorRequest* NodeVitastor::get_delete_request(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
@ -271,7 +275,9 @@ NAN_METHOD(NodeVitastor::Delete)
self->Ref();
vitastor_c_delete(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, req->version,
on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
// sync(callback(err))
@ -288,7 +294,9 @@ NAN_METHOD(NodeVitastor::Sync)
self->Ref();
vitastor_c_sync(self->c, on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
// read_bitmap(pool, inode, offset, length, with_parents, callback(err, bitmap_buffer))
@ -310,7 +318,9 @@ NAN_METHOD(NodeVitastor::ReadBitmap)
auto req = new NodeVitastorRequest(self, callback);
self->Ref();
vitastor_c_read_bitmap(self->c, ((pool << (64-POOL_ID_BITS)) | inode), offset, len, with_parents, on_read_bitmap_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
static void on_error(NodeVitastorRequest *req, Nan::Callback & nanCallback, long retval)
@ -335,7 +345,9 @@ NAN_METHOD(NodeVitastor::OnReady)
auto req = new NodeVitastorRequest(self, callback);
self->Ref();
vitastor_c_on_ready(self->c, on_ready_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(self->c);
#endif
}
void NodeVitastor::on_ready_finish(void *opaque, long retval)
@ -480,7 +492,9 @@ NAN_METHOD(NodeVitastorImage::Create)
img->Ref();
cli->Ref();
vitastor_c_watch_inode(cli->c, (char*)img->name.c_str(), on_watch_start, img);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
info.GetReturnValue().Set(info.This());
}
@ -621,7 +635,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
uint64_t ino = vitastor_c_inode_get_num(watch);
cli->Ref();
vitastor_c_read(cli->c, ino, req->offset, req->len, &req->iov, 1, NodeVitastor::on_read_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
}
else if (req->op == NODE_VITASTOR_WRITE)
{
@ -631,7 +647,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
req->iov_list.size() ? req->iov_list.data() : &req->iov,
req->iov_list.size() ? req->iov_list.size() : 1,
NodeVitastor::on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
}
else if (req->op == NODE_VITASTOR_DELETE)
{
@ -639,7 +657,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
cli->Ref();
vitastor_c_delete(cli->c, ino, req->offset, req->len, req->version,
NodeVitastor::on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
}
else if (req->op == NODE_VITASTOR_SYNC)
{
@ -649,7 +669,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
if (imm != IMMEDIATE_ALL)
{
vitastor_c_sync(cli->c, NodeVitastor::on_write_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
}
else
{
@ -661,7 +683,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
uint64_t ino = vitastor_c_inode_get_num(watch);
cli->Ref();
vitastor_c_read_bitmap(cli->c, ino, req->offset, req->len, req->with_parents, NodeVitastor::on_read_bitmap_finish, req);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(cli->c);
#endif
}
else if (req->op == NODE_VITASTOR_GET_INFO)
{
@ -793,7 +817,9 @@ NAN_METHOD(NodeVitastorKV::Open)
delete req;
kv->Unref();
});
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(kv->cli->c);
#endif
}
// close(callback(err))
@ -817,7 +843,9 @@ NAN_METHOD(NodeVitastorKV::Close)
delete req;
kv->Unref();
});
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(kv->cli->c);
#endif
}
// set_config({ ...config })
@ -876,7 +904,9 @@ void NodeVitastorKV::get_impl(const Nan::FunctionCallbackInfo<v8::Value> & info,
delete req;
kv->Unref();
}, allow_cache);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(kv->cli->c);
#endif
}
// get(key, callback(err, value))
@ -949,7 +979,9 @@ NAN_METHOD(NodeVitastorKV::Set)
delete cas_req;
kv->Unref();
}, cas_cb);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(kv->cli->c);
#endif
}
// del(key, callback(err), cas_compare(old_value)?)
@ -988,7 +1020,9 @@ NAN_METHOD(NodeVitastorKV::Del)
delete cas_req;
kv->Unref();
}, cas_cb);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(kv->cli->c);
#endif
}
// list(start_key?)
@ -1109,7 +1143,9 @@ NAN_METHOD(NodeVitastorKVListing::Next)
list->iter = req;
list->kv->Unref();
});
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_c_uring_handle_events(list->kv->cli->c);
#endif
}
// close()

View File

@ -294,7 +294,9 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
qemu_mutex_lock(&client->mutex);
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_schedule_uring_handler(client);
#endif
qemu_mutex_unlock(&client->mutex);
while (!task->complete)
@ -749,7 +751,9 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_schedule_uring_handler(client);
#endif
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -783,7 +787,9 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_schedule_uring_handler(client);
#endif
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -863,7 +869,9 @@ static int coroutine_fn vitastor_co_block_status(
task.bitmap = client->last_bitmap = NULL;
qemu_mutex_lock(&client->mutex);
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_schedule_uring_handler(client);
#endif
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
{
@ -950,7 +958,9 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
qemu_mutex_lock(&client->mutex);
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
vitastor_schedule_uring_handler(client);
#endif
qemu_mutex_unlock(&client->mutex);
while (!task.complete)

View File

@ -127,6 +127,7 @@ vitastor_c *vitastor_c_create_qemu_uring(QEMUSetFDHandler *aio_set_fd_handler, v
auto self = vitastor_c_create_qemu_common(aio_set_fd_handler, aio_context);
self->ringloop = ringloop;
self->cli = new cluster_client_t(self->ringloop, self->tfd, cfg_json);
ringloop->loop_continue();
return self;
}
@ -150,6 +151,7 @@ vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_ho
self->ringloop = ringloop;
self->epmgr = new epoll_manager_t(self->ringloop);
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
ringloop->loop_continue();
return self;
}
@ -183,6 +185,7 @@ vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
self->ringloop = ringloop;
self->epmgr = new epoll_manager_t(self->ringloop);
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
ringloop->loop_continue();
return self;
}
@ -228,6 +231,10 @@ void vitastor_c_on_ready(vitastor_c *client, VitastorIOHandler cb, void *opaque)
{
cb(opaque, 0);
});
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_uring_wait_ready(vitastor_c *client)
@ -284,6 +291,10 @@ void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64
delete op;
};
client->cli->execute(op);
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
@ -305,6 +316,10 @@ void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint6
delete op;
};
client->cli->execute(op);
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_delete(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
@ -322,6 +337,10 @@ void vitastor_c_delete(vitastor_c *client, uint64_t inode, uint64_t offset, uint
delete op;
};
client->cli->execute(op);
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_read_bitmap(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
@ -344,6 +363,10 @@ void vitastor_c_read_bitmap(vitastor_c *client, uint64_t inode, uint64_t offset,
delete op;
};
client->cli->execute(op);
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque)
@ -356,6 +379,10 @@ void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque)
delete op;
};
client->cli->execute(op);
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler cb, void *opaque)
@ -365,6 +392,10 @@ void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler c
auto watch = client->cli->st_cli.watch_inode(std::string(image));
cb(opaque, (long)watch);
});
if (client->ringloop)
{
client->ringloop->loop_continue();
}
}
void vitastor_c_close_watch(vitastor_c *client, void *handle)

View File

@ -7,7 +7,7 @@
#define VITASTOR_QEMU_PROXY_H
// C API wrapper version
#define VITASTOR_C_API_VERSION 4
#define VITASTOR_C_API_VERSION 5
#ifndef POOL_ID_BITS
#define POOL_ID_BITS 16

View File

@ -3,6 +3,7 @@
#include <sys/epoll.h>
#include <sys/poll.h>
#include <poll.h>
#include <unistd.h>
#include <stdexcept>
@ -74,6 +75,27 @@ void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, in
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers[fd] = handler;
// We use edge-triggered epoll, so events that already happened on the FD
// before it was added to epoll may be missed. Check for them with poll().
struct pollfd initpoll = { .fd = fd, .events = (short)((wr ? POLLOUT : 0) | POLLIN | POLLRDHUP) };
int r = poll(&initpoll, 1, 0);
if (r < 0)
throw std::runtime_error(std::string("poll: ") + strerror(errno));
if (r > 0)
{
auto events = ((initpoll.revents & POLLOUT) ? EPOLLOUT : 0) |
((initpoll.revents & POLLIN) ? EPOLLIN : 0) |
((initpoll.revents & POLLRDHUP) ? EPOLLRDHUP : 0);
tfd->set_timer_us(1, false, [this, fd, events](int)
{
auto cb_it = epoll_handlers.find(fd);
if (cb_it != epoll_handlers.end())
{
auto & cb = cb_it->second;
cb(fd, events);
}
});
}
}
else
{

View File

@ -29,6 +29,7 @@ ring_loop_t::ring_loop_t(int qd, bool multithreaded)
{
free_ring_data[i] = i;
}
in_loop = false;
}
ring_loop_t::~ring_loop_t()
@ -86,6 +87,11 @@ io_uring_sqe* ring_loop_t::get_sqe()
void ring_loop_t::loop()
{
if (in_loop)
{
return;
}
in_loop = true;
if (ring_eventfd >= 0)
{
// Reset eventfd counter
@ -125,6 +131,17 @@ void ring_loop_t::loop()
}
io_uring_cqe_seen(&ring, cqe);
}
in_loop = false;
loop_continue();
}
void ring_loop_t::loop_continue()
{
if (in_loop)
{
return;
}
in_loop = true;
do
{
loop_again = false;
@ -140,6 +157,7 @@ void ring_loop_t::loop()
}
}
} while (loop_again);
in_loop = false;
}
unsigned ring_loop_t::save()
@ -189,5 +207,7 @@ int ring_loop_t::register_eventfd()
ring_eventfd = -1;
return r;
}
// Loop once so that events which happened before the eventfd was registered are not skipped
loop();
return ring_eventfd;
}

View File

@ -129,6 +129,7 @@ class ring_loop_t
bool mt;
int *free_ring_data;
unsigned free_ring_data_ptr;
bool in_loop;
bool loop_again;
struct io_uring ring;
int ring_eventfd = -1;
@ -164,6 +165,7 @@ public:
}
void loop();
void loop_continue();
void wakeup();
unsigned save();