forked from vitalif/vitastor
Block writes earlier than sync/stabilize would be blocked, too
parent
b3f2102f33
commit
4b05bde3a2
|
@ -32,7 +32,12 @@ bool blockstore_t::is_safe_to_stop()
|
|||
|
||||
void blockstore_t::enqueue_op(blockstore_op_t *op)
|
||||
{
|
||||
impl->enqueue_op(op);
|
||||
impl->enqueue_op(op, false);
|
||||
}
|
||||
|
||||
void blockstore_t::enqueue_op_first(blockstore_op_t *op)
|
||||
{
|
||||
impl->enqueue_op(op, true);
|
||||
}
|
||||
|
||||
std::map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
|
||||
|
|
|
@ -93,6 +93,10 @@ public:
|
|||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
|
||||
// Insert operation into the beginning of the queue
|
||||
// Intended for the OSD syncer "thread" to be able to stabilize something when the journal is full
|
||||
void enqueue_op_first(blockstore_op_t *op);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::map<object_id, uint64_t> & get_unstable_writes();
|
||||
|
||||
|
|
|
@ -289,12 +289,17 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
|||
}
|
||||
}
|
||||
|
||||
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
|
||||
void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||
{
|
||||
int type = op->opcode & BS_OP_TYPE_MASK;
|
||||
if (type < BS_OP_MIN || type > BS_OP_MAX || (type == BS_OP_READ || type == BS_OP_WRITE) &&
|
||||
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
|
||||
readonly && type != BS_OP_READ)
|
||||
if (type < BS_OP_MIN || type > BS_OP_MAX ||
|
||||
((type == BS_OP_READ || type == BS_OP_WRITE) && (
|
||||
op->offset >= block_size ||
|
||||
op->len > block_size-op->offset ||
|
||||
(op->len % DISK_ALIGNMENT)
|
||||
)) ||
|
||||
readonly && type != BS_OP_READ ||
|
||||
first && type == BS_OP_WRITE)
|
||||
{
|
||||
// Basic verification not passed
|
||||
op->retval = -EINVAL;
|
||||
|
@ -313,7 +318,14 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
|
|||
PRIV(op)->wait_for = 0;
|
||||
PRIV(op)->sync_state = 0;
|
||||
PRIV(op)->pending_ops = 0;
|
||||
submit_queue.push_back(op);
|
||||
if (!first)
|
||||
{
|
||||
submit_queue.push_back(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
if (type == BS_OP_WRITE)
|
||||
{
|
||||
enqueue_write(op);
|
||||
|
|
|
@ -286,7 +286,7 @@ public:
|
|||
bool is_stalled();
|
||||
|
||||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
void enqueue_op(blockstore_op_t *op, bool first = false);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::map<object_id, uint64_t> unstable_writes;
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
#define JOURNAL_MAGIC 0x4A33
|
||||
#define JOURNAL_BUFFER_SIZE 4*1024*1024
|
||||
|
||||
// We reserve some extra space for future stabilize requests during writes
|
||||
#define JOURNAL_STABILIZE_RESERVATION 65536
|
||||
|
||||
// Journal entries
|
||||
// Journal entries are linked to each other by their crc32 value
|
||||
// The journal is almost a blockchain, because object versions constantly increase
|
||||
|
|
|
@ -79,6 +79,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
|||
});
|
||||
if (dirty_it->second.state == ST_D_IN_FLIGHT)
|
||||
{
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
// Big (redirect) write
|
||||
uint64_t loc = data_alloc->find_free();
|
||||
if (loc == UINT64_MAX)
|
||||
|
@ -137,12 +142,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
|||
// Small (journaled) write
|
||||
// First check if the journal has sufficient space
|
||||
// FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
|
||||
//two_sqes = (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < sizeof(struct journal_entry_small_write)
|
||||
// ? (journal.len - next_pos < op->len)
|
||||
// : (journal.sector_info[journal.cur_sector].offset + JOURNAL_BLOCK_SIZE != journal.next_free ||
|
||||
// journal.len - next_pos < op->len);
|
||||
blockstore_journal_check_t space_check(this);
|
||||
if (!space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len))
|
||||
if (unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)
|
||||
|| !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue