Compare commits


1 commit

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Vitaliy Filippov | 94363abb73 | Implement io_uring zero-copy send support | 2025-04-04 01:31:43 +03:00 |
8 changed files with 133 additions and 11 deletions

View File

@@ -34,6 +34,7 @@ between clients, OSDs and etcd.
 - [etcd_ws_keepalive_interval](#etcd_ws_keepalive_interval)
 - [etcd_min_reload_interval](#etcd_min_reload_interval)
 - [tcp_header_buffer_size](#tcp_header_buffer_size)
+- [min_zerocopy_send_size](#min_zerocopy_send_size)
 - [use_sync_send_recv](#use_sync_send_recv)
 
 ## osd_network
@@ -313,6 +314,17 @@ is received without an additional copy. You can try to play with this
 parameter and see how it affects random iops and linear bandwidth if you
 want.
 
+## min_zerocopy_send_size
+
+- Type: integer
+- Default: 12288
+
+OSDs and clients will attempt to use io_uring-based zero-copy TCP send
+for buffers larger than this number of bytes. Zero-copy send with io_uring is
+supported since Linux kernel version 6.1. Support is auto-detected and disabled
+automatically when not available. It can also be disabled explicitly by setting
+this parameter to a negative value.
+
 ## use_sync_send_recv
 
 - Type: boolean

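To make the parameter's three regimes concrete, here is a small standalone sketch (an illustration, not part of the commit) of the same decision rule the patch adds to `try_send()` further below: a negative value disables zero-copy entirely, 0 enables it for every send, and a positive value requires the *average* iovec size of the message to reach the threshold.

```cpp
// Sketch of the min_zerocopy_send_size gate (illustration only).
#include <sys/uio.h>
#include <cstdio>

static bool use_zerocopy(bool has_sendmsg_zc, int min_zc, const iovec *iov, size_t n)
{
    bool use_zc = has_sendmsg_zc && min_zc >= 0; // negative value disables zero-copy
    if (use_zc && min_zc > 0 && n > 0)
    {
        size_t total = 0;
        for (size_t i = 0; i < n; i++)
            total += iov[i].iov_len;
        use_zc = total/n >= (size_t)min_zc;      // compare the average iovec size
    }
    return use_zc;
}

int main()
{
    iovec iov[] = { { NULL, 128 }, { NULL, 131072 } }; // 128-byte header + 128 KB payload
    printf("%d\n", use_zerocopy(true, 12288, iov, 2)); // 1: average 65600 >= 12288
    printf("%d\n", use_zerocopy(true, -1, iov, 2));    // 0: negative value disables zero-copy
    printf("%d\n", use_zerocopy(false, 0, iov, 2));    // 0: kernel support not detected
}
```

With Vitastor's 128-byte packet headers attached to large payloads, big writes easily clear the default 12288-byte average, while small-IO traffic stays on the regular copying path.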
View File

@@ -34,6 +34,7 @@
 - [etcd_ws_keepalive_interval](#etcd_ws_keepalive_interval)
 - [etcd_min_reload_interval](#etcd_min_reload_interval)
 - [tcp_header_buffer_size](#tcp_header_buffer_size)
+- [min_zerocopy_send_size](#min_zerocopy_send_size)
 - [use_sync_send_recv](#use_sync_send_recv)
 
 ## osd_network
@@ -321,6 +322,17 @@ Vitastor содержат 128-байтные заголовки, за котор
 поменять этот параметр и посмотреть, как он влияет на производительность
 случайного и линейного доступа.
 
+## min_zerocopy_send_size
+
+- Тип: целое число
+- Значение по умолчанию: 12288
+
+OSD и клиенты будут пробовать использовать TCP-отправку без копирования (zero-copy) на
+основе io_uring для буферов больших, чем это число байт. Отправка без копирования
+поддерживается в io_uring, начиная с версии ядра Linux 6.1. Наличие поддержки
+проверяется автоматически и zero-copy отключается, когда поддержки нет. Также
+её можно отключить явно, установив данный параметр в отрицательное значение.
+
 ## use_sync_send_recv
 
 - Тип: булево (да/нет)

View File

@@ -373,6 +373,21 @@
     параметра читается без дополнительного копирования. Вы можете попробовать
     поменять этот параметр и посмотреть, как он влияет на производительность
     случайного и линейного доступа.
+- name: min_zerocopy_send_size
+  type: int
+  default: 12288
+  info: |
+    OSDs and clients will attempt to use io_uring-based zero-copy TCP send
+    for buffers larger than this number of bytes. Zero-copy send with io_uring is
+    supported since Linux kernel version 6.1. Support is auto-detected and disabled
+    automatically when not available. It can also be disabled explicitly by setting
+    this parameter to a negative value.
+  info_ru: |
+    OSD и клиенты будут пробовать использовать TCP-отправку без копирования (zero-copy) на
+    основе io_uring для буферов больших, чем это число байт. Отправка без копирования
+    поддерживается в io_uring, начиная с версии ядра Linux 6.1. Наличие поддержки
+    проверяется автоматически и zero-copy отключается, когда поддержки нет. Также
+    её можно отключить явно, установив данный параметр в отрицательное значение.
 - name: use_sync_send_recv
   type: bool
   default: false

View File

@@ -167,6 +167,10 @@ void osd_messenger_t::init()
         }
     }
 #endif
+    if (ringloop)
+    {
+        has_sendmsg_zc = ringloop->has_sendmsg_zc();
+    }
     if (ringloop && iothread_count > 0)
     {
         for (int i = 0; i < iothread_count; i++)
@@ -329,6 +333,9 @@ void osd_messenger_t::parse_config(const json11::Json & config)
         this->receive_buffer_size = 65536;
     this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
         config["use_sync_send_recv"].uint64_value();
+    this->min_zerocopy_send_size = config["min_zerocopy_send_size"].is_null()
+        ? 12*1024
+        : (int)config["min_zerocopy_send_size"].int64_value();
     this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
     if (!this->peer_connect_interval)
         this->peer_connect_interval = 5;
@@ -896,6 +903,7 @@ static const char* local_only_params[] = {
     "tcp_header_buffer_size",
     "use_rdma",
     "use_sync_send_recv",
+    "min_zerocopy_send_size",
 };
 
 static const char **local_only_end = local_only_params + (sizeof(local_only_params)/sizeof(local_only_params[0]));

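parse_config() above follows the usual json11 defaulting pattern: an absent or null key falls back to 12288, and any explicit value, including the negative ones that disable zero-copy, is taken as-is. A minimal compilable sketch of that resolution (assuming Vitastor's bundled json11 fork, whose Json type provides the 64-bit int64_value() accessor used in the hunk):

```cpp
// Sketch of the defaulting rule in parse_config() (assumes Vitastor's json11 fork).
#include "json11/json11.hpp"
#include <cstdio>

static int parse_min_zerocopy_send_size(const json11::Json & config)
{
    return config["min_zerocopy_send_size"].is_null()
        ? 12*1024                                          // absent/null -> default 12288
        : (int)config["min_zerocopy_send_size"].int64_value(); // explicit value as-is
}

int main()
{
    std::string err;
    auto defaults = json11::Json::parse("{}", err);
    auto disabled = json11::Json::parse("{\"min_zerocopy_send_size\": -1}", err);
    printf("%d\n", parse_min_zerocopy_send_size(defaults)); // 12288
    printf("%d\n", parse_min_zerocopy_send_size(disabled)); // -1 (zero-copy disabled)
}
```

Note that the same file also registers "min_zerocopy_send_size" in local_only_params, so the setting is apparently meant to be taken from the local configuration of each host rather than propagated to peers.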
View File

@@ -88,6 +88,7 @@ struct osd_client_t
     int write_state = 0;
     std::vector<iovec> send_list, next_send_list;
     std::vector<msgr_sendp_t> outbox, next_outbox;
+    std::vector<osd_op_t*> zc_free_list;
 
     ~osd_client_t();
 };
@@ -175,6 +176,7 @@ protected:
     int osd_ping_timeout = 0;
     int log_level = 0;
     bool use_sync_send_recv = false;
+    int min_zerocopy_send_size = 12*1024;
     int iothread_count = 0;
 
 #ifdef WITH_RDMA
@@ -201,8 +203,9 @@ protected:
     std::vector<osd_op_t*> set_immediate_ops;
 
 public:
-    timerfd_manager_t *tfd;
-    ring_loop_t *ringloop;
+    timerfd_manager_t *tfd = NULL;
+    ring_loop_t *ringloop = NULL;
+    bool has_sendmsg_zc = false;
     // osd_num_t is only for logging and asserts
     osd_num_t osd_num;
     uint64_t next_subop_id = 1;
@@ -261,7 +264,7 @@ protected:
     void cancel_op(osd_op_t *op);
     bool try_send(osd_client_t *cl);
-    void handle_send(int result, osd_client_t *cl);
+    void handle_send(int result, bool prev, bool more, osd_client_t *cl);
     bool handle_read(int result, osd_client_t *cl);
     bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);

View File

@@ -203,8 +203,24 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
     cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
     cl->refs++;
     ring_data_t* data = ((ring_data_t*)sqe->user_data);
-    data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
-    my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
+    data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, data->prev, data->more, cl); };
+    bool use_zc = has_sendmsg_zc && min_zerocopy_send_size >= 0;
+    if (use_zc && min_zerocopy_send_size > 0)
+    {
+        size_t avg_size = 0;
+        for (size_t i = 0; i < cl->write_msg.msg_iovlen; i++)
+            avg_size += cl->write_msg.msg_iov[i].iov_len;
+        if (avg_size/cl->write_msg.msg_iovlen < min_zerocopy_send_size)
+            use_zc = false;
+    }
+    if (use_zc)
+    {
+        my_uring_prep_sendmsg_zc(sqe, peer_fd, &cl->write_msg, MSG_WAITALL);
+    }
+    else
+    {
+        my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, MSG_WAITALL);
+    }
     if (iothread)
     {
         iothread->add_sqe(sqe_local);
@@ -220,7 +236,7 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
         {
             result = -errno;
         }
-        handle_send(result, cl);
+        handle_send(result, false, false, cl);
     }
     return true;
 }
@@ -240,10 +256,16 @@ void osd_messenger_t::send_replies()
     write_ready_clients.clear();
 }
 
-void osd_messenger_t::handle_send(int result, osd_client_t *cl)
+void osd_messenger_t::handle_send(int result, bool prev, bool more, osd_client_t *cl)
 {
-    cl->write_msg.msg_iovlen = 0;
-    cl->refs--;
+    if (!prev)
+    {
+        cl->write_msg.msg_iovlen = 0;
+    }
+    if (!more)
+    {
+        cl->refs--;
+    }
     if (cl->peer_state == PEER_STOPPED)
     {
         if (cl->refs <= 0)
@@ -261,6 +283,16 @@
     }
     if (result >= 0)
     {
+        if (prev)
+        {
+            // Second notification - only free a batch of postponed ops
+            int i = 0;
+            for (; i < cl->zc_free_list.size() && cl->zc_free_list[i]; i++)
+                delete cl->zc_free_list[i];
+            if (i > 0)
+                cl->zc_free_list.erase(cl->zc_free_list.begin(), cl->zc_free_list.begin()+i+1);
+            return;
+        }
         int done = 0;
         while (result > 0 && done < cl->send_list.size())
         {
@@ -270,7 +302,10 @@
             if (cl->outbox[done].flags & MSGR_SENDP_FREE)
             {
                 // Reply fully sent
-                delete cl->outbox[done].op;
+                if (more)
+                    cl->zc_free_list.push_back(cl->outbox[done].op);
+                else
+                    delete cl->outbox[done].op;
             }
             result -= iov.iov_len;
             done++;
@@ -282,6 +317,11 @@
                 break;
             }
         }
+        if (more)
+        {
+            assert(done == cl->send_list.size());
+            cl->zc_free_list.push_back(NULL); // end marker
+        }
         if (done > 0)
         {
             cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);

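The bookkeeping above is easiest to read as a small protocol: while a zero-copy send is still in flight (`more == true`), ops whose replies were fully sent cannot be deleted yet, because the kernel may still be reading their buffers, so they are parked in zc_free_list and each batch is closed with a NULL end marker; the trailing notification (`prev == true`) then frees exactly one batch. A toy model of that life cycle (mock op_t type and simplified error handling, not Vitastor code):

```cpp
// Toy model of the zc_free_list batching protocol in handle_send().
#include <vector>
#include <cassert>
#include <cstdio>

struct op_t { int id; };
static std::vector<op_t*> zc_free_list;

// First CQE of a zero-copy send (IORING_CQE_F_MORE set): park fully-sent ops.
static void first_completion(std::vector<op_t*> sent)
{
    for (op_t *op : sent)
        zc_free_list.push_back(op); // can't delete yet: kernel still reads the buffers
    zc_free_list.push_back(NULL);   // end marker closing this batch
}

// Second CQE (zero-copy notification): free one batch up to its marker.
static void second_completion()
{
    size_t i = 0;
    for (; i < zc_free_list.size() && zc_free_list[i]; i++)
    {
        printf("freeing op %d\n", zc_free_list[i]->id);
        delete zc_free_list[i];
    }
    assert(i < zc_free_list.size()); // the marker must be present
    zc_free_list.erase(zc_free_list.begin(), zc_free_list.begin()+i+1);
}

int main()
{
    first_completion({ new op_t{1}, new op_t{2} });
    second_completion(); // frees ops 1 and 2 and removes the marker
}
```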
View File

@@ -30,6 +30,12 @@ ring_loop_t::ring_loop_t(int qd, bool multithreaded)
         free_ring_data[i] = i;
     }
     in_loop = false;
+    auto probe = io_uring_get_probe();
+    if (probe)
+    {
+        support_zc = io_uring_opcode_supported(probe, IORING_OP_SENDMSG_ZC);
+        io_uring_free_probe(probe);
+    }
 }
 
 ring_loop_t::~ring_loop_t()
@@ -108,7 +114,17 @@
         if (mt)
             mu.lock();
         struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
-        if (d->callback)
+        if (cqe->flags & IORING_CQE_F_MORE)
+        {
+            // There will be a second notification
+            d->res = cqe->res;
+            d->more = true;
+            if (d->callback)
+                d->callback(d);
+            d->prev = true;
+            d->more = false;
+        }
+        else if (d->callback)
         {
             // First free ring_data item, then call the callback
             // so it has at least 1 free slot for the next event
@@ -116,7 +132,10 @@
             struct ring_data_t dl;
             dl.iov = d->iov;
             dl.res = cqe->res;
+            dl.more = false;
+            dl.prev = d->prev;
             dl.callback.swap(d->callback);
+            d->prev = d->more = false;
             free_ring_data[free_ring_data_ptr++] = d - ring_datas;
             if (mt)
                 mu.unlock();

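The constructor change above is the whole of the feature detection: one liburing probe at startup. A standalone equivalent that can be used to check a given machine (assuming liburing >= 2.3, which declares IORING_OP_SENDMSG_ZC; build with -luring):

```cpp
// Standalone version of the IORING_OP_SENDMSG_ZC probe done in ring_loop_t().
#include <liburing.h>
#include <cstdio>

int main()
{
    struct io_uring_probe *probe = io_uring_get_probe();
    if (!probe)
    {
        printf("io_uring probe unavailable\n");
        return 1;
    }
    bool zc = io_uring_opcode_supported(probe, IORING_OP_SENDMSG_ZC);
    io_uring_free_probe(probe);
    // Kernels older than 6.1 report the opcode as unsupported,
    // in which case Vitastor falls back to plain sendmsg.
    printf("IORING_OP_SENDMSG_ZC: %s\n", zc ? "supported" : "not supported");
    return 0;
}
```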
View File

@@ -62,6 +62,12 @@ static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const
     sqe->msg_flags = flags;
 }
 
+static inline void my_uring_prep_sendmsg_zc(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned flags)
+{
+    my_uring_prep_rw(IORING_OP_SENDMSG_ZC, sqe, fd, msg, 1, 0);
+    sqe->msg_flags = flags;
+}
+
 static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask)
 {
     my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0);
@@ -112,6 +118,8 @@ struct ring_data_t
 {
     struct iovec iov; // for single-entry read/write operations
     int res;
+    bool prev: 1;
+    bool more: 1;
     std::function<void(ring_data_t*)> callback;
 };
 
@@ -133,6 +141,7 @@ class ring_loop_t
     bool loop_again;
     struct io_uring ring;
     int ring_eventfd = -1;
+    bool support_zc = false;
 public:
     ring_loop_t(int qd, bool multithreaded = false);
     ~ring_loop_t();
@@ -163,6 +172,10 @@
     {
         return loop_again;
     }
+    inline bool has_sendmsg_zc()
+    {
+        return support_zc;
+    }
     void loop();
     void wakeup();
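The new prev/more bits in ring_data_t mirror the kernel's two-completion contract for zero-copy sends: the first CQE carries the send result and has IORING_CQE_F_MORE set, and a second notification CQE arrives once the kernel has released the user buffers. A compilable sketch of that mapping (assuming liburing >= 2.3 headers, which define IORING_CQE_F_NOTIF; note that ring_loop_t::loop() tracks prev statefully per ring_data_t rather than reading a flag):

```cpp
// Sketch of how SENDMSG_ZC completions map onto the prev/more bits.
#include <liburing.h>
#include <cstdio>

// 'more': another CQE will follow for the same user_data (buffers still pinned).
// 'prev': this is the trailing zero-copy notification of an earlier send.
static void classify(const struct io_uring_cqe *cqe, bool *more, bool *prev)
{
    *more = cqe->flags & IORING_CQE_F_MORE;
    *prev = cqe->flags & IORING_CQE_F_NOTIF;
}

int main()
{
    struct io_uring_cqe first = {}, second = {};
    first.res = 65536;                 // bytes sent
    first.flags = IORING_CQE_F_MORE;   // notification still pending
    second.flags = IORING_CQE_F_NOTIF; // buffers released by the kernel
    bool more, prev;
    classify(&first, &more, &prev);
    printf("first:  more=%d prev=%d\n", more, prev);  // more=1 prev=0
    classify(&second, &more, &prev);
    printf("second: more=%d prev=%d\n", more, prev);  // more=0 prev=1
}
```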