Clear old PG states, history, and OSD states on etcd state reload

Also add protection against etcd watcher messages being split into multiple
websocket messages - I'm not sure if etcd actually does that, but it's better
to have the extra protection anyway.
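
The idea, as a rough sketch (not the committed code; json11 accessors as used in
the diff below, uint64 handling assumed to match Vitastor's json11 fork):

    #include "json11/json11.hpp"

    // Only remember the revision of a watch response that actually carries a
    // header, and remember the revision itself rather than revision+1.
    // If the websocket drops after a partially delivered revision, the next
    // watch request re-reads that whole revision instead of silently
    // skipping the lost tail.
    static void remember_watch_revision(const json11::Json & data, uint64_t & etcd_watch_revision)
    {
        auto header_rev = data["result"]["header"]["revision"];
        if (header_rev.is_null())
        {
            // Some responses carry no header at all, e.g. plain {"error": ...}
            // replies - they must not touch the remembered revision.
            return;
        }
        etcd_watch_revision = header_rev.uint64_value();
    }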

Also make the keepalive routine check that all etcd watchers are started;
otherwise it sometimes tries to revive etcd watchers starting from revision=1,
which almost always fails because that revision is nearly always compacted.
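
For context, a watch created with an already compacted start_revision (such as
the default 1) is immediately canceled by etcd. A hedged sketch of detecting
that case (field names as in the etcd v3 watch API, json11 accessors as in the
diff below):

    #include "json11/json11.hpp"

    // etcd cancels a watch whose start_revision is older than the last
    // compaction, setting "canceled" together with a non-zero
    // "compact_revision" in the watch response.
    static bool watch_canceled_as_compacted(const json11::Json & data)
    {
        return data["result"]["canceled"].bool_value() &&
            data["result"]["compact_revision"].uint64_value() > 0;
    }

So the keepalive timer now only tries to revive watchers after all
ETCD_TOTAL_WATCHES of them were initialised with a real revision.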

All these changes should fix an old, rarely reproduced bug where OSDs sometimes
didn't react to PG configuration changes, leading to offline pools on node reboot.
The issue manifested during a full reload of the state from etcd.
kv-update
Vitaliy Filippov 2023-12-24 00:05:43 +03:00
parent 1299373988
commit f72f14e6a7
3 changed files with 109 additions and 16 deletions

View File

@ -333,7 +333,7 @@ void etcd_state_client_t::start_etcd_watcher()
etcd_watch_ws = NULL;
}
if (this->log_level > 1)
fprintf(stderr, "Trying to connect to etcd websocket at %s\n", etcd_address.c_str());
fprintf(stderr, "Trying to connect to etcd websocket at %s, watch from revision %lu\n", etcd_address.c_str(), etcd_watch_revision);
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout,
[this, cur_addr = selected_etcd_address](const http_response_t *msg)
{
@ -356,8 +356,8 @@ void etcd_state_client_t::start_etcd_watcher()
watch_id == ETCD_PG_HISTORY_WATCH_ID ||
watch_id == ETCD_OSD_STATE_WATCH_ID)
etcd_watches_initialised++;
if (etcd_watches_initialised == 4 && this->log_level > 0)
fprintf(stderr, "Successfully subscribed to etcd at %s\n", cur_addr.c_str());
if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && this->log_level > 0)
fprintf(stderr, "Successfully subscribed to etcd at %s, revision %lu\n", cur_addr.c_str(), etcd_watch_revision);
}
if (data["result"]["canceled"].bool_value())
{
@ -393,9 +393,13 @@ void etcd_state_client_t::start_etcd_watcher()
exit(1);
}
}
if (etcd_watches_initialised == 4)
if (etcd_watches_initialised == ETCD_TOTAL_WATCHES && !data["result"]["header"]["revision"].is_null())
{
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value()+1;
// Protect against a revision being split into multiple messages and some
// of them being lost. Even though I'm not sure if etcd actually splits them.
// Also sometimes etcd sends something without a header, like:
// {"error": {"grpc_code": 14, "http_code": 503, "http_status": "Service Unavailable", "message": "error reading from server: EOF"}}
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
addresses_to_try.clear();
}
// First gather all changes into a hash to remove multiple overwrites
@ -507,7 +511,7 @@ void etcd_state_client_t::start_ws_keepalive()
{
ws_keepalive_timer = tfd->set_timer(etcd_ws_keepalive_interval*1000, true, [this](int)
{
if (!etcd_watch_ws)
if (!etcd_watch_ws || etcd_watches_initialised < ETCD_TOTAL_WATCHES)
{
// Do nothing
}
@ -636,18 +640,28 @@ void etcd_state_client_t::load_pgs()
on_load_pgs_hook(false);
return;
}
reset_pg_exists();
if (!etcd_watch_revision)
{
etcd_watch_revision = data["header"]["revision"].uint64_value()+1;
if (this->log_level > 3)
{
fprintf(stderr, "Loaded revision %lu of PG configuration\n", etcd_watch_revision-1);
}
}
for (auto & res: data["responses"].array_items())
{
for (auto & kv_json: res["response_range"]["kvs"].array_items())
{
auto kv = parse_etcd_kv(kv_json);
if (this->log_level > 3)
{
fprintf(stderr, "Loaded key: %s -> %s\n", kv.key.c_str(), kv.value.dump().c_str());
}
parse_state(kv);
}
}
clean_nonexistent_pgs();
on_load_pgs_hook(true);
start_etcd_watcher();
});
@ -668,6 +682,73 @@ void etcd_state_client_t::load_pgs()
}
#endif
void etcd_state_client_t::reset_pg_exists()
{
for (auto & pool_item: pool_config)
{
for (auto & pg_item: pool_item.second.pg_config)
{
pg_item.second.state_exists = false;
pg_item.second.history_exists = false;
}
}
seen_peers.clear();
}
void etcd_state_client_t::clean_nonexistent_pgs()
{
for (auto & pool_item: pool_config)
{
for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); )
{
auto & pg_cfg = pg_it->second;
if (!pg_cfg.config_exists && !pg_cfg.state_exists && !pg_cfg.history_exists)
{
if (this->log_level > 3)
{
fprintf(stderr, "PG %u/%u disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
}
pool_item.second.pg_config.erase(pg_it++);
}
else
{
if (!pg_cfg.state_exists)
{
if (this->log_level > 3)
{
fprintf(stderr, "PG %u/%u primary OSD disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
}
parse_state((etcd_kv_t){
.key = etcd_prefix+"/pg/state/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first),
});
}
if (!pg_cfg.history_exists)
{
if (this->log_level > 3)
{
fprintf(stderr, "PG %u/%u history disappeared after reload, forgetting it\n", pool_item.first, pg_it->first);
}
parse_state((etcd_kv_t){
.key = etcd_prefix+"/pg/history/"+std::to_string(pool_item.first)+"/"+std::to_string(pg_it->first),
});
}
pg_it++;
}
}
}
for (auto & peer_item: peer_states)
{
if (seen_peers.find(peer_item.first) == seen_peers.end())
{
fprintf(stderr, "OSD %lu state disappeared after reload, forgetting it\n", peer_item.first);
parse_state((etcd_kv_t){
.key = etcd_prefix+"/osd/state/"+std::to_string(peer_item.first),
});
}
}
seen_peers.clear();
}
void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
{
const std::string & key = kv.key;
@ -822,7 +903,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
{
for (auto & pg_item: pool_item.second.pg_config)
{
pg_item.second.exists = false;
pg_item.second.config_exists = false;
}
}
for (auto & pool_item: value["items"].object_items())
@ -845,7 +926,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
continue;
}
auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
parsed_cfg.exists = true;
parsed_cfg.config_exists = true;
parsed_cfg.pause = pg_item.second["pause"].bool_value();
parsed_cfg.primary = pg_item.second["primary"].uint64_value();
parsed_cfg.target_set.clear();
@ -866,7 +947,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
int n = 0;
for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
{
if (pg_it->second.exists && pg_it->first != ++n)
if (pg_it->second.config_exists && pg_it->first != ++n)
{
fprintf(
stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
@ -874,7 +955,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
);
for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
{
pg_it->second.exists = false;
pg_it->second.config_exists = false;
}
n = 0;
break;
@ -899,6 +980,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
pg_cfg.target_history.clear();
pg_cfg.all_peers.clear();
pg_cfg.history_exists = !value.is_null();
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
for (auto & hist_item: value["osd_sets"].array_items())
{
@ -951,11 +1033,15 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
else if (value.is_null())
{
this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0;
this->pool_config[pool_id].pg_config[pg_num].cur_state = 0;
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
pg_cfg.state_exists = false;
pg_cfg.cur_primary = 0;
pg_cfg.cur_state = 0;
}
else
{
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
pg_cfg.state_exists = true;
osd_num_t cur_primary = value["primary"].uint64_value();
int state = 0;
for (auto & e: value["state"].array_items())
@ -983,8 +1069,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
return;
}
this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
this->pool_config[pool_id].pg_config[pg_num].cur_state = state;
pg_cfg.cur_primary = cur_primary;
pg_cfg.cur_state = state;
}
}
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
@ -998,6 +1084,7 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
{
this->peer_states[peer_osd] = value;
this->seen_peers.insert(peer_osd);
}
else
{

View File

@ -3,6 +3,8 @@
#pragma once
#include <set>
#include "json11/json11.hpp"
#include "osd_id.h"
#include "timerfd_manager.h"
@ -11,6 +13,7 @@
#define ETCD_PG_STATE_WATCH_ID 2
#define ETCD_PG_HISTORY_WATCH_ID 3
#define ETCD_OSD_STATE_WATCH_ID 4
#define ETCD_TOTAL_WATCHES 4
#define DEFAULT_BLOCK_SIZE 128*1024
#define MIN_DATA_BLOCK_SIZE 4*1024
@ -30,7 +33,7 @@ struct etcd_kv_t
struct pg_config_t
{
bool exists;
bool config_exists, history_exists, state_exists;
osd_num_t primary;
std::vector<osd_num_t> target_set;
std::vector<std::vector<osd_num_t>> target_history;
@ -113,6 +116,7 @@ public:
uint64_t etcd_watch_revision = 0;
std::map<pool_id_t, pool_config_t> pool_config;
std::map<osd_num_t, json11::Json> peer_states;
std::set<osd_num_t> seen_peers;
std::map<inode_t, inode_config_t> inode_config;
std::map<std::string, inode_t> inode_by_name;
@ -138,6 +142,8 @@ public:
void start_ws_keepalive();
void load_global_config();
void load_pgs();
void reset_pg_exists();
void clean_nonexistent_pgs();
void parse_state(const etcd_kv_t & kv);
void parse_config(const json11::Json & config);
void insert_inode_config(const inode_config_t & cfg);

View File

@ -649,7 +649,7 @@ void osd_t::apply_pg_config()
{
pg_num_t pg_num = kv.first;
auto & pg_cfg = kv.second;
bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num &&
bool take = pg_cfg.config_exists && pg_cfg.primary == this->osd_num &&
!pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num);
auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE;