Split blockstore implementation and interface header

blocking-uring-test
Vitaliy Filippov 2019-12-15 14:49:10 +03:00
parent 749ab6e2c6
commit a7e74670a5
21 changed files with 796 additions and 689 deletions

View File

@ -1,4 +1,4 @@
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd
@ -6,7 +6,7 @@ clean:
rm -f *.o
crc32c.o: crc32c.c
g++ $(CXXFLAGS) -c -o $@ $<
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_impl.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
g++ $(CXXFLAGS) -c -o $@ $<
osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o
g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o $(BLOCKSTORE_OBJS)

View File

@ -1,300 +1,51 @@
#include "blockstore.h"
#include "blockstore_impl.h"
blockstore::blockstore(blockstore_config_t & config, ring_loop_t *ringloop)
blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop)
{
this->ringloop = ringloop;
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(ring_consumer);
initialized = 0;
block_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
if (block_order == 0)
{
block_order = DEFAULT_ORDER;
}
block_size = 1 << block_order;
if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
{
throw std::runtime_error("Bad block size");
}
zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
data_fd = meta_fd = journal.fd = -1;
try
{
open_data(config);
open_meta(config);
open_journal(config);
calc_lengths(config);
data_alloc = new allocator(block_count);
}
catch (std::exception & e)
{
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
throw;
}
int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
if (!flusher_count)
flusher_count = 32;
flusher = new journal_flusher_t(flusher_count, this);
impl = new blockstore_impl_t(config, ringloop);
}
blockstore::~blockstore()
blockstore_t::~blockstore_t()
{
delete data_alloc;
delete flusher;
free(zero_object);
ringloop->unregister_consumer(ring_consumer);
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
if (metadata_buffer)
free(metadata_buffer);
delete impl;
}
bool blockstore::is_started()
void blockstore_t::loop()
{
return initialized == 10;
impl->loop();
}
// main event loop - produce requests
void blockstore::loop()
bool blockstore_t::is_started()
{
if (initialized != 10)
{
// read metadata, then journal
if (initialized == 0)
{
metadata_init_reader = new blockstore_init_meta(this);
initialized = 1;
}
if (initialized == 1)
{
int res = metadata_init_reader->loop();
if (!res)
{
delete metadata_init_reader;
metadata_init_reader = NULL;
journal_init_reader = new blockstore_init_journal(this);
initialized = 2;
}
}
if (initialized == 2)
{
int res = journal_init_reader->loop();
if (!res)
{
delete journal_init_reader;
journal_init_reader = NULL;
initialized = 10;
}
}
}
else
{
// try to submit ops
auto cur_sync = in_progress_syncs.begin();
while (cur_sync != in_progress_syncs.end())
{
continue_sync(*cur_sync++);
}
auto cur = submit_queue.begin();
int has_writes = 0;
while (cur != submit_queue.end())
{
auto op_ptr = cur;
auto op = *(cur++);
// FIXME: This needs some simplification
// Writes should not block reads if the ring is not full and if reads don't depend on them
// In all other cases we should stop submission
if (op->wait_for)
{
check_wait(op);
if (op->wait_for == WAIT_SQE)
{
break;
}
else if (op->wait_for)
{
if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
{
has_writes = 2;
}
continue;
}
}
unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
int dequeue_op = 0;
if ((op->flags & OP_TYPE_MASK) == OP_READ)
{
dequeue_op = dequeue_read(op);
}
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
{
if (has_writes == 2)
{
// Some writes could not be submitted
break;
}
dequeue_op = dequeue_write(op);
has_writes = dequeue_op ? 1 : 2;
}
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
{
// wait for all small writes to be submitted
// wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation
if (has_writes)
{
// Can't submit SYNC before previous writes
continue;
}
dequeue_op = dequeue_sync(op);
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
dequeue_op = dequeue_stable(op);
}
if (dequeue_op)
{
submit_queue.erase(op_ptr);
}
else
{
ringloop->ring.sq.sqe_tail = prev_sqe_pos;
if (op->wait_for == WAIT_SQE)
{
op->wait_detail = 1 + ring_space;
// ring is full, stop submission
break;
}
}
}
if (!readonly)
{
flusher->loop();
}
int ret = ringloop->submit();
if (ret < 0)
{
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
}
return impl->is_started();
}
bool blockstore::is_safe_to_stop()
bool blockstore_t::is_safe_to_stop()
{
// It's safe to stop blockstore when there are no in-flight operations,
// no in-progress syncs and flusher isn't doing anything
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active())
{
return false;
}
if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
{
if (!readonly && !stop_sync_submitted)
{
// We should sync the blockstore before unmounting
blockstore_op_t *op = new blockstore_op_t;
op->flags = OP_SYNC;
op->buf = NULL;
op->callback = [](blockstore_op_t *op)
{
delete op;
};
enqueue_op(op);
stop_sync_submitted = true;
}
return false;
}
return true;
return impl->is_safe_to_stop();
}
void blockstore::check_wait(blockstore_op_t *op)
void blockstore_t::enqueue_op(blockstore_op_t *op)
{
if (op->wait_for == WAIT_SQE)
{
if (io_uring_sq_space_left(&ringloop->ring) < op->wait_detail)
{
// stop submission if there's still no free space
return;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_IN_FLIGHT)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->wait_detail,
});
if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state))
{
// do not submit
return;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_JOURNAL)
{
if (journal.used_start == op->wait_detail)
{
// do not submit
return;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_JOURNAL_BUFFER)
{
if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
{
// do not submit
return;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_FREE)
{
if (!data_alloc->get_free_count() && !flusher->is_active())
{
return;
}
op->wait_for = 0;
}
else
{
throw std::runtime_error("BUG: op->wait_for value is unexpected");
}
impl->enqueue_op(op);
}
void blockstore::enqueue_op(blockstore_op_t *op)
std::map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
{
int type = op->flags & OP_TYPE_MASK;
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
readonly && type != OP_READ)
{
// Basic verification not passed
op->retval = -EINVAL;
op->callback(op);
return;
}
op->wait_for = 0;
op->sync_state = 0;
op->pending_ops = 0;
submit_queue.push_back(op);
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
{
enqueue_write(op);
}
ringloop->wakeup();
return impl->unstable_writes;
}
uint32_t blockstore_t::get_block_size()
{
return impl->get_block_size();
}
uint32_t blockstore_t::get_block_order()
{
return impl->get_block_order();
}
uint64_t blockstore_t::get_block_count()
{
return impl->get_block_count();
}

