forked from vitalif/vitastor
Handle all io_uring events using lambdas
parent
7739f628cb
commit
1c6b9778a4
|
@ -3,7 +3,6 @@
|
|||
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop)
|
||||
{
|
||||
this->ringloop = ringloop;
|
||||
ring_consumer.handle_event = [this](ring_data_t *d) { handle_event(d); };
|
||||
ring_consumer.loop = [this]() { loop(); };
|
||||
ringloop->register_consumer(ring_consumer);
|
||||
initialized = 0;
|
||||
|
@ -56,48 +55,6 @@ blockstore::~blockstore()
|
|||
free(journal.sector_info);
|
||||
}
|
||||
|
||||
// main event loop - handle requests
|
||||
void blockstore::handle_event(ring_data_t *data)
|
||||
{
|
||||
if (initialized != 10)
|
||||
{
|
||||
if (metadata_init_reader)
|
||||
{
|
||||
metadata_init_reader->handle_event(data);
|
||||
}
|
||||
else if (journal_init_reader)
|
||||
{
|
||||
journal_init_reader->handle_event(data);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
struct blockstore_operation* op = (struct blockstore_operation*)data->op;
|
||||
if ((op->flags & OP_TYPE_MASK) == OP_READ)
|
||||
{
|
||||
handle_read_event(data, op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
|
||||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
|
||||
{
|
||||
handle_write_event(data, op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
|
||||
{
|
||||
handle_sync_event(data, op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
|
||||
{
|
||||
handle_stable_event(data, op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_INTERNAL_FLUSH)
|
||||
{
|
||||
// Operation is not a blockstore_operation at all
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// main event loop - produce requests
|
||||
void blockstore::loop()
|
||||
{
|
||||
|
|
|
@ -188,7 +188,6 @@ public:
|
|||
#define OP_SYNC 3
|
||||
#define OP_STABLE 4
|
||||
#define OP_DELETE 5
|
||||
#define OP_INTERNAL_FLUSH 6
|
||||
#define OP_TYPE_MASK 0x7
|
||||
|
||||
// Suspend operation until there are more free SQEs
|
||||
|
@ -221,7 +220,7 @@ struct blockstore_operation
|
|||
// FIXME: Move internal fields somewhere
|
||||
friend class blockstore;
|
||||
friend class blockstore_journal_check_t;
|
||||
friend void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe);
|
||||
friend void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
|
||||
private:
|
||||
// Wait status
|
||||
int wait_for;
|
||||
|
@ -328,7 +327,6 @@ public:
|
|||
~blockstore();
|
||||
|
||||
// Event loop
|
||||
void handle_event(ring_data_t* data);
|
||||
void loop();
|
||||
|
||||
// Returns true when it's safe to destroy the instance. If destroying the instance
|
||||
|
|
|
@ -88,7 +88,10 @@ void journal_flusher_co::loop()
|
|||
v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len };
|
||||
data->op = this;
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
io_uring_prep_readv(
|
||||
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
|
||||
);
|
||||
|
@ -156,7 +159,11 @@ void journal_flusher_co::loop()
|
|||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||
data->op = this;
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
|
||||
wait_count--;
|
||||
};
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||
);
|
||||
|
@ -181,7 +188,10 @@ void journal_flusher_co::loop()
|
|||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ it->buf, (size_t)it->len };
|
||||
data->op = this;
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
|
||||
);
|
||||
|
@ -210,7 +220,10 @@ void journal_flusher_co::loop()
|
|||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||
data->op = this;
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||
);
|
||||
|
|
|
@ -15,7 +15,7 @@ struct meta_sector_t
|
|||
class journal_flusher_t;
|
||||
|
||||
// Journal flusher coroutine
|
||||
class journal_flusher_co
|
||||
struct journal_flusher_co
|
||||
{
|
||||
blockstore *bs;
|
||||
journal_flusher_t *flusher;
|
||||
|
|
|
@ -42,6 +42,7 @@ int blockstore_init_meta::loop()
|
|||
metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0),
|
||||
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
|
||||
};
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data); };
|
||||
io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read);
|
||||
bs->ringloop->submit();
|
||||
submitted = (prev == 1 ? 2 : 1);
|
||||
|
@ -179,6 +180,7 @@ int blockstore_init_journal::loop()
|
|||
}
|
||||
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = { journal_buffer, 512 };
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data); };
|
||||
io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
||||
bs->ringloop->submit();
|
||||
step = 1;
|
||||
|
@ -211,6 +213,7 @@ int blockstore_init_journal::loop()
|
|||
journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
|
||||
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
|
||||
};
|
||||
data->callback = [this](ring_data_t *data) { handle_event(data); };
|
||||
io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
|
||||
bs->ringloop->submit();
|
||||
submitted = done_buf == 1 ? 2 : 1;
|
||||
|
|
|
@ -7,9 +7,9 @@ class blockstore_init_meta
|
|||
uint64_t metadata_read = 0;
|
||||
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
|
||||
void handle_entries(struct clean_disk_entry* entries, int count);
|
||||
void handle_event(ring_data_t *data);
|
||||
public:
|
||||
blockstore_init_meta(blockstore *bs);
|
||||
void handle_event(ring_data_t *data);
|
||||
int loop();
|
||||
};
|
||||
|
||||
|
@ -24,8 +24,8 @@ class blockstore_init_journal
|
|||
bool wrapped = false;
|
||||
int submitted = 0, done_buf = 0, done_len = 0;
|
||||
int handle_journal_part(void *buf, uint64_t len);
|
||||
void handle_event(ring_data_t *data);
|
||||
public:
|
||||
blockstore_init_journal(blockstore* bs);
|
||||
void handle_event(ring_data_t *data);
|
||||
int loop();
|
||||
};
|
||||
|
|
|
@ -47,12 +47,12 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re
|
|||
return 1;
|
||||
}
|
||||
|
||||
void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe)
|
||||
void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
|
||||
{
|
||||
journal.sector_info[journal.cur_sector].usage_count++;
|
||||
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
|
||||
data->op = op;
|
||||
data->callback = cb;
|
||||
io_uring_prep_writev(
|
||||
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
|
||||
);
|
||||
|
|
|
@ -157,4 +157,4 @@ inline journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t
|
|||
}
|
||||
|
||||
// FIXME: make inline
|
||||
void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe);
|
||||
void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
|
||||
|
|
|
@ -31,7 +31,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled,
|
|||
&data->iov, 1,
|
||||
(IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start
|
||||
);
|
||||
data->op = op;
|
||||
data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); };
|
||||
fulfilled += cur_end-cur_start;
|
||||
}
|
||||
return 1;
|
||||
|
|
|
@ -85,6 +85,7 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
|||
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
|
||||
}
|
||||
// Prepare and submit journal entries
|
||||
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
|
||||
int s = 0, cur_sector = -1;
|
||||
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
|
||||
{
|
||||
|
@ -99,7 +100,7 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
|||
if (cur_sector == -1)
|
||||
op->min_used_journal_sector = 1 + journal.cur_sector;
|
||||
cur_sector = journal.cur_sector;
|
||||
prepare_journal_sector_write(op, journal, sqe[s++]);
|
||||
prepare_journal_sector_write(journal, sqe[s++], cb);
|
||||
}
|
||||
}
|
||||
op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||
|
|
|
@ -37,12 +37,13 @@ int blockstore::dequeue_sync(blockstore_operation *op)
|
|||
|
||||
int blockstore::continue_sync(blockstore_operation *op)
|
||||
{
|
||||
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
||||
if (op->sync_state == SYNC_HAS_SMALL)
|
||||
{
|
||||
// No big writes, just fsync the journal
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
io_uring_prep_fsync(sqe, journal.fd, 0);
|
||||
data->op = op;
|
||||
data->callback = cb;
|
||||
op->pending_ops = 1;
|
||||
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
||||
}
|
||||
|
@ -51,7 +52,7 @@ int blockstore::continue_sync(blockstore_operation *op)
|
|||
// 1st step: fsync data
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
io_uring_prep_fsync(sqe, data_fd, 0);
|
||||
data->op = op;
|
||||
data->callback = cb;
|
||||
op->pending_ops = 1;
|
||||
op->sync_state = SYNC_DATA_SYNC_SENT;
|
||||
}
|
||||
|
@ -88,14 +89,14 @@ int blockstore::continue_sync(blockstore_operation *op)
|
|||
if (cur_sector == -1)
|
||||
op->min_used_journal_sector = 1 + journal.cur_sector;
|
||||
cur_sector = journal.cur_sector;
|
||||
prepare_journal_sector_write(op, journal, sqe[s++]);
|
||||
prepare_journal_sector_write(journal, sqe[s++], cb);
|
||||
}
|
||||
}
|
||||
op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||
// ... And a journal fsync
|
||||
io_uring_prep_fsync(sqe[s], journal.fd, 0);
|
||||
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
|
||||
data->op = op;
|
||||
data->callback = cb;
|
||||
op->pending_ops = 1 + s;
|
||||
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ void blockstore::enqueue_write(blockstore_operation *op)
|
|||
// First step of the write algorithm: dequeue operation and submit initial write(s)
|
||||
int blockstore::dequeue_write(blockstore_operation *op)
|
||||
{
|
||||
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
|
||||
auto dirty_it = dirty_db.find((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
|
@ -94,7 +95,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
|||
vcnt = 1;
|
||||
op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
|
||||
}
|
||||
data->op = op;
|
||||
data->callback = cb;
|
||||
io_uring_prep_writev(
|
||||
sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
|
||||
);
|
||||
|
@ -127,12 +128,12 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
|||
je->len = op->len;
|
||||
je->crc32 = je_crc32((journal_entry*)je);
|
||||
journal.crc32_last = je->crc32;
|
||||
prepare_journal_sector_write(op, journal, sqe1);
|
||||
prepare_journal_sector_write(journal, sqe1, cb);
|
||||
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||
// Prepare journal data write
|
||||
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
|
||||
data2->iov = (struct iovec){ op->buf, op->len };
|
||||
data2->op = op;
|
||||
data2->callback = cb;
|
||||
io_uring_prep_writev(
|
||||
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
|
||||
);
|
||||
|
|
10
ringloop.cpp
10
ringloop.cpp
|
@ -52,7 +52,6 @@ void ring_loop_t::unregister_consumer(int number)
|
|||
{
|
||||
if (number < consumers.size())
|
||||
{
|
||||
consumers[number].handle_event = NULL;
|
||||
consumers[number].loop = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -67,14 +66,9 @@ void ring_loop_t::loop(bool sleep)
|
|||
while ((io_uring_peek_cqe(ring, &cqe), cqe))
|
||||
{
|
||||
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
|
||||
if (d->source < consumers.size())
|
||||
if (d->callback)
|
||||
{
|
||||
d->res = cqe->res;
|
||||
ring_consumer_t & c = consumers[d->source];
|
||||
if (c.handle_event != NULL)
|
||||
{
|
||||
c.handle_event(d);
|
||||
}
|
||||
d->callback(d);
|
||||
}
|
||||
io_uring_cqe_seen(ring, cqe);
|
||||
}
|
||||
|
|
|
@ -15,13 +15,12 @@ struct ring_data_t
|
|||
uint64_t source;
|
||||
struct iovec iov; // for single-entry read/write operations
|
||||
int res;
|
||||
void *op;
|
||||
std::function<void(ring_data_t*)> callback;
|
||||
};
|
||||
|
||||
struct ring_consumer_t
|
||||
{
|
||||
int number;
|
||||
std::function<void(ring_data_t*)> handle_event;
|
||||
std::function<void(void)> loop;
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue