diff --git a/osd.cpp b/osd.cpp index 3c6d0971..35bfd8e2 100644 --- a/osd.cpp +++ b/osd.cpp @@ -292,7 +292,8 @@ restart: while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) { char peer_str[256]; - printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, + inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); @@ -333,7 +334,7 @@ restart: else if (events[i].events & EPOLLRDHUP) { // Stop client - printf("osd: client %d disconnected\n", cl.peer_fd); + printf("[OSD %lu] client %d disconnected\n", this->osd_num, cl.peer_fd); stop_client(cl.peer_fd); } else diff --git a/osd.h b/osd.h index d95ce141..96af32ce 100644 --- a/osd.h +++ b/osd.h @@ -297,6 +297,7 @@ class osd_t // peer handling (primary OSD logic) void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); void handle_connect_result(int peer_fd); + void check_peer_config(osd_client_t & cl); void cancel_osd_ops(osd_client_t & cl); void cancel_op(osd_op_t *op); void stop_client(int peer_fd); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 0acfd98b..f95ca6a8 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -14,7 +14,7 @@ void osd_t::init_cluster() { report_status(); } - printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval); + printf("[OSD %lu] reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval); this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() { report_status(); diff --git a/osd_peering.cpp b/osd_peering.cpp index d90e7c94..784e0e24 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -54,7 +54,6 @@ void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_por .osd_num = peer_osd, .in_buf = malloc(receive_buffer_size), }; - osd_peer_fds[peer_osd] = peer_fd; // Add FD to epoll (EPOLLOUT for tracking connect() result) epoll_event ev; ev.data.fd = peer_fd; @@ -74,7 +73,6 @@ void osd_t::handle_connect_result(int peer_fd) cl.connect_timeout_id = -1; } osd_num_t peer_osd = cl.osd_num; - auto callback = cl.connect_callback; int result = 0; socklen_t result_len = sizeof(result); if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) @@ -83,6 +81,7 @@ void osd_t::handle_connect_result(int peer_fd) } if (result != 0) { + auto callback = cl.connect_callback; stop_client(peer_fd); callback(peer_osd, -result); return; @@ -90,7 +89,6 @@ void osd_t::handle_connect_result(int peer_fd) int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); // Disable EPOLLOUT on this fd - cl.connect_callback = NULL; cl.peer_state = PEER_CONNECTED; epoll_event ev; ev.data.fd = peer_fd; @@ -99,7 +97,59 @@ void osd_t::handle_connect_result(int peer_fd) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } - callback(peer_osd, peer_fd); + // Check OSD number + check_peer_config(cl); +} + +void osd_t::check_peer_config(osd_client_t & cl) +{ + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); + op->peer_fd = cl.peer_fd; + op->req = { + .show_conf = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SHOW_CONFIG, + }, + }, + }; + op->callback = [this](osd_op_t *op) + { + std::string json_err; + json11::Json config = json11::Json::parse(std::string((char*)op->buf), json_err); + osd_client_t & cl = clients[op->peer_fd]; + bool err = false; + if (op->reply.hdr.retval < 0) + { + err = true; + printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval); + } + else if (json_err != "") + { + err = true; + printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str()); + } + else if (config["osd_num"].uint64_value() != cl.osd_num) + { + err = true; + printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); + } + if (err) + { + stop_client(op->peer_fd); + delete op; + return; + } + osd_peer_fds[cl.osd_num] = cl.peer_fd; + auto callback = cl.connect_callback; + cl.connect_callback = NULL; + callback(cl.osd_num, cl.peer_fd); + delete op; + }; + outbox_push(cl, op); } // Peering loop diff --git a/osd_receive.cpp b/osd_receive.cpp index a383691c..06e4f7d1 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -233,6 +233,15 @@ void osd_t::handle_reply_hdr(osd_client_t *cl) cl->read_buf = op->buf; cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval; } + else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && + op->reply.hdr.retval > 0) + { + op->buf = malloc(op->reply.hdr.retval); + cl->read_state = CL_READ_REPLY_DATA; + cl->read_reply_id = op->req.hdr.id; + cl->read_buf = op->buf; + cl->read_remaining = op->reply.hdr.retval; + } else { delete cl->read_op; diff --git a/osd_secondary.cpp b/osd_secondary.cpp index c6cc513f..0df20b53 100644 --- a/osd_secondary.cpp +++ b/osd_secondary.cpp @@ -102,7 +102,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op) std::string cfg_str = json11::Json(config).dump(); cur_op->buf = malloc(cfg_str.size()+1); memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1); - cur_op->send_list.push_back(cur_op->buf, cur_op->reply.hdr.retval); + cur_op->send_list.push_back(cur_op->buf, cfg_str.size()+1); finish_op(cur_op, cfg_str.size()+1); }