From 2a8e40835e92e15ad1ca0e7f82f1991d3334f610 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 17 Apr 2020 01:59:06 +0300 Subject: [PATCH] Fix reporting to Consul, report even if we are purely secondary --- osd.cpp | 2 ++ osd.h | 1 + osd_cluster.cpp | 23 +++++++++++++++++++---- osd_http.cpp | 14 +++++++------- osd_peering.cpp | 4 ---- timerfd_manager.cpp | 11 ++++++++++- 6 files changed, 39 insertions(+), 16 deletions(-) diff --git a/osd.cpp b/osd.cpp index 8c4e32d9..733d82e3 100644 --- a/osd.cpp +++ b/osd.cpp @@ -47,6 +47,8 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo if (run_primary) init_primary(); + init_cluster(); + consumer.loop = [this]() { loop(); }; ringloop->register_consumer(&consumer); } diff --git a/osd.h b/osd.h index ed1c340f..dad47fd9 100644 --- a/osd.h +++ b/osd.h @@ -267,6 +267,7 @@ class osd_t void print_stats(); void reset_stats(); json11::Json get_status(); + void init_cluster(); void report_status(); void load_pgs(); void parse_pgs(json11::Json data); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index e3e08b08..192f6783 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -2,6 +2,19 @@ #include "osd_http.h" #include "base64.h" +void osd_t::init_cluster() +{ + if (consul_address != "") + { + printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval); + report_status(); + this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() + { + report_status(); + }); + } +} + json11::Json osd_t::get_status() { json11::Json::object st; @@ -18,6 +31,8 @@ json11::Json osd_t::get_status() st["addresses"] = bind_addresses; } st["port"] = bind_port; + st["primary_enabled"] = run_primary; + st["blockstore_ready"] = bs->is_started(); st["blockstore_enabled"] = bs ? true : false; if (bs) { @@ -69,9 +84,9 @@ json11::Json osd_t::get_status() void osd_t::report_status() { std::string st = get_status().dump(); - // (!) Keys end with / to allow "select /osd/state/123/ by prefix" + // (!) Keys end with . to allow "select /osd/state/123. by prefix" // because Consul transactions fail if you try to read non-existing keys - std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/ HTTP/1.1\r\n"+ + std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+". HTTP/1.1\r\n"+ "Host: "+consul_host+"\r\n"+ "Content-Length: "+std::to_string(st.size())+"\r\n"+ "Connection: close\r\n"+ @@ -211,7 +226,7 @@ void osd_t::load_and_connect_peers() consul_txn.push_back(json11::Json::object { { "KV", json11::Json::object { { "Verb", "get-tree" }, - { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/" }, + { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." }, } } }); } @@ -283,7 +298,7 @@ void osd_t::load_and_connect_peers() for (auto & res: data["Results"].array_items()) { std::string key = res["KV"]["Key"].string_value(); - // /osd/state// + // /osd/state/. osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); std::string json_err; json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); diff --git a/osd_http.cpp b/osd_http.cpp index 9b87fdad..dcdf3d69 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -308,12 +308,7 @@ void http_co_t::resume() // Read response if (st == 5) { - if (epoll_events & (EPOLLRDHUP|EPOLLERR)) - { - delete this; - return; - } - else if (epoll_events & EPOLLIN) + if (epoll_events & EPOLLIN) { if (rbuf.size() != 9000) rbuf.resize(9000); @@ -332,7 +327,12 @@ void http_co_t::resume() }; my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); st = 6; - epoll_events = 0; + epoll_events = epoll_events & ~EPOLLIN; + } + else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) + { + delete this; + return; } } if (st == 6) diff --git a/osd_peering.cpp b/osd_peering.cpp index 6cd3507f..28d69bc7 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -38,10 +38,6 @@ void osd_t::init_primary() { peering_state = OSD_LOADING_PGS; load_pgs(); - this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() - { - report_status(); - }); } if (autosync_interval > 0) { diff --git a/timerfd_manager.cpp b/timerfd_manager.cpp index 3d68fb93..3c8f1cb9 100644 --- a/timerfd_manager.cpp +++ b/timerfd_manager.cpp @@ -98,10 +98,19 @@ void timerfd_manager_t::set_nearest() nearest = i; } } + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); itimerspec exp = { .it_interval = { 0 }, .it_value = timers[nearest].next, }; + exp.it_value.tv_sec -= now.tv_sec; + exp.it_value.tv_nsec -= now.tv_nsec; + if (exp.it_value.tv_nsec < 0) + { + exp.it_value.tv_sec--; + exp.it_value.tv_nsec += 1000000000; + } if (timerfd_settime(timerfd, 0, &exp, NULL)) { throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno)); @@ -130,7 +139,7 @@ void timerfd_manager_t::set_wait() } ring_data_t *data = ((ring_data_t*)sqe->user_data); my_uring_prep_poll_add(sqe, timerfd, POLLIN); - data->callback = [&](ring_data_t *data) + data->callback = [this](ring_data_t *data) { if (data->res < 0) {