Extract "state-watching" etcd client into a separate file
parent
6202260018
commit
f6a01a4819
11
Makefile
11
Makefile
|
@ -30,7 +30,8 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o
|
|||
g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring
|
||||
|
||||
OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o \
|
||||
osd_primary.o osd_primary_subops.o osd_cluster.o http_client.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o
|
||||
osd_primary.o osd_primary_subops.o etcd_state_client.o osd_cluster.o http_client.o pg_states.o \
|
||||
osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o
|
||||
base64.o: base64.cpp base64.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h
|
||||
|
@ -43,11 +44,15 @@ osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h
|
|||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_cluster.o: osd_cluster.cpp osd.h osd_ops.h ringloop.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
http_client.o: http_client.cpp http_client.h osd.h osd_ops.h ringloop.h
|
||||
http_client.o: http_client.cpp http_client.h osd.h osd_ops.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
etcd_state_client.o: etcd_state_client.cpp etcd_state_client.h http_client.h pg_states.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h
|
||||
osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h pg_states.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
pg_states.o: pg_states.cpp pg_states.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_rmw.o: osd_rmw.cpp osd_rmw.h xor.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
|
|
|
@ -0,0 +1,354 @@
|
|||
#include "osd_ops.h"
|
||||
#include "pg_states.h"
|
||||
#include "etcd_state_client.h"
|
||||
#include "http_client.h"
|
||||
#include "base64.h"
|
||||
|
||||
json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
|
||||
{
|
||||
json_kv_t kv;
|
||||
kv.key = base64_decode(kv_json["key"].string_value());
|
||||
std::string json_err, json_text = base64_decode(kv_json["value"].string_value());
|
||||
kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
|
||||
if (json_err != "")
|
||||
{
|
||||
printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
|
||||
kv.key = "";
|
||||
}
|
||||
return kv;
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
std::string req = payload.dump();
|
||||
req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
|
||||
"Host: "+etcd_address+"\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
||||
"Connection: close\r\n"
|
||||
"\r\n"+req;
|
||||
http_request_json(tfd, etcd_address, req, timeout, callback);
|
||||
}
|
||||
|
||||
void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
etcd_call("/kv/txn", txn, timeout, callback);
|
||||
}
|
||||
|
||||
void etcd_state_client_t::start_etcd_watcher()
|
||||
{
|
||||
etcd_watches_initialised = 0;
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg)
|
||||
{
|
||||
if (msg->body.length())
|
||||
{
|
||||
std::string json_err;
|
||||
json11::Json data = json11::Json::parse(msg->body, json_err);
|
||||
if (json_err != "")
|
||||
{
|
||||
printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
if (data["result"]["created"].bool_value())
|
||||
{
|
||||
etcd_watches_initialised++;
|
||||
}
|
||||
if (etcd_watches_initialised == 4)
|
||||
{
|
||||
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
|
||||
}
|
||||
// First gather all changes into a hash to remove multiple overwrites
|
||||
json11::Json::object changes;
|
||||
for (auto & ev: data["result"]["events"].array_items())
|
||||
{
|
||||
auto kv = parse_etcd_kv(ev["kv"]);
|
||||
if (kv.key != "")
|
||||
{
|
||||
changes[kv.key] = kv.value;
|
||||
}
|
||||
}
|
||||
for (auto & kv: changes)
|
||||
{
|
||||
if (this->log_level > 0)
|
||||
{
|
||||
printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str());
|
||||
}
|
||||
parse_state(kv.first, kv.second);
|
||||
}
|
||||
// React to changes
|
||||
on_change_hook(changes);
|
||||
}
|
||||
}
|
||||
if (msg->eof)
|
||||
{
|
||||
etcd_watch_ws = NULL;
|
||||
if (etcd_watches_initialised == 0)
|
||||
{
|
||||
// Connection not established, retry in <ETCD_SLOW_TIMEOUT>
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int)
|
||||
{
|
||||
start_etcd_watcher();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
// Connection was live, retry immediately
|
||||
start_etcd_watcher();
|
||||
}
|
||||
}
|
||||
});
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/config0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_CONFIG_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_OSD_STATE_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_PG_STATE_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_PG_HISTORY_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
}
|
||||
|
||||
void etcd_state_client_t::load_global_config()
|
||||
{
|
||||
etcd_call("/kv/range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/global") }
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
printf("Error reading OSD configuration from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
{
|
||||
load_global_config();
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!etcd_watch_revision)
|
||||
{
|
||||
etcd_watch_revision = data["header"]["revision"].uint64_value();
|
||||
}
|
||||
json11::Json::object global_config;
|
||||
if (data["kvs"].array_items().size() > 0)
|
||||
{
|
||||
auto kv = parse_etcd_kv(data["kvs"][0]);
|
||||
if (kv.value.is_object())
|
||||
{
|
||||
global_config = kv.value.object_items();
|
||||
}
|
||||
}
|
||||
on_load_config_hook(global_config);
|
||||
});
|
||||
}
|
||||
|
||||
void etcd_state_client_t::load_pgs()
|
||||
{
|
||||
json11::Json::array txn = {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/pgs") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
|
||||
} }
|
||||
},
|
||||
};
|
||||
json11::Json::object req = { { "success", txn } };
|
||||
json11::Json checks = load_pgs_checks_hook();
|
||||
if (checks.array_items().size() > 0)
|
||||
{
|
||||
req["compare"] = checks;
|
||||
}
|
||||
etcd_txn(req, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
printf("Error loading PGs from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
{
|
||||
load_pgs();
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!data["succeeded"].bool_value())
|
||||
{
|
||||
on_load_pgs_hook(false);
|
||||
return;
|
||||
}
|
||||
for (auto & res: data["responses"].array_items())
|
||||
{
|
||||
for (auto & kv_json: res["response_range"]["kvs"].array_items())
|
||||
{
|
||||
auto kv = parse_etcd_kv(kv_json);
|
||||
parse_state(kv.key, kv.value);
|
||||
}
|
||||
}
|
||||
on_load_pgs_hook(true);
|
||||
});
|
||||
}
|
||||
|
||||
void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value)
|
||||
{
|
||||
if (key == etcd_prefix+"/config/pgs")
|
||||
{
|
||||
for (auto & pg_item: this->pg_config)
|
||||
{
|
||||
pg_item.second.exists = false;
|
||||
}
|
||||
for (auto & pg_item: value["items"].object_items())
|
||||
{
|
||||
pg_num_t pg_num = stoull_full(pg_item.first);
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str());
|
||||
continue;
|
||||
}
|
||||
this->pg_config[pg_num].exists = true;
|
||||
this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value();
|
||||
this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value();
|
||||
this->pg_config[pg_num].target_set.clear();
|
||||
for (auto pg_osd: pg_item.second["osd_set"].array_items())
|
||||
{
|
||||
this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
if (this->pg_config[pg_num].target_set.size() != 3)
|
||||
{
|
||||
printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str());
|
||||
this->pg_config[pg_num].target_set.resize(3);
|
||||
this->pg_config[pg_num].pause = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
|
||||
{
|
||||
// <etcd_prefix>/pg/history/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12));
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & pg_cfg = this->pg_config[pg_num];
|
||||
pg_cfg.target_history.clear();
|
||||
pg_cfg.all_peers.clear();
|
||||
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
|
||||
for (auto hist_item: value["osd_sets"].array_items())
|
||||
{
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto pg_osd: hist_item.array_items())
|
||||
{
|
||||
history_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
pg_cfg.target_history.push_back(history_set);
|
||||
}
|
||||
// Include these additional OSDs when peering the PG
|
||||
for (auto pg_osd: value["all_peers"].array_items())
|
||||
{
|
||||
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
|
||||
{
|
||||
// <etcd_prefix>/pg/state/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10));
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else if (value.is_null())
|
||||
{
|
||||
this->pg_config[pg_num].cur_primary = 0;
|
||||
this->pg_config[pg_num].cur_state = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
osd_num_t cur_primary = value["primary"].uint64_value();
|
||||
int state = 0;
|
||||
for (auto & e: value["state"].array_items())
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < pg_state_bit_count; i++)
|
||||
{
|
||||
if (e.string_value() == pg_state_names[i])
|
||||
{
|
||||
state = state | pg_state_bits[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (i >= pg_state_bit_count)
|
||||
{
|
||||
printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str());
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!cur_primary || !value["state"].is_array() || !state ||
|
||||
(state & PG_OFFLINE) && state != PG_OFFLINE ||
|
||||
(state & PG_PEERING) && state != PG_PEERING ||
|
||||
(state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
|
||||
{
|
||||
printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str());
|
||||
return;
|
||||
}
|
||||
this->pg_config[pg_num].cur_primary = cur_primary;
|
||||
this->pg_config[pg_num].cur_state = state;
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
|
||||
{
|
||||
// <etcd_prefix>/osd/state/%d
|
||||
osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11));
|
||||
if (peer_osd > 0)
|
||||
{
|
||||
if (value.is_object() && value["state"] == "up" &&
|
||||
value["addresses"].is_array() &&
|
||||
value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
|
||||
{
|
||||
this->peer_states[peer_osd] = value;
|
||||
}
|
||||
else
|
||||
{
|
||||
this->peer_states.erase(peer_osd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
#pragma once
|
||||
|
||||
#include "http_client.h"
|
||||
#include "timerfd_manager.h"
|
||||
|
||||
#define ETCD_CONFIG_WATCH_ID 1
|
||||
#define ETCD_PG_STATE_WATCH_ID 2
|
||||
#define ETCD_PG_HISTORY_WATCH_ID 3
|
||||
#define ETCD_OSD_STATE_WATCH_ID 4
|
||||
|
||||
#define MAX_ETCD_ATTEMPTS 5
|
||||
#define ETCD_SLOW_TIMEOUT 5000
|
||||
#define ETCD_QUICK_TIMEOUT 1000
|
||||
|
||||
struct pg_config_t
|
||||
{
|
||||
bool exists;
|
||||
osd_num_t primary;
|
||||
std::vector<osd_num_t> target_set;
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool pause;
|
||||
osd_num_t cur_primary;
|
||||
int cur_state;
|
||||
};
|
||||
|
||||
struct json_kv_t
|
||||
{
|
||||
std::string key;
|
||||
json11::Json value;
|
||||
};
|
||||
|
||||
struct etcd_state_client_t
|
||||
{
|
||||
// FIXME Allow multiple etcd addresses and select random address
|
||||
std::string etcd_address, etcd_prefix, etcd_api_path;
|
||||
int log_level = 0;
|
||||
timerfd_manager_t *tfd = NULL;
|
||||
|
||||
int etcd_watches_initialised = 0;
|
||||
uint64_t etcd_watch_revision = 0;
|
||||
websocket_t *etcd_watch_ws = NULL;
|
||||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
std::map<osd_num_t, json11::Json> peer_states;
|
||||
|
||||
std::function<void(json11::Json::object &)> on_change_hook;
|
||||
std::function<void(json11::Json::object &)> on_load_config_hook;
|
||||
std::function<json11::Json()> load_pgs_checks_hook;
|
||||
std::function<void(bool)> on_load_pgs_hook;
|
||||
|
||||
json_kv_t parse_etcd_kv(const json11::Json & kv_json);
|
||||
void etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
void start_etcd_watcher();
|
||||
void load_global_config();
|
||||
void load_pgs();
|
||||
void parse_state(const std::string & key, const json11::Json & value);
|
||||
};
|
2
osd.cpp
2
osd.cpp
|
@ -407,7 +407,7 @@ void osd_t::stop_client(int peer_fd)
|
|||
{
|
||||
// Reload configuration from etcd when the connection is dropped
|
||||
printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num);
|
||||
peer_states.erase(cl.osd_num);
|
||||
st_cli.peer_states.erase(cl.osd_num);
|
||||
repeer_pgs(cl.osd_num);
|
||||
}
|
||||
else
|
||||
|
|
41
osd.h
41
osd.h
|
@ -18,8 +18,7 @@
|
|||
#include "timerfd_manager.h"
|
||||
#include "osd_ops.h"
|
||||
#include "osd_peering_pg.h"
|
||||
#include "http_client.h"
|
||||
#include "json11/json11.hpp"
|
||||
#include "etcd_state_client.h"
|
||||
|
||||
#define OSD_OP_IN 0
|
||||
#define OSD_OP_OUT 1
|
||||
|
@ -186,24 +185,6 @@ struct osd_wanted_peer_t
|
|||
int address_index;
|
||||
};
|
||||
|
||||
struct pg_config_t
|
||||
{
|
||||
bool exists;
|
||||
osd_num_t primary;
|
||||
std::vector<osd_num_t> target_set;
|
||||
std::vector<std::vector<osd_num_t>> target_history;
|
||||
std::vector<osd_num_t> all_peers;
|
||||
bool pause;
|
||||
osd_num_t cur_primary;
|
||||
int cur_state;
|
||||
};
|
||||
|
||||
struct json_kv_t
|
||||
{
|
||||
std::string key;
|
||||
json11::Json value;
|
||||
};
|
||||
|
||||
class osd_t
|
||||
{
|
||||
// config
|
||||
|
@ -231,16 +212,12 @@ class osd_t
|
|||
|
||||
// cluster state
|
||||
|
||||
etcd_state_client_t st_cli;
|
||||
int etcd_failed_attempts = 0;
|
||||
std::string etcd_lease_id;
|
||||
int etcd_watches_initialised = 0;
|
||||
uint64_t etcd_watch_revision = 0;
|
||||
websocket_t *etcd_watch_ws = NULL;
|
||||
json11::Json self_state;
|
||||
std::map<osd_num_t, json11::Json> peer_states;
|
||||
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
|
||||
bool loading_peer_config = false;
|
||||
int etcd_failed_attempts = 0;
|
||||
std::map<pg_num_t, pg_config_t> pg_config;
|
||||
std::set<pg_num_t> pg_state_dirty;
|
||||
bool pg_config_applied = false;
|
||||
bool etcd_reporting_pg_state = false;
|
||||
|
@ -294,13 +271,12 @@ class osd_t
|
|||
uint64_t recovery_stat_bytes[2][2] = { 0 };
|
||||
|
||||
// cluster connection
|
||||
void etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
void etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback);
|
||||
json_kv_t parse_etcd_kv(const json11::Json & kv_json);
|
||||
void parse_config(blockstore_config_t & config);
|
||||
void init_cluster();
|
||||
void start_etcd_watcher();
|
||||
void load_global_config();
|
||||
void on_change_etcd_state_hook(json11::Json::object & changes);
|
||||
void on_load_config_hook(json11::Json::object & changes);
|
||||
json11::Json on_load_pgs_checks_hook();
|
||||
void on_load_pgs_hook(bool success);
|
||||
void bind_socket();
|
||||
void acquire_lease();
|
||||
json11::Json get_osd_state();
|
||||
|
@ -312,12 +288,9 @@ class osd_t
|
|||
void report_statistics();
|
||||
void report_pg_state(pg_t & pg);
|
||||
void report_pg_states();
|
||||
void load_pgs();
|
||||
void parse_pg_state(const std::string & key, const json11::Json & value);
|
||||
void apply_pg_count();
|
||||
void apply_pg_config();
|
||||
void load_and_connect_peers();
|
||||
void parse_etcd_osd_state(const std::string & key, const json11::Json & value);
|
||||
|
||||
// event loop, socket read/write
|
||||
void loop();
|
||||
|
|
449
osd_cluster.cpp
449
osd_cluster.cpp
|
@ -1,45 +1,6 @@
|
|||
#include "osd.h"
|
||||
#include "base64.h"
|
||||
|
||||
#define ETCD_CONFIG_WATCH_ID 1
|
||||
#define ETCD_PG_STATE_WATCH_ID 2
|
||||
#define ETCD_PG_HISTORY_WATCH_ID 3
|
||||
#define ETCD_OSD_STATE_WATCH_ID 4
|
||||
|
||||
#define MAX_ETCD_ATTEMPTS 5
|
||||
#define ETCD_SLOW_TIMEOUT 5000
|
||||
#define ETCD_QUICK_TIMEOUT 1000
|
||||
|
||||
json_kv_t osd_t::parse_etcd_kv(const json11::Json & kv_json)
|
||||
{
|
||||
json_kv_t kv;
|
||||
kv.key = base64_decode(kv_json["key"].string_value());
|
||||
std::string json_err, json_text = base64_decode(kv_json["value"].string_value());
|
||||
kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
|
||||
if (json_err != "")
|
||||
{
|
||||
printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
|
||||
kv.key = "";
|
||||
}
|
||||
return kv;
|
||||
}
|
||||
|
||||
void osd_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
etcd_call("/kv/txn", txn, timeout, callback);
|
||||
}
|
||||
|
||||
void osd_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
|
||||
{
|
||||
std::string req = payload.dump();
|
||||
req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
|
||||
"Host: "+etcd_address+"\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Length: "+std::to_string(req.size())+"\r\n"
|
||||
"Connection: close\r\n"
|
||||
"\r\n"+req;
|
||||
http_request_json(tfd, etcd_address, req, timeout, callback);
|
||||
}
|
||||
#include "etcd_state_client.h"
|
||||
|
||||
// Startup sequence:
|
||||
// Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state
|
||||
|
@ -62,7 +23,7 @@ void osd_t::init_cluster()
|
|||
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
|
||||
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
|
||||
}
|
||||
if (peer_states.size() < 2)
|
||||
if (st_cli.peer_states.size() < 2)
|
||||
{
|
||||
throw std::runtime_error("run_primary requires at least 2 peers");
|
||||
}
|
||||
|
@ -81,8 +42,17 @@ void osd_t::init_cluster()
|
|||
}
|
||||
else
|
||||
{
|
||||
st_cli.tfd = tfd;
|
||||
st_cli.etcd_address = etcd_address;
|
||||
st_cli.etcd_prefix = etcd_prefix;
|
||||
st_cli.etcd_api_path = etcd_api_path;
|
||||
st_cli.log_level = log_level;
|
||||
st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); };
|
||||
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
|
||||
peering_state = OSD_LOADING_PGS;
|
||||
load_global_config();
|
||||
st_cli.load_global_config();
|
||||
}
|
||||
if (run_primary && autosync_interval > 0)
|
||||
{
|
||||
|
@ -106,12 +76,12 @@ void osd_t::parse_test_peer(std::string peer)
|
|||
osd_num_t peer_osd = strtoull(osd_num_str.c_str(), NULL, 10);
|
||||
if (!peer_osd)
|
||||
throw new std::runtime_error("Could not parse OSD peer osd_num");
|
||||
else if (peer_states.find(peer_osd) != peer_states.end())
|
||||
else if (st_cli.peer_states.find(peer_osd) != st_cli.peer_states.end())
|
||||
throw std::runtime_error("Same osd number "+std::to_string(peer_osd)+" specified twice in peers");
|
||||
int port = strtoull(port_str.c_str(), NULL, 10);
|
||||
if (!port)
|
||||
throw new std::runtime_error("Could not parse OSD peer port");
|
||||
peer_states[peer_osd] = json11::Json::object {
|
||||
st_cli.peer_states[peer_osd] = json11::Json::object {
|
||||
{ "state", "up" },
|
||||
{ "addresses", json11::Json::array { addr } },
|
||||
{ "port", port },
|
||||
|
@ -220,7 +190,7 @@ void osd_t::report_statistics()
|
|||
} }
|
||||
});
|
||||
}
|
||||
etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
st_cli.etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
|
||||
{
|
||||
etcd_reporting_stats = false;
|
||||
if (err != "")
|
||||
|
@ -240,157 +210,35 @@ void osd_t::report_statistics()
|
|||
});
|
||||
}
|
||||
|
||||
void osd_t::start_etcd_watcher()
|
||||
void osd_t::on_change_etcd_state_hook(json11::Json::object & changes)
|
||||
{
|
||||
etcd_watches_initialised = 0;
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg)
|
||||
{
|
||||
if (msg->body.length())
|
||||
{
|
||||
std::string json_err;
|
||||
json11::Json data = json11::Json::parse(msg->body, json_err);
|
||||
if (json_err != "")
|
||||
{
|
||||
printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
if (data["result"]["created"].bool_value())
|
||||
{
|
||||
etcd_watches_initialised++;
|
||||
}
|
||||
if (etcd_watches_initialised == 4)
|
||||
{
|
||||
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
|
||||
}
|
||||
// First gather all changes into a hash to remove multiple overwrites
|
||||
json11::Json::object changes;
|
||||
for (auto & ev: data["result"]["events"].array_items())
|
||||
{
|
||||
auto kv = parse_etcd_kv(ev["kv"]);
|
||||
if (kv.key != "")
|
||||
{
|
||||
changes[kv.key] = kv.value;
|
||||
}
|
||||
}
|
||||
for (auto & kv: changes)
|
||||
{
|
||||
if (this->log_level > 0)
|
||||
{
|
||||
printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str());
|
||||
}
|
||||
if (kv.first.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
|
||||
{
|
||||
parse_etcd_osd_state(kv.first, kv.second);
|
||||
}
|
||||
else
|
||||
{
|
||||
parse_pg_state(kv.first, kv.second);
|
||||
}
|
||||
}
|
||||
apply_pg_count();
|
||||
apply_pg_config();
|
||||
}
|
||||
}
|
||||
if (msg->eof)
|
||||
{
|
||||
etcd_watch_ws = NULL;
|
||||
if (etcd_watches_initialised == 0)
|
||||
{
|
||||
// Connection not established, retry in <ETCD_SLOW_TIMEOUT>
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int)
|
||||
{
|
||||
start_etcd_watcher();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
// Connection was live, retry immediately
|
||||
start_etcd_watcher();
|
||||
}
|
||||
}
|
||||
});
|
||||
// FIXME apply config changes in runtime
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/config0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_CONFIG_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_OSD_STATE_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_PG_STATE_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||
{ "create_request", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
||||
{ "start_revision", etcd_watch_revision+1 },
|
||||
{ "watch_id", ETCD_PG_HISTORY_WATCH_ID },
|
||||
} }
|
||||
}).dump());
|
||||
// FIXME apply config changes in runtime (maybe, some)
|
||||
apply_pg_count();
|
||||
apply_pg_config();
|
||||
}
|
||||
|
||||
void osd_t::load_global_config()
|
||||
void osd_t::on_load_config_hook(json11::Json::object & global_config)
|
||||
{
|
||||
etcd_call("/kv/range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/global") }
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
blockstore_config_t osd_config = this->config;
|
||||
for (auto & cfg_var: global_config)
|
||||
{
|
||||
if (err != "")
|
||||
if (this->config.find(cfg_var.first) == this->config.end())
|
||||
{
|
||||
printf("Error reading OSD configuration from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
{
|
||||
load_global_config();
|
||||
});
|
||||
return;
|
||||
// FIXME Convert int to str
|
||||
osd_config[cfg_var.first] = cfg_var.second.string_value();
|
||||
}
|
||||
if (!etcd_watch_revision)
|
||||
{
|
||||
etcd_watch_revision = data["header"]["revision"].uint64_value();
|
||||
}
|
||||
if (data["kvs"].array_items().size() > 0)
|
||||
{
|
||||
auto kv = parse_etcd_kv(data["kvs"][0]);
|
||||
if (kv.value.is_object())
|
||||
{
|
||||
blockstore_config_t osd_config = this->config;
|
||||
for (auto & cfg_var: kv.value.object_items())
|
||||
{
|
||||
if (this->config.find(cfg_var.first) == this->config.end())
|
||||
{
|
||||
osd_config[cfg_var.first] = cfg_var.second.string_value();
|
||||
}
|
||||
}
|
||||
parse_config(osd_config);
|
||||
}
|
||||
}
|
||||
bind_socket();
|
||||
start_etcd_watcher();
|
||||
acquire_lease();
|
||||
});
|
||||
}
|
||||
parse_config(osd_config);
|
||||
bind_socket();
|
||||
st_cli.start_etcd_watcher();
|
||||
acquire_lease();
|
||||
}
|
||||
|
||||
// Acquire lease
|
||||
void osd_t::acquire_lease()
|
||||
{
|
||||
// Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
|
||||
etcd_call("/lease/grant", json11::Json::object {
|
||||
st_cli.etcd_call("/lease/grant", json11::Json::object {
|
||||
{ "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
|
@ -419,7 +267,7 @@ void osd_t::create_osd_state()
|
|||
{
|
||||
std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num));
|
||||
self_state = get_osd_state();
|
||||
etcd_txn(json11::Json::object {
|
||||
st_cli.etcd_txn(json11::Json::object {
|
||||
// Check that the state key does not exist
|
||||
{ "compare", json11::Json::array {
|
||||
json11::Json::object {
|
||||
|
@ -456,7 +304,7 @@ void osd_t::create_osd_state()
|
|||
if (!data["succeeded"].bool_value())
|
||||
{
|
||||
// OSD is already up
|
||||
auto kv = parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]);
|
||||
auto kv = st_cli.parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]);
|
||||
printf("Key %s already exists in etcd, OSD %lu is still up\n", kv.key.c_str(), this->osd_num);
|
||||
int64_t port = kv.value["port"].int64_value();
|
||||
for (auto & addr: kv.value["addresses"].array_items())
|
||||
|
@ -468,7 +316,7 @@ void osd_t::create_osd_state()
|
|||
}
|
||||
if (run_primary)
|
||||
{
|
||||
load_pgs();
|
||||
st_cli.load_pgs();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -476,7 +324,7 @@ void osd_t::create_osd_state()
|
|||
// Renew lease
|
||||
void osd_t::renew_lease()
|
||||
{
|
||||
etcd_call("/lease/keepalive", json11::Json::object {
|
||||
st_cli.etcd_call("/lease/keepalive", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
|
@ -512,7 +360,7 @@ void osd_t::force_stop(int exitcode)
|
|||
{
|
||||
if (etcd_lease_id != "")
|
||||
{
|
||||
etcd_call("/kv/lease/revoke", json11::Json::object {
|
||||
st_cli.etcd_call("/kv/lease/revoke", json11::Json::object {
|
||||
{ "ID", etcd_lease_id }
|
||||
}, ETCD_QUICK_TIMEOUT, [this, exitcode](std::string err, json11::Json data)
|
||||
{
|
||||
|
@ -531,7 +379,7 @@ void osd_t::force_stop(int exitcode)
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::load_pgs()
|
||||
json11::Json osd_t::on_load_pgs_checks_hook()
|
||||
{
|
||||
assert(this->pgs.size() == 0);
|
||||
json11::Json::array checks = {
|
||||
|
@ -541,174 +389,28 @@ void osd_t::load_pgs()
|
|||
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
|
||||
}
|
||||
};
|
||||
json11::Json::array txn = {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/config/pgs") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
|
||||
} }
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
|
||||
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
|
||||
} }
|
||||
},
|
||||
};
|
||||
etcd_txn(json11::Json::object {
|
||||
{ "compare", checks }, { "success", txn }
|
||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
if (err != "")
|
||||
{
|
||||
printf("Error loading PGs from etcd: %s\n", err.c_str());
|
||||
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
|
||||
{
|
||||
load_pgs();
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!data["succeeded"].bool_value())
|
||||
{
|
||||
printf("Error loading PGs from etcd: lease expired\n");
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
peering_state &= ~OSD_LOADING_PGS;
|
||||
for (auto & res: data["responses"].array_items())
|
||||
{
|
||||
for (auto & kv_json: res["response_range"]["kvs"].array_items())
|
||||
{
|
||||
auto kv = parse_etcd_kv(kv_json);
|
||||
parse_pg_state(kv.key, kv.value);
|
||||
}
|
||||
}
|
||||
apply_pg_count();
|
||||
apply_pg_config();
|
||||
});
|
||||
return checks;
|
||||
}
|
||||
|
||||
void osd_t::parse_pg_state(const std::string & key, const json11::Json & value)
|
||||
void osd_t::on_load_pgs_hook(bool success)
|
||||
{
|
||||
if (key == etcd_prefix+"/config/pgs")
|
||||
if (!success)
|
||||
{
|
||||
for (auto & pg_item: this->pg_config)
|
||||
{
|
||||
pg_item.second.exists = false;
|
||||
}
|
||||
for (auto & pg_item: value["items"].object_items())
|
||||
{
|
||||
pg_num_t pg_num = stoull_full(pg_item.first);
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str());
|
||||
continue;
|
||||
}
|
||||
this->pg_config[pg_num].exists = true;
|
||||
this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value();
|
||||
this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value();
|
||||
this->pg_config[pg_num].target_set.clear();
|
||||
for (auto pg_osd: pg_item.second["osd_set"].array_items())
|
||||
{
|
||||
this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
if (this->pg_config[pg_num].target_set.size() != 3)
|
||||
{
|
||||
printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str());
|
||||
this->pg_config[pg_num].target_set.resize(3);
|
||||
this->pg_config[pg_num].pause = true;
|
||||
}
|
||||
}
|
||||
printf("Error loading PGs from etcd: lease expired\n");
|
||||
force_stop(1);
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
|
||||
else
|
||||
{
|
||||
// <etcd_prefix>/pg/history/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12));
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & pg_cfg = this->pg_config[pg_num];
|
||||
pg_cfg.target_history.clear();
|
||||
pg_cfg.all_peers.clear();
|
||||
// Refuse to start PG if any set of the <osd_sets> has no live OSDs
|
||||
for (auto hist_item: value["osd_sets"].array_items())
|
||||
{
|
||||
std::vector<osd_num_t> history_set;
|
||||
for (auto pg_osd: hist_item.array_items())
|
||||
{
|
||||
history_set.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
pg_cfg.target_history.push_back(history_set);
|
||||
}
|
||||
// Include these additional OSDs when peering the PG
|
||||
for (auto pg_osd: value["all_peers"].array_items())
|
||||
{
|
||||
pg_cfg.all_peers.push_back(pg_osd.uint64_value());
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
|
||||
{
|
||||
// <etcd_prefix>/pg/state/%d
|
||||
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10));
|
||||
if (!pg_num)
|
||||
{
|
||||
printf("Bad etcd key %s, ignoring\n", key.c_str());
|
||||
}
|
||||
else if (value.is_null())
|
||||
{
|
||||
this->pg_config[pg_num].cur_primary = 0;
|
||||
this->pg_config[pg_num].cur_state = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
osd_num_t cur_primary = value["primary"].uint64_value();
|
||||
int state = 0;
|
||||
for (auto & e: value["state"].array_items())
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < pg_state_bit_count; i++)
|
||||
{
|
||||
if (e.string_value() == pg_state_names[i])
|
||||
{
|
||||
state = state | pg_state_bits[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (i >= pg_state_bit_count)
|
||||
{
|
||||
printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str());
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!cur_primary || !value["state"].is_array() || !state ||
|
||||
(state & PG_OFFLINE) && state != PG_OFFLINE ||
|
||||
(state & PG_PEERING) && state != PG_PEERING ||
|
||||
(state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
|
||||
{
|
||||
printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str());
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
this->pg_config[pg_num].cur_primary = cur_primary;
|
||||
this->pg_config[pg_num].cur_state = state;
|
||||
}
|
||||
peering_state &= ~OSD_LOADING_PGS;
|
||||
apply_pg_count();
|
||||
apply_pg_config();
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::apply_pg_count()
|
||||
{
|
||||
pg_num_t pg_count = pg_config.size();
|
||||
if (pg_count > 0 && (pg_config.begin()->first != 1 || std::prev(pg_config.end())->first != pg_count))
|
||||
pg_num_t pg_count = st_cli.pg_config.size();
|
||||
if (pg_count > 0 && (st_cli.pg_config.begin()->first != 1 || std::prev(st_cli.pg_config.end())->first != pg_count))
|
||||
{
|
||||
printf("Invalid PG configuration: PG numbers don't cover the whole 1..%d range\n", pg_count);
|
||||
force_stop(1);
|
||||
|
@ -741,7 +443,7 @@ void osd_t::apply_pg_count()
|
|||
void osd_t::apply_pg_config()
|
||||
{
|
||||
bool all_applied = true;
|
||||
for (auto & kv: pg_config)
|
||||
for (auto & kv: st_cli.pg_config)
|
||||
{
|
||||
pg_num_t pg_num = kv.first;
|
||||
auto & pg_cfg = kv.second;
|
||||
|
@ -962,7 +664,7 @@ void osd_t::report_pg_states()
|
|||
});
|
||||
}
|
||||
pg_state_dirty.clear();
|
||||
etcd_txn(json11::Json::object {
|
||||
st_cli.etcd_txn(json11::Json::object {
|
||||
{ "compare", checks }, { "success", success }, { "failure", failure }
|
||||
}, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data)
|
||||
{
|
||||
|
@ -984,15 +686,18 @@ void osd_t::report_pg_states()
|
|||
}
|
||||
for (auto & res: data["responses"].array_items())
|
||||
{
|
||||
auto kv = parse_etcd_kv(res["kvs"][0]);
|
||||
pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10));
|
||||
auto pg_it = pgs.find(pg_num);
|
||||
if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
|
||||
if (res["kvs"].array_items().size())
|
||||
{
|
||||
// Live PG state update failed
|
||||
printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num);
|
||||
force_stop(1);
|
||||
return;
|
||||
auto kv = st_cli.parse_etcd_kv(res["kvs"][0]);
|
||||
pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10));
|
||||
auto pg_it = pgs.find(pg_num);
|
||||
if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
|
||||
{
|
||||
// Live PG state update failed
|
||||
printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num);
|
||||
force_stop(1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Retry after a short pause (hope we'll get some updates and update PG states accordingly)
|
||||
|
@ -1032,7 +737,7 @@ void osd_t::load_and_connect_peers()
|
|||
osd_num_t peer_osd = wp_it->first;
|
||||
if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end())
|
||||
{
|
||||
// It shouldn't be here
|
||||
// Peer is already connected, it shouldn't be in wanted_peers
|
||||
wanted_peers.erase(wp_it++);
|
||||
if (!wanted_peers.size())
|
||||
{
|
||||
|
@ -1040,7 +745,7 @@ void osd_t::load_and_connect_peers()
|
|||
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
|
||||
}
|
||||
}
|
||||
else if (peer_states.find(peer_osd) == peer_states.end())
|
||||
else if (st_cli.peer_states.find(peer_osd) == st_cli.peer_states.end())
|
||||
{
|
||||
if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval))
|
||||
{
|
||||
|
@ -1059,16 +764,16 @@ void osd_t::load_and_connect_peers()
|
|||
{
|
||||
// Try to connect
|
||||
wp_it->second.connecting = true;
|
||||
const std::string addr = peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value();
|
||||
int64_t peer_port = peer_states[peer_osd]["port"].int64_value();
|
||||
const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value();
|
||||
int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value();
|
||||
wp_it++;
|
||||
connect_peer(peer_osd, addr.c_str(), peer_port, [this](osd_num_t peer_osd, int peer_fd)
|
||||
{
|
||||
wanted_peers[peer_osd].connecting = false;
|
||||
if (peer_fd < 0)
|
||||
{
|
||||
int64_t peer_port = peer_states[peer_osd]["port"].int64_value();
|
||||
auto & addrs = peer_states[peer_osd]["addresses"].array_items();
|
||||
int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value();
|
||||
auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items();
|
||||
const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str();
|
||||
printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd));
|
||||
if (wanted_peers[peer_osd].address_index < addrs.size()-1)
|
||||
|
@ -1079,7 +784,7 @@ void osd_t::load_and_connect_peers()
|
|||
else
|
||||
{
|
||||
wanted_peers[peer_osd].last_connect_attempt = time(NULL);
|
||||
peer_states.erase(peer_osd);
|
||||
st_cli.peer_states.erase(peer_osd);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -1102,7 +807,7 @@ void osd_t::load_and_connect_peers()
|
|||
}
|
||||
if (load_peer_txn.size() > 0)
|
||||
{
|
||||
etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
st_cli.etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
|
||||
{
|
||||
// Ugly, but required to wake up the loop and retry connecting after <peer_connect_interval> seconds
|
||||
tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){});
|
||||
|
@ -1116,22 +821,10 @@ void osd_t::load_and_connect_peers()
|
|||
{
|
||||
if (res["response_range"]["kvs"].array_items().size())
|
||||
{
|
||||
auto kv = parse_etcd_kv(res["response_range"]["kvs"][0]);
|
||||
parse_etcd_osd_state(kv.key, kv.value);
|
||||
auto kv = st_cli.parse_etcd_kv(res["response_range"]["kvs"][0]);
|
||||
st_cli.parse_state(kv.key, kv.value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::parse_etcd_osd_state(const std::string & key, const json11::Json & value)
|
||||
{
|
||||
// <etcd_prefix>/osd/state/<osd_num>
|
||||
osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11));
|
||||
if (peer_osd > 0 && value.is_object() && value["state"] == "up" &&
|
||||
value["addresses"].is_array() &&
|
||||
value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
|
||||
{
|
||||
peer_states[peer_osd] = value;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -383,35 +383,3 @@ void pg_t::print_state()
|
|||
total_count
|
||||
);
|
||||
}
|
||||
|
||||
const int pg_state_bit_count = 13;
|
||||
|
||||
const int pg_state_bits[13] = {
|
||||
PG_STARTING,
|
||||
PG_PEERING,
|
||||
PG_INCOMPLETE,
|
||||
PG_ACTIVE,
|
||||
PG_STOPPING,
|
||||
PG_OFFLINE,
|
||||
PG_DEGRADED,
|
||||
PG_HAS_INCOMPLETE,
|
||||
PG_HAS_DEGRADED,
|
||||
PG_HAS_MISPLACED,
|
||||
PG_HAS_UNCLEAN,
|
||||
PG_LEFT_ON_DEAD,
|
||||
};
|
||||
|
||||
const char *pg_state_names[13] = {
|
||||
"starting",
|
||||
"peering",
|
||||
"incomplete",
|
||||
"active",
|
||||
"stopping",
|
||||
"offline",
|
||||
"degraded",
|
||||
"has_incomplete",
|
||||
"has_degraded",
|
||||
"has_misplaced",
|
||||
"has_unclean",
|
||||
"left_on_dead",
|
||||
};
|
||||
|
|
|
@ -7,38 +7,7 @@
|
|||
|
||||
#include "object_id.h"
|
||||
#include "osd_ops.h"
|
||||
|
||||
// Placement group states
|
||||
// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE -> STOPPING -> OFFLINE -> [release lock]
|
||||
// Exactly one of these:
|
||||
#define PG_STARTING (1<<0)
|
||||
#define PG_PEERING (1<<1)
|
||||
#define PG_INCOMPLETE (1<<2)
|
||||
#define PG_ACTIVE (1<<3)
|
||||
#define PG_STOPPING (1<<4)
|
||||
#define PG_OFFLINE (1<<5)
|
||||
// Plus any of these:
|
||||
#define PG_DEGRADED (1<<6)
|
||||
#define PG_HAS_INCOMPLETE (1<<7)
|
||||
#define PG_HAS_DEGRADED (1<<8)
|
||||
#define PG_HAS_MISPLACED (1<<9)
|
||||
#define PG_HAS_UNCLEAN (1<<10)
|
||||
#define PG_LEFT_ON_DEAD (1<<11)
|
||||
|
||||
// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size
|
||||
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
||||
|
||||
// OSD object states
|
||||
#define OBJ_DEGRADED 0x02
|
||||
#define OBJ_INCOMPLETE 0x04
|
||||
#define OBJ_MISPLACED 0x08
|
||||
#define OBJ_NEEDS_STABLE 0x10000
|
||||
#define OBJ_NEEDS_ROLLBACK 0x20000
|
||||
#define OBJ_BUGGY 0x80000
|
||||
|
||||
extern const int pg_state_bits[];
|
||||
extern const char *pg_state_names[];
|
||||
extern const int pg_state_bit_count;
|
||||
#include "pg_states.h"
|
||||
|
||||
struct pg_obj_loc_t
|
||||
{
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
#include "pg_states.h"
|
||||
|
||||
const int pg_state_bit_count = 13;
|
||||
|
||||
const int pg_state_bits[13] = {
|
||||
PG_STARTING,
|
||||
PG_PEERING,
|
||||
PG_INCOMPLETE,
|
||||
PG_ACTIVE,
|
||||
PG_STOPPING,
|
||||
PG_OFFLINE,
|
||||
PG_DEGRADED,
|
||||
PG_HAS_INCOMPLETE,
|
||||
PG_HAS_DEGRADED,
|
||||
PG_HAS_MISPLACED,
|
||||
PG_HAS_UNCLEAN,
|
||||
PG_LEFT_ON_DEAD,
|
||||
};
|
||||
|
||||
const char *pg_state_names[13] = {
|
||||
"starting",
|
||||
"peering",
|
||||
"incomplete",
|
||||
"active",
|
||||
"stopping",
|
||||
"offline",
|
||||
"degraded",
|
||||
"has_incomplete",
|
||||
"has_degraded",
|
||||
"has_misplaced",
|
||||
"has_unclean",
|
||||
"left_on_dead",
|
||||
};
|
|
@ -0,0 +1,33 @@
|
|||
#pragma once
|
||||
|
||||
// Placement group states
|
||||
// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE -> STOPPING -> OFFLINE -> [release lock]
|
||||
// Exactly one of these:
|
||||
#define PG_STARTING (1<<0)
|
||||
#define PG_PEERING (1<<1)
|
||||
#define PG_INCOMPLETE (1<<2)
|
||||
#define PG_ACTIVE (1<<3)
|
||||
#define PG_STOPPING (1<<4)
|
||||
#define PG_OFFLINE (1<<5)
|
||||
// Plus any of these:
|
||||
#define PG_DEGRADED (1<<6)
|
||||
#define PG_HAS_INCOMPLETE (1<<7)
|
||||
#define PG_HAS_DEGRADED (1<<8)
|
||||
#define PG_HAS_MISPLACED (1<<9)
|
||||
#define PG_HAS_UNCLEAN (1<<10)
|
||||
#define PG_LEFT_ON_DEAD (1<<11)
|
||||
|
||||
// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size
|
||||
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
||||
|
||||
// OSD object states
|
||||
#define OBJ_DEGRADED 0x02
|
||||
#define OBJ_INCOMPLETE 0x04
|
||||
#define OBJ_MISPLACED 0x08
|
||||
#define OBJ_NEEDS_STABLE 0x10000
|
||||