// Patch overview (osd.cpp, osd_ops.h) -- reviewer annotations; the diff text
// itself follows unchanged.
//
// What the visible hunks do:
//  * Add struct osd_op_t: one received operation, holding the raw packet
//    (osd_any_op_t overlaid with an OSD_OP_PACKET_SIZE byte buffer), a
//    blockstore_operation, the client fd and an optional data buffer.
//  * osd_client_t: default-initialise ready/reading, add in_flight_ops and
//    cur_op, replace cur_len with cur_done/cur_remaining.
//  * osd_t now takes and stores a blockstore* next to the ring_loop_t*.
//  * stop_client(): the client is erased from `clients` only after it has been
//    removed from ready_clients.
//  * read_commands(): a new osd_op_t is allocated with `new` for each new
//    command and the receive targets its op_buf; `ready` is cleared once the
//    read has been submitted.
//  * handle_read(): advances the read cursor by data->res; once the packet is
//    complete it allocates a 512-byte-aligned payload buffer for
//    READ/WRITE/STABILIZE, switches to reading the payload for
//    WRITE/STABILIZE, and otherwise hands the operation to enqueue_op().
//  * enqueue_op(): installs a completion callback (queue the op on the
//    client's replies, or free it if the client is gone), rejects malformed
//    packets with -EINVAL, fills the blockstore_operation from the packet and
//    calls bs->enqueue_op().
//  * osd_ops.h: adds OSD_OP_MIN/OSD_OP_MAX/OSD_RW_ALIGN/OSD_RW_MAX, documents
//    the stabilize length as a byte count of an obj_ver_id array, adds `hdr`
//    to osd_any_op_t and renames the union members to sec_*.
//
// NOTE(review): items to confirm before this is merged. They are based only on
// the lines shown here; declarations outside the hunks may change the picture.
//  1. handle_read() calls enqueue_op(cur_op) with one argument, but the
//     definition is enqueue_op(int peer_fd, osd_op_t *cur_op). In the
//     CL_READ_DATA branch `cur_op` is also used outside the block that
//     declares it.
//  2. osd_op_t declares `op` and `bs_op` as value members, yet enqueue_op()
//     dereferences them with `->` (cur_op->op->hdr, cur_op->bs_op->flags).
//     `cur_op->op->len` names a member that osd_any_op_t does not list.
//  3. The nested ternary that assigns bs_op flags appears to open five
//     parentheses and close six.
//  4. The bad-command path calls callback() without an argument although the
//     lambda takes a blockstore_operation* -- check the callback type.
//  5. osd_client_t as shown has no read_state or replies member, and
//     CL_READ_COMMAND / CL_READ_DATA and a declaration of enqueue_op in
//     class osd_t do not appear in these hunks -- presumably defined
//     elsewhere; verify.
//  6. stop_client() uses it->ready and enqueue_op() uses cl->replies on the
//     result of clients.find(); if `clients` is a std::unordered_map these
//     need ->second, and stop_client() does not compare `it` with
//     clients.end() before using it.
//  7. cl->cur_buf is a void*, so `cl->cur_buf += data->res` relies on a
//     compiler extension for void-pointer arithmetic.
//  8. handle_read() passes sec_rw.len from the received packet to memalign()
//     before enqueue_op() checks it against OSD_RW_MAX / OSD_RW_ALIGN, the
//     result of memalign() is not checked, and the STABILIZE path also reads
//     its length through sec_rw.
//  9. OSD_RW_MAX expands to the unparenthesised expression 64*1024*1024.
// 10. Line breaks (and apparently template arguments, e.g. on
//     std::unordered_map / std::deque) were lost when this diff was captured.
//
diff --git a/osd.cpp b/osd.cpp index 603683d0..67857181 100644 --- a/osd.cpp +++ b/osd.cpp @@ -6,22 +6,45 @@ #include "osd_ops.h" #include "ringloop.h" +struct osd_op_t +{ + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_OP_PACKET_SIZE]; + }; + blockstore_operation bs_op; + int client_fd; + void *buf = NULL; +}; + struct osd_client_t { sockaddr_in peer_addr; socklen_t peer_addr_size; int peer_fd; - bool ready; - bool reading; + bool ready = false; + bool reading = false; + int in_flight_ops = 0; + struct osd_op_t *cur_op = NULL; iovec iov; msghdr msg; void *cur_buf = NULL; - int cur_len = 0; + int cur_done = 0, cur_remaining = 0; }; class osd_t { + // config + + int client_queue_depth = 128; + + // fields + + blockstore *bs; + ring_loop_t *ringloop; + int wait_state = 0; int epoll_fd = 0; int listen_fd = 0; @@ -29,20 +52,20 @@ class osd_t std::string bind_address; int bind_port, listen_backlog; - ring_loop_t *ringloop; std::unordered_map clients; std::deque ready_clients; void handle_epoll_events(); public: - osd_t(ring_loop_t *ringloop); + osd_t(blockstore *bs, ring_loop_t *ringloop); ~osd_t(); void loop(); }; -osd_t::osd_t(ring_loop_t *ringloop) +osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) { + this->bs = bs; this->ringloop = ringloop; listen_fd = socket(AF_INET, SOCK_STREAM, 0); @@ -205,18 +228,18 @@ void osd_t::stop_client(int peer_fd) throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } auto it = clients.find(peer_fd); - clients.erase(it); - if (cl.ready) + if (it->ready) { - for (auto it = ready_clients.begin(); it != ready_clients.end(); it++) + for (auto rit = ready_clients.begin(); rit != ready_clients.end(); rit++) { - if (*it == peer_fd) + if (*rit == peer_fd) { - ready_clients.erase(it); + ready_clients.erase(rit); break; } } } + clients.erase(it); close(peer_fd); } @@ -228,9 +251,11 @@ void osd_t::read_commands() auto & cl = clients[peer_fd]; if (!cl.cur_buf) { - // no reads in progress, this is probably a new command - 
cl.cur_buf = cl.command_buffer; - cl.cur_len = OSD_OP_PACKET_SIZE; + // no reads in progress, so this is probably a new command + cl.cur_op = new osd_op_t; + cl.cur_buf = &cl.cur_op->op_buf; + cl.cur_done = 0; + cl.cur_remaining = OSD_OP_PACKET_SIZE; } struct io_uring_sqe* sqe = ringloop->get_sqe(); if (!sqe) @@ -240,13 +265,14 @@ void osd_t::read_commands() } struct ring_data_t* data = ((ring_data_t*)sqe->user_data); cl.iov.iov_base = cl.cur_buf; - cl.iov.iov_len = cl.cur_len; + cl.iov.iov_len = cl.cur_remaining; cl.msg.msg_iov = &cl.iov; cl.msg.msg_iovlen = 1; data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; my_uring_prep_recvmsg(sqe, peer_fd, &cl.msg, 0); ringloop->submit(); cl.reading = true; + cl.ready = false; } ready_clients.clear(); } @@ -265,7 +291,106 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) } cl->reading = false; if (cl->ready) + { ready_clients.push_back(peer_fd); - + } + if (data->res > 0) + { + cl->cur_done += data->res; + cl->cur_remaining -= data->res; + cl->cur_buf += data->res; + if (cl->cur_remaining <= 0) + { + cl->cur_buf = NULL; + if (cl->read_state == CL_READ_COMMAND) + { + osd_op_t *cur_op = cl->cur_op; + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + { + // Allocate a buffer + cur_op->buf = memalign(512, cur_op->op.sec_rw.len); + } + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + { + // Read data + cl->cur_buf = cur_op->buf; + cl->cur_done = 0; + cl->cur_remaining = cur_op->op.sec_rw.len; + cl->read_state = CL_READ_DATA; + } + else + { + // Command is ready + enqueue_op(cur_op); + cl->cur_op = NULL; + cl->read_state = 0; + } + } + else if (cl->read_state == CL_READ_DATA) + { + // Command is ready + enqueue_op(cur_op); + cl->cur_op = NULL; + cl->read_state = 0; + } + } + } } } + +void osd_t::enqueue_op(int 
peer_fd, osd_op_t *cur_op) +{ + cur_op->bs_op->callback = [this, peer_fd, cur_op](blockstore_operation* bs_op) + { + auto cl = clients.find(peer_fd); + if (cl != clients.end()) + { + cl->replies.push(cur_op); + } + else + { + if (cur_op->buf) + free(cur_op->buf); + delete cur_op; + } + }; + if (cur_op->op->hdr.magic != SECONDARY_OSD_OP_MAGIC || + cur_op->op->hdr.opcode < OSD_OP_MIN || cur_op->op->hdr.opcode > OSD_OP_MAX || + (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) && + (cur_op->op->sec_rw.len > OSD_RW_MAX || cur_op->op->sec_rw.len % OSD_RW_ALIGN || cur_op->op->sec_rw.offset % OSD_RW_ALIGN)) + { + // Bad command + cur_op->bs_op->retval = -EINVAL; + cur_op->bs_op->callback(); + return; + } + cur_op->bs_op->flags = (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ + : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE + : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC + : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE + : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE ? 
OP_DELETE + : -1)))))); + if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || + cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + cur_op->bs_op->oid = cur_op->op->sec_rw.oid; + cur_op->bs_op->version = cur_op->op->sec_rw.version; + cur_op->bs_op->offset = cur_op->op->sec_rw.offset; + cur_op->bs_op->len = cur_op->op->sec_rw.len; + cur_op->bs_op->buf = cur_op->buf; + } + else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE) + { + cur_op->bs_op->oid = cur_op->op->sec_del.oid; + cur_op->bs_op->version = cur_op->op->sec_del.version; + } + else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + { + cur_op->bs_op->len = cur_op->op->len/sizeof(obj_ver_id); + cur_op->bs_op->buf = cur_op->buf; + } + bs->enqueue_op(cur_op->bs_op); +} diff --git a/osd_ops.h b/osd_ops.h index 76acd1fa..6cfa04eb 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -7,12 +7,16 @@ #define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553 #define OSD_OP_PACKET_SIZE 0x80 #define OSD_REPLY_PACKET_SIZE 0x80 +#define OSD_OP_MIN 0x01 #define OSD_OP_SECONDARY_READ 0x01 #define OSD_OP_SECONDARY_WRITE 0x02 #define OSD_OP_SECONDARY_SYNC 0x03 #define OSD_OP_SECONDARY_STABILIZE 0x04 #define OSD_OP_SECONDARY_DELETE 0x05 #define OSD_OP_SECONDARY_LIST 0x10 +#define OSD_OP_MAX 0x10 +#define OSD_RW_ALIGN 512 +#define OSD_RW_MAX 64*1024*1024 // common header of all operations struct __attribute__((__packed__)) osd_op_header_t @@ -110,11 +114,11 @@ struct __attribute__((__packed__)) osd_op_secondary_stabilize_t struct __attribute__((__packed__)) { osd_op_header_t header; - // oid array length + // obj_ver_id array length in bytes uint32_t len; } packet; - // oid array - object_id *oids; + // object&version array + obj_ver_id *objects; }; struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t @@ -150,20 +154,21 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t union osd_any_op_t { - osd_op_secondary_rw_t secondary_rw; - osd_op_secondary_del_t secondary_del; - 
osd_op_secondary_sync_t op_sync; - osd_op_secondary_stabilize_t op_stabilize; - osd_op_secondary_list_t op_list; + osd_op_header_t hdr; + osd_op_secondary_rw_t sec_rw; + osd_op_secondary_del_t sec_del; + osd_op_secondary_sync_t sec_sync; + osd_op_secondary_stabilize_t sec_stabilize; + osd_op_secondary_list_t sec_list; }; union osd_any_reply_t { - osd_reply_secondary_rw_t secondary_rw; - osd_reply_secondary_del_t secondary_del; - osd_reply_secondary_sync_t op_sync; - osd_reply_secondary_stabilize_t op_stabilize; - osd_reply_secondary_list_t op_list; + osd_reply_secondary_rw_t sec_rw; + osd_reply_secondary_del_t sec_del; + osd_reply_secondary_sync_t sec_sync; + osd_reply_secondary_stabilize_t sec_stabilize; + osd_reply_secondary_list_t sec_list; }; static int size_ok = sizeof(osd_any_op_t) < OSD_OP_PACKET_SIZE &&