diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index c06111932..1f36e0ccd 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -62,6 +62,7 @@ bool blockstore_impl_t::is_stalled() // main event loop - produce requests void blockstore_impl_t::loop() { + // FIXME: initialized == 10 is ugly if (initialized != 10) { // read metadata, then journal @@ -89,6 +90,7 @@ void blockstore_impl_t::loop() delete journal_init_reader; journal_init_reader = NULL; initialized = 10; + ringloop->wakeup(); } } } @@ -168,6 +170,11 @@ void blockstore_impl_t::loop() { dequeue_op = dequeue_stable(op); } + else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_LIST) + { + process_list(op); + dequeue_op = true; + } if (dequeue_op) { submit_queue.erase(op_ptr); @@ -304,13 +311,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) op->callback(op); return; } - else if (type == BS_OP_LIST) - { - // List operation is processed synchronously - process_list(op); - op->callback(op); - return; - } // Call constructor without allocating memory. We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; @@ -397,4 +397,5 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) } } } + FINISH_OP(op); } diff --git a/osd.h b/osd.h index f64bbca36..521c201da 100644 --- a/osd.h +++ b/osd.h @@ -52,6 +52,7 @@ struct osd_op_t }; blockstore_op_t bs_op; void *buf = NULL; + std::function callback; ~osd_op_t(); }; @@ -83,8 +84,8 @@ struct osd_client_t // Outbound operations sent to this client (which is probably an OSD peer) std::map sent_ops; - // Completed operations to send replies back to the client - std::deque completions; + // Outbound messages (replies or requests) + std::deque outbox; // Write state osd_op_t *write_op = NULL; @@ -141,9 +142,14 @@ namespace std #define OSD_HALF_STABLE 0x10000 #define OSD_NEEDS_ROLLBACK 0x20000 +class osd_t; + struct osd_pg_peering_state_t { + osd_t* self; + uint64_t pg_num; std::unordered_map list_ops; + int list_done = 0; }; struct osd_pg_t @@ -187,7 +193,7 @@ class osd_t std::map osd_peer_fds; std::vector pgs; - bool needs_peering = false; + int peering_state = 0; unsigned pg_count = 0; // client & peer I/O @@ -218,6 +224,7 @@ class osd_t void send_replies(); void make_reply(osd_op_t *op); void handle_send(ring_data_t *data, int peer_fd); + void outbox_push(osd_client_t & cl, osd_op_t *op); // peer handling (primary OSD logic) void connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function callback); @@ -226,9 +233,9 @@ class osd_t osd_peer_def_t parse_peer(std::string peer); void init_primary(); void handle_peers(); + void start_pg_peering(int i); // op execution - void handle_reply(osd_op_t *cur_op); void exec_op(osd_op_t *cur_op); void exec_sync_stab_all(osd_op_t *cur_op); void exec_show_config(osd_op_t *cur_op); diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index deb8347d5..d7f68e323 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -2,11 +2,6 @@ #include "json11/json11.hpp" -void osd_t::handle_reply(osd_op_t *cur_op) -{ - -} - void osd_t::secondary_op_callback(osd_op_t *cur_op) { inflight_ops--; @@ -14,14 +9,8 @@ void osd_t::secondary_op_callback(osd_op_t *cur_op) if (cl_it != clients.end()) { auto & cl = cl_it->second; - if (cl.write_state == 0) - { - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); - } make_reply(cur_op); - cl.completions.push_back(cur_op); - ringloop->wakeup(); + outbox_push(cl, cur_op); } else { @@ -80,7 +69,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op) cl.write_state = CL_WRITE_READY; write_ready_clients.push_back(cur_op->peer_fd); make_reply(cur_op); - cl.completions.push_back(cur_op); + cl.outbox.push_back(cur_op); ringloop->wakeup(); } diff --git a/osd_peering.cpp b/osd_peering.cpp index 5e72c0738..76a6ea086 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -18,7 +18,7 @@ void osd_t::init_primary() .object_map = spp::sparse_hash_map(), }); pg_count = 1; - needs_peering = true; + peering_state = 1; } osd_peer_def_t osd_t::parse_peer(std::string peer) @@ -120,7 +120,7 @@ void osd_t::handle_connect_result(int peer_fd) // Ideally: Connect -> Ask & check config -> Start PG peering void osd_t::handle_peers() { - if (needs_peering) + if (peering_state & 1) { for (int i = 0; i < peers.size(); i++) { @@ -131,27 +131,100 @@ void osd_t::handle_peers() connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](int peer_fd) { printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - // Restart PG peering - pgs[0].state = PG_PEERING; - pgs[0].acting_set_ids.clear(); - pgs[0].acting_sets.clear(); - pgs[0].object_map.clear(); - if (pgs[0].peering_state) - delete pgs[0].peering_state; - ringloop->wakeup(); + int i; + for (i = 0; i < peers.size(); i++) + { + auto it = osd_peer_fds.find(peers[i].osd_num); + if (it == osd_peer_fds.end() || clients[it->second].peer_state != PEER_CONNECTED) + { + break; + } + } + if (i >= peers.size()) + { + // Start PG peering + pgs[0].state = PG_PEERING; + pgs[0].acting_set_ids.clear(); + pgs[0].acting_sets.clear(); + pgs[0].object_map.clear(); + if (pgs[0].peering_state) + delete pgs[0].peering_state; + peering_state = 2; + ringloop->wakeup(); + } }); } } } - for (int i = 0; i < pgs.size(); i++) + if (peering_state & 2) { - if (pgs[i].state == PG_PEERING) + for (int i = 0; i < pgs.size(); i++) { - if (!pgs[i].peering_state) + if (pgs[i].state == PG_PEERING) { - pgs[i].peering_state = new osd_pg_peering_state_t(); - + if (!pgs[i].peering_state) + { + start_pg_peering(i); + } + else if (pgs[i].peering_state->list_done >= 3) + { + // FIXME + peering_state = 0; + } } } } } + +void osd_t::start_pg_peering(int pg_idx) +{ + auto & pg = pgs[pg_idx]; + auto ps = pg.peering_state = new osd_pg_peering_state_t(); + ps->self = this; + ps->pg_num = pg_idx; // FIXME probably shouldn't be pg_idx + { + osd_op_t *op = new osd_op_t(); + op->op_type = 0; + op->peer_fd = 0; + op->bs_op.opcode = BS_OP_LIST; + op->bs_op.callback = [ps, op](blockstore_op_t *bs_op) + { + printf( + "Got object list from OSD %lu (local): %d objects (%lu of them stable)\n", + ps->self->osd_num, bs_op->retval, bs_op->version + ); + ps->list_done++; + }; + pg.peering_state->list_ops[osd_num] = op; + bs->enqueue_op(&op->bs_op); + } + for (int i = 0; i < peers.size(); i++) + { + auto & cl = clients[osd_peer_fds[peers[i].osd_num]]; + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->peer_fd = cl.peer_fd; + op->op = { + .sec_list = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = 1, + .opcode = OSD_OP_SECONDARY_LIST, + }, + .pgnum = 1, + .pgtotal = 1, + }, + }; + op->callback = [ps](osd_op_t *op) + { + printf( + "Got object list from OSD %lu: %ld objects (%lu of them stable)\n", + ps->self->clients[op->peer_fd].osd_num, op->reply.hdr.retval, + op->reply.sec_list.stable_count + ); + ps->list_done++; + }; + pg.peering_state->list_ops[cl.osd_num] = op; + outbox_push(cl, op); + } +} diff --git a/osd_read.cpp b/osd_read.cpp index 2c981a9aa..82246b166 100644 --- a/osd_read.cpp +++ b/osd_read.cpp @@ -90,7 +90,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) cl.sent_ops.erase(req_it); cl.read_reply_id = 0; cl.read_state = 0; - handle_reply(request); + request->callback(request); } } } @@ -168,6 +168,6 @@ void osd_t::handle_read_reply(osd_client_t *cl) { cl->read_state = 0; cl->sent_ops.erase(req_it); - handle_reply(request); + request->callback(request); } } diff --git a/osd_send.cpp b/osd_send.cpp index 7b03a5a2b..cbed2a540 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -1,5 +1,16 @@ #include "osd.h" +void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op) +{ + if (cl.write_state == 0) + { + cl.write_state = CL_WRITE_READY; + write_ready_clients.push_back(cur_op->peer_fd); + } + cl.outbox.push_back(cur_op); + ringloop->wakeup(); +} + void osd_t::send_replies() { for (int i = 0; i < write_ready_clients.size(); i++) @@ -16,8 +27,8 @@ void osd_t::send_replies() if (!cl.write_buf) { // pick next command - cl.write_op = cl.completions.front(); - cl.completions.pop_front(); + cl.write_op = cl.outbox.front(); + cl.outbox.pop_front(); if (cl.write_op->op_type == OSD_OP_OUT) { cl.write_buf = &cl.write_op->op_buf; @@ -135,7 +146,7 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op; } cl.write_op = NULL; - cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0; + cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0; } } }