Add grouping flusher class

blocking-uring-test
Vitaliy Filippov 2019-11-13 17:41:25 +03:00
parent db66b3916e
commit 75398414d1
2 changed files with 62 additions and 14 deletions

View File

@ -279,6 +279,7 @@ class blockstore
friend class blockstore_init_journal; friend class blockstore_init_journal;
friend class blockstore_journal_check_t; friend class blockstore_journal_check_t;
friend class journal_flusher_t; friend class journal_flusher_t;
friend class journal_flusher_co;
void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config); void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config);
void open_data(spp::sparse_hash_map<std::string, std::string> & config); void open_data(spp::sparse_hash_map<std::string, std::string> & config);

View File

@ -12,7 +12,7 @@
// 512b+4K (journal) + sync + 512b (journal) + sync + 4K (data) [+ sync?] + 512b (metadata) + sync. // 512b+4K (journal) + sync + 512b (journal) + sync + 4K (data) [+ sync?] + 512b (metadata) + sync.
// WA = 2.375. It's not the best, SSD FTL-like redirect-write with defragmentation // WA = 2.375. It's not the best, SSD FTL-like redirect-write with defragmentation
// could probably be lower even with defragmentation. But it's fixed and it's still // could probably be lower even with defragmentation. But it's fixed and it's still
// better than in Ceph. :) // better than in Ceph. :) except for HDD-only clusters, because each write results in 3 seeks.
// Stabilize big write: // Stabilize big write:
// 1) Copy metadata from the journal to the metadata device // 1) Copy metadata from the journal to the metadata device
@ -29,6 +29,7 @@
// just before submitting fsync // just before submitting fsync
// 3) it submits syncs to blockstore and peers // 3) it submits syncs to blockstore and peers
// 4) after everyone acks sync it takes the object list and sends stabilize requests to everyone // 4) after everyone acks sync it takes the object list and sends stabilize requests to everyone
// 5) after everyone acks stabilize requests it acks the client's sync operation
int blockstore::dequeue_stable(blockstore_operation *op) int blockstore::dequeue_stable(blockstore_operation *op)
{ {
@ -170,9 +171,13 @@ struct meta_sector_t
int usage_count; int usage_count;
}; };
class journal_flusher_t class journal_flusher_t;
// Journal flusher coroutine
class journal_flusher_co
{ {
blockstore *bs; blockstore *bs;
journal_flusher_t *flusher;
int wait_state, wait_count; int wait_state, wait_count;
struct io_uring_sqe *sqe; struct io_uring_sqe *sqe;
struct ring_data_t *data; struct ring_data_t *data;
@ -183,19 +188,59 @@ class journal_flusher_t
std::vector<copy_buffer_t>::iterator it; std::vector<copy_buffer_t>::iterator it;
uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
std::map<uint64_t, meta_sector_t>::iterator meta_it; std::map<uint64_t, meta_sector_t>::iterator meta_it;
friend class journal_flusher_t;
public: public:
journal_flusher_t(int flush_count);
std::map<uint64_t, meta_sector_t> meta_sectors;
std::deque<obj_ver_id> flush_queue;
void loop(); void loop();
}; };
journal_flusher_t::journal_flusher_t(int flusher_count) // Journal flusher itself
class journal_flusher_t
{ {
int flusher_count;
int active_flushers;
journal_flusher_co *co;
blockstore *bs;
friend class journal_flusher_co;
public:
std::map<uint64_t, meta_sector_t> meta_sectors;
std::deque<obj_ver_id> flush_queue;
journal_flusher_t(int flusher_count, blockstore *bs);
~journal_flusher_t();
void loop();
};
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
{
this->bs = bs;
this->flusher_count = flusher_count;
this->active_flushers = 0;
co = new journal_flusher_co[flusher_count];
for (int i = 0; i < flusher_count; i++)
{
co[i].bs = bs;
co[i].wait_state = 0;
co[i].flusher = this;
}
}
journal_flusher_t::~journal_flusher_t()
{
delete[] co;
} }
void journal_flusher_t::loop() void journal_flusher_t::loop()
{
if (!active_flushers && !flush_queue.size())
{
return;
}
for (int i = 0; i < flusher_count; i++)
{
co[i].loop();
}
}
void journal_flusher_co::loop()
{ {
// This is much better than implementing the whole function as an FSM // This is much better than implementing the whole function as an FSM
// Maybe I should consider a coroutine library like https://github.com/hnes/libaco ... // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
@ -213,13 +258,14 @@ void journal_flusher_t::loop()
goto resume_6; goto resume_6;
else if (wait_state == 7) else if (wait_state == 7)
goto resume_7; goto resume_7;
if (!flush_queue.size()) if (!flusher->flush_queue.size())
return; return;
cur = flush_queue.front(); cur = flusher->flush_queue.front();
flush_queue.pop_front(); flusher->flush_queue.pop_front();
dirty_it = bs->dirty_db.find(cur); dirty_it = bs->dirty_db.find(cur);
if (dirty_it != bs->dirty_db.end()) if (dirty_it != bs->dirty_db.end())
{ {
flusher->active_flushers++;
v.clear(); v.clear();
wait_count = 0; wait_count = 0;
clean_loc = UINT64_MAX; clean_loc = UINT64_MAX;
@ -299,11 +345,11 @@ void journal_flusher_t::loop()
// so I'll avoid it as long as I can. // so I'll avoid it as long as I can.
meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512; meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry))); meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
meta_it = meta_sectors.find(meta_sector); meta_it = flusher->meta_sectors.find(meta_sector);
if (meta_it == meta_sectors.end()) if (meta_it == flusher->meta_sectors.end())
{ {
// Not in memory yet, read it // Not in memory yet, read it
meta_it = meta_sectors.emplace(meta_sector, (meta_sector_t){ meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
.offset = meta_sector, .offset = meta_sector,
.len = 512, .len = 512,
.state = 0, // 0 = not read yet .state = 0, // 0 = not read yet
@ -387,7 +433,7 @@ void journal_flusher_t::loop()
if (meta_it->second.usage_count == 0) if (meta_it->second.usage_count == 0)
{ {
free(meta_it->second.buf); free(meta_it->second.buf);
meta_sectors.erase(meta_it); flusher->meta_sectors.erase(meta_it);
} }
for (it = v.begin(); it != v.end(); it++) for (it = v.begin(); it != v.end(); it++)
{ {
@ -395,6 +441,7 @@ void journal_flusher_t::loop()
} }
v.clear(); v.clear();
wait_state = 0; wait_state = 0;
flusher->active_flushers--;
} }
// FIXME Now sync everything // FIXME Now sync everything
} }