forked from vitalif/vitastor
Add PING operation and timeouts to detect OSD failures when a host goes down
parent
836635c518
commit
ad577c4aac
|
@ -8,13 +8,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
|||
{
|
||||
this->ringloop = ringloop;
|
||||
this->tfd = tfd;
|
||||
|
||||
log_level = config["log_level"].int64_value();
|
||||
this->config = config;
|
||||
|
||||
msgr.osd_num = 0;
|
||||
msgr.tfd = tfd;
|
||||
msgr.ringloop = ringloop;
|
||||
msgr.log_level = log_level;
|
||||
msgr.repeer_pgs = [this](osd_num_t peer_osd)
|
||||
{
|
||||
if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
|
||||
|
@ -67,8 +65,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
|||
msgr.stop_client(op->peer_fd);
|
||||
delete op;
|
||||
};
|
||||
msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
|
||||
config["use_sync_send_recv"].uint64_value();
|
||||
msgr.init();
|
||||
|
||||
st_cli.tfd = tfd;
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
|
@ -185,16 +182,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
|
|||
{
|
||||
up_wait_retry_interval = 50;
|
||||
}
|
||||
msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
|
||||
if (!msgr.peer_connect_interval)
|
||||
{
|
||||
msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
}
|
||||
msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
|
||||
if (!msgr.peer_connect_timeout)
|
||||
{
|
||||
msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
}
|
||||
msgr.parse_config(config);
|
||||
msgr.parse_config(this->config);
|
||||
st_cli.load_pgs();
|
||||
}
|
||||
|
||||
|
|
|
@ -82,6 +82,7 @@ class cluster_client_t
|
|||
public:
|
||||
etcd_state_client_t st_cli;
|
||||
osd_messenger_t msgr;
|
||||
json11::Json config;
|
||||
|
||||
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
|
||||
~cluster_client_t();
|
||||
|
|
|
@ -26,14 +26,106 @@ osd_op_t::~osd_op_t()
|
|||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::init()
|
||||
{
|
||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||
{
|
||||
for (auto cl_it = clients.begin(); cl_it != clients.end();)
|
||||
{
|
||||
auto cl = (cl_it++)->second;
|
||||
if (!cl->osd_num)
|
||||
{
|
||||
// Do not run keepalive on regular clients
|
||||
continue;
|
||||
}
|
||||
if (cl->ping_time_remaining > 0)
|
||||
{
|
||||
cl->ping_time_remaining--;
|
||||
if (!cl->ping_time_remaining)
|
||||
{
|
||||
// Ping timed out, stop the client
|
||||
stop_client(cl->peer_fd, true);
|
||||
}
|
||||
}
|
||||
else if (cl->idle_time_remaining > 0)
|
||||
{
|
||||
cl->idle_time_remaining--;
|
||||
if (!cl->idle_time_remaining)
|
||||
{
|
||||
// Connection is idle for <osd_idle_time>, send ping
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = cl->peer_fd;
|
||||
op->req = (osd_any_op_t){
|
||||
.hdr = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = this->next_subop_id++,
|
||||
.opcode = OSD_OP_PING,
|
||||
},
|
||||
};
|
||||
op->callback = [this, cl](osd_op_t *op)
|
||||
{
|
||||
int fail_fd = (op->reply.hdr.retval != 0 ? op->peer_fd : -1);
|
||||
cl->ping_time_remaining = 0;
|
||||
delete op;
|
||||
if (fail_fd >= 0)
|
||||
{
|
||||
stop_client(fail_fd, true);
|
||||
}
|
||||
};
|
||||
outbox_push(op);
|
||||
cl->ping_time_remaining = osd_ping_timeout;
|
||||
cl->idle_time_remaining = osd_idle_timeout;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->idle_time_remaining = osd_idle_timeout;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
osd_messenger_t::~osd_messenger_t()
|
||||
{
|
||||
if (keepalive_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(keepalive_timer_id);
|
||||
keepalive_timer_id = -1;
|
||||
}
|
||||
while (clients.size() > 0)
|
||||
{
|
||||
stop_client(clients.begin()->first, true);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::parse_config(const json11::Json & config)
|
||||
{
|
||||
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
|
||||
config["use_sync_send_recv"].uint64_value();
|
||||
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
|
||||
if (!this->peer_connect_interval)
|
||||
{
|
||||
this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
}
|
||||
this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
|
||||
if (!this->peer_connect_timeout)
|
||||
{
|
||||
this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
}
|
||||
this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
|
||||
if (!this->osd_idle_timeout)
|
||||
{
|
||||
this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
}
|
||||
this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
|
||||
if (!this->osd_ping_timeout)
|
||||
{
|
||||
this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
}
|
||||
this->log_level = config["log_level"].uint64_value();
|
||||
}
|
||||
|
||||
void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
|
||||
{
|
||||
if (wanted_peers.find(peer_osd) == wanted_peers.end())
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
|
||||
#define DEFAULT_PEER_CONNECT_INTERVAL 5
|
||||
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
|
||||
#define DEFAULT_OSD_PING_TIMEOUT 5
|
||||
|
||||
// Kind of a vector with small-list-optimisation
|
||||
struct osd_op_buf_list_t
|
||||
|
@ -198,6 +199,8 @@ struct osd_client_t
|
|||
int peer_fd;
|
||||
int peer_state;
|
||||
int connect_timeout_id = -1;
|
||||
int ping_time_remaining = 0;
|
||||
int idle_time_remaining = 0;
|
||||
osd_num_t osd_num = 0;
|
||||
|
||||
void *in_buf = NULL;
|
||||
|
@ -251,6 +254,7 @@ struct osd_messenger_t
|
|||
{
|
||||
timerfd_manager_t *tfd;
|
||||
ring_loop_t *ringloop;
|
||||
int keepalive_timer_id = -1;
|
||||
|
||||
// osd_num_t is only for logging and asserts
|
||||
osd_num_t osd_num;
|
||||
|
@ -258,6 +262,8 @@ struct osd_messenger_t
|
|||
int receive_buffer_size = 64*1024;
|
||||
int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
|
||||
|
@ -274,6 +280,8 @@ struct osd_messenger_t
|
|||
osd_op_stats_t stats;
|
||||
|
||||
public:
|
||||
void init();
|
||||
void parse_config(const json11::Json & config);
|
||||
void connect_peer(uint64_t osd_num, json11::Json peer_state);
|
||||
void stop_client(int peer_fd, bool force = false);
|
||||
void outbox_push(osd_op_t *cur_op);
|
||||
|
|
16
src/osd.cpp
16
src/osd.cpp
|
@ -37,6 +37,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
|||
c_cli.ringloop = this->ringloop;
|
||||
c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); };
|
||||
c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
|
||||
c_cli.init();
|
||||
|
||||
init_cluster();
|
||||
|
||||
|
@ -100,14 +101,7 @@ void osd_t::parse_config(blockstore_config_t & config)
|
|||
slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
|
||||
if (!slow_log_interval)
|
||||
slow_log_interval = 10;
|
||||
c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
|
||||
if (!c_cli.peer_connect_interval)
|
||||
c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
|
||||
c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10);
|
||||
if (!c_cli.peer_connect_timeout)
|
||||
c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
|
||||
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
|
||||
c_cli.log_level = log_level;
|
||||
c_cli.parse_config(json_config);
|
||||
}
|
||||
|
||||
void osd_t::bind_socket()
|
||||
|
@ -211,6 +205,12 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
finish_op(cur_op, -EINVAL);
|
||||
return;
|
||||
}
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_PING)
|
||||
{
|
||||
// Pong
|
||||
finish_op(cur_op, 0);
|
||||
return;
|
||||
}
|
||||
if (readonly &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
|
||||
|
|
|
@ -19,4 +19,5 @@ const char* osd_op_names[] = {
|
|||
"primary_write",
|
||||
"primary_sync",
|
||||
"primary_delete",
|
||||
"ping",
|
||||
};
|
||||
|
|
|
@ -27,7 +27,8 @@
|
|||
#define OSD_OP_WRITE 12
|
||||
#define OSD_OP_SYNC 13
|
||||
#define OSD_OP_DELETE 14
|
||||
#define OSD_OP_MAX 14
|
||||
#define OSD_OP_PING 15
|
||||
#define OSD_OP_MAX 15
|
||||
// Alignment & limit for read/write operations
|
||||
#ifndef MEM_ALIGNMENT
|
||||
#define MEM_ALIGNMENT 512
|
||||
|
|
Loading…
Reference in New Issue