From 9fb2d3f840233c864e44596484f706afedfa39f2 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 2 Feb 2020 00:05:56 +0300 Subject: [PATCH] Fill out the rest of the degraded read logic; now we need to make it a "coroutine" --- osd.h | 3 ++- osd_peering_pg.cpp | 11 +++++++++ osd_peering_pg.h | 1 + osd_primary.cpp | 61 ++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 65 insertions(+), 11 deletions(-) diff --git a/osd.h b/osd.h index 0dae0bfe8..24188b61b 100644 --- a/osd.h +++ b/osd.h @@ -74,8 +74,8 @@ struct osd_client_t int peer_fd; int peer_state; std::function connect_callback; + // osd numbers begin with 1 uint64_t osd_num = 0; - //int in_flight_ops = 0; // Read state bool read_ready = false; @@ -122,6 +122,7 @@ class osd_t std::vector pgs; int peering_state = 0; unsigned pg_count = 0; + uint64_t next_subop_id = 1; // client & peer I/O diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index e0c1c9dfc..fcf6f3008 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -62,7 +62,18 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector & auto it = pg.state_dict.find(st.osd_set); if (it == pg.state_dict.end()) { + std::vector read_target; + read_target.resize(pg.pg_size); + for (int i = 0; i < pg.pg_size; i++) + { + read_target[i] = 0; + } + for (auto & o: st.osd_set) + { + read_target[o.role] = o.osd_num; + } pg.state_dict[st.osd_set] = { + .read_target = read_target, .osd_set = st.osd_set, .state = state, .object_count = 1, diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 4602b9812..5e7b25e12 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -39,6 +39,7 @@ typedef std::vector pg_osd_set_t; struct pg_osd_set_state_t { + std::vector read_target; pg_osd_set_t osd_set; uint64_t state = 0; uint64_t object_count = 0; diff --git a/osd_primary.cpp b/osd_primary.cpp index 5a7150395..2a2e05f31 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -40,7 +40,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) } } auto vo_it = pgs[pg_num].ver_override.find(oid); - uint64_t target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it.second : UINT64_MAX; + uint64_t target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; if (pgs[pg_num].pg_cursize == 3) { // Fast happy-path @@ -50,18 +50,22 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) else { // PG is degraded + auto it = pgs[pg_num].obj_states.find(oid); + std::vector & target_set = (it != pgs[pg_num].obj_states.end() + ? it->second->read_target + : pgs[pg_num].target_set); uint64_t real_reads[pgs[pg_num].pg_size*2] = { 0 }; memcpy(real_reads, reads, sizeof(uint64_t)*pgs[pg_num].pg_minsize*2); for (int role = 0; role < pgs[pg_num].pg_minsize; role++) { - if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX) + if (reads[role*2+1] != 0 && target_set[role] == 0) { // Stripe is missing. Extend read to other stripes. // We need at least pg_minsize stripes to recover the lost part. int exist = 0; for (int j = 0; j < pgs[pg_num].pg_size; j++) { - if (pgs[pg_num].target_set[j] != UINT64_MAX) + if (target_set[j] != 0) { if (real_reads[j*2+1] == 0 || j >= pgs[pg_num].pg_minsize) { @@ -80,6 +84,16 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) } } } + if (exist < pgs[pg_num].pg_minsize) + { + // Object is unreadable + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->op.hdr.id; + cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode; + cur_op->reply.hdr.retval = -EIO; + outbox_push(clients[cur_op->peer_fd], cur_op); + return; + } } } uint64_t pos[pgs[pg_num].pg_size]; @@ -98,23 +112,43 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) // Submit reads osd_op_t read_ops[n_subops]; int subop = 0; + int errors = 0, done = 0; for (int role = 0; role < pgs[pg_num].pg_size; role++) { - // FIXME Take remapped objects into account - uint64_t role_osd_num = pgs[pg_num].target_set[role]; - if (role_osd_num != UINT64_MAX) + uint64_t role_osd_num = target_set[role]; + if (role_osd_num != 0) { if (role_osd_num == this->osd_num) { + read_ops[subop].bs_op = { + .opcode = BS_OP_READ, + .callback = [&](blockstore_op_t *op) + { + if (op->retval < op->len) + errors++; + else + done++; + // continue op + }, + .oid = { + .inode = oid.inode, + .stripe = oid.stripe | role, + }, + .version = target_ver, + .offset = real_reads[role*2], + .len = real_reads[role*2+1] - real_reads[role*2], + .buf = buf + pos[role], + }; + bs->enqueue_op(&read_ops[subop].bs_op); } else { read_ops[subop].op_type = OSD_OP_OUT; - read_ops[subop].peer_fd = osd_peer_fds.get(role_osd_num); + read_ops[subop].peer_fd = osd_peer_fds.at(role_osd_num); read_ops[subop].op.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = next_op_id++, + .id = next_subop_id++, .opcode = OSD_OP_SECONDARY_READ, }, .oid = { @@ -126,7 +160,14 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) .len = real_reads[role*2+1] - real_reads[role*2], }; read_ops[subop].buf = buf + pos[role]; - read_ops[subop].callback = NULL; + read_ops[subop].callback = [&](osd_op_t *osd_subop) + { + if (osd_subop->reply.hdr.retval < osd_subop->op.sec_rw.len) + errors++; + else + done++; + // continue op + }; } subop++; } @@ -134,7 +175,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) // Reconstruct missing stripes for (int role = 0; role < pgs[pg_num].pg_minsize; role++) { - if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX) + if (reads[role*2+1] != 0 && target_set[role] == 0) { int other = role == 0 ? 1 : 0; int parity = pgs[pg_num].pg_size-1;