forked from vitalif/vitastor
Attempt journal trim even without new flushes
This is the new guaranteed unblocking method which replaces old trims in init and rollback, and also fixes a possible stall when just several writes in the beginning of the journal are flushed without triggering a subsequent trim.
parent
5fbe36198a
commit
faa5e1436f
|
@ -15,6 +15,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
|
|||
flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
|
||||
journal_trim_interval = 512;
|
||||
journal_trim_counter = 0;
|
||||
trim_wanted = 0;
|
||||
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign_or_die(MEM_ALIGNMENT, bs->journal_block_size);
|
||||
co = new journal_flusher_co[flusher_count];
|
||||
for (int i = 0; i < flusher_count; i++)
|
||||
|
@ -86,7 +87,7 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
|||
flush_versions[ov.oid] = ov.version;
|
||||
flush_queue.push_back(ov.oid);
|
||||
}
|
||||
if (!dequeuing && flush_queue.size() >= flusher_start_threshold)
|
||||
if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
|
||||
{
|
||||
dequeuing = true;
|
||||
bs->ringloop->wakeup();
|
||||
|
@ -106,7 +107,7 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
|
|||
flush_versions[ov.oid] = ov.version;
|
||||
flush_queue.push_front(ov.oid);
|
||||
}
|
||||
if (!dequeuing && flush_queue.size() >= flusher_start_threshold)
|
||||
if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
|
||||
{
|
||||
dequeuing = true;
|
||||
bs->ringloop->wakeup();
|
||||
|
@ -120,6 +121,16 @@ void journal_flusher_t::request_trim()
|
|||
bs->ringloop->wakeup();
|
||||
}
|
||||
|
||||
void journal_flusher_t::mark_trim_possible()
|
||||
{
|
||||
if (trim_wanted > 0)
|
||||
{
|
||||
dequeuing = true;
|
||||
journal_trim_counter++;
|
||||
bs->ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
void journal_flusher_t::release_trim()
|
||||
{
|
||||
trim_wanted--;
|
||||
|
@ -183,6 +194,12 @@ bool journal_flusher_co::loop()
|
|||
resume_0:
|
||||
if (!flusher->flush_queue.size() || !flusher->dequeuing)
|
||||
{
|
||||
if (flusher->trim_wanted > 0 && flusher->journal_trim_counter > 0)
|
||||
{
|
||||
// Attempt forced trim
|
||||
flusher->active_flushers++;
|
||||
goto trim_journal;
|
||||
}
|
||||
flusher->dequeuing = false;
|
||||
wait_state = 0;
|
||||
return true;
|
||||
|
@ -307,7 +324,7 @@ resume_1:
|
|||
{
|
||||
// Nothing to flush
|
||||
bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
|
||||
goto trim_journal;
|
||||
goto release_oid;
|
||||
}
|
||||
if (clean_loc == UINT64_MAX)
|
||||
{
|
||||
|
@ -487,6 +504,18 @@ resume_1:
|
|||
}
|
||||
// Update clean_db and dirty_db, free old data locations
|
||||
update_clean_db();
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version,
|
||||
copy_count, has_writes, has_delete, flusher->flush_queue.size());
|
||||
#endif
|
||||
release_oid:
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
|
||||
{
|
||||
// Requeue version
|
||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||
}
|
||||
flusher->sync_to_repeat.erase(repeat_it);
|
||||
trim_journal:
|
||||
// Clear unused part of the journal every <journal_trim_interval> flushes
|
||||
if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0)
|
||||
|
@ -545,18 +574,7 @@ resume_1:
|
|||
}
|
||||
}
|
||||
// All done
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version,
|
||||
copy_count, has_writes, has_delete, flusher->flush_queue.size());
|
||||
#endif
|
||||
flusher->active_flushers--;
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
|
||||
{
|
||||
// Requeue version
|
||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||
}
|
||||
flusher->sync_to_repeat.erase(repeat_it);
|
||||
wait_state = 0;
|
||||
goto resume_0;
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ public:
|
|||
~journal_flusher_t();
|
||||
void loop();
|
||||
bool is_active();
|
||||
void mark_trim_possible();
|
||||
void request_trim();
|
||||
void release_trim();
|
||||
void enqueue_flush(obj_ver_id oid);
|
||||
|
|
|
@ -399,6 +399,7 @@ resume_1:
|
|||
}
|
||||
}
|
||||
}
|
||||
bs->flusher->mark_trim_possible();
|
||||
bs->journal.dirty_start = bs->journal.next_free;
|
||||
printf(
|
||||
"Journal entries loaded: %lu, free journal space: %lu bytes (%08lx..%08lx is used), free blocks: %lu / %lu\n",
|
||||
|
|
|
@ -148,6 +148,7 @@ resume_5:
|
|||
{
|
||||
mark_rolled_back(*v);
|
||||
}
|
||||
flusher->mark_trim_possible();
|
||||
// Acknowledge op
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
|
|
Loading…
Reference in New Issue