First implementation of journal trimming

In theory, it's possible to start testing the blockstore at this point!
blocking-uring-test
Vitaliy Filippov 2019-11-15 14:09:41 +03:00
parent c7d9dc027b
commit a4aaa3c7c7
6 changed files with 94 additions and 19 deletions

View File

@ -160,6 +160,7 @@ void blockstore::loop()
} }
} }
} }
flusher->loop();
} }
} }

View File

@ -4,10 +4,13 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
{ {
this->bs = bs; this->bs = bs;
this->flusher_count = flusher_count; this->flusher_count = flusher_count;
this->active_flushers = 0; active_flushers = 0;
this->active_until_sync = 0; active_until_sync = 0;
this->sync_required = true; sync_required = true;
this->sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
journal_superblock = (uint8_t*)memalign(512, 512);
co = new journal_flusher_co[flusher_count]; co = new journal_flusher_co[flusher_count];
for (int i = 0; i < flusher_count; i++) for (int i = 0; i < flusher_count; i++)
{ {
@ -31,6 +34,7 @@ journal_flusher_co::journal_flusher_co()
journal_flusher_t::~journal_flusher_t() journal_flusher_t::~journal_flusher_t()
{ {
free(journal_superblock);
delete[] co; delete[] co;
} }
@ -110,6 +114,10 @@ void journal_flusher_co::loop()
goto resume_10; goto resume_10;
else if (wait_state == 11) else if (wait_state == 11)
goto resume_11; goto resume_11;
else if (wait_state == 12)
goto resume_12;
else if (wait_state == 13)
goto resume_13;
resume_0: resume_0:
if (!flusher->flush_queue.size()) if (!flusher->flush_queue.size())
return; return;
@ -276,8 +284,8 @@ resume_0:
.oid = cur.oid, .oid = cur.oid,
.version = cur.version, .version = cur.version,
}; };
// I consider unordered writes to data & metadata safe here, because // I consider unordered writes to data & metadata safe here
// "dirty" entries always override "clean" entries in our case // BUT it requires that journal entries even older than clean_db should be replayed after restart
await_sqe(6); await_sqe(6);
data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->iov = (struct iovec){ meta_it->second.buf, 512 };
data->callback = simple_callback; data->callback = simple_callback;
@ -373,7 +381,57 @@ resume_0:
if (dirty_it->first.oid != cur.oid) if (dirty_it->first.oid != cur.oid)
dirty_it++; dirty_it++;
bs->dirty_db.erase(dirty_it, std::next(dirty_end)); bs->dirty_db.erase(dirty_it, std::next(dirty_end));
// FIXME: ...and clear unused part of the journal (with some interval, not for every flushed op) // Clear unused part of the journal every <journal_trim_interval> flushes
if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval))
{
flusher->journal_trim_counter = 0;
journal_used_it = bs->journal.used_sectors.lower_bound(bs->journal.used_start);
if (journal_used_it == bs->journal.used_sectors.end())
{
// Journal is cleared to its end, restart from the beginning
journal_used_it = bs->journal.used_sectors.begin();
if (journal_used_it == bs->journal.used_sectors.end())
{
// Journal is empty
bs->journal.used_start = bs->journal.next_free;
}
else
{
bs->journal.used_start = journal_used_it->first;
}
}
else if (journal_used_it->first > bs->journal.used_start)
{
// Journal is cleared up to <journal_used_it>
bs->journal.used_start = journal_used_it->first;
}
else
{
// Can't trim journal
goto do_not_trim;
}
// Update journal "superblock"
await_sqe(12);
data->callback = simple_callback;
*((journal_entry_start*)flusher->journal_superblock) = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_START,
.size = sizeof(journal_entry_start),
.reserved = 0,
.journal_start = bs->journal.used_start,
};
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
io_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
wait_state = 13;
resume_13:
if (wait_count > 0)
return;
}
do_not_trim:
// All done
wait_state = 0; wait_state = 0;
flusher->active_flushers--; flusher->active_flushers--;
repeat_it = flusher->sync_to_repeat.find(cur.oid); repeat_it = flusher->sync_to_repeat.find(cur.oid);

View File

@ -37,6 +37,7 @@ class journal_flusher_co
uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
std::map<uint64_t, meta_sector_t>::iterator meta_it; std::map<uint64_t, meta_sector_t>::iterator meta_it;
std::map<object_id, uint64_t>::iterator repeat_it; std::map<object_id, uint64_t>::iterator repeat_it;
std::map<uint64_t, uint64_t>::iterator journal_used_it;
std::function<void(ring_data_t*)> simple_callback; std::function<void(ring_data_t*)> simple_callback;
std::list<flusher_sync_t>::iterator cur_sync; std::list<flusher_sync_t>::iterator cur_sync;
friend class journal_flusher_t; friend class journal_flusher_t;
@ -55,6 +56,9 @@ class journal_flusher_t
blockstore *bs; blockstore *bs;
friend class journal_flusher_co; friend class journal_flusher_co;
int journal_trim_counter, journal_trim_interval;
uint8_t* journal_superblock;
int active_flushers, active_until_sync; int active_flushers, active_until_sync;
std::list<flusher_sync_t> syncs; std::list<flusher_sync_t> syncs;
std::map<object_id, uint64_t> sync_to_repeat; std::map<object_id, uint64_t> sync_to_repeat;

