diff --git a/osd.h b/osd.h index 55734b3b..22e8dbc7 100644 --- a/osd.h +++ b/osd.h @@ -217,10 +217,10 @@ class osd_t // peer OSDs + std::string etcd_lease_id; std::map peer_states; std::map wanted_peers; bool loading_peer_config = false; - std::vector bind_addresses; int etcd_failed_attempts = 0; std::map osd_peer_fds; @@ -264,15 +264,21 @@ class osd_t uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; - // methods + // cluster connection + void etcd_call(std::string api, json11::Json payload, std::function callback); + void etcd_txn(json11::Json txn, std::function callback); void parse_config(blockstore_config_t & config); + void init_cluster(); + void load_global_config(); void bind_socket(); + void acquire_lease(); + void create_state(); + void renew_lease(); void print_stats(); void reset_stats(); json11::Json get_status(); - void etcd_txn(json11::Json txn, std::function callback); - void init_cluster(); - void report_status(); + json11::Json get_statistics(); + void report_statistics(); void load_pgs(); void parse_pgs(const json11::Json & pg_config, const std::map & pg_history); void load_and_connect_peers(); @@ -300,7 +306,6 @@ class osd_t void cancel_op(osd_op_t *op); void stop_client(int peer_fd); void parse_test_peer(std::string peer); - void init_primary(); void handle_peers(); void repeer_pgs(osd_num_t osd_num); void start_pg_peering(pg_num_t pg_num); @@ -348,6 +353,7 @@ class osd_t public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); + void force_stop(); bool shutdown(); }; diff --git a/osd_cluster.cpp b/osd_cluster.cpp index b69709de..82ed3ceb 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -2,64 +2,67 @@ #include "osd_http.h" #include "base64.h" -void osd_t::init_cluster() +void osd_t::etcd_txn(json11::Json txn, std::function callback) { - if (run_primary) - { - init_primary(); - } - if (etcd_address != "") - { - if (!run_primary) - { - report_status(); - } - printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", osd_num, etcd_address.c_str(), etcd_report_interval); - tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) - { - report_status(); - }); - } - else - { - bind_socket(); - } + etcd_call("/kv/txn", txn, callback); } -void osd_t::init_primary() +void osd_t::etcd_call(std::string api, json11::Json payload, std::function callback) +{ + std::string req = payload.dump(); + req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n" + "Host: "+etcd_host+"\r\n" + "Content-Type: application/json\r\n" + "Content-Length: "+std::to_string(req.size())+"\r\n" + "Connection: close\r\n" + "\r\n"+req; + http_request_json(etcd_address, req, callback); +} + +// Startup sequence: +// Load global OSD configuration -> Bind socket -> Acquire lease -> Report state +// -> Load PGs -> Load peers -> Connect to peers -> Peer PGs +// Event handling +// Wait for PG changes -> Start/Stop PGs when requested +// Peer connection is lost -> Reload connection data -> Try to reconnect +void osd_t::init_cluster() { if (etcd_address == "") { - // Test version of clustering code with 1 PG and 2 peers - // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 - std::string peerstr = config["peers"]; - while (peerstr.size()) + if (run_primary) { - int pos = peerstr.find(','); - parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); - peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); + // Test version of clustering code with 1 PG and 2 peers + // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 + std::string peerstr = config["peers"]; + while (peerstr.size()) + { + int pos = peerstr.find(','); + parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); + peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); + } + if (peer_states.size() < 2) + { + throw std::runtime_error("run_primary requires at least 2 peers"); + } + pgs[1] = (pg_t){ + .state = PG_PEERING, + .pg_cursize = 0, + .pg_num = 1, + .target_set = { 1, 2, 3 }, + .cur_set = { 0, 0, 0 }, + }; + pgs[1].print_state(); + pg_count = 1; + peering_state = OSD_CONNECTING_PEERS; } - if (peer_states.size() < 2) - { - throw std::runtime_error("run_primary requires at least 2 peers"); - } - pgs[1] = (pg_t){ - .state = PG_PEERING, - .pg_cursize = 0, - .pg_num = 1, - .target_set = { 1, 2, 3 }, - .cur_set = { 0, 0, 0 }, - }; - pgs[1].print_state(); - pg_count = 1; - peering_state = OSD_CONNECTING_PEERS; + bind_socket(); } else { peering_state = OSD_LOADING_PGS; - load_pgs(); + load_global_config(); } - if (autosync_interval > 0) + if (run_primary && autosync_interval > 0) { this->tfd->set_timer(autosync_interval*1000, true, [this](int timer_id) { @@ -97,22 +100,24 @@ void osd_t::parse_test_peer(std::string peer) json11::Json osd_t::get_status() { json11::Json::object st; - timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000); st["state"] = "up"; if (bind_address != "0.0.0.0") st["addresses"] = { bind_address }; else - { - if (bind_addresses.size() == 0) - bind_addresses = getifaddr_list(); - st["addresses"] = bind_addresses; - } + st["addresses"] = getifaddr_list(); st["port"] = listening_port; st["primary_enabled"] = run_primary; - st["blockstore_ready"] = bs->is_started(); st["blockstore_enabled"] = bs ? true : false; + return st; +} + +json11::Json osd_t::get_statistics() +{ + json11::Json::object st; + timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000); + st["blockstore_ready"] = bs->is_started(); if (bs) { st["size"] = bs->get_block_count() * bs->get_block_size(); @@ -138,36 +143,43 @@ json11::Json osd_t::get_status() return st; } -void osd_t::report_status() +void osd_t::report_statistics() { - std::string st = get_status().dump(); - // (!) Keys end with . to allow "select /osd/state/123. by prefix" - // because etcd transactions fail if you try to read non-existing keys json11::Json::array txn = { json11::Json::object { { "request_put", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, - { "value", base64_encode(st) }, + { "key", base64_encode(etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) }, + { "value", base64_encode(get_statistics().dump()) }, + { "lease", etcd_lease_id }, } } } }; for (auto & p: pgs) { auto & pg = p.second; - json11::Json::object pg_st; json11::Json::array pg_state; for (int i = 0; i < pg_state_bit_count; i++) if (pg.state & pg_state_bits[i]) pg_state.push_back(pg_state_names[i]); - pg_st["state"] = pg_state; - pg_st["object_count"] = pg.total_count; - pg_st["clean_count"] = pg.clean_count; - pg_st["misplaced_count"] = pg.misplaced_objects.size(); - pg_st["degraded_count"] = pg.degraded_objects.size(); - pg_st["incomplete_count"] = pg.incomplete_objects.size(); - pg_st["write_osd_set"] = pg.cur_set; + json11::Json::object pg_stats; + pg_stats["object_count"] = pg.total_count; + pg_stats["clean_count"] = pg.clean_count; + pg_stats["misplaced_count"] = pg.misplaced_objects.size(); + pg_stats["degraded_count"] = pg.degraded_objects.size(); + pg_stats["incomplete_count"] = pg.incomplete_objects.size(); + pg_stats["write_osd_set"] = pg.cur_set; txn.push_back(json11::Json::object { { "request_put", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, - { "value", base64_encode(json11::Json(pg_st).dump()) }, + { "value", base64_encode(json11::Json(json11::Json::object { + { "primary", this->osd_num }, + { "state", pg_state }, + }).dump()) }, + { "lease", etcd_lease_id }, + } } + }); + txn.push_back(json11::Json::object { + { "request_put", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/stats/"+std::to_string(pg.pg_num)) }, + { "value", base64_encode(json11::Json(pg_stats).dump()) }, } } }); if (pg.state == PG_ACTIVE && pg.target_history.size()) @@ -194,9 +206,13 @@ void osd_t::report_status() // Retry tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) { - report_status(); + report_statistics(); }); } + else if (res["error"] != "") + { + throw std::runtime_error("Error reporting state to etcd: "); + } else { etcd_failed_attempts = 0; @@ -204,37 +220,199 @@ void osd_t::report_status() }); } -void osd_t::etcd_txn(json11::Json txn, std::function callback) +void osd_t::load_global_config() { - std::string req = txn.dump(); - req = "POST "+etcd_api_path+"/kv/txn HTTP/1.1\r\n" - "Host: "+etcd_host+"\r\n" - "Content-Type: application/json\r\n" - "Content-Length: "+std::to_string(req.size())+"\r\n" - "Connection: close\r\n" - "\r\n"+req; - http_request_json(etcd_address, req, callback); + etcd_call("/kv/range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/osd/all") } + }, [this](std::string err, json11::Json data) + { + if (err != "") + { + printf("Error reading OSD configuration from etcd: %s\n", err.c_str()); + tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) + { + load_global_config(); + }); + return; + } + if (data["responses"][0]["response_range"]["kvs"].array_items().size() > 0) + { + std::string key = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["key"].string_value()); + std::string json_text = base64_decode(data["responses"][0]["response_range"]["kvs"][0]["value"].string_value()); + std::string json_err; + json11::Json value = json11::Json::parse(json_text, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); + } + else + { + blockstore_config_t osd_config = this->config; + for (auto & cfg_var: value.object_items()) + { + if (this->config.find(cfg_var.first) == this->config.end()) + { + osd_config[cfg_var.first] = cfg_var.second.string_value(); + } + } + parse_config(osd_config); + } + } + bind_socket(); + acquire_lease(); + }); +} + +// Acquire lease +void osd_t::acquire_lease() +{ + etcd_call("/lease/grant", json11::Json::object { + { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*ETCD_RETRY_INTERVAL+999)/1000 } + }, [this](std::string err, json11::Json data) + { + if (err != "" || data["ID"].string_value() == "") + { + printf("Error acquiring a lease from etcd: %s\n", err.c_str()); + tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) + { + acquire_lease(); + }); + return; + } + etcd_lease_id = data["ID"].string_value(); + create_state(); + }); + printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval); + tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) + { + renew_lease(); + }); +} + +// Report "up" state once, then keep it alive using the lease +// Do it first to allow "monitors" check it when moving PGs +void osd_t::create_state() +{ + std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)); + etcd_txn(json11::Json::object { + // Check that the state key does not exist + { "compare", json11::Json::array { + json11::Json::object { + { "target", "CREATE" }, + { "create_revision", 0 }, + { "key", state_key }, + } + } }, + { "success", json11::Json::array { + json11::Json::object { + { "request_put", json11::Json::object { + { "key", state_key }, + { "value", base64_encode(get_status().dump()) }, + { "lease", etcd_lease_id }, + } } + }, + } }, + { "failure", json11::Json::array { + json11::Json::object { + { "request_range", json11::Json::object { + { "key", state_key }, + } } + }, + } }, + }, [this](std::string err, json11::Json data) + { + if (err != "") + { + // FIXME Retry? + printf("Error reporting OSD state to etcd: %s\n", err.c_str()); + exit(1); + } + if (data["responses"][0]["response_range"].is_object()) + { + // OSD is already up + auto & kv = data["responses"][0]["response_range"]["kvs"][0]; + std::string key = base64_decode(kv["key"].string_value()); + std::string json_err; + json11::Json state = json11::Json::parse(base64_decode(kv["value"].string_value()), json_err); + printf("Key %s already exists in etcd, OSD %lu is still up\n", key.c_str(), this->osd_num); + int64_t port = state["port"].int64_value(); + for (auto & addr: state["addresses"].array_items()) + { + printf(" listening at: %s:%ld\n", addr.string_value().c_str(), port); + } + exit(0); + } + if (run_primary) + { + load_pgs(); + } + }); +} + +// Renew lease +void osd_t::renew_lease() +{ + etcd_call("/lease/keepalive", json11::Json::object { + { "ID", etcd_lease_id } + }, [this](std::string err, json11::Json data) + { + if (err == "" && data["result"]["TTL"].string_value() == "") + { + // Die + throw std::runtime_error("etcd lease has expired"); + } + if (err != "") + { + etcd_failed_attempts++; + printf("Error renewing etcd lease: %s\n", err.c_str()); + if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS) + { + // Die + throw std::runtime_error("Cluster connection failed"); + } + // Retry + tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) + { + renew_lease(); + }); + } + else + { + etcd_failed_attempts = 0; + report_statistics(); + } + }); +} + +void osd_t::force_stop() +{ + if (etcd_lease_id != "") + { + etcd_call("/lease/revoke", json11::Json::object { + { "ID", etcd_lease_id } + }, [this](std::string err, json11::Json data) + { + if (err != "") + { + printf("Error revoking etcd lease: %s\n", err.c_str()); + } + printf("[OSD %lu] Force stopping\n", this->osd_num); + exit(0); + }); + } } -// Start -> Load config & PGs -> Load peers -> Connect to peers -> Peer PGs -// Wait for PG changes -> Start/Stop PGs when requested -// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat void osd_t::load_pgs() { assert(this->pgs.size() == 0); + json11::Json::array checks = { + json11::Json::object { + { "target", "LEASE" }, + { "lease", etcd_lease_id }, + { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, + } + }; json11::Json::array txn = { - // Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs - json11::Json::object { - { "request_put", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, - { "value", base64_encode(get_status().dump()) }, - } } - }, - json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/config/osd/all") }, - } } - }, json11::Json::object { { "request_range", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/pgs") }, @@ -247,7 +425,7 @@ void osd_t::load_pgs() } } }, }; - etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json data) + etcd_txn(json11::Json::object { { "compare", checks }, { "success", txn } }, [this](std::string err, json11::Json data) { if (err != "") { @@ -258,16 +436,16 @@ void osd_t::load_pgs() }); return; } + if (!data["responses"].array_items().size()) + { + printf("Error loading PGs from etcd: lease expired\n"); + exit(1); + } peering_state &= ~OSD_LOADING_PGS; - blockstore_config_t osd_config = this->config; json11::Json pg_config; std::map pg_history; for (auto & res: data["responses"].array_items()) { - if (!res["response_range"].is_object()) - { - continue; - } for (auto & kvs: res["response_range"]["kvs"].array_items()) { std::string key = base64_decode(kvs["key"].string_value()); @@ -277,23 +455,13 @@ void osd_t::load_pgs() { printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); } - if (key == etcd_prefix+"/config/osd/all") - { - for (auto & cfg_var: value.object_items()) - { - if (this->config.find(cfg_var.first) == this->config.end()) - { - osd_config[cfg_var.first] = cfg_var.second.string_value(); - } - } - } else if (key == etcd_prefix+"/config/pgs") { pg_config = value; } else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") { - // /pg/history/%d. + // /pg/history/%d pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); if (pg_num) { @@ -302,10 +470,8 @@ void osd_t::load_pgs() } } } - parse_config(osd_config); - bind_socket(); parse_pgs(pg_config, pg_history); - report_status(); + report_statistics(); }); } diff --git a/osd_main.cpp b/osd_main.cpp index ceab1480..f1146ca2 100644 --- a/osd_main.cpp +++ b/osd_main.cpp @@ -2,8 +2,15 @@ #include -void handle_sigint(int sig) +static osd_t *osd = NULL; + +static void handle_sigint(int sig) { + if (osd) + { + osd->force_stop(); + return; + } exit(0); } @@ -25,11 +32,11 @@ int main(int narg, char *args[]) } } signal(SIGINT, handle_sigint); + signal(SIGTERM, handle_sigint); ring_loop_t *ringloop = new ring_loop_t(512); // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config - // FIXME: Prevent two OSD starting with same number blockstore_t *bs = new blockstore_t(config, ringloop); - osd_t *osd = new osd_t(config, bs, ringloop); + osd = new osd_t(config, bs, ringloop); while (1) { ringloop->loop();