View File

@ -3,93 +3,21 @@
#ifndef _LARGEFILE64_SOURCE
#define _LARGEFILE64_SOURCE
#endif
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <malloc.h>
#include <linux/fs.h>
#include <liburing.h>
#include <vector>
#include <stdint.h>
#include <map>
#include <list>
#include <deque>
#include <set>
#include <unordered_map>
#include <functional>
#include "sparsepp/sparsepp/spp.h"
#include "allocator.h"
#include "ringloop.h"
//#define BLOCKSTORE_DEBUG
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_J_IN_FLIGHT 1
#define ST_J_SUBMITTED 2
#define ST_J_WRITTEN 3
#define ST_J_SYNCED 4
#define ST_J_STABLE 5
#define ST_D_IN_FLIGHT 15
#define ST_D_SUBMITTED 16
#define ST_D_WRITTEN 17
#define ST_D_META_WRITTEN 19
#define ST_D_META_SYNCED 20
#define ST_D_STABLE 21
#define ST_DEL_IN_FLIGHT 31
#define ST_DEL_SUBMITTED 32
#define ST_DEL_WRITTEN 33
#define ST_DEL_SYNCED 34
#define ST_DEL_STABLE 35
#define ST_CURRENT 48
#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
// Default block size is 128 KB, current allowed range is 4K - 128M
#define DEFAULT_ORDER 17
#define MIN_BLOCK_SIZE 4*1024
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 512
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\
{\
/* Pause until there are more requests available */\
op->wait_for = WAIT_SQE;\
return 0;\
}
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
sqe = get_sqe();\
if (!sqe)\
{\
/* Pause until there are more requests available */\
op->wait_for = WAIT_SQE;\
return 0;\
}
class blockstore;
class blockstore_op_t;
// 16 bytes per object/stripe id
// stripe includes replica number in 4 least significant bits
struct __attribute__((__packed__)) object_id
@ -98,8 +26,6 @@ struct __attribute__((__packed__)) object_id
uint64_t stripe;
};
#include "blockstore_journal.h"
inline bool operator == (const object_id & a, const object_id & b)
{
return a.inode == b.inode && a.stripe == b.stripe;
@ -115,21 +41,6 @@ inline bool operator < (const object_id & a, const object_id & b)
return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
}
// 24 bytes per "clean" entry on disk with fixed metadata tables
// FIXME: maybe add crc32's to metadata
struct __attribute__((__packed__)) clean_disk_entry
{
object_id oid;
uint64_t version;
};
// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry)
struct __attribute__((__packed__)) clean_entry
{
uint64_t version;
uint64_t location;
};
// 56 = 24 + 32 bytes per dirty entry in memory (obj_ver_id => dirty_entry)
struct __attribute__((__packed__)) obj_ver_id
{
@ -142,45 +53,19 @@ inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
return a.oid < b.oid || a.oid == b.oid && a.version < b.version;
}
struct __attribute__((__packed__)) dirty_entry
{
uint32_t state;
uint32_t flags; // unneeded, but present for alignment
uint64_t location; // location in either journal or data -> in BYTES
uint32_t offset; // data offset within object (stripe)
uint32_t len; // data length
uint64_t journal_sector; // journal sector used for this entry
};
class oid_hash
{
public:
size_t operator()(const object_id &s) const
{
size_t seed = 0;
spp::hash_combine(seed, s.inode);
spp::hash_combine(seed, s.stripe);
// Copy-pasted from spp::hash_combine()
seed ^= (s.inode + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
seed ^= (s.stripe + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
return seed;
}
};
// - Sync must be submitted after previous writes/deletes (not before!)
// - Reads to the same object must be submitted after previous writes/deletes
// are written (not necessarily synced) in their location. This is because we
// rely on read-modify-write for erasure coding and we must return new data
// to calculate parity for subsequent writes
// - Writes may be submitted in any order, because they don't overlap. Each write
// goes into a new location - either on the journal device or on the data device
// - Stable (stabilize) must be submitted after sync of that object is completed
// It's even OK to return an error to the caller if that object is not synced yet
// - Journal trim may be processed only after all versions are moved to
// the main storage AND after all read operations for older versions complete
// - If an operation can not be submitted because the ring is full
// we should stop submission of other operations. Otherwise some "scatter" reads
// may end up blocked for a long time.
// Otherwise, the submit order is free, that is all operations may be submitted immediately
// In fact, adding a write operation must immediately result in dirty_db being populated
#define OP_READ 1
#define OP_WRITE 2
#define OP_SYNC 3
@ -188,21 +73,7 @@ public:
#define OP_DELETE 5
#define OP_TYPE_MASK 0x7
// Suspend operation until there are more free SQEs
#define WAIT_SQE 1
// Suspend operation until version <wait_detail> of object <oid> is written
#define WAIT_IN_FLIGHT 2
// Suspend operation until there are <wait_detail> bytes of free space in the journal on disk
#define WAIT_JOURNAL 3
// Suspend operation until the next journal sector buffer is free
#define WAIT_JOURNAL_BUFFER 4
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5
struct fulfill_read_t
{
uint64_t offset, len;
};
#define BS_OP_PRIVATE_DATA_SIZE 256
struct blockstore_op_t
{
@ -222,126 +93,19 @@ struct blockstore_op_t
void *buf;
int retval;
// FIXME: Move internal fields somewhere
friend class blockstore;
friend class blockstore_journal_check_t;
friend void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
private:
// Wait status
int wait_for;
uint64_t wait_detail;
int pending_ops;
// Read
std::vector<fulfill_read_t> read_vec;
// Sync, write
uint64_t min_used_journal_sector, max_used_journal_sector;
// Write
struct iovec iov_zerofill[3];
// Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
std::list<blockstore_op_t*>::iterator in_progress_ptr;
int sync_state, prev_sync_count;
uint8_t private_data[BS_OP_PRIVATE_DATA_SIZE];
};
#include "blockstore_init.h"
typedef std::unordered_map<std::string, std::string> blockstore_config_t;
#include "blockstore_flush.h"
class blockstore_impl_t;
typedef spp::sparse_hash_map<std::string, std::string> blockstore_config_t;
class blockstore
class blockstore_t
{
struct ring_consumer_t ring_consumer;
// Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db;
std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
allocator *data_alloc = NULL;
uint8_t *zero_object;
uint64_t block_count;
uint32_t block_order, block_size;
int meta_fd;
int data_fd;
uint64_t meta_offset, meta_size, meta_area, meta_len;
uint64_t data_offset, data_size, data_len;
bool readonly = false;
// FIXME: separate flags for data, metadata and journal
bool disable_fsync = false;
bool inmemory_meta = false;
void *metadata_buffer = NULL;
struct journal_t journal;
journal_flusher_t *flusher;
ring_loop_t *ringloop;
bool stop_sync_submitted;
inline struct io_uring_sqe* get_sqe()
{
return ringloop->get_sqe();
}
friend class blockstore_init_meta;
friend class blockstore_init_journal;
friend class blockstore_journal_check_t;
friend class journal_flusher_t;
friend class journal_flusher_co;
void calc_lengths(blockstore_config_t & config);
void open_data(blockstore_config_t & config);
void open_meta(blockstore_config_t & config);
void open_journal(blockstore_config_t & config);
// Asynchronous init
int initialized;
int metadata_buf_size;
blockstore_init_meta* metadata_init_reader;
blockstore_init_journal* journal_init_reader;
void check_wait(blockstore_op_t *op);
// Read
int dequeue_read(blockstore_op_t *read_op);
int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version);
void handle_read_event(ring_data_t *data, blockstore_op_t *op);
// Write
void enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync
int dequeue_sync(blockstore_op_t *op);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
int continue_sync(blockstore_op_t *op);
void ack_one_sync(blockstore_op_t *op);
int ack_sync(blockstore_op_t *op);
// Stabilize
int dequeue_stable(blockstore_op_t *op);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
blockstore_impl_t *impl;
public:
blockstore(blockstore_config_t & config, ring_loop_t *ringloop);
~blockstore();
blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop);
~blockstore_t();
// Event loop
void loop();
@ -359,9 +123,9 @@ public:
void enqueue_op(blockstore_op_t *op);
// Unstable writes are added here (map of object_id -> version)
std::map<object_id, uint64_t> unstable_writes;
std::map<object_id, uint64_t> & get_unstable_writes();
inline uint32_t get_block_size() { return block_size; }
inline uint32_t get_block_order() { return block_order; }
inline uint64_t get_block_count() { return block_count; }
uint32_t get_block_size();
uint32_t get_block_order();
uint64_t get_block_count();
};