View File

@ -142,8 +142,9 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
throw new std::runtime_error("first entry of the journal is corrupt"); throw new std::runtime_error("first entry of the journal is corrupt");
} }
journal_pos = bs->journal.used_start = je->journal_start; journal_pos = bs->journal.used_start = je->journal_start;
crc32_last = je->crc32_replaced; crc32_last = 0;
step = 2; step = 2;
started = false;
} }
} }
else if (step == 2 || step == 3) else if (step == 2 || step == 3)
@ -271,7 +272,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
{ {
journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos); journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos);
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 || if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last) je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
{ {
if (pos == 0) if (pos == 0)
{ {
@ -286,7 +287,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
break; break;
} }
} }
bs->journal.used_sectors[total_pos]++; started = true;
pos += je->size; pos += je->size;
crc32_last = je->crc32; crc32_last = je->crc32;
if (je->type == JE_SMALL_WRITE) if (je->type == JE_SMALL_WRITE)
@ -306,10 +307,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
// FIXME: OOPS. Please don't modify total_pos here // FIXME: OOPS. Please don't modify total_pos here
total_pos += je->small_write.len; total_pos += je->small_write.len;
} }
bs->dirty_db.emplace((obj_ver_id){ obj_ver_id ov = {
.oid = je->small_write.oid, .oid = je->small_write.oid,
.version = je->small_write.version, .version = je->small_write.version,
}, (dirty_entry){ };
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_J_SYNCED, .state = ST_J_SYNCED,
.flags = 0, .flags = 0,
.location = location, .location = location,
@ -317,14 +319,17 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.len = je->small_write.len, .len = je->small_write.len,
.journal_sector = total_pos, .journal_sector = total_pos,
}); });
bs->journal.used_sectors[total_pos]++;
bs->flusher->queue_flush(ov);
} }
else if (je->type == JE_BIG_WRITE) else if (je->type == JE_BIG_WRITE)
{ {
// oid, version, block // oid, version, block
bs->dirty_db.emplace((obj_ver_id){ obj_ver_id ov = {
.oid = je->big_write.oid, .oid = je->big_write.oid,
.version = je->big_write.version, .version = je->big_write.version,
}, (dirty_entry){ };
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_D_META_SYNCED, .state = ST_D_META_SYNCED,
.flags = 0, .flags = 0,
.location = je->big_write.location, .location = je->big_write.location,
@ -332,14 +337,17 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.len = bs->block_size, .len = bs->block_size,
.journal_sector = total_pos, .journal_sector = total_pos,
}); });
bs->journal.used_sectors[total_pos]++;
bs->flusher->queue_flush(ov);
} }
else if (je->type == JE_STABLE) else if (je->type == JE_STABLE)
{ {
// oid, version // oid, version
auto it = bs->dirty_db.find((obj_ver_id){ obj_ver_id ov = {
.oid = je->stable.oid, .oid = je->stable.oid,
.version = je->stable.version, .version = je->stable.version,
}); };
auto it = bs->dirty_db.find(ov);
if (it == bs->dirty_db.end()) if (it == bs->dirty_db.end())
{ {
// journal contains a legitimate STABLE entry for a non-existing dirty write // journal contains a legitimate STABLE entry for a non-existing dirty write
@ -356,10 +364,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
else if (je->type == JE_DELETE) else if (je->type == JE_DELETE)
{ {
// oid, version // oid, version
bs->dirty_db.emplace((obj_ver_id){ obj_ver_id ov = {
.oid = je->del.oid, .oid = je->del.oid,
.version = je->del.version, .version = je->del.version,
}, (dirty_entry){ };
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_DEL_SYNCED, .state = ST_DEL_SYNCED,
.flags = 0, .flags = 0,
.location = 0, .location = 0,
@ -367,6 +376,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.len = 0, .len = 0,
.journal_sector = total_pos, .journal_sector = total_pos,
}); });
bs->journal.used_sectors[total_pos]++;
bs->flusher->queue_flush(ov);
} }
} }
} }

View File

@ -19,6 +19,7 @@ class blockstore_init_journal
uint8_t *journal_buffer = NULL; uint8_t *journal_buffer = NULL;
int step = 0; int step = 0;
uint32_t crc32_last = 0; uint32_t crc32_last = 0;
bool started = false;
uint64_t done_pos = 0, journal_pos = 0; uint64_t done_pos = 0, journal_pos = 0;
uint64_t cur_skip = 0; uint64_t cur_skip = 0;
bool wrapped = false; bool wrapped = false;

View File

@ -22,7 +22,7 @@ struct __attribute__((__packed__)) journal_entry_start
uint16_t magic; uint16_t magic;
uint16_t type; uint16_t type;
uint32_t size; uint32_t size;
uint32_t crc32_replaced; uint32_t reserved;
uint64_t journal_start; uint64_t journal_start;
}; };