Acquire etcd leases, prevent starting two OSDs with the same number

trace-sqes
Vitaliy Filippov 2020-04-24 16:46:02 +03:00
parent d398ddfd3b
commit caa01c6aaf
3 changed files with 302 additions and 123 deletions

18
osd.h
View File

@ -217,10 +217,10 @@ class osd_t
// peer OSDs // peer OSDs
std::string etcd_lease_id;
std::map<osd_num_t, json11::Json> peer_states; std::map<osd_num_t, json11::Json> peer_states;
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers; std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
bool loading_peer_config = false; bool loading_peer_config = false;
std::vector<std::string> bind_addresses;
int etcd_failed_attempts = 0; int etcd_failed_attempts = 0;
std::map<uint64_t, int> osd_peer_fds; std::map<uint64_t, int> osd_peer_fds;
@ -264,15 +264,21 @@ class osd_t
uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 };
uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 };
// methods // cluster connection
void etcd_call(std::string api, json11::Json payload, std::function<void(std::string, json11::Json)> callback);
void etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
void parse_config(blockstore_config_t & config); void parse_config(blockstore_config_t & config);
void init_cluster();
void load_global_config();
void bind_socket(); void bind_socket();
void acquire_lease();
void create_state();
void renew_lease();
void print_stats(); void print_stats();
void reset_stats(); void reset_stats();
json11::Json get_status(); json11::Json get_status();
void etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback); json11::Json get_statistics();
void init_cluster(); void report_statistics();
void report_status();
void load_pgs(); void load_pgs();
void parse_pgs(const json11::Json & pg_config, const std::map<pg_num_t, json11::Json> & pg_history); void parse_pgs(const json11::Json & pg_config, const std::map<pg_num_t, json11::Json> & pg_history);
void load_and_connect_peers(); void load_and_connect_peers();
@ -300,7 +306,6 @@ class osd_t
void cancel_op(osd_op_t *op); void cancel_op(osd_op_t *op);
void stop_client(int peer_fd); void stop_client(int peer_fd);
void parse_test_peer(std::string peer); void parse_test_peer(std::string peer);
void init_primary();
void handle_peers(); void handle_peers();
void repeer_pgs(osd_num_t osd_num); void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_num_t pg_num); void start_pg_peering(pg_num_t pg_num);
@ -348,6 +353,7 @@ class osd_t
public: public:
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
~osd_t(); ~osd_t();
void force_stop();
bool shutdown(); bool shutdown();
}; };

View File

