forked from vitalif/vitastor
Fix BS_OP_DELETE (the implementation was untested up to this point)
parent
af5cd45071
commit
165c204555
|
@ -275,32 +275,26 @@ resume_0:
|
|||
#endif
|
||||
flusher->active_flushers++;
|
||||
resume_1:
|
||||
// Find it in clean_db
|
||||
clean_it = bs->clean_db.find(cur.oid);
|
||||
old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
|
||||
// Scan dirty versions of the object
|
||||
if (!scan_dirty(1))
|
||||
{
|
||||
wait_state += 1;
|
||||
return false;
|
||||
}
|
||||
if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete && !has_empty)
|
||||
// Writes and deletes shouldn't happen at the same time
|
||||
assert(!(copy_count > 0 || has_writes) || !has_delete);
|
||||
if (copy_count == 0 && !has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
|
||||
{
|
||||
// Nothing to flush
|
||||
flusher->active_flushers--;
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
|
||||
{
|
||||
// Requeue version
|
||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||
bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
|
||||
goto trim_journal;
|
||||
}
|
||||
flusher->sync_to_repeat.erase(repeat_it);
|
||||
wait_state = 0;
|
||||
goto resume_0;
|
||||
}
|
||||
// Find it in clean_db
|
||||
clean_it = bs->clean_db.find(cur.oid);
|
||||
old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
|
||||
if (clean_loc == UINT64_MAX)
|
||||
{
|
||||
if (copy_count > 0 && has_delete || old_clean_loc == UINT64_MAX)
|
||||
if (old_clean_loc == UINT64_MAX)
|
||||
{
|
||||
// Object not allocated. This is a bug.
|
||||
char err[1024];
|
||||
|
@ -471,6 +465,7 @@ resume_1:
|
|||
}
|
||||
// Update clean_db and dirty_db, free old data locations
|
||||
update_clean_db();
|
||||
trim_journal:
|
||||
// Clear unused part of the journal every <journal_trim_interval> flushes
|
||||
if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0)
|
||||
{
|
||||
|
@ -530,7 +525,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
copy_count = 0;
|
||||
clean_loc = UINT64_MAX;
|
||||
has_delete = false;
|
||||
has_empty = false;
|
||||
has_writes = false;
|
||||
skip_copy = false;
|
||||
clean_init_bitmap = false;
|
||||
while (1)
|
||||
|
@ -538,11 +533,8 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
|
||||
{
|
||||
// First we submit all reads
|
||||
if (dirty_it->second.len == 0)
|
||||
{
|
||||
has_empty = true;
|
||||
}
|
||||
else
|
||||
has_writes = true;
|
||||
if (dirty_it->second.len != 0)
|
||||
{
|
||||
offset = dirty_it->second.offset;
|
||||
end_offset = dirty_it->second.offset + dirty_it->second.len;
|
||||
|
@ -584,6 +576,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
|||
else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)
|
||||
{
|
||||
// There is an unflushed big write. Copy small writes in its position
|
||||
has_writes = true;
|
||||
clean_loc = dirty_it->second.location;
|
||||
clean_init_bitmap = true;
|
||||
clean_bitmap_offset = dirty_it->second.offset;
|
||||
|
|
|
@ -45,7 +45,7 @@ class journal_flusher_co
|
|||
std::map<object_id, uint64_t>::iterator repeat_it;
|
||||
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
|
||||
|
||||
bool skip_copy, has_delete, has_empty;
|
||||
bool skip_copy, has_delete, has_writes;
|
||||
blockstore_clean_db_t::iterator clean_it;
|
||||
std::vector<copy_buffer_t> v;
|
||||
std::vector<copy_buffer_t>::iterator it;
|
||||
|
|
|
@ -144,7 +144,7 @@ void blockstore_impl_t::loop()
|
|||
{
|
||||
dequeue_op = dequeue_read(op);
|
||||
}
|
||||
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE)
|
||||
else if (op->opcode == BS_OP_WRITE)
|
||||
{
|
||||
if (has_writes == 2)
|
||||
{
|
||||
|
@ -154,6 +154,16 @@ void blockstore_impl_t::loop()
|
|||
dequeue_op = dequeue_write(op);
|
||||
has_writes = dequeue_op ? 1 : 2;
|
||||
}
|
||||
else if (op->opcode == BS_OP_DELETE)
|
||||
{
|
||||
if (has_writes == 2)
|
||||
{
|
||||
// Some writes could not be submitted
|
||||
break;
|
||||
}
|
||||
dequeue_op = dequeue_del(op);
|
||||
has_writes = dequeue_op ? 1 : 2;
|
||||
}
|
||||
else if (op->opcode == BS_OP_SYNC)
|
||||
{
|
||||
// wait for all small writes to be submitted
|
||||
|
@ -370,7 +380,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
|||
}
|
||||
};
|
||||
}
|
||||
if (op->opcode == BS_OP_WRITE && !enqueue_write(op))
|
||||
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
|
||||
{
|
||||
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||
return;
|
||||
|
|
|
@ -671,6 +671,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("je_delete oid=%lu:%lu ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version);
|
||||
#endif
|
||||
auto clean_it = bs->clean_db.find(je->del.oid);
|
||||
if (clean_it == bs->clean_db.end() ||
|
||||
clean_it->second.version < je->del.version)
|
||||
{
|
||||
// oid, version
|
||||
obj_ver_id ov = {
|
||||
.oid = je->del.oid,
|
||||
|
@ -686,6 +690,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
});
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
}
|
||||
}
|
||||
started = true;
|
||||
pos += je->size;
|
||||
crc32_last = je->crc32;
|
||||
|
|
|
@ -100,6 +100,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
|||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
if (dirty_it->second.state == ST_J_WAIT_BIG)
|
||||
{
|
||||
return 0;
|
||||
|
@ -292,6 +293,7 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
|
|||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
if (PRIV(op)->op_state == 2)
|
||||
goto resume_2;
|
||||
else if (PRIV(op)->op_state == 4)
|
||||
|
@ -435,6 +437,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
|
|||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
assert(dirty_it != dirty_db.end());
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0))
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue