#define _LARGEFILE64_SOURCE #include #include #include #include #include #include "allocator.h" #include "sparsepp/sparsepp/spp.h" // States are not stored on disk. Instead, they're deduced from the journal #define ST_IN_FLIGHT 1 #define ST_J_WRITTEN 2 #define ST_J_SYNCED 3 #define ST_J_STABLE 4 #define ST_J_MOVED 5 #define ST_J_MOVE_SYNCED 6 #define ST_D_WRITTEN 16 #define ST_D_SYNCED 17 #define ST_D_META_WRITTEN 18 #define ST_D_META_SYNCED 19 #define ST_D_STABLE 20 #define ST_D_META_MOVED 21 #define ST_D_META_COMMITTED 22 #define ST_CURRENT 32 #define DEFAULT_ORDER 17 #define MAX_BLOCK_SIZE 128*1024*1024 #define DISK_ALIGNMENT 4096 #define MIN_JOURNAL_SIZE 4*1024*1024 #define STRIPE_NUM(oid) ((oid) >> 4) #define STRIPE_REPLICA(oid) ((oid) & 0xf) struct __attribute__((__packed__)) oid { uint64_t inode; uint64_t stripe; }; struct __attribute__((__packed__)) meta_entry { uint64_t inode; uint64_t stripe; uint32_t epoch; uint32_t version; uint64_t location_flags; }; struct __attribute__((__packed__)) object_version { uint32_t epoch; uint32_t version; uint64_t location; uint32_t size; uint32_t state; bool in_journal() { return (location & (1 << 63)); } uint64_t offset() { return (location & ~(1 << 63)); } }; struct __attribute__((__packed__)) object_ver_list { uint64_t count; object_version versions[]; }; struct __attribute__((__packed__)) object_info { object_version first; object_ver_list *other; }; class oid_hash { public: size_t operator()(const oid &s) const { size_t seed = 0; spp::hash_combine(seed, s.inode); spp::hash_combine(seed, s.stripe); return seed; } }; class blockstore { public: spp::sparse_hash_map object_db; int block_order, block_size; uint64_t block_count; allocator *data_alloc; int journal_fd; int meta_fd; int data_fd; uint64_t journal_offset, journal_size, journal_len; uint64_t meta_offset, meta_size, meta_len; uint64_t data_offset, data_size, data_len; blockstore(std::unordered_map & config) { block_order = stoll(config["block_size_order"]); block_size = 1 << block_order; if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE) { throw new std::runtime_error("Bad block size"); } data_fd = meta_fd = journal_fd = -1; try { open_data(config); open_meta(config); open_journal(config); calc_lengths(config); data_alloc = allocator_create(block_count); if (!data_alloc) throw new std::bad_alloc(); } catch (std::exception & e) { if (data_fd >= 0) close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) close(meta_fd); if (journal_fd >= 0 && journal_fd != meta_fd) close(journal_fd); throw e; } } ~blockstore() { if (data_fd >= 0) close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) close(meta_fd); if (journal_fd >= 0 && journal_fd != meta_fd) close(journal_fd); } void calc_lengths(std::unordered_map & config) { // data data_len = data_size - data_offset; if (data_fd == meta_fd && data_offset < meta_offset) { data_len = meta_offset - data_offset; } if (data_fd == journal_fd && data_offset < journal_offset) { data_len = data_len < journal_offset-data_offset ? data_len : journal_offset-data_offset; } // meta meta_len = (meta_fd == data_fd ? data_size : meta_size) - meta_offset; if (meta_fd == data_fd && meta_offset < data_offset) { meta_len = data_offset - meta_offset; } if (meta_fd == journal_fd && meta_offset < journal_offset) { meta_len = meta_len < journal_offset-meta_offset ? meta_len : journal_offset-meta_offset; } // journal journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset; if (journal_fd == data_fd && journal_offset < data_offset) { journal_len = data_offset - journal_offset; } if (journal_fd == meta_fd && journal_offset < meta_offset) { journal_len = journal_len < meta_offset-journal_offset ? journal_len : meta_offset-journal_offset; } // required metadata size block_count = data_len / block_size; uint64_t meta_required = block_count * sizeof(meta_entry); if (meta_len < meta_required) { throw new std::runtime_error("Metadata area is too small"); } // requested journal size uint64_t journal_wanted = stoll(config["journal_size"]); if (journal_wanted > journal_len) { throw new std::runtime_error("Requested journal_size is too large"); } else if (journal_wanted > 0) { journal_len = journal_wanted; } if (journal_len < MIN_JOURNAL_SIZE) { throw new std::runtime_error("Journal is too small"); } } void open_data(std::unordered_map & config) { int sectsize; data_offset = stoll(config["data_offset"]); if (data_offset % DISK_ALIGNMENT) { throw new std::runtime_error("data_offset not aligned"); } data_fd = open(config["data_device"], O_DIRECT|O_RDWR); if (data_fd == -1) { throw new std::runtime_error("Failed to open data device"); } if (ioctl(data_fd, BLKSSZGET, §size) < 0 || ioctl(data_fd, BLKGETSIZE64, &data_size) < 0 || sectsize != 512) { throw new std::runtime_error("Data device sector is not equal to 512 bytes"); } if (data_offset >= data_size) { throw new std::runtime_error("data_offset exceeds device size"); } } void open_meta(std::unordered_map & config) { int sectsize; meta_offset = stoll(config["meta_offset"]); if (meta_offset % DISK_ALIGNMENT) { throw new std::runtime_error("meta_offset not aligned"); } if (config["meta_device"] != "") { meta_offset = 0; meta_fd = open(config["meta_device"], O_DIRECT|O_RDWR); if (meta_fd == -1) { throw new std::runtime_error("Failed to open metadata device"); } if (ioctl(meta_fd, BLKSSZGET, §size) < 0 || ioctl(meta_fd, BLKGETSIZE64, &meta_size) < 0 || sectsize != 512) { throw new std::runtime_error("Metadata device sector is not equal to 512 bytes (or ioctl failed)"); } if (meta_offset >= meta_size) { throw new std::runtime_error("meta_offset exceeds device size"); } } else { meta_fd = data_fd; meta_size = 0; if (meta_offset >= data_size) { throw new std::runtime_error("meta_offset exceeds device size"); } } } void open_journal(std::unordered_map & config) { int sectsize; journal_offset = stoll(config["journal_offset"]); if (journal_offset % DISK_ALIGNMENT) { throw new std::runtime_error("journal_offset not aligned"); } if (config["journal_device"] != "") { journal_fd = open(config["journal_device"], O_DIRECT|O_RDWR); if (journal_fd == -1) { throw new std::runtime_error("Failed to open journal device"); } if (ioctl(journal_fd, BLKSSZGET, §size) < 0 || ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 || sectsize != 512) { throw new std::runtime_error("Journal device sector is not equal to 512 bytes"); } } else { journal_fd = meta_fd; journal_size = 0; if (journal_offset >= data_size) { throw new std::runtime_error("journal_offset exceeds device size"); } } } int read(oid stripe, uint32_t offset, uint32_t len, void *buf, void (*callback)(int arg), int arg) { auto o = object_db.find(stripe); if (o == object_db.end()) { memset(buf, 0, len); callback(arg); return; } auto info = o->second; } };