View File

@ -1,6 +1,6 @@
#include "blockstore.h"
#include "blockstore_impl.h"
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
{
this->bs = bs;
this->flusher_count = flusher_count;

View File

@ -32,7 +32,7 @@ class journal_flusher_t;
// Journal flusher coroutine
class journal_flusher_co
{
blockstore *bs;
blockstore_impl_t *bs;
journal_flusher_t *flusher;
int wait_state, wait_count;
struct io_uring_sqe *sqe;
@ -64,7 +64,7 @@ class journal_flusher_t
int flusher_count;
int sync_threshold;
journal_flusher_co *co;
blockstore *bs;
blockstore_impl_t *bs;
friend class journal_flusher_co;
int journal_trim_counter, journal_trim_interval;
@ -78,7 +78,7 @@ class journal_flusher_t
std::deque<object_id> flush_queue;
std::map<object_id, uint64_t> flush_versions;
public:
journal_flusher_t(int flusher_count, blockstore *bs);
journal_flusher_t(int flusher_count, blockstore_impl_t *bs);
~journal_flusher_t();
void loop();
bool is_active();

303
blockstore_impl.cpp Normal file
View File

@ -0,0 +1,303 @@
#include "blockstore_impl.h"
blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop)
{
assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE);
this->ringloop = ringloop;
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(ring_consumer);
initialized = 0;
block_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
if (block_order == 0)
{
block_order = DEFAULT_ORDER;
}
block_size = 1 << block_order;
if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
{
throw std::runtime_error("Bad block size");
}
zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
data_fd = meta_fd = journal.fd = -1;
try
{
open_data(config);
open_meta(config);
open_journal(config);
calc_lengths(config);
data_alloc = new allocator(block_count);
}
catch (std::exception & e)
{
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
throw;
}
int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
if (!flusher_count)
flusher_count = 32;
flusher = new journal_flusher_t(flusher_count, this);
}
blockstore_impl_t::~blockstore_impl_t()
{
delete data_alloc;
delete flusher;
free(zero_object);
ringloop->unregister_consumer(ring_consumer);
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
if (metadata_buffer)
free(metadata_buffer);
}
bool blockstore_impl_t::is_started()
{
return initialized == 10;
}
// main event loop - produce requests
void blockstore_impl_t::loop()
{
if (initialized != 10)
{
// read metadata, then journal
if (initialized == 0)
{
metadata_init_reader = new blockstore_init_meta(this);
initialized = 1;
}
if (initialized == 1)
{
int res = metadata_init_reader->loop();
if (!res)
{
delete metadata_init_reader;
metadata_init_reader = NULL;
journal_init_reader = new blockstore_init_journal(this);
initialized = 2;
}
}
if (initialized == 2)
{
int res = journal_init_reader->loop();
if (!res)
{
delete journal_init_reader;
journal_init_reader = NULL;
initialized = 10;
}
}
}
else
{
// try to submit ops
auto cur_sync = in_progress_syncs.begin();
while (cur_sync != in_progress_syncs.end())
{
continue_sync(*cur_sync++);
}
auto cur = submit_queue.begin();
int has_writes = 0;
while (cur != submit_queue.end())
{
auto op_ptr = cur;
auto op = *(cur++);
// FIXME: This needs some simplification
// Writes should not block reads if the ring is not full and if reads don't depend on them
// In all other cases we should stop submission
if (PRIV(op)->wait_for)
{
check_wait(op);
if (PRIV(op)->wait_for == WAIT_SQE)
{
break;
}
else if (PRIV(op)->wait_for)
{
if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
{
has_writes = 2;
}
continue;
}
}
unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
int dequeue_op = 0;
if ((op->flags & OP_TYPE_MASK) == OP_READ)
{
dequeue_op = dequeue_read(op);
}
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
{
if (has_writes == 2)
{
// Some writes could not be submitted
break;
}
dequeue_op = dequeue_write(op);
has_writes = dequeue_op ? 1 : 2;
}
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
{
// wait for all small writes to be submitted
// wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation
if (has_writes)
{
// Can't submit SYNC before previous writes
continue;
}
dequeue_op = dequeue_sync(op);
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
dequeue_op = dequeue_stable(op);
}
if (dequeue_op)
{
submit_queue.erase(op_ptr);
}
else
{
ringloop->ring.sq.sqe_tail = prev_sqe_pos;
if (PRIV(op)->wait_for == WAIT_SQE)
{
PRIV(op)->wait_detail = 1 + ring_space;
// ring is full, stop submission
break;
}
}
}
if (!readonly)
{
flusher->loop();
}
int ret = ringloop->submit();
if (ret < 0)
{
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
}
}
bool blockstore_impl_t::is_safe_to_stop()
{
// It's safe to stop blockstore when there are no in-flight operations,
// no in-progress syncs and flusher isn't doing anything
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active())
{
return false;
}
if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
{
if (!readonly && !stop_sync_submitted)
{
// We should sync the blockstore before unmounting
blockstore_op_t *op = new blockstore_op_t;
op->flags = OP_SYNC;
op->buf = NULL;
op->callback = [](blockstore_op_t *op)
{
delete op;
};
enqueue_op(op);
stop_sync_submitted = true;
}
return false;
}
return true;
}
void blockstore_impl_t::check_wait(blockstore_op_t *op)
{
if (PRIV(op)->wait_for == WAIT_SQE)
{
if (io_uring_sq_space_left(&ringloop->ring) < PRIV(op)->wait_detail)
{
// stop submission if there's still no free space
return;
}
PRIV(op)->wait_for = 0;
}
else if (PRIV(op)->wait_for == WAIT_IN_FLIGHT)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = PRIV(op)->wait_detail,
});
if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state))
{
// do not submit
return;
}
PRIV(op)->wait_for = 0;
}
else if (PRIV(op)->wait_for == WAIT_JOURNAL)
{
if (journal.used_start == PRIV(op)->wait_detail)
{
// do not submit
return;
}
PRIV(op)->wait_for = 0;
}
else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
{
if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
{
// do not submit
return;
}
PRIV(op)->wait_for = 0;
}
else if (PRIV(op)->wait_for == WAIT_FREE)
{
if (!data_alloc->get_free_count() && !flusher->is_active())
{
return;
}
PRIV(op)->wait_for = 0;
}
else
{
throw std::runtime_error("BUG: op->wait_for value is unexpected");
}
}
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{
int type = op->flags & OP_TYPE_MASK;
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
readonly && type != OP_READ)
{
// Basic verification not passed
op->retval = -EINVAL;
op->callback(op);
return;
}
// Call constructor without allocating memory. We'll call destructor before returning op back
new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0;
PRIV(op)->sync_state = 0;
PRIV(op)->pending_ops = 0;
submit_queue.push_back(op);
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
{
enqueue_write(op);
}
ringloop->wakeup();
}

