diff --git a/Makefile b/Makefile index dcafd956..dc7041d4 100644 --- a/Makefile +++ b/Makefile @@ -20,5 +20,5 @@ test_allocator: test_allocator.cpp allocator.o g++ $(CXXFLAGS) -o test_allocator test_allocator.cpp allocator.o libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS) g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) -libfio_sec_osd.so: fio_sec_osd.cpp +libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_sec_osd.so fio_sec_osd.cpp diff --git a/fio_sec_osd.cpp b/fio_sec_osd.cpp index 0081d3ce..ac546648 100644 --- a/fio_sec_osd.cpp +++ b/fio_sec_osd.cpp @@ -154,7 +154,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) union { osd_any_op_t op; - uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; + uint8_t op_buf[OSD_PACKET_SIZE] = { 0 }; }; op.hdr.magic = SECONDARY_OSD_OP_MAGIC; @@ -195,7 +195,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) bsd->op_n++; bsd->queue[n] = io; - if (write(bsd->connect_fd, op_buf, OSD_OP_PACKET_SIZE) != OSD_OP_PACKET_SIZE) + if (write(bsd->connect_fd, op_buf, OSD_PACKET_SIZE) != OSD_PACKET_SIZE) { perror("write"); exit(1); @@ -249,11 +249,11 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int union { osd_any_reply_t reply; - uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; + uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 }; }; while (bsd->completed.size() < min) { - read_blocking(bsd->connect_fd, reply_buf, OSD_REPLY_PACKET_SIZE); + read_blocking(bsd->connect_fd, reply_buf, OSD_PACKET_SIZE); if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC) { fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC); diff --git a/osd.cpp b/osd.cpp index 087bfad1..cf4903ae 100644 --- a/osd.cpp +++ b/osd.cpp @@ -10,11 +10,14 @@ #define CL_READ_OP 1 #define CL_READ_DATA 2 +#define CL_READ_REPLY_DATA 3 #define SQE_SENT 0x100l #define CL_WRITE_READY 1 #define CL_WRITE_REPLY 2 #define CL_WRITE_DATA 3 +// FIXME: Split into more files + osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop) { bind_address = config["bind_address"]; @@ -91,7 +94,8 @@ osd_op_t::~osd_op_t() if (buf) { // Note: reusing osd_op_t WILL currently lead to memory leaks - if (op.hdr.opcode == OSD_OP_SHOW_CONFIG) + if (op_type == OSD_OP_IN && + op.hdr.opcode == OSD_OP_SHOW_CONFIG) { std::string *str = (std::string*)buf; delete str; @@ -252,10 +256,16 @@ void osd_t::read_requests() ring_data_t* data = ((ring_data_t*)sqe->user_data); if (!cl.read_buf) { - // no reads in progress, so this is probably a new command - cl.read_op = new osd_op_t; + // no reads in progress + // so this is either a new command or a reply to a previously sent command + if (!cl.read_op) + { + cl.read_op = new osd_op_t; + cl.read_op->peer_fd = peer_fd; + } + cl.read_op->op_type = OSD_OP_IN; cl.read_buf = &cl.read_op->op_buf; - cl.read_remaining = OSD_OP_PACKET_SIZE; + cl.read_remaining = OSD_PACKET_SIZE; cl.read_state = CL_READ_OP; } cl.read_iov.iov_base = cl.read_buf; @@ -294,55 +304,120 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) cl.read_buf += data->res; if (cl.read_remaining <= 0) { - osd_op_t *cur_op = cl.read_op; cl.read_buf = NULL; if (cl.read_state == CL_READ_OP) { - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + if (cl.read_op->op.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) { - // Allocate a buffer - cur_op->buf = memalign(512, cur_op->op.sec_rw.len); - } - else if (cur_op->op.hdr.opcode == OSD_OP_READ || - cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - cur_op->buf = memalign(512, cur_op->op.rw.len); - } - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || - cur_op->op.hdr.opcode == OSD_OP_WRITE) - { - // Read data - cl.read_buf = cur_op->buf; - cl.read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE - ? cur_op->op.sec_rw.len - : cur_op->op.rw.len); - cl.read_state = CL_READ_DATA; + handle_read_reply(&cl); } else { - // Command is ready - cur_op->peer_fd = peer_fd; - enqueue_op(cur_op); - cl.read_op = NULL; - cl.read_state = 0; + handle_read_op(&cl); } } else if (cl.read_state == CL_READ_DATA) { - // Command is ready - cur_op->peer_fd = peer_fd; - enqueue_op(cur_op); + // Operation is ready + enqueue_op(cl.read_op); cl.read_op = NULL; cl.read_state = 0; } + else if (cl.read_state == CL_READ_REPLY_DATA) + { + // Reply is ready + auto req_it = cl.sent_ops.find(cl.read_reply_id); + osd_op_t *request = req_it->second; + cl.sent_ops.erase(req_it); + cl.read_reply_id = 0; + cl.read_state = 0; + handle_reply(request); + } } } } } +void osd_t::handle_read_op(osd_client_t *cl) +{ + osd_op_t *cur_op = cl->read_op; + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + { + // Allocate a buffer + cur_op->buf = memalign(512, cur_op->op.sec_rw.len); + } + else if (cur_op->op.hdr.opcode == OSD_OP_READ || + cur_op->op.hdr.opcode == OSD_OP_WRITE) + { + cur_op->buf = memalign(512, cur_op->op.rw.len); + } + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || + cur_op->op.hdr.opcode == OSD_OP_WRITE) + { + // Read data + cl->read_buf = cur_op->buf; + cl->read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE + ? cur_op->op.sec_rw.len + : cur_op->op.rw.len); + cl->read_state = CL_READ_DATA; + } + else + { + // Operation is ready + cl->read_op = NULL; + cl->read_state = 0; + enqueue_op(cur_op); + } +} + +void osd_t::handle_read_reply(osd_client_t *cl) +{ + osd_op_t *cur_op = cl->read_op; + auto req_it = cl->sent_ops.find(cur_op->op.hdr.id); + if (req_it == cl->sent_ops.end()) + { + // Command out of sync. Drop connection + // FIXME This is probably a peer, so handle all previously sent operations carefully + stop_client(cl->peer_fd); + return; + } + osd_op_t *request = req_it->second; + memcpy(request->reply_buf, cur_op->op_buf, OSD_PACKET_SIZE); + if (request->reply.hdr.opcode == OSD_OP_SECONDARY_READ && + request->reply.hdr.retval > 0) + { + // Read data + // FIXME: request->buf must be allocated + cl->read_state = CL_READ_REPLY_DATA; + cl->read_reply_id = request->op.hdr.id; + cl->read_buf = request->buf; + cl->read_remaining = request->reply.hdr.retval; + } + else if (request->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && + request->reply.hdr.retval > 0) + { + request->buf = memalign(512, sizeof(obj_ver_id) * request->reply.hdr.retval); + cl->read_state = CL_READ_REPLY_DATA; + cl->read_reply_id = request->op.hdr.id; + cl->read_buf = request->buf; + cl->read_remaining = sizeof(obj_ver_id) * request->reply.hdr.retval; + } + else + { + cl->read_state = 0; + cl->sent_ops.erase(req_it); + handle_reply(request); + } +} + +void osd_t::handle_reply(osd_op_t *cur_op) +{ + +} + void osd_t::secondary_op_callback(osd_op_t *cur_op) { inflight_ops--; @@ -355,6 +430,7 @@ void osd_t::secondary_op_callback(osd_op_t *cur_op) cl.write_state = CL_WRITE_READY; write_ready_clients.push_back(cur_op->peer_fd); } + make_reply(cur_op); cl.completions.push_back(cur_op); ringloop->wakeup(); } @@ -438,10 +514,24 @@ void osd_t::enqueue_op(osd_op_t *cur_op) auto & cl = clients[cur_op->peer_fd]; cl.write_state = CL_WRITE_READY; write_ready_clients.push_back(cur_op->peer_fd); + make_reply(cur_op); cl.completions.push_back(cur_op); ringloop->wakeup(); return; } + else if (cur_op->op.hdr.opcode == OSD_OP_READ) + { + // Primary OSD also works with individual stripes, but they're twice the size of the blockstore's stripe + // - convert offset & len to stripe number + // - fail operation if offset & len span multiple stripes + // - calc stripe hash and determine PG + // - check if this is our PG + // - redirect or fail operation if not + // - determine whether we need to read A and B or just A or just B or A + parity or B + parity + // and determine read ranges for both objects + // - send read requests + // - reconstruct result + } cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); }; cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE @@ -466,7 +556,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op) } else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) { - cur_op->bs_op.len = cur_op->op.sec_stabilize.len/sizeof(obj_ver_id); + cur_op->bs_op.len = cur_op->op.sec_stab.len/sizeof(obj_ver_id); cur_op->bs_op.buf = cur_op->buf; } else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) @@ -495,10 +585,18 @@ void osd_t::send_replies() // pick next command cl.write_op = cl.completions.front(); cl.completions.pop_front(); - make_reply(cl.write_op); - cl.write_buf = &cl.write_op->reply_buf; - cl.write_remaining = OSD_REPLY_PACKET_SIZE; - cl.write_state = CL_WRITE_REPLY; + if (cl.write_op->op_type == OSD_OP_OUT) + { + cl.write_buf = &cl.write_op->op_buf; + cl.write_remaining = OSD_PACKET_SIZE; + cl.write_state = CL_WRITE_REPLY; + } + else + { + cl.write_buf = &cl.write_op->reply_buf; + cl.write_remaining = OSD_PACKET_SIZE; + cl.write_state = CL_WRITE_REPLY; + } } cl.write_iov.iov_base = cl.write_buf; cl.write_iov.iov_len = cl.write_remaining; @@ -515,6 +613,7 @@ void osd_t::make_reply(osd_op_t *op) { op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; op->reply.hdr.id = op->op.hdr.id; + op->reply.hdr.opcode = op->op.hdr.opcode; if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG) { std::string *str = (std::string*)op->buf; @@ -553,37 +652,72 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) if (cl.write_state == CL_WRITE_REPLY) { // Send data - if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && - cur_op->reply.hdr.retval > 0) + if (cur_op->op_type == OSD_OP_IN) { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = cur_op->buf; - cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id); - cl.write_state = CL_WRITE_DATA; - } - else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG && - cur_op->reply.hdr.retval > 0) - { - cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str(); - cl.write_remaining = cur_op->reply.hdr.retval; - cl.write_state = CL_WRITE_DATA; + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && + cur_op->reply.hdr.retval > 0) + { + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->reply.hdr.retval; + cl.write_state = CL_WRITE_DATA; + } + else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && + cur_op->reply.hdr.retval > 0) + { + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id); + cl.write_state = CL_WRITE_DATA; + } + else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG && + cur_op->reply.hdr.retval > 0) + { + cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str(); + cl.write_remaining = cur_op->reply.hdr.retval; + cl.write_state = CL_WRITE_DATA; + } + else + { + goto op_done; + } } else { - goto op_done; + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->op.sec_rw.len; + cl.write_state = CL_WRITE_DATA; + } + else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + { + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->op.sec_stab.len; + cl.write_state = CL_WRITE_DATA; + } + else if (cur_op->op.hdr.opcode == OSD_OP_WRITE) + { + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->op.rw.len; + cl.write_state = CL_WRITE_DATA; + } + else + { + goto op_done; + } } } else if (cl.write_state == CL_WRITE_DATA) { op_done: // Done - delete cur_op; + if (cur_op->op_type == OSD_OP_IN) + { + delete cur_op; + } + else + { + cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op; + } cl.write_op = NULL; cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0; } diff --git a/osd.h b/osd.h index a4693c38..2c2f975b 100644 --- a/osd.h +++ b/osd.h @@ -16,21 +16,27 @@ #include "ringloop.h" #include "osd_ops.h" +#include "sparsepp/sparsepp/spp.h" + #define STRIPE_NUM(stripe) ((stripe) >> 4) #define STRIPE_REPLICA(stripe) ((stripe) & 0xf) +#define OSD_OP_IN 0 +#define OSD_OP_OUT 1 + struct osd_op_t { + int op_type; int peer_fd; union { osd_any_op_t op; - uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; + uint8_t op_buf[OSD_PACKET_SIZE] = { 0 }; }; union { osd_any_reply_t reply; - uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; + uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 }; }; blockstore_op_t bs_op; void *buf = NULL; @@ -49,12 +55,16 @@ struct osd_client_t bool read_ready = false; bool reading = false; osd_op_t *read_op = NULL; + int read_reply_id = 0; iovec read_iov; msghdr read_msg; void *read_buf = NULL; int read_remaining = 0; int read_state = 0; + // Outbound operations sent to this client (which is probably an OSD peer) + std::map sent_ops; + // Completed operations to send replies back to the client std::deque completions; @@ -67,17 +77,71 @@ struct osd_client_t int write_state = 0; }; +struct osd_pg_role_t +{ + int role; + uint64_t osd_num; +}; + +typedef std::vector osd_acting_set_t; + +namespace std +{ + template<> struct hash + { + inline size_t operator()(const osd_acting_set_t &s) const + { + size_t seed = 0; + for (int i = 0; i < s.size(); i++) + { + // Copy-pasted from spp::hash_combine() + seed ^= (s[i].role + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2)); + seed ^= (s[i].osd_num + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2)); + } + return seed; + } + }; +} + +#define PG_ST_OFFLINE 1 +#define PG_ST_PEERING 2 +#define PG_ST_INCOMPLETE 3 +#define PG_ST_DEGRADED 4 +#define PG_ST_MISPLACED 5 +#define PG_ST_ACTIVE 6 + +struct osd_pg_t +{ + int state; + unsigned num; + std::vector target_set; + // moved object map. by default, each object is considered to reside on the target_set. + // this map stores all objects that differ. + // this map may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario + // which is up to ~192 MB per 1 TB in the worst case scenario + std::unordered_map acting_set_ids; + std::map acting_sets; + spp::sparse_hash_map object_map; +}; + class osd_t { // config + uint64_t osd_num = 0; blockstore_config_t config; std::string bind_address; int bind_port, listen_backlog; int client_queue_depth = 128; bool allow_test_ops = true; - // fields + // peer OSDs + + std::map osd_peer_fds; + std::vector pgs; + unsigned pg_count; + + // client & peer I/O bool stopping = false; int inflight_ops = 0; @@ -93,15 +157,21 @@ class osd_t std::vector read_ready_clients; std::vector write_ready_clients; + // methods + void loop(); int handle_epoll_events(); void stop_client(int peer_fd); void read_requests(); void handle_read(ring_data_t *data, int peer_fd); - void enqueue_op(osd_op_t *cur_op); + void handle_read_op(osd_client_t *cl); + void handle_read_reply(osd_client_t *cl); void send_replies(); void make_reply(osd_op_t *op); void handle_send(ring_data_t *data, int peer_fd); + + void handle_reply(osd_op_t *cur_op); + void enqueue_op(osd_op_t *cur_op); void secondary_op_callback(osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); diff --git a/osd_client.cpp b/osd_client.cpp new file mode 100644 index 00000000..6ea07f17 --- /dev/null +++ b/osd_client.cpp @@ -0,0 +1,40 @@ +void slice() +{ + // Slice the request into blockstore requests to individual objects + // Primary OSD still operates individual stripes, except they're twice the size of the blockstore's stripe. + std::vector read_parts; + int block = bs->get_block_size(); + uint64_t stripe1 = cur_op->op.rw.offset / block / 2; + uint64_t stripe2 = (cur_op->op.rw.offset + cur_op->op.rw.len + block*2 - 1) / block / 2 - 1; + for (uint64_t s = stripe1; s <= stripe2; s++) + { + uint64_t start = s == stripe1 ? cur_op->op.rw.offset - stripe1*block*2 : 0; + uint64_t end = s == stripe2 ? cur_op->op.rw.offset + cur_op->op.rw.len - stripe2*block*2 : block*2; + if (start < block) + { + read_parts.push_back({ + .role = 1, + .oid = { + .inode = cur_op->op.rw.inode, + .stripe = (s << STRIPE_ROLE_BITS) | 1, + }, + .version = UINT64_MAX, + .offset = start, + .len = (block < end ? block : end) - start, + }); + } + if (end > block) + { + read_parts.push_back({ + .role = 2, + .oid = { + .inode = cur_op->op.rw.inode, + .stripe = (s << STRIPE_ROLE_BITS) | 2, + }, + .version = UINT64_MAX, + .offset = (start > block ? start-block : 0), + .len = end - (start > block ? start-block : 0), + }); + } + } +} diff --git a/osd_main.cpp b/osd_main.cpp index 3ffec848..7c4700f4 100644 --- a/osd_main.cpp +++ b/osd_main.cpp @@ -2,8 +2,8 @@ int main(int narg, char *args[]) { - if (sizeof(osd_any_op_t) >= OSD_OP_PACKET_SIZE || - sizeof(osd_any_reply_t) >= OSD_REPLY_PACKET_SIZE) + if (sizeof(osd_any_op_t) >= OSD_PACKET_SIZE || + sizeof(osd_any_reply_t) >= OSD_PACKET_SIZE) { perror("BUG: too small packet size"); return 1; diff --git a/osd_ops.h b/osd_ops.h index 4464e116..204ed36b 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -5,9 +5,8 @@ // Magic numbers #define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l #define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l -// Operation request headers and operation reply headers have fixed size after which comes data -#define OSD_OP_PACKET_SIZE 0x80 -#define OSD_REPLY_PACKET_SIZE 0x40 +// Operation request / reply headers have fixed size after which comes data +#define OSD_PACKET_SIZE 0x80 // Opcodes #define OSD_OP_MIN 1 #define OSD_OP_SECONDARY_READ 1 @@ -42,6 +41,8 @@ struct __attribute__((__packed__)) osd_reply_header_t uint64_t magic; // operation id uint64_t id; + // operation type + uint64_t opcode; // return value int64_t retval; }; @@ -127,10 +128,11 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t { osd_reply_header_t header; // stable object version count. header.retval = total object version count + // FIXME: maybe change to the number of bytes in the reply... uint64_t stable_count; }; -// read or write to the primary OSD +// read or write to the primary OSD (must be within individual stripe) struct __attribute__((__packed__)) osd_op_rw_t { osd_op_header_t header; @@ -153,7 +155,7 @@ union osd_any_op_t osd_op_secondary_rw_t sec_rw; osd_op_secondary_del_t sec_del; osd_op_secondary_sync_t sec_sync; - osd_op_secondary_stabilize_t sec_stabilize; + osd_op_secondary_stabilize_t sec_stab; osd_op_secondary_list_t sec_list; osd_op_show_config_t show_conf; osd_op_rw_t rw; @@ -165,7 +167,7 @@ union osd_any_reply_t osd_reply_secondary_rw_t sec_rw; osd_reply_secondary_del_t sec_del; osd_reply_secondary_sync_t sec_sync; - osd_reply_secondary_stabilize_t sec_stabilize; + osd_reply_secondary_stabilize_t sec_stab; osd_reply_secondary_list_t sec_list; osd_reply_show_config_t show_conf; osd_reply_rw_t rw;