Fix possible journal corruption caused by concurrent flushing and writing of the same journal sector

trace-sqes
Vitaliy Filippov 2020-03-07 17:36:58 +03:00
parent 1696446545
commit c863543bfe
6 changed files with 48 additions and 13 deletions

View File

@ -4,8 +4,9 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
{ {
this->bs = bs; this->bs = bs;
this->flusher_count = flusher_count; this->flusher_count = flusher_count;
dequeuing = false;
active_flushers = 0; active_flushers = 0;
sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
journal_trim_interval = sync_threshold; journal_trim_interval = sync_threshold;
journal_trim_counter = 0; journal_trim_counter = 0;
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size); journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size);
@ -55,17 +56,13 @@ journal_flusher_t::~journal_flusher_t()
bool journal_flusher_t::is_active() bool journal_flusher_t::is_active()
{ {
return active_flushers > 0 || start_forced && flush_queue.size() > 0 || flush_queue.size() >= sync_threshold; return active_flushers > 0 || dequeuing;
} }
void journal_flusher_t::loop() void journal_flusher_t::loop()
{ {
for (int i = 0; i < flusher_count; i++) for (int i = 0; (active_flushers > 0 || dequeuing) && i < flusher_count; i++)
{ {
if (!active_flushers && (start_forced ? !flush_queue.size() : (flush_queue.size() < sync_threshold)))
{
return;
}
co[i].loop(); co[i].loop();
} }
} }
@ -83,6 +80,11 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
flush_versions[ov.oid] = ov.version; flush_versions[ov.oid] = ov.version;
flush_queue.push_back(ov.oid); flush_queue.push_back(ov.oid);
} }
if (!dequeuing && flush_queue.size() >= sync_threshold)
{
dequeuing = true;
bs->ringloop->wakeup();
}
} }
void journal_flusher_t::unshift_flush(obj_ver_id ov) void journal_flusher_t::unshift_flush(obj_ver_id ov)
@ -98,11 +100,16 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
flush_versions[ov.oid] = ov.version; flush_versions[ov.oid] = ov.version;
flush_queue.push_front(ov.oid); flush_queue.push_front(ov.oid);
} }
if (!dequeuing && flush_queue.size() >= sync_threshold)
{
dequeuing = true;
bs->ringloop->wakeup();
}
} }
void journal_flusher_t::force_start() void journal_flusher_t::force_start()
{ {
start_forced = true; dequeuing = true;
bs->ringloop->wakeup(); bs->ringloop->wakeup();
} }
@ -155,10 +162,9 @@ bool journal_flusher_co::loop()
else if (wait_state == 18) else if (wait_state == 18)
goto resume_18; goto resume_18;
resume_0: resume_0:
if (!flusher->flush_queue.size() || if (!flusher->flush_queue.size() || !flusher->dequeuing)
!flusher->start_forced && !flusher->active_flushers && flusher->flush_queue.size() < flusher->sync_threshold)
{ {
flusher->start_forced = false; flusher->dequeuing = false;
wait_state = 0; wait_state = 0;
return true; return true;
} }
@ -169,6 +175,16 @@ resume_0:
dirty_end = bs->dirty_db.find(cur); dirty_end = bs->dirty_db.find(cur);
if (dirty_end != bs->dirty_db.end()) if (dirty_end != bs->dirty_db.end())
{ {
if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
(bs->journal.dirty_start >= bs->journal.used_start ||
dirty_end->second.journal_sector < bs->journal.used_start))
{
// We can't flush journal sectors that are still written to
flusher->enqueue_flush(cur);
flusher->dequeuing = false;
wait_state = 0;
return true;
}
repeat_it = flusher->sync_to_repeat.find(cur.oid); repeat_it = flusher->sync_to_repeat.find(cur.oid);
if (repeat_it != flusher->sync_to_repeat.end()) if (repeat_it != flusher->sync_to_repeat.end())
{ {

View File

@ -73,7 +73,7 @@ public:
// Journal flusher itself // Journal flusher itself
class journal_flusher_t class journal_flusher_t
{ {
bool start_forced = false; bool dequeuing;
int flusher_count; int flusher_count;
int sync_threshold; int sync_threshold;
journal_flusher_co *co; journal_flusher_co *co;

View File

@ -402,6 +402,7 @@ resume_1:
} }
// Trim journal on start so we don't stall when all entries are older // Trim journal on start so we don't stall when all entries are older
bs->journal.trim(); bs->journal.trim();
bs->journal.dirty_start = bs->journal.next_free;
printf( printf(
"Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n", "Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n",
entries_loaded, entries_loaded,

View File

@ -92,6 +92,10 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
// Also select next sector buffer in memory // Also select next sector buffer in memory
journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
} }
else
{
journal.dirty_start = journal.next_free;
}
journal.sector_info[journal.cur_sector].offset = journal.next_free; journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0; journal.in_sector_pos = 0;
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size; journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;

View File

@ -137,8 +137,12 @@ struct journal_t
uint64_t block_size = 512; uint64_t block_size = 512;
uint64_t offset, len; uint64_t offset, len;
// Next free block offset
uint64_t next_free = 0; uint64_t next_free = 0;
// First occupied block offset
uint64_t used_start = 0; uint64_t used_start = 0;
// End of the last block not used for writing anymore
uint64_t dirty_start = 0;
uint32_t crc32_last = 0; uint32_t crc32_last = 0;
// Current sector(s) used for writing // Current sector(s) used for writing

View File

@ -249,7 +249,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
{ {
// Release used journal sectors // Release flushed journal sectors
if (PRIV(op)->min_flushed_journal_sector > 0 && if (PRIV(op)->min_flushed_journal_sector > 0 &&
PRIV(op)->max_flushed_journal_sector > 0) PRIV(op)->max_flushed_journal_sector > 0)
{ {
@ -257,6 +257,16 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
while (1) while (1)
{ {
journal.sector_info[s-1].usage_count--; journal.sector_info[s-1].usage_count--;
if (s != (1+journal.cur_sector) && journal.sector_info[s-1].usage_count == 0)
{
// We know for sure that we won't write into this sector anymore
uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
(new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
{
journal.dirty_start = new_ds;
}
}
if (s == PRIV(op)->max_flushed_journal_sector) if (s == PRIV(op)->max_flushed_journal_sector)
break; break;
s = 1 + s % journal.sector_count; s = 1 + s % journal.sector_count;