279
blockstore_impl.h Normal file
View File

@ -0,0 +1,279 @@
#pragma once
#include "blockstore.h"
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <malloc.h>
#include <linux/fs.h>
#include <vector>
#include <list>
#include <deque>
#include <set>
#include <new>
#include "sparsepp/sparsepp/spp.h"
#include "allocator.h"
//#define BLOCKSTORE_DEBUG
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_J_IN_FLIGHT 1
#define ST_J_SUBMITTED 2
#define ST_J_WRITTEN 3
#define ST_J_SYNCED 4
#define ST_J_STABLE 5
#define ST_D_IN_FLIGHT 15
#define ST_D_SUBMITTED 16
#define ST_D_WRITTEN 17
#define ST_D_META_WRITTEN 19
#define ST_D_META_SYNCED 20
#define ST_D_STABLE 21
#define ST_DEL_IN_FLIGHT 31
#define ST_DEL_SUBMITTED 32
#define ST_DEL_WRITTEN 33
#define ST_DEL_SYNCED 34
#define ST_DEL_STABLE 35
#define ST_CURRENT 48
#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\
{\
/* Pause until there are more requests available */\
PRIV(op)->wait_for = WAIT_SQE;\
return 0;\
}
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
sqe = get_sqe();\
if (!sqe)\
{\
/* Pause until there are more requests available */\
PRIV(op)->wait_for = WAIT_SQE;\
return 0;\
}
#include "blockstore_journal.h"
// 24 bytes per "clean" entry on disk with fixed metadata tables
// FIXME: maybe add crc32's to metadata
struct __attribute__((__packed__)) clean_disk_entry
{
object_id oid;
uint64_t version;
};
// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry)
struct __attribute__((__packed__)) clean_entry
{
uint64_t version;
uint64_t location;
};
// 56 = 24 + 32 bytes per dirty entry in memory (obj_ver_id => dirty_entry)
struct __attribute__((__packed__)) dirty_entry
{
uint32_t state;
uint32_t flags; // unneeded, but present for alignment
uint64_t location; // location in either journal or data -> in BYTES
uint32_t offset; // data offset within object (stripe)
uint32_t len; // data length
uint64_t journal_sector; // journal sector used for this entry
};
// - Sync must be submitted after previous writes/deletes (not before!)
// - Reads to the same object must be submitted after previous writes/deletes
// are written (not necessarily synced) in their location. This is because we
// rely on read-modify-write for erasure coding and we must return new data
// to calculate parity for subsequent writes
// - Writes may be submitted in any order, because they don't overlap. Each write
// goes into a new location - either on the journal device or on the data device
// - Stable (stabilize) must be submitted after sync of that object is completed
// It's even OK to return an error to the caller if that object is not synced yet
// - Journal trim may be processed only after all versions are moved to
// the main storage AND after all read operations for older versions complete
// - If an operation can not be submitted because the ring is full
// we should stop submission of other operations. Otherwise some "scatter" reads
// may end up blocked for a long time.
// Otherwise, the submit order is free, that is all operations may be submitted immediately
// In fact, adding a write operation must immediately result in dirty_db being populated
// Suspend operation until there are more free SQEs
#define WAIT_SQE 1
// Suspend operation until version <wait_detail> of object <oid> is written
#define WAIT_IN_FLIGHT 2
// Suspend operation until there are <wait_detail> bytes of free space in the journal on disk
#define WAIT_JOURNAL 3
// Suspend operation until the next journal sector buffer is free
#define WAIT_JOURNAL_BUFFER 4
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5
struct fulfill_read_t
{
uint64_t offset, len;
};
#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); op->callback(op)
struct blockstore_op_private_t
{
// Wait status
int wait_for;
uint64_t wait_detail;
int pending_ops;
// Read
std::vector<fulfill_read_t> read_vec;
// Sync, write
uint64_t min_used_journal_sector, max_used_journal_sector;
// Write
struct iovec iov_zerofill[3];
// Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
std::list<blockstore_op_t*>::iterator in_progress_ptr;
int sync_state, prev_sync_count;
};
#include "blockstore_init.h"
#include "blockstore_flush.h"
class blockstore_impl_t
{
struct ring_consumer_t ring_consumer;
// Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db;
std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
allocator *data_alloc = NULL;
uint8_t *zero_object;
uint64_t block_count;
uint32_t block_order, block_size;
int meta_fd;
int data_fd;
uint64_t meta_offset, meta_size, meta_area, meta_len;
uint64_t data_offset, data_size, data_len;
bool readonly = false;
// FIXME: separate flags for data, metadata and journal
bool disable_fsync = false;
bool inmemory_meta = false;
void *metadata_buffer = NULL;
struct journal_t journal;
journal_flusher_t *flusher;
ring_loop_t *ringloop;
bool stop_sync_submitted;
inline struct io_uring_sqe* get_sqe()
{
return ringloop->get_sqe();
}
friend class blockstore_init_meta;
friend class blockstore_init_journal;
friend class blockstore_journal_check_t;
friend class journal_flusher_t;
friend class journal_flusher_co;
void calc_lengths(blockstore_config_t & config);
void open_data(blockstore_config_t & config);
void open_meta(blockstore_config_t & config);
void open_journal(blockstore_config_t & config);
// Asynchronous init
int initialized;
int metadata_buf_size;
blockstore_init_meta* metadata_init_reader;
blockstore_init_journal* journal_init_reader;
void check_wait(blockstore_op_t *op);
// Read
int dequeue_read(blockstore_op_t *read_op);
int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version);
void handle_read_event(ring_data_t *data, blockstore_op_t *op);
// Write
void enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync
int dequeue_sync(blockstore_op_t *op);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
int continue_sync(blockstore_op_t *op);
void ack_one_sync(blockstore_op_t *op);
int ack_sync(blockstore_op_t *op);
// Stabilize
int dequeue_stable(blockstore_op_t *op);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
public:
blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop);
~blockstore_impl_t();
// Event loop
void loop();
// Returns true when blockstore is ready to process operations
// (Although you're free to enqueue them before that)
bool is_started();
// Returns true when it's safe to destroy the instance. If destroying the instance
// requires to purge some queues, starts that process. Should be called in the event
// loop until it returns true.
bool is_safe_to_stop();
// Submission
void enqueue_op(blockstore_op_t *op);
// Unstable writes are added here (map of object_id -> version)
std::map<object_id, uint64_t> unstable_writes;
inline uint32_t get_block_size() { return block_size; }
inline uint32_t get_block_order() { return block_order; }
inline uint64_t get_block_count() { return block_count; }
};

