diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 4aa715ee..fa396f95 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -355,7 +355,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) op->callback(op); return; } - if (0 && op->opcode == BS_OP_SYNC && immediate_commit) + if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL) { op->retval = 0; op->callback(op); diff --git a/blockstore_impl.h b/blockstore_impl.h index de4a5a7b..7214c19a 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -34,8 +34,7 @@ #define ST_D_IN_FLIGHT 15 #define ST_D_SUBMITTED 16 #define ST_D_WRITTEN 17 -#define ST_D_META_WRITTEN 19 -#define ST_D_META_SYNCED 20 +#define ST_D_SYNCED 20 #define ST_D_STABLE 21 #define ST_DEL_IN_FLIGHT 31 @@ -46,13 +45,17 @@ #define ST_CURRENT 48 +#define IMMEDIATE_NONE 0 +#define IMMEDIATE_SMALL 1 +#define IMMEDIATE_ALL 2 + #define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) #define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) -#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED) +#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED) #define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) #define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) #define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) -#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN) +#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN) #define BS_SUBMIT_GET_SQE(sqe, data) \ BS_SUBMIT_GET_ONLY_SQE(sqe); 
\ @@ -195,8 +198,8 @@ class blockstore_impl_t // It is safe to disable fsync() if drive write cache is writethrough bool disable_data_fsync = false, disable_meta_fsync = false, disable_journal_fsync = false; // Enable if you want every operation to be executed with an "implicit fsync" - // FIXME Not implemented yet - bool immediate_commit = false; + // Suitable only for server SSDs with capacitors, requires disabled data and journal fsyncs + int immediate_commit = IMMEDIATE_NONE; bool inmemory_meta = false; int flusher_count; /******* END OF OPTIONS *******/ @@ -268,7 +271,7 @@ class blockstore_impl_t bool enqueue_write(blockstore_op_t *op); int dequeue_write(blockstore_op_t *op); int dequeue_del(blockstore_op_t *op); - void ack_write(blockstore_op_t *op); + int continue_write(blockstore_op_t *op); void release_journal_sectors(blockstore_op_t *op); void handle_write_event(ring_data_t *data, blockstore_op_t *op); diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 75637eec..ede217c4 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -558,7 +558,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .version = je->big_write.version, }; bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_D_META_SYNCED, + .state = ST_D_SYNCED, .flags = 0, .location = je->big_write.location, .offset = je->big_write.offset, @@ -595,7 +595,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u { while (1) { - it->second.state = (it->second.state == ST_D_META_SYNCED + it->second.state = (it->second.state == ST_D_SYNCED ? ST_D_STABLE : (it->second.state == ST_DEL_SYNCED ? 
ST_DEL_STABLE : ST_J_STABLE)); if (it == bs->dirty_db.begin()) diff --git a/blockstore_open.cpp b/blockstore_open.cpp index 481dc3ec..ab38e5ff 100644 --- a/blockstore_open.cpp +++ b/blockstore_open.cpp @@ -34,6 +34,14 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) { disable_journal_fsync = true; } + if (config["immediate_commit"] == "all") + { + immediate_commit = IMMEDIATE_ALL; + } + else if (config["immediate_commit"] == "small") + { + immediate_commit = IMMEDIATE_SMALL; + } metadata_buf_size = strtoull(config["meta_buf_size"].c_str(), NULL, 10); cfg_journal_size = strtoull(config["journal_size"].c_str(), NULL, 10); data_device = config["data_device"]; @@ -129,6 +137,22 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) { metadata_buf_size = 4*1024*1024; } + if (meta_device == "") + { + disable_meta_fsync = disable_data_fsync; + } + if (journal_device == "") + { + disable_journal_fsync = disable_meta_fsync; + } + if (immediate_commit != IMMEDIATE_NONE && !disable_journal_fsync) + { + throw std::runtime_error("immediate_commit requires disable_journal_fsync"); + } + if (immediate_commit == IMMEDIATE_ALL && !disable_data_fsync) + { + throw std::runtime_error("immediate_commit=all requires disable_journal_fsync and disable_data_fsync"); + } // init some fields clean_entry_bitmap_size = block_size / bitmap_granularity / 8; clean_entry_size = sizeof(clean_disk_entry) + clean_entry_bitmap_size; @@ -283,7 +307,6 @@ void blockstore_impl_t::open_meta() else { meta_fd = data_fd; - disable_meta_fsync = disable_data_fsync; meta_size = 0; if (meta_offset >= data_size) { @@ -306,7 +329,6 @@ void blockstore_impl_t::open_journal() else { journal.fd = meta_fd; - disable_journal_fsync = disable_meta_fsync; journal.device_size = 0; if (journal.offset >= data_size) { diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index f849ac9c..d7c04ec4 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -181,7 +181,7 @@ resume_5: 
{ dirty_it->second.state = ST_J_STABLE; } - else if (dirty_it->second.state == ST_D_META_SYNCED) + else if (dirty_it->second.state == ST_D_SYNCED) { dirty_it->second.state = ST_D_STABLE; } diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 2c5f5e35..1a1aa354 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -252,7 +252,7 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) #endif auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; - dirty_db[*it].state = ST_D_META_SYNCED; + dirty_db[*it].state = ST_D_SYNCED; } for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++) { diff --git a/blockstore_write.cpp b/blockstore_write.cpp index c0260b4a..678dd0f5 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -75,6 +75,10 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) // First step of the write algorithm: dequeue operation and submit initial write(s) int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { + if (PRIV(op)->op_state) + { + return continue_write(op); + } auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, @@ -129,11 +133,19 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) ); PRIV(op)->pending_ops = 1; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; - // Remember big write as unsynced - unsynced_big_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + if (immediate_commit != IMMEDIATE_ALL) + { + // Remember big write as unsynced + unsynced_big_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + PRIV(op)->op_state = 3; + } + else + { + PRIV(op)->op_state = 1; + } } else { @@ -147,10 +159,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } // There is sufficient space. 
Get SQE(s) struct io_uring_sqe *sqe1 = NULL; - if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) && + if (immediate_commit != IMMEDIATE_NONE || + (journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) && journal.sector_info[journal.cur_sector].dirty) { - // Write current journal sector only if it's dirty and full + // Write current journal sector only if it's dirty and full, or in the immediate_commit mode BS_SUBMIT_GET_SQE_DECL(sqe1); } struct io_uring_sqe *sqe2 = NULL; @@ -160,15 +173,18 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } // Got SQEs. Prepare previous journal sector write if required auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; - if (sqe1) + if (immediate_commit == IMMEDIATE_NONE) { - prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops++; - } - else - { - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; + if (sqe1) + { + prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops++; + } + else + { + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; + } } // Then pre-fill journal entry journal_entry_small_write *je = (journal_entry_small_write*) @@ -188,6 +204,12 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) je->crc32_data = crc32c(0, op->buf, op->len); je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; + if (immediate_commit != IMMEDIATE_NONE) + { + prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops++; + } if (op->len > 0) { // 
Prepare journal data write @@ -215,19 +237,99 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { journal.next_free = journal_block_size; } - // Remember small write as unsynced - unsynced_small_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + if (immediate_commit == IMMEDIATE_NONE) + { + // Remember small write as unsynced + unsynced_small_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + } if (!PRIV(op)->pending_ops) { - ack_write(op); + PRIV(op)->op_state = 4; + continue_write(op); + } + else + { + PRIV(op)->op_state = 3; } } return 1; } +int blockstore_impl_t::continue_write(blockstore_op_t *op) +{ + io_uring_sqe *sqe = NULL; + journal_entry_big_write *je; + auto & dirty_entry = dirty_db[(obj_ver_id){ + .oid = op->oid, + .version = op->version, + }]; + if (PRIV(op)->op_state == 2) + goto resume_2; + else if (PRIV(op)->op_state == 4) + goto resume_4; + else + return 1; +resume_2: + // Only for the immediate_commit mode: prepare and submit big_write journal entry + sqe = get_sqe(); + if (!sqe) + { + return 0; + } + je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write)); + dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset; + journal.sector_info[journal.cur_sector].dirty = false; + journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; +#ifdef BLOCKSTORE_DEBUG + printf("journal offset %lu is used by %lu:%lu v%lu\n", journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version); +#endif + je->oid = op->oid; + je->version = op->version; + je->offset = op->offset; + je->len = op->len; + je->location = dirty_entry.location; + je->crc32 = je_crc32((journal_entry*)je); + journal.crc32_last = je->crc32; + prepare_journal_sector_write(journal, journal.cur_sector, sqe, + [this, op](ring_data_t *data) { handle_write_event(data, op); }); + 
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops = 1; + PRIV(op)->op_state = 3; + return 1; +resume_4: + // Switch object state +#ifdef BLOCKSTORE_DEBUG + printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state); +#endif + bool imm = dirty_entry.state == ST_D_SUBMITTED + ? (immediate_commit == IMMEDIATE_ALL) + : (immediate_commit != IMMEDIATE_NONE); + if (imm) + { + auto & unstab = unstable_writes[op->oid]; + unstab = unstab < op->version ? op->version : unstab; + } + if (dirty_entry.state == ST_J_SUBMITTED) + { + dirty_entry.state = imm ? ST_J_SYNCED : ST_J_WRITTEN; + } + else if (dirty_entry.state == ST_D_SUBMITTED) + { + dirty_entry.state = imm ? ST_D_SYNCED : ST_D_WRITTEN; + } + else if (dirty_entry.state == ST_DEL_SUBMITTED) + { + dirty_entry.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN; + } + // Acknowledge write + op->retval = op->len; + FINISH_OP(op); + return 1; +} + void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op) { live = true; @@ -243,7 +345,11 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o if (PRIV(op)->pending_ops == 0) { release_journal_sectors(op); - ack_write(op); + PRIV(op)->op_state++; + if (!continue_write(op)) + { + submit_queue.push_front(op); + } } } @@ -275,33 +381,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) } } -void blockstore_impl_t::ack_write(blockstore_op_t *op) -{ - // Switch object state - auto & dirty_entry = dirty_db[(obj_ver_id){ - .oid = op->oid, - .version = op->version, - }]; -#ifdef BLOCKSTORE_DEBUG - printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state); -#endif - if (dirty_entry.state == ST_J_SUBMITTED) - { - dirty_entry.state = ST_J_WRITTEN; - } - else if (dirty_entry.state == ST_D_SUBMITTED) - { - dirty_entry.state = ST_D_WRITTEN; - } - else if 
(dirty_entry.state == ST_DEL_SUBMITTED) - { - dirty_entry.state = ST_DEL_WRITTEN; - } - // Acknowledge write without sync - op->retval = op->len; - FINISH_OP(op); -} - int blockstore_impl_t::dequeue_del(blockstore_op_t *op) { auto dirty_it = dirty_db.find((obj_ver_id){ @@ -313,8 +392,30 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) { return 0; } - BS_SUBMIT_GET_ONLY_SQE(sqe); + io_uring_sqe *sqe = NULL; + if (immediate_commit != IMMEDIATE_NONE || + (journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) && + journal.sector_info[journal.cur_sector].dirty) + { + // Write current journal sector only if it's dirty and full, or in the immediate_commit mode + BS_SUBMIT_GET_SQE_DECL(sqe); + } + auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; // Prepare journal sector write + if (immediate_commit == IMMEDIATE_NONE) + { + if (sqe) + { + prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops++; + } + else + { + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; + } + } + // Pre-fill journal entry journal_entry_del *je = (journal_entry_del*) prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del)); dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; @@ -326,15 +427,26 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) je->version = op->version; je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; - auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; - prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = 1; dirty_it->second.state = ST_DEL_SUBMITTED; - // Remember small write as unsynced - 
unsynced_small_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + if (immediate_commit != IMMEDIATE_NONE) + { + prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops++; + } + else + { + // Remember the delete as an unsynced small write + unsynced_small_writes.push_back((obj_ver_id){ .oid = op->oid, .version = op->version }); + } + if (!PRIV(op)->pending_ops) + { + PRIV(op)->op_state = 4; + continue_write(op); + } + else + { + PRIV(op)->op_state = 3; + } return 1; } diff --git a/test.cpp b/test.cpp index df31c5ec..0dad16bc 100644 --- a/test.cpp +++ b/test.cpp @@ -181,7 +181,7 @@ int main0(int argc, char *argv[]) }, .version = 1, }] = (dirty_entry){ - .state = ST_D_SYNCED, + .state = ST_D_SYNCED, .flags = 0, .location = (uint64_t)i << 17, .offset = 0,