@ -2,64 +2,67 @@
#include "osd_http.h" #include "osd_http.h"
#include "base64.h" #include "base64.h"
void osd_t::init_cluster() void osd_t::etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback)
{ {
if (run_primary) etcd_call("/kv/txn", txn, callback);
{
init_primary();
}
if (etcd_address != "")
{
if (!run_primary)
{
report_status();
}
printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", osd_num, etcd_address.c_str(), etcd_report_interval);
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
{
report_status();
});
}
else
{
bind_socket();
}
} }
void osd_t::init_primary() void osd_t::etcd_call(std::string api, json11::Json payload, std::function<void(std::string, json11::Json)> callback)
{
std::string req = payload.dump();
req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
"Host: "+etcd_host+"\r\n"
"Content-Type: application/json\r\n"
"Content-Length: "+std::to_string(req.size())+"\r\n"
"Connection: close\r\n"
"\r\n"+req;
http_request_json(etcd_address, req, callback);
}
// Startup sequence:
// Load global OSD configuration -> Bind socket -> Acquire lease -> Report state
// -> Load PGs -> Load peers -> Connect to peers -> Peer PGs
// Event handling
// Wait for PG changes -> Start/Stop PGs when requested
// Peer connection is lost -> Reload connection data -> Try to reconnect
void osd_t::init_cluster()
{ {
if (etcd_address == "") if (etcd_address == "")
{ {
// Test version of clustering code with 1 PG and 2 peers if (run_primary)
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"];
while (peerstr.size())
{ {
int pos = peerstr.find(','); // Test version of clustering code with 1 PG and 2 peers
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); std::string peerstr = config["peers"];
while (peerstr.size())
{
int pos = peerstr.find(',');
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
}
if (peer_states.size() < 2)
{
throw std::runtime_error("run_primary requires at least 2 peers");
}
pgs[1] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 0, 0, 0 },
};
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
} }
if (peer_states.size() < 2) bind_socket();
{
throw std::runtime_error("run_primary requires at least 2 peers");
}
pgs[1] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 0, 0, 0 },
};
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
} }
else else
{ {
peering_state = OSD_LOADING_PGS; peering_state = OSD_LOADING_PGS;
load_pgs(); load_global_config();
} }
if (autosync_interval > 0) if (run_primary && autosync_interval > 0)
{ {
this->tfd->set_timer(autosync_interval*1000, true, [this](int timer_id) this->tfd->set_timer(autosync_interval*1000, true, [this](int timer_id)
{ {
@ -97,22 +100,24 @@ void osd_t::parse_test_peer(std::string peer)
json11::Json osd_t::get_status() json11::Json osd_t::get_status()
{ {
json11::Json::object st; json11::Json::object st;
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000);
st["state"] = "up"; st["state"] = "up";
if (bind_address != "0.0.0.0") if (bind_address != "0.0.0.0")
st["addresses"] = { bind_address }; st["addresses"] = { bind_address };
else else
{ st["addresses"] = getifaddr_list();
if (bind_addresses.size() == 0)
bind_addresses = getifaddr_list();
st["addresses"] = bind_addresses;
}
st["port"] = listening_port; st["port"] = listening_port;
st["primary_enabled"] = run_primary; st["primary_enabled"] = run_primary;
st["blockstore_ready"] = bs->is_started();
st["blockstore_enabled"] = bs ? true : false; st["blockstore_enabled"] = bs ? true : false;
return st;
}
json11::Json osd_t::get_statistics()
{
json11::Json::object st;
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000);
st["blockstore_ready"] = bs->is_started();
if (bs) if (bs)
{ {
st["size"] = bs->get_block_count() * bs->get_block_size(); st["size"] = bs->get_block_count() * bs->get_block_size();
@ -138,36 +143,43 @@ json11::Json osd_t::get_status()
return st; return st;
} }
void osd_t::report_status() void osd_t::report_statistics()
{ {
std::string st = get_status().dump();
// (!) Keys end with . to allow "select /osd/state/123. by prefix"
// because etcd transactions fail if you try to read non-existing keys
json11::Json::array txn = { json11::Json::object { json11::Json::array txn = { json11::Json::object {
{ "request_put", json11::Json::object { { "request_put", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, { "key", base64_encode(etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) },
{ "value", base64_encode(st) }, { "value", base64_encode(get_statistics().dump()) },
{ "lease", etcd_lease_id },
} } } }
} }; } };
for (auto & p: pgs) for (auto & p: pgs)
{ {
auto & pg = p.second; auto & pg = p.second;
json11::Json::object pg_st;
json11::Json::array pg_state; json11::Json::array pg_state;
for (int i = 0; i < pg_state_bit_count; i++) for (int i = 0; i < pg_state_bit_count; i++)
if (pg.state & pg_state_bits[i]) if (pg.state & pg_state_bits[i])
pg_state.push_back(pg_state_names[i]); pg_state.push_back(pg_state_names[i]);
pg_st["state"] = pg_state; json11::Json::object pg_stats;
pg_st["object_count"] = pg.total_count; pg_stats["object_count"] = pg.total_count;
pg_st["clean_count"] = pg.clean_count; pg_stats["clean_count"] = pg.clean_count;
pg_st["misplaced_count"] = pg.misplaced_objects.size(); pg_stats["misplaced_count"] = pg.misplaced_objects.size();
pg_st["degraded_count"] = pg.degraded_objects.size(); pg_stats["degraded_count"] = pg.degraded_objects.size();
pg_st["incomplete_count"] = pg.incomplete_objects.size(); pg_stats["incomplete_count"] = pg.incomplete_objects.size();
pg_st["write_osd_set"] = pg.cur_set; pg_stats["write_osd_set"] = pg.cur_set;
txn.push_back(json11::Json::object { txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object { { "request_put", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) },
{ "value", base64_encode(json11::Json(pg_st).dump()) }, { "value", base64_encode(json11::Json(json11::Json::object {
{ "primary", this->osd_num },
{ "state", pg_state },
}).dump()) },
{ "lease", etcd_lease_id },
} }
});
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/stats/"+std::to_string(pg.pg_num)) },
{ "value", base64_encode(json11::Json(pg_stats).dump()) },
} } } }
}); });
if (pg.state == PG_ACTIVE && pg.target_history.size()) if (pg.state == PG_ACTIVE && pg.target_history.size())
@ -194,9 +206,13 @@ void osd_t::report_status()
// Retry // Retry
tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id)
{ {
report_status(); report_statistics();
}); });
} }
else if (res["error"] != "")
{
throw std::runtime_error("Error reporting state to etcd: ");
}
else else
{ {
etcd_failed_attempts = 0; etcd_failed_attempts = 0;
@ -204,37 +220,199 @@ void osd_t::report_status()
}); });
} }
void osd_t::etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback) void osd_t::load_global_config()
{ {
std::string req = txn.dump(); etcd_call("/kv/range", json11::Json::object {
req = "POST "+etcd_api_path+"/kv/txn HTTP/1.1\r\n" { "key", base64_encode(etcd_prefix+"/config/osd/all") }
"Host: "+etcd_host+"\r\n" }, [this](std::string err, json11::Json data)
"Content-Type: application/json\r\n" {
"Content-Length: "+std::to_string(req.size())+"\r\n" if (err != "")
"Connection: close\r\n" {
"\r\n"+req; printf("Error reading OSD configuration from etcd: %s\n", err.c_str());
http_request_json(etcd_address, req, callback); tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id)
{
load_global_config();
});
return;
}
if (data["responses"][0]["response_range"]["kvs"].array_items().size() > 0)
{
std::string key = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["key"].string_value());
std::string json_text = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["value"].string_value());
std::string json_err;
json11::Json value = json11::Json::parse(json_text, json_err);
if (json_err != "")
{
printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
}
else
{
blockstore_config_t osd_config = this->config;
for (auto & cfg_var: value.object_items())
{
if (this->config.find(cfg_var.first) == this->config.end())
{
osd_config[cfg_var.first] = cfg_var.second.string_value();
}
}
parse_config(osd_config);
}
}
bind_socket();
acquire_lease();
});
}
// Acquire an etcd lease for this OSD, then publish the OSD state under it.
//
// The requested TTL is the report interval plus the time the retry logic may
// spend before giving up (MAX_ETCD_ATTEMPTS * ETCD_RETRY_INTERVAL, rounded up
// to whole seconds). If the request fails, or etcd answers without a lease ID,
// the request is repeated after ETCD_START_INTERVAL.
//
// The periodic renew_lease() timer is armed only after the lease is granted.
// This function re-enters itself on every failed attempt, so arming the
// repeating timer unconditionally would register one more timer per retry and
// would let renew_lease() run with an empty lease ID.
void osd_t::acquire_lease()
{
    etcd_call("/lease/grant", json11::Json::object {
        { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*ETCD_RETRY_INTERVAL+999)/1000 }
    }, [this](std::string err, json11::Json data)
    {
        if (err != "" || data["ID"].string_value() == "")
        {
            // A reply without an ID is also a failure, but carries no transport error text
            printf("Error acquiring a lease from etcd: %s\n", err != "" ? err.c_str() : "no lease ID in response");
            tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id)
            {
                acquire_lease();
            });
            return;
        }
        etcd_lease_id = data["ID"].string_value();
        // Start keeping the lease alive; this point is reached once per granted lease
        printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval);
        tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
        {
            renew_lease();
        });
        create_state();
    });
}
// Report "up" state once, then keep it alive using the lease
// Do it first to allow "monitors" check it when moving PGs
void osd_t::create_state()
{
std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num));
etcd_txn(json11::Json::object {
// Check that the state key does not exist
{ "compare", json11::Json::array {
json11::Json::object {
{ "target", "CREATE" },
{ "create_revision", 0 },
{ "key", state_key },
}
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", state_key },
{ "value", base64_encode(get_status().dump()) },
{ "lease", etcd_lease_id },
} }
},
} },
{ "failure", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", state_key },
} }
},
} },
}, [this](std::string err, json11::Json data)
{
if (err != "")
{
// FIXME Retry?
printf("Error reporting OSD state to etcd: %s\n", err.c_str());
exit(1);
}
if (data["responses"][0]["response_range"].is_object())
{
// OSD is already up
auto & kv = data["responses"][0]["response_range"]["kvs"][0];
std::string key = base64_decode(kv["key"].string_value());
std::string json_err;
json11::Json state = json11::Json::parse(base64_decode(kv["value"].string_value()), json_err);
printf("Key %s already exists in etcd, OSD %lu is still up\n", key.c_str(), this->osd_num);
int64_t port = state["port"].int64_value();
for (auto & addr: state["addresses"].array_items())
{
printf(" listening at: %s:%ld\n", addr.string_value().c_str(), port);
}
exit(0);
}
if (run_primary)
{
load_pgs();
}
});
}
// Renew lease
void osd_t::renew_lease()
{
etcd_call("/lease/keepalive", json11::Json::object {
{ "ID", etcd_lease_id }
}, [this](std::string err, json11::Json data)
{
if (err == "" && data["result"]["TTL"].string_value() == "")
{
// Die
throw std::runtime_error("etcd lease has expired");
}
if (err != "")
{
etcd_failed_attempts++;
printf("Error renewing etcd lease: %s\n", err.c_str());
if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
{
// Die
throw std::runtime_error("Cluster connection failed");
}
// Retry
tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id)
{
renew_lease();
});
}
else
{
etcd_failed_attempts = 0;
report_statistics();
}
});
}
// Stop the OSD process on request (called from the SIGINT/SIGTERM handler).
//
// If an etcd lease is held, it is revoked first and the process exits from
// the revoke callback, whether or not the revoke succeeded.
// If no lease is held there is nothing to revoke, so the process exits right
// away. The signal handler returns immediately after calling this function,
// so without this branch the signal would be silently ignored whenever the
// OSD has no lease (etcd not configured, or lease not acquired yet).
void osd_t::force_stop()
{
    if (etcd_lease_id == "")
    {
        printf("[OSD %lu] Force stopping\n", this->osd_num);
        exit(0);
    }
    etcd_call("/lease/revoke", json11::Json::object {
        { "ID", etcd_lease_id }
    }, [this](std::string err, json11::Json data)
    {
        if (err != "")
        {
            // Best effort: the lease will expire on its own if it could not be revoked
            printf("Error revoking etcd lease: %s\n", err.c_str());
        }
        printf("[OSD %lu] Force stopping\n", this->osd_num);
        exit(0);
    });
}
// Start -> Load config & PGs -> Load peers -> Connect to peers -> Peer PGs
// Wait for PG changes -> Start/Stop PGs when requested
// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat
void osd_t::load_pgs() void osd_t::load_pgs()
{ {
assert(this->pgs.size() == 0); assert(this->pgs.size() == 0);
json11::Json::array checks = {
json11::Json::object {
{ "target", "LEASE" },
{ "lease", etcd_lease_id },
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
}
};
json11::Json::array txn = { json11::Json::array txn = {
// Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
{ "value", base64_encode(get_status().dump()) },
} }
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/config/osd/all") },
} }
},
json11::Json::object { json11::Json::object {
{ "request_range", json11::Json::object { { "request_range", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/config/pgs") }, { "key", base64_encode(etcd_prefix+"/config/pgs") },
@ -247,7 +425,7 @@ void osd_t::load_pgs()
} } } }
}, },
}; };
etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json data) etcd_txn(json11::Json::object { { "compare", checks }, { "success", txn } }, [this](std::string err, json11::Json data)
{ {
if (err != "") if (err != "")
{ {
@ -258,16 +436,16 @@ void osd_t::load_pgs()
}); });
return; return;
} }
if (!data["responses"].array_items().size())
{
printf("Error loading PGs from etcd: lease expired\n");
exit(1);
}
peering_state &= ~OSD_LOADING_PGS; peering_state &= ~OSD_LOADING_PGS;
blockstore_config_t osd_config = this->config;
json11::Json pg_config; json11::Json pg_config;
std::map<pg_num_t, json11::Json> pg_history; std::map<pg_num_t, json11::Json> pg_history;
for (auto & res: data["responses"].array_items()) for (auto & res: data["responses"].array_items())
{ {
if (!res["response_range"].is_object())
{
continue;
}
for (auto & kvs: res["response_range"]["kvs"].array_items()) for (auto & kvs: res["response_range"]["kvs"].array_items())
{ {
std::string key = base64_decode(kvs["key"].string_value()); std::string key = base64_decode(kvs["key"].string_value());
@ -277,23 +455,13 @@ void osd_t::load_pgs()
{ {
printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
} }
if (key == etcd_prefix+"/config/osd/all")
{
for (auto & cfg_var: value.object_items())
{
if (this->config.find(cfg_var.first) == this->config.end())
{
osd_config[cfg_var.first] = cfg_var.second.string_value();
}
}
}
else if (key == etcd_prefix+"/config/pgs") else if (key == etcd_prefix+"/config/pgs")
{ {
pg_config = value; pg_config = value;
} }
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
{ {
// <etcd_prefix>/pg/history/%d. // <etcd_prefix>/pg/history/%d
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12));
if (pg_num) if (pg_num)
{ {
@ -302,10 +470,8 @@ void osd_t::load_pgs()
} }
} }
} }
parse_config(osd_config);
bind_socket();
parse_pgs(pg_config, pg_history); parse_pgs(pg_config, pg_history);
report_status(); report_statistics();
}); });
} }

View File

@ -2,8 +2,15 @@
#include <signal.h> #include <signal.h>
void handle_sigint(int sig) static osd_t *osd = NULL;
static void handle_sigint(int sig)
{ {
if (osd)
{
osd->force_stop();
return;
}
exit(0); exit(0);
} }
@ -25,11 +32,11 @@ int main(int narg, char *args[])
} }
} }
signal(SIGINT, handle_sigint); signal(SIGINT, handle_sigint);
signal(SIGTERM, handle_sigint);
ring_loop_t *ringloop = new ring_loop_t(512); ring_loop_t *ringloop = new ring_loop_t(512);
// FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config
// FIXME: Prevent two OSD starting with same number
blockstore_t *bs = new blockstore_t(config, ringloop); blockstore_t *bs = new blockstore_t(config, ringloop);
osd_t *osd = new osd_t(config, bs, ringloop); osd = new osd_t(config, bs, ringloop);
while (1) while (1)
{ {
ringloop->loop(); ringloop->loop();