diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index c3788cf9..a4199559 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -351,6 +351,12 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso { pg_cfg.all_peers.push_back(pg_osd.uint64_value()); } + // Read epoch + pg_cfg.epoch = value["epoch"].uint64_value(); + if (on_change_pg_history_hook != NULL) + { + on_change_pg_history_hook(pg_num); + } } } else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/") diff --git a/etcd_state_client.h b/etcd_state_client.h index 7140c805..ec312fb3 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -23,6 +23,7 @@ struct pg_config_t bool pause; osd_num_t cur_primary; int cur_state; + uint64_t epoch; }; struct json_kv_t @@ -48,7 +49,8 @@ struct etcd_state_client_t std::function on_load_config_hook; std::function load_pgs_checks_hook; std::function on_load_pgs_hook; - std::function on_change_osd_state_hook; + std::function on_change_pg_history_hook; + std::function on_change_osd_state_hook; json_kv_t parse_etcd_kv(const json11::Json & kv_json); void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); diff --git a/lp/mon.js b/lp/mon.js index 758cbb94..0dbc47b0 100644 --- a/lp/mon.js +++ b/lp/mon.js @@ -371,6 +371,7 @@ class Mon pg_history[i] = { osd_sets: [], all_peers: [], + epoch: 0, }; for (let j = 0; j < mul; j++) { @@ -384,6 +385,10 @@ class Mon { Array.prototype.push.apply(pg_history[i].all_peers, hist.all_peers); } + if (hist && hist.epoch) + { + pg_history[i].epoch = pg_history[i].epoch < hist.epoch ? hist.epoch : pg_history[i].epoch; + } } } } @@ -393,6 +398,7 @@ class Mon // So, merge ALL PGs history let all_sets = {}; let all_peers = {}; + let max_epoch = 0; for (const pg of prev_pgs) { all_sets[pg.join(' ')] = pg; @@ -414,12 +420,16 @@ class Mon all_peers[osd_num] = Number(osd_num); } } + if (hist && hist.epoch) + { + max_epoch = max_epoch < hist.epoch ? hist.epoch : max_epoch; + } } all_sets = Object.values(all_sets); all_peers = Object.values(all_peers); for (let i = 0; i < new_pg_count; i++) { - pg_history[i] = { osd_sets: all_sets, all_peers }; + pg_history[i] = { osd_sets: all_sets, all_peers, epoch: max_epoch }; } } // Mark history keys for removed PGs as removed diff --git a/osd.h b/osd.h index fc88c3b4..9c62af9c 100644 --- a/osd.h +++ b/osd.h @@ -127,7 +127,8 @@ class osd_t // cluster connection void parse_config(blockstore_config_t & config); void init_cluster(); - void on_change_osd_state_hook(uint64_t osd_num); + void on_change_osd_state_hook(osd_num_t peer_osd); + void on_change_pg_history_hook(pg_num_t pg_num); void on_change_etcd_state_hook(json11::Json::object & changes); void on_load_config_hook(json11::Json::object & changes); json11::Json on_load_pgs_checks_hook(); @@ -194,7 +195,7 @@ class osd_t void handle_primary_bs_subop(osd_op_t *subop); void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); - void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); + void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 89682cf6..772f70d1 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -43,7 +43,8 @@ void osd_t::init_cluster() { st_cli.tfd = tfd; st_cli.log_level = log_level; - st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); }; + st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); }; + st_cli.on_change_pg_history_hook = [this](pg_num_t pg_num) { on_change_pg_history_hook(pg_num); }; st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); }; @@ -207,7 +208,7 @@ void osd_t::report_statistics() }); } -void osd_t::on_change_osd_state_hook(uint64_t peer_osd) +void osd_t::on_change_osd_state_hook(osd_num_t peer_osd) { if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end()) { @@ -222,6 +223,27 @@ void osd_t::on_change_etcd_state_hook(json11::Json::object & changes) apply_pg_config(); } +void osd_t::on_change_pg_history_hook(pg_num_t pg_num) +{ + auto pg_it = pgs.find(pg_num); + if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch && + st_cli.pg_config[pg_num].epoch >= pg_it->second.epoch) + { + pg_it->second.reported_epoch = st_cli.pg_config[pg_num].epoch; + object_id oid = { 0 }; + bool first = true; + for (auto op: pg_it->second.write_queue) + { + if (first || oid != op.first) + { + oid = op.first; + first = false; + continue_primary_write(op.second); + } + } + } +} + void osd_t::on_load_config_hook(json11::Json::object & global_config) { blockstore_config_t osd_config = this->config; @@ -549,6 +571,7 @@ void osd_t::apply_pg_config() .state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING, .pg_cursize = 0, .pg_num = pg_num, + .reported_epoch = pg_cfg.epoch, .target_history = pg_cfg.target_history, .all_peers = std::vector(all_peers.begin(), all_peers.end()), .target_set = pg_cfg.target_set, @@ -584,7 +607,6 @@ void osd_t::report_pg_states() { return; } - etcd_reporting_pg_state = true; std::vector> reporting_pgs; json11::Json::array checks; json11::Json::array success; @@ -651,26 +673,26 @@ void osd_t::report_pg_states() }); if (pg.history_changed) { + // Prevent race conditions (for the case when the monitor is updating this key at the same time) pg.history_changed = false; - if (pg.state == PG_ACTIVE) - { - success.push_back(json11::Json::object { - { "request_delete_range", json11::Json::object { - { "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) }, - } } - }); - } - else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD)) - { - success.push_back(json11::Json::object { - { "request_put", json11::Json::object { - { "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) }, - { "value", base64_encode(json11::Json(json11::Json::object { - { "all_peers", pg.all_peers }, - }).dump()) }, - } } - }); - } + std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)); + json11::Json::object history_value = { + { "epoch", pg.epoch }, + { "all_peers", pg.all_peers }, + { "osd_sets", pg.target_history }, + }; + checks.push_back(json11::Json::object { + { "target", "MOD" }, + { "key", history_key }, + { "result", "LESS" }, + { "mod_revision", st_cli.etcd_watch_revision+1 }, + }); + success.push_back(json11::Json::object { + { "request_put", json11::Json::object { + { "key", history_key }, + { "value", base64_encode(json11::Json(history_value).dump()) }, + } } + }); } } failure.push_back(json11::Json::object { @@ -680,6 +702,7 @@ void osd_t::report_pg_states() }); } pg_state_dirty.clear(); + etcd_reporting_pg_state = true; st_cli.etcd_txn(json11::Json::object { { "compare", checks }, { "success", success }, { "failure", failure } }, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data) @@ -705,14 +728,17 @@ void osd_t::report_pg_states() if (res["kvs"].array_items().size()) { auto kv = st_cli.parse_etcd_kv(res["kvs"][0]); - pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10)); - auto pg_it = pgs.find(pg_num); - if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + if (kv.key.substr(st_cli.etcd_prefix.length()+10) == st_cli.etcd_prefix+"/pg/state/") { - // Live PG state update failed - printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); - force_stop(1); - return; + pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10)); + auto pg_it = pgs.find(pg_num); + if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + { + // Live PG state update failed + printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); + force_stop(1); + return; + } } } } diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index b2ce66ea..655ece72 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -344,6 +344,7 @@ void pg_t::calc_object_states(int log_level) st.log_level = log_level; st.pg = this; auto ps = peering_state; + epoch = 0; for (auto it: ps->list_results) { auto nstab = it.second.stable_count; @@ -354,6 +355,10 @@ void pg_t::calc_object_states(int log_level) obj_ver_id *ov = it.second.buf; for (uint64_t i = 0; i < n; i++, ov++) { + if ((ov->version >> (64-PG_EPOCH_BITS)) > epoch) + { + epoch = (ov->version >> (64-PG_EPOCH_BITS)); + } st.list[start+i] = { .oid = ov->oid, .version = ov->version, @@ -369,6 +374,11 @@ void pg_t::calc_object_states(int log_level) std::sort(st.list.begin(), st.list.end()); // Walk over it and check object states st.walk(); + if (this->state & (PG_DEGRADED|PG_LEFT_ON_DEAD)) + { + assert(epoch != ((1ul << PG_EPOCH_BITS)-1)); + epoch++; + } } void pg_t::print_state() diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 41e2122d..6fadb658 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -9,6 +9,8 @@ #include "osd_ops.h" #include "pg_states.h" +#define PG_EPOCH_BITS 48 + struct pg_obj_loc_t { uint64_t role; @@ -72,6 +74,8 @@ struct pg_t uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; uint64_t clean_count = 0, total_count = 0; + // epoch number - should increase with each non-clean activation of the PG + uint64_t epoch = 0, reported_epoch = 0; // target history and all potential peers std::vector> target_history; std::vector all_peers; diff --git a/osd_primary.cpp b/osd_primary.cpp index 1a61666b..772aa67b 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -99,7 +99,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) { // Fast happy-path cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); - submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op); + submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_minsize, pg.cur_set.data(), cur_op); op_data->st = 1; } else @@ -116,7 +116,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) op_data->pg_size = pg.pg_size; op_data->degraded = 1; cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); - submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op); + submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op); op_data->st = 1; } } @@ -200,6 +200,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 7) goto resume_7; else if (op_data->st == 8) goto resume_8; else if (op_data->st == 9) goto resume_9; + else if (op_data->st == 10) goto resume_10; assert(op_data->st == 0); if (!check_write_queue(cur_op, pg)) { @@ -213,7 +214,7 @@ resume_1: cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set, pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size); // Read required blocks - submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op); + submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op); resume_2: op_data->st = 2; return; @@ -228,7 +229,34 @@ resume_3: // Recover missing stripes, calculate parity calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); // Send writes - submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op); + if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch) + { + op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1; + } + else + { + if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1)) + { + assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1)); + pg.epoch++; + } + op_data->target_ver = op_data->fact_ver + 1; + } + if (pg.epoch > pg.reported_epoch) + { + // Report newer epoch before writing + // FIXME: We may report only one PG state here... + this->pg_state_dirty.insert(pg.pg_num); + pg.history_changed = true; + report_pg_states(); +resume_10: + if (pg.epoch > pg.reported_epoch) + { + op_data->st = 10; + return; + } + } + submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op); resume_4: op_data->st = 4; return; @@ -614,7 +642,7 @@ resume_1: // Determine which OSDs contain this object and delete it op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); // Submit 1 read to determine the actual version number - submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op); + submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op); resume_2: op_data->st = 2; return; diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index a99776d8..673a168d 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -76,7 +76,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) } } -void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) +void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) { bool w = submit_type == SUBMIT_WRITE; osd_primary_op_data_t *op_data = cur_op->op_data; @@ -102,7 +102,6 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { zero_read = -1; } - uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver); osd_op_t *subops = new osd_op_t[n_subops]; op_data->fact_ver = 0; op_data->done = op_data->errors = 0;