diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index 9a95fb34..156f5603 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -315,35 +315,35 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso if (pool_item.second["pg_size"].uint64_value() < 1 || pool_item.second["scheme"] == "xor" && pool_item.second["pg_size"].uint64_value() < 3) { - printf("Pool %lu has invalid pg_size, skipping pool\n", pool_id); + printf("Pool %u has invalid pg_size, skipping pool\n", pool_id); continue; } if (pool_item.second["pg_minsize"].uint64_value() < 1 || pool_item.second["pg_minsize"].uint64_value() > pool_item.second["pg_size"].uint64_value() || pool_item.second["pg_minsize"].uint64_value() < (pool_item.second["pg_size"].uint64_value() - 1)) { - printf("Pool %lu has invalid pg_minsize, skipping pool\n", pool_id); + printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id); continue; } if (pool_item.second["pg_count"].uint64_value() < 1) { - printf("Pool %lu has invalid pg_count, skipping pool\n", pool_id); + printf("Pool %u has invalid pg_count, skipping pool\n", pool_id); continue; } if (pool_item.second["name"].string_value() == "") { - printf("Pool %lu has empty name, skipping pool\n", pool_id); + printf("Pool %u has empty name, skipping pool\n", pool_id); continue; } if (pool_item.second["scheme"] != "replicated" && pool_item.second["scheme"] != "xor") { - printf("Pool %lu has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id); + printf("Pool %u has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id); continue; } if (pool_item.second["max_osd_combinations"].uint64_value() > 0 && pool_item.second["max_osd_combinations"].uint64_value() < 100) { - printf("Pool %lu has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); + printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); continue; } auto & parsed_cfg = this->pool_config[pool_id]; @@ -360,6 +360,15 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso { parsed_cfg.max_osd_combinations = 10000; } + for (auto & pg_item: parsed_cfg.pg_config) + { + if (pg_item.second.target_set.size() != parsed_cfg.pg_size) + { + printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", + pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size); + pg_item.second.pause = true; + } + } } } else if (key == etcd_prefix+"/config/pgs") @@ -384,7 +393,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso pg_num_t pg_num = stoull_full(pg_item.first); if (!pg_num) { - printf("Bad key in pool %lu PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str()); + printf("Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str()); continue; } auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num]; @@ -396,6 +405,12 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso { parsed_cfg.target_set.push_back(pg_osd.uint64_value()); } + if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size) + { + printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n", + pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size); + parsed_cfg.pause = true; + } } } for (auto & pool_item: this->pool_config) @@ -406,7 +421,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso if (pg_it->second.exists && pg_it->first != ++n) { printf( - "Invalid pool %lu PG configuration: PG numbers don't cover whole 1..%lu range\n", + "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n", pool_item.second.id, pool_item.second.pg_config.size() ); for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) @@ -426,7 +441,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso pool_id_t pool_id = 0; pg_num_t pg_num = 0; char null_byte = 0; - sscanf(key.c_str() + etcd_prefix.length()+12, "%lu/%u%c", &pool_id, &pg_num, &null_byte); + sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0) { printf("Bad etcd key %s, ignoring\n", key.c_str()); @@ -465,7 +480,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso pool_id_t pool_id = 0; pg_num_t pg_num = 0; char null_byte = 0; - sscanf(key.c_str() + etcd_prefix.length()+10, "%lu/%u%c", &pool_id, &pg_num, &null_byte); + sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte); if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0) { printf("Bad etcd key %s, ignoring\n", key.c_str()); @@ -492,7 +507,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso } if (i >= pg_state_bit_count) { - printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str()); + printf("Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str()); return; } } @@ -501,7 +516,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso (state & PG_PEERING) && state != PG_PEERING || (state & PG_INCOMPLETE) && state != PG_INCOMPLETE) { - printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str()); + printf("Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str()); return; } this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary; diff --git a/etcd_state_client.h b/etcd_state_client.h index a6b27b11..e3001f6d 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -13,12 +13,6 @@ #define ETCD_SLOW_TIMEOUT 5000 #define ETCD_QUICK_TIMEOUT 1000 -#define POOL_SCHEME_REPLICATED 1 -#define POOL_SCHEME_XOR 2 -#define POOL_ID_MAX 0x10000 -#define POOL_ID_BITS 16 -#define INODE_POOL(inode) ((inode) >> (64 - POOL_ID_BITS)) - struct json_kv_t { std::string key; @@ -38,8 +32,6 @@ struct pg_config_t uint64_t epoch; }; -typedef uint64_t pool_id_t; - struct pool_config_t { bool exists; diff --git a/messenger.h b/messenger.h index c206beb7..2d60d138 100644 --- a/messenger.h +++ b/messenger.h @@ -193,7 +193,7 @@ struct osd_client_t std::map sent_ops; // PGs dirtied by this client's primary-writes - std::set dirty_pgs; + std::set dirty_pgs; // Write state osd_op_t *write_op = NULL; diff --git a/osd.h b/osd.h index 1b0fa1a8..6e1a8bec 100644 --- a/osd.h +++ b/osd.h @@ -48,7 +48,6 @@ struct osd_recovery_op_t { int st = 0; bool degraded = false; - pg_num_t pg_num = 0; object_id oid = { 0 }; osd_op_t *osd_op = NULL; }; @@ -82,18 +81,18 @@ class osd_t std::string etcd_lease_id; json11::Json self_state; bool loading_peer_config = false; - std::set pg_state_dirty; + std::set pg_state_dirty; bool pg_config_applied = false; bool etcd_reporting_pg_state = false; bool etcd_reporting_stats = false; // peers and PGs - std::map pgs; - std::set dirty_pgs; + std::map pg_counts; + std::map pgs; + std::set dirty_pgs; uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0; int peering_state = 0; - unsigned pg_count = 0; std::map recovery_ops; osd_op_t *autosync_op = NULL; @@ -126,7 +125,7 @@ class osd_t void parse_config(blockstore_config_t & config); void init_cluster(); void on_change_osd_state_hook(osd_num_t peer_osd); - void on_change_pg_history_hook(pg_num_t pg_num); + void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num); void on_change_etcd_state_hook(json11::Json::object & changes); void on_load_config_hook(json11::Json::object & changes); json11::Json on_load_pgs_checks_hook(); @@ -152,17 +151,17 @@ class osd_t void parse_test_peer(std::string peer); void handle_peers(); void repeer_pgs(osd_num_t osd_num); - void start_pg_peering(pg_num_t pg_num); + void start_pg_peering(pg_t & pg); void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); void discard_list_subop(osd_op_t *list_op); - bool stop_pg(pg_num_t pg_num); + bool stop_pg(pg_t & pg); void finish_stop_pg(pg_t & pg); // flushing, recovery and backfill - void submit_pg_flush_ops(pg_num_t pg_num); - void handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval); - void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data); + void submit_pg_flush_ops(pg_t & pg); + void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval); + void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data); bool pick_next_recovery(osd_recovery_op_t &op); void submit_recovery_op(osd_recovery_op_t *op); bool continue_recovery(); @@ -200,6 +199,9 @@ class osd_t inline pg_num_t map_to_pg(object_id oid) { + uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)]; + if (!pg_count) + pg_count = 1; return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1; } diff --git a/osd_cluster.cpp b/osd_cluster.cpp index c1b6277c..5d111dc8 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -14,7 +14,7 @@ void osd_t::init_cluster() { if (run_primary) { - // Test version of clustering code with 1 PG and 2 peers + // Test version of clustering code with 1 pool, 1 PG and 2 peers // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 std::string peerstr = config["peers"]; while (peerstr.size()) @@ -27,15 +27,16 @@ void osd_t::init_cluster() { throw std::runtime_error("run_primary requires at least 2 peers"); } - pgs[1] = (pg_t){ + pgs[{ 1, 1 }] = (pg_t){ .state = PG_PEERING, .pg_cursize = 0, + .pool_id = 1, .pg_num = 1, .target_set = { 1, 2, 3 }, .cur_set = { 0, 0, 0 }, }; - report_pg_state(pgs[1]); - pg_count = 1; + report_pg_state(pgs[{ 1, 1 }]); + pg_counts[1] = 1; } bind_socket(); } @@ -44,7 +45,7 @@ void osd_t::init_cluster() st_cli.tfd = tfd; st_cli.log_level = log_level; st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); }; - st_cli.on_change_pg_history_hook = [this](pg_num_t pg_num) { on_change_pg_history_hook(pg_num); }; + st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); }; st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); }; @@ -223,13 +224,16 @@ void osd_t::on_change_etcd_state_hook(json11::Json::object & changes) apply_pg_config(); } -void osd_t::on_change_pg_history_hook(pg_num_t pg_num) +void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num) { - auto pg_it = pgs.find(pg_num); + auto pg_it = pgs.find({ + .pool_id = pool_id, + .pg_num = pg_num, + }); if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch && - st_cli.pg_config[pg_num].epoch >= pg_it->second.epoch) + st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch) { - pg_it->second.reported_epoch = st_cli.pg_config[pg_num].epoch; + pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch; object_id oid = { 0 }; bool first = true; for (auto op: pg_it->second.write_queue) @@ -451,144 +455,156 @@ void osd_t::on_load_pgs_hook(bool success) void osd_t::apply_pg_count() { - pg_num_t pg_count = st_cli.pg_config.size(); - if (this->pg_count != 0 && this->pg_count != pg_count) + for (auto & pool_item: st_cli.pool_config) { - // Check that all PGs are offline. It is not allowed to change PG count when any PGs are online - // The external tool must wait for all PGs to come down before changing PG count - // If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs - // So an OSD just dies if it detects PG count change while there are active PGs - int still_active = 0; - for (auto & kv: pgs) + if (pool_item.second.real_pg_count != 0 && + pool_item.second.real_pg_count != pg_counts[pool_item.first]) { - if (kv.second.state & PG_ACTIVE) + // Check that all pool PGs are offline. It is not allowed to change PG count when any PGs are online + // The external tool must wait for all PGs to come down before changing PG count + // If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs + // So an OSD just dies if it detects PG count change while there are active PGs + int still_active = 0; + for (auto & kv: pgs) { - still_active++; + if (kv.first.pool_id == pool_item.first && (kv.second.state & PG_ACTIVE)) + { + still_active++; + } + } + if (still_active > 0) + { + printf("[OSD %lu] PG count change detected, but %d PG(s) are still active. This is not allowed. Exiting\n", this->osd_num, still_active); + force_stop(1); + return; } } - if (still_active > 0) - { - printf("[OSD %lu] PG count change detected, but %d PG(s) are still active. This is not allowed. Exiting\n", this->osd_num, still_active); - force_stop(1); - return; - } + this->pg_counts[pool_item.first] = pool_item.second.real_pg_count; } - this->pg_count = pg_count; } void osd_t::apply_pg_config() { bool all_applied = true; - for (auto & kv: st_cli.pg_config) + for (auto & pool_item: st_cli.pool_config) { - pg_num_t pg_num = kv.first; - auto & pg_cfg = kv.second; - bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num && - !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num); - bool currently_taken = this->pgs.find(pg_num) != this->pgs.end() && - this->pgs[pg_num].state != PG_OFFLINE; - if (currently_taken && !take) + auto pool_id = pool_item.first; + for (auto & kv: pool_item.second.pg_config) { - // Stop this PG - stop_pg(pg_num); - } - else if (take) - { - // Take this PG - std::set all_peers; - for (osd_num_t pg_osd: pg_cfg.target_set) + pg_num_t pg_num = kv.first; + auto & pg_cfg = kv.second; + bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num && + !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num); + auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num }); + bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE; + if (currently_taken && !take) { - if (pg_osd != 0) - { - all_peers.insert(pg_osd); - } + // Stop this PG + stop_pg(pg_it->second); } - for (osd_num_t pg_osd: pg_cfg.all_peers) + else if (take) { - if (pg_osd != 0) - { - all_peers.insert(pg_osd); - } - } - for (auto & hist_item: pg_cfg.target_history) - { - for (auto pg_osd: hist_item) + // Take this PG + std::set all_peers; + for (osd_num_t pg_osd: pg_cfg.target_set) { if (pg_osd != 0) { all_peers.insert(pg_osd); } } - } - if (currently_taken) - { - if (this->pgs[pg_num].state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING)) + for (osd_num_t pg_osd: pg_cfg.all_peers) { - if (this->pgs[pg_num].target_set == pg_cfg.target_set) + if (pg_osd != 0) { - // No change in osd_set; history changes are ignored - continue; + all_peers.insert(pg_osd); } - else + } + for (auto & hist_item: pg_cfg.target_history) + { + for (auto pg_osd: hist_item) { - // Stop PG, reapply change after stopping - stop_pg(pg_num); + if (pg_osd != 0) + { + all_peers.insert(pg_osd); + } + } + } + if (currently_taken) + { + if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING)) + { + if (pg_it->second.target_set == pg_cfg.target_set) + { + // No change in osd_set; history changes are ignored + continue; + } + else + { + // Stop PG, reapply change after stopping + stop_pg(pg_it->second); + all_applied = false; + continue; + } + } + else if (pg_it->second.state & PG_STOPPING) + { + // Reapply change after stopping all_applied = false; continue; } - } - else if (this->pgs[pg_num].state & PG_STOPPING) - { - // Reapply change after stopping - all_applied = false; - continue; - } - else if (this->pgs[pg_num].state & PG_STARTING) - { - if (pg_cfg.cur_primary == this->osd_num) + else if (pg_it->second.state & PG_STARTING) { - // PG locked, continue + if (pg_cfg.cur_primary == this->osd_num) + { + // PG locked, continue + } + else + { + // Reapply change after locking the PG + all_applied = false; + continue; + } } else { - // Reapply change after locking the PG - all_applied = false; - continue; + throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(pg_it->second.state)); } } + auto & pg = this->pgs[{ .pool_id = pool_id, .pg_num = pg_num }]; + pg = (pg_t){ + .state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING, + .scheme = pool_item.second.scheme, + .pg_cursize = 0, + .pg_size = pool_item.second.pg_size, + .pg_minsize = pool_item.second.pg_minsize, + .pool_id = pool_id, + .pg_num = pg_num, + .reported_epoch = pg_cfg.epoch, + .target_history = pg_cfg.target_history, + .all_peers = std::vector(all_peers.begin(), all_peers.end()), + .target_set = pg_cfg.target_set, + }; + this->pg_state_dirty.insert({ .pool_id = pool_id, .pg_num = pg_num }); + pg.print_state(); + if (pg_cfg.cur_primary == this->osd_num) + { + // Add peers + for (auto pg_osd: all_peers) + { + if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) + { + c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); + } + } + start_pg_peering(pg); + } else { - throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(this->pgs[pg_num].state)); + // Reapply change after locking the PG + all_applied = false; } } - this->pgs[pg_num] = (pg_t){ - .state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING, - .pg_cursize = 0, - .pg_num = pg_num, - .reported_epoch = pg_cfg.epoch, - .target_history = pg_cfg.target_history, - .all_peers = std::vector(all_peers.begin(), all_peers.end()), - .target_set = pg_cfg.target_set, - }; - this->pg_state_dirty.insert(pg_num); - this->pgs[pg_num].print_state(); - if (pg_cfg.cur_primary == this->osd_num) - { - // Add peers - for (auto pg_osd: all_peers) - { - if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) - { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); - } - } - start_pg_peering(pg_num); - } - else - { - // Reapply change after locking the PG - all_applied = false; - } } } report_pg_states(); @@ -601,7 +617,7 @@ void osd_t::report_pg_states() { return; } - std::vector> reporting_pgs; + std::vector> reporting_pgs; json11::Json::array checks; json11::Json::array success; json11::Json::array failure; @@ -613,8 +629,8 @@ void osd_t::report_pg_states() continue; } auto & pg = pg_it->second; - reporting_pgs.push_back({ pg.pg_num, pg.history_changed }); - std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)); + reporting_pgs.push_back({ *it, pg.history_changed }); + std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num)); if (pg.state == PG_STARTING) { // Check that the PG key does not exist @@ -656,7 +672,7 @@ void osd_t::report_pg_states() } success.push_back(json11::Json::object { { "request_put", json11::Json::object { - { "key", base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, + { "key", state_key_base64 }, { "value", base64_encode(json11::Json(json11::Json::object { { "primary", this->osd_num }, { "state", pg_state_keywords }, @@ -669,7 +685,7 @@ void osd_t::report_pg_states() { // Prevent race conditions (for the case when the monitor is updating this key at the same time) pg.history_changed = false; - std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)); + std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num)); json11::Json::object history_value = { { "epoch", pg.epoch }, { "all_peers", pg.all_peers }, @@ -724,14 +740,20 @@ void osd_t::report_pg_states() auto kv = st_cli.parse_etcd_kv(res["kvs"][0]); if (kv.key.substr(st_cli.etcd_prefix.length()+10) == st_cli.etcd_prefix+"/pg/state/") { - pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10)); - auto pg_it = pgs.find(pg_num); - if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + pool_id_t pool_id = 0; + pg_num_t pg_num = 0; + char null_byte = 0; + sscanf(kv.key.c_str() + st_cli.etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte); + if (null_byte == 0) { - // Live PG state update failed - printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); - force_stop(1); - return; + auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num }); + if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + { + // Live PG state update failed + printf("Failed to report state of pool %u PG %u which is live. Race condition detected, exiting\n", pool_id, pg_num); + force_stop(1); + return; + } } } } diff --git a/osd_flush.cpp b/osd_flush.cpp index 78838218..eab1cb52 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -2,9 +2,8 @@ #define FLUSH_BATCH 512 -void osd_t::submit_pg_flush_ops(pg_num_t pg_num) +void osd_t::submit_pg_flush_ops(pg_t & pg) { - pg_t & pg = pgs[pg_num]; pg_flush_batch_t *fb = new pg_flush_batch_t(); pg.flush_batch = fb; auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin(); @@ -45,7 +44,7 @@ void osd_t::submit_pg_flush_ops(pg_num_t pg_num) if (l.second.size() > 0) { fb->flush_ops++; - submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()); + submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()); } } for (auto & l: fb->stable_lists) @@ -53,14 +52,15 @@ void osd_t::submit_pg_flush_ops(pg_num_t pg_num) if (l.second.size() > 0) { fb->flush_ops++; - submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()); + submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()); } } } -void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval) +void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval) { - if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb) + pool_pg_num_t pg_id = { .pool_id = pool_id, .pg_num = pg_num }; + if (pgs.find(pg_id) == pgs.end() || pgs[pg_id].flush_batch != fb) { // Throw the result away return; @@ -92,7 +92,7 @@ void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb { // This flush batch is done std::vector continue_ops; - auto & pg = pgs[pg_num]; + auto & pg = pgs[pg_id]; auto it = pg.flush_actions.begin(), prev_it = it; auto erase_start = it; while (1) @@ -153,7 +153,7 @@ void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb } } -void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data) +void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data) { osd_op_t *op = new osd_op_t(); // Copy buffer so it gets freed along with the operation @@ -165,10 +165,10 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback clock_gettime(CLOCK_REALTIME, &op->tv_begin); op->bs_op = new blockstore_op_t({ .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE), - .callback = [this, op, pg_num, fb](blockstore_op_t *bs_op) + .callback = [this, op, pool_id, pg_num, fb](blockstore_op_t *bs_op) { add_bs_subop_stats(op); - handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pg_num, fb, this->osd_num, bs_op->retval); + handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pool_id, pg_num, fb, this->osd_num, bs_op->retval); delete op->bs_op; op->bs_op = NULL; delete op; @@ -195,9 +195,9 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback .len = count * sizeof(obj_ver_id), }, }; - op->callback = [this, pg_num, fb, peer_osd](osd_op_t *op) + op->callback = [this, pool_id, pg_num, fb, peer_osd](osd_op_t *op) { - handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pg_num, fb, peer_osd, op->reply.hdr.retval); + handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval); delete op; }; c_cli.outbox_push(op); @@ -215,7 +215,6 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op) if (recovery_ops.find(obj_it->first) == recovery_ops.end()) { op.degraded = true; - op.pg_num = pg_it->first; op.oid = obj_it->first; return true; } @@ -231,7 +230,6 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op) if (recovery_ops.find(obj_it->first) == recovery_ops.end()) { op.degraded = false; - op.pg_num = pg_it->first; op.oid = obj_it->first; return true; } diff --git a/osd_id.h b/osd_id.h index 69720521..fd0c6254 100644 --- a/osd_id.h +++ b/osd_id.h @@ -1,4 +1,24 @@ #pragma once +#define POOL_SCHEME_REPLICATED 1 +#define POOL_SCHEME_XOR 2 +#define POOL_ID_MAX 0x10000 +#define POOL_ID_BITS 16 +#define INODE_POOL(inode) (pool_id_t)((inode) >> (64 - POOL_ID_BITS)) + +// Pool ID is 16 bits long +typedef uint32_t pool_id_t; + typedef uint64_t osd_num_t; typedef uint32_t pg_num_t; + +struct pool_pg_num_t +{ + pool_id_t pool_id; + pg_num_t pg_num; +}; + +inline bool operator < (const pool_pg_num_t & a, const pool_pg_num_t & b) +{ + return a.pool_id < b.pool_id || a.pool_id == b.pool_id && a.pg_num < b.pg_num; +} diff --git a/osd_peering.cpp b/osd_peering.cpp index abf5e3fb..5bb81161 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -50,7 +50,7 @@ void osd_t::handle_peers() { if (!p.second.flush_batch) { - submit_pg_flush_ops(p.first); + submit_pg_flush_ops(p.second); } still = true; } @@ -89,25 +89,18 @@ void osd_t::repeer_pgs(osd_num_t peer_osd) { // Repeer this pg printf("[PG %u] Repeer because of OSD %lu\n", p.second.pg_num, peer_osd); - start_pg_peering(p.second.pg_num); + start_pg_peering(p.second); } } } } // Repeer on each connect/disconnect peer event -void osd_t::start_pg_peering(pg_num_t pg_num) +void osd_t::start_pg_peering(pg_t & pg) { - auto & pg = pgs[pg_num]; pg.state = PG_PEERING; this->peering_state |= OSD_PEERING_PGS; report_pg_state(pg); - if (parsed_cfg.target_set.size() != 3) - { - printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str()); - parsed_cfg.target_set.resize(3); - parsed_cfg.pause = true; - } // Reset PG state pg.cur_peers.clear(); pg.state_dict.clear(); @@ -132,20 +125,19 @@ void osd_t::start_pg_peering(pg_num_t pg_num) for (auto it = unstable_writes.begin(); it != unstable_writes.end(); ) { // Forget this PG's unstable writes - pg_num_t n = (it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count + 1; - if (n == pg.pg_num) + if (INODE_POOL(it->first.oid.inode) == pg.pool_id && map_to_pg(it->first.oid) == pg.pg_num) unstable_writes.erase(it++); else it++; } - dirty_pgs.erase(pg.pg_num); + dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); // Drop connections of clients who have this PG in dirty_pgs if (immediate_commit != IMMEDIATE_ALL) { std::vector to_stop; for (auto & cp: c_cli.clients) { - if (cp.second.dirty_pgs.find(pg_num) != cp.second.dirty_pgs.end()) + if (cp.second.dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second.dirty_pgs.end()) { to_stop.push_back(cp.first); } @@ -351,7 +343,9 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) op->bs_op = new blockstore_op_t(); op->bs_op->opcode = BS_OP_LIST; op->bs_op->oid.stripe = pg_stripe_size; - op->bs_op->len = pg_count; + op->bs_op->oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS)); + op->bs_op->version = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1; + op->bs_op->len = pg_counts[ps->pool_id]; op->bs_op->offset = ps->pg_num-1; op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op) { @@ -391,8 +385,10 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) .opcode = OSD_OP_SEC_LIST, }, .list_pg = ps->pg_num, - .pg_count = pg_count, + .pg_count = pg_counts[ps->pool_id], .pg_stripe_size = pg_stripe_size, + .min_inode = ((uint64_t)(ps->pool_id) << (64 - POOL_ID_BITS)), + .max_inode = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1, }, }; op->callback = [this, ps, role_osd](osd_op_t *op) @@ -448,14 +444,8 @@ void osd_t::discard_list_subop(osd_op_t *list_op) } } -bool osd_t::stop_pg(pg_num_t pg_num) +bool osd_t::stop_pg(pg_t & pg) { - auto pg_it = pgs.find(pg_num); - if (pg_it == pgs.end()) - { - return false; - } - auto & pg = pg_it->second; if (pg.peering_state) { // Stop peering @@ -498,7 +488,7 @@ void osd_t::finish_stop_pg(pg_t & pg) void osd_t::report_pg_state(pg_t & pg) { pg.print_state(); - this->pg_state_dirty.insert(pg.pg_num); + this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size())) { // Clear history of active+clean PGs diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 6fadb658..701f8e88 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -44,6 +44,7 @@ struct pg_peering_state_t // osd_num -> list result std::unordered_map list_ops; std::unordered_map list_results; + pool_id_t pool_id = 0; pg_num_t pg_num = 0; }; @@ -71,8 +72,10 @@ struct pg_flush_batch_t struct pg_t { int state = 0; - uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; - pg_num_t pg_num; + uint64_t scheme = 0; + uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0; + pool_id_t pool_id = 0; + pg_num_t pg_num = 0; uint64_t clean_count = 0, total_count = 0; // epoch number - should increase with each non-clean activation of the PG uint64_t epoch = 0, reported_epoch = 0; diff --git a/osd_primary.cpp b/osd_primary.cpp index 772aa67b..c1261815 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -20,8 +20,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) // oid.stripe = starting offset of the parity stripe .stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size, }; - pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_count + 1; - auto pg_it = pgs.find(pg_num); + pool_id_t pool_id = INODE_POOL(oid.inode); + pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_counts[pool_id] + 1; + auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num }); if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE)) { // This OSD is not primary for this PG or the PG is inactive @@ -86,7 +87,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) if (op_data->st == 1) goto resume_1; else if (op_data->st == 2) goto resume_2; { - auto & pg = pgs[op_data->pg_num]; + auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }]; for (int role = 0; role < pg.pg_minsize; role++) { op_data->stripes[role].read_start = op_data->stripes[role].req_start; @@ -190,7 +191,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) return; } osd_primary_op_data_t *op_data = cur_op->op_data; - auto & pg = pgs[op_data->pg_num]; + auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }]; if (op_data->st == 1) goto resume_1; else if (op_data->st == 2) goto resume_2; else if (op_data->st == 3) goto resume_3; @@ -246,7 +247,7 @@ resume_3: { // Report newer epoch before writing // FIXME: We may report only one PG state here... - this->pg_state_dirty.insert(pg.pg_num); + this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); pg.history_changed = true; report_pg_states(); resume_10: @@ -399,8 +400,8 @@ resume_7: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - c_cli.clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); - dirty_pgs.insert(op_data->pg_num); + c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); + dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); } return true; } @@ -445,7 +446,7 @@ resume_2: { op_data->unstable_write_osds = new std::vector(); op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()]; - op_data->dirty_pgs = new pg_num_t[dirty_pgs.size()]; + op_data->dirty_pgs = new pool_pg_num_t[dirty_pgs.size()]; op_data->dirty_pg_count = dirty_pgs.size(); osd_num_t last_osd = 0; int last_start = 0, last_end = 0; @@ -515,7 +516,7 @@ resume_6: { // Except those from peered PGs auto & w = op_data->unstable_writes[i]; - pg_num_t wpg = map_to_pg(w.oid); + pool_pg_num_t wpg = { .pool_id = INODE_POOL(w.oid.inode), .pg_num = map_to_pg(w.oid) }; if (pgs[wpg].state & PG_ACTIVE) { uint64_t & dest = this->unstable_writes[(osd_object_id_t){ @@ -621,7 +622,7 @@ void osd_t::continue_primary_del(osd_op_t *cur_op) return; } osd_primary_op_data_t *op_data = cur_op->op_data; - auto & pg = pgs[op_data->pg_num]; + auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }]; if (op_data->st == 1) goto resume_1; else if (op_data->st == 2) goto resume_2; else if (op_data->st == 3) goto resume_3; diff --git a/osd_primary.h b/osd_primary.h index 63e15259..a3c64b12 100644 --- a/osd_primary.h +++ b/osd_primary.h @@ -29,7 +29,7 @@ struct osd_primary_op_data_t // for sync. oops, requires freeing std::vector *unstable_write_osds = NULL; - pg_num_t *dirty_pgs = NULL; + pool_pg_num_t *dirty_pgs = NULL; int dirty_pg_count = 0; obj_ver_id *unstable_writes = NULL; }; diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index 91148422..d02b4cee 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -37,7 +37,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) { if (cur_op->op_data->pg_num > 0) { - auto & pg = pgs[cur_op->op_data->pg_num]; + auto & pg = pgs[{ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num }]; pg.inflight--; assert(pg.inflight >= 0); if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)