forked from vitalif/vitastor
Implement loading PGs from Consul (in theory)
parent
582f485578
commit
309486d746
5
osd.cpp
5
osd.cpp
|
@ -103,6 +103,7 @@ osd_op_t::~osd_op_t()
|
|||
void osd_t::parse_config(blockstore_config_t & config)
|
||||
{
|
||||
consul_address = config["consul_address"];
|
||||
consul_host = consul_address.find(':') >= 0 ? consul_address.substr(0, consul_address.find(':')) : consul_address;
|
||||
consul_prefix = config["consul_prefix"];
|
||||
if (consul_prefix == "")
|
||||
consul_prefix = "microceph";
|
||||
|
@ -112,6 +113,7 @@ void osd_t::parse_config(blockstore_config_t & config)
|
|||
bind_address = config["bind_address"];
|
||||
if (bind_address == "")
|
||||
bind_address = "0.0.0.0";
|
||||
// FIXME: select port automatically from range
|
||||
bind_port = strtoull(config["bind_port"].c_str(), NULL, 10);
|
||||
if (!bind_port || bind_port > 65535)
|
||||
bind_port = 11203;
|
||||
|
@ -134,6 +136,9 @@ void osd_t::parse_config(blockstore_config_t & config)
|
|||
print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
|
||||
if (!print_stats_interval)
|
||||
print_stats_interval = 3;
|
||||
peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
|
||||
if (!peer_connect_interval)
|
||||
peer_connect_interval = 5;
|
||||
}
|
||||
|
||||
void osd_t::bind_socket()
|
||||
|
|
36
osd.h
36
osd.h
|
@ -35,10 +35,11 @@
|
|||
#define PEER_CONNECTING 1
|
||||
#define PEER_CONNECTED 2
|
||||
|
||||
#define OSD_CONNECTING_PEERS 1
|
||||
#define OSD_PEERING_PGS 2
|
||||
#define OSD_FLUSHING_PGS 4
|
||||
#define OSD_RECOVERING 8
|
||||
#define OSD_LOADING_PGS 0x01
|
||||
#define OSD_CONNECTING_PEERS 0x02
|
||||
#define OSD_PEERING_PGS 0x04
|
||||
#define OSD_FLUSHING_PGS 0x08
|
||||
#define OSD_RECOVERING 0x10
|
||||
|
||||
#define IMMEDIATE_NONE 0
|
||||
#define IMMEDIATE_SMALL 1
|
||||
|
@ -50,6 +51,7 @@
|
|||
#define DEFAULT_RECOVERY_QUEUE 4
|
||||
|
||||
#define MAX_CONSUL_ATTEMPTS 5
|
||||
#define CONSUL_START_INTERVAL 5000
|
||||
#define CONSUL_RETRY_INTERVAL 1000
|
||||
|
||||
//#define OSD_STUB
|
||||
|
@ -127,14 +129,6 @@ struct osd_op_t
|
|||
~osd_op_t();
|
||||
};
|
||||
|
||||
struct osd_peer_def_t
|
||||
{
|
||||
osd_num_t osd_num = 0;
|
||||
std::string addr;
|
||||
int port = 0;
|
||||
time_t last_connect_attempt = 0;
|
||||
};
|
||||
|
||||
struct osd_client_t
|
||||
{
|
||||
sockaddr_in peer_addr;
|
||||
|
@ -188,6 +182,13 @@ struct osd_recovery_op_t
|
|||
osd_op_t *osd_op = NULL;
|
||||
};
|
||||
|
||||
struct osd_wanted_peer_t
|
||||
{
|
||||
bool connecting;
|
||||
time_t last_connect_attempt, last_load_attempt;
|
||||
int address_index;
|
||||
};
|
||||
|
||||
class osd_t
|
||||
{
|
||||
friend struct http_co_t;
|
||||
|
@ -198,7 +199,6 @@ class osd_t
|
|||
std::string consul_address, consul_host, consul_prefix = "microceph";
|
||||
osd_num_t osd_num = 1; // OSD numbers start with 1
|
||||
bool run_primary = false;
|
||||
std::vector<osd_peer_def_t> peers;
|
||||
blockstore_config_t config;
|
||||
std::string bind_address;
|
||||
int bind_port, listen_backlog;
|
||||
|
@ -210,9 +210,13 @@ class osd_t
|
|||
int immediate_commit = IMMEDIATE_NONE;
|
||||
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
|
||||
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
int peer_connect_interval = 5;
|
||||
|
||||
// peer OSDs
|
||||
|
||||
std::map<osd_num_t, json11::Json> peer_states;
|
||||
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
||||
bool loading_peer_config = false;
|
||||
std::vector<std::string> bind_addresses;
|
||||
int consul_failed_attempts = 0;
|
||||
|
||||
|
@ -264,6 +268,9 @@ class osd_t
|
|||
void reset_stats();
|
||||
json11::Json get_status();
|
||||
void report_status();
|
||||
void load_pgs();
|
||||
void parse_pgs(json11::Json data);
|
||||
void load_and_connect_peers();
|
||||
|
||||
// event loop, socket read/write
|
||||
void loop();
|
||||
|
@ -278,6 +285,7 @@ class osd_t
|
|||
void handle_send(ring_data_t *data, int peer_fd);
|
||||
void outbox_push(osd_client_t & cl, osd_op_t *op);
|
||||
void http_request(std::string host, std::string request, std::function<void(int, std::string)> callback);
|
||||
void http_request_json(std::string host, std::string request, std::function<void(std::string, json11::Json data)> callback);
|
||||
|
||||
// peer handling (primary OSD logic)
|
||||
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
||||
|
@ -285,7 +293,7 @@ class osd_t
|
|||
void cancel_osd_ops(osd_client_t & cl);
|
||||
void cancel_op(osd_op_t *op);
|
||||
void stop_client(int peer_fd);
|
||||
osd_peer_def_t parse_peer(std::string peer);
|
||||
void parse_test_peer(std::string peer);
|
||||
void init_primary();
|
||||
void handle_peers();
|
||||
void repeer_pgs(osd_num_t osd_num, bool is_connected);
|
||||
|
|
249
osd_cluster.cpp
249
osd_cluster.cpp
|
@ -1,9 +1,13 @@
|
|||
#include "osd.h"
|
||||
#include "osd_http.h"
|
||||
#include "base64.h"
|
||||
|
||||
json11::Json osd_t::get_status()
|
||||
{
|
||||
json11::Json::object st;
|
||||
timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
st["time"] = std::to_string(ts.tv_sec)+"."+std::to_string(ts.tv_nsec/1000000);
|
||||
st["state"] = "up";
|
||||
if (bind_address != "0.0.0.0")
|
||||
st["addresses"] = { bind_address };
|
||||
|
@ -20,25 +24,6 @@ json11::Json osd_t::get_status()
|
|||
st["size"] = bs->get_block_count() * bs->get_block_size();
|
||||
st["free"] = bs->get_free_block_count() * bs->get_block_size();
|
||||
}
|
||||
json11::Json::object pg_status;
|
||||
for (auto & p: pgs)
|
||||
{
|
||||
auto & pg = p.second;
|
||||
json11::Json::object pg_st;
|
||||
json11::Json::array pg_state;
|
||||
for (int i = 0; i < pg_state_bit_count; i++)
|
||||
if (pg.state & pg_state_bits[i])
|
||||
pg_state.push_back(pg_state_names[i]);
|
||||
pg_st["state"] = pg_state;
|
||||
pg_st["object_count"] = pg.total_count;
|
||||
pg_st["clean_count"] = pg.clean_count;
|
||||
pg_st["misplaced_count"] = pg.misplaced_objects.size();
|
||||
pg_st["degraded_count"] = pg.degraded_objects.size();
|
||||
pg_st["incomplete_count"] = pg.incomplete_objects.size();
|
||||
pg_st["write_osd_set"] = pg.cur_set;
|
||||
pg_status[std::to_string(pg.pg_num)] = pg_st;
|
||||
}
|
||||
st["pgs"] = pg_status;
|
||||
json11::Json::object op_stats, subop_stats;
|
||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
|
@ -59,18 +44,37 @@ json11::Json osd_t::get_status()
|
|||
return st;
|
||||
}
|
||||
|
||||
/*
|
||||
json11::Json::object pg_status;
|
||||
for (auto & p: pgs)
|
||||
{
|
||||
auto & pg = p.second;
|
||||
json11::Json::object pg_st;
|
||||
json11::Json::array pg_state;
|
||||
for (int i = 0; i < pg_state_bit_count; i++)
|
||||
if (pg.state & pg_state_bits[i])
|
||||
pg_state.push_back(pg_state_names[i]);
|
||||
pg_st["state"] = pg_state;
|
||||
pg_st["object_count"] = pg.total_count;
|
||||
pg_st["clean_count"] = pg.clean_count;
|
||||
pg_st["misplaced_count"] = pg.misplaced_objects.size();
|
||||
pg_st["degraded_count"] = pg.degraded_objects.size();
|
||||
pg_st["incomplete_count"] = pg.incomplete_objects.size();
|
||||
pg_st["write_osd_set"] = pg.cur_set;
|
||||
pg_status[std::to_string(pg.pg_num)] = pg_st;
|
||||
}
|
||||
st["pgs"] = pg_status;
|
||||
*/
|
||||
|
||||
void osd_t::report_status()
|
||||
{
|
||||
if (consul_host == "")
|
||||
{
|
||||
consul_host = consul_address;
|
||||
extract_port(consul_host);
|
||||
}
|
||||
std::string st = get_status().dump();
|
||||
std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/"+std::to_string(osd_num)+" HTTP/1.1\r\n"+
|
||||
// (!) Keys end with / to allow "select /osd/state/123/ by prefix"
|
||||
// because Consul transactions fail if you try to read non-existing keys
|
||||
std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/ HTTP/1.1\r\n"+
|
||||
"Host: "+consul_host+"\r\n"+
|
||||
"Content-Length: "+std::to_string(st.size())+"\r\n"+
|
||||
"Connection: close\r\n"
|
||||
"Connection: close\r\n"+
|
||||
"\r\n"+st;
|
||||
http_request(consul_address, req, [this](int err, std::string res)
|
||||
{
|
||||
|
@ -99,3 +103,196 @@ void osd_t::report_status()
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Start -> Load PGs -> Load peers -> Connect to peers -> Peer PGs
|
||||
// Wait for PG changes -> Start/Stop PGs when requested
|
||||
// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat
|
||||
void osd_t::load_pgs()
|
||||
{
|
||||
assert(this->pgs.size() == 0);
|
||||
std::string req = "GET /v1/kv/"+consul_prefix+"/config/pgs?raw"+
|
||||
/*(consul_change_index > 0 ? "&index="+std::to_string(consul_change_index) : "")+*/
|
||||
" HTTP/1.1\r\n"+
|
||||
"Host: "+consul_host+"\r\n"+
|
||||
"Connection: close\r\n"+
|
||||
"\r\n";
|
||||
http_request_json(consul_address, req, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
printf("Error loading PGs from Consul: %s\n", err.c_str());
|
||||
tfd->set_timer(CONSUL_START_INTERVAL, false, [this](int timer_id)
|
||||
{
|
||||
load_pgs();
|
||||
});
|
||||
return;
|
||||
}
|
||||
parse_pgs(data);
|
||||
peering_state = OSD_CONNECTING_PEERS;
|
||||
});
|
||||
}
|
||||
|
||||
void osd_t::parse_pgs(json11::Json data)
|
||||
{
|
||||
uint64_t pg_count = 0;
|
||||
for (auto pg_item: data.object_items())
|
||||
{
|
||||
char *pg_num_end = NULL;
|
||||
pg_num_t pg_num = strtoull(pg_item.first.c_str(), &pg_num_end, 10);
|
||||
if (!pg_num || *pg_num_end != 0)
|
||||
{
|
||||
throw std::runtime_error("Bad key in PG hash: "+pg_item.first);
|
||||
}
|
||||
auto & pg_json = pg_item.second;
|
||||
osd_num_t primary_osd = 0;
|
||||
std::vector<osd_num_t> target_set;
|
||||
for (auto pg_osd_num: pg_json["osd_set"].array_items())
|
||||
{
|
||||
osd_num_t pg_osd = pg_osd_num.uint64_value();
|
||||
target_set.push_back(pg_osd);
|
||||
if (pg_osd != 0 && primary_osd == 0)
|
||||
{
|
||||
primary_osd = pg_osd;
|
||||
}
|
||||
}
|
||||
if (target_set.size() != 3)
|
||||
{
|
||||
throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set");
|
||||
}
|
||||
if (primary_osd == this->osd_num)
|
||||
{
|
||||
// Take this PG
|
||||
this->pgs[pg_num] = (pg_t){
|
||||
.state = PG_PEERING,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = pg_num,
|
||||
.target_set = target_set,
|
||||
.cur_set = target_set,
|
||||
};
|
||||
this->pgs[pg_num].print_state();
|
||||
// Add peers
|
||||
for (auto pg_osd: target_set)
|
||||
{
|
||||
// FIXME: Add OSDs from PG history to peers
|
||||
if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end())
|
||||
{
|
||||
wanted_peers[pg_osd] = { 0 };
|
||||
}
|
||||
}
|
||||
}
|
||||
pg_count++;
|
||||
}
|
||||
this->pg_count = pg_count;
|
||||
}
|
||||
|
||||
void osd_t::load_and_connect_peers()
|
||||
{
|
||||
json11::Json::array consul_txn;
|
||||
for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();)
|
||||
{
|
||||
osd_num_t osd_num = wp_it->first;
|
||||
if (osd_peer_fds.find(osd_num) != osd_peer_fds.end())
|
||||
{
|
||||
// It shouldn't be here
|
||||
wanted_peers.erase(wp_it++);
|
||||
if (!wanted_peers.size())
|
||||
{
|
||||
// Connected to all peers
|
||||
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
|
||||
}
|
||||
}
|
||||
else if (peer_states.find(osd_num) == peer_states.end() &&
|
||||
time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)
|
||||
{
|
||||
if (!loading_peer_config)
|
||||
{
|
||||
// (Re)load OSD state from Consul
|
||||
wp_it->second.last_load_attempt = time(NULL);
|
||||
consul_txn.push_back(json11::Json::object {
|
||||
{ "KV", json11::Json::object {
|
||||
{ "Verb", "get-tree" },
|
||||
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/" },
|
||||
} }
|
||||
});
|
||||
}
|
||||
wp_it++;
|
||||
}
|
||||
else if (!wp_it->second.connecting &&
|
||||
time(NULL) - wp_it->second.last_connect_attempt >= peer_connect_interval)
|
||||
{
|
||||
// Try to connect
|
||||
wp_it->second.connecting = true;
|
||||
const std::string & addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value();
|
||||
int64_t port = peer_states[osd_num]["port"].int64_value();
|
||||
wp_it++;
|
||||
connect_peer(osd_num, addr.c_str(), port, [this](osd_num_t osd_num, int peer_fd)
|
||||
{
|
||||
wanted_peers[osd_num].connecting = false;
|
||||
if (peer_fd < 0)
|
||||
{
|
||||
auto & addrs = peer_states[osd_num]["addresses"].array_items();
|
||||
const char *addr = addrs[wanted_peers[osd_num].address_index].string_value().c_str();
|
||||
printf("Failed to connect to peer OSD %lu address %s: %s\n", osd_num, addr, strerror(-peer_fd));
|
||||
if (wanted_peers[osd_num].address_index < addrs.size()-1)
|
||||
{
|
||||
// Try all addresses
|
||||
wanted_peers[osd_num].address_index++;
|
||||
}
|
||||
else
|
||||
{
|
||||
wanted_peers[osd_num].last_connect_attempt = time(NULL);
|
||||
peer_states.erase(osd_num);
|
||||
}
|
||||
return;
|
||||
}
|
||||
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
|
||||
// FIXME: Check peer config after connecting
|
||||
wanted_peers.erase(osd_num);
|
||||
if (!wanted_peers.size())
|
||||
{
|
||||
// Connected to all peers
|
||||
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
|
||||
}
|
||||
repeer_pgs(osd_num, true);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
// Skip
|
||||
wp_it++;
|
||||
}
|
||||
}
|
||||
if (consul_txn.size() > 0)
|
||||
{
|
||||
std::string req = json11::Json(consul_txn).dump();
|
||||
req = "PUT /v1/txn HTTP/1.1\r\n"
|
||||
"Host: "+consul_host+"\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
||||
"Connection: close\r\n"
|
||||
"\r\n"+req;
|
||||
loading_peer_config = true;
|
||||
http_request_json(consul_address, req, [this](std::string err, json11::Json data)
|
||||
{
|
||||
loading_peer_config = false;
|
||||
if (err != "")
|
||||
{
|
||||
printf("Failed to load peer configuration from Consul");
|
||||
return;
|
||||
}
|
||||
for (auto & res: data["Results"].array_items())
|
||||
{
|
||||
std::string key = res["KV"]["Key"].string_value();
|
||||
// <consul_prefix>/osd/state/<osd_num>/
|
||||
osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12));
|
||||
std::string json_err;
|
||||
json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err);
|
||||
if (osd_num > 0 && data.is_object() && data["state"] == "up" &&
|
||||
data["addresses"].is_array() && data["port"].is_number())
|
||||
{
|
||||
peer_states[osd_num] = data;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
71
osd_http.cpp
71
osd_http.cpp
|
@ -4,10 +4,12 @@
|
|||
#include <net/if.h>
|
||||
#include <ifaddrs.h>
|
||||
|
||||
#include <ctype.h>
|
||||
|
||||
#include "osd_http.h"
|
||||
#include "osd.h"
|
||||
|
||||
int extract_port(std::string & host)
|
||||
static int extract_port(std::string & host)
|
||||
{
|
||||
int port = 0;
|
||||
int pos = 0;
|
||||
|
@ -97,6 +99,73 @@ void osd_t::http_request(std::string host, std::string request, std::function<vo
|
|||
handler->resume();
|
||||
}
|
||||
|
||||
void osd_t::http_request_json(std::string host, std::string request,
|
||||
std::function<void(std::string, json11::Json r)> callback)
|
||||
{
|
||||
http_request(host, request, [this, callback](int err, std::string txt)
|
||||
{
|
||||
if (err != 0)
|
||||
{
|
||||
callback("Error code: "+std::to_string(err)+" ("+std::string(strerror(err))+")", json11::Json());
|
||||
return;
|
||||
}
|
||||
std::unique_ptr<http_response_t> res(parse_http_response(txt));
|
||||
if (res->status_code != 200)
|
||||
{
|
||||
callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+res->body, json11::Json());
|
||||
return;
|
||||
}
|
||||
std::string json_err;
|
||||
json11::Json data = json11::Json::parse(res->body, json_err);
|
||||
if (json_err != "")
|
||||
{
|
||||
callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json());
|
||||
return;
|
||||
}
|
||||
callback(std::string(), data);
|
||||
});
|
||||
}
|
||||
|
||||
http_response_t *parse_http_response(std::string res)
|
||||
{
|
||||
http_response_t *parsed = new http_response_t();
|
||||
int pos = res.find("\r\n");
|
||||
pos = pos < 0 ? res.length() : pos+2;
|
||||
std::string status_line = res.substr(0, pos);
|
||||
int http_version;
|
||||
char *status_text = NULL;
|
||||
sscanf(status_line.c_str(), "HTTP/1.%d %d %ms", &http_version, &parsed->status_code, &status_text);
|
||||
if (status_text)
|
||||
{
|
||||
parsed->status_line = status_text;
|
||||
free(status_text);
|
||||
status_text = NULL;
|
||||
}
|
||||
int prev = pos;
|
||||
while ((pos = res.find("\r\n", prev)) > prev)
|
||||
{
|
||||
if (pos == prev+2)
|
||||
{
|
||||
parsed->body = res.substr(pos+2);
|
||||
break;
|
||||
}
|
||||
std::string header = res.substr(prev, pos);
|
||||
int p2 = header.find(":");
|
||||
if (p2 >= 0)
|
||||
{
|
||||
std::string key = header.substr(0, p2);
|
||||
for (int i = 0; i < key.length(); i++)
|
||||
key[i] = tolower(key[i]);
|
||||
int p3 = p2+1;
|
||||
while (p3 < header.length() && isblank(header[p3]))
|
||||
p3++;
|
||||
parsed->headers[key] = header.substr(p3);
|
||||
}
|
||||
prev = pos+2;
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
http_co_t::~http_co_t()
|
||||
{
|
||||
callback(code, response);
|
||||
|
|
11
osd_http.h
11
osd_http.h
|
@ -1,7 +1,16 @@
|
|||
#pragma once
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include "json11/json11.hpp"
|
||||
|
||||
int extract_port(std::string & host);
|
||||
struct http_response_t
|
||||
{
|
||||
int status_code;
|
||||
std::string status_line;
|
||||
std::map<std::string, std::string> headers;
|
||||
std::string body;
|
||||
};
|
||||
|
||||
http_response_t *parse_http_response(std::string res);
|
||||
std::vector<std::string> getifaddr_list();
|
||||
|
|
104
osd_peering.cpp
104
osd_peering.cpp
|
@ -3,36 +3,41 @@
|
|||
|
||||
#include <algorithm>
|
||||
|
||||
#include "base64.h"
|
||||
#include "osd.h"
|
||||
|
||||
void osd_t::init_primary()
|
||||
{
|
||||
// Initial test version of clustering code requires exactly 2 peers
|
||||
// FIXME Hardcode
|
||||
std::string peerstr = config["peers"];
|
||||
while (peerstr.size())
|
||||
if (consul_address == "")
|
||||
{
|
||||
int pos = peerstr.find(',');
|
||||
peers.push_back(parse_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)));
|
||||
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
|
||||
for (int i = 0; i < peers.size()-1; i++)
|
||||
if (peers[i].osd_num == peers[peers.size()-1].osd_num)
|
||||
throw std::runtime_error("same osd number "+std::to_string(peers[i].osd_num)+" specified twice in peers");
|
||||
// Test version of clustering code with 1 PG and 2 peers
|
||||
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
|
||||
std::string peerstr = config["peers"];
|
||||
while (peerstr.size())
|
||||
{
|
||||
int pos = peerstr.find(',');
|
||||
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
|
||||
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
|
||||
}
|
||||
if (peer_states.size() < 2)
|
||||
{
|
||||
throw std::runtime_error("run_primary requires at least 2 peers");
|
||||
}
|
||||
pgs[1] = (pg_t){
|
||||
.state = PG_PEERING,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = 1,
|
||||
.target_set = { 1, 2, 3 },
|
||||
.cur_set = { 0, 0, 0 },
|
||||
};
|
||||
pgs[1].print_state();
|
||||
pg_count = 1;
|
||||
peering_state = OSD_CONNECTING_PEERS;
|
||||
}
|
||||
if (peers.size() < 2)
|
||||
throw std::runtime_error("run_primary requires at least 2 peers");
|
||||
pgs[1] = (pg_t){
|
||||
.state = PG_PEERING,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = 1,
|
||||
.target_set = { 1, 2, 3 },
|
||||
.cur_set = { 1, 0, 0 },
|
||||
};
|
||||
pgs[1].print_state();
|
||||
pg_count = 1;
|
||||
peering_state = OSD_CONNECTING_PEERS;
|
||||
if (consul_address != "")
|
||||
else
|
||||
{
|
||||
peering_state = OSD_LOADING_PGS;
|
||||
load_pgs();
|
||||
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
|
||||
{
|
||||
report_status();
|
||||
|
@ -47,24 +52,30 @@ void osd_t::init_primary()
|
|||
}
|
||||
}
|
||||
|
||||
osd_peer_def_t osd_t::parse_peer(std::string peer)
|
||||
void osd_t::parse_test_peer(std::string peer)
|
||||
{
|
||||
// OSD_NUM:IP:PORT
|
||||
int pos1 = peer.find(':');
|
||||
int pos2 = peer.find(':', pos1+1);
|
||||
if (pos1 < 0 || pos2 < 0)
|
||||
throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
|
||||
osd_peer_def_t r;
|
||||
r.addr = peer.substr(pos1+1, pos2-pos1-1);
|
||||
std::string addr = peer.substr(pos1+1, pos2-pos1-1);
|
||||
std::string osd_num_str = peer.substr(0, pos1);
|
||||
std::string port_str = peer.substr(pos2+1);
|
||||
r.osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
|
||||
if (!r.osd_num)
|
||||
osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
|
||||
if (!osd_num)
|
||||
throw new std::runtime_error("Could not parse OSD peer osd_num");
|
||||
r.port = strtoull(port_str.c_str(), NULL, 10);
|
||||
if (!r.port)
|
||||
else if (peer_states.find(osd_num) != peer_states.end())
|
||||
throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers");
|
||||
int port = strtoull(port_str.c_str(), NULL, 10);
|
||||
if (!port)
|
||||
throw new std::runtime_error("Could not parse OSD peer port");
|
||||
return r;
|
||||
peer_states[osd_num] = json11::Json::object {
|
||||
{ "state", "up" },
|
||||
{ "addresses", json11::Json::array { addr } },
|
||||
{ "port", port },
|
||||
};
|
||||
wanted_peers[osd_num] = { 0 };
|
||||
}
|
||||
|
||||
void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback)
|
||||
|
@ -149,36 +160,7 @@ void osd_t::handle_peers()
|
|||
{
|
||||
if (peering_state & OSD_CONNECTING_PEERS)
|
||||
{
|
||||
for (int i = 0; i < peers.size(); i++)
|
||||
{
|
||||
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() &&
|
||||
time(NULL) - peers[i].last_connect_attempt > 5) // FIXME hardcode 5
|
||||
{
|
||||
peers[i].last_connect_attempt = time(NULL);
|
||||
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd)
|
||||
{
|
||||
// FIXME: Check peer config after connecting
|
||||
if (peer_fd < 0)
|
||||
{
|
||||
printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd));
|
||||
return;
|
||||
}
|
||||
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
|
||||
int i;
|
||||
for (i = 0; i < peers.size(); i++)
|
||||
{
|
||||
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end())
|
||||
break;
|
||||
}
|
||||
if (i >= peers.size())
|
||||
{
|
||||
// Connected to all peers
|
||||
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
|
||||
}
|
||||
repeer_pgs(osd_num, true);
|
||||
});
|
||||
}
|
||||
}
|
||||
load_and_connect_peers();
|
||||
}
|
||||
if (peering_state & OSD_PEERING_PGS)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue