diff --git a/osd.cpp b/osd.cpp index d6577ce74..8c4e32d9d 100644 --- a/osd.cpp +++ b/osd.cpp @@ -103,6 +103,7 @@ osd_op_t::~osd_op_t() void osd_t::parse_config(blockstore_config_t & config) { consul_address = config["consul_address"]; + consul_host = consul_address.find(':') >= 0 ? consul_address.substr(0, consul_address.find(':')) : consul_address; consul_prefix = config["consul_prefix"]; if (consul_prefix == "") consul_prefix = "microceph"; @@ -112,6 +113,7 @@ void osd_t::parse_config(blockstore_config_t & config) bind_address = config["bind_address"]; if (bind_address == "") bind_address = "0.0.0.0"; + // FIXME: select port automatically from range bind_port = strtoull(config["bind_port"].c_str(), NULL, 10); if (!bind_port || bind_port > 65535) bind_port = 11203; @@ -134,6 +136,9 @@ void osd_t::parse_config(blockstore_config_t & config) print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10); if (!print_stats_interval) print_stats_interval = 3; + peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); + if (!peer_connect_interval) + peer_connect_interval = 5; } void osd_t::bind_socket() diff --git a/osd.h b/osd.h index f33f068a5..ed1c340fb 100644 --- a/osd.h +++ b/osd.h @@ -35,10 +35,11 @@ #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 -#define OSD_CONNECTING_PEERS 1 -#define OSD_PEERING_PGS 2 -#define OSD_FLUSHING_PGS 4 -#define OSD_RECOVERING 8 +#define OSD_LOADING_PGS 0x01 +#define OSD_CONNECTING_PEERS 0x02 +#define OSD_PEERING_PGS 0x04 +#define OSD_FLUSHING_PGS 0x08 +#define OSD_RECOVERING 0x10 #define IMMEDIATE_NONE 0 #define IMMEDIATE_SMALL 1 @@ -50,6 +51,7 @@ #define DEFAULT_RECOVERY_QUEUE 4 #define MAX_CONSUL_ATTEMPTS 5 +#define CONSUL_START_INTERVAL 5000 #define CONSUL_RETRY_INTERVAL 1000 //#define OSD_STUB @@ -127,14 +129,6 @@ struct osd_op_t ~osd_op_t(); }; -struct osd_peer_def_t -{ - osd_num_t osd_num = 0; - std::string addr; - int port = 0; - time_t last_connect_attempt = 0; -}; - struct osd_client_t { sockaddr_in peer_addr; @@ -188,6 +182,13 @@ struct osd_recovery_op_t osd_op_t *osd_op = NULL; }; +struct osd_wanted_peer_t +{ + bool connecting; + time_t last_connect_attempt, last_load_attempt; + int address_index; +}; + class osd_t { friend struct http_co_t; @@ -198,7 +199,6 @@ class osd_t std::string consul_address, consul_host, consul_prefix = "microceph"; osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; - std::vector peers; blockstore_config_t config; std::string bind_address; int bind_port, listen_backlog; @@ -210,9 +210,13 @@ class osd_t int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; + int peer_connect_interval = 5; // peer OSDs + std::map peer_states; + std::map wanted_peers; + bool loading_peer_config = false; std::vector bind_addresses; int consul_failed_attempts = 0; @@ -264,6 +268,9 @@ class osd_t void reset_stats(); json11::Json get_status(); void report_status(); + void load_pgs(); + void parse_pgs(json11::Json data); + void load_and_connect_peers(); // event loop, socket read/write void loop(); @@ -278,6 +285,7 @@ class osd_t void handle_send(ring_data_t *data, int peer_fd); void outbox_push(osd_client_t & cl, osd_op_t *op); void http_request(std::string host, std::string request, std::function callback); + void http_request_json(std::string host, std::string request, std::function callback); // peer handling (primary OSD logic) void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); @@ -285,7 +293,7 @@ class osd_t void cancel_osd_ops(osd_client_t & cl); void cancel_op(osd_op_t *op); void stop_client(int peer_fd); - osd_peer_def_t parse_peer(std::string peer); + void parse_test_peer(std::string peer); void init_primary(); void handle_peers(); void repeer_pgs(osd_num_t osd_num, bool is_connected); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index d38cfc6ec..e3e08b08b 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -1,9 +1,13 @@ #include "osd.h" #include "osd_http.h" +#include "base64.h" json11::Json osd_t::get_status() { json11::Json::object st; + timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000); st["state"] = "up"; if (bind_address != "0.0.0.0") st["addresses"] = { bind_address }; @@ -20,25 +24,6 @@ json11::Json osd_t::get_status() st["size"] = bs->get_block_count() * bs->get_block_size(); st["free"] = bs->get_free_block_count() * bs->get_block_size(); } - json11::Json::object pg_status; - for (auto & p: pgs) - { - auto & pg = p.second; - json11::Json::object pg_st; - json11::Json::array pg_state; - for (int i = 0; i < pg_state_bit_count; i++) - if (pg.state & pg_state_bits[i]) - pg_state.push_back(pg_state_names[i]); - pg_st["state"] = pg_state; - pg_st["object_count"] = pg.total_count; - pg_st["clean_count"] = pg.clean_count; - pg_st["misplaced_count"] = pg.misplaced_objects.size(); - pg_st["degraded_count"] = pg.degraded_objects.size(); - pg_st["incomplete_count"] = pg.incomplete_objects.size(); - pg_st["write_osd_set"] = pg.cur_set; - pg_status[std::to_string(pg.pg_num)] = pg_st; - } - st["pgs"] = pg_status; json11::Json::object op_stats, subop_stats; for (int i = 0; i <= OSD_OP_MAX; i++) { @@ -59,18 +44,37 @@ json11::Json osd_t::get_status() return st; } +/* + json11::Json::object pg_status; + for (auto & p: pgs) + { + auto & pg = p.second; + json11::Json::object pg_st; + json11::Json::array pg_state; + for (int i = 0; i < pg_state_bit_count; i++) + if (pg.state & pg_state_bits[i]) + pg_state.push_back(pg_state_names[i]); + pg_st["state"] = pg_state; + pg_st["object_count"] = pg.total_count; + pg_st["clean_count"] = pg.clean_count; + pg_st["misplaced_count"] = pg.misplaced_objects.size(); + pg_st["degraded_count"] = pg.degraded_objects.size(); + pg_st["incomplete_count"] = pg.incomplete_objects.size(); + pg_st["write_osd_set"] = pg.cur_set; + pg_status[std::to_string(pg.pg_num)] = pg_st; + } + st["pgs"] = pg_status; +*/ + void osd_t::report_status() { - if (consul_host == "") - { - consul_host = consul_address; - extract_port(consul_host); - } std::string st = get_status().dump(); - std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/"+std::to_string(osd_num)+" HTTP/1.1\r\n"+ + // (!) Keys end with / to allow "select /osd/state/123/ by prefix" + // because Consul transactions fail if you try to read non-existing keys + std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/ HTTP/1.1\r\n"+ "Host: "+consul_host+"\r\n"+ "Content-Length: "+std::to_string(st.size())+"\r\n"+ - "Connection: close\r\n" + "Connection: close\r\n"+ "\r\n"+st; http_request(consul_address, req, [this](int err, std::string res) { @@ -99,3 +103,196 @@ void osd_t::report_status() } }); } + +// Start -> Load PGs -> Load peers -> Connect to peers -> Peer PGs +// Wait for PG changes -> Start/Stop PGs when requested +// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat +void osd_t::load_pgs() +{ + assert(this->pgs.size() == 0); + std::string req = "GET /v1/kv/"+consul_prefix+"/config/pgs?raw"+ + /*(consul_change_index > 0 ? "&index="+std::to_string(consul_change_index) : "")+*/ + " HTTP/1.1\r\n"+ + "Host: "+consul_host+"\r\n"+ + "Connection: close\r\n"+ + "\r\n"; + http_request_json(consul_address, req, [this](std::string err, json11::Json data) + { + if (err != "") + { + printf("Error loading PGs from Consul: %s\n", err.c_str()); + tfd->set_timer(CONSUL_START_INTERVAL, false, [this](int timer_id) + { + load_pgs(); + }); + return; + } + parse_pgs(data); + peering_state = OSD_CONNECTING_PEERS; + }); +} + +void osd_t::parse_pgs(json11::Json data) +{ + uint64_t pg_count = 0; + for (auto pg_item: data.object_items()) + { + char *pg_num_end = NULL; + pg_num_t pg_num = strtoull(pg_item.first.c_str(), &pg_num_end, 10); + if (!pg_num || *pg_num_end != 0) + { + throw std::runtime_error("Bad key in PG hash: "+pg_item.first); + } + auto & pg_json = pg_item.second; + osd_num_t primary_osd = 0; + std::vector target_set; + for (auto pg_osd_num: pg_json["osd_set"].array_items()) + { + osd_num_t pg_osd = pg_osd_num.uint64_value(); + target_set.push_back(pg_osd); + if (pg_osd != 0 && primary_osd == 0) + { + primary_osd = pg_osd; + } + } + if (target_set.size() != 3) + { + throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set"); + } + if (primary_osd == this->osd_num) + { + // Take this PG + this->pgs[pg_num] = (pg_t){ + .state = PG_PEERING, + .pg_cursize = 0, + .pg_num = pg_num, + .target_set = target_set, + .cur_set = target_set, + }; + this->pgs[pg_num].print_state(); + // Add peers + for (auto pg_osd: target_set) + { + // FIXME: Add OSDs from PG history to peers + if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end()) + { + wanted_peers[pg_osd] = { 0 }; + } + } + } + pg_count++; + } + this->pg_count = pg_count; +} + +void osd_t::load_and_connect_peers() +{ + json11::Json::array consul_txn; + for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();) + { + osd_num_t osd_num = wp_it->first; + if (osd_peer_fds.find(osd_num) != osd_peer_fds.end()) + { + // It shouldn't be here + wanted_peers.erase(wp_it++); + if (!wanted_peers.size()) + { + // Connected to all peers + peering_state = peering_state & ~OSD_CONNECTING_PEERS; + } + } + else if (peer_states.find(osd_num) == peer_states.end() && + time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval) + { + if (!loading_peer_config) + { + // (Re)load OSD state from Consul + wp_it->second.last_load_attempt = time(NULL); + consul_txn.push_back(json11::Json::object { + { "KV", json11::Json::object { + { "Verb", "get-tree" }, + { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/" }, + } } + }); + } + wp_it++; + } + else if (!wp_it->second.connecting && + time(NULL) - wp_it->second.last_connect_attempt >= peer_connect_interval) + { + // Try to connect + wp_it->second.connecting = true; + const std::string & addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value(); + int64_t port = peer_states[osd_num]["port"].int64_value(); + wp_it++; + connect_peer(osd_num, addr.c_str(), port, [this](osd_num_t osd_num, int peer_fd) + { + wanted_peers[osd_num].connecting = false; + if (peer_fd < 0) + { + auto & addrs = peer_states[osd_num]["addresses"].array_items(); + const char *addr = addrs[wanted_peers[osd_num].address_index].string_value().c_str(); + printf("Failed to connect to peer OSD %lu address %s: %s\n", osd_num, addr, strerror(-peer_fd)); + if (wanted_peers[osd_num].address_index < addrs.size()-1) + { + // Try all addresses + wanted_peers[osd_num].address_index++; + } + else + { + wanted_peers[osd_num].last_connect_attempt = time(NULL); + peer_states.erase(osd_num); + } + return; + } + printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); + // FIXME: Check peer config after connecting + wanted_peers.erase(osd_num); + if (!wanted_peers.size()) + { + // Connected to all peers + peering_state = peering_state & ~OSD_CONNECTING_PEERS; + } + repeer_pgs(osd_num, true); + }); + } + else + { + // Skip + wp_it++; + } + } + if (consul_txn.size() > 0) + { + std::string req = json11::Json(consul_txn).dump(); + req = "PUT /v1/txn HTTP/1.1\r\n" + "Host: "+consul_host+"\r\n" + "Content-Type: application/json\r\n" + "Content-Length: "+std::to_string(req.size())+"\r\n" + "Connection: close\r\n" + "\r\n"+req; + loading_peer_config = true; + http_request_json(consul_address, req, [this](std::string err, json11::Json data) + { + loading_peer_config = false; + if (err != "") + { + printf("Failed to load peer configuration from Consul"); + return; + } + for (auto & res: data["Results"].array_items()) + { + std::string key = res["KV"]["Key"].string_value(); + // /osd/state// + osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); + std::string json_err; + json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); + if (osd_num > 0 && data.is_object() && data["state"] == "up" && + data["addresses"].is_array() && data["port"].is_number()) + { + peer_states[osd_num] = data; + } + } + }); + } +} diff --git a/osd_http.cpp b/osd_http.cpp index f7a85521b..9b87fdad9 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -4,10 +4,12 @@ #include #include +#include + #include "osd_http.h" #include "osd.h" -int extract_port(std::string & host) +static int extract_port(std::string & host) { int port = 0; int pos = 0; @@ -97,6 +99,73 @@ void osd_t::http_request(std::string host, std::string request, std::functionresume(); } +void osd_t::http_request_json(std::string host, std::string request, + std::function callback) +{ + http_request(host, request, [this, callback](int err, std::string txt) + { + if (err != 0) + { + callback("Error code: "+std::to_string(err)+" ("+std::string(strerror(err))+")", json11::Json()); + return; + } + std::unique_ptr res(parse_http_response(txt)); + if (res->status_code != 200) + { + callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+res->body, json11::Json()); + return; + } + std::string json_err; + json11::Json data = json11::Json::parse(res->body, json_err); + if (json_err != "") + { + callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json()); + return; + } + callback(std::string(), data); + }); +} + +http_response_t *parse_http_response(std::string res) +{ + http_response_t *parsed = new http_response_t(); + int pos = res.find("\r\n"); + pos = pos < 0 ? res.length() : pos+2; + std::string status_line = res.substr(0, pos); + int http_version; + char *status_text = NULL; + sscanf(status_line.c_str(), "HTTP/1.%d %d %ms", &http_version, &parsed->status_code, &status_text); + if (status_text) + { + parsed->status_line = status_text; + free(status_text); + status_text = NULL; + } + int prev = pos; + while ((pos = res.find("\r\n", prev)) > prev) + { + if (pos == prev+2) + { + parsed->body = res.substr(pos+2); + break; + } + std::string header = res.substr(prev, pos); + int p2 = header.find(":"); + if (p2 >= 0) + { + std::string key = header.substr(0, p2); + for (int i = 0; i < key.length(); i++) + key[i] = tolower(key[i]); + int p3 = p2+1; + while (p3 < header.length() && isblank(header[p3])) + p3++; + parsed->headers[key] = header.substr(p3); + } + prev = pos+2; + } + return parsed; +} + http_co_t::~http_co_t() { callback(code, response); diff --git a/osd_http.h b/osd_http.h index 6d0aa9087..3f7eb2bb9 100644 --- a/osd_http.h +++ b/osd_http.h @@ -1,7 +1,16 @@ #pragma once #include #include +#include +#include "json11/json11.hpp" -int extract_port(std::string & host); +struct http_response_t +{ + int status_code; + std::string status_line; + std::map headers; + std::string body; +}; +http_response_t *parse_http_response(std::string res); std::vector getifaddr_list(); diff --git a/osd_peering.cpp b/osd_peering.cpp index c14d516d7..6cd3507ff 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -3,36 +3,41 @@ #include +#include "base64.h" #include "osd.h" void osd_t::init_primary() { - // Initial test version of clustering code requires exactly 2 peers - // FIXME Hardcode - std::string peerstr = config["peers"]; - while (peerstr.size()) + if (consul_address == "") { - int pos = peerstr.find(','); - peers.push_back(parse_peer(pos < 0 ? peerstr : peerstr.substr(0, pos))); - peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); - for (int i = 0; i < peers.size()-1; i++) - if (peers[i].osd_num == peers[peers.size()-1].osd_num) - throw std::runtime_error("same osd number "+std::to_string(peers[i].osd_num)+" specified twice in peers"); + // Test version of clustering code with 1 PG and 2 peers + // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 + std::string peerstr = config["peers"]; + while (peerstr.size()) + { + int pos = peerstr.find(','); + parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); + peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); + } + if (peer_states.size() < 2) + { + throw std::runtime_error("run_primary requires at least 2 peers"); + } + pgs[1] = (pg_t){ + .state = PG_PEERING, + .pg_cursize = 0, + .pg_num = 1, + .target_set = { 1, 2, 3 }, + .cur_set = { 0, 0, 0 }, + }; + pgs[1].print_state(); + pg_count = 1; + peering_state = OSD_CONNECTING_PEERS; } - if (peers.size() < 2) - throw std::runtime_error("run_primary requires at least 2 peers"); - pgs[1] = (pg_t){ - .state = PG_PEERING, - .pg_cursize = 0, - .pg_num = 1, - .target_set = { 1, 2, 3 }, - .cur_set = { 1, 0, 0 }, - }; - pgs[1].print_state(); - pg_count = 1; - peering_state = OSD_CONNECTING_PEERS; - if (consul_address != "") + else { + peering_state = OSD_LOADING_PGS; + load_pgs(); this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() { report_status(); @@ -47,24 +52,30 @@ void osd_t::init_primary() } } -osd_peer_def_t osd_t::parse_peer(std::string peer) +void osd_t::parse_test_peer(std::string peer) { // OSD_NUM:IP:PORT int pos1 = peer.find(':'); int pos2 = peer.find(':', pos1+1); if (pos1 < 0 || pos2 < 0) throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT"); - osd_peer_def_t r; - r.addr = peer.substr(pos1+1, pos2-pos1-1); + std::string addr = peer.substr(pos1+1, pos2-pos1-1); std::string osd_num_str = peer.substr(0, pos1); std::string port_str = peer.substr(pos2+1); - r.osd_num = strtoull(osd_num_str.c_str(), NULL, 10); - if (!r.osd_num) + osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10); + if (!osd_num) throw new std::runtime_error("Could not parse OSD peer osd_num"); - r.port = strtoull(port_str.c_str(), NULL, 10); - if (!r.port) + else if (peer_states.find(osd_num) != peer_states.end()) + throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers"); + int port = strtoull(port_str.c_str(), NULL, 10); + if (!port) throw new std::runtime_error("Could not parse OSD peer port"); - return r; + peer_states[osd_num] = json11::Json::object { + { "state", "up" }, + { "addresses", json11::Json::array { addr } }, + { "port", port }, + }; + wanted_peers[osd_num] = { 0 }; } void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) @@ -149,36 +160,7 @@ void osd_t::handle_peers() { if (peering_state & OSD_CONNECTING_PEERS) { - for (int i = 0; i < peers.size(); i++) - { - if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() && - time(NULL) - peers[i].last_connect_attempt > 5) // FIXME hardcode 5 - { - peers[i].last_connect_attempt = time(NULL); - connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd) - { - // FIXME: Check peer config after connecting - if (peer_fd < 0) - { - printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd)); - return; - } - printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - int i; - for (i = 0; i < peers.size(); i++) - { - if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end()) - break; - } - if (i >= peers.size()) - { - // Connected to all peers - peering_state = peering_state & ~OSD_CONNECTING_PEERS; - } - repeer_pgs(osd_num, true); - }); - } - } + load_and_connect_peers(); } if (peering_state & OSD_PEERING_PGS) {