View File

@ -1,6 +1,6 @@
#include "blockstore.h"
#include "blockstore_impl.h"
blockstore_init_meta::blockstore_init_meta(blockstore *bs)
blockstore_init_meta::blockstore_init_meta(blockstore_impl_t *bs)
{
this->bs = bs;
}
@ -128,7 +128,7 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, unsi
}
}
blockstore_init_journal::blockstore_init_journal(blockstore *bs)
blockstore_init_journal::blockstore_init_journal(blockstore_impl_t *bs)
{
this->bs = bs;
simple_callback = [this](ring_data_t *data1)

View File

@ -2,7 +2,7 @@
class blockstore_init_meta
{
blockstore *bs;
blockstore_impl_t *bs;
int wait_state = 0, wait_count = 0;
void *metadata_buffer = NULL;
uint64_t metadata_read = 0;
@ -14,7 +14,7 @@ class blockstore_init_meta
void handle_entries(struct clean_disk_entry* entries, unsigned count, int block_order);
void handle_event(ring_data_t *data);
public:
blockstore_init_meta(blockstore *bs);
blockstore_init_meta(blockstore_impl_t *bs);
int loop();
};
@ -26,11 +26,12 @@ struct bs_init_journal_done
class blockstore_init_journal
{
blockstore *bs;
blockstore_impl_t *bs;
int wait_state = 0, wait_count = 0, handle_res = 0;
uint64_t entries_loaded = 0;
uint32_t crc32_last = 0;
bool started = false;
// FIXME: use DISK_ALIGNMENT everywhere
uint64_t next_free = 512;
std::vector<bs_init_journal_done> done;
uint64_t journal_pos = 0;
@ -46,6 +47,6 @@ class blockstore_init_journal
int handle_journal_part(void *buf, uint64_t done_pos, uint64_t len);
void handle_event(ring_data_t *data);
public:
blockstore_init_journal(blockstore* bs);
blockstore_init_journal(blockstore_impl_t* bs);
int loop();
};

View File

@ -1,6 +1,6 @@
#include "blockstore.h"
#include "blockstore_impl.h"
blockstore_journal_check_t::blockstore_journal_check_t(blockstore *bs)
blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs)
{
this->bs = bs;
sectors_required = 0;
@ -40,7 +40,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
if (bs->journal.sector_info[next_sector].usage_count > 0)
{
// No memory buffer available. Wait for it.
op->wait_for = WAIT_JOURNAL_BUFFER;
PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
return 0;
}
}
@ -56,9 +56,9 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
if (!right_dir && next_pos >= bs->journal.used_start-512)
{
// No space in the journal. Wait until used_start changes.
op->wait_for = WAIT_JOURNAL;
PRIV(op)->wait_for = WAIT_JOURNAL;
bs->flusher->force_start();
op->wait_detail = bs->journal.used_start;
PRIV(op)->wait_detail = bs->journal.used_start;
return 0;
}
return 1;

