Begin op enqueueing into the blockstore

blocking-uring-test
Vitaliy Filippov 2019-12-13 20:12:31 +03:00
parent 283d03ef18
commit f6eb4326b3
2 changed files with 159 additions and 29 deletions

157
osd.cpp
View File

@ -6,22 +6,45 @@
#include "osd_ops.h"
#include "ringloop.h"
struct osd_op_t
{
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_OP_PACKET_SIZE];
};
blockstore_operation bs_op;
int client_fd;
void *buf = NULL;
};
struct osd_client_t
{
sockaddr_in peer_addr;
socklen_t peer_addr_size;
int peer_fd;
bool ready;
bool reading;
bool ready = false;
bool reading = false;
int in_flight_ops = 0;
struct osd_op_t *cur_op = NULL;
iovec iov;
msghdr msg;
void *cur_buf = NULL;
int cur_len = 0;
int cur_done = 0, cur_remaining = 0;
};
class osd_t
{
// config
int client_queue_depth = 128;
// fields
blockstore *bs;
ring_loop_t *ringloop;
int wait_state = 0;
int epoll_fd = 0;
int listen_fd = 0;
@ -29,20 +52,20 @@ class osd_t
std::string bind_address;
int bind_port, listen_backlog;
ring_loop_t *ringloop;
std::unordered_map<int,osd_client_t> clients;
std::deque<int> ready_clients;
void handle_epoll_events();
public:
osd_t(ring_loop_t *ringloop);
osd_t(blockstore *bs, ring_loop_t *ringloop);
~osd_t();
void loop();
};
osd_t::osd_t(ring_loop_t *ringloop)
osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
{
this->bs = bs;
this->ringloop = ringloop;
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
@ -205,18 +228,18 @@ void osd_t::stop_client(int peer_fd)
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
auto it = clients.find(peer_fd);
clients.erase(it);
if (cl.ready)
if (it->ready)
{
for (auto it = ready_clients.begin(); it != ready_clients.end(); it++)
for (auto rit = ready_clients.begin(); rit != ready_clients.end(); rit++)
{
if (*it == peer_fd)
if (*rit == peer_fd)
{
ready_clients.erase(it);
ready_clients.erase(rit);
break;
}
}
}
clients.erase(it);
close(peer_fd);
}
@ -228,9 +251,11 @@ void osd_t::read_commands()
auto & cl = clients[peer_fd];
if (!cl.cur_buf)
{
// no reads in progress, this is probably a new command
cl.cur_buf = cl.command_buffer;
cl.cur_len = OSD_OP_PACKET_SIZE;
// no reads in progress, so this is probably a new command
cl.cur_op = new osd_op_t;
cl.cur_buf = &cl.cur_op->op_buf;
cl.cur_done = 0;
cl.cur_remaining = OSD_OP_PACKET_SIZE;
}
struct io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
@ -240,13 +265,14 @@ void osd_t::read_commands()
}
struct ring_data_t* data = ((ring_data_t*)sqe->user_data);
cl.iov.iov_base = cl.cur_buf;
cl.iov.iov_len = cl.cur_len;
cl.iov.iov_len = cl.cur_remaining;
cl.msg.msg_iov = &cl.iov;
cl.msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.msg, 0);
ringloop->submit();
cl.reading = true;
cl.ready = false;
}
ready_clients.clear();
}
@ -265,7 +291,106 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
}
cl->reading = false;
if (cl->ready)
{
ready_clients.push_back(peer_fd);
}
if (data->res > 0)
{
cl->cur_done += data->res;
cl->cur_remaining -= data->res;
cl->cur_buf += data->res;
if (cl->cur_remaining <= 0)
{
cl->cur_buf = NULL;
if (cl->read_state == CL_READ_COMMAND)
{
osd_op_t *cur_op = cl->cur_op;
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
{
// Allocate a buffer
cur_op->buf = memalign(512, cur_op->op.sec_rw.len);
}
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
{
// Read data
cl->cur_buf = cur_op->buf;
cl->cur_done = 0;
cl->cur_remaining = cur_op->op.sec_rw.len;
cl->read_state = CL_READ_DATA;
}
else
{
// Command is ready
enqueue_op(cur_op);
cl->cur_op = NULL;
cl->read_state = 0;
}
}
else if (cl->read_state == CL_READ_DATA)
{
// Command is ready
enqueue_op(cur_op);
cl->cur_op = NULL;
cl->read_state = 0;
}
}
}
}
}
void osd_t::enqueue_op(int peer_fd, osd_op_t *cur_op)
{
cur_op->bs_op->callback = [this, peer_fd, cur_op](blockstore_operation* bs_op)
{
auto cl = clients.find(peer_fd);
if (cl != clients.end())
{
cl->replies.push(cur_op);
}
else
{
if (cur_op->buf)
free(cur_op->buf);
delete cur_op;
}
};
if (cur_op->op->hdr.magic != SECONDARY_OSD_OP_MAGIC ||
cur_op->op->hdr.opcode < OSD_OP_MIN || cur_op->op->hdr.opcode > OSD_OP_MAX ||
(cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
(cur_op->op->sec_rw.len > OSD_RW_MAX || cur_op->op->sec_rw.len % OSD_RW_ALIGN || cur_op->op->sec_rw.offset % OSD_RW_ALIGN))
{
// Bad command
cur_op->bs_op->retval = -EINVAL;
cur_op->bs_op->callback();
return;
}
cur_op->bs_op->flags = (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE
: -1))))));
if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ||
cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE)
{
cur_op->bs_op->oid = cur_op->op->sec_rw.oid;
cur_op->bs_op->version = cur_op->op->sec_rw.version;
cur_op->bs_op->offset = cur_op->op->sec_rw.offset;
cur_op->bs_op->len = cur_op->op->sec_rw.len;
cur_op->bs_op->buf = cur_op->buf;
}
else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE)
{
cur_op->bs_op->oid = cur_op->op->sec_del.oid;
cur_op->bs_op->version = cur_op->op->sec_del.version;
}
else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
{
cur_op->bs_op->len = cur_op->op->len/sizeof(obj_ver_id);
cur_op->bs_op->buf = cur_op->buf;
}
bs->enqueue_op(cur_op->bs_op);
}

