forked from vitalif/vitastor
Remember writes as "unsynced" only after completing them
Previously BS_OP_SYNC could take unfinished writes and add them into the journal before they were actually completed. This was leading to crashes with the message "BUG: Unexpected dirty_entry 2000000000001:9f2a0000 v3 unstable state during flush: 338"rel-0.5
parent
0949f08407
commit
b0ad1e1e6d
|
@ -646,7 +646,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
||||||
{
|
{
|
||||||
char err[1024];
|
char err[1024];
|
||||||
snprintf(
|
snprintf(
|
||||||
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: %d",
|
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: 0x%x",
|
||||||
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
||||||
);
|
);
|
||||||
throw std::runtime_error(err);
|
throw std::runtime_error(err);
|
||||||
|
|
|
@ -210,6 +210,7 @@ class blockstore_impl_t
|
||||||
blockstore_dirty_db_t dirty_db;
|
blockstore_dirty_db_t dirty_db;
|
||||||
std::vector<blockstore_op_t*> submit_queue;
|
std::vector<blockstore_op_t*> submit_queue;
|
||||||
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
||||||
|
int unsynced_big_write_count = 0;
|
||||||
allocator *data_alloc = NULL;
|
allocator *data_alloc = NULL;
|
||||||
uint8_t *zero_object;
|
uint8_t *zero_object;
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
|
||||||
if (PRIV(op)->op_state == 0)
|
if (PRIV(op)->op_state == 0)
|
||||||
{
|
{
|
||||||
stop_sync_submitted = false;
|
stop_sync_submitted = false;
|
||||||
|
unsynced_big_write_count -= unsynced_big_writes.size();
|
||||||
PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
|
PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
|
||||||
PRIV(op)->sync_small_writes.swap(unsynced_small_writes);
|
PRIV(op)->sync_small_writes.swap(unsynced_small_writes);
|
||||||
PRIV(op)->sync_small_checked = 0;
|
PRIV(op)->sync_small_checked = 0;
|
||||||
|
|
|
@ -201,7 +201,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
|
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
|
||||||
{
|
{
|
||||||
blockstore_journal_check_t space_check(this);
|
blockstore_journal_check_t space_check(this);
|
||||||
if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
|
if (!space_check.check_available(op, unsynced_big_write_count + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -250,11 +250,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
|
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
|
||||||
if (immediate_commit != IMMEDIATE_ALL)
|
if (immediate_commit != IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
// Remember big write as unsynced
|
// Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished)
|
||||||
unsynced_big_writes.push_back((obj_ver_id){
|
unsynced_big_write_count++;
|
||||||
.oid = op->oid,
|
|
||||||
.version = op->version,
|
|
||||||
});
|
|
||||||
PRIV(op)->op_state = 3;
|
PRIV(op)->op_state = 3;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -267,7 +264,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
// Small (journaled) write
|
// Small (journaled) write
|
||||||
// First check if the journal has sufficient space
|
// First check if the journal has sufficient space
|
||||||
blockstore_journal_check_t space_check(this);
|
blockstore_journal_check_t space_check(this);
|
||||||
if (unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)
|
if (unsynced_big_write_count && !space_check.check_available(op, unsynced_big_write_count, sizeof(journal_entry_big_write), 0)
|
||||||
|| !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
|
|| !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -359,14 +356,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
journal.next_free = journal_block_size;
|
journal.next_free = journal_block_size;
|
||||||
}
|
}
|
||||||
if (immediate_commit == IMMEDIATE_NONE)
|
|
||||||
{
|
|
||||||
// Remember small write as unsynced
|
|
||||||
unsynced_small_writes.push_back((obj_ver_id){
|
|
||||||
.oid = op->oid,
|
|
||||||
.version = op->version,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (!PRIV(op)->pending_ops)
|
if (!PRIV(op)->pending_ops)
|
||||||
{
|
{
|
||||||
PRIV(op)->op_state = 4;
|
PRIV(op)->op_state = 4;
|
||||||
|
@ -431,7 +420,7 @@ resume_2:
|
||||||
resume_4:
|
resume_4:
|
||||||
// Switch object state
|
// Switch object state
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Ack write %lx:%lx v%lu = state %x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
printf("Ack write %lx:%lx v%lu = state 0x%x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
||||||
#endif
|
#endif
|
||||||
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
|
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
|
||||||
? (immediate_commit == IMMEDIATE_ALL)
|
? (immediate_commit == IMMEDIATE_ALL)
|
||||||
|
@ -445,11 +434,31 @@ resume_4:
|
||||||
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
|
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
|
||||||
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
|
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
|
||||||
{
|
{
|
||||||
// Deletions are treated as immediately stable
|
// Deletions and 'instant' operations are treated as immediately stable
|
||||||
mark_stable(dirty_it->first);
|
mark_stable(dirty_it->first);
|
||||||
}
|
}
|
||||||
if (immediate_commit == IMMEDIATE_ALL)
|
if (!imm)
|
||||||
{
|
{
|
||||||
|
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
|
||||||
|
{
|
||||||
|
// Remember big write as unsynced
|
||||||
|
unsynced_big_writes.push_back((obj_ver_id){
|
||||||
|
.oid = op->oid,
|
||||||
|
.version = op->version,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Remember small write as unsynced
|
||||||
|
unsynced_small_writes.push_back((obj_ver_id){
|
||||||
|
.oid = op->oid,
|
||||||
|
.version = op->version,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
|
||||||
|
{
|
||||||
|
// Unblock small writes
|
||||||
dirty_it++;
|
dirty_it++;
|
||||||
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
|
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
|
||||||
{
|
{
|
||||||
|
@ -583,14 +592,6 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
||||||
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->pending_ops++;
|
PRIV(op)->pending_ops++;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
// Remember delete as unsynced
|
|
||||||
unsynced_small_writes.push_back((obj_ver_id){
|
|
||||||
.oid = op->oid,
|
|
||||||
.version = op->version,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (!PRIV(op)->pending_ops)
|
if (!PRIV(op)->pending_ops)
|
||||||
{
|
{
|
||||||
PRIV(op)->op_state = 4;
|
PRIV(op)->op_state = 4;
|
||||||
|
|
Loading…
Reference in New Issue