forked from vitalif/vitastor
Parse pool configuration in etcd_state_client
parent
0918ea08fa
commit
293cb5bd1d
|
@ -233,6 +233,11 @@ void etcd_state_client_t::load_global_config()
|
|||
void etcd_state_client_t::load_pgs()
|
||||
{
|
||||
json11::Json::array txn = {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/pools") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/pgs") },
|
||||
|
@ -293,47 +298,120 @@ void etcd_state_client_t::load_pgs()
|
|||
|
||||
void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value)
|
||||
{
|
||||
if (key == etcd_prefix+"/config/pgs")
|
||||
if (key == etcd_prefix+"/config/pools")
|
||||
{
|
||||
for (auto & pg_item: this->pg_config)
|
||||
for (auto & pool_item: this->pool_config)
|
||||
{
|
||||
pool_item.second.exists = false;
|
||||
}
|
||||
for (auto & pool_item: value.object_items())
|
||||
{
|
||||
pool_id_t pool_id = stoull_full(pool_item.first);
|
||||
if (!pool_id || pool_id >= POOL_ID_MAX)
|
||||
{
|
||||
printf("Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["pg_size"].uint64_value() < 1 ||
|
||||
pool_item.second["scheme"] == "xor" && pool_item.second["pg_size"].uint64_value() < 3)
|
||||
{
|
||||
printf("Pool %lu has invalid pg_size, skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["pg_minsize"].uint64_value() < 1 ||
|
||||
pool_item.second["pg_minsize"].uint64_value() > pool_item.second["pg_size"].uint64_value() ||
|
||||
pool_item.second["pg_minsize"].uint64_value() < (pool_item.second["pg_size"].uint64_value() - 1))
|
||||
{
|
||||
printf("Pool %lu has invalid pg_minsize, skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["pg_count"].uint64_value() < 1)
|
||||
{
|
||||
printf("Pool %lu has invalid pg_count, skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["name"].string_value() == "")
|
||||
{
|
||||
printf("Pool %lu has empty name, skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["scheme"] != "replicated" && pool_item.second["scheme"] != "xor")
|
||||
{
|
||||
printf("Pool %lu has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
if (pool_item.second["max_osd_combinations"].uint64_value() > 0 &&
|
||||
pool_item.second["max_osd_combinations"].uint64_value() < 100)
|
||||
{
|
||||
printf("Pool %lu has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
|
||||
continue;
|
||||
}
|
||||
this->pool_config[pool_id].exists = true;
|
||||
this->pool_config[pool_id].id = pool_id;
|
||||
this->pool_config[pool_id].name = pool_item.second["name"].string_value();
|
||||
this->pool_config[pool_id].scheme = pool_item.second["scheme"] == "replicated" ? POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR;
|
||||
this->pool_config[pool_id].pg_size = pool_item.second["pg_size"].uint64_value();
|
||||
this->pool_config[pool_id].pg_minsize = pool_item.second["pg_minsize"].uint64_value();
|
||||
this->pool_config[pool_id].pg_count = pool_item.second["pg_count"].uint64_value();
|
||||
this->pool_config[pool_id].failure_domain = pool_item.second["failure_domain"].string_value();
|
||||
this->pool_config[pool_id].max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value();
|
||||
if (!this->pool_config[pool_id].max_osd_combinations)
|
||||
{
|
||||
this->pool_config[pool_id].max_osd_combinations = 10000;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key == etcd_prefix+"/config/pgs")
|
||||
{
|
||||
for (auto & pool_item: this->pool_config)
|
||||
{
|
||||
for (auto & pg_item: pool_item.second.pg_config)
|
||||
{
|
||||
pg_item.second.exists = false;
|
||||
}
|
||||
for (auto & pg_item: value["items"].object_items())
|
||||
}
|
||||
for (auto & pool_item: value["items"].object_items())
|
||||
{
|
||||
pool_id_t pool_id = stoull_full(pool_item.first);
|
||||
if (!pool_id || pool_id >= POOL_ID_MAX)
|
||||
{
|
||||
printf("Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
|
||||
continue;
|
||||
}
|
||||
for (auto & pg_item: pool_item.second.object_items())
|
||||
{
|
||||
pg_num_t pg_num = stoull_full(pg_item.first);
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str());
|
||||
printf("Bad key in pool %lu PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
|
||||
continue;
|
||||
}
|
||||
this->pg_config[pg_num].exists = true;
|
||||
this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value();
|
||||
this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value();
|
||||
this->pg_config[pg_num].target_set.clear();
|
||||
for (auto pg_osd: pg_item.second["osd_set"].array_items())
|
||||
auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
||||
parsed_cfg.exists = true;
|
||||
parsed_cfg.pause = pg_item.second["pause"].bool_value();
|
||||
parsed_cfg.primary = pg_item.second["primary"].uint64_value();
|
||||
parsed_cfg.target_set.clear();
|
||||
for (auto & pg_osd: pg_item.second["osd_set"].array_items())
|
||||
{
|
||||
this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value());
|
||||
parsed_cfg.target_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
if (this->pg_config[pg_num].target_set.size() != 3)
|
||||
{
|
||||
printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str());
|
||||
this->pg_config[pg_num].target_set.resize(3);
|
||||
this->pg_config[pg_num].pause = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
|
||||
{
|
||||
// <etcd_prefix>/pg/history/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12));
|
||||
if (!pg_num)
|
||||
// <etcd_prefix>/pg/history/%d/%d
|
||||
pool_id_t pool_id = 0;
|
||||
pg_num_t pg_num = 0;
|
||||
char null_byte = 0;
|
||||
sscanf(key.c_str() + etcd_prefix.length()+12, "%lu/%u%c", &pool_id, &pg_num, &null_byte);
|
||||
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & pg_cfg = this->pg_config[pg_num];
|
||||
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
||||
pg_cfg.target_history.clear();
|
||||
pg_cfg.all_peers.clear();
|
||||
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
|
||||
|
@ -355,22 +433,25 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
|
|||
pg_cfg.epoch = value["epoch"].uint64_value();
|
||||
if (on_change_pg_history_hook != NULL)
|
||||
{
|
||||
on_change_pg_history_hook(pg_num);
|
||||
on_change_pg_history_hook(pool_id, pg_num);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
|
||||
{
|
||||
// <etcd_prefix>/pg/state/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10));
|
||||
if (!pg_num)
|
||||
// <etcd_prefix>/pg/state/%d/%d
|
||||
pool_id_t pool_id = 0;
|
||||
pg_num_t pg_num = 0;
|
||||
char null_byte = 0;
|
||||
sscanf(key.c_str() + etcd_prefix.length()+10, "%lu/%u%c", &pool_id, &pg_num, &null_byte);
|
||||
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else if (value.is_null())
|
||||
{
|
||||
this->pg_config[pg_num].cur_primary = 0;
|
||||
this->pg_config[pg_num].cur_state = 0;
|
||||
this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0;
|
||||
this->pool_config[pool_id].pg_config[pg_num].cur_state = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -401,8 +482,8 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
|
|||
printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str());
|
||||
return;
|
||||
}
|
||||
this->pg_config[pg_num].cur_primary = cur_primary;
|
||||
this->pg_config[pg_num].cur_state = state;
|
||||
this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
|
||||
this->pool_config[pool_id].pg_config[pg_num].cur_state = state;
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
|
||||
|
|
|
@ -13,6 +13,17 @@
|
|||
#define ETCD_SLOW_TIMEOUT 5000
|
||||
#define ETCD_QUICK_TIMEOUT 1000
|
||||
|
||||
// Coding scheme values stored in pool_config_t::scheme. parse_state() maps the
// "replicated" config string to POOL_SCHEME_REPLICATED and "xor" to POOL_SCHEME_XOR.
#define POOL_SCHEME_REPLICATED 1
|
||||
#define POOL_SCHEME_XOR 2
|
||||
// Exclusive upper bound for pool IDs: parse_state() skips any pool whose ID
// is 0 or >= POOL_ID_MAX.
#define POOL_ID_MAX 0x10000
|
||||
// 0x10000 == 1 << 16, i.e. POOL_ID_MAX == 1 << POOL_ID_BITS.
// NOTE(review): presumably pool IDs are packed into 16 bits somewhere else - confirm.
#define POOL_ID_BITS 16
|
||||
|
||||
// A single key/value pair: a string key together with its JSON value.
// This is the return type of etcd_state_client_t::parse_etcd_kv().
struct json_kv_t
|
||||
{
|
||||
std::string key;
|
||||
json11::Json value;
|
||||
};
|
||||
|
||||
struct pg_config_t
|
||||
{
|
||||
bool exists;
|
||||
|
@ -26,10 +37,19 @@ struct pg_config_t
|
|||
uint64_t epoch;
|
||||
};
|
||||
|
||||
struct json_kv_t
|
||||
typedef uint64_t pool_id_t;
|
||||
|
||||
// Parsed configuration of one pool, filled by etcd_state_client_t::parse_state()
// from the <etcd_prefix>/config/pools key. Also holds the PG configuration of the pool.
struct pool_config_t
|
||||
{
|
||||
// NOTE(review): the next two members (key, value) look like removed-side diff lines
// that belonged to json_kv_t before it was moved; parse_state() never assigns them
// on a pool - confirm against the real header.
std::string key;
|
||||
json11::Json value;
|
||||
// Cleared for every known pool before /config/pools is re-parsed, then set back
// to true only for pools that pass validation.
bool exists;
|
||||
// Pool ID taken from the config object key; non-zero and less than POOL_ID_MAX.
pool_id_t id;
|
||||
// Pool name; pools with an empty name are skipped.
std::string name;
|
||||
// POOL_SCHEME_REPLICATED or POOL_SCHEME_XOR.
uint64_t scheme;
|
||||
// pg_size must be >= 1 (>= 3 for "xor"); pg_minsize must be within [1, pg_size].
// NOTE(review): parse_state() also rejects pg_minsize < pg_size - 1 for every scheme,
// while mon.js applies that check only to "xor" pools - confirm which one is intended.
uint64_t pg_size, pg_minsize;
|
||||
// Number of PGs in the pool; must be >= 1.
uint64_t pg_count;
|
||||
// Copied as-is from the "failure_domain" config string.
std::string failure_domain;
|
||||
// Non-zero values below 100 are rejected; a zero/missing value is replaced with 10000.
uint64_t max_osd_combinations;
|
||||
// Per-pool PG configuration keyed by PG number (filled from /config/pgs,
// /pg/history/ and /pg/state/ keys).
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
};
|
||||
|
||||
struct etcd_state_client_t
|
||||
|
@ -42,14 +62,14 @@ struct etcd_state_client_t
|
|||
int etcd_watches_initialised = 0;
|
||||
uint64_t etcd_watch_revision = 0;
|
||||
websocket_t *etcd_watch_ws = NULL;
|
||||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
std::map<pool_id_t, pool_config_t> pool_config;
|
||||
std::map<osd_num_t, json11::Json> peer_states;
|
||||
|
||||
std::function<void(json11::Json::object &)> on_change_hook;
|
||||
std::function<void(json11::Json::object &)> on_load_config_hook;
|
||||
std::function<json11::Json()> load_pgs_checks_hook;
|
||||
std::function<void(bool)> on_load_pgs_hook;
|
||||
std::function<void(pg_num_t)> on_change_pg_history_hook;
|
||||
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
|
||||
std::function<void(osd_num_t)> on_change_osd_state_hook;
|
||||
|
||||
json_kv_t parse_etcd_kv(const json11::Json & kv_json);
|
||||
|
|
15
lp/mon.js
15
lp/mon.js
|
@ -557,12 +557,14 @@ class Mon
|
|||
console.log('Pool ID '+pool_id+' is invalid');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_size || pool_cfg.pg_size < 1)
|
||||
if (!pool_cfg.pg_size || pool_cfg.pg_size < 1 ||
|
||||
pool_cfg.scheme === 'xor' && pool_cfg.pg_size < 3)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_size');
|
||||
return false;
|
||||
}
|
||||
if (!pool_cfg.pg_minsize || pool_cfg.pg_minsize < 1 || pool_cfg.pg_minsize > pool_cfg.pg_size)
|
||||
if (!pool_cfg.pg_minsize || pool_cfg.pg_minsize < 1 || pool_cfg.pg_minsize > pool_cfg.pg_size ||
|
||||
pool_cfg.scheme === 'xor' && pool_cfg.pg_minsize < (pool_cfg.pg_size - 1))
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_minsize');
|
||||
return false;
|
||||
|
@ -574,12 +576,17 @@ class Mon
|
|||
}
|
||||
if (!pool_cfg.name)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid pg_count');
|
||||
console.log('Pool '+pool_id+' has empty name');
|
||||
return false;
|
||||
}
|
||||
if (pool_cfg.scheme !== 'xor' && pool_cfg.scheme !== 'replicated')
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid coding scheme (only "xor" and "replicated" are allowed)');
|
||||
return false;
|
||||
}
|
||||
if (pool_cfg.max_osd_combinations < 100)
|
||||
{
|
||||
console.log('Pool '+pool_id+' has invalid max_osd_combinations');
|
||||
console.log('Pool '+pool_id+' has invalid max_osd_combinations (must be at least 100)');
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -102,6 +102,12 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
pg.state = PG_PEERING;
|
||||
this->peering_state |= OSD_PEERING_PGS;
|
||||
report_pg_state(pg);
|
||||
if (parsed_cfg.target_set.size() != 3)
|
||||
{
|
||||
printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str());
|
||||
parsed_cfg.target_set.resize(3);
|
||||
parsed_cfg.pause = true;
|
||||
}
|
||||
// Reset PG state
|
||||
pg.cur_peers.clear();
|
||||
pg.state_dict.clear();
|
||||
|
|
Loading…
Reference in New Issue