diff --git a/blockstore.h b/blockstore.h
index 8ac61235..0c83729e 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -195,11 +195,16 @@ public:
 struct blockstore_operation
 {
     std::function<void (blockstore_operation*)> callback;
+    // flags contains the operation type and possibly other flags
     uint32_t flags;
+    // For reads, writes & deletes: oid is the requested object
     object_id oid;
     // For reads: version=0 -> last stable, version=UINT64_MAX -> last unstable, version=X -> specific version
+    // For writes & deletes: a new version is assigned automatically
     uint64_t version;
+    // For reads & writes: offset & len are the requested part of the object, buf is the buffer
     uint32_t offset;
+    // For stabilize requests: buf contains obj_ver_id's to stabilize, len is their count
     uint32_t len;
     uint8_t *buf;
     int retval;
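Note on the new request layout: for stabilize operations, buf carries a packed array of obj_ver_id and len carries the entry count (for reads and writes, len stays a byte length). A minimal sketch of how a caller might build such a request — OP_STABLE, synced_writes and bs->enqueue_op() are assumed names for illustration, not part of this patch:

    // Sketch, not part of the patch: build a batched stabilize request.
    // The obj_ver_id list must stay alive until the callback fires
    // (see the FIXME in handle_stable_event below).
    auto to_stabilize = new std::vector<obj_ver_id>(std::move(synced_writes));
    blockstore_operation *op = new blockstore_operation();
    op->flags = OP_STABLE;                     // assumed stabilize flag name
    op->buf = (uint8_t*)to_stabilize->data();  // packed obj_ver_id array
    op->len = to_stabilize->size();            // entry count, not bytes
    op->callback = [to_stabilize](blockstore_operation *op)
    {
        // retval: 0 = stabilized or already stable,
        // EINVAL = unknown object version, EAGAIN = sync it first
        delete to_stabilize;
        delete op;
    };
    bs->enqueue_op(op);                        // assumed submission entry point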
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index ea2594be..64cca4e1 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -23,37 +23,48 @@
 // WA = 1.012. Very good :)
 // AND We must do it in batches, for the sake of reduced fsync call count
+// AND We must know what we stabilize. The basic workflow is:
+// 1) the primary OSD receives a sync request
+// 2) it determines its own unsynced writes from the blockstore's information
+//    just before submitting the fsync
+// 3) it submits syncs to the blockstore and to its peers
+// 4) after everyone acks the sync, it takes the object list and sends stabilize requests to everyone
 int blockstore::dequeue_stable(blockstore_operation *op)
 {
-    auto dirty_it = dirty_db.find((obj_ver_id){
-        .oid = op->oid,
-        .version = op->version,
-    });
-    if (dirty_it == dirty_db.end())
+    obj_ver_id* v;
+    int i, todo = 0;
+    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
     {
-        auto clean_it = clean_db.find(op->oid);
-        if (clean_it == clean_db.end() || clean_it->second.version < op->version)
+        auto dirty_it = dirty_db.find(*v);
+        if (dirty_it == dirty_db.end())
         {
-            // No such object version
-            op->retval = EINVAL;
+            auto clean_it = clean_db.find(v->oid);
+            if (clean_it == clean_db.end() || clean_it->second.version < v->version)
+            {
+                // No such object version
+                op->retval = EINVAL;
+                op->callback(op);
+                return 1;
+            }
+            else
+            {
+                // Already stable
+            }
         }
-        else
+        else if (IS_UNSYNCED(dirty_it->second.state))
         {
-            // Already stable
-            op->retval = 0;
+            // Object not synced yet. Caller must sync it first
+            op->retval = EAGAIN;
+            op->callback(op);
+            return 1;
+        }
+        else if (!IS_STABLE(dirty_it->second.state))
+        {
+            todo++;
         }
-        op->callback(op);
-        return 1;
     }
-    else if (IS_UNSYNCED(dirty_it->second.state))
-    {
-        // Object not synced yet. Caller must sync it first
-        op->retval = EAGAIN;
-        op->callback(op);
-        return 1;
-    }
-    else if (IS_STABLE(dirty_it->second.state))
+    if (!todo)
     {
         // Already stable
         op->retval = 0;
@@ -62,26 +73,43 @@ int blockstore::dequeue_stable(blockstore_operation *op)
     }
     // Check journal space
     blockstore_journal_check_t space_check(this);
-    if (!space_check.check_available(op, 1, sizeof(journal_entry_stable), 0))
+    if (!space_check.check_available(op, todo, sizeof(journal_entry_stable), 0))
     {
         return 0;
     }
-    // There is sufficient space. Get SQE
-    BS_SUBMIT_GET_SQE(sqe, data);
-    // Got SQE. Prepare journal sector write
-    journal_entry_stable *je = (journal_entry_stable*)prefill_single_journal_entry(journal, JE_STABLE, sizeof(struct journal_entry_stable));
-    je->oid = op->oid;
-    je->version = op->version;
-    je->crc32 = je_crc32((journal_entry*)je);
-    journal.crc32_last = je->crc32;
-    data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
-    data->op = op;
-    io_uring_prep_writev(
-        sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
-    );
-    journal.sector_info[journal.cur_sector].usage_count++;
-    op->pending_ops = 1;
-    op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
+    // There is sufficient space. Get SQEs
+    struct io_uring_sqe *sqe[space_check.sectors_required+1];
+    for (int i = 0; i < space_check.sectors_required+1; i++)
+    {
+        BS_SUBMIT_GET_SQE_DECL(sqe[i]);
+    }
+    // Prepare and submit journal entries
+    op->min_used_journal_sector = 1 + journal.cur_sector;
+    int s = 0, cur_sector = -1;
+    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
+    {
+        journal_entry_stable *je = (journal_entry_stable*)
+            prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
+        je->oid = v->oid;
+        je->version = v->version;
+        je->crc32 = je_crc32((journal_entry*)je);
+        journal.crc32_last = je->crc32;
+        if (cur_sector != journal.cur_sector)
+        {
+            cur_sector = journal.cur_sector;
+            // FIXME: Deduplicate this piece of code, too (something like write_journal)
+            journal.sector_info[journal.cur_sector].usage_count++;
+            struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
+            data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
+            data->op = op;
+            io_uring_prep_writev(
+                sqe[s], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
+            );
+            s++;
+        }
+    }
+    op->pending_ops = s;
+    op->max_used_journal_sector = 1 + journal.cur_sector;
     return 1;
 }
 
@@ -101,32 +129,35 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
     op->pending_ops--;
     if (op->pending_ops == 0)
     {
-        // Mark all dirty_db entries up to op->version as stable
-        auto dirty_it = dirty_db.find((obj_ver_id){
-            .oid = op->oid,
-            .version = op->version,
-        });
-        if (dirty_it->second.state == ST_J_SYNCED)
+        // First step: mark dirty_db entries as stable, acknowledge op completion
+        // FIXME: oops... we seem to have to copy object id/version pairs...
+        obj_ver_id* v;
+        int i;
+        for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
         {
-            dirty_it->second.state = ST_J_STABLE;
+            // Mark all dirty_db entries up to v->version as stable
+            auto dirty_it = dirty_db.find((obj_ver_id){
+                .oid = v->oid,
+                .version = v->version,
+            });
+            if (dirty_it != dirty_db.end())
+            {
+                do
+                {
+                    if (dirty_it->second.state == ST_J_SYNCED)
+                    {
+                        dirty_it->second.state = ST_J_STABLE;
+                    }
+                    else if (dirty_it->second.state == ST_D_META_SYNCED)
+                    {
+                        dirty_it->second.state = ST_D_STABLE;
+                    }
+                    dirty_it--;
+                } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
+            }
+        }
         // Acknowledge op
         op->retval = 0;
         op->callback(op);
-        }
-        else if (dirty_it->second.state == ST_D_META_SYNCED)
-        {
-            dirty_it->second.state = ST_D_STABLE;
-            // Acknowledge op
-            op->retval = 0;
-            op->callback(op);
-        }
-        else if (dirty_it->second.state == ST_J_STABLE)
-        {
-
-        }
-        else if (dirty_it->second.state == ST_D_STABLE)
-        {
-
-        }
     }
 }
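The FIXME about deduplicating the per-sector submission code suggests a helper along these lines — a sketch only, reusing the field names from this patch; the function itself and the journal_t type name are not part of it:

    // Hypothetical "write_journal"-style helper: queue a write of the current
    // journal sector through a prepared SQE and account for its usage.
    static void prepare_journal_sector_write(journal_t & journal, int cur_sector,
        struct io_uring_sqe *sqe, blockstore_operation *op)
    {
        journal.sector_info[cur_sector].usage_count++;
        struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
        data->iov = (struct iovec){ journal.sector_buf + 512*cur_sector, 512 };
        data->op = op;
        io_uring_prep_writev(
            sqe, journal.fd, &data->iov, 1,
            journal.offset + journal.sector_info[cur_sector].offset
        );
    }

With such a helper, dequeue_stable() above and the small-write path could collapse their copies into one call per dirty sector; pending_ops would keep counting sector writes (s), not journal entries (todo).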
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index 9b08c8c9..8a5511ad 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -35,6 +35,24 @@ void blockstore::enqueue_write(blockstore_operation *op)
         .offset = op->offset,
         .size = op->len,
     });
+    // Remember the write as unsynced here, so external consumers can get
+    // the list of dirty objects to sync just before issuing a SYNC request
+    if (op->len == block_size)
+    {
+        // Remember big write as unsynced
+        unsynced_big_writes.push_back((obj_ver_id){
+            .oid = op->oid,
+            .version = op->version,
+        });
+    }
+    else
+    {
+        // Remember small write as unsynced
+        unsynced_small_writes.push_back((obj_ver_id){
+            .oid = op->oid,
+            .version = op->version,
+        });
+    }
 }
 
 // First step of the write algorithm: dequeue operation and submit initial write(s)
@@ -66,11 +84,6 @@ int blockstore::dequeue_write(blockstore_operation *op)
         );
         op->pending_ops = 1;
         op->min_used_journal_sector = op->max_used_journal_sector = 0;
-        // Remember write as unsynced
-        unsynced_big_writes.push_back((obj_ver_id){
-            .oid = op->oid,
-            .version = op->version,
-        });
     }
     else
     {
@@ -116,10 +129,6 @@ int blockstore::dequeue_write(blockstore_operation *op)
         dirty_it->second.state = ST_J_SUBMITTED;
         journal.next_free += op->len;
         op->pending_ops = 2;
-        unsynced_small_writes.push_back((obj_ver_id){
-            .oid = op->oid,
-            .version = op->version,
-        });
     }
     return 1;
 }
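Moving this bookkeeping from dequeue_write() to enqueue_write() means the unsynced lists are filled at submission time, which is what lets the primary OSD snapshot them "just before submitting fsync", per the workflow described in blockstore_stable.cpp. A sketch of that consumer side — it assumes the blockstore exposes the two lists (no such accessor is added by this patch) and reuses the request layout from blockstore.h:

    // Sketch, not part of the patch: snapshot dirty objects before a SYNC,
    // then stabilize exactly that set once the sync is acked everywhere.
    std::vector<obj_ver_id> snapshot;
    snapshot.insert(snapshot.end(),
        bs->unsynced_big_writes.begin(), bs->unsynced_big_writes.end());
    snapshot.insert(snapshot.end(),
        bs->unsynced_small_writes.begin(), bs->unsynced_small_writes.end());
    // 1) submit SYNC to the local blockstore and to peer OSDs
    // 2) after all acks: op->buf = (uint8_t*)snapshot.data();
    //    op->len = snapshot.size(); and submit the stabilize request

Taking a copy also matters because of the FIXME in handle_stable_event(): the blockstore reads op->buf again on completion, so the list must outlive the whole stabilize request.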