Fix possible write stall
The stall occurred during fio Q=128 random write tests with low flusher_count (4). It was caused by flushers being unable to flush the beginning of the journal because it contained older writes to an object that also had writes in the very end of the journal, after dirty_start.sync-io-test
parent
c22e096943
commit
e6a4b634f8
|
@ -7,8 +7,8 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
|
||||||
dequeuing = false;
|
dequeuing = false;
|
||||||
active_flushers = 0;
|
active_flushers = 0;
|
||||||
syncing_flushers = 0;
|
syncing_flushers = 0;
|
||||||
sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
|
flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
|
||||||
journal_trim_interval = sync_threshold;
|
journal_trim_interval = flusher_start_threshold;
|
||||||
journal_trim_counter = 0;
|
journal_trim_counter = 0;
|
||||||
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size);
|
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size);
|
||||||
co = new journal_flusher_co[flusher_count];
|
co = new journal_flusher_co[flusher_count];
|
||||||
|
@ -81,7 +81,7 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
||||||
flush_versions[ov.oid] = ov.version;
|
flush_versions[ov.oid] = ov.version;
|
||||||
flush_queue.push_back(ov.oid);
|
flush_queue.push_back(ov.oid);
|
||||||
}
|
}
|
||||||
if (!dequeuing && flush_queue.size() >= sync_threshold)
|
if (!dequeuing && flush_queue.size() >= flusher_start_threshold)
|
||||||
{
|
{
|
||||||
dequeuing = true;
|
dequeuing = true;
|
||||||
bs->ringloop->wakeup();
|
bs->ringloop->wakeup();
|
||||||
|
@ -101,19 +101,25 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
|
||||||
flush_versions[ov.oid] = ov.version;
|
flush_versions[ov.oid] = ov.version;
|
||||||
flush_queue.push_front(ov.oid);
|
flush_queue.push_front(ov.oid);
|
||||||
}
|
}
|
||||||
if (!dequeuing && flush_queue.size() >= sync_threshold)
|
if (!dequeuing && flush_queue.size() >= flusher_start_threshold)
|
||||||
{
|
{
|
||||||
dequeuing = true;
|
dequeuing = true;
|
||||||
bs->ringloop->wakeup();
|
bs->ringloop->wakeup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void journal_flusher_t::force_start()
|
void journal_flusher_t::request_trim()
|
||||||
{
|
{
|
||||||
dequeuing = true;
|
dequeuing = true;
|
||||||
|
trim_wanted++;
|
||||||
bs->ringloop->wakeup();
|
bs->ringloop->wakeup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void journal_flusher_t::release_trim()
|
||||||
|
{
|
||||||
|
trim_wanted--;
|
||||||
|
}
|
||||||
|
|
||||||
#define await_sqe(label) \
|
#define await_sqe(label) \
|
||||||
resume_##label:\
|
resume_##label:\
|
||||||
sqe = bs->get_sqe();\
|
sqe = bs->get_sqe();\
|
||||||
|
@ -181,15 +187,72 @@ resume_0:
|
||||||
(bs->journal.dirty_start >= bs->journal.used_start ||
|
(bs->journal.dirty_start >= bs->journal.used_start ||
|
||||||
dirty_end->second.journal_sector < bs->journal.used_start))
|
dirty_end->second.journal_sector < bs->journal.used_start))
|
||||||
{
|
{
|
||||||
|
flusher->enqueue_flush(cur);
|
||||||
// We can't flush journal sectors that are still written to
|
// We can't flush journal sectors that are still written to
|
||||||
|
// However, as we group flushes by oid, current oid may have older writes to flush!
|
||||||
|
// And it may even block writes if we don't flush the older version
|
||||||
|
// (if it's in the beginning of the journal)...
|
||||||
|
// So first try to find an older version of the same object to flush.
|
||||||
|
bool found = false;
|
||||||
|
while (dirty_end != bs->dirty_db.begin())
|
||||||
|
{
|
||||||
|
dirty_end--;
|
||||||
|
if (dirty_end->first.oid != cur.oid)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start &&
|
||||||
|
(bs->journal.dirty_start >= bs->journal.used_start ||
|
||||||
|
dirty_end->second.journal_sector < bs->journal.used_start)))
|
||||||
|
{
|
||||||
|
found = true;
|
||||||
|
cur.version = dirty_end->first.version;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
// Try other objects
|
||||||
|
int search_left = flusher->flush_queue.size() - 1;
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Flusher overran writers - stopping\n");
|
printf("Flusher overran writers (dirty_start=%08lx) - searching for older flushes (%d left)\n", bs->journal.dirty_start, search_left);
|
||||||
|
#endif
|
||||||
|
while (search_left > 0)
|
||||||
|
{
|
||||||
|
cur.oid = flusher->flush_queue.front();
|
||||||
|
cur.version = flusher->flush_versions[cur.oid];
|
||||||
|
flusher->flush_queue.pop_front();
|
||||||
|
flusher->flush_versions.erase(cur.oid);
|
||||||
|
dirty_end = bs->dirty_db.find(cur);
|
||||||
|
if (dirty_end != bs->dirty_db.end())
|
||||||
|
{
|
||||||
|
if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
|
||||||
|
(bs->journal.dirty_start >= bs->journal.used_start ||
|
||||||
|
dirty_end->second.journal_sector < bs->journal.used_start))
|
||||||
|
{
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("Write %lu:%lu v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector);
|
||||||
#endif
|
#endif
|
||||||
flusher->enqueue_flush(cur);
|
flusher->enqueue_flush(cur);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
search_left--;
|
||||||
|
}
|
||||||
|
if (search_left <= 0)
|
||||||
|
{
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("No older flushes, stopping\n");
|
||||||
|
#endif
|
||||||
flusher->dequeuing = false;
|
flusher->dequeuing = false;
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||||
if (repeat_it != flusher->sync_to_repeat.end())
|
if (repeat_it != flusher->sync_to_repeat.end())
|
||||||
{
|
{
|
||||||
|
@ -409,7 +472,7 @@ resume_1:
|
||||||
// Update clean_db and dirty_db, free old data locations
|
// Update clean_db and dirty_db, free old data locations
|
||||||
update_clean_db();
|
update_clean_db();
|
||||||
// Clear unused part of the journal every <journal_trim_interval> flushes
|
// Clear unused part of the journal every <journal_trim_interval> flushes
|
||||||
if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval))
|
if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0)
|
||||||
{
|
{
|
||||||
flusher->journal_trim_counter = 0;
|
flusher->journal_trim_counter = 0;
|
||||||
if (bs->journal.trim())
|
if (bs->journal.trim())
|
||||||
|
|
|
@ -73,9 +73,10 @@ public:
|
||||||
// Journal flusher itself
|
// Journal flusher itself
|
||||||
class journal_flusher_t
|
class journal_flusher_t
|
||||||
{
|
{
|
||||||
|
int trim_wanted = 0;
|
||||||
bool dequeuing;
|
bool dequeuing;
|
||||||
int flusher_count;
|
int flusher_count;
|
||||||
int sync_threshold;
|
int flusher_start_threshold;
|
||||||
journal_flusher_co *co;
|
journal_flusher_co *co;
|
||||||
blockstore_impl_t *bs;
|
blockstore_impl_t *bs;
|
||||||
friend class journal_flusher_co;
|
friend class journal_flusher_co;
|
||||||
|
@ -96,7 +97,8 @@ public:
|
||||||
~journal_flusher_t();
|
~journal_flusher_t();
|
||||||
void loop();
|
void loop();
|
||||||
bool is_active();
|
bool is_active();
|
||||||
void force_start();
|
void request_trim();
|
||||||
|
void release_trim();
|
||||||
void enqueue_flush(obj_ver_id oid);
|
void enqueue_flush(obj_ver_id oid);
|
||||||
void unshift_flush(obj_ver_id oid);
|
void unshift_flush(obj_ver_id oid);
|
||||||
};
|
};
|
||||||
|
|
|
@ -282,6 +282,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
||||||
#endif
|
#endif
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
flusher->release_trim();
|
||||||
PRIV(op)->wait_for = 0;
|
PRIV(op)->wait_for = 0;
|
||||||
}
|
}
|
||||||
else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
|
else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
|
||||||
|
|
|
@ -101,7 +101,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
|
||||||
: bs->journal.used_start - bs->journal.next_free)
|
: bs->journal.used_start - bs->journal.next_free)
|
||||||
);
|
);
|
||||||
PRIV(op)->wait_for = WAIT_JOURNAL;
|
PRIV(op)->wait_for = WAIT_JOURNAL;
|
||||||
bs->flusher->force_start();
|
bs->flusher->request_trim();
|
||||||
PRIV(op)->wait_detail = bs->journal.used_start;
|
PRIV(op)->wait_detail = bs->journal.used_start;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -411,6 +411,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
// We know for sure that we won't write into this sector anymore
|
// We know for sure that we won't write into this sector anymore
|
||||||
uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
|
uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
|
||||||
|
if (new_ds >= journal.len)
|
||||||
|
{
|
||||||
|
new_ds = journal.block_size;
|
||||||
|
}
|
||||||
if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
|
if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
|
||||||
(new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
|
(new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue