diff --git a/Makefile b/Makefile index e7c1c3a33..8efe430e2 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ +BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd @@ -6,7 +6,7 @@ clean: rm -f *.o crc32c.o: crc32c.c g++ $(CXXFLAGS) -c -o $@ $< -%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h +%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_impl.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h g++ $(CXXFLAGS) -c -o $@ $< osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o $(BLOCKSTORE_OBJS) diff --git a/blockstore.cpp b/blockstore.cpp index f08ea7ddd..897521932 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -1,300 +1,51 @@ -#include "blockstore.h" +#include "blockstore_impl.h" -blockstore::blockstore(blockstore_config_t & config, ring_loop_t *ringloop) +blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop) { - this->ringloop = ringloop; - ring_consumer.loop = [this]() { loop(); }; - ringloop->register_consumer(ring_consumer); - initialized = 0; - block_order = strtoull(config["block_size_order"].c_str(), NULL, 10); - if (block_order == 0) - { - block_order = DEFAULT_ORDER; - } - block_size = 1 << block_order; - if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE) - { - throw std::runtime_error("Bad block size"); - } - zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size); - data_fd = meta_fd = journal.fd = -1; - try - { - open_data(config); - open_meta(config); - open_journal(config); - calc_lengths(config); - data_alloc = new allocator(block_count); - } - catch (std::exception & e) - { - if (data_fd >= 0) - close(data_fd); - if (meta_fd >= 0 && meta_fd != data_fd) - close(meta_fd); - if (journal.fd >= 0 && journal.fd != meta_fd) - close(journal.fd); - throw; - } - int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10); - if (!flusher_count) - flusher_count = 32; - flusher = new journal_flusher_t(flusher_count, this); + impl = new blockstore_impl_t(config, ringloop); } -blockstore::~blockstore() +blockstore_t::~blockstore_t() { - delete data_alloc; - delete flusher; - free(zero_object); - ringloop->unregister_consumer(ring_consumer); - if (data_fd >= 0) - close(data_fd); - if (meta_fd >= 0 && meta_fd != data_fd) - close(meta_fd); - if (journal.fd >= 0 && journal.fd != meta_fd) - close(journal.fd); - if (metadata_buffer) - free(metadata_buffer); + delete impl; } -bool blockstore::is_started() +void blockstore_t::loop() { - return initialized == 10; + impl->loop(); } -// main event loop - produce requests -void blockstore::loop() +bool blockstore_t::is_started() { - if (initialized != 10) - { - // read metadata, then journal - if (initialized == 0) - { - metadata_init_reader = new blockstore_init_meta(this); - initialized = 1; - } - if (initialized == 1) - { - int res = 
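The blockstore.cpp rewrite above is the core of the patch: the public class becomes a thin façade over a heap-allocated blockstore_impl_t (the pimpl idiom), so io_uring, sparsepp and journal internals disappear from the public header and callers only recompile when the narrow interface changes. A minimal sketch of the shape, with simplified names (demo_t/demo_impl_t are illustrative, not part of the patch):

    // In the public header: only a forward declaration of the impl type.
    class demo_impl_t;

    class demo_t
    {
        demo_impl_t *impl;
    public:
        demo_t();
        ~demo_t();
        void loop();
    };

    // In the implementation file: the real type, invisible to callers.
    class demo_impl_t
    {
    public:
        void loop() { /* real work lives here */ }
    };

    demo_t::demo_t()  { impl = new demo_impl_t(); }
    demo_t::~demo_t() { delete impl; }
    void demo_t::loop() { impl->loop(); }
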
metadata_init_reader->loop(); - if (!res) - { - delete metadata_init_reader; - metadata_init_reader = NULL; - journal_init_reader = new blockstore_init_journal(this); - initialized = 2; - } - } - if (initialized == 2) - { - int res = journal_init_reader->loop(); - if (!res) - { - delete journal_init_reader; - journal_init_reader = NULL; - initialized = 10; - } - } - } - else - { - // try to submit ops - auto cur_sync = in_progress_syncs.begin(); - while (cur_sync != in_progress_syncs.end()) - { - continue_sync(*cur_sync++); - } - auto cur = submit_queue.begin(); - int has_writes = 0; - while (cur != submit_queue.end()) - { - auto op_ptr = cur; - auto op = *(cur++); - // FIXME: This needs some simplification - // Writes should not block reads if the ring is not full and if reads don't depend on them - // In all other cases we should stop submission - if (op->wait_for) - { - check_wait(op); - if (op->wait_for == WAIT_SQE) - { - break; - } - else if (op->wait_for) - { - if ((op->flags & OP_TYPE_MASK) == OP_WRITE || - (op->flags & OP_TYPE_MASK) == OP_DELETE) - { - has_writes = 2; - } - continue; - } - } - unsigned ring_space = io_uring_sq_space_left(&ringloop->ring); - unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail; - int dequeue_op = 0; - if ((op->flags & OP_TYPE_MASK) == OP_READ) - { - dequeue_op = dequeue_read(op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || - (op->flags & OP_TYPE_MASK) == OP_DELETE) - { - if (has_writes == 2) - { - // Some writes could not be submitted - break; - } - dequeue_op = dequeue_write(op); - has_writes = dequeue_op ? 1 : 2; - } - else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) - { - // wait for all small writes to be submitted - // wait for all big writes to complete, submit data device fsync - // wait for the data device fsync to complete, then submit journal writes for big writes - // then submit an fsync operation - if (has_writes) - { - // Can't submit SYNC before previous writes - continue; - } - dequeue_op = dequeue_sync(op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) - { - dequeue_op = dequeue_stable(op); - } - if (dequeue_op) - { - submit_queue.erase(op_ptr); - } - else - { - ringloop->ring.sq.sqe_tail = prev_sqe_pos; - if (op->wait_for == WAIT_SQE) - { - op->wait_detail = 1 + ring_space; - // ring is full, stop submission - break; - } - } - } - if (!readonly) - { - flusher->loop(); - } - int ret = ringloop->submit(); - if (ret < 0) - { - throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); - } - } + return impl->is_started(); } -bool blockstore::is_safe_to_stop() +bool blockstore_t::is_safe_to_stop() { - // It's safe to stop blockstore when there are no in-flight operations, - // no in-progress syncs and flusher isn't doing anything - if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active()) - { - return false; - } - if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0) - { - if (!readonly && !stop_sync_submitted) - { - // We should sync the blockstore before unmounting - blockstore_op_t *op = new blockstore_op_t; - op->flags = OP_SYNC; - op->buf = NULL; - op->callback = [](blockstore_op_t *op) - { - delete op; - }; - enqueue_op(op); - stop_sync_submitted = true; - } - return false; - } - return true; + return impl->is_safe_to_stop(); } -void blockstore::check_wait(blockstore_op_t *op) +void blockstore_t::enqueue_op(blockstore_op_t *op) { - if (op->wait_for == WAIT_SQE) - { - if (io_uring_sq_space_left(&ringloop->ring) < op->wait_detail) - { - 
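The submission half of loop() above depends on being able to undo a partially prepared operation: it saves ringloop->ring.sq.sqe_tail before calling dequeue_*(), and if the op cannot obtain all the SQEs it needs, restores the tail so half-filled entries are never submitted, then stops the whole queue (otherwise later ops could starve the waiting one). A toy version of that save/rewind pattern, with a plain array standing in for the io_uring submission queue (liburing specifics are deliberately elided):

    #include <cstddef>
    #include <vector>

    struct toy_sq { std::vector<int> entries; size_t tail = 0; };

    // Try to stage 'need' entries; on shortage, rewind so nothing
    // half-prepared leaks into the next submit, and report failure.
    bool stage_op(toy_sq &sq, int need)
    {
        size_t saved_tail = sq.tail;
        for (int i = 0; i < need; i++)
        {
            if (sq.tail == sq.entries.size())
            {
                sq.tail = saved_tail;   // drop partially prepared entries
                return false;           // caller sets WAIT_SQE and breaks
            }
            sq.entries[sq.tail++] = i;  // "prepare" one entry
        }
        return true;
    }
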
// stop submission if there's still no free space - return; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_IN_FLIGHT) - { - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = op->oid, - .version = op->wait_detail, - }); - if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state)) - { - // do not submit - return; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_JOURNAL) - { - if (journal.used_start == op->wait_detail) - { - // do not submit - return; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_JOURNAL_BUFFER) - { - if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0) - { - // do not submit - return; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_FREE) - { - if (!data_alloc->get_free_count() && !flusher->is_active()) - { - return; - } - op->wait_for = 0; - } - else - { - throw std::runtime_error("BUG: op->wait_for value is unexpected"); - } + impl->enqueue_op(op); } -void blockstore::enqueue_op(blockstore_op_t *op) +std::map & blockstore_t::get_unstable_writes() { - int type = op->flags & OP_TYPE_MASK; - if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) && - (op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) || - readonly && type != OP_READ) - { - // Basic verification not passed - op->retval = -EINVAL; - op->callback(op); - return; - } - op->wait_for = 0; - op->sync_state = 0; - op->pending_ops = 0; - submit_queue.push_back(op); - if ((op->flags & OP_TYPE_MASK) == OP_WRITE) - { - enqueue_write(op); - } - ringloop->wakeup(); + return impl->unstable_writes; +} + +uint32_t blockstore_t::get_block_size() +{ + return impl->get_block_size(); +} + +uint32_t blockstore_t::get_block_order() +{ + return impl->get_block_order(); +} + +uint64_t blockstore_t::get_block_count() +{ + return impl->get_block_count(); } diff --git a/blockstore.h b/blockstore.h index d5d509efc..44bfcb89b 100644 --- a/blockstore.h +++ b/blockstore.h @@ -3,93 +3,21 @@ #ifndef _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include + #include -#include -#include -#include +#include #include -#include "sparsepp/sparsepp/spp.h" - -#include "allocator.h" #include "ringloop.h" -//#define BLOCKSTORE_DEBUG - -// States are not stored on disk. 
Instead, they're deduced from the journal - -#define ST_J_IN_FLIGHT 1 -#define ST_J_SUBMITTED 2 -#define ST_J_WRITTEN 3 -#define ST_J_SYNCED 4 -#define ST_J_STABLE 5 - -#define ST_D_IN_FLIGHT 15 -#define ST_D_SUBMITTED 16 -#define ST_D_WRITTEN 17 -#define ST_D_META_WRITTEN 19 -#define ST_D_META_SYNCED 20 -#define ST_D_STABLE 21 - -#define ST_DEL_IN_FLIGHT 31 -#define ST_DEL_SUBMITTED 32 -#define ST_DEL_WRITTEN 33 -#define ST_DEL_SYNCED 34 -#define ST_DEL_STABLE 35 - -#define ST_CURRENT 48 - -#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) -#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) -#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED) -#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) -#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) -#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) -#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN) - // Default block size is 128 KB, current allowed range is 4K - 128M #define DEFAULT_ORDER 17 #define MIN_BLOCK_SIZE 4*1024 #define MAX_BLOCK_SIZE 128*1024*1024 #define DISK_ALIGNMENT 512 -#define BS_SUBMIT_GET_SQE(sqe, data) \ - BS_SUBMIT_GET_ONLY_SQE(sqe); \ - struct ring_data_t *data = ((ring_data_t*)sqe->user_data) - -#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ - struct io_uring_sqe *sqe = get_sqe();\ - if (!sqe)\ - {\ - /* Pause until there are more requests available */\ - op->wait_for = WAIT_SQE;\ - return 0;\ - } - -#define BS_SUBMIT_GET_SQE_DECL(sqe) \ - sqe = get_sqe();\ - if (!sqe)\ - {\ - /* Pause until there are more requests available */\ - op->wait_for = WAIT_SQE;\ - return 0;\ - } - -class blockstore; - -class blockstore_op_t; - // 16 bytes per object/stripe id // stripe includes replica number in 4 least significant bits struct __attribute__((__packed__)) object_id @@ -98,8 +26,6 @@ struct __attribute__((__packed__)) object_id uint64_t stripe; }; -#include "blockstore_journal.h" - inline bool operator == (const object_id & a, const object_id & b) { return a.inode == b.inode && a.stripe == b.stripe; @@ -115,21 +41,6 @@ inline bool operator < (const object_id & a, const object_id & b) return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe; } -// 24 bytes per "clean" entry on disk with fixed metadata tables -// FIXME: maybe add crc32's to metadata -struct __attribute__((__packed__)) clean_disk_entry -{ - object_id oid; - uint64_t version; -}; - -// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry) -struct __attribute__((__packed__)) clean_entry -{ - uint64_t version; - uint64_t location; -}; - // 56 = 24 + 32 bytes per dirty entry in memory (obj_ver_id => dirty_entry) struct __attribute__((__packed__)) obj_ver_id { @@ -142,45 +53,19 @@ inline bool operator < (const obj_ver_id & a, const obj_ver_id & b) return a.oid < b.oid || a.oid == b.oid && a.version < b.version; } -struct __attribute__((__packed__)) dirty_entry -{ - uint32_t state; - uint32_t flags; // unneeded, but present for alignment - uint64_t location; // location in either journal or data -> in BYTES - uint32_t offset; // data offset within object (stripe) - uint32_t len; // data length - uint64_t journal_sector; // journal sector used for 
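The ST_* values above are deliberately grouped into numeric bands (journal writes 1-5, big writes 15-21, deletes 31-35, plus ST_CURRENT = 48) so that the IS_* checks stay cheap range comparisons. A compile-checked illustration of how a state is classified, with values copied from the table above:

    #include <cassert>

    // A few values from the table above.
    enum {
        ST_J_SUBMITTED = 2, ST_J_WRITTEN = 3, ST_J_STABLE = 5,
        ST_D_SUBMITTED = 16, ST_CURRENT = 48,
    };

    int main()
    {
        // Journal-resident writes fall in the 2..5 band, so
        // IS_JOURNAL(ST_J_WRITTEN) holds...
        assert(ST_J_WRITTEN >= ST_J_SUBMITTED && ST_J_WRITTEN <= ST_J_STABLE);
        // ...while a big (data-device) write falls outside it.
        assert(!(ST_D_SUBMITTED >= ST_J_SUBMITTED && ST_D_SUBMITTED <= ST_J_STABLE));
        return 0;
    }
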
this entry -}; - class oid_hash { public: size_t operator()(const object_id &s) const { size_t seed = 0; - spp::hash_combine(seed, s.inode); - spp::hash_combine(seed, s.stripe); + // Copy-pasted from spp::hash_combine() + seed ^= (s.inode + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2)); + seed ^= (s.stripe + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2)); return seed; } }; -// - Sync must be submitted after previous writes/deletes (not before!) -// - Reads to the same object must be submitted after previous writes/deletes -// are written (not necessarily synced) in their location. This is because we -// rely on read-modify-write for erasure coding and we must return new data -// to calculate parity for subsequent writes -// - Writes may be submitted in any order, because they don't overlap. Each write -// goes into a new location - either on the journal device or on the data device -// - Stable (stabilize) must be submitted after sync of that object is completed -// It's even OK to return an error to the caller if that object is not synced yet -// - Journal trim may be processed only after all versions are moved to -// the main storage AND after all read operations for older versions complete -// - If an operation can not be submitted because the ring is full -// we should stop submission of other operations. Otherwise some "scatter" reads -// may end up blocked for a long time. -// Otherwise, the submit order is free, that is all operations may be submitted immediately -// In fact, adding a write operation must immediately result in dirty_db being populated - #define OP_READ 1 #define OP_WRITE 2 #define OP_SYNC 3 @@ -188,21 +73,7 @@ public: #define OP_DELETE 5 #define OP_TYPE_MASK 0x7 -// Suspend operation until there are more free SQEs -#define WAIT_SQE 1 -// Suspend operation until version of object is written -#define WAIT_IN_FLIGHT 2 -// Suspend operation until there are bytes of free space in the journal on disk -#define WAIT_JOURNAL 3 -// Suspend operation until the next journal sector buffer is free -#define WAIT_JOURNAL_BUFFER 4 -// Suspend operation until there is some free space on the data device -#define WAIT_FREE 5 - -struct fulfill_read_t -{ - uint64_t offset, len; -}; +#define BS_OP_PRIVATE_DATA_SIZE 256 struct blockstore_op_t { @@ -222,126 +93,19 @@ struct blockstore_op_t void *buf; int retval; - // FIXME: Move internal fields somewhere - friend class blockstore; - friend class blockstore_journal_check_t; - friend void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb); -private: - // Wait status - int wait_for; - uint64_t wait_detail; - int pending_ops; - - // Read - std::vector read_vec; - - // Sync, write - uint64_t min_used_journal_sector, max_used_journal_sector; - - // Write - struct iovec iov_zerofill[3]; - - // Sync - std::vector sync_big_writes, sync_small_writes; - std::list::iterator in_progress_ptr; - int sync_state, prev_sync_count; + uint8_t private_data[BS_OP_PRIVATE_DATA_SIZE]; }; -#include "blockstore_init.h" +typedef std::unordered_map blockstore_config_t; -#include "blockstore_flush.h" +class blockstore_impl_t; -typedef spp::sparse_hash_map blockstore_config_t; - -class blockstore +class blockstore_t { - struct ring_consumer_t ring_consumer; - - // Another option is https://github.com/algorithm-ninja/cpp-btree - spp::sparse_hash_map clean_db; - std::map dirty_db; - std::list submit_queue; // FIXME: funny thing is that vector is better here - std::vector unsynced_big_writes, unsynced_small_writes; - std::list 
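Replacing spp::hash_combine() with its expanded form, as oid_hash now does, removes the last sparsepp dependency from the public header; the constant 0xc6a4a7935bd1e995 is the 64-bit MurmurHash2 multiplier that hash_combine uses internally. The functor drops in anywhere a hash is needed, for example:

    #include <cstdint>
    #include <cstddef>
    #include <unordered_map>

    struct object_id { uint64_t inode, stripe; };

    inline bool operator == (const object_id & a, const object_id & b)
    {
        return a.inode == b.inode && a.stripe == b.stripe;
    }

    class oid_hash
    {
    public:
        size_t operator()(const object_id &s) const
        {
            size_t seed = 0;
            // Same mixing step as spp::hash_combine()
            seed ^= (s.inode + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
            seed ^= (s.stripe + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
            return seed;
        }
    };

    // Usage: an object_id-keyed hash map, the way clean_db uses it.
    std::unordered_map<object_id, uint64_t, oid_hash> latest_version;
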
in_progress_syncs; // ...and probably here, too - allocator *data_alloc = NULL; - uint8_t *zero_object; - - uint64_t block_count; - uint32_t block_order, block_size; - - int meta_fd; - int data_fd; - - uint64_t meta_offset, meta_size, meta_area, meta_len; - uint64_t data_offset, data_size, data_len; - - bool readonly = false; - // FIXME: separate flags for data, metadata and journal - bool disable_fsync = false; - bool inmemory_meta = false; - void *metadata_buffer = NULL; - - struct journal_t journal; - journal_flusher_t *flusher; - - ring_loop_t *ringloop; - - bool stop_sync_submitted; - - inline struct io_uring_sqe* get_sqe() - { - return ringloop->get_sqe(); - } - - friend class blockstore_init_meta; - friend class blockstore_init_journal; - friend class blockstore_journal_check_t; - friend class journal_flusher_t; - friend class journal_flusher_co; - - void calc_lengths(blockstore_config_t & config); - void open_data(blockstore_config_t & config); - void open_meta(blockstore_config_t & config); - void open_journal(blockstore_config_t & config); - - // Asynchronous init - int initialized; - int metadata_buf_size; - blockstore_init_meta* metadata_init_reader; - blockstore_init_journal* journal_init_reader; - - void check_wait(blockstore_op_t *op); - - // Read - int dequeue_read(blockstore_op_t *read_op); - int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, - uint32_t item_state, uint64_t item_version, uint64_t item_location); - int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, - uint32_t item_state, uint64_t item_version); - void handle_read_event(ring_data_t *data, blockstore_op_t *op); - - // Write - void enqueue_write(blockstore_op_t *op); - int dequeue_write(blockstore_op_t *op); - int dequeue_del(blockstore_op_t *op); - void handle_write_event(ring_data_t *data, blockstore_op_t *op); - - // Sync - int dequeue_sync(blockstore_op_t *op); - void handle_sync_event(ring_data_t *data, blockstore_op_t *op); - int continue_sync(blockstore_op_t *op); - void ack_one_sync(blockstore_op_t *op); - int ack_sync(blockstore_op_t *op); - - // Stabilize - int dequeue_stable(blockstore_op_t *op); - void handle_stable_event(ring_data_t *data, blockstore_op_t *op); - void stabilize_object(object_id oid, uint64_t max_ver); - + blockstore_impl_t *impl; public: - - blockstore(blockstore_config_t & config, ring_loop_t *ringloop); - ~blockstore(); + blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop); + ~blockstore_t(); // Event loop void loop(); @@ -359,9 +123,9 @@ public: void enqueue_op(blockstore_op_t *op); // Unstable writes are added here (map of object_id -> version) - std::map unstable_writes; + std::map & get_unstable_writes(); - inline uint32_t get_block_size() { return block_size; } - inline uint32_t get_block_order() { return block_order; } - inline uint64_t get_block_count() { return block_count; } + uint32_t get_block_size(); + uint32_t get_block_order(); + uint64_t get_block_count(); }; diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index e0087e8aa..8d1a16479 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -1,6 +1,6 @@ -#include "blockstore.h" +#include "blockstore_impl.h" -journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) +journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) { this->bs = bs; this->flusher_count = flusher_count; diff --git a/blockstore_flush.h b/blockstore_flush.h index f6e20eac1..7ac33ec94 100644 --- 
a/blockstore_flush.h +++ b/blockstore_flush.h @@ -32,7 +32,7 @@ class journal_flusher_t; // Journal flusher coroutine class journal_flusher_co { - blockstore *bs; + blockstore_impl_t *bs; journal_flusher_t *flusher; int wait_state, wait_count; struct io_uring_sqe *sqe; @@ -64,7 +64,7 @@ class journal_flusher_t int flusher_count; int sync_threshold; journal_flusher_co *co; - blockstore *bs; + blockstore_impl_t *bs; friend class journal_flusher_co; int journal_trim_counter, journal_trim_interval; @@ -78,7 +78,7 @@ class journal_flusher_t std::deque flush_queue; std::map flush_versions; public: - journal_flusher_t(int flusher_count, blockstore *bs); + journal_flusher_t(int flusher_count, blockstore_impl_t *bs); ~journal_flusher_t(); void loop(); bool is_active(); diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp new file mode 100644 index 000000000..abdb851e3 --- /dev/null +++ b/blockstore_impl.cpp @@ -0,0 +1,303 @@ +#include "blockstore_impl.h" + +blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop) +{ + assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE); + this->ringloop = ringloop; + ring_consumer.loop = [this]() { loop(); }; + ringloop->register_consumer(ring_consumer); + initialized = 0; + block_order = strtoull(config["block_size_order"].c_str(), NULL, 10); + if (block_order == 0) + { + block_order = DEFAULT_ORDER; + } + block_size = 1 << block_order; + if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE) + { + throw std::runtime_error("Bad block size"); + } + zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size); + data_fd = meta_fd = journal.fd = -1; + try + { + open_data(config); + open_meta(config); + open_journal(config); + calc_lengths(config); + data_alloc = new allocator(block_count); + } + catch (std::exception & e) + { + if (data_fd >= 0) + close(data_fd); + if (meta_fd >= 0 && meta_fd != data_fd) + close(meta_fd); + if (journal.fd >= 0 && journal.fd != meta_fd) + close(journal.fd); + throw; + } + int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10); + if (!flusher_count) + flusher_count = 32; + flusher = new journal_flusher_t(flusher_count, this); +} + +blockstore_impl_t::~blockstore_impl_t() +{ + delete data_alloc; + delete flusher; + free(zero_object); + ringloop->unregister_consumer(ring_consumer); + if (data_fd >= 0) + close(data_fd); + if (meta_fd >= 0 && meta_fd != data_fd) + close(meta_fd); + if (journal.fd >= 0 && journal.fd != meta_fd) + close(journal.fd); + if (metadata_buffer) + free(metadata_buffer); +} + +bool blockstore_impl_t::is_started() +{ + return initialized == 10; +} + +// main event loop - produce requests +void blockstore_impl_t::loop() +{ + if (initialized != 10) + { + // read metadata, then journal + if (initialized == 0) + { + metadata_init_reader = new blockstore_init_meta(this); + initialized = 1; + } + if (initialized == 1) + { + int res = metadata_init_reader->loop(); + if (!res) + { + delete metadata_init_reader; + metadata_init_reader = NULL; + journal_init_reader = new blockstore_init_journal(this); + initialized = 2; + } + } + if (initialized == 2) + { + int res = journal_init_reader->loop(); + if (!res) + { + delete journal_init_reader; + journal_init_reader = NULL; + initialized = 10; + } + } + } + else + { + // try to submit ops + auto cur_sync = in_progress_syncs.begin(); + while (cur_sync != in_progress_syncs.end()) + { + continue_sync(*cur_sync++); + } + auto cur = submit_queue.begin(); + int has_writes = 0; + while (cur != 
submit_queue.end()) + { + auto op_ptr = cur; + auto op = *(cur++); + // FIXME: This needs some simplification + // Writes should not block reads if the ring is not full and if reads don't depend on them + // In all other cases we should stop submission + if (PRIV(op)->wait_for) + { + check_wait(op); + if (PRIV(op)->wait_for == WAIT_SQE) + { + break; + } + else if (PRIV(op)->wait_for) + { + if ((op->flags & OP_TYPE_MASK) == OP_WRITE || + (op->flags & OP_TYPE_MASK) == OP_DELETE) + { + has_writes = 2; + } + continue; + } + } + unsigned ring_space = io_uring_sq_space_left(&ringloop->ring); + unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail; + int dequeue_op = 0; + if ((op->flags & OP_TYPE_MASK) == OP_READ) + { + dequeue_op = dequeue_read(op); + } + else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || + (op->flags & OP_TYPE_MASK) == OP_DELETE) + { + if (has_writes == 2) + { + // Some writes could not be submitted + break; + } + dequeue_op = dequeue_write(op); + has_writes = dequeue_op ? 1 : 2; + } + else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) + { + // wait for all small writes to be submitted + // wait for all big writes to complete, submit data device fsync + // wait for the data device fsync to complete, then submit journal writes for big writes + // then submit an fsync operation + if (has_writes) + { + // Can't submit SYNC before previous writes + continue; + } + dequeue_op = dequeue_sync(op); + } + else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) + { + dequeue_op = dequeue_stable(op); + } + if (dequeue_op) + { + submit_queue.erase(op_ptr); + } + else + { + ringloop->ring.sq.sqe_tail = prev_sqe_pos; + if (PRIV(op)->wait_for == WAIT_SQE) + { + PRIV(op)->wait_detail = 1 + ring_space; + // ring is full, stop submission + break; + } + } + } + if (!readonly) + { + flusher->loop(); + } + int ret = ringloop->submit(); + if (ret < 0) + { + throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); + } + } +} + +bool blockstore_impl_t::is_safe_to_stop() +{ + // It's safe to stop blockstore when there are no in-flight operations, + // no in-progress syncs and flusher isn't doing anything + if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active()) + { + return false; + } + if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0) + { + if (!readonly && !stop_sync_submitted) + { + // We should sync the blockstore before unmounting + blockstore_op_t *op = new blockstore_op_t; + op->flags = OP_SYNC; + op->buf = NULL; + op->callback = [](blockstore_op_t *op) + { + delete op; + }; + enqueue_op(op); + stop_sync_submitted = true; + } + return false; + } + return true; +} + +void blockstore_impl_t::check_wait(blockstore_op_t *op) +{ + if (PRIV(op)->wait_for == WAIT_SQE) + { + if (io_uring_sq_space_left(&ringloop->ring) < PRIV(op)->wait_detail) + { + // stop submission if there's still no free space + return; + } + PRIV(op)->wait_for = 0; + } + else if (PRIV(op)->wait_for == WAIT_IN_FLIGHT) + { + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = PRIV(op)->wait_detail, + }); + if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state)) + { + // do not submit + return; + } + PRIV(op)->wait_for = 0; + } + else if (PRIV(op)->wait_for == WAIT_JOURNAL) + { + if (journal.used_start == PRIV(op)->wait_detail) + { + // do not submit + return; + } + PRIV(op)->wait_for = 0; + } + else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER) + { + if (journal.sector_info[((journal.cur_sector + 1) % 
journal.sector_count)].usage_count > 0) + { + // do not submit + return; + } + PRIV(op)->wait_for = 0; + } + else if (PRIV(op)->wait_for == WAIT_FREE) + { + if (!data_alloc->get_free_count() && !flusher->is_active()) + { + return; + } + PRIV(op)->wait_for = 0; + } + else + { + throw std::runtime_error("BUG: op->wait_for value is unexpected"); + } +} + +void blockstore_impl_t::enqueue_op(blockstore_op_t *op) +{ + int type = op->flags & OP_TYPE_MASK; + if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) && + (op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) || + readonly && type != OP_READ) + { + // Basic verification not passed + op->retval = -EINVAL; + op->callback(op); + return; + } + // Call constructor without allocating memory. We'll call destructor before returning op back + new ((void*)op->private_data) blockstore_op_private_t; + PRIV(op)->wait_for = 0; + PRIV(op)->sync_state = 0; + PRIV(op)->pending_ops = 0; + submit_queue.push_back(op); + if ((op->flags & OP_TYPE_MASK) == OP_WRITE) + { + enqueue_write(op); + } + ringloop->wakeup(); +} diff --git a/blockstore_impl.h b/blockstore_impl.h new file mode 100644 index 000000000..23010e38e --- /dev/null +++ b/blockstore_impl.h @@ -0,0 +1,279 @@ +#pragma once + +#include "blockstore.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "sparsepp/sparsepp/spp.h" + +#include "allocator.h" + +//#define BLOCKSTORE_DEBUG + +// States are not stored on disk. Instead, they're deduced from the journal + +#define ST_J_IN_FLIGHT 1 +#define ST_J_SUBMITTED 2 +#define ST_J_WRITTEN 3 +#define ST_J_SYNCED 4 +#define ST_J_STABLE 5 + +#define ST_D_IN_FLIGHT 15 +#define ST_D_SUBMITTED 16 +#define ST_D_WRITTEN 17 +#define ST_D_META_WRITTEN 19 +#define ST_D_META_SYNCED 20 +#define ST_D_STABLE 21 + +#define ST_DEL_IN_FLIGHT 31 +#define ST_DEL_SUBMITTED 32 +#define ST_DEL_WRITTEN 33 +#define ST_DEL_SYNCED 34 +#define ST_DEL_STABLE 35 + +#define ST_CURRENT 48 + +#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) +#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) +#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED) +#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) +#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) +#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) +#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN) + +#define BS_SUBMIT_GET_SQE(sqe, data) \ + BS_SUBMIT_GET_ONLY_SQE(sqe); \ + struct ring_data_t *data = ((ring_data_t*)sqe->user_data) + +#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ + struct io_uring_sqe *sqe = get_sqe();\ + if (!sqe)\ + {\ + /* Pause until there are more requests available */\ + PRIV(op)->wait_for = WAIT_SQE;\ + return 0;\ + } + +#define BS_SUBMIT_GET_SQE_DECL(sqe) \ + sqe = get_sqe();\ + if (!sqe)\ + {\ + /* Pause until there are more requests available */\ + PRIV(op)->wait_for = WAIT_SQE;\ + return 0;\ + } + +#include "blockstore_journal.h" + +// 24 bytes per "clean" entry on disk with fixed metadata tables +// FIXME: maybe add crc32's to metadata +struct __attribute__((__packed__)) 
clean_disk_entry
+{
+    object_id oid;
+    uint64_t version;
+};
+
+// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry)
+struct __attribute__((__packed__)) clean_entry
+{
+    uint64_t version;
+    uint64_t location;
+};
+
+// 56 = 24 + 32 bytes per dirty entry in memory (obj_ver_id => dirty_entry)
+struct __attribute__((__packed__)) dirty_entry
+{
+    uint32_t state;
+    uint32_t flags;    // unneeded, but present for alignment
+    uint64_t location; // location in either journal or data -> in BYTES
+    uint32_t offset;   // data offset within object (stripe)
+    uint32_t len;      // data length
+    uint64_t journal_sector; // journal sector used for this entry
+};
+
+// - Sync must be submitted after previous writes/deletes (not before!)
+// - Reads to the same object must be submitted after previous writes/deletes
+//   are written (not necessarily synced) in their location. This is because we
+//   rely on read-modify-write for erasure coding and we must return new data
+//   to calculate parity for subsequent writes
+// - Writes may be submitted in any order, because they don't overlap. Each write
+//   goes into a new location - either on the journal device or on the data device
+// - Stable (stabilize) must be submitted after sync of that object is completed
+//   It's even OK to return an error to the caller if that object is not synced yet
+// - Journal trim may be processed only after all versions are moved to
+//   the main storage AND after all read operations for older versions complete
+// - If an operation can not be submitted because the ring is full
+//   we should stop submission of other operations. Otherwise some "scatter" reads
+//   may end up blocked for a long time.
+// Otherwise, the submit order is free, that is all operations may be submitted immediately
+// In fact, adding a write operation must immediately result in dirty_db being populated
+
+// Suspend operation until there are more free SQEs
+#define WAIT_SQE 1
+// Suspend operation until version <wait_detail> of object <oid> is written
+#define WAIT_IN_FLIGHT 2
+// Suspend operation until there are <wait_detail> bytes of free space in the journal on disk
+#define WAIT_JOURNAL 3
+// Suspend operation until the next journal sector buffer is free
+#define WAIT_JOURNAL_BUFFER 4
+// Suspend operation until there is some free space on the data device
+#define WAIT_FREE 5
+
+struct fulfill_read_t
+{
+    uint64_t offset, len;
+};
+
+#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
+#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); op->callback(op)
+
+struct blockstore_op_private_t
+{
+    // Wait status
+    int wait_for;
+    uint64_t wait_detail;
+    int pending_ops;
+
+    // Read
+    std::vector<fulfill_read_t> read_vec;
+
+    // Sync, write
+    uint64_t min_used_journal_sector, max_used_journal_sector;
+
+    // Write
+    struct iovec iov_zerofill[3];
+
+    // Sync
+    std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
+    std::list<blockstore_op_t*>::iterator in_progress_ptr;
+    int sync_state, prev_sync_count;
+};
+
+#include "blockstore_init.h"
+
+#include "blockstore_flush.h"
+
+class blockstore_impl_t
+{
+    struct ring_consumer_t ring_consumer;
+
+    // Another option is https://github.com/algorithm-ninja/cpp-btree
+    spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db;
+    std::map<obj_ver_id, dirty_entry> dirty_db;
+    std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
+    std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
+    std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
+    allocator *data_alloc = NULL;
+    uint8_t *zero_object;
+
+    uint64_t block_count;
+    uint32_t block_order, block_size;
+
+    int meta_fd;
+    int data_fd;
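The private_data buffer plus the PRIV()/FINISH_OP() pair above are what let blockstore_op_t stay a plain public struct: enqueue_op() placement-constructs blockstore_op_private_t inside the caller-provided 256 bytes (after asserting it fits), and FINISH_OP() runs the destructor by hand before handing the op back. A reduced, self-contained version of that lifecycle:

    #include <cstdint>
    #include <new>
    #include <vector>

    #define DEMO_PRIVATE_SIZE 256

    struct demo_op_t
    {
        int retval;
        uint8_t private_data[DEMO_PRIVATE_SIZE];   // opaque to callers
    };

    struct demo_op_private_t
    {
        int wait_for = 0;
        std::vector<uint64_t> read_vec;            // owns heap memory
    };

    #define DEMO_PRIV(op) ((demo_op_private_t*)(op)->private_data)

    void demo_enqueue(demo_op_t *op)
    {
        static_assert(sizeof(demo_op_private_t) <= DEMO_PRIVATE_SIZE,
            "private state must fit into the reserved buffer");
        // Construct in place - no allocation, no change to the public struct
        new ((void*)op->private_data) demo_op_private_t;
    }

    void demo_finish(demo_op_t *op)
    {
        // Explicit destructor call releases read_vec before the op is
        // returned to the caller, which is exactly what FINISH_OP() does
        DEMO_PRIV(op)->~demo_op_private_t();
    }
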
+ + uint64_t meta_offset, meta_size, meta_area, meta_len; + uint64_t data_offset, data_size, data_len; + + bool readonly = false; + // FIXME: separate flags for data, metadata and journal + bool disable_fsync = false; + bool inmemory_meta = false; + void *metadata_buffer = NULL; + + struct journal_t journal; + journal_flusher_t *flusher; + + ring_loop_t *ringloop; + + bool stop_sync_submitted; + + inline struct io_uring_sqe* get_sqe() + { + return ringloop->get_sqe(); + } + + friend class blockstore_init_meta; + friend class blockstore_init_journal; + friend class blockstore_journal_check_t; + friend class journal_flusher_t; + friend class journal_flusher_co; + + void calc_lengths(blockstore_config_t & config); + void open_data(blockstore_config_t & config); + void open_meta(blockstore_config_t & config); + void open_journal(blockstore_config_t & config); + + // Asynchronous init + int initialized; + int metadata_buf_size; + blockstore_init_meta* metadata_init_reader; + blockstore_init_journal* journal_init_reader; + + void check_wait(blockstore_op_t *op); + + // Read + int dequeue_read(blockstore_op_t *read_op); + int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, + uint32_t item_state, uint64_t item_version, uint64_t item_location); + int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, + uint32_t item_state, uint64_t item_version); + void handle_read_event(ring_data_t *data, blockstore_op_t *op); + + // Write + void enqueue_write(blockstore_op_t *op); + int dequeue_write(blockstore_op_t *op); + int dequeue_del(blockstore_op_t *op); + void handle_write_event(ring_data_t *data, blockstore_op_t *op); + + // Sync + int dequeue_sync(blockstore_op_t *op); + void handle_sync_event(ring_data_t *data, blockstore_op_t *op); + int continue_sync(blockstore_op_t *op); + void ack_one_sync(blockstore_op_t *op); + int ack_sync(blockstore_op_t *op); + + // Stabilize + int dequeue_stable(blockstore_op_t *op); + void handle_stable_event(ring_data_t *data, blockstore_op_t *op); + void stabilize_object(object_id oid, uint64_t max_ver); + +public: + + blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop); + ~blockstore_impl_t(); + + // Event loop + void loop(); + + // Returns true when blockstore is ready to process operations + // (Although you're free to enqueue them before that) + bool is_started(); + + // Returns true when it's safe to destroy the instance. If destroying the instance + // requires to purge some queues, starts that process. Should be called in the event + // loop until it returns true. 
+    bool is_safe_to_stop();
+
+    // Submission
+    void enqueue_op(blockstore_op_t *op);
+
+    // Unstable writes are added here (map of object_id -> version)
+    std::map<object_id, uint64_t> unstable_writes;
+
+    inline uint32_t get_block_size() { return block_size; }
+    inline uint32_t get_block_order() { return block_order; }
+    inline uint64_t get_block_count() { return block_count; }
+};
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 38c846c39..55f26d6d2 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -1,6 +1,6 @@
-#include "blockstore.h"
+#include "blockstore_impl.h"
 
-blockstore_init_meta::blockstore_init_meta(blockstore *bs)
+blockstore_init_meta::blockstore_init_meta(blockstore_impl_t *bs)
 {
     this->bs = bs;
 }
@@ -128,7 +128,7 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, unsi
         }
     }
 }
 
-blockstore_init_journal::blockstore_init_journal(blockstore *bs)
+blockstore_init_journal::blockstore_init_journal(blockstore_impl_t *bs)
 {
     this->bs = bs;
     simple_callback = [this](ring_data_t *data1)
diff --git a/blockstore_init.h b/blockstore_init.h
index 395ac58bb..6d5e9960b 100644
--- a/blockstore_init.h
+++ b/blockstore_init.h
@@ -2,7 +2,7 @@
 class blockstore_init_meta
 {
-    blockstore *bs;
+    blockstore_impl_t *bs;
     int wait_state = 0, wait_count = 0;
     void *metadata_buffer = NULL;
     uint64_t metadata_read = 0;
@@ -14,7 +14,7 @@ class blockstore_init_meta
     void handle_entries(struct clean_disk_entry* entries, unsigned count, int block_order);
     void handle_event(ring_data_t *data);
 public:
-    blockstore_init_meta(blockstore *bs);
+    blockstore_init_meta(blockstore_impl_t *bs);
     int loop();
 };
 
@@ -26,11 +26,12 @@ struct bs_init_journal_done
 class blockstore_init_journal
 {
-    blockstore *bs;
+    blockstore_impl_t *bs;
     int wait_state = 0, wait_count = 0, handle_res = 0;
     uint64_t entries_loaded = 0;
     uint32_t crc32_last = 0;
     bool started = false;
+    // FIXME: use DISK_ALIGNMENT everywhere
     uint64_t next_free = 512;
     std::vector<bs_init_journal_done> done;
     uint64_t journal_pos = 0;
@@ -46,6 +47,6 @@ class blockstore_init_journal
     int handle_journal_part(void *buf, uint64_t done_pos, uint64_t len);
     void handle_event(ring_data_t *data);
 public:
-    blockstore_init_journal(blockstore* bs);
+    blockstore_init_journal(blockstore_impl_t* bs);
     int loop();
 };
diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp
index 9df489ec7..2ecb5ee27 100644
--- a/blockstore_journal.cpp
+++ b/blockstore_journal.cpp
@@ -1,6 +1,6 @@
-#include "blockstore.h"
+#include "blockstore_impl.h"
 
-blockstore_journal_check_t::blockstore_journal_check_t(blockstore *bs)
+blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs)
 {
     this->bs = bs;
     sectors_required = 0;
@@ -40,7 +40,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
         if (bs->journal.sector_info[next_sector].usage_count > 0)
         {
             // No memory buffer available. Wait for it.
-            op->wait_for = WAIT_JOURNAL_BUFFER;
+            PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
             return 0;
         }
     }
@@ -56,9 +56,9 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
     if (!right_dir && next_pos >= bs->journal.used_start-512)
     {
         // No space in the journal. Wait until used_start changes.
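check_available() treats the on-disk journal as a ring buffer: next_pos chases used_start, and right_dir distinguishes writing towards the end of the ring from having already wrapped. A simplified model of the space test (the 512-byte slack mirrors the used_start-512 guard in this hunk; the real wrap bookkeeping is more involved):

    #include <cstdint>

    struct toy_journal { uint64_t len, used_start, next_free; };

    // Can 'need' more bytes be written without catching up to used_start?
    bool has_space(const toy_journal & j, uint64_t need)
    {
        if (j.next_free >= j.used_start)
        {
            // Writing towards the end of the ring ("right_dir")
            if (j.next_free + need <= j.len)
                return true;
            // Otherwise try wrapping to the beginning, keeping slack
            return need + 512 <= j.used_start;
        }
        // Already wrapped: must stop short of used_start
        return j.next_free + need + 512 <= j.used_start;
    }
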
- op->wait_for = WAIT_JOURNAL; + PRIV(op)->wait_for = WAIT_JOURNAL; bs->flusher->force_start(); - op->wait_detail = bs->journal.used_start; + PRIV(op)->wait_detail = bs->journal.used_start; return 0; } return 1; diff --git a/blockstore_journal.h b/blockstore_journal.h index 2d7557cbb..1e97dd8ec 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -138,12 +138,12 @@ struct journal_t struct blockstore_journal_check_t { - blockstore *bs; + blockstore_impl_t *bs; uint64_t next_pos, next_sector, next_in_pos; int sectors_required; bool right_dir; // writing to the end or the beginning of the ring buffer - blockstore_journal_check_t(blockstore *bs); + blockstore_journal_check_t(blockstore_impl_t *bs); int check_available(blockstore_op_t *op, int required, int size, int data_after); }; diff --git a/blockstore_open.cpp b/blockstore_open.cpp index ae2e94385..b330a7a73 100644 --- a/blockstore_open.cpp +++ b/blockstore_open.cpp @@ -1,6 +1,6 @@ -#include "blockstore.h" +#include "blockstore_impl.h" -void blockstore::calc_lengths(blockstore_config_t & config) +void blockstore_impl_t::calc_lengths(blockstore_config_t & config) { if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes") { @@ -111,7 +111,7 @@ void check_size(int fd, uint64_t *size, std::string name) } } -void blockstore::open_data(blockstore_config_t & config) +void blockstore_impl_t::open_data(blockstore_config_t & config) { data_offset = strtoull(config["data_offset"].c_str(), NULL, 10); if (data_offset % DISK_ALIGNMENT) @@ -130,7 +130,7 @@ void blockstore::open_data(blockstore_config_t & config) } } -void blockstore::open_meta(blockstore_config_t & config) +void blockstore_impl_t::open_meta(blockstore_config_t & config) { meta_offset = strtoull(config["meta_offset"].c_str(), NULL, 10); if (meta_offset % DISK_ALIGNMENT) @@ -162,7 +162,7 @@ void blockstore::open_meta(blockstore_config_t & config) } } -void blockstore::open_journal(blockstore_config_t & config) +void blockstore_impl_t::open_journal(blockstore_config_t & config) { journal.offset = strtoull(config["journal_offset"].c_str(), NULL, 10); if (journal.offset % DISK_ALIGNMENT) diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 9d9df07c2..11df30f01 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -1,13 +1,13 @@ -#include "blockstore.h" +#include "blockstore_impl.h" -int blockstore::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, +int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, uint32_t item_state, uint64_t item_version) { if (IS_IN_FLIGHT(item_state)) { // Pause until it's written somewhere - op->wait_for = WAIT_IN_FLIGHT; - op->wait_detail = item_version; + PRIV(op)->wait_for = WAIT_IN_FLIGHT; + PRIV(op)->wait_detail = item_version; return 0; } else if (IS_DELETE(item_state)) @@ -23,7 +23,7 @@ int blockstore::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offse } BS_SUBMIT_GET_SQE(sqe, data); data->iov = (struct iovec){ buf, len }; - op->pending_ops++; + PRIV(op)->pending_ops++; my_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? 
journal.fd : data_fd, @@ -34,7 +34,7 @@ int blockstore::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offse return 1; } -int blockstore::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, +int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location) { uint32_t cur_start = item_start; @@ -42,19 +42,19 @@ int blockstore::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint { cur_start = cur_start < read_op->offset ? read_op->offset : cur_start; item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end; - auto it = read_op->read_vec.begin(); + auto it = PRIV(read_op)->read_vec.begin(); while (1) { - for (; it != read_op->read_vec.end(); it++) + for (; it != PRIV(read_op)->read_vec.end(); it++) if (it->offset >= cur_start) break; - if (it == read_op->read_vec.end() || it->offset > cur_start) + if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start) { fulfill_read_t el = { .offset = cur_start, - .len = it == read_op->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start, + .len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start, }; - it = read_op->read_vec.insert(it, el); + it = PRIV(read_op)->read_vec.insert(it, el); fulfilled += el.len; if (!fulfill_read_push(read_op, read_op->buf + el.offset - read_op->offset, item_location + el.offset - item_start, el.len, item_state, item_version)) { @@ -62,14 +62,14 @@ int blockstore::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint } } cur_start = it->offset + it->len; - if (it == read_op->read_vec.end() || cur_start >= item_end) + if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end) break; } } return 1; } -int blockstore::dequeue_read(blockstore_op_t *read_op) +int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) { auto clean_it = clean_db.find(read_op->oid); auto dirty_it = dirty_db.upper_bound((obj_ver_id){ @@ -89,7 +89,7 @@ int blockstore::dequeue_read(blockstore_op_t *read_op) return 1; } uint64_t fulfilled = 0; - read_op->pending_ops = 0; + PRIV(read_op)->pending_ops = 0; if (dirty_found) { while (dirty_it->first.oid == read_op->oid) @@ -108,7 +108,7 @@ int blockstore::dequeue_read(blockstore_op_t *read_op) dirty.state, dirty_it->first.version, dirty.location)) { // need to wait. undo added requests, don't dequeue op - read_op->read_vec.clear(); + PRIV(read_op)->read_vec.clear(); return 0; } } @@ -124,14 +124,14 @@ int blockstore::dequeue_read(blockstore_op_t *read_op) if (!fulfill_read(read_op, fulfilled, 0, block_size, ST_CURRENT, 0, clean_it->second.location)) { // need to wait. 
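fulfill_read() keeps read_vec sorted by offset and uses it as a coverage map: each dirty or clean version claims only the still-unfilled gaps of the requested range, and `fulfilled` counts claimed bytes so dequeue_read() knows when the whole request is served (or must fall back to zeroes). The gap-claiming walk, reduced to its bookkeeping with illustrative stand-alone types:

    #include <cstdint>
    #include <vector>

    // read_vec holds sorted, non-overlapping [offset, offset+len) pieces
    // already claimed by some version; fill_gap() claims whatever part of
    // [start, end) is still free and reports how many bytes it took.
    struct piece { uint64_t offset, len; };

    uint64_t fill_gap(std::vector<piece> & read_vec, uint64_t start, uint64_t end)
    {
        uint64_t fulfilled = 0;
        auto it = read_vec.begin();
        while (start < end)
        {
            while (it != read_vec.end() && it->offset + it->len <= start)
                it++;
            uint64_t gap_end = (it == read_vec.end() || it->offset >= end)
                ? end : it->offset;
            if (start < gap_end)
            {
                it = read_vec.insert(it, (piece){ start, gap_end - start });
                fulfilled += gap_end - start;   // this span is now claimed
                it++;
            }
            start = (it == read_vec.end()) ? end : it->offset + it->len;
        }
        return fulfilled;
    }
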
undo added requests, don't dequeue op - read_op->read_vec.clear(); + PRIV(read_op)->read_vec.clear(); return 0; } } - if (!read_op->pending_ops) + if (!PRIV(read_op)->pending_ops) { // everything is fulfilled from memory - if (!read_op->read_vec.size()) + if (!PRIV(read_op)->read_vec.size()) { // region is not allocated - return zeroes memset(read_op->buf, 0, read_op->len); @@ -148,18 +148,18 @@ int blockstore::dequeue_read(blockstore_op_t *read_op) return 1; } -void blockstore::handle_read_event(ring_data_t *data, blockstore_op_t *op) +void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op) { - op->pending_ops--; + PRIV(op)->pending_ops--; if (data->res != data->iov.iov_len) { // read error op->retval = data->res; } - if (op->pending_ops == 0) + if (PRIV(op)->pending_ops == 0) { if (op->retval == 0) op->retval = op->len; - op->callback(op); + FINISH_OP(op); } } diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 6fa7186ff..ace475306 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -1,4 +1,4 @@ -#include "blockstore.h" +#include "blockstore_impl.h" // Stabilize small write: // 1) Copy data from the journal to the data device @@ -38,7 +38,7 @@ // 4) after a while it takes his synced object list and sends stabilize requests // to peers and to its own blockstore, thus freeing the old version -int blockstore::dequeue_stable(blockstore_op_t *op) +int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) { obj_ver_id* v; int i, todo = 0; @@ -52,7 +52,7 @@ int blockstore::dequeue_stable(blockstore_op_t *op) { // No such object version op->retval = -EINVAL; - op->callback(op); + FINISH_OP(op); return 1; } else @@ -64,7 +64,7 @@ int blockstore::dequeue_stable(blockstore_op_t *op) { // Object not synced yet. Caller must sync it first op->retval = EAGAIN; - op->callback(op); + FINISH_OP(op); return 1; } else if (!IS_STABLE(dirty_it->second.state)) @@ -76,7 +76,7 @@ int blockstore::dequeue_stable(blockstore_op_t *op) { // Already stable op->retval = 0; - op->callback(op); + FINISH_OP(op); return 1; } // Check journal space @@ -111,17 +111,17 @@ int blockstore::dequeue_stable(blockstore_op_t *op) if (cur_sector != journal.cur_sector) { if (cur_sector == -1) - op->min_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector; cur_sector = journal.cur_sector; prepare_journal_sector_write(journal, sqe[s++], cb); } } - op->max_used_journal_sector = 1 + journal.cur_sector; - op->pending_ops = s; + PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops = s; return 1; } -void blockstore::handle_stable_event(ring_data_t *data, blockstore_op_t *op) +void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op) { if (data->res != data->iov.iov_len) { @@ -130,21 +130,21 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_op_t *op) "). in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111" ); } - op->pending_ops--; - if (op->pending_ops == 0) + PRIV(op)->pending_ops--; + if (PRIV(op)->pending_ops == 0) { // Release used journal sectors - if (op->min_used_journal_sector > 0) + if (PRIV(op)->min_used_journal_sector > 0) { - uint64_t s = op->min_used_journal_sector; + uint64_t s = PRIV(op)->min_used_journal_sector; while (1) { journal.sector_info[s-1].usage_count--; - if (s == op->max_used_journal_sector) + if (s == PRIV(op)->max_used_journal_sector) break; s = 1 + s % journal.sector_count; } - op->min_used_journal_sector = op->max_used_journal_sector = 0; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; } // First step: mark dirty_db entries as stable, acknowledge op completion obj_ver_id* v; @@ -191,6 +191,6 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_op_t *op) } // Acknowledge op op->retval = 0; - op->callback(op); + FINISH_OP(op); } } diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 9c68cccc9..f35227ca0 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -1,4 +1,4 @@ -#include "blockstore.h" +#include "blockstore_impl.h" #define SYNC_HAS_SMALL 1 #define SYNC_HAS_BIG 2 @@ -7,36 +7,36 @@ #define SYNC_JOURNAL_SYNC_SENT 5 #define SYNC_DONE 6 -int blockstore::dequeue_sync(blockstore_op_t *op) +int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) { - if (op->sync_state == 0) + if (PRIV(op)->sync_state == 0) { stop_sync_submitted = false; - op->sync_big_writes.swap(unsynced_big_writes); - op->sync_small_writes.swap(unsynced_small_writes); - if (op->sync_big_writes.size() > 0) - op->sync_state = SYNC_HAS_BIG; - else if (op->sync_small_writes.size() > 0) - op->sync_state = SYNC_HAS_SMALL; + PRIV(op)->sync_big_writes.swap(unsynced_big_writes); + PRIV(op)->sync_small_writes.swap(unsynced_small_writes); + if (PRIV(op)->sync_big_writes.size() > 0) + PRIV(op)->sync_state = SYNC_HAS_BIG; + else if (PRIV(op)->sync_small_writes.size() > 0) + PRIV(op)->sync_state = SYNC_HAS_SMALL; else - op->sync_state = SYNC_DONE; + PRIV(op)->sync_state = SYNC_DONE; unsynced_big_writes.clear(); unsynced_small_writes.clear(); } int r = continue_sync(op); if (r) { - op->prev_sync_count = in_progress_syncs.size(); - op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); + PRIV(op)->prev_sync_count = in_progress_syncs.size(); + PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); ack_sync(op); } return r; } -int blockstore::continue_sync(blockstore_op_t *op) +int blockstore_impl_t::continue_sync(blockstore_op_t *op) { auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; - if (op->sync_state == SYNC_HAS_SMALL) + if (PRIV(op)->sync_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal if (!disable_fsync) @@ -45,16 +45,16 @@ int blockstore::continue_sync(blockstore_op_t *op) my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; data->callback = cb; - op->min_used_journal_sector = op->max_used_journal_sector = 0; - op->pending_ops = 1; - op->sync_state = SYNC_JOURNAL_SYNC_SENT; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; + PRIV(op)->pending_ops = 1; + PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; } else { - op->sync_state = SYNC_DONE; + PRIV(op)->sync_state = SYNC_DONE; } } - else if (op->sync_state == SYNC_HAS_BIG) + else if (PRIV(op)->sync_state == SYNC_HAS_BIG) { // 1st step: fsync data if (!disable_fsync) @@ -63,21 +63,21 @@ int 
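The release loop above (repeated in the sync and write completion handlers) deserves a note: journal sectors are tracked 1-based so that 0 can mean "no sectors pinned", and the walk from min to max wraps around the circular sector array via `1 + s % sector_count`. Extracted into a standalone helper it looks like this (a sketch; the real code keeps the loop inline):

    #include <cstdint>

    struct sector_info_t { int usage_count; };

    // Unpin the 1-based range [min_used, max_used] of journal sectors
    // an op had locked; usage_count keeps a sector's memory buffer alive
    // until every writer touching it has completed.
    void release_sectors(sector_info_t *sector_info, uint64_t sector_count,
        uint64_t min_used, uint64_t max_used)
    {
        if (min_used == 0)
            return;                             // op pinned no sectors
        uint64_t s = min_used;
        while (1)
        {
            sector_info[s-1].usage_count--;     // 1-based index into the array
            if (s == max_used)
                break;
            s = 1 + s % sector_count;           // circular successor, stays 1-based
        }
    }
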
blockstore::continue_sync(blockstore_op_t *op) my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; data->callback = cb; - op->min_used_journal_sector = op->max_used_journal_sector = 0; - op->pending_ops = 1; - op->sync_state = SYNC_DATA_SYNC_SENT; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; + PRIV(op)->pending_ops = 1; + PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT; } else { - op->sync_state = SYNC_DATA_SYNC_DONE; + PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; } } - if (op->sync_state == SYNC_DATA_SYNC_DONE) + if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE) { // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, op->sync_big_writes.size(), sizeof(journal_entry_big_write), 0)) + if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), 0)) { return 0; } @@ -88,9 +88,9 @@ int blockstore::continue_sync(blockstore_op_t *op) BS_SUBMIT_GET_SQE_DECL(sqe[i]); } // Prepare and submit journal entries - auto it = op->sync_big_writes.begin(); + auto it = PRIV(op)->sync_big_writes.begin(); int s = 0, cur_sector = -1; - while (it != op->sync_big_writes.end()) + while (it != PRIV(op)->sync_big_writes.end()) { journal_entry_big_write *je = (journal_entry_big_write*) prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write)); @@ -108,12 +108,12 @@ int blockstore::continue_sync(blockstore_op_t *op) if (cur_sector != journal.cur_sector) { if (cur_sector == -1) - op->min_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector; cur_sector = journal.cur_sector; prepare_journal_sector_write(journal, sqe[s++], cb); } } - op->max_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; // ... And a journal fsync if (!disable_fsync) { @@ -121,17 +121,17 @@ int blockstore::continue_sync(blockstore_op_t *op) struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data); data->iov = { 0 }; data->callback = cb; - op->pending_ops = 1 + s; + PRIV(op)->pending_ops = 1 + s; } else - op->pending_ops = s; - op->sync_state = SYNC_JOURNAL_SYNC_SENT; + PRIV(op)->pending_ops = s; + PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; ringloop->submit(); } return 1; } -void blockstore::handle_sync_event(ring_data_t *data, blockstore_op_t *op) +void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op) { if (data->res != data->iov.iov_len) { @@ -140,30 +140,30 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_op_t *op) "). in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111" ); } - op->pending_ops--; - if (op->pending_ops == 0) + PRIV(op)->pending_ops--; + if (PRIV(op)->pending_ops == 0) { // Release used journal sectors - if (op->min_used_journal_sector > 0) + if (PRIV(op)->min_used_journal_sector > 0) { - uint64_t s = op->min_used_journal_sector; + uint64_t s = PRIV(op)->min_used_journal_sector; while (1) { journal.sector_info[s-1].usage_count--; - if (s == op->max_used_journal_sector) + if (s == PRIV(op)->max_used_journal_sector) break; s = 1 + s % journal.sector_count; } - op->min_used_journal_sector = op->max_used_journal_sector = 0; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; } // Handle states - if (op->sync_state == SYNC_DATA_SYNC_SENT) + if (PRIV(op)->sync_state == SYNC_DATA_SYNC_SENT) { - op->sync_state = SYNC_DATA_SYNC_DONE; + PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; } - else if (op->sync_state == SYNC_JOURNAL_SYNC_SENT) + else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT) { - op->sync_state = SYNC_DONE; + PRIV(op)->sync_state = SYNC_DONE; ack_sync(op); } else @@ -173,12 +173,12 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_op_t *op) } } -int blockstore::ack_sync(blockstore_op_t *op) +int blockstore_impl_t::ack_sync(blockstore_op_t *op) { - if (op->sync_state == SYNC_DONE && op->prev_sync_count == 0) + if (PRIV(op)->sync_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0) { // Remove dependency of subsequent syncs - auto it = op->in_progress_ptr; + auto it = PRIV(op)->in_progress_ptr; int done_syncs = 1; ++it; // Acknowledge sync @@ -186,8 +186,8 @@ int blockstore::ack_sync(blockstore_op_t *op) while (it != in_progress_syncs.end()) { auto & next_sync = *it++; - next_sync->prev_sync_count -= done_syncs; - if (next_sync->prev_sync_count == 0 && next_sync->sync_state == SYNC_DONE) + PRIV(next_sync)->prev_sync_count -= done_syncs; + if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->sync_state == SYNC_DONE) { done_syncs++; // Acknowledge next_sync @@ -199,10 +199,10 @@ int blockstore::ack_sync(blockstore_op_t *op) return 0; } -void blockstore::ack_one_sync(blockstore_op_t *op) +void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) { // Handle states - for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++) + for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++) { #ifdef BLOCKSTORE_DEBUG printf("Ack sync big %lu:%lu v%lu\n", it->oid.inode, it->oid.stripe, it->version); @@ -211,7 +211,7 @@ void blockstore::ack_one_sync(blockstore_op_t *op) unstab = unstab < it->version ? it->version : unstab; dirty_db[*it].state = ST_D_META_SYNCED; } - for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++) + for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++) { #ifdef BLOCKSTORE_DEBUG printf("Ack sync small %lu:%lu v%lu\n", it->oid.inode, it->oid.stripe, it->version); @@ -220,7 +220,7 @@ void blockstore::ack_one_sync(blockstore_op_t *op) unstab = unstab < it->version ? it->version : unstab; dirty_db[*it].state = dirty_db[*it].state == ST_DEL_WRITTEN ? 
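dequeue_sync() and ack_sync() enforce ordered completion without timestamps: each sync records how many older syncs were in flight (prev_sync_count) and its list position (in_progress_ptr); when a sync with a zero count reaches SYNC_DONE, the walk propagates the acknowledgement to any later syncs that were only waiting on predecessors. A reduced model of that cascade:

    #include <list>

    struct sync_op { int prev_sync_count; bool done; };

    // Walk the list after the just-finished head sync, mirroring ack_sync():
    // every later sync sees one fewer unfinished predecessor and becomes
    // acknowledgeable once its counter hits zero and its own I/O is done.
    void ack_from(std::list<sync_op*> & in_progress,
        std::list<sync_op*>::iterator head)
    {
        int done_syncs = 1;                 // the head op itself
        auto it = head;
        ++it;
        while (it != in_progress.end())
        {
            sync_op *next = *it++;
            next->prev_sync_count -= done_syncs;
            if (next->prev_sync_count == 0 && next->done)
                done_syncs++;               // would be acknowledged here too,
                                            // as ack_one_sync() does
        }
        in_progress.erase(head);
    }
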
ST_DEL_SYNCED : ST_J_SYNCED; } - in_progress_syncs.erase(op->in_progress_ptr); + in_progress_syncs.erase(PRIV(op)->in_progress_ptr); op->retval = 0; - op->callback(op); + FINISH_OP(op); } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 4cda85b26..24ada2dc1 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -1,6 +1,6 @@ -#include "blockstore.h" +#include "blockstore_impl.h" -void blockstore::enqueue_write(blockstore_op_t *op) +void blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Assign version number bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE; @@ -35,7 +35,7 @@ void blockstore::enqueue_write(blockstore_op_t *op) { // Already deleted op->retval = 0; - op->callback(op); + FINISH_OP(op); return; } // Immediately add the operation into dirty_db, so subsequent reads could see it @@ -60,7 +60,7 @@ void blockstore::enqueue_write(blockstore_op_t *op) } // First step of the write algorithm: dequeue operation and submit initial write(s) -int blockstore::dequeue_write(blockstore_op_t *op) +int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, @@ -76,11 +76,11 @@ int blockstore::dequeue_write(blockstore_op_t *op) if (flusher->is_active()) { // hope that some space will be available after flush - op->wait_for = WAIT_FREE; + PRIV(op)->wait_for = WAIT_FREE; return 0; } op->retval = -ENOSPC; - op->callback(op); + FINISH_OP(op); return 1; } BS_SUBMIT_GET_SQE(sqe, data); @@ -96,24 +96,24 @@ int blockstore::dequeue_write(blockstore_op_t *op) // Zero fill newly allocated object. First write is always a big write // FIXME: Add "no-zero-fill" mode which will just leave random garbage (insecure, but may be useful) if (op->offset > 0) - op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset }; - op->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len }; + PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset }; + PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len }; if (op->offset+op->len < block_size) - op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) }; + PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) }; data->iov.iov_len = block_size; } else { vcnt = 1; - op->iov_zerofill[0] = (struct iovec){ op->buf, op->len }; + PRIV(op)->iov_zerofill[0] = (struct iovec){ op->buf, op->len }; data->iov.iov_len = op->len; // to check it in the callback } data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); }; my_uring_prep_writev( - sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order) + sqe, data_fd, PRIV(op)->iov_zerofill, vcnt, data_offset + (loc << block_order) ); - op->pending_ops = 1; - op->min_used_journal_sector = op->max_used_journal_sector = 0; + PRIV(op)->pending_ops = 1; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; // Remember big write as unsynced unsynced_big_writes.push_back((obj_ver_id){ .oid = op->oid, @@ -157,7 +157,7 @@ int blockstore::dequeue_write(blockstore_op_t *op) journal.crc32_last = je->crc32; auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; prepare_journal_sector_write(journal, sqe1, cb); - op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; // Prepare journal data write if 
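
Note: the PRIV(op) and FINISH_OP(op) macros used throughout this patch are defined in the new blockstore_impl.h, which is not part of the hunks shown here. The sketch below is a reconstruction for orientation only: the field names are exactly the ones dereferenced through PRIV() in this diff, but the types, the private_data member, and the FINISH_OP body are assumptions, not the repository's actual definitions.

#include <cstdint>
#include <list>
#include <vector>
#include <sys/uio.h>
#include "blockstore.h" // for blockstore_op_t, obj_ver_id

// Hypothetical sketch of blockstore_impl.h internals (not shown in this patch).
// blockstore_op_t is assumed to carry an opaque private_data area which the
// implementation overlays with its per-op bookkeeping:
struct blockstore_op_private_t
{
    int wait_for;                  // WAIT_SQE / WAIT_FREE / ...
    int pending_ops;               // in-flight io_uring requests of this op
    // 1-based journal sector range pinned by this op; 0 means "none"
    uint64_t min_used_journal_sector, max_used_journal_sector;
    int sync_state;                // SYNC_DATA_SYNC_SENT ... SYNC_DONE
    int prev_sync_count;           // earlier unfinished syncs this one waits for
    std::list<blockstore_op_t*>::iterator in_progress_ptr; // container type assumed
    std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
    struct iovec iov_zerofill[3];  // [zero head][payload][zero tail]
};

#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
// FINISH_OP() must at least fire the completion callback; whether it also
// tears down the private state is not visible in this diff.
#define FINISH_OP(op) (op)->callback(op)
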
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index 4cda85b26..24ada2dc1 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -1,6 +1,6 @@
-#include "blockstore.h"
+#include "blockstore_impl.h"
 
-void blockstore::enqueue_write(blockstore_op_t *op)
+void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
 {
     // Assign version number
     bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE;
@@ -35,7 +35,7 @@ void blockstore::enqueue_write(blockstore_op_t *op)
     {
         // Already deleted
         op->retval = 0;
-        op->callback(op);
+        FINISH_OP(op);
         return;
     }
     // Immediately add the operation into dirty_db, so subsequent reads could see it
@@ -60,7 +60,7 @@ void blockstore::enqueue_write(blockstore_op_t *op)
 }
 
 // First step of the write algorithm: dequeue operation and submit initial write(s)
-int blockstore::dequeue_write(blockstore_op_t *op)
+int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
 {
     auto dirty_it = dirty_db.find((obj_ver_id){
         .oid = op->oid,
@@ -76,11 +76,11 @@ int blockstore::dequeue_write(blockstore_op_t *op)
         if (flusher->is_active())
         {
             // hope that some space will be available after flush
-            op->wait_for = WAIT_FREE;
+            PRIV(op)->wait_for = WAIT_FREE;
             return 0;
         }
         op->retval = -ENOSPC;
-        op->callback(op);
+        FINISH_OP(op);
         return 1;
     }
     BS_SUBMIT_GET_SQE(sqe, data);
@@ -96,24 +96,24 @@ int blockstore::dequeue_write(blockstore_op_t *op)
         // Zero fill newly allocated object. First write is always a big write
         // FIXME: Add "no-zero-fill" mode which will just leave random garbage (insecure, but may be useful)
         if (op->offset > 0)
-            op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset };
-        op->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
+            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset };
+        PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
         if (op->offset+op->len < block_size)
-            op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) };
+            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) };
         data->iov.iov_len = block_size;
     }
     else
     {
         vcnt = 1;
-        op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
+        PRIV(op)->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
         data->iov.iov_len = op->len; // to check it in the callback
     }
     data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
     my_uring_prep_writev(
-        sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
+        sqe, data_fd, PRIV(op)->iov_zerofill, vcnt, data_offset + (loc << block_order)
     );
-    op->pending_ops = 1;
-    op->min_used_journal_sector = op->max_used_journal_sector = 0;
+    PRIV(op)->pending_ops = 1;
+    PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
     // Remember big write as unsynced
     unsynced_big_writes.push_back((obj_ver_id){
         .oid = op->oid,
@@ -157,7 +157,7 @@ int blockstore::dequeue_write(blockstore_op_t *op)
     journal.crc32_last = je->crc32;
     auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
     prepare_journal_sector_write(journal, sqe1, cb);
-    op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
+    PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
     // Prepare journal data write
     if (journal.inmemory)
     {
@@ -174,7 +174,7 @@ int blockstore::dequeue_write(blockstore_op_t *op)
     journal.next_free += op->len;
     if (journal.next_free >= journal.len)
         journal.next_free = 512;
-    op->pending_ops = 2;
+    PRIV(op)->pending_ops = 2;
     // Remember small write as unsynced
     unsynced_small_writes.push_back((obj_ver_id){
         .oid = op->oid,
@@ -184,7 +184,7 @@ int blockstore::dequeue_write(blockstore_op_t *op)
     return 1;
 }
 
-void blockstore::handle_write_event(ring_data_t *data, blockstore_op_t *op)
+void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
 {
     if (data->res != data->iov.iov_len)
     {
@@ -194,21 +194,21 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_op_t *op)
             "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
         );
     }
-    op->pending_ops--;
-    if (op->pending_ops == 0)
+    PRIV(op)->pending_ops--;
+    if (PRIV(op)->pending_ops == 0)
    {
         // Release used journal sectors
-        if (op->min_used_journal_sector > 0)
+        if (PRIV(op)->min_used_journal_sector > 0)
         {
-            uint64_t s = op->min_used_journal_sector;
+            uint64_t s = PRIV(op)->min_used_journal_sector;
             while (1)
             {
                 journal.sector_info[s-1].usage_count--;
-                if (s == op->max_used_journal_sector)
+                if (s == PRIV(op)->max_used_journal_sector)
                     break;
                 s = 1 + s % journal.sector_count;
             }
-            op->min_used_journal_sector = op->max_used_journal_sector = 0;
+            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
         }
         // Switch object state
         auto & dirty_entry = dirty_db[(obj_ver_id){
@@ -232,11 +232,11 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_op_t *op)
     }
     // Acknowledge write without sync
     op->retval = op->len;
-    op->callback(op);
+    FINISH_OP(op);
 }
 
-int blockstore::dequeue_del(blockstore_op_t *op)
+int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
 {
     auto dirty_it = dirty_db.find((obj_ver_id){
         .oid = op->oid,
@@ -262,8 +262,8 @@ int blockstore::dequeue_del(blockstore_op_t *op)
     journal.crc32_last = je->crc32;
     auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
     prepare_journal_sector_write(journal, sqe, cb);
-    op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
-    op->pending_ops = 1;
+    PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
+    PRIV(op)->pending_ops = 1;
    dirty_it->second.state = ST_DEL_SUBMITTED;
     // Remember small write as unsynced
     unsynced_small_writes.push_back((obj_ver_id){
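
A detail worth flagging in the write path above (the same pattern appears in the sync path): used journal sectors are recorded 1-based, so that min_used_journal_sector == 0 can mean "nothing pinned", and the release loop in handle_write_event() walks the sector ring with s = 1 + s % journal.sector_count. A standalone illustration of that 1-based wraparound, with an arbitrary ring size (demo code, not from the repository):

#include <stdint.h>
#include <stdio.h>

int main()
{
    const uint64_t sector_count = 4;     // arbitrary ring size for the demo
    uint64_t min_used = 3, max_used = 1; // 1-based: pins sector indices 2, 3, 0
    uint64_t s = min_used;
    while (1)
    {
        // the real loop does journal.sector_info[s-1].usage_count-- here
        printf("release sector index %lu\n", (unsigned long)(s - 1));
        if (s == max_used)
            break;
        s = 1 + s % sector_count;        // 1,2,3,4,1,2,... : wraps without ever hitting 0
    }
    return 0;
}
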
diff --git a/fio_engine.cpp b/fio_engine.cpp
index a015a2759..6b587ee03 100644
--- a/fio_engine.cpp
+++ b/fio_engine.cpp
@@ -24,7 +24,7 @@ extern "C" {
 
 struct bs_data
 {
-    blockstore *bs;
+    blockstore_t *bs;
     ring_loop_t *ringloop;
     /* The list of completed io_u structs. */
     std::vector<io_u*> completed;
@@ -154,7 +154,7 @@ static int bs_init(struct thread_data *td)
     if (read_only)
         config["readonly"] = "true";
     bsd->ringloop = new ring_loop_t(512);
-    bsd->bs = new blockstore(config, bsd->ringloop);
+    bsd->bs = new blockstore_t(config, bsd->ringloop);
     while (1)
     {
         bsd->ringloop->loop();
@@ -232,21 +232,22 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
     op->callback = [io, n](blockstore_op_t *op)
     {
         bs_data *bsd = (bs_data*)io->engine_data;
-        if (op->retval >= 0 && bsd->bs->unstable_writes.size() > 0)
+        auto & unstable_writes = bsd->bs->get_unstable_writes();
+        if (op->retval >= 0 && unstable_writes.size() > 0)
         {
             op->flags = OP_STABLE;
-            op->len = bsd->bs->unstable_writes.size();
+            op->len = unstable_writes.size();
             obj_ver_id *vers = new obj_ver_id[op->len];
             op->buf = vers;
             int i = 0;
-            for (auto it = bsd->bs->unstable_writes.begin(); it != bsd->bs->unstable_writes.end(); it++, i++)
+            for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
             {
                 vers[i] = {
                     .oid = it->first,
                     .version = it->second,
                 };
             }
-            bsd->bs->unstable_writes.clear();
+            unstable_writes.clear();
             op->callback = [io, n](blockstore_op_t *op)
             {
                 io->error = op->retval < 0 ? -op->retval : 0;
diff --git a/osd.cpp b/osd.cpp
index 02b03a725..0e7aa0cf5 100644
--- a/osd.cpp
+++ b/osd.cpp
@@ -13,7 +13,7 @@
 #define CL_WRITE_REPLY 2
 #define CL_WRITE_DATA 3
 
-osd_t::osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop)
+osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
 {
     bind_address = config["bind_address"];
     if (bind_address == "")
diff --git a/osd.h b/osd.h
index 8fc43602b..bcc12b11b 100644
--- a/osd.h
+++ b/osd.h
@@ -1,8 +1,16 @@
 #pragma once
 
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+#include
 #include
+#include
 
 #include "ringloop.h"
 #include "osd_ops.h"
@@ -72,7 +80,7 @@ class osd_t
     // fields
 
-    blockstore *bs;
+    blockstore_t *bs;
     ring_loop_t *ringloop;
 
     int wait_state = 0;
@@ -94,7 +102,7 @@ class osd_t
     void make_reply(osd_op_t *op);
     void handle_send(ring_data_t *data, int peer_fd);
 public:
-    osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop);
+    osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
     ~osd_t();
     bool shutdown();
 };
diff --git a/osd_main.cpp b/osd_main.cpp
index 065202cdb..3ffec8488 100644
--- a/osd_main.cpp
+++ b/osd_main.cpp
@@ -13,7 +13,7 @@ int main(int narg, char *args[])
     config["journal_device"] = "./test_journal.bin";
     config["data_device"] = "./test_data.bin";
     ring_loop_t *ringloop = new ring_loop_t(512);
-    blockstore *bs = new blockstore(config, ringloop);
+    blockstore_t *bs = new blockstore_t(config, ringloop);
     osd_t *osd = new osd_t(config, bs, ringloop);
     while (1)
     {
diff --git a/test_blockstore.cpp b/test_blockstore.cpp
index a7ad88529..66a57d389 100644
--- a/test_blockstore.cpp
+++ b/test_blockstore.cpp
@@ -1,15 +1,15 @@
-#include
+#include
 #include "timerfd_interval.h"
 #include "blockstore.h"
 
 int main(int narg, char *args[])
 {
-    spp::sparse_hash_map<std::string, std::string> config;
+    blockstore_config_t config;
     config["meta_device"] = "./test_meta.bin";
     config["journal_device"] = "./test_journal.bin";
     config["data_device"] = "./test_data.bin";
     ring_loop_t *ringloop = new ring_loop_t(512);
-    blockstore *bs = new blockstore(config, ringloop);
+    blockstore_t *bs = new blockstore_t(config, ringloop);
     timerfd_interval tick_tfd(ringloop, 1, []()
     {
         printf("tick 1s\n");
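
To make the effect of the refactoring concrete from the caller's side: consumers now program against the thin blockstore_t facade only. A minimal consumer, modeled on the test_blockstore.cpp and fio_engine.cpp call sites, could look like the sketch below. Treat it as a sketch: the exact blockstore_op_t field set, the is_started()/enqueue_op() entry points, the meaning of version = 0 and the 512-byte buffer alignment are assumptions, not verbatim repository code.

#include <malloc.h>
#include "blockstore.h"

int main()
{
    blockstore_config_t config;
    config["meta_device"] = "./test_meta.bin";
    config["journal_device"] = "./test_journal.bin";
    config["data_device"] = "./test_data.bin";
    ring_loop_t *ringloop = new ring_loop_t(512);
    blockstore_t *bs = new blockstore_t(config, ringloop);
    // Pump the event loop until metadata and journal are loaded
    while (!bs->is_started())
        ringloop->loop();
    // Submit a single 4 KB write; the callback fires from inside loop()
    blockstore_op_t *op = new blockstore_op_t;
    op->flags = OP_WRITE;
    op->oid = { .inode = 1, .stripe = 0 };
    op->version = 0; // assumed: 0 asks the store to assign the next version
    op->offset = 0;
    op->len = 4096;
    op->buf = memalign(512, op->len); // O_DIRECT-friendly alignment (512 assumed)
    bool done = false;
    op->callback = [&done](blockstore_op_t *completed) { done = true; };
    bs->enqueue_op(op);
    while (!done)
        ringloop->loop();
    free(op->buf);
    delete op;
    delete bs;
    delete ringloop;
    return 0;
}
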