Compare commits
1 Commits
master
...
eventloop-
Author | SHA1 | Date |
---|---|---|
|
194f7e0187 |
node-binding
|
@ -155,7 +155,9 @@ NAN_METHOD(NodeVitastor::Read)
|
|||
|
||||
self->Ref();
|
||||
vitastor_c_read(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, &req->iov, 1, on_read_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
NodeVitastorRequest* NodeVitastor::get_write_request(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
|
||||
|
@ -224,7 +226,9 @@ NAN_METHOD(NodeVitastor::Write)
|
|||
req->iov_list.size() ? req->iov_list.data() : &req->iov,
|
||||
req->iov_list.size() ? req->iov_list.size() : 1,
|
||||
on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
NodeVitastorRequest* NodeVitastor::get_delete_request(const Nan::FunctionCallbackInfo<v8::Value> & info, int argpos)
|
||||
|
@ -271,7 +275,9 @@ NAN_METHOD(NodeVitastor::Delete)
|
|||
self->Ref();
|
||||
vitastor_c_delete(self->c, ((pool << (64-POOL_ID_BITS)) | inode), req->offset, req->len, req->version,
|
||||
on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// sync(callback(err))
|
||||
|
@ -288,7 +294,9 @@ NAN_METHOD(NodeVitastor::Sync)
|
|||
|
||||
self->Ref();
|
||||
vitastor_c_sync(self->c, on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// read_bitmap(pool, inode, offset, length, with_parents, callback(err, bitmap_buffer))
|
||||
|
@ -310,7 +318,9 @@ NAN_METHOD(NodeVitastor::ReadBitmap)
|
|||
auto req = new NodeVitastorRequest(self, callback);
|
||||
self->Ref();
|
||||
vitastor_c_read_bitmap(self->c, ((pool << (64-POOL_ID_BITS)) | inode), offset, len, with_parents, on_read_bitmap_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
static void on_error(NodeVitastorRequest *req, Nan::Callback & nanCallback, long retval)
|
||||
|
@ -335,7 +345,9 @@ NAN_METHOD(NodeVitastor::OnReady)
|
|||
auto req = new NodeVitastorRequest(self, callback);
|
||||
self->Ref();
|
||||
vitastor_c_on_ready(self->c, on_ready_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(self->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
void NodeVitastor::on_ready_finish(void *opaque, long retval)
|
||||
|
@ -480,7 +492,9 @@ NAN_METHOD(NodeVitastorImage::Create)
|
|||
img->Ref();
|
||||
cli->Ref();
|
||||
vitastor_c_watch_inode(cli->c, (char*)img->name.c_str(), on_watch_start, img);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
|
||||
info.GetReturnValue().Set(info.This());
|
||||
}
|
||||
|
@ -621,7 +635,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
|
|||
uint64_t ino = vitastor_c_inode_get_num(watch);
|
||||
cli->Ref();
|
||||
vitastor_c_read(cli->c, ino, req->offset, req->len, &req->iov, 1, NodeVitastor::on_read_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
}
|
||||
else if (req->op == NODE_VITASTOR_WRITE)
|
||||
{
|
||||
|
@ -631,7 +647,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
|
|||
req->iov_list.size() ? req->iov_list.data() : &req->iov,
|
||||
req->iov_list.size() ? req->iov_list.size() : 1,
|
||||
NodeVitastor::on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
}
|
||||
else if (req->op == NODE_VITASTOR_DELETE)
|
||||
{
|
||||
|
@ -639,7 +657,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
|
|||
cli->Ref();
|
||||
vitastor_c_delete(cli->c, ino, req->offset, req->len, req->version,
|
||||
NodeVitastor::on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
}
|
||||
else if (req->op == NODE_VITASTOR_SYNC)
|
||||
{
|
||||
|
@ -649,7 +669,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
|
|||
if (imm != IMMEDIATE_ALL)
|
||||
{
|
||||
vitastor_c_sync(cli->c, NodeVitastor::on_write_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -661,7 +683,9 @@ void NodeVitastorImage::exec_request(NodeVitastorRequest *req)
|
|||
uint64_t ino = vitastor_c_inode_get_num(watch);
|
||||
cli->Ref();
|
||||
vitastor_c_read_bitmap(cli->c, ino, req->offset, req->len, req->with_parents, NodeVitastor::on_read_bitmap_finish, req);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(cli->c);
|
||||
#endif
|
||||
}
|
||||
else if (req->op == NODE_VITASTOR_GET_INFO)
|
||||
{
|
||||
|
@ -793,7 +817,9 @@ NAN_METHOD(NodeVitastorKV::Open)
|
|||
delete req;
|
||||
kv->Unref();
|
||||
});
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// close(callback(err))
|
||||
|
@ -817,7 +843,9 @@ NAN_METHOD(NodeVitastorKV::Close)
|
|||
delete req;
|
||||
kv->Unref();
|
||||
});
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// set_config({ ...config })
|
||||
|
@ -876,7 +904,9 @@ void NodeVitastorKV::get_impl(const Nan::FunctionCallbackInfo<v8::Value> & info,
|
|||
delete req;
|
||||
kv->Unref();
|
||||
}, allow_cache);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// get(key, callback(err, value))
|
||||
|
@ -949,7 +979,9 @@ NAN_METHOD(NodeVitastorKV::Set)
|
|||
delete cas_req;
|
||||
kv->Unref();
|
||||
}, cas_cb);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// del(key, callback(err), cas_compare(old_value)?)
|
||||
|
@ -988,7 +1020,9 @@ NAN_METHOD(NodeVitastorKV::Del)
|
|||
delete cas_req;
|
||||
kv->Unref();
|
||||
}, cas_cb);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// list(start_key?)
|
||||
|
@ -1109,7 +1143,9 @@ NAN_METHOD(NodeVitastorKVListing::Next)
|
|||
list->iter = req;
|
||||
list->kv->Unref();
|
||||
});
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_c_uring_handle_events(list->kv->cli->c);
|
||||
#endif
|
||||
}
|
||||
|
||||
// close()
|
||||
|
|
|
@ -294,7 +294,9 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
|
|||
|
||||
qemu_mutex_lock(&client->mutex);
|
||||
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_cb, task);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_schedule_uring_handler(client);
|
||||
#endif
|
||||
qemu_mutex_unlock(&client->mutex);
|
||||
|
||||
while (!task->complete)
|
||||
|
@ -749,7 +751,9 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs,
|
|||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||
qemu_mutex_lock(&client->mutex);
|
||||
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_schedule_uring_handler(client);
|
||||
#endif
|
||||
qemu_mutex_unlock(&client->mutex);
|
||||
|
||||
while (!task.complete)
|
||||
|
@ -783,7 +787,9 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs,
|
|||
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
|
||||
qemu_mutex_lock(&client->mutex);
|
||||
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_cb, &task);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_schedule_uring_handler(client);
|
||||
#endif
|
||||
qemu_mutex_unlock(&client->mutex);
|
||||
|
||||
while (!task.complete)
|
||||
|
@ -863,7 +869,9 @@ static int coroutine_fn vitastor_co_block_status(
|
|||
task.bitmap = client->last_bitmap = NULL;
|
||||
qemu_mutex_lock(&client->mutex);
|
||||
vitastor_c_read_bitmap(client->proxy, task.inode, task.offset, task.len, !client->skip_parents, vitastor_co_read_bitmap_cb, &task);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_schedule_uring_handler(client);
|
||||
#endif
|
||||
qemu_mutex_unlock(&client->mutex);
|
||||
while (!task.complete)
|
||||
{
|
||||
|
@ -950,7 +958,9 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
|
|||
|
||||
qemu_mutex_lock(&client->mutex);
|
||||
vitastor_c_sync(client->proxy, vitastor_co_generic_cb, &task);
|
||||
#if !defined VITASTOR_C_API_VERSION || VITASTOR_C_API_VERSION < 5
|
||||
vitastor_schedule_uring_handler(client);
|
||||
#endif
|
||||
qemu_mutex_unlock(&client->mutex);
|
||||
|
||||
while (!task.complete)
|
||||
|
|
|
@ -127,6 +127,7 @@ vitastor_c *vitastor_c_create_qemu_uring(QEMUSetFDHandler *aio_set_fd_handler, v
|
|||
auto self = vitastor_c_create_qemu_common(aio_set_fd_handler, aio_context);
|
||||
self->ringloop = ringloop;
|
||||
self->cli = new cluster_client_t(self->ringloop, self->tfd, cfg_json);
|
||||
ringloop->loop_continue();
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -150,6 +151,7 @@ vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_ho
|
|||
self->ringloop = ringloop;
|
||||
self->epmgr = new epoll_manager_t(self->ringloop);
|
||||
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
|
||||
ringloop->loop_continue();
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -183,6 +185,7 @@ vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
|
|||
self->ringloop = ringloop;
|
||||
self->epmgr = new epoll_manager_t(self->ringloop);
|
||||
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
|
||||
ringloop->loop_continue();
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -228,6 +231,10 @@ void vitastor_c_on_ready(vitastor_c *client, VitastorIOHandler cb, void *opaque)
|
|||
{
|
||||
cb(opaque, 0);
|
||||
});
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_uring_wait_ready(vitastor_c *client)
|
||||
|
@ -284,6 +291,10 @@ void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64
|
|||
delete op;
|
||||
};
|
||||
client->cli->execute(op);
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
||||
|
@ -305,6 +316,10 @@ void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint6
|
|||
delete op;
|
||||
};
|
||||
client->cli->execute(op);
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_delete(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version,
|
||||
|
@ -322,6 +337,10 @@ void vitastor_c_delete(vitastor_c *client, uint64_t inode, uint64_t offset, uint
|
|||
delete op;
|
||||
};
|
||||
client->cli->execute(op);
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_read_bitmap(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
|
||||
|
@ -344,6 +363,10 @@ void vitastor_c_read_bitmap(vitastor_c *client, uint64_t inode, uint64_t offset,
|
|||
delete op;
|
||||
};
|
||||
client->cli->execute(op);
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque)
|
||||
|
@ -356,6 +379,10 @@ void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque)
|
|||
delete op;
|
||||
};
|
||||
client->cli->execute(op);
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler cb, void *opaque)
|
||||
|
@ -365,6 +392,10 @@ void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler c
|
|||
auto watch = client->cli->st_cli.watch_inode(std::string(image));
|
||||
cb(opaque, (long)watch);
|
||||
});
|
||||
if (client->ringloop)
|
||||
{
|
||||
client->ringloop->loop_continue();
|
||||
}
|
||||
}
|
||||
|
||||
void vitastor_c_close_watch(vitastor_c *client, void *handle)
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
#define VITASTOR_QEMU_PROXY_H
|
||||
|
||||
// C API wrapper version
|
||||
#define VITASTOR_C_API_VERSION 4
|
||||
#define VITASTOR_C_API_VERSION 5
|
||||
|
||||
#ifndef POOL_ID_BITS
|
||||
#define POOL_ID_BITS 16
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/poll.h>
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#include <stdexcept>
|
||||
|
||||
|
@ -74,6 +75,27 @@ void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, in
|
|||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||
}
|
||||
epoll_handlers[fd] = handler;
|
||||
// We use edge-triggered epoll so it may miss events which already happened
|
||||
// on the FD at the moment of adding it to epoll. So check for these with poll()
|
||||
struct pollfd initpoll = { .fd = fd, .events = (short)((wr ? POLLOUT : 0) | POLLIN | POLLRDHUP) };
|
||||
int r = poll(&initpoll, 1, 0);
|
||||
if (r < 0)
|
||||
throw std::runtime_error(std::string("poll: ") + strerror(errno));
|
||||
if (r > 0)
|
||||
{
|
||||
auto events = ((initpoll.revents & POLLOUT) ? EPOLLOUT : 0) |
|
||||
((initpoll.revents & POLLIN) ? EPOLLIN : 0) |
|
||||
((initpoll.revents & POLLRDHUP) ? EPOLLRDHUP : 0);
|
||||
tfd->set_timer_us(1, false, [this, fd, events](int)
|
||||
{
|
||||
auto cb_it = epoll_handlers.find(fd);
|
||||
if (cb_it != epoll_handlers.end())
|
||||
{
|
||||
auto & cb = cb_it->second;
|
||||
cb(fd, events);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -29,6 +29,7 @@ ring_loop_t::ring_loop_t(int qd, bool multithreaded)
|
|||
{
|
||||
free_ring_data[i] = i;
|
||||
}
|
||||
in_loop = false;
|
||||
}
|
||||
|
||||
ring_loop_t::~ring_loop_t()
|
||||
|
@ -86,6 +87,11 @@ io_uring_sqe* ring_loop_t::get_sqe()
|
|||
|
||||
void ring_loop_t::loop()
|
||||
{
|
||||
if (in_loop)
|
||||
{
|
||||
return;
|
||||
}
|
||||
in_loop = true;
|
||||
if (ring_eventfd >= 0)
|
||||
{
|
||||
// Reset eventfd counter
|
||||
|
@ -125,6 +131,17 @@ void ring_loop_t::loop()
|
|||
}
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
in_loop = false;
|
||||
loop_continue();
|
||||
}
|
||||
|
||||
void ring_loop_t::loop_continue()
|
||||
{
|
||||
if (in_loop)
|
||||
{
|
||||
return;
|
||||
}
|
||||
in_loop = true;
|
||||
do
|
||||
{
|
||||
loop_again = false;
|
||||
|
@ -140,6 +157,7 @@ void ring_loop_t::loop()
|
|||
}
|
||||
}
|
||||
} while (loop_again);
|
||||
in_loop = false;
|
||||
}
|
||||
|
||||
unsigned ring_loop_t::save()
|
||||
|
@ -189,5 +207,7 @@ int ring_loop_t::register_eventfd()
|
|||
ring_eventfd = -1;
|
||||
return r;
|
||||
}
|
||||
// Loop once to prevent skipping events happened before eventfd was registered
|
||||
loop();
|
||||
return ring_eventfd;
|
||||
}
|
||||
|
|
|
@ -129,6 +129,7 @@ class ring_loop_t
|
|||
bool mt;
|
||||
int *free_ring_data;
|
||||
unsigned free_ring_data_ptr;
|
||||
bool in_loop;
|
||||
bool loop_again;
|
||||
struct io_uring ring;
|
||||
int ring_eventfd = -1;
|
||||
|
@ -164,6 +165,7 @@ public:
|
|||
}
|
||||
|
||||
void loop();
|
||||
void loop_continue();
|
||||
void wakeup();
|
||||
|
||||
unsigned save();
|
||||
|
|
Loading…
Reference in New Issue