forked from vitalif/vitastor
Reserve journal space for stabilize requests dynamically to prevent stalls
parent
3fd873d263
commit
cf36445359
|
@ -274,7 +274,7 @@ class blockstore_impl_t
|
||||||
blockstore_dirty_db_t dirty_db;
|
blockstore_dirty_db_t dirty_db;
|
||||||
std::vector<blockstore_op_t*> submit_queue;
|
std::vector<blockstore_op_t*> submit_queue;
|
||||||
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
||||||
int unsynced_big_write_count = 0;
|
int unsynced_big_write_count = 0, unstable_unsynced = 0;
|
||||||
int unsynced_queued_ops = 0;
|
int unsynced_queued_ops = 0;
|
||||||
allocator *data_alloc = NULL;
|
allocator *data_alloc = NULL;
|
||||||
uint8_t *zero_object;
|
uint8_t *zero_object;
|
||||||
|
|
|
@ -145,6 +145,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
|
||||||
journal.sector_info[journal.cur_sector].offset = journal.next_free;
|
journal.sector_info[journal.cur_sector].offset = journal.next_free;
|
||||||
journal.in_sector_pos = 0;
|
journal.in_sector_pos = 0;
|
||||||
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
|
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
|
||||||
|
assert(journal.next_free != journal.used_start);
|
||||||
memset(journal.inmemory
|
memset(journal.inmemory
|
||||||
? (uint8_t*)journal.buffer + journal.sector_info[journal.cur_sector].offset
|
? (uint8_t*)journal.buffer + journal.sector_info[journal.cur_sector].offset
|
||||||
: (uint8_t*)journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);
|
: (uint8_t*)journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);
|
||||||
|
|
|
@ -13,12 +13,6 @@
|
||||||
#define JOURNAL_BUFFER_SIZE 4*1024*1024
|
#define JOURNAL_BUFFER_SIZE 4*1024*1024
|
||||||
#define JOURNAL_ENTRY_HEADER_SIZE 16
|
#define JOURNAL_ENTRY_HEADER_SIZE 16
|
||||||
|
|
||||||
// We reserve some extra space for future stabilize requests during writes
|
|
||||||
// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow
|
|
||||||
// writing more than can be stabilized afterwards
|
|
||||||
#define JOURNAL_STABILIZE_RESERVATION 65536
|
|
||||||
#define JOURNAL_INSTANT_RESERVATION 131072
|
|
||||||
|
|
||||||
// Journal entries
|
// Journal entries
|
||||||
// Journal entries are linked to each other by their crc32 value
|
// Journal entries are linked to each other by their crc32 value
|
||||||
// The journal is almost a blockchain, because object versions constantly increase
|
// The journal is almost a blockchain, because object versions constantly increase
|
||||||
|
|
|
@ -86,14 +86,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
auto & dirty_entry = dirty_db.at(sbw);
|
auto & dirty_entry = dirty_db.at(sbw);
|
||||||
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len);
|
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len);
|
||||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
|
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
|
||||||
left == 0 ? JOURNAL_STABILIZE_RESERVATION : 0))
|
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
|
else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
|
||||||
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
|
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
|
||||||
|
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -184,6 +185,11 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
mark_stable(dirty_it->first);
|
mark_stable(dirty_it->first);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
unstable_unsynced--;
|
||||||
|
assert(unstable_unsynced >= 0);
|
||||||
|
}
|
||||||
dirty_it++;
|
dirty_it++;
|
||||||
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
|
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
|
||||||
{
|
{
|
||||||
|
@ -214,6 +220,11 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
mark_stable(*it);
|
mark_stable(*it);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
unstable_unsynced--;
|
||||||
|
assert(unstable_unsynced >= 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
op->retval = 0;
|
op->retval = 0;
|
||||||
|
|
|
@ -320,7 +320,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
blockstore_journal_check_t space_check(this);
|
blockstore_journal_check_t space_check(this);
|
||||||
if (!space_check.check_available(op, unsynced_big_write_count + 1,
|
if (!space_check.check_available(op, unsynced_big_write_count + 1,
|
||||||
sizeof(journal_entry_big_write) + dsk.clean_dyn_size,
|
sizeof(journal_entry_big_write) + dsk.clean_dyn_size,
|
||||||
(dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))
|
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -386,6 +386,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
sqe, dsk.data_fd, PRIV(op)->iov_zerofill, vcnt, dsk.data_offset + (loc << dsk.block_order) + op->offset - stripe_offset
|
sqe, dsk.data_fd, PRIV(op)->iov_zerofill, vcnt, dsk.data_offset + (loc << dsk.block_order) + op->offset - stripe_offset
|
||||||
);
|
);
|
||||||
PRIV(op)->pending_ops = 1;
|
PRIV(op)->pending_ops = 1;
|
||||||
|
if (immediate_commit != IMMEDIATE_ALL && !(dirty_it->second.state & BS_ST_INSTANT))
|
||||||
|
{
|
||||||
|
unstable_unsynced++;
|
||||||
|
}
|
||||||
if (immediate_commit != IMMEDIATE_ALL)
|
if (immediate_commit != IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
// Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished)
|
// Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished)
|
||||||
|
@ -408,7 +412,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
sizeof(journal_entry_big_write) + dsk.clean_dyn_size, 0)
|
sizeof(journal_entry_big_write) + dsk.clean_dyn_size, 0)
|
||||||
|| !space_check.check_available(op, 1,
|
|| !space_check.check_available(op, 1,
|
||||||
sizeof(journal_entry_small_write) + dyn_size,
|
sizeof(journal_entry_small_write) + dyn_size,
|
||||||
op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
|
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -499,6 +503,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
if (journal.next_free >= journal.len)
|
if (journal.next_free >= journal.len)
|
||||||
{
|
{
|
||||||
journal.next_free = dsk.journal_block_size;
|
journal.next_free = dsk.journal_block_size;
|
||||||
|
assert(journal.next_free != journal.used_start);
|
||||||
|
}
|
||||||
|
if (immediate_commit == IMMEDIATE_NONE && !(dirty_it->second.state & BS_ST_INSTANT))
|
||||||
|
{
|
||||||
|
unstable_unsynced++;
|
||||||
}
|
}
|
||||||
if (!PRIV(op)->pending_ops)
|
if (!PRIV(op)->pending_ops)
|
||||||
{
|
{
|
||||||
|
@ -538,7 +547,7 @@ resume_2:
|
||||||
uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
|
uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
|
||||||
blockstore_journal_check_t space_check(this);
|
blockstore_journal_check_t space_check(this);
|
||||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
|
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
|
||||||
((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
|
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -582,14 +591,20 @@ resume_4:
|
||||||
#endif
|
#endif
|
||||||
bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE;
|
bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE;
|
||||||
bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE);
|
bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE);
|
||||||
|
bool is_instant = ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT));
|
||||||
if (imm)
|
if (imm)
|
||||||
{
|
{
|
||||||
auto & unstab = unstable_writes[op->oid];
|
auto & unstab = unstable_writes[op->oid];
|
||||||
unstab = unstab < op->version ? op->version : unstab;
|
unstab = unstab < op->version ? op->version : unstab;
|
||||||
}
|
}
|
||||||
|
else if (!is_instant)
|
||||||
|
{
|
||||||
|
unstable_unsynced--;
|
||||||
|
assert(unstable_unsynced >= 0);
|
||||||
|
}
|
||||||
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
|
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
|
||||||
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
|
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
|
||||||
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
|
if (imm && is_instant)
|
||||||
{
|
{
|
||||||
// Deletions and 'instant' operations are treated as immediately stable
|
// Deletions and 'instant' operations are treated as immediately stable
|
||||||
mark_stable(dirty_it->first);
|
mark_stable(dirty_it->first);
|
||||||
|
@ -735,7 +750,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||||
});
|
});
|
||||||
assert(dirty_it != dirty_db.end());
|
assert(dirty_it != dirty_db.end());
|
||||||
blockstore_journal_check_t space_check(this);
|
blockstore_journal_check_t space_check(this);
|
||||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_INSTANT_RESERVATION))
|
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), (unstable_writes.size()+unstable_unsynced)*journal.block_size))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue