From 776fe954a5bb4ad93dbddce19b7b4051db574958 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 17 Oct 2020 10:52:21 +0000 Subject: [PATCH] Fix crashes on multiple OSD reconnects Identify clients by pointers instead of peer_fd as peer may be dropped and reconnected between callbacks Yeah maybe I need some Rust, but ... maybe in the future :) --- messenger.cpp | 95 ++++++++++---------- messenger.h | 17 ++-- msgr_receive.cpp | 219 ++++++++++++++++++++++++----------------------- msgr_send.cpp | 159 +++++++++++++++++----------------- osd_peering.cpp | 4 +- osd_primary.cpp | 4 +- 6 files changed, 258 insertions(+), 240 deletions(-) diff --git a/messenger.cpp b/messenger.cpp index 2fa6f26d..c18f936e 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -102,7 +102,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer { timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) { - osd_num_t peer_osd = clients[peer_fd].osd_num; + osd_num_t peer_osd = clients[peer_fd]->osd_num; stop_client(peer_fd); on_connect_peer(peer_osd, -EIO); return; @@ -116,7 +116,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer return; } assert(peer_osd != this->osd_num); - clients[peer_fd] = (osd_client_t){ + clients[peer_fd] = new osd_client_t({ .peer_addr = addr, .peer_port = peer_port, .peer_fd = peer_fd, @@ -124,7 +124,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer .connect_timeout_id = timeout_id, .osd_num = peer_osd, .in_buf = malloc_or_die(receive_buffer_size), - }; + }); tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) { // Either OUT (connected) or HUP @@ -134,13 +134,13 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer void osd_messenger_t::handle_connect_epoll(int peer_fd) { - auto & cl = clients[peer_fd]; - if (cl.connect_timeout_id >= 0) + auto cl = clients[peer_fd]; + if (cl->connect_timeout_id >= 0) { - tfd->clear_timer(cl.connect_timeout_id); - cl.connect_timeout_id = -1; + tfd->clear_timer(cl->connect_timeout_id); + cl->connect_timeout_id = -1; } - osd_num_t peer_osd = cl.osd_num; + osd_num_t peer_osd = cl->osd_num; int result = 0; socklen_t result_len = sizeof(result); if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) @@ -155,7 +155,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd) } int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - cl.peer_state = PEER_CONNECTED; + cl->peer_state = PEER_CONNECTED; tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { handle_peer_epoll(peer_fd, epoll_events); @@ -176,11 +176,11 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events) else if (epoll_events & EPOLLIN) { // Mark client as ready (i.e. some data is available) - auto & cl = clients[peer_fd]; - cl.read_ready++; - if (cl.read_ready == 1) + auto cl = clients[peer_fd]; + cl->read_ready++; + if (cl->read_ready == 1) { - read_ready_clients.push_back(cl.peer_fd); + read_ready_clients.push_back(cl->peer_fd); if (ringloop) ringloop->wakeup(); else @@ -228,11 +228,11 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) repeer_pgs(peer_osd); } -void osd_messenger_t::check_peer_config(osd_client_t & cl) +void osd_messenger_t::check_peer_config(osd_client_t *cl) { osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; - op->peer_fd = cl.peer_fd; + op->peer_fd = cl->peer_fd; op->req = { .show_conf = { .header = { @@ -242,16 +242,15 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl) }, }, }; - op->callback = [this](osd_op_t *op) + op->callback = [this, cl](osd_op_t *op) { - osd_client_t & cl = clients[op->peer_fd]; std::string json_err; json11::Json config; bool err = false; if (op->reply.hdr.retval < 0) { err = true; - printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval); + printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval); } else { @@ -259,45 +258,45 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl) if (json_err != "") { err = true; - printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str()); + printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str()); } - else if (config["osd_num"].uint64_value() != cl.osd_num) + else if (config["osd_num"].uint64_value() != cl->osd_num) { err = true; - printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); + printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num); } } if (err) { - osd_num_t osd_num = cl.osd_num; + osd_num_t osd_num = cl->osd_num; stop_client(op->peer_fd); on_connect_peer(osd_num, -1); delete op; return; } - osd_peer_fds[cl.osd_num] = cl.peer_fd; - on_connect_peer(cl.osd_num, cl.peer_fd); + osd_peer_fds[cl->osd_num] = cl->peer_fd; + on_connect_peer(cl->osd_num, cl->peer_fd); delete op; }; outbox_push(op); } -void osd_messenger_t::cancel_osd_ops(osd_client_t & cl) +void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) { - for (auto p: cl.sent_ops) + for (auto p: cl->sent_ops) { cancel_op(p.second); } - cl.sent_ops.clear(); - for (auto op: cl.outbox) + cl->sent_ops.clear(); + for (auto op: cl->outbox) { cancel_op(op); } - cl.outbox.clear(); - if (cl.write_op) + cl->outbox.clear(); + if (cl->write_op) { - cancel_op(cl.write_op); - cl.write_op = NULL; + cancel_op(cl->write_op); + cl->write_op = NULL; } } @@ -328,15 +327,15 @@ void osd_messenger_t::stop_client(int peer_fd) return; } uint64_t repeer_osd = 0; - osd_client_t cl = it->second; - if (cl.peer_state == PEER_CONNECTED) + osd_client_t *cl = it->second; + if (cl->peer_state == PEER_CONNECTED) { - if (cl.osd_num) + if (cl->osd_num) { // Reload configuration from etcd when the connection is dropped if (log_level > 0) - printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); - repeer_osd = cl.osd_num; + printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); + repeer_osd = cl->osd_num; } else { @@ -344,18 +343,19 @@ void osd_messenger_t::stop_client(int peer_fd) printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); } } + cl->peer_state = PEER_STOPPED; clients.erase(it); tfd->set_fd_handler(peer_fd, false, NULL); - if (cl.osd_num) + if (cl->osd_num) { - osd_peer_fds.erase(cl.osd_num); + osd_peer_fds.erase(cl->osd_num); // Cancel outbound operations cancel_osd_ops(cl); } - if (cl.read_op) + if (cl->read_op) { - delete cl.read_op; - cl.read_op = NULL; + delete cl->read_op; + cl->read_op = NULL; } for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { @@ -373,8 +373,13 @@ void osd_messenger_t::stop_client(int peer_fd) break; } } - free(cl.in_buf); + free(cl->in_buf); + cl->in_buf = NULL; close(peer_fd); + if (cl->refs <= 0) + { + delete cl; + } if (repeer_osd) { repeer_pgs(repeer_osd); @@ -396,13 +401,13 @@ void osd_messenger_t::accept_connections(int listen_fd) fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - clients[peer_fd] = { + clients[peer_fd] = new osd_client_t({ .peer_addr = addr, .peer_port = ntohs(addr.sin_port), .peer_fd = peer_fd, .peer_state = PEER_CONNECTED, .in_buf = malloc_or_die(receive_buffer_size), - }; + }); // Add FD to epoll tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { diff --git a/messenger.h b/messenger.h index a81fd4f6..c718fc91 100644 --- a/messenger.h +++ b/messenger.h @@ -30,6 +30,7 @@ #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 +#define PEER_STOPPED 3 #define DEFAULT_PEER_CONNECT_INTERVAL 5 #define DEFAULT_PEER_CONNECT_TIMEOUT 5 @@ -190,6 +191,8 @@ struct osd_op_t struct osd_client_t { + int refs = 0; + sockaddr_in peer_addr; int peer_port; int peer_fd; @@ -263,7 +266,7 @@ struct osd_messenger_t std::map osd_peer_fds; uint64_t next_subop_id = 1; - std::map clients; + std::map clients; std::vector read_ready_clients; std::vector write_ready_clients; std::vector> set_immediate; @@ -288,15 +291,15 @@ protected: void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); void handle_connect_epoll(int peer_fd); void on_connect_peer(osd_num_t peer_osd, int peer_fd); - void check_peer_config(osd_client_t & cl); - void cancel_osd_ops(osd_client_t & cl); + void check_peer_config(osd_client_t *cl); + void cancel_osd_ops(osd_client_t *cl); void cancel_op(osd_op_t *op); - bool try_send(osd_client_t & cl); - void handle_send(int result, int peer_fd); + bool try_send(osd_client_t *cl); + void handle_send(int result, osd_client_t *cl); - bool handle_read(int result, int peer_fd); - bool handle_finished_read(osd_client_t & cl); + bool handle_read(int result, osd_client_t *cl); + bool handle_finished_read(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl); bool handle_reply_hdr(osd_client_t *cl); void handle_reply_ready(osd_op_t *op); diff --git a/msgr_receive.cpp b/msgr_receive.cpp index 8cf68b6c..2f959f45 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -8,21 +8,22 @@ void osd_messenger_t::read_requests() for (int i = 0; i < read_ready_clients.size(); i++) { int peer_fd = read_ready_clients[i]; - auto & cl = clients[peer_fd]; - if (cl.read_remaining < receive_buffer_size) + osd_client_t *cl = clients[peer_fd]; + if (cl->read_remaining < receive_buffer_size) { - cl.read_iov.iov_base = cl.in_buf; - cl.read_iov.iov_len = receive_buffer_size; - cl.read_msg.msg_iov = &cl.read_iov; - cl.read_msg.msg_iovlen = 1; + cl->read_iov.iov_base = cl->in_buf; + cl->read_iov.iov_len = receive_buffer_size; + cl->read_msg.msg_iov = &cl->read_iov; + cl->read_msg.msg_iovlen = 1; } else { - cl.read_iov.iov_base = 0; - cl.read_iov.iov_len = cl.read_remaining; - cl.read_msg.msg_iov = cl.recv_list.get_iovec(); - cl.read_msg.msg_iovlen = cl.recv_list.get_size(); + cl->read_iov.iov_base = 0; + cl->read_iov.iov_len = cl->read_remaining; + cl->read_msg.msg_iov = cl->recv_list.get_iovec(); + cl->read_msg.msg_iovlen = cl->recv_list.get_size(); } + cl->refs++; if (ringloop && !use_sync_send_recv) { io_uring_sqe* sqe = ringloop->get_sqe(); @@ -32,111 +33,115 @@ void osd_messenger_t::read_requests() return; } ring_data_t* data = ((ring_data_t*)sqe->user_data); - data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; - my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); + data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); }; + my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0); } else { - int result = recvmsg(peer_fd, &cl.read_msg, 0); + int result = recvmsg(peer_fd, &cl->read_msg, 0); if (result < 0) { result = -errno; } - handle_read(result, peer_fd); + handle_read(result, cl); } } read_ready_clients.clear(); } -bool osd_messenger_t::handle_read(int result, int peer_fd) +bool osd_messenger_t::handle_read(int result, osd_client_t *cl) { bool ret = false; - auto cl_it = clients.find(peer_fd); - if (cl_it != clients.end()) + cl->refs--; + if (cl->peer_state == PEER_STOPPED) { - auto & cl = cl_it->second; - if (result <= 0 && result != -EAGAIN) + if (cl->refs <= 0) { - // this is a client socket, so don't panic on error. just disconnect it - if (result != 0) - { - printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result)); - } - stop_client(peer_fd); - return false; + delete cl; } - if (result == -EAGAIN || result < cl.read_iov.iov_len) + return false; + } + if (result <= 0 && result != -EAGAIN) + { + // this is a client socket, so don't panic on error. just disconnect it + if (result != 0) { - cl.read_ready--; - if (cl.read_ready > 0) - read_ready_clients.push_back(peer_fd); + printf("Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); + } + stop_client(cl->peer_fd); + return false; + } + if (result == -EAGAIN || result < cl->read_iov.iov_len) + { + cl->read_ready--; + if (cl->read_ready > 0) + read_ready_clients.push_back(cl->peer_fd); + } + else + { + read_ready_clients.push_back(cl->peer_fd); + } + if (result > 0) + { + if (cl->read_iov.iov_base == cl->in_buf) + { + // Compose operation(s) from the buffer + int remain = result; + void *curbuf = cl->in_buf; + while (remain > 0) + { + if (!cl->read_op) + { + cl->read_op = new osd_op_t; + cl->read_op->peer_fd = cl->peer_fd; + cl->read_op->op_type = OSD_OP_IN; + cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); + cl->read_remaining = OSD_PACKET_SIZE; + cl->read_state = CL_READ_HDR; + } + while (cl->recv_list.done < cl->recv_list.count && remain > 0) + { + iovec* cur = cl->recv_list.get_iovec(); + if (cur->iov_len > remain) + { + memcpy(cur->iov_base, curbuf, remain); + cl->read_remaining -= remain; + cur->iov_len -= remain; + cur->iov_base += remain; + remain = 0; + } + else + { + memcpy(cur->iov_base, curbuf, cur->iov_len); + curbuf += cur->iov_len; + cl->read_remaining -= cur->iov_len; + remain -= cur->iov_len; + cur->iov_len = 0; + cl->recv_list.done++; + } + } + if (cl->recv_list.done >= cl->recv_list.count) + { + if (!handle_finished_read(cl)) + { + goto fin; + } + } + } } else { - read_ready_clients.push_back(peer_fd); + // Long data + cl->read_remaining -= result; + cl->recv_list.eat(result); + if (cl->recv_list.done >= cl->recv_list.count) + { + handle_finished_read(cl); + } } - if (result > 0) + if (result >= cl->read_iov.iov_len) { - if (cl.read_iov.iov_base == cl.in_buf) - { - // Compose operation(s) from the buffer - int remain = result; - void *curbuf = cl.in_buf; - while (remain > 0) - { - if (!cl.read_op) - { - cl.read_op = new osd_op_t; - cl.read_op->peer_fd = peer_fd; - cl.read_op->op_type = OSD_OP_IN; - cl.recv_list.push_back(cl.read_op->req.buf, OSD_PACKET_SIZE); - cl.read_remaining = OSD_PACKET_SIZE; - cl.read_state = CL_READ_HDR; - } - while (cl.recv_list.done < cl.recv_list.count && remain > 0) - { - iovec* cur = cl.recv_list.get_iovec(); - if (cur->iov_len > remain) - { - memcpy(cur->iov_base, curbuf, remain); - cl.read_remaining -= remain; - cur->iov_len -= remain; - cur->iov_base += remain; - remain = 0; - } - else - { - memcpy(cur->iov_base, curbuf, cur->iov_len); - curbuf += cur->iov_len; - cl.read_remaining -= cur->iov_len; - remain -= cur->iov_len; - cur->iov_len = 0; - cl.recv_list.done++; - } - } - if (cl.recv_list.done >= cl.recv_list.count) - { - if (!handle_finished_read(cl)) - { - goto fin; - } - } - } - } - else - { - // Long data - cl.read_remaining -= result; - cl.recv_list.eat(result); - if (cl.recv_list.done >= cl.recv_list.count) - { - handle_finished_read(cl); - } - } - if (result >= cl.read_iov.iov_len) - { - ret = true; - } + ret = true; } } fin: @@ -148,30 +153,30 @@ fin: return ret; } -bool osd_messenger_t::handle_finished_read(osd_client_t & cl) +bool osd_messenger_t::handle_finished_read(osd_client_t *cl) { - cl.recv_list.reset(); - if (cl.read_state == CL_READ_HDR) + cl->recv_list.reset(); + if (cl->read_state == CL_READ_HDR) { - if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) - return handle_reply_hdr(&cl); + if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) + return handle_reply_hdr(cl); else - handle_op_hdr(&cl); + handle_op_hdr(cl); } - else if (cl.read_state == CL_READ_DATA) + else if (cl->read_state == CL_READ_DATA) { // Operation is ready - cl.received_ops.push_back(cl.read_op); - set_immediate.push_back([this, op = cl.read_op]() { exec_op(op); }); - cl.read_op = NULL; - cl.read_state = 0; + cl->received_ops.push_back(cl->read_op); + set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); }); + cl->read_op = NULL; + cl->read_state = 0; } - else if (cl.read_state == CL_READ_REPLY_DATA) + else if (cl->read_state == CL_READ_REPLY_DATA) { // Reply is ready - handle_reply_ready(cl.read_op); - cl.read_op = NULL; - cl.read_state = 0; + handle_reply_ready(cl->read_op); + cl->read_op = NULL; + cl->read_state = 0; } else { diff --git a/msgr_send.cpp b/msgr_send.cpp index 54376f6f..4ed0d001 100644 --- a/msgr_send.cpp +++ b/msgr_send.cpp @@ -6,7 +6,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) { assert(cur_op->peer_fd); - auto & cl = clients.at(cur_op->peer_fd); + osd_client_t *cl = clients.at(cur_op->peer_fd); if (cur_op->op_type == OSD_OP_OUT) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); @@ -15,12 +15,12 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) { // Check that operation actually belongs to this client bool found = false; - for (auto it = cl.received_ops.begin(); it != cl.received_ops.end(); it++) + for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++) { if (*it == cur_op) { found = true; - cl.received_ops.erase(it, it+1); + cl->received_ops.erase(it, it+1); break; } } @@ -30,85 +30,86 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) return; } } - cl.outbox.push_back(cur_op); + cl->outbox.push_back(cur_op); if (!ringloop) { - while (cl.write_op || cl.outbox.size()) + while (cl->write_op || cl->outbox.size()) { try_send(cl); } } - else if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) + else if (cl->write_op || cl->outbox.size() > 1 || !try_send(cl)) { - if (cl.write_state == 0) + if (cl->write_state == 0) { - cl.write_state = CL_WRITE_READY; + cl->write_state = CL_WRITE_READY; write_ready_clients.push_back(cur_op->peer_fd); } ringloop->wakeup(); } } -bool osd_messenger_t::try_send(osd_client_t & cl) +bool osd_messenger_t::try_send(osd_client_t *cl) { - int peer_fd = cl.peer_fd; - if (!cl.write_op) + int peer_fd = cl->peer_fd; + if (!cl->write_op) { // pick next command - cl.write_op = cl.outbox.front(); - cl.outbox.pop_front(); - cl.write_state = CL_WRITE_REPLY; - if (cl.write_op->op_type == OSD_OP_IN) + cl->write_op = cl->outbox.front(); + cl->outbox.pop_front(); + cl->write_state = CL_WRITE_REPLY; + if (cl->write_op->op_type == OSD_OP_IN) { // Measure execution latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - stats.op_stat_count[cl.write_op->req.hdr.opcode]++; - if (!stats.op_stat_count[cl.write_op->req.hdr.opcode]) + stats.op_stat_count[cl->write_op->req.hdr.opcode]++; + if (!stats.op_stat_count[cl->write_op->req.hdr.opcode]) { - stats.op_stat_count[cl.write_op->req.hdr.opcode]++; - stats.op_stat_sum[cl.write_op->req.hdr.opcode] = 0; - stats.op_stat_bytes[cl.write_op->req.hdr.opcode] = 0; + stats.op_stat_count[cl->write_op->req.hdr.opcode]++; + stats.op_stat_sum[cl->write_op->req.hdr.opcode] = 0; + stats.op_stat_bytes[cl->write_op->req.hdr.opcode] = 0; } - stats.op_stat_sum[cl.write_op->req.hdr.opcode] += ( - (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + - (tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000 + stats.op_stat_sum[cl->write_op->req.hdr.opcode] += ( + (tv_end.tv_sec - cl->write_op->tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - cl->write_op->tv_begin.tv_nsec)/1000 ); - if (cl.write_op->req.hdr.opcode == OSD_OP_READ || - cl.write_op->req.hdr.opcode == OSD_OP_WRITE) + if (cl->write_op->req.hdr.opcode == OSD_OP_READ || + cl->write_op->req.hdr.opcode == OSD_OP_WRITE) { - stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len; + stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.rw.len; } - else if (cl.write_op->req.hdr.opcode == OSD_OP_SEC_READ || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) + else if (cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) { - stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len; + stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.sec_rw.len; } - cl.send_list.push_back(cl.write_op->reply.buf, OSD_PACKET_SIZE); - if (cl.write_op->req.hdr.opcode == OSD_OP_READ || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_READ || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_LIST || - cl.write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) + cl->send_list.push_back(cl->write_op->reply.buf, OSD_PACKET_SIZE); + if (cl->write_op->req.hdr.opcode == OSD_OP_READ || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_LIST || + cl->write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) { - cl.send_list.append(cl.write_op->iov); + cl->send_list.append(cl->write_op->iov); } } else { - cl.send_list.push_back(cl.write_op->req.buf, OSD_PACKET_SIZE); - if (cl.write_op->req.hdr.opcode == OSD_OP_WRITE || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || - cl.write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK) + cl->send_list.push_back(cl->write_op->req.buf, OSD_PACKET_SIZE); + if (cl->write_op->req.hdr.opcode == OSD_OP_WRITE || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || + cl->write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK) { - cl.send_list.append(cl.write_op->iov); + cl->send_list.append(cl->write_op->iov); } } } - cl.write_msg.msg_iov = cl.send_list.get_iovec(); - cl.write_msg.msg_iovlen = cl.send_list.get_size(); + cl->write_msg.msg_iov = cl->send_list.get_iovec(); + cl->write_msg.msg_iovlen = cl->send_list.get_size(); + cl->refs++; if (ringloop && !use_sync_send_recv) { io_uring_sqe* sqe = ringloop->get_sqe(); @@ -117,17 +118,17 @@ bool osd_messenger_t::try_send(osd_client_t & cl) return false; } ring_data_t* data = ((ring_data_t*)sqe->user_data); - data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; - my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); + data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); }; + my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0); } else { - int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL); + int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL); if (result < 0) { result = -errno; } - handle_send(result, peer_fd); + handle_send(result, cl); } return true; } @@ -146,41 +147,45 @@ void osd_messenger_t::send_replies() write_ready_clients.clear(); } -void osd_messenger_t::handle_send(int result, int peer_fd) +void osd_messenger_t::handle_send(int result, osd_client_t *cl) { - auto cl_it = clients.find(peer_fd); - if (cl_it != clients.end()) + cl->refs--; + if (cl->peer_state == PEER_STOPPED) { - auto & cl = cl_it->second; - if (result < 0 && result != -EAGAIN) + if (!cl->refs) { - // this is a client socket, so don't panic. just disconnect it - printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result)); - stop_client(peer_fd); - return; + delete cl; } - if (result >= 0) + return; + } + if (result < 0 && result != -EAGAIN) + { + // this is a client socket, so don't panic. just disconnect it + printf("Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result)); + stop_client(cl->peer_fd); + return; + } + if (result >= 0) + { + cl->send_list.eat(result); + if (cl->send_list.done >= cl->send_list.count) { - cl.send_list.eat(result); - if (cl.send_list.done >= cl.send_list.count) + // Done + cl->send_list.reset(); + if (cl->write_op->op_type == OSD_OP_IN) { - // Done - cl.send_list.reset(); - if (cl.write_op->op_type == OSD_OP_IN) - { - delete cl.write_op; - } - else - { - cl.sent_ops[cl.write_op->req.hdr.id] = cl.write_op; - } - cl.write_op = NULL; - cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0; + delete cl->write_op; } - } - if (cl.write_state != 0) - { - write_ready_clients.push_back(peer_fd); + else + { + cl->sent_ops[cl->write_op->req.hdr.id] = cl->write_op; + } + cl->write_op = NULL; + cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0; } } + if (cl->write_state != 0) + { + write_ready_clients.push_back(cl->peer_fd); + } } diff --git a/osd_peering.cpp b/osd_peering.cpp index 9f911273..b29c8385 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -141,7 +141,7 @@ void osd_t::start_pg_peering(pg_t & pg) std::vector to_stop; for (auto & cp: c_cli.clients) { - if (cp.second.dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second.dirty_pgs.end()) + if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end()) { to_stop.push_back(cp.first); } @@ -308,7 +308,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]); osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; - op->peer_fd = cl.peer_fd; + op->peer_fd = cl->peer_fd; op->req = { .sec_sync = { .header = { diff --git a/osd_primary.cpp b/osd_primary.cpp index 6d406191..434d92f6 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -461,7 +461,7 @@ resume_7: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); + c_cli.clients[cur_op->peer_fd]->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); } return true; @@ -651,7 +651,7 @@ finish: { auto it = c_cli.clients.find(cur_op->peer_fd); if (it != c_cli.clients.end()) - it->second.dirty_pgs.clear(); + it->second->dirty_pgs.clear(); } finish_op(cur_op, 0); }