forked from vitalif/vitastor
Fix write->delete->write bugs, add & fix some debugging output
parent
a45e0e5e67
commit
1018764c91
|
@ -76,6 +76,9 @@ void journal_flusher_t::loop()
|
||||||
|
|
||||||
void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
||||||
{
|
{
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("enqueue_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
|
||||||
|
#endif
|
||||||
auto it = flush_versions.find(ov.oid);
|
auto it = flush_versions.find(ov.oid);
|
||||||
if (it != flush_versions.end())
|
if (it != flush_versions.end())
|
||||||
{
|
{
|
||||||
|
@ -94,8 +97,11 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void journal_flusher_t::unshift_flush(obj_ver_id ov)
|
void journal_flusher_t::unshift_flush(obj_ver_id ov, bool force)
|
||||||
{
|
{
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("unshift_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
|
||||||
|
#endif
|
||||||
auto it = flush_versions.find(ov.oid);
|
auto it = flush_versions.find(ov.oid);
|
||||||
if (it != flush_versions.end())
|
if (it != flush_versions.end())
|
||||||
{
|
{
|
||||||
|
@ -105,15 +111,38 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
flush_versions[ov.oid] = ov.version;
|
flush_versions[ov.oid] = ov.version;
|
||||||
|
if (!force)
|
||||||
|
flush_queue.push_front(ov.oid);
|
||||||
}
|
}
|
||||||
flush_queue.push_front(ov.oid);
|
if (force)
|
||||||
if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
|
flush_queue.push_front(ov.oid);
|
||||||
|
if (force || !dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
|
||||||
{
|
{
|
||||||
dequeuing = true;
|
dequeuing = true;
|
||||||
bs->ringloop->wakeup();
|
bs->ringloop->wakeup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void journal_flusher_t::remove_flush(object_id oid)
|
||||||
|
{
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("undo_flush %lx:%lx\n", oid.inode, oid.stripe);
|
||||||
|
#endif
|
||||||
|
auto v_it = flush_versions.find(oid);
|
||||||
|
if (v_it != flush_versions.end())
|
||||||
|
{
|
||||||
|
flush_versions.erase(v_it);
|
||||||
|
for (auto q_it = flush_queue.begin(); q_it != flush_queue.end(); q_it++)
|
||||||
|
{
|
||||||
|
if (*q_it == oid)
|
||||||
|
{
|
||||||
|
flush_queue.erase(q_it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void journal_flusher_t::request_trim()
|
void journal_flusher_t::request_trim()
|
||||||
{
|
{
|
||||||
dequeuing = true;
|
dequeuing = true;
|
||||||
|
@ -319,8 +348,8 @@ resume_1:
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// Writes and deletes shouldn't happen at the same time
|
// Writes and deletes shouldn't happen at the same time
|
||||||
assert(!(copy_count > 0 || has_writes) || !has_delete);
|
assert(!has_writes || !has_delete);
|
||||||
if (copy_count == 0 && !has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
|
if (!has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
|
||||||
{
|
{
|
||||||
// Nothing to flush
|
// Nothing to flush
|
||||||
bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
|
bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
|
||||||
|
@ -445,8 +474,8 @@ resume_1:
|
||||||
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
|
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
|
||||||
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
|
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
|
||||||
{
|
{
|
||||||
printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lx (%lx:%lx) with %lx:%lx\n",
|
printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n",
|
||||||
clean_loc, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
|
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
new_entry->oid = cur.oid;
|
new_entry->oid = cur.oid;
|
||||||
|
@ -513,7 +542,7 @@ resume_1:
|
||||||
if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
|
if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
|
||||||
{
|
{
|
||||||
// Requeue version
|
// Requeue version
|
||||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }, false);
|
||||||
}
|
}
|
||||||
flusher->sync_to_repeat.erase(repeat_it);
|
flusher->sync_to_repeat.erase(repeat_it);
|
||||||
trim_journal:
|
trim_journal:
|
||||||
|
@ -602,7 +631,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
|
||||||
{
|
{
|
||||||
char err[1024];
|
char err[1024];
|
||||||
snprintf(
|
snprintf(
|
||||||
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d",
|
err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: %d",
|
||||||
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
|
||||||
);
|
);
|
||||||
throw std::runtime_error(err);
|
throw std::runtime_error(err);
|
||||||
|
|
|
@ -107,5 +107,6 @@ public:
|
||||||
void request_trim();
|
void request_trim();
|
||||||
void release_trim();
|
void release_trim();
|
||||||
void enqueue_flush(obj_ver_id oid);
|
void enqueue_flush(obj_ver_id oid);
|
||||||
void unshift_flush(obj_ver_id oid);
|
void unshift_flush(obj_ver_id oid, bool force);
|
||||||
|
void remove_flush(object_id oid);
|
||||||
};
|
};
|
||||||
|
|
|
@ -111,7 +111,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
|
||||||
{
|
{
|
||||||
// free the previous block
|
// free the previous block
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i >> block_order);
|
printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i);
|
||||||
#endif
|
#endif
|
||||||
bs->data_alloc->set(clean_it->second.location >> block_order, false);
|
bs->data_alloc->set(clean_it->second.location >> block_order, false);
|
||||||
}
|
}
|
||||||
|
@ -557,9 +557,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||||
{
|
{
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf(
|
printf(
|
||||||
"je_big_write%s oid=%lx:%lx ver=%lu loc=%08lx\n",
|
"je_big_write%s oid=%lx:%lx ver=%lu loc=%lu\n",
|
||||||
je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
|
je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
|
||||||
je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location
|
je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location >> bs->block_order
|
||||||
);
|
);
|
||||||
#endif
|
#endif
|
||||||
auto dirty_it = bs->dirty_db.upper_bound((obj_ver_id){
|
auto dirty_it = bs->dirty_db.upper_bound((obj_ver_id){
|
||||||
|
@ -570,13 +570,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||||
{
|
{
|
||||||
dirty_it--;
|
dirty_it--;
|
||||||
if (dirty_it->first.oid == je->big_write.oid &&
|
if (dirty_it->first.oid == je->big_write.oid &&
|
||||||
|
dirty_it->first.version >= je->big_write.version &&
|
||||||
(dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
|
(dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
|
||||||
{
|
{
|
||||||
// It is allowed to overwrite a deleted object with a
|
// It is allowed to overwrite a deleted object with a
|
||||||
// version number less than deletion version number,
|
// version number smaller than deletion version number,
|
||||||
// because the presence of a BIG_WRITE entry means that
|
// because the presence of a BIG_WRITE entry means that
|
||||||
// the data for it is already on disk.
|
// its data and metadata are already flushed.
|
||||||
// Purge all dirty and clean entries for this object.
|
// We don't know if newer versions are flushed, but
|
||||||
|
// the previous delete definitely is.
|
||||||
|
// So we flush previous dirty entries, but retain the clean one.
|
||||||
|
// This feature is required for writes happening shortly
|
||||||
|
// after deletes.
|
||||||
auto dirty_end = dirty_it;
|
auto dirty_end = dirty_it;
|
||||||
dirty_end++;
|
dirty_end++;
|
||||||
while (1)
|
while (1)
|
||||||
|
@ -592,13 +597,14 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bs->erase_dirty(dirty_it, dirty_end, UINT64_MAX);
|
|
||||||
auto clean_it = bs->clean_db.find(je->big_write.oid);
|
auto clean_it = bs->clean_db.find(je->big_write.oid);
|
||||||
if (clean_it != bs->clean_db.end())
|
bs->erase_dirty(
|
||||||
{
|
dirty_it, dirty_end,
|
||||||
bs->data_alloc->set(clean_it->second.location >> bs->block_order, false);
|
clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX
|
||||||
bs->clean_db.erase(clean_it);
|
);
|
||||||
}
|
// Remove it from the flusher's queue, too
|
||||||
|
// Otherwise it may end up referring to a small unstable write after reading the rest of the journal
|
||||||
|
bs->flusher->remove_flush(je->big_write.oid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
auto clean_it = bs->clean_db.find(je->big_write.oid);
|
auto clean_it = bs->clean_db.find(je->big_write.oid);
|
||||||
|
|
|
@ -243,6 +243,9 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
|
||||||
if (IS_DELETE(dirty_it->second.state))
|
if (IS_DELETE(dirty_it->second.state))
|
||||||
{
|
{
|
||||||
object_id oid = dirty_it->first.oid;
|
object_id oid = dirty_it->first.oid;
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("Unblock writes-after-delete %lx:%lx v%lx\n", oid.inode, oid.stripe, dirty_it->first.version);
|
||||||
|
#endif
|
||||||
dirty_it = dirty_end;
|
dirty_it = dirty_end;
|
||||||
// Unblock operations blocked by delete flushing
|
// Unblock operations blocked by delete flushing
|
||||||
uint32_t next_state = BS_ST_IN_FLIGHT;
|
uint32_t next_state = BS_ST_IN_FLIGHT;
|
||||||
|
|
|
@ -213,9 +213,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
|
||||||
printf("enqueue_flush %lx:%lx v%lu\n", v.oid.inode, v.oid.stripe, v.version);
|
|
||||||
#endif
|
|
||||||
flusher->enqueue_flush(v);
|
flusher->enqueue_flush(v);
|
||||||
}
|
}
|
||||||
auto unstab_it = unstable_writes.find(v.oid);
|
auto unstab_it = unstable_writes.find(v.oid);
|
||||||
|
|
|
@ -57,13 +57,16 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
// It's allowed to write versions with low numbers over deletes
|
// It's allowed to write versions with low numbers over deletes
|
||||||
// However, we have to flush those deletes first as we use version number for ordering
|
// However, we have to flush those deletes first as we use version number for ordering
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("Write %lx:%lx v%lu over delete (real v%lu) offset=%u len=%u\n", op->oid.inode, op->oid.stripe, version, op->version, op->offset, op->len);
|
||||||
|
#endif
|
||||||
wait_del = true;
|
wait_del = true;
|
||||||
PRIV(op)->real_version = op->version;
|
PRIV(op)->real_version = op->version;
|
||||||
op->version = version;
|
op->version = version;
|
||||||
flusher->unshift_flush((obj_ver_id){
|
flusher->unshift_flush((obj_ver_id){
|
||||||
.oid = op->oid,
|
.oid = op->oid,
|
||||||
.version = version-1,
|
.version = version-1,
|
||||||
});
|
}, true);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -87,7 +90,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
if (is_del)
|
if (is_del)
|
||||||
printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
|
printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
|
||||||
else
|
else if (!wait_del)
|
||||||
printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
|
printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
|
||||||
#endif
|
#endif
|
||||||
// FIXME No strict need to add it into dirty_db here, it's just left
|
// FIXME No strict need to add it into dirty_db here, it's just left
|
||||||
|
@ -141,6 +144,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
if (PRIV(op)->real_version != 0)
|
if (PRIV(op)->real_version != 0)
|
||||||
{
|
{
|
||||||
// Restore original low version number for unblocked operations
|
// Restore original low version number for unblocked operations
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
|
||||||
|
#endif
|
||||||
auto prev_it = dirty_it;
|
auto prev_it = dirty_it;
|
||||||
prev_it--;
|
prev_it--;
|
||||||
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
|
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
|
||||||
|
@ -396,7 +402,7 @@ resume_2:
|
||||||
resume_4:
|
resume_4:
|
||||||
// Switch object state
|
// Switch object state
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Ack write %lx:%lx v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
printf("Ack write %lx:%lx v%lu = state %x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
||||||
#endif
|
#endif
|
||||||
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
|
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
|
||||||
? (immediate_commit == IMMEDIATE_ALL)
|
? (immediate_commit == IMMEDIATE_ALL)
|
||||||
|
|
Loading…
Reference in New Issue