diff --git a/Makefile b/Makefile index 35d0f9ae..63f2d5c7 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ -all: allocator.o blockstore.o +all: allocator.o blockstore.o blockstore_open.o blockstore_read.o test %.o: %.cpp gcc -c -o $@ $< +test: test.cpp + gcc -o test -luring test.cpp diff --git a/blockstore.cpp b/blockstore.cpp index 1636c4c4..2172dd78 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -1,234 +1,27 @@ -#define _LARGEFILE64_SOURCE -#include -#include -#include -#include -#include -#include -#include +#include "blockstore.h" -#include -#include - -#include "allocator.h" -#include "sparsepp/sparsepp/spp.h" - -// States are not stored on disk. Instead, they're deduced from the journal - -#define ST_IN_FLIGHT 1 -#define ST_J_WRITTEN 2 -#define ST_J_SYNCED 3 -#define ST_J_STABLE 4 -#define ST_J_MOVED 5 -#define ST_J_MOVE_SYNCED 6 -#define ST_D_WRITTEN 16 -#define ST_D_SYNCED 17 -#define ST_D_META_WRITTEN 18 -#define ST_D_META_SYNCED 19 -#define ST_D_STABLE 20 -#define ST_D_META_MOVED 21 -#define ST_D_META_COMMITTED 22 -#define ST_CURRENT 32 -#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32) -#define IS_JOURNAL(st) (st >= 2 && st <= 6) - -// Default object size is 128 KB -#define DEFAULT_ORDER 17 -#define MAX_BLOCK_SIZE 128*1024*1024 -#define DISK_ALIGNMENT 4096 -#define MIN_JOURNAL_SIZE 4*1024*1024 -#define JOURNAL_MAGIC 0x4A33 - -#define STRIPE_NUM(oid) ((oid) >> 4) -#define STRIPE_REPLICA(oid) ((oid) & 0xf) - -// 16 bytes per object/stripe id -// stripe includes replica number in 4 least significant bits -struct __attribute__((__packed__)) object_id +blockstore::blockstore(spp::sparse_hash_map & config, io_uring *ring) { - uint64_t inode; - uint64_t stripe; -}; - -bool operator == (const object_id & a, const object_id & b) -{ - return b.inode == a.inode && b.stripe == a.stripe; -} - -// 32 bytes per "clean" entry on disk with fixed metadata tables -struct __attribute__((__packed__)) clean_disk_entry -{ - uint64_t inode; - uint64_t stripe; - uint64_t version; - uint8_t flags; - uint8_t reserved[7]; -}; - -// 28 bytes per "clean" entry in memory -struct __attribute__((__packed__)) clean_entry -{ - uint64_t version; - uint32_t state; - uint64_t location; -}; - -// 48 bytes per dirty entry in memory -struct __attribute__((__packed__)) dirty_entry -{ - uint64_t version; - uint32_t state; - uint32_t flags; - uint64_t location; // location in either journal or data - uint32_t offset; // offset within stripe - uint32_t size; // entry size -}; - -// Journal entries -// Journal entries are linked to each other by their crc32 value -// The journal is almost a blockchain, because object versions constantly increase -#define JE_START 0x01 -#define JE_SMALL_WRITE 0x02 -#define JE_BIG_WRITE 0x03 -#define JE_STABLE 0x04 -#define JE_DELETE 0x05 - -struct __attribute__((__packed__)) journal_entry_start -{ - uint32_t type; - uint32_t size; - uint32_t crc32; - uint32_t reserved1; - uint64_t offset; -}; - -struct __attribute__((__packed__)) journal_entry_small_write -{ - uint32_t type; - uint32_t size; - uint32_t crc32; - uint32_t crc32_prev; - object_id oid; - uint64_t version; - uint32_t offset; - uint32_t len; -}; - -struct __attribute__((__packed__)) journal_entry_big_write -{ - uint32_t type; - uint32_t size; - uint32_t crc32; - uint32_t crc32_prev; - object_id oid; - uint64_t version; - uint64_t block; -}; - -struct __attribute__((__packed__)) journal_entry_stable -{ - uint32_t type; - uint32_t size; - uint32_t crc32; - uint32_t crc32_prev; - object_id oid; - uint64_t version; -}; - -struct __attribute__((__packed__)) journal_entry_del -{ - uint32_t type; - uint32_t size; - uint32_t crc32; - uint32_t crc32_prev; - object_id oid; - uint64_t version; -}; - -struct __attribute__((__packed__)) journal_entry -{ - union + this->ring = ring; + initialized = 0; + block_order = stoull(config["block_size_order"]); + block_size = 1 << block_order; + if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE) { - struct __attribute__((__packed__)) - { - uint16_t magic; - uint16_t type; - uint32_t size; - uint32_t crc32; - }; - journal_entry_start start; - journal_entry_small_write small_write; - journal_entry_big_write big_write; - journal_entry_stable stable; - journal_entry_del del; - }; -}; - -typedef std::vector dirty_list; - -class oid_hash -{ -public: - size_t operator()(const object_id &s) const - { - size_t seed = 0; - spp::hash_combine(seed, s.inode); - spp::hash_combine(seed, s.stripe); - return seed; + throw new std::runtime_error("Bad block size"); } -}; - -class blockstore -{ -public: - spp::sparse_hash_map object_db; - spp::sparse_hash_map dirty_queue; - int block_order, block_size; - uint64_t block_count; - allocator *data_alloc; - - int journal_fd; - int meta_fd; - int data_fd; - - uint64_t journal_offset, journal_size, journal_len; - uint64_t meta_offset, meta_size, meta_len; - uint64_t data_offset, data_size, data_len; - - uint64_t journal_start, journal_end; - - blockstore(spp::sparse_hash_map & config) + data_fd = meta_fd = journal_fd = -1; + try { - block_order = stoll(config["block_size_order"]); - block_size = 1 << block_order; - if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE) - { - throw new std::runtime_error("Bad block size"); - } - data_fd = meta_fd = journal_fd = -1; - try - { - open_data(config); - open_meta(config); - open_journal(config); - calc_lengths(config); - data_alloc = allocator_create(block_count); - if (!data_alloc) - throw new std::bad_alloc(); - } - catch (std::exception & e) - { - if (data_fd >= 0) - close(data_fd); - if (meta_fd >= 0 && meta_fd != data_fd) - close(meta_fd); - if (journal_fd >= 0 && journal_fd != meta_fd) - close(journal_fd); - throw e; - } + open_data(config); + open_meta(config); + open_journal(config); + calc_lengths(config); + data_alloc = allocator_create(block_count); + if (!data_alloc) + throw new std::bad_alloc(); } - - ~blockstore() + catch (std::exception & e) { if (data_fd >= 0) close(data_fd); @@ -236,239 +29,145 @@ public: close(meta_fd); if (journal_fd >= 0 && journal_fd != meta_fd) close(journal_fd); + throw e; } +} - void calc_lengths(spp::sparse_hash_map & config) +blockstore::~blockstore() +{ + if (data_fd >= 0) + close(data_fd); + if (meta_fd >= 0 && meta_fd != data_fd) + close(meta_fd); + if (journal_fd >= 0 && journal_fd != meta_fd) + close(journal_fd); +} + +// must be called in the event loop until it returns 0 +int blockstore::init_loop() +{ + // read metadata, then journal + if (initialized) { - // data - data_len = data_size - data_offset; - if (data_fd == meta_fd && data_offset < meta_offset) + return 0; + } + if (!metadata_init_reader) + { + metadata_init_reader = new blockstore_init_meta(this); + } + if (metadata_init_reader->read_loop()) + { + return 1; + } + if (!journal_init_reader) + { + journal_init_reader = new blockstore_init_journal(this); + } + if (journal_init_reader->read_loop()) + { + return 1; + } + initialized = true; + delete metadata_init_reader; + delete journal_init_reader; + metadata_init_reader = NULL; + journal_init_reader = NULL; + return 0; +} + +blockstore_init_meta::blockstore_init_meta(blockstore *bs) +{ + this->bs = bs; +} + +int blockstore_init_meta::read_loop() +{ + if (metadata_read >= bs->meta_len) + { + return 0; + } + if (!metadata_buffer) + { + metadata_buffer = new uint8_t[2*bs->metadata_buf_size]; + } + if (submitted) + { + struct io_uring_cqe *cqe; + io_uring_peek_cqe(bs->ring, &cqe); + if (cqe) { - data_len = meta_offset - data_offset; - } - if (data_fd == journal_fd && data_offset < journal_offset) - { - data_len = data_len < journal_offset-data_offset - ? data_len : journal_offset-data_offset; - } - // meta - meta_len = (meta_fd == data_fd ? data_size : meta_size) - meta_offset; - if (meta_fd == data_fd && meta_offset < data_offset) - { - meta_len = data_offset - meta_offset; - } - if (meta_fd == journal_fd && meta_offset < journal_offset) - { - meta_len = meta_len < journal_offset-meta_offset - ? meta_len : journal_offset-meta_offset; - } - // journal - journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset; - if (journal_fd == data_fd && journal_offset < data_offset) - { - journal_len = data_offset - journal_offset; - } - if (journal_fd == meta_fd && journal_offset < meta_offset) - { - journal_len = journal_len < meta_offset-journal_offset - ? journal_len : meta_offset-journal_offset; - } - // required metadata size - block_count = data_len / block_size; - uint64_t meta_required = block_count * sizeof(clean_disk_entry); - if (meta_len < meta_required) - { - throw new std::runtime_error("Metadata area is too small"); - } - // requested journal size - uint64_t journal_wanted = stoll(config["journal_size"]); - if (journal_wanted > journal_len) - { - throw new std::runtime_error("Requested journal_size is too large"); - } - else if (journal_wanted > 0) - { - journal_len = journal_wanted; - } - if (journal_len < MIN_JOURNAL_SIZE) - { - throw new std::runtime_error("Journal is too small"); + if (cqe->res < 0) + { + throw new std::runtime_error( + std::string("read metadata failed at offset ") + std::to_string(metadata_read) + + std::string(": ") + strerror(-cqe->res) + ); + } + prev_done = cqe->res > 0 ? submitted : 0; + done_len = cqe->res; + metadata_read += cqe->res; + submitted = 0; } } - - void open_data(spp::sparse_hash_map & config) + if (!submitted) { - int sectsize; - data_offset = stoll(config["data_offset"]); - if (data_offset % DISK_ALIGNMENT) + struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); + if (!sqe) { - throw new std::runtime_error("data_offset not aligned"); + throw new std::runtime_error("io_uring is full while trying to read metadata"); } - data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR); - if (data_fd == -1) + submit_iov = { + metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0), + bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read, + }; + io_uring_prep_readv(sqe, bs->meta_fd, &submit_iov, 1, bs->meta_offset + metadata_read); + io_uring_submit(bs->ring); + submitted = (prev == 1 ? 2 : 1); + prev = submitted; + } + if (prev_done) + { + assert(!(done_len % sizeof(clean_disk_entry))); + int count = done_len / sizeof(clean_disk_entry); + struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0)); + // handle entries + handle_entries(entries, count); + done_cnt += count; + prev_done = 0; + done_len = 0; + } + if (metadata_read >= bs->meta_len) + { + // metadata read finished + delete[] metadata_buffer; + metadata_buffer = NULL; + return 0; + } + return 1; +} + +void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int count) +{ + for (unsigned i = 0; i < count; i++) + { + if (entries[i].oid.inode > 0) { - throw new std::runtime_error("Failed to open data device"); - } - if (ioctl(data_fd, BLKSSZGET, §size) < 0 || - ioctl(data_fd, BLKGETSIZE64, &data_size) < 0 || - sectsize != 512) - { - throw new std::runtime_error("Data device sector is not equal to 512 bytes"); - } - if (data_offset >= data_size) - { - throw new std::runtime_error("data_offset exceeds device size"); + allocator_set(bs->data_alloc, done_cnt+i, true); + bs->object_db[entries[i].oid] = (struct clean_entry){ + entries[i].version, + (uint32_t)(entries[i].flags ? ST_CURRENT : ST_D_META_SYNCED), + done_cnt+i + }; } } +} - void open_meta(spp::sparse_hash_map & config) - { - int sectsize; - meta_offset = stoll(config["meta_offset"]); - if (meta_offset % DISK_ALIGNMENT) - { - throw new std::runtime_error("meta_offset not aligned"); - } - if (config["meta_device"] != "") - { - meta_offset = 0; - meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR); - if (meta_fd == -1) - { - throw new std::runtime_error("Failed to open metadata device"); - } - if (ioctl(meta_fd, BLKSSZGET, §size) < 0 || - ioctl(meta_fd, BLKGETSIZE64, &meta_size) < 0 || - sectsize != 512) - { - throw new std::runtime_error("Metadata device sector is not equal to 512 bytes (or ioctl failed)"); - } - if (meta_offset >= meta_size) - { - throw new std::runtime_error("meta_offset exceeds device size"); - } - } - else - { - meta_fd = data_fd; - meta_size = 0; - if (meta_offset >= data_size) - { - throw new std::runtime_error("meta_offset exceeds device size"); - } - } - } +blockstore_init_journal::blockstore_init_journal(blockstore *bs) +{ + this->bs = bs; +} - void open_journal(spp::sparse_hash_map & config) - { - int sectsize; - journal_offset = stoll(config["journal_offset"]); - if (journal_offset % DISK_ALIGNMENT) - { - throw new std::runtime_error("journal_offset not aligned"); - } - if (config["journal_device"] != "") - { - journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR); - if (journal_fd == -1) - { - throw new std::runtime_error("Failed to open journal device"); - } - if (ioctl(journal_fd, BLKSSZGET, §size) < 0 || - ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 || - sectsize != 512) - { - throw new std::runtime_error("Journal device sector is not equal to 512 bytes"); - } - } - else - { - journal_fd = meta_fd; - journal_size = 0; - if (journal_offset >= data_size) - { - throw new std::runtime_error("journal_offset exceeds device size"); - } - } - } - - struct read_fulfill - { - uint64_t flags; - uint64_t offset; - uint64_t len; - void *buf; - }; - - void fulfill_read(std::map & fulfill, uint8_t* buf, uint32_t offset, uint32_t len, - uint32_t item_start, uint32_t dirty_end, uint32_t item_state, uint64_t item_location) - { - uint32_t dirty_start = item_start; - if (dirty_start < offset+len && dirty_end > offset) - { - dirty_start = dirty_start < offset ? offset : dirty_start; - dirty_end = dirty_end > offset+len ? offset+len : dirty_end; - auto fulfill_near = fulfill.lower_bound(dirty_start); - if (fulfill_near != fulfill.begin()) - { - fulfill_near--; - if (fulfill_near->second.offset + fulfill_near->second.len <= dirty_start) - fulfill_near++; - } - while (fulfill_near != fulfill.end() && fulfill_near->second.offset < dirty_end) - { - if (fulfill_near->second.offset > dirty_start) - { - fulfill[dirty_start] = (read_fulfill){ - item_state, - item_location + dirty_start - item_start, - fulfill_near->second.offset - dirty_start, - buf + dirty_start - offset, - }; - } - dirty_start = fulfill_near->second.offset + fulfill_near->second.len; - } - if (dirty_start < dirty_end) - { - fulfill[dirty_start] = (read_fulfill){ - item_state, - item_location + dirty_start - item_start, - dirty_end - dirty_start, - buf + dirty_start - offset - }; - } - } - } - - // flags: READ_DIRTY -#define READ_DIRTY 1 - int read(object_id oid, uint32_t offset, uint32_t len, uint32_t flags, uint8_t *buf, void (*callback)(int arg), int arg) - { - auto clean_it = object_db.find(oid); - auto dirty_it = dirty_queue.find(oid); - if (clean_it == object_db.end() && dirty_it == object_db.end()) - { - memset(buf, 0, len); - callback(arg); - return 0; - } - uint64_t fulfilled = 0; - std::map fulfill; - //std::vector fulfill; - if (dirty_it != object_db.end()) - { - dirty_list dirty = dirty_it->second; - for (int i = dirty.size()-1; i >= 0; i--) - { - if ((flags & READ_DIRTY) || IS_STABLE(dirty[i].state)) - { - fulfill_read(fulfill, buf, offset, len, dirty[i].offset, dirty[i].offset + dirty[i].size, IS_JOURNAL(dirty[i].state), dirty[i].location); - } - } - } - if (clean_it != object_db.end()) - { - fulfill_read(fulfill, buf, offset, len, 0, block_size, 0, clean_it->second.location); - } - } -}; +int blockstore_init_journal::read_loop() +{ + return 0; +} diff --git a/blockstore.h b/blockstore.h new file mode 100644 index 00000000..39cc5219 --- /dev/null +++ b/blockstore.h @@ -0,0 +1,203 @@ +#pragma once + +#define _LARGEFILE64_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "allocator.h" +#include "sparsepp/sparsepp/spp.h" + +// States are not stored on disk. Instead, they're deduced from the journal + +#define ST_IN_FLIGHT 1 +#define ST_J_WRITTEN 2 +#define ST_J_SYNCED 3 +#define ST_J_STABLE 4 +#define ST_J_MOVED 5 +#define ST_J_MOVE_SYNCED 6 +#define ST_D_WRITTEN 16 +#define ST_D_SYNCED 17 +#define ST_D_META_WRITTEN 18 +#define ST_D_META_SYNCED 19 +#define ST_D_STABLE 20 +#define ST_D_META_MOVED 21 +#define ST_D_META_COMMITTED 22 +#define ST_CURRENT 32 +#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32) +#define IS_JOURNAL(st) (st >= 2 && st <= 6) + +// Default object size is 128 KB +#define DEFAULT_ORDER 17 +#define MAX_BLOCK_SIZE 128*1024*1024 +#define DISK_ALIGNMENT 512 + +#define STRIPE_NUM(oid) ((oid) >> 4) +#define STRIPE_REPLICA(oid) ((oid) & 0xf) + +// 16 bytes per object/stripe id +// stripe includes replica number in 4 least significant bits +struct __attribute__((__packed__)) object_id +{ + uint64_t inode; + uint64_t stripe; +}; + +#include "blockstore_journal.h" + +inline bool operator == (const object_id & a, const object_id & b) +{ + return b.inode == a.inode && b.stripe == a.stripe; +} + +// 32 bytes per "clean" entry on disk with fixed metadata tables +struct __attribute__((__packed__)) clean_disk_entry +{ + object_id oid; + uint64_t version; + uint8_t flags; + uint8_t reserved[7]; +}; + +// 28 bytes per "clean" entry in memory +struct __attribute__((__packed__)) clean_entry +{ + uint64_t version; + uint32_t state; + uint64_t location; +}; + +// 48 bytes per dirty entry in memory +struct __attribute__((__packed__)) dirty_entry +{ + uint64_t version; + uint32_t state; + uint32_t flags; + uint64_t location; // location in either journal or data + uint32_t offset; // offset within stripe + uint32_t size; // entry size +}; + +typedef std::vector dirty_list; + +class oid_hash +{ +public: + size_t operator()(const object_id &s) const + { + size_t seed = 0; + spp::hash_combine(seed, s.inode); + spp::hash_combine(seed, s.stripe); + return seed; + } +}; + +// SYNC must be submitted after previous WRITEs/DELETEs (not before!) +// READs to the same object must be submitted after previous WRITEs/DELETEs +// Otherwise, the submit order is free, that is all operations may be submitted immediately +// In fact, adding a write operation must immediately result in dirty_queue being populated + +#define OP_READ 1 +#define OP_READ_DIRTY 2 +#define OP_WRITE 3 +#define OP_SYNC 4 +#define OP_STABLE 5 +#define OP_DELETE 6 + +#define WAIT_SQE 1 +#define WAIT_IN_FLIGHT 2 + +struct blockstore_operation +{ + std::function callback; + + uint32_t flags; + object_id oid; + uint64_t version; + uint32_t offset; + uint32_t len; + uint8_t *buf; + + std::map read_vec; + int completed; + int wait_for; + uint64_t wait_version; +}; + +class blockstore; + +class blockstore_init_meta +{ + blockstore *bs; + uint8_t *metadata_buffer; + uint64_t metadata_read = 0; + struct iovec submit_iov; + int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0; +public: + blockstore_init_meta(blockstore* bs); + int read_loop(); + void handle_entries(struct clean_disk_entry* entries, int count); +}; + +class blockstore_init_journal +{ + blockstore *bs; +public: + blockstore_init_journal(blockstore* bs); + int read_loop(); +}; + +class blockstore +{ +public: + spp::sparse_hash_map object_db; + spp::sparse_hash_map dirty_queue; + std::deque submit_queue; + std::set in_process_ops; + int block_order, block_size; + uint64_t block_count; + allocator *data_alloc; + + int journal_fd; + int meta_fd; + int data_fd; + + uint64_t journal_offset, journal_size, journal_len; + uint64_t meta_offset, meta_size, meta_area, meta_len; + uint64_t data_offset, data_size, data_len; + + uint64_t journal_start, journal_end; + + struct io_uring *ring; + + blockstore(spp::sparse_hash_map & config, struct io_uring *ring); + ~blockstore(); + + void calc_lengths(spp::sparse_hash_map & config); + void open_data(spp::sparse_hash_map & config); + void open_meta(spp::sparse_hash_map & config); + void open_journal(spp::sparse_hash_map & config); + + // Asynchronous init + int initialized; + int metadata_buf_size; + blockstore_init_meta* metadata_init_reader; + blockstore_init_journal* journal_init_reader; + int init_loop(); + + // Read + int read(blockstore_operation *read_op); + int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, + uint32_t item_state, uint64_t item_version, uint64_t item_location); +}; diff --git a/blockstore_journal.h b/blockstore_journal.h new file mode 100644 index 00000000..d12f7273 --- /dev/null +++ b/blockstore_journal.h @@ -0,0 +1,90 @@ +#pragma once + +#define MIN_JOURNAL_SIZE 4*1024*1024 +#define JOURNAL_MAGIC 0x4A33 + +// Journal entries +// Journal entries are linked to each other by their crc32 value +// The journal is almost a blockchain, because object versions constantly increase +#define JE_START 0x01 +#define JE_SMALL_WRITE 0x02 +#define JE_BIG_WRITE 0x03 +#define JE_STABLE 0x04 +#define JE_DELETE 0x05 + +struct __attribute__((__packed__)) journal_entry_start +{ + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + uint32_t reserved1; + uint64_t offset; +}; + +struct __attribute__((__packed__)) journal_entry_small_write +{ + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; + uint32_t offset; + uint32_t len; + // small_write entries contain bytes of data, but data is stored in the next journal sector +}; + +struct __attribute__((__packed__)) journal_entry_big_write +{ + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; + uint64_t block; +}; + +struct __attribute__((__packed__)) journal_entry_stable +{ + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; +}; + +struct __attribute__((__packed__)) journal_entry_del +{ + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; +}; + +struct __attribute__((__packed__)) journal_entry +{ + union + { + struct __attribute__((__packed__)) + { + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + }; + journal_entry_start start; + journal_entry_small_write small_write; + journal_entry_big_write big_write; + journal_entry_stable stable; + journal_entry_del del; + }; +}; diff --git a/blockstore_open.cpp b/blockstore_open.cpp new file mode 100644 index 00000000..c9f47152 --- /dev/null +++ b/blockstore_open.cpp @@ -0,0 +1,160 @@ +#include "blockstore.h" + +void blockstore::calc_lengths(spp::sparse_hash_map & config) +{ + // data + data_len = data_size - data_offset; + if (data_fd == meta_fd && data_offset < meta_offset) + { + data_len = meta_offset - data_offset; + } + if (data_fd == journal_fd && data_offset < journal_offset) + { + data_len = data_len < journal_offset-data_offset + ? data_len : journal_offset-data_offset; + } + // meta + meta_area = (meta_fd == data_fd ? data_size : meta_size) - meta_offset; + if (meta_fd == data_fd && meta_offset < data_offset) + { + meta_area = data_offset - meta_offset; + } + if (meta_fd == journal_fd && meta_offset < journal_offset) + { + meta_area = meta_area < journal_offset-meta_offset + ? meta_area : journal_offset-meta_offset; + } + // journal + journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset; + if (journal_fd == data_fd && journal_offset < data_offset) + { + journal_len = data_offset - journal_offset; + } + if (journal_fd == meta_fd && journal_offset < meta_offset) + { + journal_len = journal_len < meta_offset-journal_offset + ? journal_len : meta_offset-journal_offset; + } + // required metadata size + block_count = data_len / block_size; + meta_len = block_count * sizeof(clean_disk_entry); + if (meta_area < meta_len) + { + throw new std::runtime_error("Metadata area is too small"); + } + metadata_buf_size = stoull(config["meta_buf_size"]); + if (metadata_buf_size < 65536) + { + metadata_buf_size = 4*1024*1024; + } + // requested journal size + uint64_t journal_wanted = stoull(config["journal_size"]); + if (journal_wanted > journal_len) + { + throw new std::runtime_error("Requested journal_size is too large"); + } + else if (journal_wanted > 0) + { + journal_len = journal_wanted; + } + if (journal_len < MIN_JOURNAL_SIZE) + { + throw new std::runtime_error("Journal is too small"); + } +} + +void blockstore::open_data(spp::sparse_hash_map & config) +{ + int sectsize; + data_offset = stoull(config["data_offset"]); + if (data_offset % DISK_ALIGNMENT) + { + throw new std::runtime_error("data_offset not aligned"); + } + data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR); + if (data_fd == -1) + { + throw new std::runtime_error("Failed to open data device"); + } + if (ioctl(data_fd, BLKSSZGET, §size) < 0 || + ioctl(data_fd, BLKGETSIZE64, &data_size) < 0 || + sectsize != 512) + { + throw new std::runtime_error("Data device sector is not equal to 512 bytes"); + } + if (data_offset >= data_size) + { + throw new std::runtime_error("data_offset exceeds device size"); + } +} + +void blockstore::open_meta(spp::sparse_hash_map & config) +{ + int sectsize; + meta_offset = stoull(config["meta_offset"]); + if (meta_offset % DISK_ALIGNMENT) + { + throw new std::runtime_error("meta_offset not aligned"); + } + if (config["meta_device"] != "") + { + meta_offset = 0; + meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR); + if (meta_fd == -1) + { + throw new std::runtime_error("Failed to open metadata device"); + } + if (ioctl(meta_fd, BLKSSZGET, §size) < 0 || + ioctl(meta_fd, BLKGETSIZE64, &meta_size) < 0 || + sectsize != 512) + { + throw new std::runtime_error("Metadata device sector is not equal to 512 bytes (or ioctl failed)"); + } + if (meta_offset >= meta_size) + { + throw new std::runtime_error("meta_offset exceeds device size"); + } + } + else + { + meta_fd = data_fd; + meta_size = 0; + if (meta_offset >= data_size) + { + throw new std::runtime_error("meta_offset exceeds device size"); + } + } +} + +void blockstore::open_journal(spp::sparse_hash_map & config) +{ + int sectsize; + journal_offset = stoull(config["journal_offset"]); + if (journal_offset % DISK_ALIGNMENT) + { + throw new std::runtime_error("journal_offset not aligned"); + } + if (config["journal_device"] != "") + { + journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR); + if (journal_fd == -1) + { + throw new std::runtime_error("Failed to open journal device"); + } + if (ioctl(journal_fd, BLKSSZGET, §size) < 0 || + ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 || + sectsize != 512) + { + throw new std::runtime_error("Journal device sector is not equal to 512 bytes"); + } + } + else + { + journal_fd = meta_fd; + journal_size = 0; + if (journal_offset >= data_size) + { + throw new std::runtime_error("journal_offset exceeds device size"); + } + } +} diff --git a/blockstore_read.cpp b/blockstore_read.cpp new file mode 100644 index 00000000..0cb686da --- /dev/null +++ b/blockstore_read.cpp @@ -0,0 +1,142 @@ +#include "blockstore.h" + +int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, + uint32_t item_state, uint64_t item_version, uint64_t item_location) +{ + uint32_t cur_start = item_start; + if (cur_start < read_op.offset + read_op.len && item_end > read_op.offset) + { + cur_start = cur_start < read_op.offset ? read_op.offset : cur_start; + item_end = item_end > read_op.offset + read_op.len ? read_op.offset + read_op.len : item_end; + auto fulfill_near = read_op.read_vec.lower_bound(cur_start); + if (fulfill_near != read_op.read_vec.begin()) + { + fulfill_near--; + if (fulfill_near->first + fulfill_near->second.iov_len <= cur_start) + { + fulfill_near++; + } + } + while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end) + { + if (fulfill_near->first > cur_start) + { + if (item_state == ST_IN_FLIGHT) + { + // Pause until it's written somewhere + read_op.wait_for = WAIT_IN_FLIGHT; + read_op.wait_version = item_version; + return -1; + } + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) + { + // Pause until there are more requests available + read_op.wait_for = WAIT_SQE; + return -1; + } + read_op.read_vec[cur_start] = (struct iovec){ + read_op.buf + cur_start - read_op.offset, + fulfill_near->first - cur_start + }; + io_uring_prep_readv( + sqe, + IS_JOURNAL(item_state) ? journal_fd : data_fd, + // FIXME: &read_op.read_vec is forbidden + &read_op.read_vec[cur_start], 1, + (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start + ); + io_uring_sqe_set_data(sqe, 0/*read op link*/); + } + cur_start = fulfill_near->first + fulfill_near->second.iov_len; + fulfill_near++; + } + if (cur_start < item_end) + { + if (item_state == ST_IN_FLIGHT) + { + // Pause until it's written somewhere + read_op.wait_for = WAIT_IN_FLIGHT; + read_op.wait_version = item_version; + return -1; + } + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) + { + // Pause until there are more requests available + read_op.wait_for = WAIT_SQE; + return -1; + } + read_op.read_vec[cur_start] = (struct iovec){ + read_op.buf + cur_start - read_op.offset, + item_end - cur_start + }; + io_uring_prep_readv( + sqe, + IS_JOURNAL(item_state) ? journal_fd : data_fd, + &read_op.read_vec[cur_start], 1, + (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start + ); + io_uring_sqe_set_data(sqe, 0/*read op link*/); + } + } + return 0; +} + +int blockstore::read(blockstore_operation *read_op) +{ + auto clean_it = object_db.find(read_op->oid); + auto dirty_it = dirty_queue.find(read_op->oid); + if (clean_it == object_db.end() && dirty_it == object_db.end()) + { + // region is not allocated - return zeroes + memset(read_op->buf, 0, read_op->len); + read_op->callback(read_op); + return 0; + } + unsigned prev_sqe_pos = ring->sq.sqe_tail; + uint64_t fulfilled = 0; + if (dirty_it != object_db.end()) + { + dirty_list dirty = dirty_it->second; + for (int i = dirty.size()-1; i >= 0; i--) + { + if (read_op->flags == OP_READ_DIRTY || IS_STABLE(dirty[i].state)) + { + if (fulfill_read(*read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) + { + // need to wait for something, undo added requests and requeue op + ring->sq.sqe_tail = prev_sqe_pos; + read_op->read_vec.clear(); + submit_queue.push_front(read_op); + return 0; + } + } + } + } + if (clean_it != object_db.end()) + { + if (fulfill_read(*read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) + { + // need to wait for something, undo added requests and requeue op + ring->sq.sqe_tail = prev_sqe_pos; + read_op->read_vec.clear(); + submit_queue.push_front(read_op); + return 0; + } + } + if (!read_op->read_vec.size()) + { + // region is not allocated - return zeroes + free(read_op); + memset(read_op->buf, 0, read_op->len); + read_op->callback(read_op); + return 0; + } + int ret = io_uring_submit(ring); + if (ret < 0) + { + throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); + } + return 0; +} diff --git a/test.cpp b/test.cpp new file mode 100644 index 00000000..79dd27ac --- /dev/null +++ b/test.cpp @@ -0,0 +1,62 @@ +#define _LARGEFILE64_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int setup_context(unsigned entries, struct io_uring *ring) +{ + int ret = io_uring_queue_init(entries, ring, 0); + if (ret < 0) + { + fprintf(stderr, "queue_init: %s\n", strerror(-ret)); + return -1; + } + return 0; +} + +static void test_write(struct io_uring *ring, int fd) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + assert(sqe); + uint8_t *buf = (uint8_t*)memalign(512, 1024*1024*1024); + struct iovec iov = { buf, 1024*1024*1024 }; + io_uring_prep_writev(sqe, fd, &iov, 1, 0); + io_uring_sqe_set_data(sqe, 0); + io_uring_submit_and_wait(ring, 1); + struct io_uring_cqe *cqe; + io_uring_peek_cqe(ring, &cqe); + int ret = cqe->res; + //int ret = writev(fd, &iov, 1); + if (ret < 0) + printf("cqe failed: %d %s\n", ret, strerror(-ret)); + else + printf("result: %d\n", ret); + free(buf); +} + +int main(int argc, char *argv[]) +{ + struct io_uring ring; + int fd = open("testfile", O_RDWR | O_DIRECT, 0644); + if (fd < 0) + { + perror("open infile"); + return 1; + } + if (setup_context(32, &ring)) + return 1; + test_write(&ring, fd); + close(fd); + io_uring_queue_exit(&ring); + return 0; +}