forked from vitalif/vitastor
Fix multiple-sector journal writes, add assertions to not miss any SQEs
parent
e66ed47515
commit
9b5d8b9ad4
|
@ -25,6 +25,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
|
||||||
: (bs->journal.block_size - next_in_pos) / size;
|
: (bs->journal.block_size - next_in_pos) / size;
|
||||||
if (fits > 0)
|
if (fits > 0)
|
||||||
{
|
{
|
||||||
|
if (fits > required)
|
||||||
|
{
|
||||||
|
fits = required;
|
||||||
|
}
|
||||||
if (first_sector == -1)
|
if (first_sector == -1)
|
||||||
{
|
{
|
||||||
first_sector = next_sector;
|
first_sector = next_sector;
|
||||||
|
@ -116,12 +120,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
|
||||||
|
|
||||||
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
|
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
|
||||||
{
|
{
|
||||||
if (journal.block_size - journal.in_sector_pos < size ||
|
if (!journal.entry_fits(size))
|
||||||
journal.no_same_sector_overwrites && journal.sector_info[journal.cur_sector].written)
|
|
||||||
{
|
{
|
||||||
assert(!journal.sector_info[journal.cur_sector].dirty);
|
assert(!journal.sector_info[journal.cur_sector].dirty);
|
||||||
// Move to the next journal sector
|
// Move to the next journal sector
|
||||||
journal.sector_info[journal.cur_sector].written = false;
|
|
||||||
if (journal.sector_info[journal.cur_sector].usage_count > 0)
|
if (journal.sector_info[journal.cur_sector].usage_count > 0)
|
||||||
{
|
{
|
||||||
// Also select next sector buffer in memory
|
// Also select next sector buffer in memory
|
||||||
|
@ -132,6 +134,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
|
||||||
{
|
{
|
||||||
journal.dirty_start = journal.next_free;
|
journal.dirty_start = journal.next_free;
|
||||||
}
|
}
|
||||||
|
journal.sector_info[journal.cur_sector].written = false;
|
||||||
journal.sector_info[journal.cur_sector].offset = journal.next_free;
|
journal.sector_info[journal.cur_sector].offset = journal.next_free;
|
||||||
journal.in_sector_pos = 0;
|
journal.in_sector_pos = 0;
|
||||||
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
|
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
|
||||||
|
|
|
@ -133,7 +133,7 @@ inline uint32_t je_crc32(journal_entry *je)
|
||||||
struct journal_sector_info_t
|
struct journal_sector_info_t
|
||||||
{
|
{
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
uint64_t usage_count;
|
uint64_t usage_count; // flusher_count!
|
||||||
bool written;
|
bool written;
|
||||||
bool dirty;
|
bool dirty;
|
||||||
};
|
};
|
||||||
|
@ -170,13 +170,18 @@ struct journal_t
|
||||||
~journal_t();
|
~journal_t();
|
||||||
bool trim();
|
bool trim();
|
||||||
uint64_t get_trim_pos();
|
uint64_t get_trim_pos();
|
||||||
|
inline bool entry_fits(int size)
|
||||||
|
{
|
||||||
|
return !(block_size - in_sector_pos < size ||
|
||||||
|
no_same_sector_overwrites && sector_info[cur_sector].written);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct blockstore_journal_check_t
|
struct blockstore_journal_check_t
|
||||||
{
|
{
|
||||||
blockstore_impl_t *bs;
|
blockstore_impl_t *bs;
|
||||||
uint64_t next_pos, next_sector, next_in_pos;
|
uint64_t next_pos, next_sector, next_in_pos;
|
||||||
int sectors_required, first_sector;
|
int sectors_required, first_sector; // "sectors to write"
|
||||||
bool right_dir; // writing to the end or the beginning of the ring buffer
|
bool right_dir; // writing to the end or the beginning of the ring buffer
|
||||||
|
|
||||||
blockstore_journal_check_t(blockstore_impl_t *bs);
|
blockstore_journal_check_t(blockstore_impl_t *bs);
|
||||||
|
|
|
@ -83,35 +83,27 @@ skip_ov:
|
||||||
// Prepare and submit journal entries
|
// Prepare and submit journal entries
|
||||||
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
|
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
|
||||||
int s = 0, cur_sector = -1;
|
int s = 0, cur_sector = -1;
|
||||||
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_rollback) &&
|
|
||||||
journal.sector_info[journal.cur_sector].dirty)
|
|
||||||
{
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
|
||||||
cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
|
|
||||||
}
|
|
||||||
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
||||||
{
|
{
|
||||||
|
if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
|
||||||
|
journal.sector_info[journal.cur_sector].dirty)
|
||||||
|
{
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
|
cur_sector = journal.cur_sector;
|
||||||
|
}
|
||||||
journal_entry_rollback *je = (journal_entry_rollback*)
|
journal_entry_rollback *je = (journal_entry_rollback*)
|
||||||
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
|
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
|
||||||
journal.sector_info[journal.cur_sector].dirty = false;
|
|
||||||
je->oid = v->oid;
|
je->oid = v->oid;
|
||||||
je->version = v->version;
|
je->version = v->version;
|
||||||
je->crc32 = je_crc32((journal_entry*)je);
|
je->crc32 = je_crc32((journal_entry*)je);
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
if (cur_sector != journal.cur_sector)
|
|
||||||
{
|
|
||||||
// Write previous sector. We should write the sector only after filling it,
|
|
||||||
// because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
|
|
||||||
if (cur_sector != -1)
|
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
|
||||||
else
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
cur_sector = journal.cur_sector;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (cur_sector != -1)
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
assert(s == space_check.sectors_required);
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->pending_ops = s;
|
PRIV(op)->pending_ops = s;
|
||||||
PRIV(op)->op_state = 1;
|
PRIV(op)->op_state = 1;
|
||||||
|
|
|
@ -106,36 +106,28 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||||
// Prepare and submit journal entries
|
// Prepare and submit journal entries
|
||||||
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
|
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
|
||||||
int s = 0, cur_sector = -1;
|
int s = 0, cur_sector = -1;
|
||||||
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
|
|
||||||
journal.sector_info[journal.cur_sector].dirty)
|
|
||||||
{
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
|
||||||
cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
|
|
||||||
}
|
|
||||||
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
||||||
{
|
{
|
||||||
// FIXME: Only stabilize versions that aren't stable yet
|
// FIXME: Only stabilize versions that aren't stable yet
|
||||||
|
if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
|
||||||
|
journal.sector_info[journal.cur_sector].dirty)
|
||||||
|
{
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
|
cur_sector = journal.cur_sector;
|
||||||
|
}
|
||||||
journal_entry_stable *je = (journal_entry_stable*)
|
journal_entry_stable *je = (journal_entry_stable*)
|
||||||
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
|
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
|
||||||
journal.sector_info[journal.cur_sector].dirty = false;
|
|
||||||
je->oid = v->oid;
|
je->oid = v->oid;
|
||||||
je->version = v->version;
|
je->version = v->version;
|
||||||
je->crc32 = je_crc32((journal_entry*)je);
|
je->crc32 = je_crc32((journal_entry*)je);
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
if (cur_sector != journal.cur_sector)
|
|
||||||
{
|
|
||||||
// Write previous sector. We should write the sector only after filling it,
|
|
||||||
// because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
|
|
||||||
if (cur_sector != -1)
|
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
|
||||||
else
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
cur_sector = journal.cur_sector;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (cur_sector != -1)
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
assert(s == space_check.sectors_required);
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->pending_ops = s;
|
PRIV(op)->pending_ops = s;
|
||||||
PRIV(op)->op_state = 1;
|
PRIV(op)->op_state = 1;
|
||||||
|
|
|
@ -120,21 +120,21 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
// Prepare and submit journal entries
|
// Prepare and submit journal entries
|
||||||
auto it = PRIV(op)->sync_big_writes.begin();
|
auto it = PRIV(op)->sync_big_writes.begin();
|
||||||
int s = 0, cur_sector = -1;
|
int s = 0, cur_sector = -1;
|
||||||
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
|
|
||||||
journal.sector_info[journal.cur_sector].dirty)
|
|
||||||
{
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
|
||||||
cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
|
|
||||||
}
|
|
||||||
while (it != PRIV(op)->sync_big_writes.end())
|
while (it != PRIV(op)->sync_big_writes.end())
|
||||||
{
|
{
|
||||||
|
if (!journal.entry_fits(sizeof(journal_entry_big_write)) &&
|
||||||
|
journal.sector_info[journal.cur_sector].dirty)
|
||||||
|
{
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
|
cur_sector = journal.cur_sector;
|
||||||
|
}
|
||||||
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
|
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
|
||||||
journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
|
journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
|
||||||
sizeof(journal_entry_big_write)
|
sizeof(journal_entry_big_write)
|
||||||
);
|
);
|
||||||
dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
|
dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
|
||||||
journal.sector_info[journal.cur_sector].dirty = false;
|
|
||||||
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
|
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf(
|
printf(
|
||||||
|
@ -151,19 +151,11 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
je->crc32 = je_crc32((journal_entry*)je);
|
je->crc32 = je_crc32((journal_entry*)je);
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
it++;
|
it++;
|
||||||
if (cur_sector != journal.cur_sector)
|
|
||||||
{
|
|
||||||
// Write the previous sector. We should write the sector only after filling it,
|
|
||||||
// because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
|
|
||||||
if (cur_sector != -1)
|
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
|
||||||
else
|
|
||||||
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
|
||||||
cur_sector = journal.cur_sector;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (cur_sector != -1)
|
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
|
||||||
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
|
assert(s == space_check.sectors_required);
|
||||||
|
if (cur_sector == -1)
|
||||||
|
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
|
||||||
PRIV(op)->pending_ops = s;
|
PRIV(op)->pending_ops = s;
|
||||||
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
|
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
|
||||||
|
|
|
@ -377,7 +377,6 @@ resume_2:
|
||||||
sizeof(journal_entry_big_write)
|
sizeof(journal_entry_big_write)
|
||||||
);
|
);
|
||||||
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
|
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
|
||||||
journal.sector_info[journal.cur_sector].dirty = false;
|
|
||||||
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
|
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf(
|
printf(
|
||||||
|
|
Loading…
Reference in New Issue