diff --git a/osd.cpp b/osd.cpp index e9724d60..a1eacae0 100644 --- a/osd.cpp +++ b/osd.cpp @@ -72,11 +72,6 @@ osd_t::~osd_t() delete sync_tfd; sync_tfd = NULL; } - if (consul_tfd) - { - delete consul_tfd; - consul_tfd = NULL; - } ringloop->unregister_consumer(&consumer); close(epoll_fd); close(listen_fd); @@ -106,15 +101,40 @@ osd_op_t::~osd_op_t() void osd_t::parse_config(blockstore_config_t & config) { + int pos; // Initial startup configuration - consul_address = config["consul_address"]; - consul_host = consul_address.find(':') >= 0 ? consul_address.substr(0, consul_address.find(':')) : consul_address; - consul_prefix = config["consul_prefix"]; - if (consul_prefix == "") - consul_prefix = "microceph"; - consul_report_interval = strtoull(config["consul_report_interval"].c_str(), NULL, 10); - if (consul_report_interval <= 0) - consul_report_interval = 30; + etcd_address = config["etcd_address"]; + etcd_prefix = config["etcd_prefix"]; + if (etcd_prefix == "") + etcd_prefix = "/microceph"; + if ((pos = etcd_address.find('/')) >= 0) + { + etcd_api_path = etcd_address.substr(pos); + etcd_address = etcd_address.substr(0, pos); + } + else if (config.find("etcd_version") != config.end()) + { + int major, minor; + if (sscanf(config["etcd_version"].c_str(), "%d.%d", &major, &minor) < 2) + throw std::runtime_error("etcd_version should be in the form MAJOR.MINOR (for example, 3.2)"); + if (major < 3 || major == 3 && minor < 2) + throw std::runtime_error("Your etcd is too old, minimum required version is 3.2"); + else if (major == 3 && minor == 2) + etcd_api_path = "/v3alpha"; + else if (major == 3 && minor == 3) + etcd_api_path = "/v3beta"; + else + etcd_api_path = "/v3"; + } + else + etcd_api_path = "/v3"; + if ((pos = etcd_address.find(':')) >= 0) + etcd_host = etcd_address.substr(0, pos); + else + etcd_host = etcd_address; + etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10); + if (etcd_report_interval <= 0) + etcd_report_interval = 30; osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); @@ -384,7 +404,7 @@ void osd_t::stop_client(int peer_fd) { if (cl.osd_num) { - // Reload configuration from Consul when the connection is dropped + // Reload configuration from etcd when the connection is dropped printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); peer_states.erase(cl.osd_num); repeer_pgs(cl.osd_num); diff --git a/osd.h b/osd.h index da79273f..7b6113cb 100644 --- a/osd.h +++ b/osd.h @@ -50,9 +50,9 @@ #define MAX_RECOVERY_QUEUE 2048 #define DEFAULT_RECOVERY_QUEUE 4 -#define MAX_CONSUL_ATTEMPTS 5 -#define CONSUL_START_INTERVAL 5000 -#define CONSUL_RETRY_INTERVAL 1000 +#define MAX_ETCD_ATTEMPTS 5 +#define ETCD_START_INTERVAL 5000 +#define ETCD_RETRY_INTERVAL 1000 //#define OSD_STUB @@ -197,8 +197,10 @@ class osd_t // config blockstore_config_t config; + std::string etcd_address, etcd_host, etcd_prefix, etcd_api_path; + int etcd_report_interval = 30; + bool readonly = false; - std::string consul_address, consul_host, consul_prefix = "microceph"; osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; std::string bind_address; @@ -207,7 +209,6 @@ class osd_t bool allow_test_ops = true; int receive_buffer_size = 9000; int print_stats_interval = 3; - int consul_report_interval = 30; int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; @@ -221,7 +222,7 @@ class osd_t std::map wanted_peers; bool loading_peer_config = false; std::vector bind_addresses; - int consul_failed_attempts = 0; + int etcd_failed_attempts = 0; std::map osd_peer_fds; std::map pgs; @@ -245,7 +246,7 @@ class osd_t uint32_t bs_block_size, bs_disk_alignment; uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default ring_loop_t *ringloop; - timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL, *consul_tfd = NULL; + timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL; timerfd_manager_t *tfd = NULL; int wait_state = 0; @@ -271,7 +272,7 @@ class osd_t void print_stats(); void reset_stats(); json11::Json get_status(); - void consul_txn(json11::Json txn, std::function callback); + void etcd_txn(json11::Json txn, std::function callback); void init_cluster(); void report_status(); void load_pgs(); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 9ceb308a..e83322ed 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -8,14 +8,14 @@ void osd_t::init_cluster() { init_primary(); } - if (consul_address != "") + if (etcd_address != "") { if (!run_primary) { report_status(); } - printf("[OSD %lu] reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval); - this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() + printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", osd_num, etcd_address.c_str(), etcd_report_interval); + tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) { report_status(); }); @@ -28,7 +28,7 @@ void osd_t::init_cluster() void osd_t::init_primary() { - if (consul_address == "") + if (etcd_address == "") { // Test version of clustering code with 1 PG and 2 peers // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 @@ -142,16 +142,13 @@ void osd_t::report_status() { std::string st = get_status().dump(); // (!) Keys end with . to allow "select /osd/state/123. by prefix" - // because Consul transactions fail if you try to read non-existing keys - json11::Json::array txn = { - json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "set" }, - { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, - { "Value", base64_encode(st) }, - } } - }, - }; + // because etcd transactions fail if you try to read non-existing keys + json11::Json::array txn = { json11::Json::object { + { "requestPut", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, + { "value", base64_encode(st) }, + } } + } }; for (auto & p: pgs) { auto & pg = p.second; @@ -168,10 +165,9 @@ void osd_t::report_status() pg_st["incomplete_count"] = pg.incomplete_objects.size(); pg_st["write_osd_set"] = pg.cur_set; txn.push_back(json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "set" }, - { "Key", consul_prefix+"/pg/state/"+std::to_string(pg.pg_num)+"." }, - { "Value", base64_encode(json11::Json(pg_st).dump()) }, + { "requestPut", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, + { "value", base64_encode(json11::Json(pg_st).dump()) }, } } }); if (pg.state == PG_ACTIVE && pg.target_history.size()) @@ -179,46 +175,45 @@ void osd_t::report_status() pg.target_history.clear(); pg.all_peers = pg.target_set; txn.push_back(json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "delete" }, - { "Key", consul_prefix+"/pg/history/"+std::to_string(pg.pg_num)+"." }, + { "requestDeleteRange", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) }, } } }); } } - consul_txn(txn, [this](std::string err, json11::Json res) + etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json res) { if (err != "") { - consul_failed_attempts++; - printf("Error reporting state to Consul: %s\n", err.c_str()); - if (consul_failed_attempts > MAX_CONSUL_ATTEMPTS) + etcd_failed_attempts++; + printf("Error reporting state to etcd: %s\n", err.c_str()); + if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS) { throw std::runtime_error("Cluster connection failed"); } // Retry - tfd->set_timer(CONSUL_RETRY_INTERVAL, false, [this](int timer_id) + tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) { report_status(); }); } else { - consul_failed_attempts = 0; + etcd_failed_attempts = 0; } }); } -void osd_t::consul_txn(json11::Json txn, std::function callback) +void osd_t::etcd_txn(json11::Json txn, std::function callback) { std::string req = txn.dump(); - req = "PUT /v1/txn HTTP/1.1\r\n" - "Host: "+consul_host+"\r\n" + req = "POST "+etcd_api_path+"/kv/txn HTTP/1.1\r\n" + "Host: "+etcd_host+"\r\n" "Content-Type: application/json\r\n" "Content-Length: "+std::to_string(req.size())+"\r\n" "Connection: close\r\n" "\r\n"+req; - http_request_json(consul_address, req, callback); + http_request_json(etcd_address, req, callback); } // Start -> Load config & PGs -> Load peers -> Connect to peers -> Peer PGs @@ -230,37 +225,34 @@ void osd_t::load_pgs() json11::Json::array txn = { // Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "set" }, - { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, - { "Value", base64_encode(get_status().dump()) }, + { "requestPut", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, + { "value", base64_encode(get_status().dump()) }, } } }, json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "get-tree" }, - { "Key", consul_prefix+"/config/osd/all" }, + { "requestRange", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/osd/all") }, } } }, json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "get" }, - { "Key", consul_prefix+"/config/pgs" }, + { "requestRange", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/pgs") }, } } }, json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "get-tree" }, - { "Key", consul_prefix+"/pg/history/" }, + { "requestRange", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, } } }, }; - consul_txn(txn, [this](std::string err, json11::Json data) + etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json data) { if (err != "") { - printf("Error loading PGs from Consul: %s\n", err.c_str()); - tfd->set_timer(CONSUL_START_INTERVAL, false, [this](int timer_id) + printf("Error loading PGs from etcd: %s\n", err.c_str()); + tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) { load_pgs(); }); @@ -270,40 +262,43 @@ void osd_t::load_pgs() blockstore_config_t osd_config = this->config; json11::Json pg_config; std::map pg_history; - for (auto & res: data["Results"].array_items()) + for (auto & res: data["responses"].array_items()) { - std::string key = res["KV"]["Key"].string_value(); - if (key == (consul_prefix+"/osd/state/"+std::to_string(osd_num)+".")) + if (!res["response_range"].is_object()) { continue; } - std::string json_err, json_text = base64_decode(res["KV"]["Value"].string_value()); - json11::Json value = json11::Json::parse(json_text, json_err); - if (json_err != "") + for (auto & kvs: res["response_range"]["kvs"].array_items()) { - printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); - } - if (key == consul_prefix+"/config/osd/all") - { - for (auto & cfg_var: value.object_items()) + std::string key = base64_decode(kvs["key"].string_value()); + std::string json_err, json_text = base64_decode(kvs["value"].string_value()); + json11::Json value = json11::Json::parse(json_text, json_err); + if (json_err != "") { - if (this->config.find(cfg_var.first) == this->config.end()) + printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); + } + if (key == etcd_prefix+"/config/osd/all") + { + for (auto & cfg_var: value.object_items()) { - osd_config[cfg_var.first] = cfg_var.second.string_value(); + if (this->config.find(cfg_var.first) == this->config.end()) + { + osd_config[cfg_var.first] = cfg_var.second.string_value(); + } } } - } - else if (key == consul_prefix+"/config/pgs") - { - pg_config = value; - } - else if (key.substr(0, consul_prefix.length()+12) == consul_prefix+"/pg/history/") - { - // /pg/history/%d. - pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+12, key.length()-consul_prefix.length()-13)); - if (pg_num) + else if (key == etcd_prefix+"/config/pgs") { - pg_history[pg_num] = value; + pg_config = value; + } + else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") + { + // /pg/history/%d. + pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); + if (pg_num) + { + pg_history[pg_num] = value; + } } } } @@ -410,12 +405,11 @@ void osd_t::load_and_connect_peers() { if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)) { - // (Re)load OSD state from Consul + // (Re)load OSD state from etcd wp_it->second.last_load_attempt = time(NULL); load_peer_txn.push_back(json11::Json::object { - { "KV", json11::Json::object { - { "Verb", "get-tree" }, - { "Key", consul_prefix+"/osd/state/"+std::to_string(peer_osd)+"." }, + { "requestRange", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(peer_osd)) }, } } }); } @@ -469,32 +463,35 @@ void osd_t::load_and_connect_peers() } if (load_peer_txn.size() > 0) { - consul_txn(load_peer_txn, [this](std::string err, json11::Json data) + etcd_txn(json11::Json::object { { "success", load_peer_txn } }, [this](std::string err, json11::Json data) { // Ugly, but required to wake up the loop tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){}); loading_peer_config = false; if (err != "") { - printf("Failed to load peer configuration from Consul: %s\n", err.c_str()); + printf("Failed to load peer configuration from etcd: %s\n", err.c_str()); return; } - for (auto & res: data["Results"].array_items()) + for (auto & res: data["responses"].array_items()) { - std::string key = res["KV"]["Key"].string_value(); - // /osd/state/. - osd_num_t peer_osd = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); - std::string json_err; - std::string json_text = base64_decode(res["KV"]["Value"].string_value()); - json11::Json st = json11::Json::parse(json_text, json_err); - if (json_err != "") + if (res["response_range"]["kvs"].array_items().size()) { - printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); - } - if (peer_osd > 0 && st.is_object() && st["state"] == "up" && - st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536) - { - peer_states[peer_osd] = st; + std::string key = base64_decode(res["response_range"]["kvs"][0]["key"].string_value()); + // /osd/state/ + osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11)); + std::string json_err; + std::string json_text = base64_decode(res["response_range"]["kvs"][0]["value"].string_value()); + json11::Json st = json11::Json::parse(json_text, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); + } + if (peer_osd > 0 && st.is_object() && st["state"] == "up" && + st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536) + { + peer_states[peer_osd] = st; + } } } });