diff --git a/src/blockstore.h b/src/blockstore.h
index 413d1166..265372c5 100644
--- a/src/blockstore.h
+++ b/src/blockstore.h
@@ -107,7 +107,7 @@
 Input:
 - buf = pre-allocated obj_ver_id array <len> units long
 
 Output:
-- retval = 0 or negative error number (-EINVAL, -ENOENT if no such version or -EBUSY if not synced)
+- retval = 0 or negative error number (-ENOENT if no such version for stabilize)
 
 ## BS_OP_SYNC_STAB_ALL
diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp
index 2f37de91..32952740 100644
--- a/src/blockstore_impl.cpp
+++ b/src/blockstore_impl.cpp
@@ -171,7 +171,7 @@ void blockstore_impl_t::loop()
                     // Can't submit SYNC before previous writes
                     continue;
                 }
-                wr_st = continue_sync(op, false);
+                wr_st = continue_sync(op);
                 if (wr_st != 2)
                 {
                     has_writes = wr_st > 0 ? 1 : 2;
@@ -371,13 +371,18 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
         ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
         return;
     }
+    init_op(op);
+    submit_queue.push_back(op);
+    ringloop->wakeup();
+}
+
+void blockstore_impl_t::init_op(blockstore_op_t *op)
+{
     // Call constructor without allocating memory. We'll call destructor before returning op back
     new ((void*)op->private_data) blockstore_op_private_t;
     PRIV(op)->wait_for = 0;
     PRIV(op)->op_state = 0;
     PRIV(op)->pending_ops = 0;
-    submit_queue.push_back(op);
-    ringloop->wakeup();
 }
 
 static bool replace_stable(object_id oid, uint64_t version, int search_start, int search_end, obj_ver_id* list)
diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h
index 6541fa22..c7d7d6ef 100644
--- a/src/blockstore_impl.h
+++ b/src/blockstore_impl.h
@@ -216,6 +216,11 @@ struct pool_shard_settings_t
     uint32_t pg_stripe_size;
 };
 
+#define STAB_SPLIT_DONE 1
+#define STAB_SPLIT_WAIT 2
+#define STAB_SPLIT_SYNC 3
+#define STAB_SPLIT_TODO 4
+
 class blockstore_impl_t
 {
     blockstore_disk_t dsk;
@@ -298,6 +303,7 @@ class blockstore_impl_t
     blockstore_init_journal* journal_init_reader;
 
     void check_wait(blockstore_op_t *op);
+    void init_op(blockstore_op_t *op);
 
     // Read
     int dequeue_read(blockstore_op_t *read_op);
@@ -317,7 +323,7 @@ class blockstore_impl_t
     void handle_write_event(ring_data_t *data, blockstore_op_t *op);
 
     // Sync
-    int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
+    int continue_sync(blockstore_op_t *op);
     void ack_sync(blockstore_op_t *op);
 
     // Stabilize
@@ -325,6 +331,8 @@ class blockstore_impl_t
     int continue_stable(blockstore_op_t *op);
     void mark_stable(const obj_ver_id & ov, bool forget_dirty = false);
     void stabilize_object(object_id oid, uint64_t max_ver);
+    blockstore_op_t* selective_sync(blockstore_op_t *op);
+    int split_stab_op(blockstore_op_t *op, std::function<int(obj_ver_id ov)> decider);
 
     // Rollback
     int dequeue_rollback(blockstore_op_t *op);
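The four STAB_SPLIT_* codes are the contract between split_stab_op() and the per-version decider callbacks defined in blockstore_rollback.cpp and blockstore_stable.cpp below. A minimal sketch of that contract, with hypothetical is_*() helpers standing in for the real dirty_db checks:

```cpp
// Sketch only: the is_*() helpers are illustrative placeholders, not real API.
bool is_already_stable(obj_ver_id ov);
bool is_write_in_flight(obj_ver_id ov);
bool is_synced(obj_ver_id ov);

int example_decider(obj_ver_id ov)
{
    if (is_already_stable(ov))
        return STAB_SPLIT_DONE;  // nothing to do, drop the version from the request
    if (is_write_in_flight(ov))
        return STAB_SPLIT_WAIT;  // split off and retry after the write completes
    if (!is_synced(ov))
        return STAB_SPLIT_SYNC;  // split off and sync before stabilizing
    return STAB_SPLIT_TODO;      // ready: keep in the main operation
    // Any negative value (e.g. -ENOENT) aborts the whole operation instead.
}
```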
diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp
index c5199ac8..d0db8a99 100644
--- a/src/blockstore_rollback.cpp
+++ b/src/blockstore_rollback.cpp
@@ -9,48 +9,39 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
     {
         return continue_rollback(op);
     }
-    obj_ver_id *v, *nv;
-    int i, todo = op->len;
-    for (i = 0, v = (obj_ver_id*)op->buf, nv = (obj_ver_id*)op->buf; i < op->len; i++, v++, nv++)
+    int r = split_stab_op(op, [this](obj_ver_id ov)
     {
-        if (nv != v)
-        {
-            *nv = *v;
-        }
         // Check that there are some versions greater than v->version (which may be zero),
         // check that they're unstable, synced, and not currently written to
         auto dirty_it = dirty_db.lower_bound((obj_ver_id){
-            .oid = v->oid,
+            .oid = ov.oid,
             .version = UINT64_MAX,
         });
         if (dirty_it == dirty_db.begin())
         {
-skip_ov:
             // Already rolled back, skip this object version
-            todo--;
-            nv--;
-            continue;
+            return STAB_SPLIT_DONE;
         }
         else
         {
             dirty_it--;
-            if (dirty_it->first.oid != v->oid || dirty_it->first.version < v->version)
+            if (dirty_it->first.oid != ov.oid || dirty_it->first.version < ov.version)
             {
-                goto skip_ov;
+                // Already rolled back, skip this object version
+                return STAB_SPLIT_DONE;
             }
-            while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version)
+            while (dirty_it->first.oid == ov.oid && dirty_it->first.version > ov.version)
            {
                 if (IS_IN_FLIGHT(dirty_it->second.state))
                 {
                     // Object write is still in progress. Wait until the write request completes
-                    return 0;
+                    return STAB_SPLIT_WAIT;
                 }
                 else if (!IS_SYNCED(dirty_it->second.state) || IS_STABLE(dirty_it->second.state))
                 {
-                    op->retval = -EBUSY;
-                    FINISH_OP(op);
-                    return 2;
+                    // Sync the object
+                    return STAB_SPLIT_SYNC;
                 }
                 if (dirty_it == dirty_db.begin())
                 {
@@ -58,19 +49,16 @@ skip_ov:
                 }
                 dirty_it--;
             }
+            return STAB_SPLIT_TODO;
         }
-    }
-    op->len = todo;
-    if (!todo)
+    });
+    if (r != 1)
     {
-        // Already rolled back
-        op->retval = 0;
-        FINISH_OP(op);
-        return 2;
+        return r;
     }
     // Check journal space
     blockstore_journal_check_t space_check(this);
-    if (!space_check.check_available(op, todo, sizeof(journal_entry_rollback), 0))
+    if (!space_check.check_available(op, op->len, sizeof(journal_entry_rollback), 0))
     {
         return 0;
     }
@@ -78,7 +66,8 @@ skip_ov:
     BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
     // Prepare and submit journal entries
     int s = 0;
-    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
+    auto v = (obj_ver_id*)op->buf;
+    for (int i = 0; i < op->len; i++, v++)
     {
         if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
             journal.sector_info[journal.cur_sector].dirty)
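dequeue_rollback() and dequeue_stable() now share one shape: classify each version through split_stab_op() and only proceed to the journal writes when it returns 1. A hedged sketch of that calling pattern (the helper name and body are illustrative, not the real code):

```cpp
// Tri-state protocol of split_stab_op(), as used by both dequeue_* functions:
// 2 = op finished or fully postponed (dequeue it), 0 = keep it queued and
// retry later, 1 = all remaining versions are ready, write journal entries.
int dequeue_sketch(blockstore_op_t *op, int (*split)(blockstore_op_t*))
{
    int r = split(op);
    if (r != 1)
        return r;  // 0 or 2: nothing more to do right now
    // ... check journal space and submit journal_entry_* records for op->buf ...
    return 1;
}
```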
diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp
index 34be29ec..4b6f52c1 100644
--- a/src/blockstore_stable.cpp
+++ b/src/blockstore_stable.cpp
@@ -41,60 +41,309 @@
 // 4) after a while it takes his synced object list and sends stabilize requests
 //    to peers and to its own blockstore, thus freeing the old version
 
-int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
+struct ver_vector_t
 {
-    if (PRIV(op)->op_state)
+    obj_ver_id *items = NULL;
+    uint64_t alloc = 0, size = 0;
+};
+
+static void init_versions(ver_vector_t & vec, obj_ver_id *start, obj_ver_id *end, uint64_t len)
+{
+    if (!vec.items)
     {
-        return continue_stable(op);
+        vec.alloc = len;
+        vec.items = (obj_ver_id*)malloc_or_die(sizeof(obj_ver_id) * vec.alloc);
+        for (auto sv = start; sv < end; sv++)
+        {
+            vec.items[vec.size++] = *sv;
+        }
     }
+}
+
+static void append_version(ver_vector_t & vec, obj_ver_id ov)
+{
+    if (vec.size >= vec.alloc)
+    {
+        vec.alloc = !vec.alloc ? 4 : vec.alloc*2;
+        vec.items = (obj_ver_id*)realloc_or_die(vec.items, sizeof(obj_ver_id) * vec.alloc);
+    }
+    vec.items[vec.size++] = ov;
+}
+
+static bool check_unsynced(std::vector<obj_ver_id> & check, obj_ver_id ov, std::vector<obj_ver_id> & to, int *count)
+{
+    bool found = false;
+    int j = 0, k = 0;
+    while (j < check.size())
+    {
+        if (check[j] == ov)
+            found = true;
+        if (check[j].oid == ov.oid && check[j].version <= ov.version)
+        {
+            to.push_back(check[j++]);
+            if (count)
+                (*count)--;
+        }
+        else
+            check[k++] = check[j++];
+    }
+    check.resize(k);
+    return found;
+}
+
+blockstore_op_t* blockstore_impl_t::selective_sync(blockstore_op_t *op)
+{
+    unsynced_big_write_count -= unsynced_big_writes.size();
+    unsynced_big_writes.swap(PRIV(op)->sync_big_writes);
+    unsynced_big_write_count += unsynced_big_writes.size();
+    unsynced_small_writes.swap(PRIV(op)->sync_small_writes);
+    // Create a sync operation, insert into the end of the queue
+    // And move ourselves into the end too!
+    // Rather hacky but that's what we need...
+    blockstore_op_t *sync_op = new blockstore_op_t;
+    sync_op->opcode = BS_OP_SYNC;
+    sync_op->buf = NULL;
+    sync_op->callback = [this](blockstore_op_t *sync_op)
+    {
+        delete sync_op;
+    };
+    init_op(sync_op);
+    int sync_res = continue_sync(sync_op);
+    if (sync_res != 2)
+    {
+        // Put SYNC into the queue if it's not finished yet
+        submit_queue.push_back(sync_op);
+    }
+    // Restore unsynced_writes
+    unsynced_small_writes.swap(PRIV(op)->sync_small_writes);
+    unsynced_big_write_count -= unsynced_big_writes.size();
+    unsynced_big_writes.swap(PRIV(op)->sync_big_writes);
+    unsynced_big_write_count += unsynced_big_writes.size();
+    if (sync_res == 2)
+    {
+        // Sync is immediately completed
+        return NULL;
+    }
+    return sync_op;
+}
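selective_sync() relies on a swap trick: the versions selected for syncing have already been moved out of the global unsynced lists (by check_unsynced()), so swapping them in, running the generic sync path, and swapping back leaves the remainder untouched. A self-contained toy illustration of that idiom (names and types simplified):

```cpp
#include <cstdio>
#include <vector>

// Illustrative only: the swap-restore idiom behind selective_sync().
// Assume entries 2 and 4 were already moved out of the global list into
// 'selected' (check_unsynced() does this filtering in the real code).
std::vector<int> global_unsynced = {1, 3};

void sync_all() { global_unsynced.clear(); }  // stands in for continue_sync()

void sync_only(std::vector<int> & selected)
{
    global_unsynced.swap(selected);  // global list now holds only the selection
    sync_all();                      // the generic path syncs "everything"
    global_unsynced.swap(selected);  // restore the untouched remainder
}

int main()
{
    std::vector<int> selected = {2, 4};
    sync_only(selected);
    printf("still unsynced: %zu\n", global_unsynced.size());  // prints 2 (1 and 3)
    return 0;
}
```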
+
+// Returns: 2 = stop processing and dequeue, 0 = stop processing and do not dequeue, 1 = proceed with op itself
+int blockstore_impl_t::split_stab_op(blockstore_op_t *op, std::function<int(obj_ver_id ov)> decider)
+{
+    bool add_sync = false;
+    ver_vector_t good_vers, bad_vers;
     obj_ver_id* v;
     int i, todo = 0;
     for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
     {
-        auto dirty_it = dirty_db.find(*v);
-        if (dirty_it == dirty_db.end())
+        int action = decider(*v);
+        if (action < 0)
         {
-            auto & clean_db = clean_db_shard(v->oid);
-            auto clean_it = clean_db.find(v->oid);
-            if (clean_it == clean_db.end() || clean_it->second.version < v->version)
+            // Rollback changes
+            for (auto & ov: PRIV(op)->sync_big_writes)
             {
-                // No such object version
-                op->retval = -ENOENT;
-                FINISH_OP(op);
-                return 2;
+                unsynced_big_writes.push_back(ov);
+                unsynced_big_write_count++;
             }
-            else
+            for (auto & ov: PRIV(op)->sync_small_writes)
             {
-                // Already stable
+                unsynced_small_writes.push_back(ov);
             }
-        }
-        else if (IS_IN_FLIGHT(dirty_it->second.state))
-        {
-            // Object write is still in progress. Wait until the write request completes
-            return 0;
-        }
-        else if (!IS_SYNCED(dirty_it->second.state))
-        {
-            // Object not synced yet. Caller must sync it first
-            op->retval = -EBUSY;
+            free(good_vers.items);
+            good_vers.items = NULL;
+            free(bad_vers.items);
+            bad_vers.items = NULL;
+            // Error
+            op->retval = action;
             FINISH_OP(op);
             return 2;
         }
-        else if (!IS_STABLE(dirty_it->second.state))
+        else if (action == STAB_SPLIT_DONE)
         {
+            // Already done
+            init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
+        }
+        else if (action == STAB_SPLIT_WAIT)
+        {
+            // Already in progress, we just have to wait until it finishes
+            init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
+            append_version(bad_vers, *v);
+        }
+        else if (action == STAB_SPLIT_SYNC)
+        {
+            // Needs a SYNC, we have to send a SYNC if not already in progress
+            //
+            // If the object is not present in unsynced_(big|small)_writes then
+            // it's currently being synced. If it's present then we can initiate
+            // its sync ourselves.
+            init_versions(good_vers, (obj_ver_id*)op->buf, v, op->len);
+            append_version(bad_vers, *v);
+            if (!add_sync)
+            {
+                PRIV(op)->sync_big_writes.clear();
+                PRIV(op)->sync_small_writes.clear();
+                add_sync = true;
+            }
+            check_unsynced(unsynced_small_writes, *v, PRIV(op)->sync_small_writes, NULL);
+            check_unsynced(unsynced_big_writes, *v, PRIV(op)->sync_big_writes, &unsynced_big_write_count);
+        }
+        else /* if (action == STAB_SPLIT_TODO) */
+        {
+            if (good_vers.items)
+            {
+                // If we're selecting versions then append it
+                // Main idea is that 99% of the time all versions passed to BS_OP_STABLE are synced
+                // And we don't want to select/allocate anything in that optimistic case
+                append_version(good_vers, *v);
+            }
             todo++;
         }
     }
-    if (!todo)
+    // In a pessimistic scenario, an operation may be split into 3:
+    // - Stabilize synced entries
+    // - Sync unsynced entries
+    // - Continue for unsynced entries after sync
+    add_sync = add_sync && (PRIV(op)->sync_big_writes.size() || PRIV(op)->sync_small_writes.size());
+    if (!todo && !bad_vers.size)
     {
         // Already stable
         op->retval = 0;
         FINISH_OP(op);
         return 2;
     }
+    op->retval = 0;
+    if (!todo && !add_sync)
+    {
+        // Only wait for inflight writes or current in-progress syncs
+        return 0;
+    }
+    blockstore_op_t *sync_op = NULL, *split_stab_op = NULL;
+    if (add_sync)
+    {
+        // Initiate a selective sync for PRIV(op)->sync_(big|small)_writes
+        sync_op = selective_sync(op);
+    }
+    if (bad_vers.size)
+    {
+        // Split part of the request into a separate operation
+        split_stab_op = new blockstore_op_t;
+        split_stab_op->opcode = op->opcode;
+        split_stab_op->buf = bad_vers.items;
+        split_stab_op->len = bad_vers.size;
+        init_op(split_stab_op);
+        submit_queue.push_back(split_stab_op);
+    }
+    if (sync_op || split_stab_op || good_vers.items)
+    {
+        void *orig_buf = op->buf;
+        if (good_vers.items)
+        {
+            op->buf = good_vers.items;
+            op->len = good_vers.size;
+        }
+        // Make a wrapped callback
+        int *split_op_counter = (int*)malloc_or_die(sizeof(int));
+        *split_op_counter = (sync_op ? 1 : 0) + (split_stab_op ? 1 : 0) + (todo ? 1 : 0);
+        auto cb = [this, op, good_items = good_vers.items,
+            bad_items = bad_vers.items, split_op_counter,
+            orig_buf, real_cb = op->callback](blockstore_op_t *split_op)
+        {
+            if (split_op->retval != 0)
+                op->retval = split_op->retval;
+            (*split_op_counter)--;
+            assert((*split_op_counter) >= 0);
+            if (op != split_op)
+                delete split_op;
+            if (!*split_op_counter)
+            {
+                free(good_items);
+                free(bad_items);
+                free(split_op_counter);
+                op->buf = orig_buf;
+                real_cb(op);
+            }
+        };
+        if (sync_op)
+        {
+            sync_op->callback = cb;
+        }
+        if (split_stab_op)
+        {
+            split_stab_op->callback = cb;
+        }
+        op->callback = cb;
+    }
+    if (!todo)
+    {
+        // All work is postponed
+        op->callback = NULL;
+        return 2;
+    }
+    return 1;
+}
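The wrapped callback above completes the original operation only once every split-off part has finished: the selective sync, the postponed remainder, and (if todo > 0) the op itself, joined through the heap-allocated split_op_counter. A self-contained sketch of that completion-counting pattern (simplified, no blockstore types):

```cpp
#include <cassert>
#include <cstdio>
#include <functional>

// Illustrative completion counter, mirroring split_op_counter in split_stab_op().
// make_joiner() returns a callback that fires 'done' after being called n times.
std::function<void()> make_joiner(int n, std::function<void()> done)
{
    int *counter = new int(n);          // shared by all copies of the lambda
    return [counter, done]()
    {
        assert(*counter > 0);
        if (--(*counter) == 0)
        {
            delete counter;
            done();                     // all parts finished: complete the op
        }
    };
}

int main()
{
    auto cb = make_joiner(3, []() { printf("original op completed\n"); });
    cb(); cb(); cb();                   // sync part, split part, op itself
    return 0;
}
```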
+
+int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
+{
+    if (PRIV(op)->op_state)
+    {
+        return continue_stable(op);
+    }
+    int r = split_stab_op(op, [this](obj_ver_id ov)
+    {
+        auto dirty_it = dirty_db.find(ov);
+        if (dirty_it == dirty_db.end())
+        {
+            auto & clean_db = clean_db_shard(ov.oid);
+            auto clean_it = clean_db.find(ov.oid);
+            if (clean_it == clean_db.end() || clean_it->second.version < ov.version)
+            {
+                // No such object version
+                printf("Error: %lx:%lx v%lu not found while stabilizing\n", ov.oid.inode, ov.oid.stripe, ov.version);
+                return -ENOENT;
+            }
+            else
+            {
+                // Already stable
+                return STAB_SPLIT_DONE;
+            }
+        }
+        else if (IS_IN_FLIGHT(dirty_it->second.state))
+        {
+            // Object write is still in progress. Wait until the write request completes
+            return STAB_SPLIT_WAIT;
+        }
+        else if (!IS_SYNCED(dirty_it->second.state))
+        {
+            // Object not synced yet - sync it
+            // In previous versions we returned EBUSY here and required
+            // the caller (OSD) to issue a global sync first. But a global sync
+            // waits for all writes in the queue including inflight writes. And
+            // inflight writes may themselves be blocked by unstable writes being
+            // still present in the journal and not flushed away from it.
+            // So we must sync specific objects here.
+            //
+            // Even more, we have to process "stabilize" request in parts. That is,
+            // we must stabilize all objects which are already synced. Otherwise
+            // they may block objects which are NOT synced yet.
+            return STAB_SPLIT_SYNC;
+        }
+        else if (IS_STABLE(dirty_it->second.state))
+        {
+            // Already stable
+            return STAB_SPLIT_DONE;
+        }
+        else
+        {
+            return STAB_SPLIT_TODO;
+        }
+    });
+    if (r != 1)
+    {
+        return r;
+    }
     // Check journal space
     blockstore_journal_check_t space_check(this);
-    if (!space_check.check_available(op, todo, sizeof(journal_entry_stable), 0))
+    if (!space_check.check_available(op, op->len, sizeof(journal_entry_stable), 0))
     {
         return 0;
     }
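For context, a stabilize request reaches dequeue_stable() as a BS_OP_STABLE operation whose buf holds len obj_ver_id entries, as described in blockstore.h above. A hedged sketch of a caller-side submission (the helper name and concrete values are illustrative; error handling omitted):

```cpp
#include "blockstore.h"

// Sketch of a BS_OP_STABLE submission, following the blockstore.h description:
// buf = pre-allocated obj_ver_id array <len> units long, retval = 0 or -ENOENT.
void stabilize_two_versions(blockstore_t *bs)  // illustrative helper
{
    blockstore_op_t *op = new blockstore_op_t;
    obj_ver_id *vers = new obj_ver_id[2];
    vers[0] = { .oid = { .inode = 1, .stripe = 0 }, .version = 5 };
    vers[1] = { .oid = { .inode = 1, .stripe = 128*1024 }, .version = 3 };
    op->opcode = BS_OP_STABLE;
    op->buf = vers;
    op->len = 2;
    op->callback = [vers](blockstore_op_t *op)
    {
        // retval = 0 on success, -ENOENT if some version doesn't exist
        delete[] vers;
        delete op;
    };
    bs->enqueue_op(op);
}
```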
@@ -102,9 +351,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
     BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
     // Prepare and submit journal entries
     int s = 0;
-    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
+    auto v = (obj_ver_id*)op->buf;
+    for (int i = 0; i < op->len; i++, v++)
     {
-        // FIXME: Only stabilize versions that aren't stable yet
         if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
             journal.sector_info[journal.cur_sector].dirty)
         {
diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp
index ddacff16..85706091 100644
--- a/src/blockstore_sync.cpp
+++ b/src/blockstore_sync.cpp
@@ -12,7 +12,7 @@
 #define SYNC_JOURNAL_SYNC_SENT 7
 #define SYNC_DONE 8
 
-int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
+int blockstore_impl_t::continue_sync(blockstore_op_t *op)
 {
     if (immediate_commit == IMMEDIATE_ALL)
     {
@@ -145,7 +145,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
             PRIV(op)->op_state = SYNC_DONE;
         }
     }
-    if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync)
+    if (PRIV(op)->op_state == SYNC_DONE)
     {
         ack_sync(op);
         return 2;
diff --git a/src/object_id.h b/src/object_id.h
index 0b46c1ea..ca768111 100644
--- a/src/object_id.h
+++ b/src/object_id.h
@@ -39,6 +39,11 @@ struct __attribute__((__packed__)) obj_ver_id
     uint64_t version;
 };
 
+inline bool operator == (const obj_ver_id & a, const obj_ver_id & b)
+{
+    return a.oid == b.oid && a.version == b.version;
+}
+
 inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
 {
     return a.oid < b.oid || a.oid == b.oid && a.version < b.version;
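The new operator == mirrors the existing operator < and is what lets check_unsynced() match exact versions with check[j] == ov. A quick self-contained check of the comparison semantics, assuming the object_id.h definitions above:

```cpp
#include <cassert>
#include "object_id.h"

int main()
{
    obj_ver_id a = { .oid = { .inode = 1, .stripe = 0 }, .version = 5 };
    obj_ver_id b = { .oid = { .inode = 1, .stripe = 0 }, .version = 5 };
    obj_ver_id c = { .oid = { .inode = 1, .stripe = 0 }, .version = 6 };
    assert(a == b);     // same object, same version
    assert(!(a == c));  // same object, different version
    assert(a < c);      // ordered by oid first, then by version
    return 0;
}
```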