From 4f9b5286a01bef7eab663aae30328b5f6a8a908a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 4 Sep 2020 22:17:44 +0300 Subject: [PATCH] Add replicated pool support to OSD logic ...in theory :-D now it needs some testing --- lp/mon-main.js | 2 +- lp/mon.js | 4 +- osd.h | 2 +- osd_peering.cpp | 4 +- osd_peering_pg.cpp | 71 ++++++++++++++++++++++--------- osd_primary.cpp | 80 +++++++++++++++++++++++++++------- osd_primary.h | 1 + osd_primary_subops.cpp | 97 +++++++++++++++++++++++++----------------- osd_rmw.cpp | 5 +++ pg_states.cpp | 8 ++-- pg_states.h | 4 +- 11 files changed, 195 insertions(+), 83 deletions(-) diff --git a/lp/mon-main.js b/lp/mon-main.js index 77a074e4..4e44bdce 100644 --- a/lp/mon-main.js +++ b/lp/mon-main.js @@ -15,7 +15,7 @@ for (let i = 2; i < process.argv.length; i++) if (!options.etcd_url) { - console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+' --etcd_url "http://127.0.0.1:2379,..." --etcd_prefix "/rage" --etcd_start_timeout 5'); + console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+' --etcd_url "http://127.0.0.1:2379,..." --etcd_prefix "/vitastor" --etcd_start_timeout 5'); process.exit(); } diff --git a/lp/mon.js b/lp/mon.js index c70f181a..667dfabb 100644 --- a/lp/mon.js +++ b/lp/mon.js @@ -136,7 +136,9 @@ class Mon /* : { : { primary: osd_num_t, - state: ("starting"|"peering"|"incomplete"|"active"|"stopping"|"offline"|"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|"left_on_dead")[], + state: ("starting"|"peering"|"incomplete"|"active"|"stopping"|"offline"| + "degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"| + "has_invalid"|"left_on_dead")[], } }, */ }, diff --git a/osd.h b/osd.h index 6e1a8bec..564a7f74 100644 --- a/osd.h +++ b/osd.h @@ -193,7 +193,7 @@ class osd_t void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op); - void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set); + void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_peering.cpp b/osd_peering.cpp index 5bb81161..cee57df8 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -26,7 +26,7 @@ void osd_t::handle_peers() degraded_objects += p.second.degraded_objects.size(); if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN)) peering_state = peering_state | OSD_FLUSHING_PGS; - else + else if (p.second.state & PG_ACTIVE) peering_state = peering_state | OSD_RECOVERING; } else @@ -184,6 +184,7 @@ void osd_t::start_pg_peering(pg_t & pg) { pg.state = PG_INCOMPLETE; report_pg_state(pg); + return; } } } @@ -191,6 +192,7 @@ void osd_t::start_pg_peering(pg_t & pg) { pg.state = PG_INCOMPLETE; report_pg_state(pg); + return; } std::set cur_peers; for (auto pg_osd: pg.all_peers) diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 655ece72..1926d6e9 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -33,6 +33,7 @@ struct obj_piece_ver_t struct pg_obj_state_check_t { pg_t *pg; + bool replicated = false; std::vector list; int list_pos; int obj_start = 0, obj_end = 0, ver_start = 0, ver_end = 0; @@ -41,7 +42,7 @@ struct pg_obj_state_check_t uint64_t last_ver = 0; uint64_t target_ver = 0; uint64_t n_copies = 0, has_roles = 0, n_roles = 0, n_stable = 0, n_mismatched = 0; - uint64_t n_unstable = 0, n_buggy = 0; + uint64_t n_unstable = 0, n_invalid = 0; pg_osd_set_t osd_set; int log_level; @@ -73,6 +74,12 @@ void pg_obj_state_check_t::walk() { finish_object(); } + if (pg->state & PG_HAS_INVALID) + { + // Stop PGs with "invalid" objects + pg->state = PG_INCOMPLETE | PG_HAS_INVALID; + return; + } if (pg->pg_cursize < pg->pg_size) { pg->state |= PG_DEGRADED; @@ -92,7 +99,7 @@ void pg_obj_state_check_t::start_object() target_ver = 0; ver_start = list_pos; has_roles = n_copies = n_roles = n_stable = n_mismatched = 0; - n_unstable = n_buggy = 0; + n_unstable = n_invalid = 0; } void pg_obj_state_check_t::handle_version() @@ -111,11 +118,11 @@ void pg_obj_state_check_t::handle_version() has_roles = n_copies = n_roles = n_stable = n_mismatched = 0; last_ver = list[list_pos].version; } - int replica = (list[list_pos].oid.stripe & STRIPE_MASK); + unsigned replica = (list[list_pos].oid.stripe & STRIPE_MASK); n_copies++; - if (replica >= pg->pg_size) + if (replicated && replica > 0 || replica >= pg->pg_size) { - n_buggy++; + n_invalid++; } else { @@ -123,14 +130,32 @@ void pg_obj_state_check_t::handle_version() { n_stable++; } - if (pg->cur_set[replica] != list[list_pos].osd_num) + if (replicated) { - n_mismatched++; + int i; + for (i = 0; i < pg->cur_set.size(); i++) + { + if (pg->cur_set[i] == list[list_pos].osd_num) + { + break; + } + } + if (i == pg->cur_set.size()) + { + n_mismatched++; + } } - if (!(has_roles & (1 << replica))) + else { - has_roles = has_roles | (1 << replica); - n_roles++; + if (pg->cur_set[replica] != list[list_pos].osd_num) + { + n_mismatched++; + } + if (!(has_roles & (1 << replica))) + { + has_roles = has_roles | (1 << replica); + n_roles++; + } } } } @@ -151,11 +176,14 @@ void pg_obj_state_check_t::finish_object() obj_end = list_pos; // Remember the decision uint64_t state = 0; - if (n_buggy > 0) + if (n_invalid > 0) { - state = OBJ_BUGGY; - // FIXME: bring pg offline - throw std::runtime_error("buggy object state"); + // It's not allowed to change the replication scheme for a pool other than by recreating it + // So we must bring the PG offline + state = OBJ_INCOMPLETE; + pg->state |= PG_HAS_INVALID; + pg->total_count++; + return; } if (n_unstable > 0) { @@ -201,7 +229,7 @@ void pg_obj_state_check_t::finish_object() { return; } - if (n_roles < pg->pg_minsize) + if (!replicated && n_roles < pg->pg_minsize) { if (log_level > 1) { @@ -221,7 +249,7 @@ void pg_obj_state_check_t::finish_object() } if (n_mismatched > 0) { - if (n_roles >= pg->pg_cursize && log_level > 1) + if (log_level > 1 && (replicated || n_roles >= pg->pg_cursize)) { printf("Object is misplaced: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); } @@ -234,14 +262,16 @@ void pg_obj_state_check_t::finish_object() { for (int i = obj_start; i < obj_end; i++) { - printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, + (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); } } else { for (int i = ver_start; i < ver_end; i++) { - printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num, + (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); } } } @@ -343,6 +373,7 @@ void pg_t::calc_object_states(int log_level) pg_obj_state_check_t st; st.log_level = log_level; st.pg = this; + st.replicated = (this->scheme == POOL_SCHEME_REPLICATED); auto ps = peering_state; epoch = 0; for (auto it: ps->list_results) @@ -384,7 +415,7 @@ void pg_t::calc_object_states(int log_level) void pg_t::print_state() { printf( - "[PG %u] is %s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pg_num, + "[PG %u] is %s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pg_num, (state & PG_STARTING) ? "starting" : "", (state & PG_OFFLINE) ? "offline" : "", (state & PG_PEERING) ? "peering" : "", @@ -396,6 +427,8 @@ void pg_t::print_state() (state & PG_HAS_DEGRADED) ? " + has_degraded" : "", (state & PG_HAS_MISPLACED) ? " + has_misplaced" : "", (state & PG_HAS_UNCLEAN) ? " + has_unclean" : "", + (state & PG_HAS_INVALID) ? " + has_invalid" : "", + (state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "", total_count ); } diff --git a/osd_primary.cpp b/osd_primary.cpp index c1261815..c99fa1d2 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -13,14 +13,16 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) { // PG number is calculated from the offset // Our EC scheme stores data in fixed chunks equal to (K*block size) - // K = pg_minsize and will be a property of the inode. Not it's hardcoded (FIXME) - uint64_t pg_block_size = bs_block_size * 2; + // K = pg_minsize in case of EC/XOR, or 1 for replicated pools + pool_id_t pool_id = INODE_POOL(cur_op->req.rw.inode); + auto & pool_cfg = st_cli.pool_config[pool_id]; + uint64_t pg_block_size = bs_block_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_minsize); object_id oid = { .inode = cur_op->req.rw.inode, // oid.stripe = starting offset of the parity stripe .stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size, }; - pool_id_t pool_id = INODE_POOL(oid.inode); + // FIXME: pg_stripe_size may be a per-pool config pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_counts[pool_id] + 1; auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num }); if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE)) @@ -37,13 +39,15 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return false; } osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc( - sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pg_it->second.pg_size, 1 + sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size), 1 ); op_data->pg_num = pg_num; op_data->oid = oid; - op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1)); + op_data->stripes = pool_cfg.scheme == POOL_SCHEME_REPLICATED ? NULL : ((osd_rmw_stripe_t*)(op_data+1)); + op_data->scheme = pool_cfg.scheme; cur_op->op_data = op_data; - split_stripes(pg_it->second.pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); + split_stripes((pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_minsize), + bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); pg_it->second.inflight++; return true; } @@ -88,7 +92,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) else if (op_data->st == 2) goto resume_2; { auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }]; - for (int role = 0; role < pg.pg_minsize; role++) + for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_minsize); role++) { op_data->stripes[role].read_start = op_data->stripes[role].req_start; op_data->stripes[role].read_end = op_data->stripes[role].req_end; @@ -96,7 +100,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) // Determine version auto vo_it = pg.ver_override.find(op_data->oid); op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; - if (pg.state == PG_ACTIVE) + if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED) { // Fast happy-path cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); @@ -210,10 +214,33 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) resume_1: // Determine blocks to read and write // Missing chunks are allowed to be overwritten even in incomplete objects - // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for the lower performance impact + // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); - cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set, - pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size); + if (op_data->scheme == POOL_SCHEME_REPLICATED) + { + // Simplified algorithm + op_data->stripes[0].write_start = op_data->stripes[0].req_start; + op_data->stripes[0].write_end = op_data->stripes[0].req_end; + op_data->stripes[0].write_buf = cur_op->buf; + if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 || + op_data->stripes[0].write_end != bs_block_size)) + { + // Object is degraded/misplaced and will be moved to + op_data->stripes[0].read_start = 0; + op_data->stripes[0].read_end = bs_block_size; + cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign(MEM_ALIGNMENT, bs_block_size); + if (!cur_op->rmw_buf) + { + printf("Failed to allocate %u bytes\n", bs_block_size); + exit(1); + } + } + } + else + { + cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set, + pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size); + } // Read required blocks submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op); resume_2: @@ -227,8 +254,27 @@ resume_3: } // Save version override for parallel reads pg.ver_override[op_data->oid] = op_data->fact_ver; - // Recover missing stripes, calculate parity - calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); + if (op_data->scheme == POOL_SCHEME_REPLICATED) + { + // Only (possibly) copy new data from the request into the recovery buffer + if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 || + op_data->stripes[0].write_end != bs_block_size)) + { + memcpy( + op_data->stripes[0].read_buf + op_data->stripes[0].req_start, + op_data->stripes[0].write_buf, + op_data->stripes[0].req_end - op_data->stripes[0].req_start + ); + op_data->stripes[0].write_buf = op_data->stripes[0].read_buf; + op_data->stripes[0].write_start = 0; + op_data->stripes[0].write_end = bs_block_size; + } + } + else + { + // Recover missing stripes, calculate parity + calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); + } // Send writes if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch) { @@ -291,7 +337,7 @@ resume_5: if (op_data->object_state->state & OBJ_MISPLACED) { // Remove extra chunks - submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state->osd_set); + submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set); if (op_data->n_subops > 0) { resume_8: @@ -314,7 +360,9 @@ resume_9: // FIXME: Check for immediate_commit == IMMEDIATE_SMALL resume_6: resume_7: - if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6)) + // FIXME: Replicated writes are always "immediate" + if (op_data->scheme != POOL_SCHEME_REPLICATED && + !remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6)) { return; } @@ -657,7 +705,7 @@ resume_3: pg.ver_override[op_data->oid] = op_data->fact_ver; // Submit deletes op_data->fact_ver++; - submit_primary_del_subops(cur_op, NULL, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set); + submit_primary_del_subops(cur_op, NULL, 0, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set); resume_4: op_data->st = 4; return; diff --git a/osd_primary.h b/osd_primary.h index a3c64b12..c7cbecf9 100644 --- a/osd_primary.h +++ b/osd_primary.h @@ -20,6 +20,7 @@ struct osd_primary_op_data_t object_id oid; uint64_t target_ver; uint64_t fact_ver = 0; + uint64_t scheme = 0; int n_subops = 0, done = 0, errors = 0, epipe = 0; int degraded = 0, pg_size, pg_minsize; osd_rmw_stripe_t *stripes; diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index d02b4cee..9e85057e 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -78,9 +78,10 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) { - bool w = submit_type == SUBMIT_WRITE; + bool wr = submit_type == SUBMIT_WRITE; osd_primary_op_data_t *op_data = cur_op->op_data; osd_rmw_stripe_t *stripes = op_data->stripes; + bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; // Allocate subops int n_subops = 0, zero_read = -1; for (int role = 0; role < pg_size; role++) @@ -89,12 +90,12 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s { zero_read = role; } - if (osd_set[role] != 0 && (w || stripes[role].read_end != 0)) + if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0)) { n_subops++; } } - if (!n_subops && submit_type == SUBMIT_RMW_READ) + if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep)) { n_subops = 1; } @@ -111,36 +112,37 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s for (int role = 0; role < pg_size; role++) { // We always submit zero-length writes to all replicas, even if the stripe is not modified - if (!(w || stripes[role].read_end != 0 || zero_read == role)) + if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role)) { continue; } osd_num_t role_osd_num = osd_set[role]; if (role_osd_num != 0) { + int stripe_num = rep ? 0 : role; if (role_osd_num == this->osd_num) { clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); subops[i].op_type = (uint64_t)cur_op; subops[i].bs_op = new blockstore_op_t({ - .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), + .opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ), .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { handle_primary_bs_subop(subop); }, .oid = { .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, + .stripe = op_data->oid.stripe | stripe_num, }, .version = op_version, - .offset = w ? stripes[role].write_start : stripes[role].read_start, - .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, - .buf = w ? stripes[role].write_buf : stripes[role].read_buf, + .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start, + .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start, + .buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf, }); #ifdef OSD_DEBUG printf( - "Submit %s to local: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", - op_data->oid.inode, op_data->oid.stripe | role, op_version, + "Submit %s to local: %lu:%lu v%lu %u-%u\n", wr ? "write" : "read", + op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version, subops[i].bs_op->offset, subops[i].bs_op->len ); #endif @@ -154,35 +156,35 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = c_cli.next_subop_id++, - .opcode = (uint64_t)(w ? OSD_OP_SEC_WRITE : OSD_OP_SEC_READ), + .opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ), }, .oid = { .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | role, + .stripe = op_data->oid.stripe | stripe_num, }, .version = op_version, - .offset = w ? stripes[role].write_start : stripes[role].read_start, - .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, + .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start, + .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start, }; #ifdef OSD_DEBUG printf( - "Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", role_osd_num, - op_data->oid.inode, op_data->oid.stripe | role, op_version, + "Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", wr ? "write" : "read", role_osd_num, + op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version, subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len ); #endif - if (w) + if (wr) { - if (stripes[role].write_end > stripes[role].write_start) + if (stripes[stripe_num].write_end > stripes[stripe_num].write_start) { - subops[i].iov.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start); + subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start); } } else { - if (stripes[role].read_end > stripes[role].read_start) + if (stripes[stripe_num].read_end > stripes[stripe_num].read_start) { - subops[i].iov.push_back(stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start); + subops[i].iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start); } } subops[i].callback = [cur_op, this](osd_op_t *subop) @@ -205,21 +207,23 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s static uint64_t bs_op_to_osd_op[] = { 0, - OSD_OP_SEC_READ, // BS_OP_READ - OSD_OP_SEC_WRITE, // BS_OP_WRITE - OSD_OP_SEC_SYNC, // BS_OP_SYNC - OSD_OP_SEC_STABILIZE, // BS_OP_STABLE - OSD_OP_SEC_DELETE, // BS_OP_DELETE - OSD_OP_SEC_LIST, // BS_OP_LIST - OSD_OP_SEC_ROLLBACK, // BS_OP_ROLLBACK - OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL + OSD_OP_SEC_READ, // BS_OP_READ = 1 + OSD_OP_SEC_WRITE, // BS_OP_WRITE = 2 + OSD_OP_SEC_WRITE_STABLE, // BS_OP_WRITE_STABLE = 3 + OSD_OP_SEC_SYNC, // BS_OP_SYNC = 4 + OSD_OP_SEC_STABILIZE, // BS_OP_STABLE = 5 + OSD_OP_SEC_DELETE, // BS_OP_DELETE = 6 + OSD_OP_SEC_LIST, // BS_OP_LIST = 7 + OSD_OP_SEC_ROLLBACK, // BS_OP_ROLLBACK = 8 + OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL = 9 }; void osd_t::handle_primary_bs_subop(osd_op_t *subop) { osd_op_t *cur_op = (osd_op_t*)subop->op_type; blockstore_op_t *bs_op = subop->bs_op; - int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE ? bs_op->len : 0; + int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE + || bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0; if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ) { // die @@ -231,7 +235,7 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop) add_bs_subop_stats(subop); subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode]; subop->reply.hdr.retval = bs_op->retval; - if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE) + if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE) { subop->req.sec_rw.len = bs_op->len; subop->reply.sec_rw.version = bs_op->version; @@ -269,7 +273,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) uint64_t opcode = subop->req.hdr.opcode; int retval = subop->reply.hdr.retval; int expected = opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE - ? subop->req.sec_rw.len : 0; + || opcode == OSD_OP_SEC_WRITE_STABLE ? subop->req.sec_rw.len : 0; osd_primary_op_data_t *op_data = cur_op->op_data; if (retval != expected) { @@ -283,7 +287,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) else { op_data->done++; - if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE) + if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE) { uint64_t version = subop->reply.sec_rw.version; #ifdef OSD_DEBUG @@ -346,13 +350,27 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op) } } -void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set) +static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num) +{ + for (uint64_t i = 0; i < size; i++) + { + if (osd_set[i] == osd_num) + { + return true; + } + } + return false; +} + +void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set) { osd_primary_op_data_t *op_data = cur_op->op_data; + bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; int extra_chunks = 0; + // ordered comparison for EC/XOR, unordered for replicated pools for (auto & chunk: loc_set) { - if (!cur_set || chunk.osd_num != cur_set[chunk.role]) + if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) { extra_chunks++; } @@ -368,8 +386,9 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os int i = 0; for (auto & chunk: loc_set) { - if (!cur_set || chunk.osd_num != cur_set[chunk.role]) + if (!cur_set || (rep ? contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) { + int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role; if (chunk.osd_num == this->osd_num) { clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); @@ -382,7 +401,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os }, .oid = { .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, + .stripe = op_data->oid.stripe | stripe_num, }, // Same version as write .version = op_data->fact_ver, @@ -401,7 +420,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os }, .oid = { .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | chunk.role, + .stripe = op_data->oid.stripe | stripe_num, }, // Same version as write .version = op_data->fact_ver, diff --git a/osd_rmw.cpp b/osd_rmw.cpp index bfba4de4..318ced47 100644 --- a/osd_rmw.cpp +++ b/osd_rmw.cpp @@ -153,6 +153,11 @@ void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t ad } // Allocate buffer void *buf = memalign(MEM_ALIGNMENT, buf_size); + if (!buf) + { + printf("Failed to allocate %lu bytes\n", buf_size); + exit(1); + } uint64_t buf_pos = add_size; for (int role = 0; role < read_pg_size; role++) { diff --git a/pg_states.cpp b/pg_states.cpp index e2f909a4..01a91bd0 100644 --- a/pg_states.cpp +++ b/pg_states.cpp @@ -1,8 +1,8 @@ #include "pg_states.h" -const int pg_state_bit_count = 13; +const int pg_state_bit_count = 14; -const int pg_state_bits[13] = { +const int pg_state_bits[14] = { PG_STARTING, PG_PEERING, PG_INCOMPLETE, @@ -14,10 +14,11 @@ const int pg_state_bits[13] = { PG_HAS_DEGRADED, PG_HAS_MISPLACED, PG_HAS_UNCLEAN, + PG_HAS_INVALID, PG_LEFT_ON_DEAD, }; -const char *pg_state_names[13] = { +const char *pg_state_names[14] = { "starting", "peering", "incomplete", @@ -29,5 +30,6 @@ const char *pg_state_names[13] = { "has_degraded", "has_misplaced", "has_unclean", + "has_invalid", "left_on_dead", }; diff --git a/pg_states.h b/pg_states.h index ff826d5b..9f6d8dfa 100644 --- a/pg_states.h +++ b/pg_states.h @@ -15,7 +15,8 @@ #define PG_HAS_DEGRADED (1<<8) #define PG_HAS_MISPLACED (1<<9) #define PG_HAS_UNCLEAN (1<<10) -#define PG_LEFT_ON_DEAD (1<<11) +#define PG_HAS_INVALID (1<<11) +#define PG_LEFT_ON_DEAD (1<<12) // FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size #define STRIPE_MASK ((uint64_t)4096 - 1) @@ -26,7 +27,6 @@ #define OBJ_MISPLACED 0x08 #define OBJ_NEEDS_STABLE 0x10000 #define OBJ_NEEDS_ROLLBACK 0x20000 -#define OBJ_BUGGY 0x80000 extern const int pg_state_bits[]; extern const char *pg_state_names[];