From f72f14e6a7dc0ee62c3781501dbf68aa31b78556 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Sun, 24 Dec 2023 00:05:43 +0300
Subject: [PATCH] Clear old PG states, history, and OSD states on etcd state
 reload

Also add protection against etcd watcher messages being split into
multiple websocket messages - I'm not sure if etcd actually does that,
but it's better to have extra protection anyway.

Also check in the keepalive routine that all etcd watchers are started,
because otherwise it sometimes tried to revive etcd watchers starting
from revision=1, which always fails since that revision is almost
always compacted.

All these changes should fix an old, rarely reproduced bug where OSDs
sometimes didn't react to PG configuration changes, leading to offline
pools on node reboot. It happened during full reloads of state from
etcd.
---
 src/etcd_state_client.cpp | 115 +++++++++++++++++++++++++++++++++-----
 src/etcd_state_client.h   |   8 ++-
 src/osd_cluster.cpp       |   2 +-
 3 files changed, 109 insertions(+), 16 deletions(-)

diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp
index f42c320f..eabfa319 100644
--- a/src/etcd_state_client.cpp
+++ b/src/etcd_state_client.cpp
@@ -333,7 +333,7 @@ void etcd_state_client_t::start_etcd_watcher()
         etcd_watch_ws = NULL;
     }
     if (this->log_level > 1)
-        fprintf(stderr, "Trying to connect to etcd websocket at %s\n", etcd_address.c_str());
+        fprintf(stderr, "Trying to connect to etcd websocket at %s, watch from revision %lu\n", etcd_address.c_str(), etcd_watch_revision);
     etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout,
         [this, cur_addr = selected_etcd_address](const http_response_t *msg)
     {
@@ -356,8 +356,8 @@ void etcd_state_client_t::start_etcd_watcher()
                     watch_id == ETCD_PG_HISTORY_WATCH_ID ||
                     watch_id == ETCD_OSD_STATE_WATCH_ID)
                     etcd_watches_initialised++;
-                if (etcd_watches_initialised == 4 && this->log_level > 0)
-                    fprintf(stderr, "Successfully subscribed to etcd at %s\n", cur_addr.c_str());
+                if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && this->log_level > 0)
+                    fprintf(stderr, "Successfully subscribed to etcd at %s, revision %lu\n", cur_addr.c_str(), etcd_watch_revision);
             }
             if (data["result"]["canceled"].bool_value())
             {
@@ -393,9 +393,13 @@ void etcd_state_client_t::start_etcd_watcher()
                     exit(1);
                 }
             }
-            if (etcd_watches_initialised == 4)
+            if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && !data["result"]["header"]["revision"].is_null())
             {
-                etcd_watch_revision = data["result"]["header"]["revision"].uint64_value()+1;
+                // Protect against a revision being split into multiple messages and some
+                // of them being lost. Even though I'm not sure if etcd actually splits them.
+                // Also sometimes etcd sends something without a header, like:
+                // {"error": {"grpc_code": 14, "http_code": 503, "http_status": "Service Unavailable", "message": "error reading from server: EOF"}}
+                etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
                 addresses_to_try.clear();
             }
             // First gather all changes into a hash to remove multiple overwrites
@@ -507,7 +511,7 @@ void etcd_state_client_t::start_ws_keepalive()
 {
     ws_keepalive_timer = tfd->set_timer(etcd_ws_keepalive_interval*1000, true, [this](int)
     {
-        if (!etcd_watch_ws)
+        if (!etcd_watch_ws || etcd_watches_initialised < ETCD_TOTAL_WATCHES)
         {
             // Do nothing
         }
@@ -636,18 +640,28 @@ void etcd_state_client_t::load_pgs()
             on_load_pgs_hook(false);
             return;
         }
+        reset_pg_exists();
         if (!etcd_watch_revision)
        {
             etcd_watch_revision = data["header"]["revision"].uint64_value()+1;
+            if (this->log_level > 3)
+            {
+                fprintf(stderr, "Loaded revision %lu of PG configuration\n", etcd_watch_revision-1);
+            }
         }
         for (auto & res: data["responses"].array_items())
         {
             for (auto & kv_json: res["response_range"]["kvs"].array_items())
             {
                 auto kv = parse_etcd_kv(kv_json);
+                if (this->log_level > 3)
+                {
+                    fprintf(stderr, "Loaded key: %s -> %s\n", kv.key.c_str(), kv.value.dump().c_str());
+                }
                 parse_state(kv);
             }
         }
+        clean_nonexistent_pgs();
         on_load_pgs_hook(true);
         start_etcd_watcher();
     });
@@ -668,6 +682,73 @@ void etcd_state_client_t::load_pgs()
 }
 #endif
 
+void etcd_state_client_t::reset_pg_exists()
+{
+    for (auto & pool_item: pool_config)
+    {
+        for (auto & pg_item: pool_item.second.pg_config)
+        {
+            pg_item.second.state_exists = false;
+            pg_item.second.history_exists = false;
+        }
+    }
+    seen_peers.clear();
+}
+
+void etcd_state_client_t::clean_nonexistent_pgs()
+{
+    for (auto & pool_item: pool_config)
+    {
+        for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); )
+        {
+            auto & pg_cfg = pg_it->second;
+            if (!pg_cfg.config_exists && !pg_cfg.state_exists && !pg_cfg.history_exists)
+            {
+                if (this->log_level > 3)
+                {
+                    fprintf(stderr, "PG %u/%u disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
+                }
+                pool_item.second.pg_config.erase(pg_it++);
+            }
+            else
+            {
+                if (!pg_cfg.state_exists)
+                {
+                    if (this->log_level > 3)
+                    {
+                        fprintf(stderr, "PG %u/%u primary OSD disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
+                    }
+                    parse_state((etcd_kv_t){
+                        .key = etcd_prefix+"/pg/state/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first),
+                    });
+                }
+                if (!pg_cfg.history_exists)
+                {
+                    if (this->log_level > 3)
+                    {
+                        fprintf(stderr, "PG %u/%u history disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
+                    }
+                    parse_state((etcd_kv_t){
+                        .key = etcd_prefix+"/pg/history/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first),
+                    });
+                }
+                pg_it++;
+            }
+        }
+    }
+    for (auto & peer_item: peer_states)
+    {
+        if (seen_peers.find(peer_item.first) == seen_peers.end())
+        {
+            fprintf(stderr, "OSD %lu state disappeared after reload, forgetting it\n", peer_item.first);
+            parse_state((etcd_kv_t){
+                .key = etcd_prefix+"/osd/state/"+std::to_string(peer_item.first),
+            });
+        }
+    }
+    seen_peers.clear();
+}
+
 void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
 {
     const std::string & key = kv.key;
@@ -822,7 +903,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
         {
             for (auto & pg_item: pool_item.second.pg_config)
             {
-                pg_item.second.exists = false;
+                pg_item.second.config_exists = false;
             }
         }
         for (auto & pool_item: value["items"].object_items())
@@ -845,7 +926,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
                     continue;
                 }
                 auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
-                parsed_cfg.exists = true;
+                parsed_cfg.config_exists = true;
                 parsed_cfg.pause = pg_item.second["pause"].bool_value();
                 parsed_cfg.primary = pg_item.second["primary"].uint64_value();
                 parsed_cfg.target_set.clear();
@@ -866,7 +947,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
                 int n = 0;
                 for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
                 {
-                    if (pg_it->second.exists && pg_it->first != ++n)
+                    if (pg_it->second.config_exists && pg_it->first != ++n)
                     {
                         fprintf(
                             stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
@@ -874,7 +955,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
                         );
                         for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
                         {
-                            pg_it->second.exists = false;
+                            pg_it->second.config_exists = false;
                         }
                         n = 0;
                         break;
@@ -899,6 +980,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
             auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
             pg_cfg.target_history.clear();
             pg_cfg.all_peers.clear();
+            pg_cfg.history_exists = !value.is_null();
             // Refuse to start PG if any set of the PG history has no live OSDs
             for (auto & hist_item: value["osd_sets"].array_items())
             {
@@ -951,11 +1033,15 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
         }
         else if (value.is_null())
         {
-            this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0;
-            this->pool_config[pool_id].pg_config[pg_num].cur_state = 0;
+            auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
+            pg_cfg.state_exists = false;
+            pg_cfg.cur_primary = 0;
+            pg_cfg.cur_state = 0;
         }
         else
         {
+            auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
+            pg_cfg.state_exists = true;
             osd_num_t cur_primary = value["primary"].uint64_value();
             int state = 0;
             for (auto & e: value["state"].array_items())
@@ -983,8 +1069,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
                 fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
                 return;
             }
-            this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
-            this->pool_config[pool_id].pg_config[pg_num].cur_state = state;
+            pg_cfg.cur_primary = cur_primary;
+            pg_cfg.cur_state = state;
         }
     }
     else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
@@ -998,6 +1084,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
             value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
         {
             this->peer_states[peer_osd] = value;
+            this->seen_peers.insert(peer_osd);
         }
         else
         {
diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h
index 667941ef..bf7b18e5 100644
--- a/src/etcd_state_client.h
+++ b/src/etcd_state_client.h
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include <set>
+
 #include "json11/json11.hpp"
 #include "osd_id.h"
 #include "timerfd_manager.h"
@@ -11,6 +13,7 @@
 #define ETCD_PG_STATE_WATCH_ID 2
 #define ETCD_PG_HISTORY_WATCH_ID 3
 #define ETCD_OSD_STATE_WATCH_ID 4
+#define ETCD_TOTAL_WATCHES 4
 
 #define DEFAULT_BLOCK_SIZE 128*1024
 #define MIN_DATA_BLOCK_SIZE 4*1024
@@ -30,7 +33,7 @@ struct etcd_kv_t
 
 struct pg_config_t
 {
-    bool exists;
+    bool config_exists, history_exists, state_exists;
     osd_num_t primary;
     std::vector<osd_num_t> target_set;
     std::vector<std::vector<osd_num_t>> target_history;
@@ -113,6 +116,7 @@ public:
     uint64_t etcd_watch_revision = 0;
     std::map<pool_id_t, pool_config_t> pool_config;
     std::map<osd_num_t, json11::Json> peer_states;
+    std::set<osd_num_t> seen_peers;
 
     std::map<inode_t, inode_config_t> inode_config;
     std::map<std::string, inode_t> inode_by_name;
@@ -138,6 +142,8 @@ public:
     void start_ws_keepalive();
     void load_global_config();
     void load_pgs();
+    void reset_pg_exists();
+    void clean_nonexistent_pgs();
     void parse_state(const etcd_kv_t & kv);
     void parse_config(const json11::Json & config);
     void insert_inode_config(const inode_config_t & cfg);
diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp
index 110646b8..a98c6c84 100644
--- a/src/osd_cluster.cpp
+++ b/src/osd_cluster.cpp
@@ -649,7 +649,7 @@ void osd_t::apply_pg_config()
         {
             pg_num_t pg_num = kv.first;
             auto & pg_cfg = kv.second;
-            bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num &&
+            bool take = pg_cfg.config_exists && pg_cfg.primary == this->osd_num &&
                 !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num);
             auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
             bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE;
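
For reference, here is a minimal standalone sketch of the mark-and-sweep
pattern the patch uses: reset the exists flags before a full reload, mark
entries while parsing the loaded snapshot, then forget whatever was not
marked again. The names below are illustrative simplifications, not the
real Vitastor types:

// exists_sweep.cpp - simplified analogue of reset_pg_exists() /
// parse_state() / clean_nonexistent_pgs() from the patch above
#include <cstdio>
#include <map>

struct pg_config_t
{
    // the real pg_config_t has three flags (config_exists,
    // history_exists, state_exists); one is enough to show the idea
    bool config_exists = false;
};

static std::map<unsigned, pg_config_t> pg_config;

// Phase 1: before reloading, assume nothing exists anymore
static void reset_pg_exists()
{
    for (auto & pg_item: pg_config)
        pg_item.second.config_exists = false;
}

// Phase 2: mark every PG actually present in the reloaded snapshot
static void mark_pg(unsigned pg_num)
{
    pg_config[pg_num].config_exists = true;
}

// Phase 3: erase PGs that were not seen again after the reload
static void clean_nonexistent_pgs()
{
    for (auto pg_it = pg_config.begin(); pg_it != pg_config.end(); )
    {
        if (!pg_it->second.config_exists)
            pg_config.erase(pg_it++);
        else
            pg_it++;
    }
}

int main()
{
    mark_pg(1);
    mark_pg(2);              // old in-memory state: PGs 1 and 2
    reset_pg_exists();
    mark_pg(1);              // reloaded snapshot only contains PG 1
    clean_nonexistent_pgs();
    printf("PGs after reload: %zu\n", pg_config.size()); // prints 1
    return 0;
}

Without the sweep, PG 2 would silently survive the reload in memory - which
is exactly the kind of stale state this patch clears.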