From 0df51e8b219e6b1a564f9cb8cddd4e1d756e76dd Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 26 Feb 2023 02:53:24 +0300 Subject: [PATCH] Postpone handling incoming messages ? --- src/messenger.h | 1 + src/msgr_rdma.cpp | 20 +++++++++++++------- src/msgr_rdma.h | 7 +++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/messenger.h b/src/messenger.h index 238e5164..cf84af5a 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -134,6 +134,7 @@ protected: msgr_rdma_context_t *rdma_context = NULL; uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0; uint64_t rdma_max_msg = 0; + std::vector rdma_handle_buffers; #endif std::vector read_ready_clients; diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index 8e63195f..0147c6a7 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -492,13 +492,9 @@ void osd_messenger_t::handle_rdma_events() if (!is_send) { rc->cur_recv--; - if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len)) - { - // handle_read_buffer may stop the client - continue; - } - try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]); - rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size(); + rdma_handle_buffers.push_back((rdma_hb_t){ .peer_fd = client_id, .buf = rc->recv_buffers[0], .len = wc[i].byte_len }); + rc->recv_buffers.erase(rc->recv_buffers.begin(), rc->recv_buffers.begin()+1); + try_recv_rdma(cl); } else { @@ -547,6 +543,16 @@ void osd_messenger_t::handle_rdma_events() } } } while (event_count > 0); + for (auto & hb: rdma_handle_buffers) + { + auto cl_it = clients.find(hb.peer_fd); + if (cl_it != clients.end()) + { + handle_read_buffer(cl_it->second, hb.buf, hb.len); + } + free(hb.buf); + } + rdma_handle_buffers.clear(); for (auto cb: set_immediate) { cb(); diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 7c56f4a1..5cd8d10a 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -57,3 +57,10 @@ struct msgr_rdma_connection_t static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); int connect(msgr_rdma_address_t *dest); }; + +struct rdma_hb_t +{ + int peer_fd; + void *buf; + uint64_t len; +};