From ba63af49b45aa62b1d93faee4cdb45d441de2d8a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 23 Jan 2022 16:25:47 +0300 Subject: [PATCH] Add etcd retries everywhere (they were missing in some places) --- src/cli.cpp | 20 +++++++++-- src/cli.h | 3 ++ src/cli_alloc_osd.cpp | 25 +++----------- src/cli_create.cpp | 47 +++++++++----------------- src/cli_df.cpp | 14 ++------ src/cli_ls.cpp | 14 ++------ src/cli_modify.cpp | 22 ++++-------- src/cli_snap_rm.cpp | 12 +++---- src/etcd_state_client.cpp | 46 +++++++++++++++++++++---- src/etcd_state_client.h | 5 +-- src/http_client.cpp | 71 +++++++++++++++------------------------ src/osd_cluster.cpp | 14 ++++---- tests/run_3osds.sh | 2 +- 13 files changed, 134 insertions(+), 161 deletions(-) diff --git a/src/cli.cpp b/src/cli.cpp index 2486ca91d..3c0d3e19f 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -177,7 +177,7 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent) new_cfg.parent_id = new_parent; json11::Json::object cur_cfg_json = cli->st_cli.serialize_inode_cfg(&new_cfg); waiting++; - cli->st_cli.etcd_txn(json11::Json::object { + cli->st_cli.etcd_txn_slow(json11::Json::object { { "compare", json11::Json::array { json11::Json::object { { "target", "MOD" }, @@ -194,7 +194,7 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent) } } }, } }, - }, cli->st_cli.etcd_slow_timeout, [this, new_parent, cur, cur_name](std::string err, json11::Json res) + }, [this, new_parent, cur, cur_name](std::string err, json11::Json res) { if (err != "") { @@ -229,6 +229,22 @@ void cli_tool_t::change_parent(inode_t cur, inode_t new_parent) }); } +void cli_tool_t::etcd_txn(json11::Json txn) +{ + waiting++; + cli->st_cli.etcd_txn_slow(txn, [this](std::string err, json11::Json res) + { + waiting--; + if (err != "") + { + fprintf(stderr, "Error reading from etcd: %s\n", err.c_str()); + exit(1); + } + etcd_result = res; + ringloop->wakeup(); + }); +} + inode_config_t* cli_tool_t::get_inode_cfg(const std::string & name) { for (auto & ic: cli->st_cli.inode_config) diff --git a/src/cli.h b/src/cli.h index 90007a331..ae272caae 100644 --- a/src/cli.h +++ b/src/cli.h @@ -34,6 +34,7 @@ public: cluster_client_t *cli = NULL; int waiting = 0; + json11::Json etcd_result; ring_consumer_t consumer; std::function action_cb; @@ -60,6 +61,8 @@ public: std::function start_snap_rm(json11::Json); std::function start_alloc_osd(json11::Json cfg, uint64_t *out = NULL); std::function simple_offsets(json11::Json cfg); + + void etcd_txn(json11::Json txn); }; uint64_t parse_size(std::string size_str); diff --git a/src/cli_alloc_osd.cpp b/src/cli_alloc_osd.cpp index 0a5e9a418..676de69b6 100644 --- a/src/cli_alloc_osd.cpp +++ b/src/cli_alloc_osd.cpp @@ -13,7 +13,6 @@ struct alloc_osd_t { cli_tool_t *parent; - json11::Json result; uint64_t new_id = 1; int state = 0; @@ -29,7 +28,7 @@ struct alloc_osd_t goto resume_1; do { - etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "compare", json11::Json::array { json11::Json::object { { "target", "VERSION" }, @@ -63,10 +62,10 @@ struct alloc_osd_t state = 1; if (parent->waiting > 0) return; - if (!result["succeeded"].bool_value()) + if (!parent->etcd_result["succeeded"].bool_value()) { std::vector used; - for (auto kv: result["responses"][0]["response_range"]["kvs"].array_items()) + for (auto kv: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items()) { std::string key = base64_decode(kv["key"].string_value()); osd_num_t cur_osd; @@ -98,25 +97,9 @@ struct alloc_osd_t new_id = used[e-1]+1; } } - } while (!result["succeeded"].bool_value()); + } while (!parent->etcd_result["succeeded"].bool_value()); state = 100; } - - void etcd_txn(json11::Json txn) - { - parent->waiting++; - parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) - { - parent->waiting--; - if (err != "") - { - fprintf(stderr, "Error reading from etcd: %s\n", err.c_str()); - exit(1); - } - this->result = res; - parent->ringloop->wakeup(); - }); - } }; std::function cli_tool_t::start_alloc_osd(json11::Json cfg, uint64_t *out) diff --git a/src/cli_create.cpp b/src/cli_create.cpp index 9c652e355..2ac24f2c0 100644 --- a/src/cli_create.cpp +++ b/src/cli_create.cpp @@ -31,7 +31,6 @@ struct image_creator_t inode_t new_parent_id = 0; inode_t new_id = 0, old_id = 0; uint64_t max_id_mod_rev = 0, cfg_mod_rev = 0, idx_mod_rev = 0; - json11::Json result; int state = 0; @@ -125,26 +124,26 @@ struct image_creator_t } do { - etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "success", json11::Json::array { get_next_id() } } }); state = 2; resume_2: if (parent->waiting > 0) return; - extract_next_id(result["responses"][0]); + extract_next_id(parent->etcd_result["responses"][0]); attempt_create(); state = 3; resume_3: if (parent->waiting > 0) return; - if (!result["succeeded"].bool_value() && - result["responses"][0]["response_range"]["kvs"].array_items().size() > 0) + if (!parent->etcd_result["succeeded"].bool_value() && + parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items().size() > 0) { fprintf(stderr, "Image %s already exists\n", image_name.c_str()); exit(1); } - } while (!result["succeeded"].bool_value()); + } while (!parent->etcd_result["succeeded"].bool_value()); if (parent->progress) { printf("Image %s created\n", image_name.c_str()); @@ -196,13 +195,13 @@ resume_3: resume_4: if (parent->waiting > 0) return; - if (!result["succeeded"].bool_value() && - result["responses"][0]["response_range"]["kvs"].array_items().size() > 0) + if (!parent->etcd_result["succeeded"].bool_value() && + parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items().size() > 0) { fprintf(stderr, "Snapshot %s@%s already exists\n", image_name.c_str(), new_snap.c_str()); exit(1); } - } while (!result["succeeded"].bool_value()); + } while (!parent->etcd_result["succeeded"].bool_value()); if (parent->progress) { printf("Snapshot %s@%s created\n", image_name.c_str(), new_snap.c_str()); @@ -246,7 +245,7 @@ resume_4: goto resume_2; else if (state == 3) goto resume_3; - etcd_txn(json11::Json::object { { "success", json11::Json::array { + parent->etcd_txn(json11::Json::object { { "success", json11::Json::array { get_next_id(), json11::Json::object { { "request_range", json11::Json::object { @@ -260,11 +259,11 @@ resume_4: resume_2: if (parent->waiting > 0) return; - extract_next_id(result["responses"][0]); + extract_next_id(parent->etcd_result["responses"][0]); old_id = 0; old_pool_id = 0; cfg_mod_rev = idx_mod_rev = 0; - if (result["responses"][1]["response_range"]["kvs"].array_items().size() == 0) + if (parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items().size() == 0) { for (auto & ic: parent->cli->st_cli.inode_config) { @@ -283,7 +282,7 @@ resume_2: { // FIXME: Parse kvs in etcd_state_client automatically { - auto kv = parent->cli->st_cli.parse_etcd_kv(result["responses"][1]["response_range"]["kvs"][0]); + auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][1]["response_range"]["kvs"][0]); old_id = INODE_NO_POOL(kv.value["id"].uint64_value()); old_pool_id = (pool_id_t)kv.value["pool_id"].uint64_value(); idx_mod_rev = kv.mod_revision; @@ -293,7 +292,7 @@ resume_2: exit(1); } } - etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "success", json11::Json::array { json11::Json::object { { "request_range", json11::Json::object { @@ -310,7 +309,7 @@ resume_3: if (parent->waiting > 0) return; { - auto kv = parent->cli->st_cli.parse_etcd_kv(result["responses"][0]["response_range"]["kvs"][0]); + auto kv = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][0]["response_range"]["kvs"][0]); size = kv.value["size"].uint64_value(); new_parent_id = kv.value["parent_id"].uint64_value(); uint64_t parent_pool_id = kv.value["parent_pool_id"].uint64_value(); @@ -439,28 +438,12 @@ resume_3: } }, }); }; - etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "compare", checks }, { "success", success }, { "failure", failure }, }); } - - void etcd_txn(json11::Json txn) - { - parent->waiting++; - parent->cli->st_cli.etcd_txn(txn, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) - { - parent->waiting--; - if (err != "") - { - fprintf(stderr, "Error reading from etcd: %s\n", err.c_str()); - exit(1); - } - this->result = res; - parent->ringloop->wakeup(); - }); - } }; uint64_t parse_size(std::string size_str) diff --git a/src/cli_df.cpp b/src/cli_df.cpp index 7744258eb..b486c240c 100644 --- a/src/cli_df.cpp +++ b/src/cli_df.cpp @@ -24,8 +24,7 @@ struct pool_lister_t if (state == 1) goto resume_1; // Space statistics - pool/stats/ - parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "success", json11::Json::array { json11::Json::object { { "request_range", json11::Json::object { @@ -48,21 +47,12 @@ struct pool_lister_t } }, }, } }, - }, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) - { - parent->waiting--; - if (err != "") - { - fprintf(stderr, "Error reading from etcd: %s\n", err.c_str()); - exit(1); - } - space_info = res; - parent->ringloop->wakeup(); }); state = 1; resume_1: if (parent->waiting > 0) return; + space_info = parent->etcd_result; std::map osd_free; for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items()) { diff --git a/src/cli_ls.cpp b/src/cli_ls.cpp index 2fd250926..e9a7450bc 100644 --- a/src/cli_ls.cpp +++ b/src/cli_ls.cpp @@ -84,8 +84,7 @@ struct image_lister_t // Space statistics // inode/stats//::raw_used divided by pool/stats/::pg_real_size // multiplied by 1 or number of data drives - parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "success", json11::Json::array { json11::Json::object { { "request_range", json11::Json::object { @@ -112,21 +111,12 @@ struct image_lister_t } }, }, } }, - }, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) - { - parent->waiting--; - if (err != "") - { - fprintf(stderr, "Error reading from etcd: %s\n", err.c_str()); - exit(1); - } - space_info = res; - parent->ringloop->wakeup(); }); state = 1; resume_1: if (parent->waiting > 0) return; + space_info = parent->etcd_result; std::map pool_pg_real_size; for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items()) { diff --git a/src/cli_modify.cpp b/src/cli_modify.cpp index 8ffe493cc..aeb0e839a 100644 --- a/src/cli_modify.cpp +++ b/src/cli_modify.cpp @@ -170,29 +170,19 @@ resume_1: } } }); } - parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->etcd_txn(json11::Json::object { { "compare", checks }, { "success", success }, - }, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) - { - if (err != "") - { - fprintf(stderr, "Error changing %s: %s\n", image_name.c_str(), err.c_str()); - exit(1); - } - if (!res["succeeded"].bool_value()) - { - fprintf(stderr, "Image %s was modified by someone else, please repeat your request\n", image_name.c_str()); - exit(1); - } - parent->waiting--; - parent->ringloop->wakeup(); }); state = 2; resume_2: if (parent->waiting > 0) return; + if (!parent->etcd_result["succeeded"].bool_value()) + { + fprintf(stderr, "Image %s was modified by someone else, please repeat your request\n", image_name.c_str()); + exit(1); + } printf("Image %s modified\n", image_name.c_str()); state = 100; } diff --git a/src/cli_snap_rm.cpp b/src/cli_snap_rm.cpp index 61a6b2422..a7d1ba4fa 100644 --- a/src/cli_snap_rm.cpp +++ b/src/cli_snap_rm.cpp @@ -256,9 +256,9 @@ resume_9: }); } parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->cli->st_cli.etcd_txn_slow(json11::Json::object { { "success", reads }, - }, parent->cli->st_cli.etcd_slow_timeout, [this](std::string err, json11::Json data) + }, [this](std::string err, json11::Json data) { parent->waiting--; if (err != "") @@ -414,10 +414,10 @@ resume_9: } } parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->cli->st_cli.etcd_txn_slow(json11::Json::object { { "compare", cmp }, { "success", txn }, - }, parent->cli->st_cli.etcd_slow_timeout, [this, target_name, child_name](std::string err, json11::Json res) + }, [this, target_name, child_name](std::string err, json11::Json res) { parent->waiting--; if (err != "") @@ -454,7 +454,7 @@ resume_9: "/"+std::to_string(INODE_NO_POOL(cur)) ); parent->waiting++; - parent->cli->st_cli.etcd_txn(json11::Json::object { + parent->cli->st_cli.etcd_txn_slow(json11::Json::object { { "compare", json11::Json::array { json11::Json::object { { "target", "MOD" }, @@ -475,7 +475,7 @@ resume_9: } }, }, } }, - }, parent->cli->st_cli.etcd_slow_timeout, [this, cur_name](std::string err, json11::Json res) + }, [this, cur_name](std::string err, json11::Json res) { parent->waiting--; if (err != "") diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 582a70b74..7681e30e2 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -54,12 +54,18 @@ etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) return kv; } -void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function callback) +void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function callback) { - etcd_call("/kv/txn", txn, timeout, callback); + etcd_call("/kv/txn", txn, timeout, retries, interval, callback); } -void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function callback) +void etcd_state_client_t::etcd_txn_slow(json11::Json txn, std::function callback) +{ + etcd_call("/kv/txn", txn, etcd_slow_timeout, max_etcd_attempts, 0, callback); +} + +void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, + int retries, int interval, std::function callback) { if (!etcd_addresses.size() && !etcd_local.size()) { @@ -83,7 +89,8 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t "Connection: keep-alive\r\n" "Keep-Alive: timeout="+std::to_string(etcd_keepalive_timeout)+"\r\n" "\r\n"+req; - auto cb = [this, cur_addr = selected_etcd_address, callback](const http_response_t *response) + auto cb = [this, api, payload, timeout, retries, interval, callback, + cur_addr = selected_etcd_address](const http_response_t *response) { std::string err; json11::Json data; @@ -92,8 +99,30 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t { if (cur_addr == selected_etcd_address) selected_etcd_address = ""; + if (retries > 0) + { + if (this->log_level > 0) + { + printf( + "Warning: etcd request failed: %s, retrying %d more times\n", + err.c_str(), retries + ); + } + if (interval > 0) + { + tfd->set_timer(interval, false, [this, api, payload, timeout, retries, interval, callback](int) + { + etcd_call(api, payload, timeout, retries-1, interval, callback); + }); + } + else + etcd_call(api, payload, timeout, retries-1, interval, callback); + } + else + callback(err, data); } - callback(err, data); + else + callback(err, data); }; if (!keepalive_client) { @@ -255,6 +284,8 @@ void etcd_state_client_t::start_etcd_watcher() http_close(etcd_watch_ws); etcd_watch_ws = NULL; } + if (this->log_level > 1) + printf("Trying to connect to etcd websocket at %s\n", etcd_address.c_str()); etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout, [this, cur_addr = selected_etcd_address](const http_response_t *msg) { @@ -438,7 +469,7 @@ void etcd_state_client_t::load_global_config() { etcd_call("/kv/range", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/global") } - }, etcd_slow_timeout, [this](std::string err, json11::Json data) + }, etcd_slow_timeout, max_etcd_attempts, 0, [this](std::string err, json11::Json data) { if (err != "") { @@ -511,10 +542,11 @@ void etcd_state_client_t::load_pgs() { req["compare"] = checks; } - etcd_txn(req, etcd_slow_timeout, [this](std::string err, json11::Json data) + etcd_txn_slow(req, [this](std::string err, json11::Json data) { if (err != "") { + // Retry indefinitely fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str()); tfd->set_timer(etcd_slow_timeout, false, [this](int timer_id) { diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index c86a70422..2cb53d244 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -112,8 +112,9 @@ public: json11::Json::object serialize_inode_cfg(inode_config_t *cfg); etcd_kv_t parse_etcd_kv(const json11::Json & kv_json); - void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); - void etcd_txn(json11::Json txn, int timeout, std::function callback); + void etcd_call(std::string api, json11::Json payload, int timeout, int retries, int interval, std::function callback); + void etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function callback); + void etcd_txn_slow(json11::Json txn, std::function callback); void start_etcd_watcher(); void load_global_config(); void load_pgs(); diff --git a/src/http_client.cpp b/src/http_client.cpp index c9af942f1..eca2c255f 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -59,6 +59,7 @@ struct http_co_t inline void stackin() { onstack++; } inline void stackout() { onstack--; if (!onstack && ended) end(); } inline void end() { ended = true; if (!onstack) { delete this; } } + void run_cb_and_clear(); void start_connection(); void close_connection(); void handle_events(); @@ -119,6 +120,16 @@ void http_request(http_co_t *handler, const std::string & host, const std::strin handler->send_request(host, request, options, response_callback); } +void http_co_t::run_cb_and_clear() +{ + parsed.eof = true; + std::function cb; + cb.swap(response_callback); + // Call callback after clearing it because otherwise we may hit reenterability problems + if (cb != NULL) + cb(&parsed); +} + void http_co_t::send_request(const std::string & host, const std::string & request, const http_options_t & options, std::function response_callback) { @@ -137,6 +148,10 @@ void http_co_t::send_request(const std::string & host, const std::string & reque stackout(); return; } + if (state == HTTP_CO_KEEPALIVE && connected_host != host) + { + close_connection(); + } this->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout); this->want_streaming = options.want_streaming; this->keepalive = options.keepalive; @@ -146,22 +161,14 @@ void http_co_t::send_request(const std::string & host, const std::string & reque this->sent = 0; this->response_callback = response_callback; this->parsed = {}; - if (state == HTTP_CO_KEEPALIVE && connected_host != host) - { - close_connection(); - } if (request_timeout > 0) { timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id) { stackin(); close_connection(); - if (this->response_callback != NULL) - { - parsed = { .error = "HTTP request timed out" }; - this->response_callback(&parsed); - this->response_callback = NULL; - } + parsed = { .error = "HTTP request timed out" }; + run_cb_and_clear(); stackout(); }); } @@ -258,6 +265,7 @@ void http_co_t::close_connection() state = HTTP_CO_CLOSED; connected_host = ""; response = ""; + epoll_events = 0; } void http_co_t::start_connection() @@ -267,8 +275,7 @@ void http_co_t::start_connection() if (!string_to_addr(host.c_str(), 1, 80, &addr)) { parsed = { .error = "Invalid address: "+host }; - response_callback(&parsed); - response_callback = NULL; + run_cb_and_clear(); stackout(); return; } @@ -276,8 +283,7 @@ void http_co_t::start_connection() if (peer_fd < 0) { parsed = { .error = std::string("socket: ")+strerror(errno) }; - response_callback(&parsed); - response_callback = NULL; + run_cb_and_clear(); stackout(); return; } @@ -289,8 +295,7 @@ void http_co_t::start_connection() { close_connection(); parsed = { .error = std::string("connect: ")+strerror(errno) }; - response_callback(&parsed); - response_callback = NULL; + run_cb_and_clear(); stackout(); return; } @@ -323,12 +328,7 @@ void http_co_t::handle_events() else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) { close_connection(); - if (response_callback != NULL) - { - parsed.eof = true; - response_callback(&parsed); - response_callback = NULL; - } + run_cb_and_clear(); break; } } @@ -349,8 +349,7 @@ void http_co_t::handle_connect_result() { close_connection(); parsed = { .error = std::string("connect: ")+strerror(result) }; - response_callback(&parsed); - response_callback = NULL; + run_cb_and_clear(); stackout(); return; } @@ -388,12 +387,8 @@ again: else if (res < 0) { close_connection(); - if (response_callback != NULL) - { - parsed = { .error = std::string("sendmsg: ")+strerror(errno) }; - response_callback(&parsed); - response_callback = NULL; - } + parsed = { .error = std::string("sendmsg: ")+strerror(errno) }; + run_cb_and_clear(); stackout(); return; } @@ -442,13 +437,7 @@ void http_co_t::submit_read() close_connection(); if (res < 0) parsed = { .error = std::string("recvmsg: ")+strerror(-res) }; - else - parsed.eof = true; - if (response_callback != NULL) - { - response_callback(&parsed); - response_callback = NULL; - } + run_cb_and_clear(); } else { @@ -497,12 +486,8 @@ bool http_co_t::handle_read() { // Sorry, unsupported response close_connection(); - if (response_callback != NULL) - { - parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" }; - response_callback(&parsed); - response_callback = NULL; - } + parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" }; + run_cb_and_clear(); stackout(); return false; } diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 58002a00b..05098d4ce 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -277,7 +277,7 @@ void osd_t::report_statistics() } } }); } - st_cli.etcd_txn(json11::Json::object { { "success", txn } }, st_cli.etcd_slow_timeout, [this](std::string err, json11::Json res) + st_cli.etcd_txn_slow(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json res) { etcd_reporting_stats = false; if (err != "") @@ -356,11 +356,11 @@ void osd_t::acquire_lease() // Maximum lease TTL is (report interval) + retries * (timeout + repeat interval) st_cli.etcd_call("/lease/grant", json11::Json::object { { "TTL", etcd_report_interval+(st_cli.max_etcd_attempts*(2*st_cli.etcd_quick_timeout)+999)/1000 } - }, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data) + }, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data) { if (err != "" || data["ID"].string_value() == "") { - printf("Error acquiring a lease from etcd: %s\n", err.c_str()); + printf("Error acquiring a lease from etcd: %s, retrying\n", err.c_str()); tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id) { acquire_lease(); @@ -408,7 +408,7 @@ void osd_t::create_osd_state() } } }, } }, - }, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data) + }, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data) { if (err != "") { @@ -452,7 +452,7 @@ void osd_t::renew_lease() { st_cli.etcd_call("/lease/keepalive", json11::Json::object { { "ID", etcd_lease_id } - }, st_cli.etcd_quick_timeout, [this](std::string err, json11::Json data) + }, st_cli.etcd_quick_timeout, 0, 0, [this](std::string err, json11::Json data) { if (err == "" && data["result"]["TTL"].string_value() == "") { @@ -488,7 +488,7 @@ void osd_t::force_stop(int exitcode) { st_cli.etcd_call("/kv/lease/revoke", json11::Json::object { { "ID", etcd_lease_id } - }, st_cli.etcd_quick_timeout, [this, exitcode](std::string err, json11::Json data) + }, st_cli.etcd_quick_timeout, st_cli.max_etcd_attempts, 0, [this, exitcode](std::string err, json11::Json data) { if (err != "") { @@ -826,7 +826,7 @@ void osd_t::report_pg_states() etcd_reporting_pg_state = true; st_cli.etcd_txn(json11::Json::object { { "compare", checks }, { "success", success }, { "failure", failure } - }, st_cli.etcd_quick_timeout, [this, reporting_pgs](std::string err, json11::Json data) + }, st_cli.etcd_quick_timeout, 0, 0, [this, reporting_pgs](std::string err, json11::Json data) { etcd_reporting_pg_state = false; if (!data["succeeded"].bool_value()) diff --git a/tests/run_3osds.sh b/tests/run_3osds.sh index 4766a9e96..a507a98d4 100644 --- a/tests/run_3osds.sh +++ b/tests/run_3osds.sh @@ -31,7 +31,7 @@ else $ETCDCTL put /vitastor/config/pools '{"1":{"name":"testpool","scheme":"xor","pg_size":'$PG_SIZE',"pg_minsize":'$PG_MINSIZE',"parity_chunks":1,"pg_count":'$PG_COUNT',"failure_domain":"osd"}}' fi -sleep 2 +sleep 3 if ! ($ETCDCTL get /vitastor/config/pgs --print-value-only | jq -s -e '(. | length) != 0 and ([ .[0].items["1"][] | select(((.osd_set | select(. != 0) | sort | unique) | length) == '$PG_SIZE') ] | length) == '$PG_COUNT); then format_error "FAILED: $PG_COUNT PG(s) NOT CONFIGURED"