diff --git a/Makefile b/Makefile index 52da996bd..c9430e5a3 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ libblockstore.so: $(BLOCKSTORE_OBJS) libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring -OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o json11.o +OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o json11.o timerfd_interval.o osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h g++ $(CXXFLAGS) -c -o $@ $< osd_receive.o: osd_receive.cpp osd.h osd_ops.h diff --git a/osd.cpp b/osd.cpp index e028fd5e1..8bfcd1fff 100644 --- a/osd.cpp +++ b/osd.cpp @@ -12,6 +12,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo this->config = config; this->bs = bs; this->ringloop = ringloop; + this->tick_tfd = new timerfd_interval(ringloop, 1, []() {}); this->bs_block_size = bs->get_block_size(); // FIXME: use bitmap granularity instead this->bs_disk_alignment = bs->get_disk_alignment(); @@ -85,6 +86,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo osd_t::~osd_t() { + delete tick_tfd; ringloop->unregister_consumer(consumer); close(epoll_fd); close(listen_fd); @@ -229,7 +231,10 @@ void osd_t::stop_client(int peer_fd) auto it = clients.find(peer_fd); if (it->second.osd_num) { + // FIXME cancel outbound operations osd_peer_fds.erase(it->second.osd_num); + repeer_pgs(it->second.osd_num, false); + peering_state |= OSD_PEERING_PEERS; } for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { diff --git a/osd.h b/osd.h index a11223c02..cae269abe 100644 --- a/osd.h +++ b/osd.h @@ -14,6 +14,7 @@ #include "blockstore.h" #include "ringloop.h" +#include "timerfd_interval.h" #include "osd_ops.h" #include "osd_peering_pg.h" @@ -34,6 +35,8 @@ #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 +#define OSD_PEERING_PEERS 1 +#define OSD_PEERING_PGS 2 //#define OSD_STUB @@ -186,6 +189,7 @@ class osd_t blockstore_t *bs; uint32_t bs_block_size, bs_disk_alignment; ring_loop_t *ringloop; + timerfd_interval *tick_tfd; int wait_state = 0; int epoll_fd = 0; @@ -217,6 +221,7 @@ class osd_t osd_peer_def_t parse_peer(std::string peer); void init_primary(); void handle_peers(); + void repeer_pgs(osd_num_t osd_num, bool is_connected); void start_pg_peering(int i); // op execution @@ -235,8 +240,8 @@ class osd_t void make_primary_reply(osd_op_t *op); void finish_primary_op(osd_op_t *cur_op, int retval); void handle_primary_read_subop(osd_op_t *cur_op, int ok); - int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size); - void submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op); + int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size); + void submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/osd_peering.cpp b/osd_peering.cpp index 104112309..fd61ea652 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -17,12 +17,13 @@ void osd_t::init_primary() throw std::runtime_error("peer1 and peer2 osd numbers are the same"); pgs.push_back((pg_t){ .state = PG_OFFLINE, - .pg_cursize = 2, // or 3 + .pg_cursize = 0, .pg_num = 1, - .target_set = { 1, 0, 3 }, // or { 1, 2, 3 } + .target_set = { 1, 2, 3 }, + .cur_set = { 1, 0, 0 }, }); pg_count = 1; - peering_state = 1; + peering_state = OSD_PEERING_PEERS; } osd_peer_def_t osd_t::parse_peer(std::string peer) @@ -122,19 +123,19 @@ void osd_t::handle_connect_result(int peer_fd) } // Peering loop -// Ideally: Connect -> Ask & check config -> Start PG peering void osd_t::handle_peers() { - if (peering_state & 1) + if (peering_state & OSD_PEERING_PEERS) { for (int i = 0; i < peers.size(); i++) { if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() && - time(NULL) - peers[i].last_connect_attempt > 5) + time(NULL) - peers[i].last_connect_attempt > 5) // FIXME hardcode 5 { peers[i].last_connect_attempt = time(NULL); connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd) { + // FIXME: Check peer config after connecting if (peer_fd < 0) { printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd)); @@ -144,116 +145,245 @@ void osd_t::handle_peers() int i; for (i = 0; i < peers.size(); i++) { - auto it = osd_peer_fds.find(peers[i].osd_num); - if (it == osd_peer_fds.end() || clients[it->second].peer_state != PEER_CONNECTED) - { + if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end()) break; - } } if (i >= peers.size()) { - // Start PG peering - pgs[0].state = PG_PEERING; - pgs[0].state_dict.clear(); - pgs[0].obj_states.clear(); - pgs[0].ver_override.clear(); - if (pgs[0].peering_state) - delete pgs[0].peering_state; - peering_state = 2; - ringloop->wakeup(); + // Connected to all peers + peering_state = peering_state & ~OSD_PEERING_PEERS; } + repeer_pgs(osd_num, true); }); } } } - if (peering_state & 2) + if (peering_state & OSD_PEERING_PGS) { + bool still_doing_pgs = false; for (int i = 0; i < pgs.size(); i++) { if (pgs[i].state == PG_PEERING) { - if (!pgs[i].peering_state) - { - start_pg_peering(i); - } - else if (pgs[i].peering_state->list_done >= 3) + if (!pgs[i].peering_state->list_ops.size()) { pgs[i].calc_object_states(); - peering_state = 0; + } + else + { + still_doing_pgs = true; } } } + if (!still_doing_pgs) + { + // Done all PGs + peering_state = peering_state & ~OSD_PEERING_PGS; + } } } -void osd_t::start_pg_peering(int pg_idx) +void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected) { - // FIXME: Set PG_INCOMPLETE if incomplete - auto & pg = pgs[pg_idx]; - auto ps = pg.peering_state = new pg_peering_state_t(); + // Re-peer affected PGs + // FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data. + osd_num_t real_osd = (is_connected ? osd_num : 0); + for (int i = 0; i < pgs.size(); i++) { - osd_num_t osd_num = this->osd_num; - osd_op_t *op = new osd_op_t(); - op->op_type = 0; - op->peer_fd = 0; - op->bs_op.opcode = BS_OP_LIST; - op->bs_op.callback = [ps, op, osd_num](blockstore_op_t *bs_op) + bool repeer = false; + for (int r = 0; r < pgs[i].target_set.size(); r++) { - if (op->bs_op.retval < 0) + if (pgs[i].target_set[r] == osd_num && + pgs[i].cur_set[r] != real_osd) { - throw std::runtime_error("OP_LIST failed"); + pgs[i].cur_set[r] = real_osd; + repeer = true; + break; } - printf( - "Got object list from OSD %lu (local): %d objects (%lu of them stable)\n", - osd_num, bs_op->retval, bs_op->version - ); - ps->list_results[osd_num] = { - .buf = (obj_ver_id*)op->bs_op.buf, - .total_count = (uint64_t)op->bs_op.retval, - .stable_count = op->bs_op.version, - }; - ps->list_done++; - delete op; - }; - bs->enqueue_op(&op->bs_op); - } - for (int i = 0; i < peers.size(); i++) - { - osd_num_t osd_num = peers[i].osd_num; - auto & cl = clients[osd_peer_fds[peers[i].osd_num]]; - osd_op_t *op = new osd_op_t(); - op->op_type = OSD_OP_OUT; - op->peer_fd = cl.peer_fd; - op->op = { - .sec_list = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = 1, - .opcode = OSD_OP_SECONDARY_LIST, - }, - .pgnum = pg.pg_num, - .pgtotal = pg_count, - }, - }; - op->callback = [ps, osd_num](osd_op_t *op) + } + if (repeer) { - if (op->reply.hdr.retval < 0) - { - throw std::runtime_error("OP_LIST failed"); - } - printf( - "Got object list from OSD %lu: %ld objects (%lu of them stable)\n", - osd_num, op->reply.hdr.retval, op->reply.sec_list.stable_count - ); - ps->list_results[osd_num] = { - .buf = (obj_ver_id*)op->buf, - .total_count = (uint64_t)op->reply.hdr.retval, - .stable_count = op->reply.sec_list.stable_count, - }; - op->buf = NULL; - ps->list_done++; - delete op; - }; - outbox_push(cl, op); + // Repeer this pg + printf("Repeer PG %d because of OSD %lu\n", i, osd_num); + start_pg_peering(i); + peering_state |= OSD_PEERING_PGS; + } } } + +// Repeer on each connect/disconnect peer event +void osd_t::start_pg_peering(int pg_idx) +{ + auto & pg = pgs[pg_idx]; + pg.state = PG_PEERING; + pg.state_dict.clear(); + pg.obj_states.clear(); + pg.ver_override.clear(); + pg.pg_cursize = 0; + for (int role = 0; role < pg.cur_set.size(); role++) + { + if (pg.cur_set[role] != 0) + { + pg.pg_cursize++; + } + } + if (pg.pg_cursize < pg.pg_minsize) + { + pg.state = PG_INCOMPLETE; + } + if (pg.peering_state) + { + // Adjust the peering operation that's still in progress + for (auto & p: pg.peering_state->list_ops) + { + int role; + for (role = 0; role < pg.cur_set.size(); role++) + { + if (pg.cur_set[role] == p.first) + break; + } + if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) + { + // Discard the result after completion, which, chances are, will be unsuccessful + auto list_op = p.second; + if (list_op->peer_fd == 0) + { + // Self + list_op->bs_op.callback = [list_op](blockstore_op_t *bs_op) + { + if (list_op->bs_op.buf) + free(list_op->bs_op.buf); + delete list_op; + }; + } + else + { + // Peer + list_op->callback = [](osd_op_t *list_op) + { + delete list_op; + }; + } + pg.peering_state->list_ops.erase(p.first); + } + } + for (auto & p: pg.peering_state->list_results) + { + int role; + for (role = 0; role < pg.cur_set.size(); role++) + { + if (pg.cur_set[role] == p.first) + break; + } + if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) + { + pg.peering_state->list_results.erase(p.first); + } + } + } + if (pg.state == PG_INCOMPLETE) + { + if (pg.peering_state) + { + delete pg.peering_state; + pg.peering_state = NULL; + } + printf("PG %d is incomplete\n", pg.pg_num); + return; + } + if (!pg.peering_state) + { + pg.peering_state = new pg_peering_state_t(); + } + auto ps = pg.peering_state; + for (int role = 0; role < pg.cur_set.size(); role++) + { + osd_num_t role_osd = pg.cur_set[role]; + if (!role_osd) + { + continue; + } + if (ps->list_ops.find(role_osd) != ps->list_ops.end() || + ps->list_results.find(role_osd) != ps->list_results.end()) + { + continue; + } + if (role_osd == this->osd_num) + { + // Self + osd_op_t *op = new osd_op_t(); + op->op_type = 0; + op->peer_fd = 0; + op->bs_op.opcode = BS_OP_LIST; + op->bs_op.callback = [ps, op, role_osd](blockstore_op_t *bs_op) + { + if (op->bs_op.retval < 0) + { + throw std::runtime_error("local OP_LIST failed"); + } + printf( + "Got object list from OSD %lu (local): %d objects (%lu of them stable)\n", + role_osd, bs_op->retval, bs_op->version + ); + ps->list_results[role_osd] = { + .buf = (obj_ver_id*)op->bs_op.buf, + .total_count = (uint64_t)op->bs_op.retval, + .stable_count = op->bs_op.version, + }; + ps->list_done++; + ps->list_ops.erase(role_osd); + delete op; + }; + bs->enqueue_op(&op->bs_op); + ps->list_ops[role_osd] = op; + } + else + { + // Peer + auto & cl = clients[osd_peer_fds[role_osd]]; + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->peer_fd = cl.peer_fd; + op->op = { + .sec_list = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SECONDARY_LIST, + }, + .pgnum = pg.pg_num, + .pgtotal = pg_count, + }, + }; + op->callback = [this, ps, role_osd](osd_op_t *op) + { + if (op->reply.hdr.retval < 0) + { + int peer_fd = op->peer_fd; + printf("Failed to get object list from OSD %lu, disconnecting peer\n", role_osd); + delete op; + ps->list_ops.erase(role_osd); + stop_client(peer_fd); + return; + } + printf( + "Got object list from OSD %lu: %ld objects (%lu of them stable)\n", + role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count + ); + ps->list_results[role_osd] = { + .buf = (obj_ver_id*)op->buf, + .total_count = (uint64_t)op->reply.hdr.retval, + .stable_count = op->reply.sec_list.stable_count, + }; + // so it doesn't get freed. FIXME: do it better + op->buf = NULL; + ps->list_done++; + ps->list_ops.erase(role_osd); + delete op; + }; + outbox_push(cl, op); + ps->list_ops[role_osd] = op; + } + } + ringloop->wakeup(); +} diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index fcf6f3008..8ac3dd7f4 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -18,7 +18,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector & else if (st.n_roles < pg.pg_minsize) { state = OBJ_INCOMPLETE; - pg.state = pg.state | PG_HAS_INCOMPLETE; + pg.state = pg.state | PG_HAS_UNFOUND; } else { @@ -164,6 +164,7 @@ void pg_t::calc_object_states() std::sort(all.begin(), all.end()); // Walk over it and check object states pg.clean_count = 0; + pg.state = 0; int replica = 0; pg_obj_state_check_t st; for (int i = 0; i < all.size(); i++) @@ -225,7 +226,7 @@ void pg_t::calc_object_states() { st.n_stable++; } - if (pg.target_set[replica] == all[i].osd_num) + if (pg.cur_set[replica] == all[i].osd_num) { st.n_matched++; } @@ -242,5 +243,17 @@ void pg_t::calc_object_states() st.obj_end = st.ver_end = all.size(); remember_object(st, all); } + if (pg.pg_cursize < pg.pg_size) + { + pg.state = pg.state | PG_DEGRADED; + } + printf( + "PG %u is active%s%s%s%s\n", pg.pg_num, + (pg.state & PG_DEGRADED) ? " + degraded" : "", + (pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "", + (pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "", + (pg.state & PG_HAS_MISPLACED) ? " + has_misplaced" : "", + (pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : "" + ); pg.state = pg.state | PG_ACTIVE; } diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 00291bf13..9ade5320d 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -14,10 +14,11 @@ #define PG_INCOMPLETE (1<<2) #define PG_ACTIVE (1<<3) // Plus any of these: -#define PG_HAS_INCOMPLETE (1<<4) -#define PG_HAS_DEGRADED (1<<5) -#define PG_HAS_MISPLACED (1<<6) -#define PG_HAS_UNCLEAN (1<<7) +#define PG_DEGRADED (1<<4) +#define PG_HAS_UNFOUND (1<<5) +#define PG_HAS_DEGRADED (1<<6) +#define PG_HAS_MISPLACED (1<<7) +#define PG_HAS_UNCLEAN (1<<8) // OSD object states #define OBJ_CLEAN 0x01 @@ -40,7 +41,9 @@ typedef std::vector pg_osd_set_t; struct pg_osd_set_state_t { + // (role -> osd_num_t) map, as in pg.target_set and pg.cur_set std::vector read_target; + // full OSD set including additional OSDs where the object is misplaced pg_osd_set_t osd_set; uint64_t state = 0; uint64_t object_count = 0; @@ -53,9 +56,12 @@ struct pg_list_result_t uint64_t stable_count; }; +struct osd_op_t; + struct pg_peering_state_t { // osd_num -> list result + spp::sparse_hash_map list_ops; spp::sparse_hash_map list_results; int list_done = 0; }; @@ -103,9 +109,12 @@ struct pg_t uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; uint64_t clean_count = 0; - // target_set = (role => osd_num or UINT64_MAX if missing). role numbers start with zero + // target_set is the "correct" peer OSD set for this PG std::vector target_set; - // moved object map. by default, each object is considered to reside on the target_set. + // cur_set is the current set of connected peer OSDs for this PG + // cur_set = (role => osd_num or UINT64_MAX if missing). role numbers begin with zero + std::vector cur_set; + // moved object map. by default, each object is considered to reside on the cur_set. // this map stores all objects that differ. // it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario // which is up to ~192 MB per 1 TB in the worst case scenario diff --git a/osd_primary.cpp b/osd_primary.cpp index 96a35148d..698da1eda 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -54,7 +54,9 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) pg_num_t pg_num = (oid % pg_count); // FIXME +1 if (((end - 1) / (bs_block_size*2)) != oid.stripe || (start % bs_disk_alignment) || (end % bs_disk_alignment) || - pg_num > pgs.size()) + pg_num > pgs.size() || + // FIXME: Postpone operations in inactive PGs + !(pgs[pg_num].state & PG_ACTIVE)) { finish_primary_op(cur_op, -EINVAL); return; @@ -79,30 +81,28 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) auto vo_it = pgs[pg_num].ver_override.find(oid); op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; } - if (pgs[pg_num].pg_cursize == pgs[pg_num].pg_size) + if (pgs[pg_num].state == PG_ACTIVE) { // Fast happy-path - submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].target_set.data(), cur_op); + submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].cur_set.data(), cur_op); cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len); } else { - // PG is degraded - uint64_t* target_set; - { - auto it = pgs[pg_num].obj_states.find(oid); - target_set = (it != pgs[pg_num].obj_states.end() - ? it->second->read_target.data() - : pgs[pg_num].target_set.data()); - } - if (extend_missing_stripes(stripes, target_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) + // PG may be degraded or have misplaced objects + spp::sparse_hash_map obj_states; + auto st_it = pgs[pg_num].obj_states.find(oid); + uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end() + ? st_it->second->read_target.data() + : pgs[pg_num].cur_set.data()); + if (extend_missing_stripes(stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) { free(op_data); finish_primary_op(cur_op, -EIO); return; } // Submit reads - submit_read_subops(pgs[pg_num].pg_size, target_set, cur_op); + submit_read_subops(pgs[pg_num].pg_size, cur_set, cur_op); op_data->pg_minsize = pgs[pg_num].pg_minsize; op_data->pg_size = pgs[pg_num].pg_size; op_data->degraded = 1; @@ -158,11 +158,11 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) } } -int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size) +int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size) { for (int role = 0; role < minsize; role++) { - if (stripes[role].end != 0 && target_set[role] == 0) + if (stripes[role].end != 0 && osd_set[role] == 0) { stripes[role].real_start = stripes[role].real_end = 0; // Stripe is missing. Extend read to other stripes. @@ -170,7 +170,7 @@ int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_ int exist = 0; for (int j = 0; j < size; j++) { - if (target_set[j] != 0) + if (osd_set[j] != 0) { if (stripes[j].real_end == 0 || j >= minsize) { @@ -199,7 +199,7 @@ int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_ return 0; } -void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op) +void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op) { osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; osd_read_stripe_t *stripes = op_data->stripes; @@ -230,7 +230,7 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd { continue; } - auto role_osd_num = target_set[role]; + auto role_osd_num = osd_set[role]; if (role_osd_num != 0) { if (role_osd_num == this->osd_num) diff --git a/test.cpp b/test.cpp index fae4ae36a..92b3fa5c6 100644 --- a/test.cpp +++ b/test.cpp @@ -344,9 +344,9 @@ int main05(int argc, char *argv[]) .state = PG_PEERING, .pg_num = 1, .target_set = { 1, 2, 3 }, + .cur_set = { 1, 2, 3 }, .peering_state = new pg_peering_state_t(), }; - pg.peering_state->list_done = 3; for (uint64_t osd_num = 1; osd_num <= 3; osd_num++) { pg_list_result_t r = {