Selectively sync nonsynced objects on STABILIZE/ROLLBACK (fix for github issue #51)

epoch-deletions
Vitaliy Filippov 2023-04-06 01:33:39 +03:00
parent d06ed2b0e7
commit 0fbf4c6a08
7 changed files with 320 additions and 64 deletions

View File

@@ -107,7 +107,7 @@ Input:
- buf = pre-allocated obj_ver_id array <len> units long
Output:
- retval = 0 or negative error number (-EINVAL, -ENOENT if no such version or -EBUSY if not synced)
- retval = 0 or negative error number (-ENOENT if no such version for stabilize)
## BS_OP_SYNC_STAB_ALL
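A minimal caller-side sketch of BS_OP_STABLE after this change (not part of this commit; `bs` stands for the blockstore instance). Note that -EBUSY no longer has to be handled: unsynced versions are now synced by the blockstore itself.

blockstore_op_t *op = new blockstore_op_t;
obj_ver_id *vers = new obj_ver_id[2];
vers[0] = (obj_ver_id){ .oid = (object_id){ .inode = 1, .stripe = 0 }, .version = 5 };
vers[1] = (obj_ver_id){ .oid = (object_id){ .inode = 1, .stripe = 131072 }, .version = 3 };
op->opcode = BS_OP_STABLE;
op->buf = vers;
op->len = 2;
op->callback = [vers](blockstore_op_t *op)
{
    // retval == 0 on success, -ENOENT if some version doesn't exist
    delete[] vers;
    delete op;
};
bs->enqueue_op(op);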

View File

@@ -171,7 +171,7 @@ void blockstore_impl_t::loop()
// Can't submit SYNC before previous writes
continue;
}
wr_st = continue_sync(op, false);
wr_st = continue_sync(op);
if (wr_st != 2)
{
has_writes = wr_st > 0 ? 1 : 2;
@@ -371,13 +371,18 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return;
}
init_op(op);
submit_queue.push_back(op);
ringloop->wakeup();
}
void blockstore_impl_t::init_op(blockstore_op_t *op)
{
// Call the constructor without allocating memory. We'll call the destructor before returning the op back
new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0;
PRIV(op)->op_state = 0;
PRIV(op)->pending_ops = 0;
submit_queue.push_back(op);
ringloop->wakeup();
}
static bool replace_stable(object_id oid, uint64_t version, int search_start, int search_end, obj_ver_id* list)
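The new init_op() relies on placement new: blockstore_op_private_t is constructed inside the op's pre-allocated private_data area and destroyed explicitly when the op completes. The pattern in isolation (illustration only, not from this commit):

#include <new>

struct priv_t
{
    int op_state = 0;
};

int main()
{
    alignas(priv_t) char storage[sizeof(priv_t)];
    priv_t *p = new ((void*)storage) priv_t; // construct in-place, no heap allocation
    p->op_state = 1;
    p->~priv_t(); // destroy explicitly; never pass such a pointer to delete
}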

View File

@@ -216,6 +216,11 @@ struct pool_shard_settings_t
uint32_t pg_stripe_size;
};
#define STAB_SPLIT_DONE 1
#define STAB_SPLIT_WAIT 2
#define STAB_SPLIT_SYNC 3
#define STAB_SPLIT_TODO 4
class blockstore_impl_t
{
blockstore_disk_t dsk;
@@ -298,6 +303,7 @@ class blockstore_impl_t
blockstore_init_journal* journal_init_reader;
void check_wait(blockstore_op_t *op);
void init_op(blockstore_op_t *op);
// Read
int dequeue_read(blockstore_op_t *read_op);
@@ -317,7 +323,7 @@
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync
int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
int continue_sync(blockstore_op_t *op);
void ack_sync(blockstore_op_t *op);
// Stabilize
@@ -325,6 +331,8 @@
int continue_stable(blockstore_op_t *op);
void mark_stable(const obj_ver_id & ov, bool forget_dirty = false);
void stabilize_object(object_id oid, uint64_t max_ver);
blockstore_op_t* selective_sync(blockstore_op_t *op);
int split_stab_op(blockstore_op_t *op, std::function<int(obj_ver_id v)> decider);
// Rollback
int dequeue_rollback(blockstore_op_t *op);
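The STAB_SPLIT_* constants are the verdicts a decider callback returns to split_stab_op(), once per obj_ver_id in the request. A skeletal decider (illustration only; the real ones are the lambdas in dequeue_rollback() and dequeue_stable() in this commit):

int decide(obj_ver_id ov)
{
    bool missing = false, done = false, in_flight = false, unsynced = false;
    // ... inspect the dirty_db / clean_db state of ov here ...
    if (missing)
        return -ENOENT;         // any negative errno fails the whole request
    if (done)
        return STAB_SPLIT_DONE; // nothing to do, drop from the request
    if (in_flight)
        return STAB_SPLIT_WAIT; // split off, retried after the write completes
    if (unsynced)
        return STAB_SPLIT_SYNC; // split off and trigger a selective sync
    return STAB_SPLIT_TODO;     // keep in the current operation
}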

View File

@@ -9,48 +9,39 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
{
return continue_rollback(op);
}
obj_ver_id *v, *nv;
int i, todo = op->len;
for (i = 0, v = (obj_ver_id*)op->buf, nv = (obj_ver_id*)op->buf; i < op->len; i++, v++, nv++)
int r = split_stab_op(op, [this](obj_ver_id ov)
{
if (nv != v)
{
*nv = *v;
}
// Check that there are some versions greater than ov.version (which may be zero),
// check that they're unstable, synced, and not currently written to
auto dirty_it = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.oid = ov.oid,
.version = UINT64_MAX,
});
if (dirty_it == dirty_db.begin())
{
skip_ov:
// Already rolled back, skip this object version
todo--;
nv--;
continue;
return STAB_SPLIT_DONE;
}
else
{
dirty_it--;
if (dirty_it->first.oid != v->oid || dirty_it->first.version < v->version)
if (dirty_it->first.oid != ov.oid || dirty_it->first.version < ov.version)
{
goto skip_ov;
// Already rolled back, skip this object version
return STAB_SPLIT_DONE;
}
while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version)
while (dirty_it->first.oid == ov.oid && dirty_it->first.version > ov.version)
{
if (IS_IN_FLIGHT(dirty_it->second.state))
{
// Object write is still in progress. Wait until the write request completes
return 0;
return STAB_SPLIT_WAIT;
}
else if (!IS_SYNCED(dirty_it->second.state) ||
IS_STABLE(dirty_it->second.state))
{
op->retval = -EBUSY;
FINISH_OP(op);
return 2;
// Sync the object
return STAB_SPLIT_SYNC;
}
if (dirty_it == dirty_db.begin())
{
@@ -58,19 +49,16 @@ skip_ov:
}
dirty_it--;
}
return STAB_SPLIT_TODO;
}
}
op->len = todo;
if (!todo)
});
if (r != 1)
{
// Already rolled back
op->retval = 0;
FINISH_OP(op);
return 2;
return r;
}
// Check journal space
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, todo, sizeof(journal_entry_rollback), 0))
if (!space_check.check_available(op, op->len, sizeof(journal_entry_rollback), 0))
{
return 0;
}
@@ -78,7 +66,8 @@ skip_ov:
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
auto v = (obj_ver_id*)op->buf;
for (int i = 0; i < op->len; i++, v++)
{
if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
journal.sector_info[journal.cur_sector].dirty)

View File

@@ -41,60 +41,309 @@
// 4) after a while it takes its synced object list and sends stabilize requests
// to peers and to its own blockstore, thus freeing the old version
int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
struct ver_vector_t
{
if (PRIV(op)->op_state)
obj_ver_id *items = NULL;
uint64_t alloc = 0, size = 0;
};
static void init_versions(ver_vector_t & vec, obj_ver_id *start, obj_ver_id *end, uint64_t len)
{
if (!vec.items)
{
return continue_stable(op);
vec.alloc = len;
vec.items = (obj_ver_id*)malloc_or_die(sizeof(obj_ver_id) * vec.alloc);
for (auto sv = start; sv < end; sv++)
{
vec.items[vec.size++] = *sv;
}
}
}
static void append_version(ver_vector_t & vec, obj_ver_id ov)
{
if (vec.size >= vec.alloc)
{
vec.alloc = !vec.alloc ? 4 : vec.alloc*2;
vec.items = (obj_ver_id*)realloc_or_die(vec.items, sizeof(obj_ver_id) * vec.alloc);
}
vec.items[vec.size++] = ov;
}
static bool check_unsynced(std::vector<obj_ver_id> & check, obj_ver_id ov, std::vector<obj_ver_id> & to, int *count)
{
bool found = false;
int j = 0, k = 0;
while (j < check.size())
{
if (check[j] == ov)
found = true;
if (check[j].oid == ov.oid && check[j].version <= ov.version)
{
to.push_back(check[j++]);
if (count)
(*count)--;
}
else
check[k++] = check[j++];
}
check.resize(k);
return found;
}
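// Worked example for check_unsynced() (hypothetical values): with
// check = { 1:0 v3, 1:0 v4, 2:0 v7 } and ov = 1:0 v4, both 1:0 entries are
// moved into `to` (same oid, version <= 4), check shrinks to { 2:0 v7 },
// *count is decremented twice, and true is returned because the exact
// version 1:0 v4 was present in the list.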
blockstore_op_t* blockstore_impl_t::selective_sync(blockstore_op_t *op)
{
unsynced_big_write_count -= unsynced_big_writes.size();
unsynced_big_writes.swap(PRIV(op)->sync_big_writes);
unsynced_big_write_count += unsynced_big_writes.size();
unsynced_small_writes.swap(PRIV(op)->sync_small_writes);
// Create a sync operation, insert into the end of the queue
// And move ourselves into the end too!
// Rather hacky but that's what we need...
blockstore_op_t *sync_op = new blockstore_op_t;
sync_op->opcode = BS_OP_SYNC;
sync_op->buf = NULL;
sync_op->callback = [this](blockstore_op_t *sync_op)
{
delete sync_op;
};
init_op(sync_op);
int sync_res = continue_sync(sync_op);
if (sync_res != 2)
{
// Put SYNC into the queue if it's not finished yet
submit_queue.push_back(sync_op);
}
// Restore unsynced_writes
unsynced_small_writes.swap(PRIV(op)->sync_small_writes);
unsynced_big_write_count -= unsynced_big_writes.size();
unsynced_big_writes.swap(PRIV(op)->sync_big_writes);
unsynced_big_write_count += unsynced_big_writes.size();
if (sync_res == 2)
{
// Sync is immediately completed
return NULL;
}
return sync_op;
}
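// The double swap in selective_sync(), reduced to its essence (sketch only):
//     std::vector<int> unsynced = {1, 3}; // remainder, subset already removed
//     std::vector<int> subset   = {2, 4}; // the versions we want synced now
//     unsynced.swap(subset);              // the SYNC worker sees only {2, 4}
//     unsynced.clear();                   // ...and consumes everything it sees
//     unsynced.swap(subset);              // remainder {1, 3} is back in place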
// Returns: 2 = stop processing and dequeue, 0 = stop processing and do not dequeue, 1 = proceed with op itself
int blockstore_impl_t::split_stab_op(blockstore_op_t *op, std::function<int(obj_ver_id v)> decider)
{
bool add_sync = false;
ver_vector_t good_vers, bad_vers;
obj_ver_id* v;
int i, todo = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
auto dirty_it = dirty_db.find(*v);
if (dirty_it == dirty_db.end())
int action = decider(*v);
if (action < 0)
{
auto & clean_db = clean_db_shard(v->oid);
auto clean_it = clean_db.find(v->oid);
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
// Rollback changes
for (auto & ov: PRIV(op)->sync_big_writes)
{
// No such object version
op->retval = -ENOENT;
FINISH_OP(op);
return 2;
unsynced_big_writes.push_back(ov);
unsynced_big_write_count++;
}
else
for (auto & ov: PRIV(op)->sync_small_writes)
{
// Already stable
unsynced_small_writes.push_back(ov);
}
}
else if (IS_IN_FLIGHT(dirty_it->second.state))
{
// Object write is still in progress. Wait until the write request completes
return 0;
}
else if (!IS_SYNCED(dirty_it->second.state))
{
// Object not synced yet. Caller must sync it first
op->retval = -EBUSY;
free(good_vers.items);
good_vers.items = NULL;
free(bad_vers.items);
bad_vers.items = NULL;
// Error
op->retval = action;
FINISH_OP(op);
return 2;
}
else if (!IS_STABLE(dirty_it->second.state))
else if (action == STAB_SPLIT_DONE)
{
// Already done
init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
}
else if (action == STAB_SPLIT_WAIT)
{
// Already in progress, we just have to wait until it finishes
init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
append_version(bad_vers, *v);
}
else if (action == STAB_SPLIT_SYNC)
{
// Needs a SYNC, we have to send a SYNC if not already in progress
//
// If the object is not present in unsynced_(big|small)_writes then
// it's currently being synced. If it's present then we can initiate
// its sync ourselves.
init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
append_version(bad_vers, *v);
if (!add_sync)
{
PRIV(op)->sync_big_writes.clear();
PRIV(op)->sync_small_writes.clear();
add_sync = true;
}
check_unsynced(unsynced_small_writes, *v, PRIV(op)->sync_small_writes, NULL);
check_unsynced(unsynced_big_writes, *v, PRIV(op)->sync_big_writes, &unsynced_big_write_count);
}
else /* if (action == STAB_SPLIT_TODO) */
{
if (good_vers.items)
{
// If we're selecting versions, append it.
// The main idea is that 99% of the time all versions passed to BS_OP_STABLE are synced
// and we don't want to select/allocate anything in that optimistic case.
append_version(good_vers, *v);
}
todo++;
}
}
if (!todo)
// In a pessimistic scenario, an operation may be split into 3:
// - Stabilize synced entries
// - Sync unsynced entries
// - Continue for unsynced entries after sync
add_sync = add_sync && (PRIV(op)->sync_big_writes.size() || PRIV(op)->sync_small_writes.size());
if (!todo && !bad_vers.size)
{
// Already stable
op->retval = 0;
FINISH_OP(op);
return 2;
}
op->retval = 0;
if (!todo && !add_sync)
{
// Only wait for inflight writes or current in-progress syncs
return 0;
}
blockstore_op_t *sync_op = NULL, *split_stab_op = NULL;
if (add_sync)
{
// Initiate a selective sync for PRIV(op)->sync_(big|small)_writes
sync_op = selective_sync(op);
}
if (bad_vers.size)
{
// Split part of the request into a separate operation
split_stab_op = new blockstore_op_t;
split_stab_op->opcode = op->opcode;
split_stab_op->buf = bad_vers.items;
split_stab_op->len = bad_vers.size;
init_op(split_stab_op);
submit_queue.push_back(split_stab_op);
}
if (sync_op || split_stab_op || good_vers.items)
{
void *orig_buf = op->buf;
if (good_vers.items)
{
op->buf = good_vers.items;
op->len = good_vers.size;
}
// Make a wrapped callback
int *split_op_counter = (int*)malloc_or_die(sizeof(int));
*split_op_counter = (sync_op ? 1 : 0) + (split_stab_op ? 1 : 0) + (todo ? 1 : 0);
auto cb = [this, op, good_items = good_vers.items,
bad_items = bad_vers.items, split_op_counter,
orig_buf, real_cb = op->callback](blockstore_op_t *split_op)
{
if (split_op->retval != 0)
op->retval = split_op->retval;
(*split_op_counter)--;
assert((*split_op_counter) >= 0);
if (op != split_op)
delete split_op;
if (!*split_op_counter)
{
free(good_items);
free(bad_items);
free(split_op_counter);
op->buf = orig_buf;
real_cb(op);
}
};
if (sync_op)
{
sync_op->callback = cb;
}
if (split_stab_op)
{
split_stab_op->callback = cb;
}
op->callback = cb;
}
if (!todo)
{
// All work is postponed
op->callback = NULL;
return 2;
}
return 1;
}
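// The wrapped callback above is a fan-in: up to three operations (the
// trimmed original, the split-off part and the sync) share one heap-allocated
// counter, and whichever finishes last frees the shared buffers and fires the
// caller's original callback. The pattern in isolation (sketch, `done` is a
// placeholder for the original callback):
//
//     int *counter = (int*)malloc_or_die(sizeof(int));
//     *counter = n_parts;
//     auto cb = [counter, done](blockstore_op_t *part)
//     {
//         if (--(*counter) == 0)
//         {
//             free(counter);
//             done();
//         }
//     };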
int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
{
if (PRIV(op)->op_state)
{
return continue_stable(op);
}
int r = split_stab_op(op, [this](obj_ver_id ov)
{
auto dirty_it = dirty_db.find(ov);
if (dirty_it == dirty_db.end())
{
auto & clean_db = clean_db_shard(ov.oid);
auto clean_it = clean_db.find(ov.oid);
if (clean_it == clean_db.end() || clean_it->second.version < ov.version)
{
// No such object version
printf("Error: %lx:%lx v%lu not found while stabilizing\n", ov.oid.inode, ov.oid.stripe, ov.version);
return -ENOENT;
}
else
{
// Already stable
return STAB_SPLIT_DONE;
}
}
else if (IS_IN_FLIGHT(dirty_it->second.state))
{
// Object write is still in progress. Wait until the write request completes
return STAB_SPLIT_WAIT;
}
else if (!IS_SYNCED(dirty_it->second.state))
{
// Object not synced yet - sync it
// In previous versions we returned EBUSY here and required
// the caller (OSD) to issue a global sync first. But a global sync
// waits for all writes in the queue including inflight writes. And
// inflight writes may themselves be blocked by unstable writes being
// still present in the journal and not flushed away from it.
// So we must sync specific objects here.
//
// Even more, we have to process the "stabilize" request in parts. That is,
// we must stabilize all objects which are already synced. Otherwise
// they may block objects which are NOT synced yet.
return STAB_SPLIT_SYNC;
}
else if (IS_STABLE(dirty_it->second.state))
{
// Already stable
return STAB_SPLIT_DONE;
}
else
{
return STAB_SPLIT_TODO;
}
});
if (r != 1)
{
return r;
}
// Check journal space
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, todo, sizeof(journal_entry_stable), 0))
if (!space_check.check_available(op, op->len, sizeof(journal_entry_stable), 0))
{
return 0;
}
@@ -102,9 +351,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
auto v = (obj_ver_id*)op->buf;
for (int i = 0; i < op->len; i++, v++)
{
// FIXME: Only stabilize versions that aren't stable yet
if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
journal.sector_info[journal.cur_sector].dirty)
{

View File

@@ -12,7 +12,7 @@
#define SYNC_JOURNAL_SYNC_SENT 7
#define SYNC_DONE 8
int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{
if (immediate_commit == IMMEDIATE_ALL)
{
@@ -145,7 +145,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
PRIV(op)->op_state = SYNC_DONE;
}
}
if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync)
if (PRIV(op)->op_state == SYNC_DONE)
{
ack_sync(op);
return 2;
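Dropping queue_has_in_progress_sync lets a SYNC complete regardless of its position in the submit queue, which is what allows selective_sync() to run one out of band. continue_sync() stays a resumable state machine; the driving pattern, reduced to a sketch (not from this commit):

// Re-entered by loop() until it returns 2 ("finished, dequeue").
int continue_fake_op(int & op_state)
{
    if (op_state < SYNC_DONE)
    {
        op_state++; // submit the next journal/data write for this state
        return 1;   // not finished: keep the op queued, resume next pass
    }
    return 2;       // done: ack the sync and dequeue the op
}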

View File

@@ -39,6 +39,11 @@ struct __attribute__((__packed__)) obj_ver_id
uint64_t version;
};
inline bool operator == (const obj_ver_id & a, const obj_ver_id & b)
{
return a.oid == b.oid && a.version == b.version;
}
inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
{
return a.oid < b.oid || (a.oid == b.oid && a.version < b.version);