diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index f42c320f..eabfa319 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -333,7 +333,7 @@ void etcd_state_client_t::start_etcd_watcher() etcd_watch_ws = NULL; } if (this->log_level > 1) - fprintf(stderr, "Trying to connect to etcd websocket at %s\n", etcd_address.c_str()); + fprintf(stderr, "Trying to connect to etcd websocket at %s, watch from revision %lu\n", etcd_address.c_str(), etcd_watch_revision); etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout, [this, cur_addr = selected_etcd_address](const http_response_t *msg) { @@ -356,8 +356,8 @@ void etcd_state_client_t::start_etcd_watcher() watch_id == ETCD_PG_HISTORY_WATCH_ID || watch_id == ETCD_OSD_STATE_WATCH_ID) etcd_watches_initialised++; - if (etcd_watches_initialised == 4 && this->log_level > 0) - fprintf(stderr, "Successfully subscribed to etcd at %s\n", cur_addr.c_str()); + if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && this->log_level > 0) + fprintf(stderr, "Successfully subscribed to etcd at %s, revision %lu\n", cur_addr.c_str(), etcd_watch_revision); } if (data["result"]["canceled"].bool_value()) { @@ -393,9 +393,13 @@ void etcd_state_client_t::start_etcd_watcher() exit(1); } } - if (etcd_watches_initialised == 4) + if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && !data["result"]["header"]["revision"].is_null()) { - etcd_watch_revision = data["result"]["header"]["revision"].uint64_value()+1; + // Protect against a revision beign split into multiple messages and some + // of them being lost. Even though I'm not sure if etcd actually splits them + // Also sometimes etcd sends something without a header, like: + // {"error": {"grpc_code": 14, "http_code": 503, "http_status": "Service Unavailable", "message": "error reading from server: EOF"}} + etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); addresses_to_try.clear(); } // First gather all changes into a hash to remove multiple overwrites @@ -507,7 +511,7 @@ void etcd_state_client_t::start_ws_keepalive() { ws_keepalive_timer = tfd->set_timer(etcd_ws_keepalive_interval*1000, true, [this](int) { - if (!etcd_watch_ws) + if (!etcd_watch_ws || etcd_watches_initialised < ETCD_TOTAL_WATCHES) { // Do nothing } @@ -636,18 +640,28 @@ void etcd_state_client_t::load_pgs() on_load_pgs_hook(false); return; } + reset_pg_exists(); if (!etcd_watch_revision) { etcd_watch_revision = data["header"]["revision"].uint64_value()+1; + if (this->log_level > 3) + { + fprintf(stderr, "Loaded revision %lu of PG configuration\n", etcd_watch_revision-1); + } } for (auto & res: data["responses"].array_items()) { for (auto & kv_json: res["response_range"]["kvs"].array_items()) { auto kv = parse_etcd_kv(kv_json); + if (this->log_level > 3) + { + fprintf(stderr, "Loaded key: %s -> %s\n", kv.key.c_str(), kv.value.dump().c_str()); + } parse_state(kv); } } + clean_nonexistent_pgs(); on_load_pgs_hook(true); start_etcd_watcher(); }); @@ -668,6 +682,73 @@ void etcd_state_client_t::load_pgs() } #endif +void etcd_state_client_t::reset_pg_exists() +{ + for (auto & pool_item: pool_config) + { + for (auto & pg_item: pool_item.second.pg_config) + { + pg_item.second.state_exists = false; + pg_item.second.history_exists = false; + } + } + seen_peers.clear(); +} + +void etcd_state_client_t::clean_nonexistent_pgs() +{ + for (auto & pool_item: pool_config) + { + for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); ) + { + auto & pg_cfg = pg_it->second; + if (!pg_cfg.config_exists && !pg_cfg.state_exists && !pg_cfg.history_exists) + { + if (this->log_level > 3) + { + fprintf(stderr, "PG %u/%u disappeared after reload, forgetting it\n", pool_item.first, pg_it->first); + } + pool_item.second.pg_config.erase(pg_it++); + } + else + { + if (!pg_cfg.state_exists) + { + if (this->log_level > 3) + { + fprintf(stderr, "PG %u/%u primary OSD disappeared after reload, forgetting it\n", pool_item.first, pg_it->first); + } + parse_state((etcd_kv_t){ + .key = etcd_prefix+"/pg/state/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first), + }); + } + if (!pg_cfg.history_exists) + { + if (this->log_level > 3) + { + fprintf(stderr, "PG %u/%u history disappeared after reload, forgetting it\n", pool_item.first, pg_it->first); + } + parse_state((etcd_kv_t){ + .key = etcd_prefix+"/pg/history/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first), + }); + } + pg_it++; + } + } + } + for (auto & peer_item: peer_states) + { + if (seen_peers.find(peer_item.first) == seen_peers.end()) + { + fprintf(stderr, "OSD %lu state disappeared after reload, forgetting it\n", peer_item.first); + parse_state((etcd_kv_t){ + .key = etcd_prefix+"/osd/state/"+std::to_string(peer_item.first), + }); + } + } + seen_peers.clear(); +} + void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { const std::string & key = kv.key; @@ -822,7 +903,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { for (auto & pg_item: pool_item.second.pg_config) { - pg_item.second.exists = false; + pg_item.second.config_exists = false; } } for (auto & pool_item: value["items"].object_items()) @@ -845,7 +926,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) continue; } auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num]; - parsed_cfg.exists = true; + parsed_cfg.config_exists = true; parsed_cfg.pause = pg_item.second["pause"].bool_value(); parsed_cfg.primary = pg_item.second["primary"].uint64_value(); parsed_cfg.target_set.clear(); @@ -866,7 +947,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) int n = 0; for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) { - if (pg_it->second.exists && pg_it->first != ++n) + if (pg_it->second.config_exists && pg_it->first != ++n) { fprintf( stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n", @@ -874,7 +955,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) ); for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) { - pg_it->second.exists = false; + pg_it->second.config_exists = false; } n = 0; break; @@ -899,6 +980,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num]; pg_cfg.target_history.clear(); pg_cfg.all_peers.clear(); + pg_cfg.history_exists = !value.is_null(); // Refuse to start PG if any set of the has no live OSDs for (auto & hist_item: value["osd_sets"].array_items()) { @@ -951,11 +1033,15 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) } else if (value.is_null()) { - this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0; - this->pool_config[pool_id].pg_config[pg_num].cur_state = 0; + auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num]; + pg_cfg.state_exists = false; + pg_cfg.cur_primary = 0; + pg_cfg.cur_state = 0; } else { + auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num]; + pg_cfg.state_exists = true; osd_num_t cur_primary = value["primary"].uint64_value(); int state = 0; for (auto & e: value["state"].array_items()) @@ -983,8 +1069,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str()); return; } - this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary; - this->pool_config[pool_id].pg_config[pg_num].cur_state = state; + pg_cfg.cur_primary = cur_primary; + pg_cfg.cur_state = state; } } else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/") @@ -998,6 +1084,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv) value["port"].int64_value() > 0 && value["port"].int64_value() < 65536) { this->peer_states[peer_osd] = value; + this->seen_peers.insert(peer_osd); } else { diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index 667941ef..bf7b18e5 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -3,6 +3,8 @@ #pragma once +#include + #include "json11/json11.hpp" #include "osd_id.h" #include "timerfd_manager.h" @@ -11,6 +13,7 @@ #define ETCD_PG_STATE_WATCH_ID 2 #define ETCD_PG_HISTORY_WATCH_ID 3 #define ETCD_OSD_STATE_WATCH_ID 4 +#define ETCD_TOTAL_WATCHES 4 #define DEFAULT_BLOCK_SIZE 128*1024 #define MIN_DATA_BLOCK_SIZE 4*1024 @@ -30,7 +33,7 @@ struct etcd_kv_t struct pg_config_t { - bool exists; + bool config_exists, history_exists, state_exists; osd_num_t primary; std::vector target_set; std::vector> target_history; @@ -113,6 +116,7 @@ public: uint64_t etcd_watch_revision = 0; std::map pool_config; std::map peer_states; + std::set seen_peers; std::map inode_config; std::map inode_by_name; @@ -138,6 +142,8 @@ public: void start_ws_keepalive(); void load_global_config(); void load_pgs(); + void reset_pg_exists(); + void clean_nonexistent_pgs(); void parse_state(const etcd_kv_t & kv); void parse_config(const json11::Json & config); void insert_inode_config(const inode_config_t & cfg); diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 110646b8..a98c6c84 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -649,7 +649,7 @@ void osd_t::apply_pg_config() { pg_num_t pg_num = kv.first; auto & pg_cfg = kv.second; - bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num && + bool take = pg_cfg.config_exists && pg_cfg.primary == this->osd_num && !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num); auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num }); bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE;