Put get_trim_pos into the "critical section". Fixes rare journal corruption issue
The consequence of this issue was that in some very rare cases (only reproduced under load in CI when running 4+ tests in parallel) small write data written to journal could overwrite journal entries. Also add an assert-type safety check to be able to catch this issue in the future again in case of a regression.openssl
parent
bdd48e4cf1
commit
86b4682975
|
@ -675,53 +675,58 @@ resume_1:
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
flusher->trimming = true;
|
flusher->trimming = true;
|
||||||
// First update journal "superblock" and only then update <used_start> in memory
|
// Recheck the position with the "lock" taken
|
||||||
await_sqe(12);
|
new_trim_pos = bs->journal.get_trim_pos();
|
||||||
*((journal_entry_start*)flusher->journal_superblock) = {
|
if (new_trim_pos != bs->journal.used_start)
|
||||||
.crc32 = 0,
|
|
||||||
.magic = JOURNAL_MAGIC,
|
|
||||||
.type = JE_START,
|
|
||||||
.size = sizeof(journal_entry_start),
|
|
||||||
.reserved = 0,
|
|
||||||
.journal_start = new_trim_pos,
|
|
||||||
.version = JOURNAL_VERSION,
|
|
||||||
};
|
|
||||||
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
|
|
||||||
data->iov = (struct iovec){ flusher->journal_superblock, bs->dsk.journal_block_size };
|
|
||||||
data->callback = simple_callback_w;
|
|
||||||
my_uring_prep_writev(sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset);
|
|
||||||
wait_count++;
|
|
||||||
resume_13:
|
|
||||||
if (wait_count > 0)
|
|
||||||
{
|
{
|
||||||
wait_state = 13;
|
// First update journal "superblock" and only then update <used_start> in memory
|
||||||
return false;
|
await_sqe(12);
|
||||||
}
|
*((journal_entry_start*)flusher->journal_superblock) = {
|
||||||
if (!bs->disable_journal_fsync)
|
.crc32 = 0,
|
||||||
{
|
.magic = JOURNAL_MAGIC,
|
||||||
await_sqe(20);
|
.type = JE_START,
|
||||||
my_uring_prep_fsync(sqe, bs->dsk.journal_fd, IORING_FSYNC_DATASYNC);
|
.size = sizeof(journal_entry_start),
|
||||||
data->iov = { 0 };
|
.reserved = 0,
|
||||||
|
.journal_start = new_trim_pos,
|
||||||
|
.version = JOURNAL_VERSION,
|
||||||
|
};
|
||||||
|
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
|
||||||
|
data->iov = (struct iovec){ flusher->journal_superblock, bs->dsk.journal_block_size };
|
||||||
data->callback = simple_callback_w;
|
data->callback = simple_callback_w;
|
||||||
resume_21:
|
my_uring_prep_writev(sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset);
|
||||||
|
wait_count++;
|
||||||
|
resume_13:
|
||||||
if (wait_count > 0)
|
if (wait_count > 0)
|
||||||
{
|
{
|
||||||
wait_state = 21;
|
wait_state = 13;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
if (!bs->disable_journal_fsync)
|
||||||
bs->journal.used_start = new_trim_pos;
|
{
|
||||||
|
await_sqe(20);
|
||||||
|
my_uring_prep_fsync(sqe, bs->dsk.journal_fd, IORING_FSYNC_DATASYNC);
|
||||||
|
data->iov = { 0 };
|
||||||
|
data->callback = simple_callback_w;
|
||||||
|
resume_21:
|
||||||
|
if (wait_count > 0)
|
||||||
|
{
|
||||||
|
wait_state = 21;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bs->journal.used_start = new_trim_pos;
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Journal trimmed to %08lx (next_free=%08lx)\n", bs->journal.used_start, bs->journal.next_free);
|
printf("Journal trimmed to %08lx (next_free=%08lx)\n", bs->journal.used_start, bs->journal.next_free);
|
||||||
#endif
|
#endif
|
||||||
|
if (bs->journal.flush_journal && !flusher->flush_queue.size())
|
||||||
|
{
|
||||||
|
assert(bs->journal.used_start == bs->journal.next_free);
|
||||||
|
printf("Journal flushed\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
flusher->trimming = false;
|
flusher->trimming = false;
|
||||||
}
|
}
|
||||||
if (bs->journal.flush_journal && !flusher->flush_queue.size())
|
|
||||||
{
|
|
||||||
assert(bs->journal.used_start == bs->journal.next_free);
|
|
||||||
printf("Journal flushed\n");
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// All done
|
// All done
|
||||||
flusher->active_flushers--;
|
flusher->active_flushers--;
|
||||||
|
|
|
@ -236,14 +236,6 @@ journal_t::~journal_t()
|
||||||
uint64_t journal_t::get_trim_pos()
|
uint64_t journal_t::get_trim_pos()
|
||||||
{
|
{
|
||||||
auto journal_used_it = used_sectors.lower_bound(used_start);
|
auto journal_used_it = used_sectors.lower_bound(used_start);
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
|
||||||
printf(
|
|
||||||
"Trimming journal (used_start=%08lx, next_free=%08lx, dirty_start=%08lx, new_start=%08lx, new_refcount=%ld)\n",
|
|
||||||
used_start, next_free, dirty_start,
|
|
||||||
journal_used_it == used_sectors.end() ? 0 : journal_used_it->first,
|
|
||||||
journal_used_it == used_sectors.end() ? 0 : journal_used_it->second
|
|
||||||
);
|
|
||||||
#endif
|
|
||||||
if (journal_used_it == used_sectors.end())
|
if (journal_used_it == used_sectors.end())
|
||||||
{
|
{
|
||||||
// Journal is cleared to its end, restart from the beginning
|
// Journal is cleared to its end, restart from the beginning
|
||||||
|
@ -256,12 +248,26 @@ uint64_t journal_t::get_trim_pos()
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// next_free does not need updating during trim
|
// next_free does not need updating during trim
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf(
|
||||||
|
"Trimming journal (used_start=%08lx, next_free=%08lx, dirty_start=%08lx, new_start=%08lx, new_refcount=%ld)\n",
|
||||||
|
used_start, next_free, dirty_start,
|
||||||
|
journal_used_it->first, journal_used_it->second
|
||||||
|
);
|
||||||
|
#endif
|
||||||
return journal_used_it->first;
|
return journal_used_it->first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (journal_used_it->first > used_start)
|
else if (journal_used_it->first > used_start)
|
||||||
{
|
{
|
||||||
// Journal is cleared up to <journal_used_it>
|
// Journal is cleared up to <journal_used_it>
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf(
|
||||||
|
"Trimming journal (used_start=%08lx, next_free=%08lx, dirty_start=%08lx, new_start=%08lx, new_refcount=%ld)\n",
|
||||||
|
used_start, next_free, dirty_start,
|
||||||
|
journal_used_it->first, journal_used_it->second
|
||||||
|
);
|
||||||
|
#endif
|
||||||
return journal_used_it->first;
|
return journal_used_it->first;
|
||||||
}
|
}
|
||||||
// Can't trim journal
|
// Can't trim journal
|
||||||
|
|
|
@ -409,7 +409,23 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
);
|
);
|
||||||
#endif
|
#endif
|
||||||
// Figure out where data will be
|
// Figure out where data will be
|
||||||
journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : dsk.journal_block_size;
|
auto next_next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : dsk.journal_block_size;
|
||||||
|
if (op->len > 0)
|
||||||
|
{
|
||||||
|
auto journal_used_it = journal.used_sectors.lower_bound(next_next_free);
|
||||||
|
if (journal_used_it != journal.used_sectors.end() &&
|
||||||
|
journal_used_it->first < next_next_free + op->len)
|
||||||
|
{
|
||||||
|
printf(
|
||||||
|
"BUG: Attempt to overwrite used offset (%lx, %lu refs) of the journal with the object %lx:%lx v%lu: data at %lx, len %x!"
|
||||||
|
" Journal used_start=%08lx (%lu refs), next_free=%08lx, dirty_start=%08lx\n",
|
||||||
|
journal_used_it->first, journal_used_it->second, op->oid.inode, op->oid.stripe, op->version, next_next_free, op->len,
|
||||||
|
journal.used_start, journal.used_sectors[journal.used_start], journal.next_free, journal.dirty_start
|
||||||
|
);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
journal.next_free = next_next_free;
|
||||||
je->oid = op->oid;
|
je->oid = op->oid;
|
||||||
je->version = op->version;
|
je->version = op->version;
|
||||||
je->offset = op->offset;
|
je->offset = op->offset;
|
||||||
|
|
Loading…
Reference in New Issue