forked from vitalif/vitastor
Make [un]register_consumer operate on pointers, rename get_loop_again() to has_work()
parent
1e21555343
commit
9cb07d844b
|
@ -5,7 +5,7 @@ blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *
|
|||
assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE);
|
||||
this->ringloop = ringloop;
|
||||
ring_consumer.loop = [this]() { loop(); };
|
||||
ringloop->register_consumer(ring_consumer);
|
||||
ringloop->register_consumer(&ring_consumer);
|
||||
initialized = 0;
|
||||
zero_object = (uint8_t*)memalign(MEM_ALIGNMENT, block_size);
|
||||
data_fd = meta_fd = journal.fd = -1;
|
||||
|
@ -36,7 +36,7 @@ blockstore_impl_t::~blockstore_impl_t()
|
|||
delete data_alloc;
|
||||
delete flusher;
|
||||
free(zero_object);
|
||||
ringloop->unregister_consumer(ring_consumer);
|
||||
ringloop->unregister_consumer(&ring_consumer);
|
||||
if (data_fd >= 0)
|
||||
close(data_fd);
|
||||
if (meta_fd >= 0 && meta_fd != data_fd)
|
||||
|
@ -205,7 +205,7 @@ void blockstore_impl_t::loop()
|
|||
{
|
||||
live = true;
|
||||
}
|
||||
queue_stall = !live && !ringloop->get_loop_again();
|
||||
queue_stall = !live && !ringloop->has_work();
|
||||
live = false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ static void bs_cleanup(struct thread_data *td)
|
|||
bsd->ringloop->loop();
|
||||
if (bsd->bs->is_safe_to_stop())
|
||||
goto safe;
|
||||
} while (bsd->ringloop->get_loop_again());
|
||||
} while (bsd->ringloop->has_work());
|
||||
bsd->ringloop->wait();
|
||||
}
|
||||
safe:
|
||||
|
|
4
osd.cpp
4
osd.cpp
|
@ -122,13 +122,13 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
|||
}
|
||||
|
||||
consumer.loop = [this]() { loop(); };
|
||||
ringloop->register_consumer(consumer);
|
||||
ringloop->register_consumer(&consumer);
|
||||
}
|
||||
|
||||
osd_t::~osd_t()
|
||||
{
|
||||
delete tick_tfd;
|
||||
ringloop->unregister_consumer(consumer);
|
||||
ringloop->unregister_consumer(&consumer);
|
||||
close(epoll_fd);
|
||||
close(listen_fd);
|
||||
}
|
||||
|
|
18
ringloop.cpp
18
ringloop.cpp
|
@ -27,11 +27,10 @@ ring_loop_t::~ring_loop_t()
|
|||
io_uring_queue_exit(&ring);
|
||||
}
|
||||
|
||||
int ring_loop_t::register_consumer(ring_consumer_t & consumer)
|
||||
void ring_loop_t::register_consumer(ring_consumer_t *consumer)
|
||||
{
|
||||
consumer.number = consumers.size();
|
||||
unregister_consumer(consumer);
|
||||
consumers.push_back(consumer);
|
||||
return consumer.number;
|
||||
}
|
||||
|
||||
void ring_loop_t::wakeup()
|
||||
|
@ -39,12 +38,15 @@ void ring_loop_t::wakeup()
|
|||
loop_again = true;
|
||||
}
|
||||
|
||||
void ring_loop_t::unregister_consumer(ring_consumer_t & consumer)
|
||||
void ring_loop_t::unregister_consumer(ring_consumer_t *consumer)
|
||||
{
|
||||
if (consumer.number >= 0 && consumer.number < consumers.size())
|
||||
for (int i = 0; i < consumers.size(); i++)
|
||||
{
|
||||
consumers[consumer.number].loop = NULL;
|
||||
consumer.number = -1;
|
||||
if (consumers[i] == consumer)
|
||||
{
|
||||
consumers.erase(consumers.begin()+i, consumers.begin()+i+1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,7 +69,7 @@ void ring_loop_t::loop()
|
|||
loop_again = false;
|
||||
for (int i = 0; i < consumers.size(); i++)
|
||||
{
|
||||
consumers[i].loop();
|
||||
consumers[i]->loop();
|
||||
}
|
||||
} while (loop_again);
|
||||
}
|
||||
|
|
|
@ -113,13 +113,12 @@ struct ring_data_t
|
|||
|
||||
struct ring_consumer_t
|
||||
{
|
||||
int number;
|
||||
std::function<void(void)> loop;
|
||||
};
|
||||
|
||||
class ring_loop_t
|
||||
{
|
||||
std::vector<ring_consumer_t> consumers;
|
||||
std::vector<ring_consumer_t*> consumers;
|
||||
struct ring_data_t *ring_datas;
|
||||
int *free_ring_data;
|
||||
unsigned free_ring_data_ptr;
|
||||
|
@ -128,8 +127,8 @@ class ring_loop_t
|
|||
public:
|
||||
ring_loop_t(int qd);
|
||||
~ring_loop_t();
|
||||
int register_consumer(ring_consumer_t & consumer);
|
||||
void unregister_consumer(ring_consumer_t & consumer);
|
||||
void register_consumer(ring_consumer_t *consumer);
|
||||
void unregister_consumer(ring_consumer_t *consumer);
|
||||
|
||||
inline struct io_uring_sqe* get_sqe()
|
||||
{
|
||||
|
@ -153,7 +152,7 @@ public:
|
|||
{
|
||||
return free_ring_data_ptr;
|
||||
}
|
||||
inline bool get_loop_again()
|
||||
inline bool has_work()
|
||||
{
|
||||
return loop_again;
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ int main(int narg, char *args[])
|
|||
}
|
||||
};
|
||||
|
||||
ringloop->register_consumer(main_cons);
|
||||
ringloop->register_consumer(&main_cons);
|
||||
while (1)
|
||||
{
|
||||
ringloop->loop();
|
||||
|
|
|
@ -20,14 +20,14 @@ timerfd_interval::timerfd_interval(ring_loop_t *ringloop, int seconds, std::func
|
|||
throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno));
|
||||
}
|
||||
consumer.loop = [this]() { loop(); };
|
||||
ringloop->register_consumer(consumer);
|
||||
ringloop->register_consumer(&consumer);
|
||||
this->ringloop = ringloop;
|
||||
this->callback = cb;
|
||||
}
|
||||
|
||||
timerfd_interval::~timerfd_interval()
|
||||
{
|
||||
ringloop->unregister_consumer(consumer);
|
||||
ringloop->unregister_consumer(&consumer);
|
||||
close(timerfd);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue