diff --git a/cluster_client.cpp b/cluster_client.cpp index a460a61f..c88f25dc 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -46,6 +46,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd msgr.stop_client(op->peer_fd); delete op; }; + msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() || + config["use_sync_send_recv"].uint64_value(); st_cli.tfd = tfd; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; diff --git a/fio_cluster.cpp b/fio_cluster.cpp index 6c28a6a9..eb106c9a 100644 --- a/fio_cluster.cpp +++ b/fio_cluster.cpp @@ -3,17 +3,17 @@ // Random write: // // fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \ -// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M // // Linear write: // // fio -thread -ioengine=./libfio_cluster.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \ -// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M // // Random read (run with -iodepth=32 or -iodepth=1): // // fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \ -// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M #include #include diff --git a/messenger.cpp b/messenger.cpp index 9c867036..a44aaf7c 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -169,7 +169,10 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events) if (cl.read_ready == 1) { read_ready_clients.push_back(cl.peer_fd); - ringloop->wakeup(); + if (ringloop) + ringloop->wakeup(); + else + read_requests(); } } } diff --git a/messenger.h b/messenger.h index a5729ca0..4dcf6ecd 100644 --- a/messenger.h +++ b/messenger.h @@ -233,6 +233,7 @@ struct osd_messenger_t int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; int log_level = 0; + bool use_sync_send_recv = false; std::map wanted_peers; std::map osd_peer_fds; diff --git a/msgr_receive.cpp b/msgr_receive.cpp index ac0edcfb..e956622b 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -6,13 +6,6 @@ void osd_messenger_t::read_requests() { int peer_fd = read_ready_clients[i]; auto & cl = clients[peer_fd]; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) - { - read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); - return; - } - ring_data_t* data = ((ring_data_t*)sqe->user_data); if (cl.read_remaining < receive_buffer_size) { cl.read_iov.iov_base = cl.in_buf; @@ -27,8 +20,27 @@ void osd_messenger_t::read_requests() cl.read_msg.msg_iov = cl.recv_list.get_iovec(); cl.read_msg.msg_iovlen = cl.recv_list.get_size(); } - data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; - my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); + if (ringloop && !use_sync_send_recv) + { + io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); + return; + } + ring_data_t* data = ((ring_data_t*)sqe->user_data); + data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; + my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); + } + else + { + int result = recvmsg(peer_fd, &cl.read_msg, 0); + if (result < 0) + { + result = -errno; + } + handle_read(result, peer_fd); + } } read_ready_clients.clear(); } diff --git a/msgr_send.cpp b/msgr_send.cpp index 0f9fde6e..200ebe39 100644 --- a/msgr_send.cpp +++ b/msgr_send.cpp @@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) bool osd_messenger_t::try_send(osd_client_t & cl) { int peer_fd = cl.peer_fd; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) - { - return false; - } - ring_data_t* data = ((ring_data_t*)sqe->user_data); if (!cl.write_op) { // pick next command @@ -103,8 +97,26 @@ bool osd_messenger_t::try_send(osd_client_t & cl) } cl.write_msg.msg_iov = cl.send_list.get_iovec(); cl.write_msg.msg_iovlen = cl.send_list.get_size(); - data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; - my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); + if (ringloop && !use_sync_send_recv) + { + io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + return false; + } + ring_data_t* data = ((ring_data_t*)sqe->user_data); + data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; + my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); + } + else + { + int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL); + if (result < 0) + { + result = -errno; + } + handle_send(result, peer_fd); + } return true; }