FSM is a dreadful unreadable thing, reimplement using gotos

blocking-uring-test
Vitaliy Filippov 2019-11-12 18:16:03 +03:00
parent 34451b6e44
commit 4afa95b0e3
4 changed files with 150 additions and 126 deletions
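
The essence of the change: instead of a state variable driving a chain of
"if (state == ...)" blocks, loop() now uses plain labels and gotos. It never
blocks - whenever it cannot make progress (no free io_uring SQE, reads still
in flight) it records a small wait_state and returns, and the next invocation
jumps straight back to the point where it left off. A minimal sketch of the
pattern, with illustrative names (flusher_sketch, submit_async_read) that are
assumptions rather than code from this commit:

    struct flusher_sketch
    {
        int wait_state = 0;   // 0 = start from the top
        int wait_count = 0;   // async operations still in flight

        // Stub standing in for "queue an io_uring read"; false would mean the ring is full
        bool submit_async_read() { return true; }

        // Re-entrant, non-blocking step function
        void loop()
        {
            if (wait_state == 1)
                goto resume_1;
            else if (wait_state == 2)
                goto resume_2;
        resume_1:
            if (!submit_async_read())
            {
                wait_state = 1;   // ring full - retry this exact step next time
                return;
            }
            wait_count++;
            wait_state = 2;
        resume_2:
            if (wait_count > 0)
                return;           // completions pending - resume here later
            wait_state = 0;       // done, the next call starts a new object
        }
    };

The completion path is then expected to decrement wait_count and call loop()
again, so execution resumes at the saved label instead of re-walking the whole
state machine.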

View File

@@ -83,6 +83,11 @@ void blockstore::handle_event(ring_data_t *data)
         {
             handle_stable_event(data, op);
         }
+        else if ((op->flags & OP_TYPE_MASK) == OP_INTERNAL_FLUSH)
+        {
+            // Operation is not a blockstore_operation at all
+        }
     }
 }
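
The new OP_INTERNAL_FLUSH branch is only a placeholder in this commit:
completions belonging to the journal flusher carry a pointer to the flusher
itself (it sets data->op = this), not to a client operation, so they must not
be routed through the normal read/write/stable handlers. One plausible way
the branch could be filled in, shown purely as a hypothetical sketch (it
assumes the flusher exposes its wait counter or an equivalent method):

    // Hypothetical continuation of the OP_INTERNAL_FLUSH branch:
    // hand the completion back to the flusher that submitted the read/write.
    journal_flusher_t *flusher = (journal_flusher_t*)data->op;
    flusher->wait_count--;           // one fewer internal read/write in flight
    if (flusher->wait_count == 0)
        flusher->loop();             // re-enter at the saved resume_* label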

View File

@@ -33,8 +33,9 @@
 #define ST_J_WRITTEN 3
 #define ST_J_SYNCED 4
 #define ST_J_STABLE 5
-#define ST_J_MOVED 6
-#define ST_J_MOVE_SYNCED 7
+#define ST_J_MOVE_READ_SUBMITTED 6
+#define ST_J_MOVE_WRITE_SUBMITTED 7
+#define ST_J_MOVE_SYNCED 8
 
 #define ST_D_SUBMITTED 16
 #define ST_D_WRITTEN 17
@@ -183,6 +184,7 @@ public:
 #define OP_SYNC 3
 #define OP_STABLE 4
 #define OP_DELETE 5
+#define OP_INTERNAL_FLUSH 6
 #define OP_TYPE_MASK 0x7
 
 // Suspend operation until there are more free SQEs
@@ -196,9 +198,10 @@ public:
 struct blockstore_operation
 {
-    std::function<void (blockstore_operation*)> callback;
     // flags contain operation type and possibly other flags
-    uint32_t flags;
+    uint64_t flags;
+    // finish callback
+    std::function<void (blockstore_operation*)> callback;
     // For reads, writes & deletes: oid is the requested object
     object_id oid;
     // For reads: version=0 -> last stable, version=UINT64_MAX -> last unstable, version=X -> specific version
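
Two related header tweaks: flags is widened from uint32_t to uint64_t (and the
callback is moved below it), while the operation type still lives in the low
bits selected by OP_TYPE_MASK, which at 0x7 is just wide enough for the new
OP_INTERNAL_FLUSH = 6. A small sketch using only the macros shown above
(op_type itself is a hypothetical helper, not part of the commit):

    // The low three bits of flags select the operation type.
    static inline uint64_t op_type(blockstore_operation *op)
    {
        return op->flags & OP_TYPE_MASK;   // OP_SYNC, OP_STABLE, OP_DELETE, OP_INTERNAL_FLUSH, ...
    }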

View File

@@ -156,154 +156,170 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
 }
 }
 
-struct offset_len
+struct copy_buffer_t
 {
     uint64_t offset, len;
+    void *buf;
 };
 
 class journal_flusher_t
 {
     blockstore *bs;
-    int state;
+    int wait_state, wait_count;
+    struct io_uring_sqe *sqe;
+    struct ring_data_t *data;
+    bool skip_copy;
     obj_ver_id cur;
     std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
-    std::vector<offset_len> v;
-    std::vector<offset_len>::iterator it;
-    uint64_t offset, len;
+    std::vector<copy_buffer_t> v;
+    std::vector<copy_buffer_t>::iterator it;
+    uint64_t offset, len, submit_len, clean_loc;
+    bool allocated;
 public:
-    journal_flusher_t();
+    journal_flusher_t(int flush_count);
     std::deque<obj_ver_id> flush_queue;
-    void stabilize_object_loop();
+    void loop();
 };
 
-#define F_NEXT_OBJ 0
-#define F_NEXT_VER 1
-#define F_FIND_POS 2
-#define F_SUBMIT_FULL 3
-#define F_SUBMIT_PART 4
-#define F_CUT_OFFSET 5
-#define F_FINISH_VER 6
-
-journal_flusher_t::journal_flusher_t()
+journal_flusher_t::journal_flusher_t(int flusher_count)
 {
-    state = F_NEXT_OBJ;
 }
 
-// It would be prettier as a coroutine (maybe https://github.com/hnes/libaco ?)
-// Now it's a state machine
-void journal_flusher_t::stabilize_object_loop()
-{
-begin:
-    if (state == F_NEXT_OBJ)
-    {
-        // Pick next object
-        if (!flush_queue.size())
-            return;
-        while (1)
-        {
-            cur = flush_queue.front();
-            flush_queue.pop_front();
-            dirty_it = bs->dirty_db.find(cur);
-            if (dirty_it != bs->dirty_db.end())
-            {
-                state = F_NEXT_VER;
-                v.clear();
-                break;
-            }
-            else if (flush_queue.size() == 0)
-                return;
-        }
-    }
-    if (state == F_NEXT_VER)
-    {
-        if (dirty_it->second.state == ST_J_STABLE)
-        {
-            offset = dirty_it->second.offset;
-            len = dirty_it->second.size;
-            it = v.begin();
-            state = F_FIND_POS;
-        }
-        else if (dirty_it->second.state == ST_D_STABLE)
-        {
-            state = F_NEXT_OBJ;
-        }
-        else if (IS_STABLE(dirty_it->second.state))
-        {
-            state = F_NEXT_OBJ;
-        }
-        else
-            state = F_FINISH_VER;
-    }
-    if (state == F_FIND_POS)
-    {
-        for (; it != v.end(); it++)
-            if (it->offset >= offset)
-                break;
-        if (it == v.end() || it->offset >= offset+len)
-        {
-            state = F_SUBMIT_FULL;
-        }
-        else
-        {
-            if (it->offset > offset)
-                state = F_SUBMIT_PART;
-            else
-                state = F_CUT_OFFSET;
-        }
-    }
-    if (state == F_SUBMIT_FULL)
-    {
-        struct io_uring_sqe *sqe = get_sqe();
-        if (!sqe)
-            return;
-        struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
-        data->iov = (struct iovec){ malloc(len), len };
-        data->op = op; // FIXME OOPS
-        io_uring_prep_readv(
-            sqe, journal_fd, &data->iov, 1, journal_offset + dirty_it->second.location + offset
-        );
-        op->pending_ops = 1;
-        v.insert(it, (offset_len){ .offset = offset, .len = len });
-        state = F_SUBMIT_FULL_WRITE;
-        return;
-    }
-    if (state == F_SUBMIT_FULL_WRITE)
-    {
-        struct io_uring_sqe *sqe = get_sqe();
-        if (!sqe)
-            return;
-        struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
-    }
-    if (state == F_SUBMIT_PART)
-    {
-        if (!can_submit)
-        {
-            return;
-        }
-        v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset });
-        state = F_CUT_OFFSET;
-    }
-    if (state == F_CUT_OFFSET)
-    {
-        if (offset+len > it->offset+it->len)
-        {
-            len = offset+len - (it->offset+it->len);
-            offset = it->offset+it->len;
-            state = F_FIND_POS;
-        }
-        else
-            state = F_FINISH_VER;
-    }
-    if (state == F_FINISH_VER)
-    {
-        dirty_it--;
-        if (dirty_it == bs->dirty_db.begin() || dirty_it->first.oid != cur.oid)
-            state = F_NEXT_OBJ;
-        else
-            state = F_NEXT_VER;
-    }
-    goto begin;
-}
+void journal_flusher_t::loop()
+{
+    // This is much better than implementing the whole function as an FSM
+    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
+    if (wait_state == 1)
+        goto resume_1;
+    else if (wait_state == 3)
+        goto resume_3;
+    else if (wait_state == 4)
+        goto resume_4;
+    else if (wait_state == 5)
+        goto resume_5;
+    if (!flush_queue.size())
+        return;
+    cur = flush_queue.front();
+    flush_queue.pop_front();
+    dirty_it = bs->dirty_db.find(cur);
+    if (dirty_it != bs->dirty_db.end())
+    {
+        v.clear();
+        wait_count = 0;
+        clean_loc = UINT64_MAX;
+        allocated = false;
+        skip_copy = false;
+        do
+        {
+            if (dirty_it->second.state == ST_J_STABLE)
+            {
+                // First we submit all reads
+                offset = dirty_it->second.offset;
+                len = dirty_it->second.size;
+                it = v.begin();
+                while (1)
+                {
+                    for (; it != v.end(); it++)
+                        if (it->offset >= offset)
+                            break;
+                    if (it == v.end() || it->offset > offset)
+                    {
+                        submit_len = it->offset >= offset+len ? len : it->offset-offset;
+                    resume_1:
+                        sqe = bs->get_sqe();
+                        if (!sqe)
+                        {
+                            // Can't submit read, ring is full
+                            wait_state = 1;
+                            return;
+                        }
+                        v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
+                        data = ((ring_data_t*)sqe->user_data);
+                        data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len };
+                        data->op = this;
+                        io_uring_prep_readv(
+                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
+                        );
+                        wait_count++;
+                    }
+                    if (it == v.end() || it->offset+it->len >= offset+len)
+                    {
+                        break;
+                    }
+                }
+                // So subsequent stabilizers don't flush the entry again
+                dirty_it->second.state = ST_J_READ_SUBMITTED;
+            }
+            else if (dirty_it->second.state == ST_D_STABLE)
+            {
+                // Copy last STABLE entry metadata
+                if (!skip_copy)
+                {
+                    clean_loc = dirty_it->second.location;
+                }
+                skip_copy = true;
+            }
+            else if (IS_STABLE(dirty_it->second.state))
+            {
+                break;
+            }
+            dirty_it--;
+        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
+        if (clean_loc == UINT64_MAX)
+        {
+            // Find it in clean_db
+            auto clean_it = bs->clean_db.find(cur.oid);
+            if (clean_it == bs->clean_db.end())
+            {
+                // Object not present at all. We must allocate and zero it.
+                clean_loc = allocator_find_free(bs->data_alloc);
+                if (clean_loc == UINT64_MAX)
+                {
+                    throw new std::runtime_error("No space on the data device while trying to flush journal");
+                }
+                // This is an interesting part. Flushing journal results in an allocation we don't know where to put O_o.
+                allocator_set(bs->data_alloc, clean_loc, true);
+                allocated = true;
+            }
+            else
+                clean_loc = clean_it->second.location;
+        }
+        wait_state = 3;
+    resume_3:
+        // After reads complete we submit writes
+        if (wait_count == 0)
+        {
+            for (it = v.begin(); it != v.end(); it++)
+            {
+            resume_4:
+                sqe = bs->get_sqe();
+                if (!sqe)
+                {
+                    // Can't submit a write, ring is full
+                    wait_state = 4;
+                    return;
+                }
+                data = ((ring_data_t*)sqe->user_data);
+                data->iov = (struct iovec){ it->buf, (size_t)it->len };
+                data->op = this;
+                io_uring_prep_writev(
+                    sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
+                );
+                wait_count++;
+            }
+            wait_state = 5;
+        resume_5:
+            // Done, free all buffers
+            if (wait_count == 0)
+            {
+                for (it = v.begin(); it != v.end(); it++)
+                {
+                    free(it->buf);
+                }
+                v.clear();
+                wait_state = 0;
+            }
+        }
+    }
+}
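
A usage sketch of the rewritten flusher (the call site below is an assumption,
not part of this hunk): stabilized versions are pushed onto flush_queue and
loop() is kicked; every time it has to wait it saves wait_state and returns,
and the caller simply invokes it again once the pending completions arrive.

    // Hypothetical call site: queue a freshly stabilized version and let the
    // flusher make as much progress as the io_uring ring currently allows.
    void enqueue_flush(journal_flusher_t & flusher, obj_ver_id ov)
    {
        flusher.flush_queue.push_back(ov);
        flusher.loop();   // submits journal reads, or parks itself with wait_state set
    }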

View File

@@ -66,7 +66,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
     {
         // Big (redirect) write
         uint64_t loc = allocator_find_free(data_alloc);
-        if (loc == (uint64_t)-1)
+        if (loc == UINT64_MAX)
         {
             // no space
             op->retval = -ENOSPC;