forked from vitalif/vitastor
Wakeup ring loop
parent
b6174075de
commit
8c690c76ec
|
@ -48,7 +48,7 @@ blockstore::~blockstore()
|
|||
{
|
||||
delete flusher;
|
||||
free(zero_object);
|
||||
ringloop->unregister_consumer(ring_consumer.number);
|
||||
ringloop->unregister_consumer(ring_consumer);
|
||||
if (data_fd >= 0)
|
||||
close(data_fd);
|
||||
if (meta_fd >= 0 && meta_fd != data_fd)
|
||||
|
@ -246,5 +246,6 @@ int blockstore::enqueue_op(blockstore_operation *op)
|
|||
{
|
||||
enqueue_write(op);
|
||||
}
|
||||
ringloop->wakeup(ring_consumer);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -254,6 +254,7 @@ int blockstore_init_journal::loop()
|
|||
bs->journal.crc32_last = crc32_last;
|
||||
journal_buffer = NULL;
|
||||
step = 100;
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
|
30
ringloop.cpp
30
ringloop.cpp
|
@ -27,17 +27,22 @@ int ring_loop_t::register_consumer(ring_consumer_t & consumer)
|
|||
return consumer.number;
|
||||
}
|
||||
|
||||
void ring_loop_t::unregister_consumer(int number)
|
||||
void ring_loop_t::wakeup(ring_consumer_t & consumer)
|
||||
{
|
||||
if (number < consumers.size())
|
||||
loop_again = true;
|
||||
}
|
||||
|
||||
void ring_loop_t::unregister_consumer(ring_consumer_t & consumer)
|
||||
{
|
||||
if (consumer.number >= 0 && consumer.number < consumers.size())
|
||||
{
|
||||
consumers[number].loop = NULL;
|
||||
consumers[consumer.number].loop = NULL;
|
||||
consumer.number = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void ring_loop_t::loop(bool sleep)
|
||||
void ring_loop_t::loop()
|
||||
{
|
||||
// FIXME: we should loop until all "coroutines" are suspended. currently we loop only once before sleeping
|
||||
struct io_uring_cqe *cqe;
|
||||
while (!io_uring_peek_cqe(&ring, &cqe))
|
||||
{
|
||||
|
@ -49,12 +54,13 @@ void ring_loop_t::loop(bool sleep)
|
|||
}
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
for (int i = 0; i < consumers.size(); i++)
|
||||
do
|
||||
{
|
||||
consumers[i].loop();
|
||||
}
|
||||
if (sleep)
|
||||
{
|
||||
io_uring_wait_cqe(&ring, &cqe);
|
||||
}
|
||||
loop_again = false;
|
||||
for (int i = 0; i < consumers.size(); i++)
|
||||
{
|
||||
consumers[i].loop();
|
||||
}
|
||||
} while (loop_again);
|
||||
io_uring_wait_cqe(&ring, &cqe);
|
||||
}
|
||||
|
|
|
@ -120,6 +120,7 @@ class ring_loop_t
|
|||
{
|
||||
std::vector<ring_consumer_t> consumers;
|
||||
struct ring_data_t *ring_data;
|
||||
bool loop_again;
|
||||
public:
|
||||
struct io_uring ring;
|
||||
ring_loop_t(int qd);
|
||||
|
@ -134,8 +135,9 @@ public:
|
|||
return sqe;
|
||||
}
|
||||
int register_consumer(ring_consumer_t & consumer);
|
||||
void unregister_consumer(int number);
|
||||
void loop(bool sleep);
|
||||
void wakeup(ring_consumer_t & consumer);
|
||||
void unregister_consumer(ring_consumer_t & consumer);
|
||||
void loop();
|
||||
inline int submit()
|
||||
{
|
||||
return io_uring_submit(&ring);
|
||||
|
|
|
@ -36,7 +36,7 @@ public:
|
|||
|
||||
~timerfd_interval()
|
||||
{
|
||||
ringloop->unregister_consumer(consumer.number);
|
||||
ringloop->unregister_consumer(consumer);
|
||||
close(timerfd);
|
||||
}
|
||||
|
||||
|
@ -94,16 +94,22 @@ int main(int narg, char *args[])
|
|||
{
|
||||
printf("completed %d\n", op->retval);
|
||||
};
|
||||
ring_consumer_t main_cons;
|
||||
bool bs_was_done = false;
|
||||
while (true)
|
||||
main_cons.loop = [&]()
|
||||
{
|
||||
bool bs_done = bs->is_started();
|
||||
if (bs_done && !bs_was_done)
|
||||
{
|
||||
printf("init completed\n");
|
||||
bs->enqueue_op(&op);
|
||||
bs_was_done = true;
|
||||
}
|
||||
ringloop->loop(true);
|
||||
};
|
||||
ringloop->register_consumer(main_cons);
|
||||
while (true)
|
||||
{
|
||||
ringloop->loop();
|
||||
}
|
||||
delete bs;
|
||||
delete ringloop;
|
||||
|
|
Loading…
Reference in New Issue