forked from vitalif/vitastor
Check peer config (at least, number) after connecting
parent
642802b595
commit
6a21ea207e
5
osd.cpp
5
osd.cpp
|
@ -292,7 +292,8 @@ restart:
|
||||||
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
|
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
|
||||||
{
|
{
|
||||||
char peer_str[256];
|
char peer_str[256];
|
||||||
printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
|
printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
|
||||||
|
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
|
||||||
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
|
@ -333,7 +334,7 @@ restart:
|
||||||
else if (events[i].events & EPOLLRDHUP)
|
else if (events[i].events & EPOLLRDHUP)
|
||||||
{
|
{
|
||||||
// Stop client
|
// Stop client
|
||||||
printf("osd: client %d disconnected\n", cl.peer_fd);
|
printf("[OSD %lu] client %d disconnected\n", this->osd_num, cl.peer_fd);
|
||||||
stop_client(cl.peer_fd);
|
stop_client(cl.peer_fd);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
1
osd.h
1
osd.h
|
@ -297,6 +297,7 @@ class osd_t
|
||||||
// peer handling (primary OSD logic)
|
// peer handling (primary OSD logic)
|
||||||
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
||||||
void handle_connect_result(int peer_fd);
|
void handle_connect_result(int peer_fd);
|
||||||
|
void check_peer_config(osd_client_t & cl);
|
||||||
void cancel_osd_ops(osd_client_t & cl);
|
void cancel_osd_ops(osd_client_t & cl);
|
||||||
void cancel_op(osd_op_t *op);
|
void cancel_op(osd_op_t *op);
|
||||||
void stop_client(int peer_fd);
|
void stop_client(int peer_fd);
|
||||||
|
|
|
@ -14,7 +14,7 @@ void osd_t::init_cluster()
|
||||||
{
|
{
|
||||||
report_status();
|
report_status();
|
||||||
}
|
}
|
||||||
printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval);
|
printf("[OSD %lu] reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval);
|
||||||
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
|
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
|
||||||
{
|
{
|
||||||
report_status();
|
report_status();
|
||||||
|
|
|
@ -54,7 +54,6 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por
|
||||||
.osd_num = peer_osd,
|
.osd_num = peer_osd,
|
||||||
.in_buf = malloc(receive_buffer_size),
|
.in_buf = malloc(receive_buffer_size),
|
||||||
};
|
};
|
||||||
osd_peer_fds[peer_osd] = peer_fd;
|
|
||||||
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
// Add FD to epoll (EPOLLOUT for tracking connect() result)
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = peer_fd;
|
ev.data.fd = peer_fd;
|
||||||
|
@ -74,7 +73,6 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
cl.connect_timeout_id = -1;
|
cl.connect_timeout_id = -1;
|
||||||
}
|
}
|
||||||
osd_num_t peer_osd = cl.osd_num;
|
osd_num_t peer_osd = cl.osd_num;
|
||||||
auto callback = cl.connect_callback;
|
|
||||||
int result = 0;
|
int result = 0;
|
||||||
socklen_t result_len = sizeof(result);
|
socklen_t result_len = sizeof(result);
|
||||||
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
|
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
|
||||||
|
@ -83,6 +81,7 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
}
|
}
|
||||||
if (result != 0)
|
if (result != 0)
|
||||||
{
|
{
|
||||||
|
auto callback = cl.connect_callback;
|
||||||
stop_client(peer_fd);
|
stop_client(peer_fd);
|
||||||
callback(peer_osd, -result);
|
callback(peer_osd, -result);
|
||||||
return;
|
return;
|
||||||
|
@ -90,7 +89,6 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
// Disable EPOLLOUT on this fd
|
// Disable EPOLLOUT on this fd
|
||||||
cl.connect_callback = NULL;
|
|
||||||
cl.peer_state = PEER_CONNECTED;
|
cl.peer_state = PEER_CONNECTED;
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = peer_fd;
|
ev.data.fd = peer_fd;
|
||||||
|
@ -99,7 +97,59 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
callback(peer_osd, peer_fd);
|
// Check OSD number
|
||||||
|
check_peer_config(cl);
|
||||||
|
}
|
||||||
|
|
||||||
|
void osd_t::check_peer_config(osd_client_t & cl)
|
||||||
|
{
|
||||||
|
osd_op_t *op = new osd_op_t();
|
||||||
|
op->op_type = OSD_OP_OUT;
|
||||||
|
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
||||||
|
op->peer_fd = cl.peer_fd;
|
||||||
|
op->req = {
|
||||||
|
.show_conf = {
|
||||||
|
.header = {
|
||||||
|
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||||
|
.id = this->next_subop_id++,
|
||||||
|
.opcode = OSD_OP_SHOW_CONFIG,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
op->callback = [this](osd_op_t *op)
|
||||||
|
{
|
||||||
|
std::string json_err;
|
||||||
|
json11::Json config = json11::Json::parse(std::string((char*)op->buf), json_err);
|
||||||
|
osd_client_t & cl = clients[op->peer_fd];
|
||||||
|
bool err = false;
|
||||||
|
if (op->reply.hdr.retval < 0)
|
||||||
|
{
|
||||||
|
err = true;
|
||||||
|
printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval);
|
||||||
|
}
|
||||||
|
else if (json_err != "")
|
||||||
|
{
|
||||||
|
err = true;
|
||||||
|
printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str());
|
||||||
|
}
|
||||||
|
else if (config["osd_num"].uint64_value() != cl.osd_num)
|
||||||
|
{
|
||||||
|
err = true;
|
||||||
|
printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num);
|
||||||
|
}
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
stop_client(op->peer_fd);
|
||||||
|
delete op;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
osd_peer_fds[cl.osd_num] = cl.peer_fd;
|
||||||
|
auto callback = cl.connect_callback;
|
||||||
|
cl.connect_callback = NULL;
|
||||||
|
callback(cl.osd_num, cl.peer_fd);
|
||||||
|
delete op;
|
||||||
|
};
|
||||||
|
outbox_push(cl, op);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peering loop
|
// Peering loop
|
||||||
|
|
|
@ -233,6 +233,15 @@ void osd_t::handle_reply_hdr(osd_client_t *cl)
|
||||||
cl->read_buf = op->buf;
|
cl->read_buf = op->buf;
|
||||||
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
|
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
|
||||||
}
|
}
|
||||||
|
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG &&
|
||||||
|
op->reply.hdr.retval > 0)
|
||||||
|
{
|
||||||
|
op->buf = malloc(op->reply.hdr.retval);
|
||||||
|
cl->read_state = CL_READ_REPLY_DATA;
|
||||||
|
cl->read_reply_id = op->req.hdr.id;
|
||||||
|
cl->read_buf = op->buf;
|
||||||
|
cl->read_remaining = op->reply.hdr.retval;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
delete cl->read_op;
|
delete cl->read_op;
|
||||||
|
|
|
@ -102,7 +102,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||||
std::string cfg_str = json11::Json(config).dump();
|
std::string cfg_str = json11::Json(config).dump();
|
||||||
cur_op->buf = malloc(cfg_str.size()+1);
|
cur_op->buf = malloc(cfg_str.size()+1);
|
||||||
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
||||||
cur_op->send_list.push_back(cur_op->buf, cur_op->reply.hdr.retval);
|
cur_op->send_list.push_back(cur_op->buf, cfg_str.size()+1);
|
||||||
finish_op(cur_op, cfg_str.size()+1);
|
finish_op(cur_op, cfg_str.size()+1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue