diff --git a/Makefile b/Makefile index dc7041d40..02c6fdefa 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,14 @@ json11.o: json11/json11.cpp g++ $(CXXFLAGS) -c -o json11.o json11/json11.cpp %.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_impl.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h object_id.h g++ $(CXXFLAGS) -c -o $@ $< -osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o json11.o - g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o json11.o $(BLOCKSTORE_OBJS) +osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_read.o: osd_read.cpp osd.h osd_ops.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_send.o: osd_send.cpp osd.h osd_ops.h + g++ $(CXXFLAGS) -c -o $@ $< +osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o + g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o $(BLOCKSTORE_OBJS) test: test.cpp g++ $(CXXFLAGS) -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp diff --git a/osd.cpp b/osd.cpp index cf4903aef..59a78a8a5 100644 --- a/osd.cpp +++ b/osd.cpp @@ -4,20 +4,8 @@ #include #include -#include "json11/json11.hpp" - #include "osd.h" -#define CL_READ_OP 1 -#define CL_READ_DATA 2 -#define CL_READ_REPLY_DATA 3 -#define SQE_SENT 0x100l -#define CL_WRITE_READY 1 -#define CL_WRITE_REPLY 2 -#define CL_WRITE_DATA 3 - -// FIXME: Split into more files - osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop) { bind_address = config["bind_address"]; @@ -146,8 +134,6 @@ void osd_t::loop() ringloop->submit(); } -#define MAX_EPOLL_EVENTS 16 - int osd_t::handle_epoll_events() { epoll_event events[MAX_EPOLL_EVENTS]; @@ -241,206 +227,7 @@ void osd_t::stop_client(int peer_fd) close(peer_fd); } -void osd_t::read_requests() -{ - for (int i = 0; i < read_ready_clients.size(); 
i++) - { - int peer_fd = read_ready_clients[i]; - auto & cl = clients[peer_fd]; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) - { - read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); - return; - } - ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.read_buf) - { - // no reads in progress - // so this is either a new command or a reply to a previously sent command - if (!cl.read_op) - { - cl.read_op = new osd_op_t; - cl.read_op->peer_fd = peer_fd; - } - cl.read_op->op_type = OSD_OP_IN; - cl.read_buf = &cl.read_op->op_buf; - cl.read_remaining = OSD_PACKET_SIZE; - cl.read_state = CL_READ_OP; - } - cl.read_iov.iov_base = cl.read_buf; - cl.read_iov.iov_len = cl.read_remaining; - cl.read_msg.msg_iov = &cl.read_iov; - cl.read_msg.msg_iovlen = 1; - data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; - my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); - cl.reading = true; - cl.read_ready = false; - } - read_ready_clients.clear(); -} - -void osd_t::handle_read(ring_data_t *data, int peer_fd) -{ - auto cl_it = clients.find(peer_fd); - if (cl_it != clients.end()) - { - auto & cl = cl_it->second; - if (data->res < 0 && data->res != -EAGAIN) - { - // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket read error: %d (%s). 
Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); - stop_client(peer_fd); - return; - } - cl.reading = false; - if (cl.read_ready) - { - read_ready_clients.push_back(peer_fd); - } - if (data->res > 0) - { - cl.read_remaining -= data->res; - cl.read_buf += data->res; - if (cl.read_remaining <= 0) - { - cl.read_buf = NULL; - if (cl.read_state == CL_READ_OP) - { - if (cl.read_op->op.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) - { - handle_read_reply(&cl); - } - else - { - handle_read_op(&cl); - } - } - else if (cl.read_state == CL_READ_DATA) - { - // Operation is ready - enqueue_op(cl.read_op); - cl.read_op = NULL; - cl.read_state = 0; - } - else if (cl.read_state == CL_READ_REPLY_DATA) - { - // Reply is ready - auto req_it = cl.sent_ops.find(cl.read_reply_id); - osd_op_t *request = req_it->second; - cl.sent_ops.erase(req_it); - cl.read_reply_id = 0; - cl.read_state = 0; - handle_reply(request); - } - } - } - } -} - -void osd_t::handle_read_op(osd_client_t *cl) -{ - osd_op_t *cur_op = cl->read_op; - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) - { - // Allocate a buffer - cur_op->buf = memalign(512, cur_op->op.sec_rw.len); - } - else if (cur_op->op.hdr.opcode == OSD_OP_READ || - cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - cur_op->buf = memalign(512, cur_op->op.rw.len); - } - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || - cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - // Read data - cl->read_buf = cur_op->buf; - cl->read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE - ? 
cur_op->op.sec_rw.len - : cur_op->op.rw.len); - cl->read_state = CL_READ_DATA; - } - else - { - // Operation is ready - cl->read_op = NULL; - cl->read_state = 0; - enqueue_op(cur_op); - } -} - -void osd_t::handle_read_reply(osd_client_t *cl) -{ - osd_op_t *cur_op = cl->read_op; - auto req_it = cl->sent_ops.find(cur_op->op.hdr.id); - if (req_it == cl->sent_ops.end()) - { - // Command out of sync. Drop connection - // FIXME This is probably a peer, so handle all previously sent operations carefully - stop_client(cl->peer_fd); - return; - } - osd_op_t *request = req_it->second; - memcpy(request->reply_buf, cur_op->op_buf, OSD_PACKET_SIZE); - if (request->reply.hdr.opcode == OSD_OP_SECONDARY_READ && - request->reply.hdr.retval > 0) - { - // Read data - // FIXME: request->buf must be allocated - cl->read_state = CL_READ_REPLY_DATA; - cl->read_reply_id = request->op.hdr.id; - cl->read_buf = request->buf; - cl->read_remaining = request->reply.hdr.retval; - } - else if (request->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && - request->reply.hdr.retval > 0) - { - request->buf = memalign(512, sizeof(obj_ver_id) * request->reply.hdr.retval); - cl->read_state = CL_READ_REPLY_DATA; - cl->read_reply_id = request->op.hdr.id; - cl->read_buf = request->buf; - cl->read_remaining = sizeof(obj_ver_id) * request->reply.hdr.retval; - } - else - { - cl->read_state = 0; - cl->sent_ops.erase(req_it); - handle_reply(request); - } -} - -void osd_t::handle_reply(osd_op_t *cur_op) -{ - -} - -void osd_t::secondary_op_callback(osd_op_t *cur_op) -{ - inflight_ops--; - auto cl_it = clients.find(cur_op->peer_fd); - if (cl_it != clients.end()) - { - auto & cl = cl_it->second; - if (cl.write_state == 0) - { - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); - } - make_reply(cur_op); - cl.completions.push_back(cur_op); - ringloop->wakeup(); - } - else - { - delete cur_op; - } -} - -void osd_t::enqueue_op(osd_op_t *cur_op) +void osd_t::exec_op(osd_op_t *cur_op) { if 
(stopping) { @@ -462,62 +249,11 @@ void osd_t::enqueue_op(osd_op_t *cur_op) } if (cur_op->op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) { - // Sync and stabilize all objects - // This command is only valid for tests - // FIXME: Dedup between here & fio_engine - if (!allow_test_ops) - { - cur_op->bs_op.retval = -EINVAL; - secondary_op_callback(cur_op); - return; - } - cur_op->bs_op.opcode = BS_OP_SYNC; - cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op) - { - auto & unstable_writes = bs->get_unstable_writes(); - if (op->retval >= 0 && unstable_writes.size() > 0) - { - op->opcode = BS_OP_STABLE; - op->len = unstable_writes.size(); - obj_ver_id *vers = new obj_ver_id[op->len]; - op->buf = vers; - int i = 0; - for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++) - { - vers[i] = { - .oid = it->first, - .version = it->second, - }; - } - unstable_writes.clear(); - op->callback = [this, cur_op](blockstore_op_t *op) - { - secondary_op_callback(cur_op); - obj_ver_id *vers = (obj_ver_id*)op->buf; - delete[] vers; - }; - bs->enqueue_op(op); - } - else - { - secondary_op_callback(cur_op); - } - }; - bs->enqueue_op(&cur_op->bs_op); - return; + exec_sync_stab_all(cur_op); } else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG) { - // FIXME: Send the real config, not its source - std::string *cfg_str = new std::string(std::move(json11::Json(config).dump())); - cur_op->buf = cfg_str; - auto & cl = clients[cur_op->peer_fd]; - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); - make_reply(cur_op); - cl.completions.push_back(cur_op); - ringloop->wakeup(); - return; + exec_show_config(cur_op); } else if (cur_op->op.hdr.opcode == OSD_OP_READ) { @@ -532,200 +268,8 @@ void osd_t::enqueue_op(osd_op_t *cur_op) // - send read requests // - reconstruct result } - cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); }; - cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == 
OSD_OP_SECONDARY_READ ? BS_OP_READ - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? BS_OP_SYNC - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? BS_OP_STABLE - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? BS_OP_DELETE - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST ? BS_OP_LIST - : -1)))))); - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) - { - cur_op->bs_op.oid = cur_op->op.sec_rw.oid; - cur_op->bs_op.version = cur_op->op.sec_rw.version; - cur_op->bs_op.offset = cur_op->op.sec_rw.offset; - cur_op->bs_op.len = cur_op->op.sec_rw.len; - cur_op->bs_op.buf = cur_op->buf; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE) - { - cur_op->bs_op.oid = cur_op->op.sec_del.oid; - cur_op->bs_op.version = cur_op->op.sec_del.version; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) - { - cur_op->bs_op.len = cur_op->op.sec_stab.len/sizeof(obj_ver_id); - cur_op->bs_op.buf = cur_op->buf; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) - { - cur_op->bs_op.len = cur_op->op.sec_list.pgtotal; - cur_op->bs_op.offset = cur_op->op.sec_list.pgnum; - } - bs->enqueue_op(&cur_op->bs_op); -} - -void osd_t::send_replies() -{ - for (int i = 0; i < write_ready_clients.size(); i++) - { - int peer_fd = write_ready_clients[i]; - auto & cl = clients[peer_fd]; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) - { - write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i); - return; - } - ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.write_buf) - { - // pick next command - cl.write_op = cl.completions.front(); - cl.completions.pop_front(); - if (cl.write_op->op_type == OSD_OP_OUT) - { - cl.write_buf = &cl.write_op->op_buf; - cl.write_remaining = OSD_PACKET_SIZE; - cl.write_state = CL_WRITE_REPLY; - } - else - { - cl.write_buf = &cl.write_op->reply_buf; - 
cl.write_remaining = OSD_PACKET_SIZE; - cl.write_state = CL_WRITE_REPLY; - } - } - cl.write_iov.iov_base = cl.write_buf; - cl.write_iov.iov_len = cl.write_remaining; - cl.write_msg.msg_iov = &cl.write_iov; - cl.write_msg.msg_iovlen = 1; - data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; - my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); - cl.write_state = cl.write_state | SQE_SENT; - } - write_ready_clients.clear(); -} - -void osd_t::make_reply(osd_op_t *op) -{ - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->op.hdr.id; - op->reply.hdr.opcode = op->op.hdr.opcode; - if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG) - { - std::string *str = (std::string*)op->buf; - op->reply.hdr.retval = str->size()+1; - } else { - op->reply.hdr.retval = op->bs_op.retval; - if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) - op->reply.sec_list.stable_count = op->bs_op.version; - } -} - -void osd_t::handle_send(ring_data_t *data, int peer_fd) -{ - auto cl_it = clients.find(peer_fd); - if (cl_it != clients.end()) - { - auto & cl = cl_it->second; - if (data->res < 0 && data->res != -EAGAIN) - { - // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket write error: %d (%s). 
Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); - stop_client(peer_fd); - return; - } - cl.write_state = cl.write_state & ~SQE_SENT; - if (data->res > 0) - { - cl.write_remaining -= data->res; - cl.write_buf += data->res; - if (cl.write_remaining <= 0) - { - cl.write_buf = NULL; - osd_op_t *cur_op = cl.write_op; - if (cl.write_state == CL_WRITE_REPLY) - { - // Send data - if (cur_op->op_type == OSD_OP_IN) - { - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id); - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str(); - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; - } - else - { - goto op_done; - } - } - else - { - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.sec_rw.len; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.sec_stab.len; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->op.rw.len; - cl.write_state = CL_WRITE_DATA; - } - else - { - goto op_done; - } - } - } - else if (cl.write_state == CL_WRITE_DATA) - { - op_done: - // Done - if (cur_op->op_type == OSD_OP_IN) - { - delete cur_op; - } - else - { - cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op; - } - cl.write_op = NULL; - cl.write_state = cl.completions.size() > 0 ? 
CL_WRITE_READY : 0; - } - } - } - if (cl.write_state != 0) - { - write_ready_clients.push_back(peer_fd); - } + exec_secondary(cur_op); } } diff --git a/osd.h b/osd.h index 2c2f975b4..a231999e8 100644 --- a/osd.h +++ b/osd.h @@ -24,6 +24,15 @@ #define OSD_OP_IN 0 #define OSD_OP_OUT 1 +#define CL_READ_OP 1 +#define CL_READ_DATA 2 +#define CL_READ_REPLY_DATA 3 +#define SQE_SENT 0x100l +#define CL_WRITE_READY 1 +#define CL_WRITE_REPLY 2 +#define CL_WRITE_DATA 3 +#define MAX_EPOLL_EVENTS 16 + struct osd_op_t { int op_type; @@ -171,7 +180,10 @@ class osd_t void handle_send(ring_data_t *data, int peer_fd); void handle_reply(osd_op_t *cur_op); - void enqueue_op(osd_op_t *cur_op); + void exec_op(osd_op_t *cur_op); + void exec_sync_stab_all(osd_op_t *cur_op); + void exec_show_config(osd_op_t *cur_op); + void exec_secondary(osd_op_t *cur_op); void secondary_op_callback(osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp
new file mode 100644
index 000000000..6bb4e6086
--- /dev/null
+++ b/osd_exec_secondary.cpp
@@ -0,0 +1,182 @@
+#include "osd.h"
+
+#include "json11/json11.hpp"
+
+// Called once a reply to an operation sent by this OSD has been completely
+// received. Currently a stub: the request is not processed any further here.
+void osd_t::handle_reply(osd_op_t *cur_op)
+{
+
+}
+
+// Completion handler for operations answered through the reply queue.
+// Decrements inflight_ops, fills the reply header and appends the operation
+// to the client's completions; if the client is no longer registered the
+// operation is deleted instead.
+void osd_t::secondary_op_callback(osd_op_t *cur_op)
+{
+    inflight_ops--;
+    auto cl_it = clients.find(cur_op->peer_fd);
+    if (cl_it != clients.end())
+    {
+        auto & cl = cl_it->second;
+        if (cl.write_state == 0)
+        {
+            // The client is idle: put it on the list served by send_replies()
+            cl.write_state = CL_WRITE_READY;
+            write_ready_clients.push_back(cur_op->peer_fd);
+        }
+        make_reply(cur_op);
+        cl.completions.push_back(cur_op);
+        ringloop->wakeup();
+    }
+    else
+    {
+        delete cur_op;
+    }
+}
+
+// Translates a secondary OSD operation into a blockstore operation and
+// enqueues it; secondary_op_callback() is installed as its completion callback.
+void osd_t::exec_secondary(osd_op_t *cur_op)
+{
+    cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
+    // Opcodes without a blockstore counterpart are mapped to -1
+    cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ
+        : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE
+        : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? BS_OP_SYNC
+        : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? BS_OP_STABLE
+        : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? BS_OP_DELETE
+        : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST ? BS_OP_LIST
+        : -1))))));
+    if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
+        cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
+    {
+        cur_op->bs_op.oid = cur_op->op.sec_rw.oid;
+        cur_op->bs_op.version = cur_op->op.sec_rw.version;
+        cur_op->bs_op.offset = cur_op->op.sec_rw.offset;
+        cur_op->bs_op.len = cur_op->op.sec_rw.len;
+        cur_op->bs_op.buf = cur_op->buf;
+    }
+    else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE)
+    {
+        cur_op->bs_op.oid = cur_op->op.sec_del.oid;
+        cur_op->bs_op.version = cur_op->op.sec_del.version;
+    }
+    else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
+    {
+        // sec_stab.len is divided by the entry size to get the entry count
+        cur_op->bs_op.len = cur_op->op.sec_stab.len/sizeof(obj_ver_id);
+        cur_op->bs_op.buf = cur_op->buf;
+    }
+    else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
+    {
+        cur_op->bs_op.len = cur_op->op.sec_list.pgtotal;
+        cur_op->bs_op.offset = cur_op->op.sec_list.pgnum;
+    }
+    bs->enqueue_op(&cur_op->bs_op);
+}
+
+// Replies to OSD_OP_SHOW_CONFIG with the JSON dump of the config. The string
+// is heap-allocated and handed over in cur_op->buf, where make_reply() and
+// handle_send() pick it up.
+void osd_t::exec_show_config(osd_op_t *cur_op)
+{
+    // Use find() rather than operator[] so a missing client is not re-created
+    auto cl_it = clients.find(cur_op->peer_fd);
+    if (cl_it == clients.end())
+    {
+        delete cur_op;
+        return;
+    }
+    auto & cl = cl_it->second;
+    // FIXME: Send the real config, not its source
+    std::string *cfg_str = new std::string(json11::Json(config).dump());
+    cur_op->buf = cfg_str;
+    // Same rule as in secondary_op_callback(): only an idle client is marked
+    // ready. Overwriting write_state unconditionally would discard SQE_SENT or
+    // CL_WRITE_DATA of a send in progress and queue the socket twice.
+    if (cl.write_state == 0)
+    {
+        cl.write_state = CL_WRITE_READY;
+        write_ready_clients.push_back(cur_op->peer_fd);
+    }
+    make_reply(cur_op);
+    cl.completions.push_back(cur_op);
+    ringloop->wakeup();
+}
+
+// Test-only operation: syncs the blockstore and then stabilizes every
+// unstable write it reports. Rejected with -EINVAL unless allow_test_ops is set.
+void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
+{
+    // Sync and stabilize all objects
+    // This command is only valid for tests
+    // FIXME: Dedup between here & fio_engine
+    if (!allow_test_ops)
+    {
+        cur_op->bs_op.retval = -EINVAL;
+        secondary_op_callback(cur_op);
+        return;
+    }
+    cur_op->bs_op.opcode = BS_OP_SYNC;
+    cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op)
+    {
+        auto & unstable_writes = bs->get_unstable_writes();
+        if (op->retval >= 0 && unstable_writes.size() > 0)
+        {
+            // Reuse the same blockstore op for the follow-up BS_OP_STABLE
+            op->opcode = BS_OP_STABLE;
+            op->len = unstable_writes.size();
+            obj_ver_id *vers = new obj_ver_id[op->len];
+            op->buf = vers;
+            int i = 0;
+            for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
+            {
+                vers[i] = {
+                    .oid = it->first,
+                    .version = it->second,
+                };
+            }
+            unstable_writes.clear();
+            op->callback = [this, cur_op](blockstore_op_t *op)
+            {
+                // Free the version list first: secondary_op_callback() deletes
+                // cur_op when the client is gone, and op is expected to be
+                // &cur_op->bs_op (that is what was enqueued), so it must not
+                // be touched afterwards.
+                obj_ver_id *vers = (obj_ver_id*)op->buf;
+                op->buf = NULL;
+                delete[] vers;
+                secondary_op_callback(cur_op);
+            };
+            bs->enqueue_op(op);
+        }
+        else
+        {
+            secondary_op_callback(cur_op);
+        }
+    };
+    bs->enqueue_op(&cur_op->bs_op);
+}
+
+// Fills the reply header from the finished operation: for OSD_OP_SHOW_CONFIG
+// retval is the config string length plus its terminating NUL, otherwise it is
+// the blockstore result.
+void osd_t::make_reply(osd_op_t *op)
+{
+    op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
+    op->reply.hdr.id = op->op.hdr.id;
+    op->reply.hdr.opcode = op->op.hdr.opcode;
+    if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG)
+    {
+        std::string *str = (std::string*)op->buf;
+        op->reply.hdr.retval = str->size()+1;
+    }
+    else
+    {
+        op->reply.hdr.retval = op->bs_op.retval;
+        if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
+            op->reply.sec_list.stable_count = op->bs_op.version;
+    }
+}
diff --git a/osd_read.cpp b/osd_read.cpp
new file mode 100644
index 000000000..2c981a9aa
--- /dev/null
+++ b/osd_read.cpp
@@ -0,0 +1,235 @@
+#include "osd.h"
+
+// Submits one recvmsg per client in read_ready_clients. A client with no read
+// in progress starts receiving a new OSD_PACKET_SIZE header into read_op.
+void osd_t::read_requests()
+{
+    for (size_t i = 0; i < read_ready_clients.size(); i++)
+    {
+        int peer_fd = read_ready_clients[i];
+        auto & cl = clients[peer_fd];
+        io_uring_sqe* sqe = ringloop->get_sqe();
+        if (!sqe)
+        {
+            // Submission queue is full: keep the clients not served yet
+            read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
+            return;
+        }
+        ring_data_t* data = ((ring_data_t*)sqe->user_data);
+        if (!cl.read_buf)
+        {
+            // no reads in progress
+            // so this is either a new command or a reply to a previously sent command
+            if (!cl.read_op)
+            {
+                cl.read_op = new osd_op_t;
+                cl.read_op->peer_fd = peer_fd;
+            }
+            cl.read_op->op_type = OSD_OP_IN;
+            cl.read_buf = &cl.read_op->op_buf;
+            cl.read_remaining = OSD_PACKET_SIZE;
+            cl.read_state = CL_READ_OP;
+        }
+        cl.read_iov.iov_base = cl.read_buf;
+        cl.read_iov.iov_len = cl.read_remaining;
+        cl.read_msg.msg_iov = &cl.read_iov;
+        cl.read_msg.msg_iovlen = 1;
+        data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
+        my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
+        cl.reading = true;
+        cl.read_ready = false;
+    }
+    read_ready_clients.clear();
+}
+
+// Completion handler for a recvmsg submitted by read_requests(). Advances the
+// client's read state machine once the current header or payload is complete.
+void osd_t::handle_read(ring_data_t *data, int peer_fd)
+{
+    auto cl_it = clients.find(peer_fd);
+    if (cl_it == clients.end())
+    {
+        return;
+    }
+    auto & cl = cl_it->second;
+    if (data->res < 0 && data->res != -EAGAIN)
+    {
+        // this is a client socket, so don't panic. just disconnect it
+        printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
+        stop_client(peer_fd);
+        return;
+    }
+    cl.reading = false;
+    if (cl.read_ready)
+    {
+        read_ready_clients.push_back(peer_fd);
+    }
+    if (data->res <= 0)
+    {
+        return;
+    }
+    cl.read_remaining -= data->res;
+    cl.read_buf += data->res;
+    if (cl.read_remaining > 0)
+    {
+        return;
+    }
+    cl.read_buf = NULL;
+    if (cl.read_state == CL_READ_OP)
+    {
+        // A full header arrived: the magic tells replies from new commands
+        if (cl.read_op->op.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
+        {
+            handle_read_reply(&cl);
+        }
+        else
+        {
+            handle_read_op(&cl);
+        }
+    }
+    else if (cl.read_state == CL_READ_DATA)
+    {
+        // Operation is ready; detach it from the client before executing it
+        osd_op_t *ready_op = cl.read_op;
+        cl.read_op = NULL;
+        cl.read_state = 0;
+        exec_op(ready_op);
+    }
+    else if (cl.read_state == CL_READ_REPLY_DATA)
+    {
+        // Reply is ready
+        auto req_it = cl.sent_ops.find(cl.read_reply_id);
+        cl.read_reply_id = 0;
+        cl.read_state = 0;
+        if (req_it == cl.sent_ops.end())
+        {
+            // The request is no longer tracked: do not dereference end()
+            stop_client(peer_fd);
+            return;
+        }
+        osd_op_t *request = req_it->second;
+        cl.sent_ops.erase(req_it);
+        handle_reply(request);
+    }
+}
+
+// Called when the header of a new command has been received. Allocates the
+// data buffer of the operation and either starts receiving its payload or
+// executes it right away.
+void osd_t::handle_read_op(osd_client_t *cl)
+{
+    osd_op_t *cur_op = cl->read_op;
+    const auto opcode = cur_op->op.hdr.opcode;
+    // Buffer length is taken from the request structure that matches the
+    // opcode. OSD_OP_SECONDARY_STABILIZE carries its length in sec_stab.len,
+    // which is also what exec_secondary() and handle_send() use for it.
+    bool needs_buf = true;
+    uint64_t buf_len = 0;
+    if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE)
+    {
+        buf_len = cur_op->op.sec_rw.len;
+    }
+    else if (opcode == OSD_OP_SECONDARY_STABILIZE)
+    {
+        buf_len = cur_op->op.sec_stab.len;
+    }
+    else if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
+    {
+        buf_len = cur_op->op.rw.len;
+    }
+    else
+    {
+        needs_buf = false;
+    }
+    if (needs_buf)
+    {
+        // Allocate a buffer
+        cur_op->buf = memalign(512, buf_len);
+        if (!cur_op->buf && buf_len > 0)
+        {
+            // The length comes from the peer; if it cannot be allocated the
+            // stream cannot be kept in sync, so drop the connection
+            cl->read_op = NULL;
+            cl->read_state = 0;
+            delete cur_op;
+            stop_client(cl->peer_fd);
+            return;
+        }
+    }
+    bool has_payload = (opcode == OSD_OP_SECONDARY_WRITE ||
+        opcode == OSD_OP_SECONDARY_STABILIZE ||
+        opcode == OSD_OP_WRITE);
+    if (has_payload && buf_len > 0)
+    {
+        // Read data
+        cl->read_buf = cur_op->buf;
+        cl->read_remaining = buf_len;
+        cl->read_state = CL_READ_DATA;
+    }
+    else
+    {
+        // Operation is ready (no payload, or an empty one that recvmsg could never complete)
+        cl->read_op = NULL;
+        cl->read_state = 0;
+        exec_op(cur_op);
+    }
+}
+
+// Called when the header of a reply to a previously sent operation has been
+// received. Copies the header into the matching request and, if the reply
+// carries data, switches the client to receiving it.
+void osd_t::handle_read_reply(osd_client_t *cl)
+{
+    osd_op_t *cur_op = cl->read_op;
+    auto req_it = cl->sent_ops.find(cur_op->op.hdr.id);
+    if (req_it == cl->sent_ops.end())
+    {
+        // Command out of sync. Drop connection
+        // FIXME This is probably a peer, so handle all previously sent operations carefully
+        stop_client(cl->peer_fd);
+        return;
+    }
+    osd_op_t *request = req_it->second;
+    memcpy(request->reply_buf, cur_op->op_buf, OSD_PACKET_SIZE);
+    if (request->reply.hdr.opcode == OSD_OP_SECONDARY_READ &&
+        request->reply.hdr.retval > 0)
+    {
+        // Read data
+        // FIXME: request->buf must be allocated
+        // NOTE(review): assumes request->buf holds request->op.sec_rw.len
+        // bytes - confirm once the sending side exists. A peer announcing
+        // more data than that would otherwise overflow the buffer.
+        if (!request->buf ||
+            (uint64_t)request->reply.hdr.retval > (uint64_t)request->op.sec_rw.len)
+        {
+            stop_client(cl->peer_fd);
+            return;
+        }
+        cl->read_state = CL_READ_REPLY_DATA;
+        cl->read_reply_id = request->op.hdr.id;
+        cl->read_buf = request->buf;
+        cl->read_remaining = request->reply.hdr.retval;
+    }
+    else if (request->reply.hdr.opcode == OSD_OP_SECONDARY_LIST &&
+        request->reply.hdr.retval > 0)
+    {
+        size_t list_len = sizeof(obj_ver_id) * request->reply.hdr.retval;
+        request->buf = memalign(512, list_len);
+        if (!request->buf)
+        {
+            // Entry count comes from the peer; drop it if it cannot be served
+            stop_client(cl->peer_fd);
+            return;
+        }
+        cl->read_state = CL_READ_REPLY_DATA;
+        cl->read_reply_id = request->op.hdr.id;
+        cl->read_buf = request->buf;
+        cl->read_remaining = list_len;
+    }
+    else
+    {
+        cl->read_state = 0;
+        cl->sent_ops.erase(req_it);
+        handle_reply(request);
+    }
+}
diff --git a/osd_send.cpp b/osd_send.cpp
new file mode 100644
index
000000000..5685c3fc5
--- /dev/null
+++ b/osd_send.cpp
@@ -0,0 +1,169 @@
+#include "osd.h"
+
+// Submits one sendmsg per client in write_ready_clients. A client with no
+// send in progress first takes the next operation from its completions queue.
+void osd_t::send_replies()
+{
+    for (size_t i = 0; i < write_ready_clients.size(); i++)
+    {
+        int peer_fd = write_ready_clients[i];
+        auto & cl = clients[peer_fd];
+        if (cl.write_state & SQE_SENT)
+        {
+            // A sendmsg for this client is already in flight (the fd was
+            // queued twice); handle_send() re-queues it when that one is done
+            continue;
+        }
+        if (!cl.write_buf && cl.completions.empty())
+        {
+            // Nothing to send: calling front() on an empty queue is undefined
+            cl.write_state = 0;
+            continue;
+        }
+        io_uring_sqe* sqe = ringloop->get_sqe();
+        if (!sqe)
+        {
+            // Submission queue is full: keep the clients not served yet
+            write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
+            return;
+        }
+        ring_data_t* data = ((ring_data_t*)sqe->user_data);
+        if (!cl.write_buf)
+        {
+            // pick next command
+            cl.write_op = cl.completions.front();
+            cl.completions.pop_front();
+            // OSD_OP_OUT operations are sent from op_buf, all others from reply_buf
+            if (cl.write_op->op_type == OSD_OP_OUT)
+            {
+                cl.write_buf = &cl.write_op->op_buf;
+            }
+            else
+            {
+                cl.write_buf = &cl.write_op->reply_buf;
+            }
+            cl.write_remaining = OSD_PACKET_SIZE;
+            cl.write_state = CL_WRITE_REPLY;
+        }
+        cl.write_iov.iov_base = cl.write_buf;
+        cl.write_iov.iov_len = cl.write_remaining;
+        cl.write_msg.msg_iov = &cl.write_iov;
+        cl.write_msg.msg_iovlen = 1;
+        data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); };
+        my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
+        cl.write_state = cl.write_state | SQE_SENT;
+    }
+    write_ready_clients.clear();
+}
+
+// Completion handler for a sendmsg submitted by send_replies(). Once the
+// header is out it switches to the operation's payload, if any; once
+// everything is out it finishes the operation.
+void osd_t::handle_send(ring_data_t *data, int peer_fd)
+{
+    auto cl_it = clients.find(peer_fd);
+    if (cl_it == clients.end())
+    {
+        return;
+    }
+    auto & cl = cl_it->second;
+    if (data->res < 0 && data->res != -EAGAIN)
+    {
+        // this is a client socket, so don't panic. just disconnect it
+        printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
+        stop_client(peer_fd);
+        return;
+    }
+    cl.write_state = cl.write_state & ~SQE_SENT;
+    if (data->res > 0)
+    {
+        cl.write_remaining -= data->res;
+        cl.write_buf += data->res;
+        if (cl.write_remaining <= 0)
+        {
+            cl.write_buf = NULL;
+            osd_op_t *cur_op = cl.write_op;
+            bool finished = false;
+            if (cl.write_state == CL_WRITE_REPLY)
+            {
+                // Send data
+                void *payload = NULL;
+                uint64_t payload_len = 0;
+                if (cur_op->op_type == OSD_OP_IN)
+                {
+                    if (cur_op->reply.hdr.retval > 0)
+                    {
+                        if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ)
+                        {
+                            payload = cur_op->buf;
+                            payload_len = cur_op->reply.hdr.retval;
+                        }
+                        else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
+                        {
+                            payload = cur_op->buf;
+                            payload_len = cur_op->reply.hdr.retval * sizeof(obj_ver_id);
+                        }
+                        else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG)
+                        {
+                            payload = (void*)((std::string*)cur_op->buf)->c_str();
+                            payload_len = cur_op->reply.hdr.retval;
+                        }
+                    }
+                }
+                else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
+                {
+                    payload = cur_op->buf;
+                    payload_len = cur_op->op.sec_rw.len;
+                }
+                else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
+                {
+                    payload = cur_op->buf;
+                    payload_len = cur_op->op.sec_stab.len;
+                }
+                else if (cur_op->op.hdr.opcode == OSD_OP_WRITE)
+                {
+                    payload = cur_op->buf;
+                    payload_len = cur_op->op.rw.len;
+                }
+                if (payload && payload_len > 0)
+                {
+                    cl.write_buf = payload;
+                    cl.write_remaining = payload_len;
+                    cl.write_state = CL_WRITE_DATA;
+                }
+                else
+                {
+                    // No payload, or an empty one that sendmsg could never
+                    // make progress on: the operation is complete
+                    finished = true;
+                }
+            }
+            else if (cl.write_state == CL_WRITE_DATA)
+            {
+                finished = true;
+            }
+            if (finished)
+            {
+                // Done
+                if (cur_op->op_type == OSD_OP_IN)
+                {
+                    // NOTE(review): for OSD_OP_SHOW_CONFIG cur_op->buf is a
+                    // heap-allocated std::string; whether ~osd_op_t releases
+                    // it is not visible here - confirm
+                    delete cur_op;
+                }
+                else
+                {
+                    // Keep sent requests until their reply arrives
+                    cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op;
+                }
+                cl.write_op = NULL;
+                cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0;
+            }
+        }
+    }
+    if (cl.write_state != 0)
+    {
+        write_ready_clients.push_back(peer_fd);
+    }
+}