View File

@ -138,12 +138,12 @@ struct journal_t
struct blockstore_journal_check_t
{
blockstore *bs;
blockstore_impl_t *bs;
uint64_t next_pos, next_sector, next_in_pos;
int sectors_required;
bool right_dir; // writing to the end or the beginning of the ring buffer
blockstore_journal_check_t(blockstore *bs);
blockstore_journal_check_t(blockstore_impl_t *bs);
int check_available(blockstore_op_t *op, int required, int size, int data_after);
};

View File

@ -1,6 +1,6 @@
#include "blockstore.h"
#include "blockstore_impl.h"
void blockstore::calc_lengths(blockstore_config_t & config)
void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
{
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
{
@ -111,7 +111,7 @@ void check_size(int fd, uint64_t *size, std::string name)
}
}
void blockstore::open_data(blockstore_config_t & config)
void blockstore_impl_t::open_data(blockstore_config_t & config)
{
data_offset = strtoull(config["data_offset"].c_str(), NULL, 10);
if (data_offset % DISK_ALIGNMENT)
@ -130,7 +130,7 @@ void blockstore::open_data(blockstore_config_t & config)
}
}
void blockstore::open_meta(blockstore_config_t & config)
void blockstore_impl_t::open_meta(blockstore_config_t & config)
{
meta_offset = strtoull(config["meta_offset"].c_str(), NULL, 10);
if (meta_offset % DISK_ALIGNMENT)
@ -162,7 +162,7 @@ void blockstore::open_meta(blockstore_config_t & config)
}
}
void blockstore::open_journal(blockstore_config_t & config)
void blockstore_impl_t::open_journal(blockstore_config_t & config)
{
journal.offset = strtoull(config["journal_offset"].c_str(), NULL, 10);
if (journal.offset % DISK_ALIGNMENT)

View File

@ -1,13 +1,13 @@
#include "blockstore.h"
#include "blockstore_impl.h"