diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp
index afa032ffd..faf3cb3cb 100644
--- a/blockstore_impl.cpp
+++ b/blockstore_impl.cpp
@@ -125,6 +125,12 @@ void blockstore_impl_t::loop()
             if (PRIV(op)->wait_for)
             {
                 check_wait(op);
+#ifdef BLOCKSTORE_DEBUG
+                if (PRIV(op)->wait_for)
+                {
+                    printf("still waiting for %d\n", PRIV(op)->wait_for);
+                }
+#endif
                 if (PRIV(op)->wait_for == WAIT_SQE)
                 {
                     break;
@@ -270,7 +276,9 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
     }
     else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
     {
-        if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
+        int next = ((journal.cur_sector + 1) % journal.sector_count);
+        if (journal.sector_info[next].usage_count > 0 ||
+            journal.sector_info[next].dirty)
         {
             // do not submit
             return;
diff --git a/blockstore_impl.h b/blockstore_impl.h
index a9072e17d..cf84963e7 100644
--- a/blockstore_impl.h
+++ b/blockstore_impl.h
@@ -256,6 +256,8 @@ class blockstore_impl_t
     void enqueue_write(blockstore_op_t *op);
     int dequeue_write(blockstore_op_t *op);
    int dequeue_del(blockstore_op_t *op);
+    void ack_write(blockstore_op_t *op);
+    void release_journal_sectors(blockstore_op_t *op);
     void handle_write_event(ring_data_t *data, blockstore_op_t *op);
 
     // Sync
diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp
index 745301df5..fe434118a 100644
--- a/blockstore_journal.cpp
+++ b/blockstore_journal.cpp
@@ -22,6 +22,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
             next_in_pos += fits * size;
             sectors_required++;
         }
+        else if (bs->journal.sector_info[next_sector].dirty)
+        {
+            // sectors_required is more like "sectors to write"
+            sectors_required++;
+        }
         if (required <= 0)
         {
             break;
@@ -33,13 +38,19 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
             right_dir = false;
         }
         next_in_pos = 0;
-        if (bs->journal.sector_info[next_sector].usage_count > 0)
+        if (bs->journal.sector_info[next_sector].usage_count > 0 ||
+            bs->journal.sector_info[next_sector].dirty)
         {
             next_sector = ((next_sector + 1) % bs->journal.sector_count);
         }
-        if (bs->journal.sector_info[next_sector].usage_count > 0)
+        if (bs->journal.sector_info[next_sector].usage_count > 0 ||
+            bs->journal.sector_info[next_sector].dirty)
         {
             // No memory buffer available. Wait for it.
+#ifdef BLOCKSTORE_DEBUG
+            printf("next journal buffer %d is still dirty=%d used=%d\n", next_sector,
+                bs->journal.sector_info[next_sector].dirty, bs->journal.sector_info[next_sector].usage_count);
+#endif
             PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
             return 0;
         }
@@ -68,6 +79,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
 {
     if (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < size)
     {
+        assert(!journal.sector_info[journal.cur_sector].dirty);
         // Move to the next journal sector
         if (journal.sector_info[journal.cur_sector].usage_count > 0)
         {
@@ -91,22 +103,24 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
     je->type = type;
     je->size = size;
     je->crc32_prev = journal.crc32_last;
+    journal.sector_info[journal.cur_sector].dirty = true;
     return je;
 }
 
-void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
+void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
 {
-    journal.sector_info[journal.cur_sector].usage_count++;
+    journal.sector_info[cur_sector].dirty = false;
+    journal.sector_info[cur_sector].usage_count++;
     ring_data_t *data = ((ring_data_t*)sqe->user_data);
     data->iov = (struct iovec){
         (journal.inmemory
-            ? journal.buffer + journal.sector_info[journal.cur_sector].offset
-            : journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector),
+            ? journal.buffer + journal.sector_info[cur_sector].offset
+            : journal.sector_buf + JOURNAL_BLOCK_SIZE*cur_sector),
         JOURNAL_BLOCK_SIZE
     };
     data->callback = cb;
     my_uring_prep_writev(
-        sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
+        sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
     );
 }
 
diff --git a/blockstore_journal.h b/blockstore_journal.h
index 2e6ea7f2d..87c6c9a52 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -112,6 +112,7 @@ struct journal_sector_info_t
 {
     uint64_t offset;
     uint64_t usage_count;
+    bool dirty;
 };
 
 struct journal_t
@@ -154,4 +155,4 @@ struct blockstore_journal_check_t
 
 journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
 
-void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
+void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index 7942801b4..cdff4de85 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -94,6 +94,14 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
     // Prepare and submit journal entries
     auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
     int s = 0, cur_sector = -1;
+    if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
+        journal.sector_info[journal.cur_sector].dirty)
+    {
+        if (cur_sector == -1)
+            PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
+        cur_sector = journal.cur_sector;
+        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
+    }
     for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
     {
         auto unstab_it = unstable_writes.find(v->oid);
@@ -104,6 +112,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
         }
         journal_entry_stable *je = (journal_entry_stable*)
             prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
+        journal.sector_info[journal.cur_sector].dirty = false;
         je->oid = v->oid;
         je->version = v->version;
         je->crc32 = je_crc32((journal_entry*)je);
@@ -113,7 +122,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
             if (cur_sector == -1)
                 PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
             cur_sector = journal.cur_sector;
-            prepare_journal_sector_write(journal, sqe[s++], cb);
+            prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
         }
     }
     PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
@@ -135,18 +144,7 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
     if (PRIV(op)->pending_ops == 0)
     {
         // Release used journal sectors
-        if (PRIV(op)->min_used_journal_sector > 0)
-        {
-            uint64_t s = PRIV(op)->min_used_journal_sector;
-            while (1)
-            {
-                journal.sector_info[s-1].usage_count--;
-                if (s == PRIV(op)->max_used_journal_sector)
-                    break;
-                s = 1 + s % journal.sector_count;
-            }
-            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
-        }
+        release_journal_sectors(op);
         // First step: mark dirty_db entries as stable, acknowledge op completion
         obj_ver_id* v;
         int i;
diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp
index f56bbc2cd..dc8adf3d7 100644
--- a/blockstore_sync.cpp
+++ b/blockstore_sync.cpp
@@ -39,14 +39,36 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
     if (PRIV(op)->sync_state == SYNC_HAS_SMALL)
     {
         // No big writes, just fsync the journal
-        if (!disable_fsync)
+        int n_sqes = disable_fsync ? 0 : 1;
+        if (journal.sector_info[journal.cur_sector].dirty)
         {
-            BS_SUBMIT_GET_SQE(sqe, data);
-            my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
-            data->iov = { 0 };
-            data->callback = cb;
-            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
-            PRIV(op)->pending_ops = 1;
+            n_sqes++;
+        }
+        if (n_sqes > 0)
+        {
+            io_uring_sqe* sqes[n_sqes];
+            for (int i = 0; i < n_sqes; i++)
+            {
+                BS_SUBMIT_GET_SQE_DECL(sqes[i]);
+            }
+            int s = 0;
+            if (journal.sector_info[journal.cur_sector].dirty)
+            {
+                prepare_journal_sector_write(journal, journal.cur_sector, sqes[s++], cb);
+                PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
+            }
+            else
+            {
+                PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
+            }
+            if (!disable_fsync)
+            {
+                ring_data_t *data = ((ring_data_t*)sqes[s]->user_data);
+                my_uring_prep_fsync(sqes[s++], journal.fd, IORING_FSYNC_DATASYNC);
+                data->iov = { 0 };
+                data->callback = cb;
+            }
+            PRIV(op)->pending_ops = s;
             PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
         }
         else
@@ -90,11 +112,20 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
         // Prepare and submit journal entries
         auto it = PRIV(op)->sync_big_writes.begin();
         int s = 0, cur_sector = -1;
+        if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
+            journal.sector_info[journal.cur_sector].dirty)
+        {
+            if (cur_sector == -1)
+                PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
+            cur_sector = journal.cur_sector;
+            prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
+        }
         while (it != PRIV(op)->sync_big_writes.end())
         {
             journal_entry_big_write *je = (journal_entry_big_write*)
                 prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
             dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
+            journal.sector_info[journal.cur_sector].dirty = false;
             journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
 #ifdef BLOCKSTORE_DEBUG
             printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_db[*it].journal_sector, it->oid.inode, it->oid.stripe, it->version);
@@ -112,7 +143,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
             if (cur_sector == -1)
                 PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
             cur_sector = journal.cur_sector;
-            prepare_journal_sector_write(journal, sqe[s++], cb);
+            prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
         }
     }
     PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
@@ -147,18 +178,7 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
     if (PRIV(op)->pending_ops == 0)
     {
         // Release used journal sectors
-        if (PRIV(op)->min_used_journal_sector > 0)
-        {
-            uint64_t s = PRIV(op)->min_used_journal_sector;
-            while (1)
-            {
-                journal.sector_info[s-1].usage_count--;
-                if (s == PRIV(op)->max_used_journal_sector)
-                    break;
-                s = 1 + s % journal.sector_count;
-            }
-            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
-        }
+        release_journal_sectors(op);
         // Handle states
         if (PRIV(op)->sync_state == SYNC_DATA_SYNC_SENT)
         {
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index 8a84bdd1f..7a70bd6d1 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -137,7 +137,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
     {
         // Small (journaled) write
         // First check if the journal has sufficient space
-        // FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
         blockstore_journal_check_t space_check(this);
         if (unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)
             || !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
@@ -145,18 +144,34 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
             return 0;
         }
         // There is sufficient space. Get SQE(s)
-        BS_SUBMIT_GET_ONLY_SQE(sqe1);
+        struct io_uring_sqe *sqe1 = NULL;
+        if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
+            journal.sector_info[journal.cur_sector].dirty)
+        {
+            // Write current journal sector only if it's dirty and full
+            BS_SUBMIT_GET_SQE_DECL(sqe1);
+        }
         struct io_uring_sqe *sqe2 = NULL;
-        struct ring_data_t *data2 = NULL;
         if (op->len > 0)
         {
             BS_SUBMIT_GET_SQE_DECL(sqe2);
-            data2 = ((ring_data_t*)sqe2->user_data);
         }
-        // FIXME: Write journal sector here only if it is full. Otherwise, defer it until SYNC. This will help reduce WA
-        // Got SQEs. Prepare journal sector write
+        // Got SQEs. Prepare previous journal sector write if required
+        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
+        if (sqe1)
+        {
+            prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
+            // FIXME rename to min/max _flushing
+            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
+            PRIV(op)->pending_ops++;
+        }
+        else
+        {
+            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
+        }
+        // Then pre-fill journal entry
         journal_entry_small_write *je = (journal_entry_small_write*)
-            prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(struct journal_entry_small_write));
+            prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write));
         dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
         journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
 #ifdef BLOCKSTORE_DEBUG
@@ -172,9 +187,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
         je->crc32_data = crc32c(0, op->buf, op->len);
         je->crc32 = je_crc32((journal_entry*)je);
         journal.crc32_last = je->crc32;
-        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
-        prepare_journal_sector_write(journal, sqe1, cb);
-        PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
         if (op->len > 0)
         {
             // Prepare journal data write
@@ -183,28 +195,34 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
                 // Copy data
                 memcpy(journal.buffer + journal.next_free, op->buf, op->len);
             }
+            ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
             data2->iov = (struct iovec){ op->buf, op->len };
             data2->callback = cb;
             my_uring_prep_writev(
                 sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
             );
-            PRIV(op)->pending_ops = 2;
+            PRIV(op)->pending_ops++;
         }
         else
         {
             // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
-            PRIV(op)->pending_ops = 1;
         }
         dirty_it->second.location = journal.next_free;
         dirty_it->second.state = ST_J_SUBMITTED;
         journal.next_free += op->len;
         if (journal.next_free >= journal.len)
+        {
             journal.next_free = JOURNAL_BLOCK_SIZE;
+        }
         // Remember small write as unsynced
         unsynced_small_writes.push_back((obj_ver_id){
             .oid = op->oid,
             .version = op->version,
         });
+        if (!PRIV(op)->pending_ops)
+        {
+            ack_write(op);
+        }
     }
     return 1;
 }
@@ -223,45 +241,56 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
     PRIV(op)->pending_ops--;
     if (PRIV(op)->pending_ops == 0)
     {
-        // Release used journal sectors
-        if (PRIV(op)->min_used_journal_sector > 0)
-        {
-            uint64_t s = PRIV(op)->min_used_journal_sector;
-            while (1)
-            {
-                journal.sector_info[s-1].usage_count--;
-                if (s == PRIV(op)->max_used_journal_sector)
-                    break;
-                s = 1 + s % journal.sector_count;
-            }
-            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
-        }
-        // Switch object state
-        auto & dirty_entry = dirty_db[(obj_ver_id){
-            .oid = op->oid,
-            .version = op->version,
-        }];
-#ifdef BLOCKSTORE_DEBUG
-        printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
-#endif
-        if (dirty_entry.state == ST_J_SUBMITTED)
-        {
-            dirty_entry.state = ST_J_WRITTEN;
-        }
-        else if (dirty_entry.state == ST_D_SUBMITTED)
-        {
-            dirty_entry.state = ST_D_WRITTEN;
-        }
-        else if (dirty_entry.state == ST_DEL_SUBMITTED)
-        {
-            dirty_entry.state = ST_DEL_WRITTEN;
-        }
-        // Acknowledge write without sync
-        op->retval = op->len;
-        FINISH_OP(op);
+        release_journal_sectors(op);
+        ack_write(op);
     }
 }
 
+void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
+{
+    // Release used journal sectors
+    if (PRIV(op)->min_used_journal_sector > 0 &&
+        PRIV(op)->max_used_journal_sector > 0)
+    {
+        uint64_t s = PRIV(op)->min_used_journal_sector;
+        while (1)
+        {
+            journal.sector_info[s-1].usage_count--;
+            if (s == PRIV(op)->max_used_journal_sector)
+                break;
+            s = 1 + s % journal.sector_count;
+        }
+        PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
+    }
+}
+
+void blockstore_impl_t::ack_write(blockstore_op_t *op)
+{
+    // Switch object state
+    auto & dirty_entry = dirty_db[(obj_ver_id){
+        .oid = op->oid,
+        .version = op->version,
+    }];
+#ifdef BLOCKSTORE_DEBUG
+    printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
+#endif
+    if (dirty_entry.state == ST_J_SUBMITTED)
+    {
+        dirty_entry.state = ST_J_WRITTEN;
+    }
+    else if (dirty_entry.state == ST_D_SUBMITTED)
+    {
+        dirty_entry.state = ST_D_WRITTEN;
+    }
+    else if (dirty_entry.state == ST_DEL_SUBMITTED)
+    {
+        dirty_entry.state = ST_DEL_WRITTEN;
+    }
+    // Acknowledge write without sync
+    op->retval = op->len;
+    FINISH_OP(op);
+}
+
 int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
 {
     auto dirty_it = dirty_db.find((obj_ver_id){
@@ -287,7 +316,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
     je->crc32 = je_crc32((journal_entry*)je);
     journal.crc32_last = je->crc32;
     auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
-    prepare_journal_sector_write(journal, sqe, cb);
+    prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
     PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = 1;
    dirty_it->second.state = ST_DEL_SUBMITTED;
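
For orientation, here is an illustrative, self-contained sketch of the dirty-sector lifecycle this patch introduces. It uses simplified, hypothetical names (mini_journal_t, flush_sector, etc.) and is not code from the patch: prefilling an entry now only marks the current in-memory sector dirty, the actual sector write is deferred until the sector fills up or a sync arrives, and a sector can be reused only once it is neither dirty nor has writes in flight.

// Sketch only -- simplified model of the protocol above, not blockstore code.
#include <cassert>
#include <cstdio>
#include <vector>

struct sector_t
{
    int usage_count = 0; // submitted writes still in flight for this sector
    bool dirty = false;  // filled in memory, submission deferred
};

struct mini_journal_t
{
    std::vector<sector_t> sectors = std::vector<sector_t>(4);
    int cur_sector = 0;

    // Models prefill_single_journal_entry(): only marks the sector dirty
    void prefill_entry()
    {
        sectors[cur_sector].dirty = true;
    }

    // Models prepare_journal_sector_write(): the deferred submission
    void flush_sector(int s)
    {
        sectors[s].dirty = false;
        sectors[s].usage_count++;
    }

    // Models check_available(): next buffer is usable only if neither pinned nor dirty
    bool next_buffer_free() const
    {
        const sector_t & next = sectors[(cur_sector + 1) % sectors.size()];
        return next.usage_count == 0 && !next.dirty;
    }

    // Models release_journal_sectors(): write completion unpins the sector
    void on_write_done(int s)
    {
        sectors[s].usage_count--;
    }
};

int main()
{
    mini_journal_t j;
    j.prefill_entry();              // entry buffered; nothing submitted yet
    assert(j.next_buffer_free());   // sector 1 is clean and unpinned
    j.flush_sector(j.cur_sector);   // submitted on sync or when the sector fills
    j.on_write_done(j.cur_sector);  // io_uring completion releases the sector
    printf("dirty=%d used=%d\n", (int)j.sectors[0].dirty, j.sectors[0].usage_count);
    return 0;
}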