View File

@ -7,12 +7,16 @@
#define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553
#define OSD_OP_PACKET_SIZE 0x80
#define OSD_REPLY_PACKET_SIZE 0x80
#define OSD_OP_MIN 0x01
#define OSD_OP_SECONDARY_READ 0x01
#define OSD_OP_SECONDARY_WRITE 0x02
#define OSD_OP_SECONDARY_SYNC 0x03
#define OSD_OP_SECONDARY_STABILIZE 0x04
#define OSD_OP_SECONDARY_DELETE 0x05
#define OSD_OP_SECONDARY_LIST 0x10
#define OSD_OP_MAX 0x10
#define OSD_RW_ALIGN 512
#define OSD_RW_MAX 64*1024*1024
// common header of all operations
struct __attribute__((__packed__)) osd_op_header_t
@ -110,11 +114,11 @@ struct __attribute__((__packed__)) osd_op_secondary_stabilize_t
struct __attribute__((__packed__))
{
osd_op_header_t header;
// oid array length
// obj_ver_id array length in bytes
uint32_t len;
} packet;
// oid array
object_id *oids;
// object&version array
obj_ver_id *objects;
};
struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t
@ -150,20 +154,21 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t
union osd_any_op_t
{
osd_op_secondary_rw_t secondary_rw;
osd_op_secondary_del_t secondary_del;
osd_op_secondary_sync_t op_sync;
osd_op_secondary_stabilize_t op_stabilize;
osd_op_secondary_list_t op_list;
osd_op_header_t hdr;
osd_op_secondary_rw_t sec_rw;
osd_op_secondary_del_t sec_del;
osd_op_secondary_sync_t sec_sync;
osd_op_secondary_stabilize_t sec_stabilize;
osd_op_secondary_list_t sec_list;
};
union osd_any_reply_t
{
osd_reply_secondary_rw_t secondary_rw;
osd_reply_secondary_del_t secondary_del;
osd_reply_secondary_sync_t op_sync;
osd_reply_secondary_stabilize_t op_stabilize;
osd_reply_secondary_list_t op_list;
osd_reply_secondary_rw_t sec_rw;
osd_reply_secondary_del_t sec_del;
osd_reply_secondary_sync_t sec_sync;
osd_reply_secondary_stabilize_t sec_stabilize;
osd_reply_secondary_list_t sec_list;
};
static int size_ok = sizeof(osd_any_op_t) < OSD_OP_PACKET_SIZE &&