Compare commits
1 Commits
master
...
uring-zero
Author | SHA1 | Date |
---|---|---|
|
94363abb73 |
docs/config
|
@ -34,6 +34,7 @@ between clients, OSDs and etcd.
|
|||
- [etcd_ws_keepalive_interval](#etcd_ws_keepalive_interval)
|
||||
- [etcd_min_reload_interval](#etcd_min_reload_interval)
|
||||
- [tcp_header_buffer_size](#tcp_header_buffer_size)
|
||||
- [min_zerocopy_send_size](#min_zerocopy_send_size)
|
||||
- [use_sync_send_recv](#use_sync_send_recv)
|
||||
|
||||
## osd_network
|
||||
|
@ -313,6 +314,17 @@ is received without an additional copy. You can try to play with this
|
|||
parameter and see how it affects random iops and linear bandwidth if you
|
||||
want.
|
||||
|
||||
## min_zerocopy_send_size
|
||||
|
||||
- Type: integer
|
||||
- Default: 12288
|
||||
|
||||
OSDs and clients will attempt to use io_uring-based zero-copy TCP send
|
||||
for buffers larger than this number of bytes. Zero-copy send with io_uring is
|
||||
supported since Linux kernel version 6.1. Support is auto-detected and zero-copy is disabled
|
||||
automatically when not available. It can also be disabled explicitly by setting
|
||||
this parameter to a negative value.
|
||||
|
||||
## use_sync_send_recv
|
||||
|
||||
- Type: boolean
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
- [etcd_ws_keepalive_interval](#etcd_ws_keepalive_interval)
|
||||
- [etcd_min_reload_interval](#etcd_min_reload_interval)
|
||||
- [tcp_header_buffer_size](#tcp_header_buffer_size)
|
||||
- [min_zerocopy_send_size](#min_zerocopy_send_size)
|
||||
- [use_sync_send_recv](#use_sync_send_recv)
|
||||
|
||||
## osd_network
|
||||
|
@ -321,6 +322,17 @@ Vitastor содержат 128-байтные заголовки, за котор
|
|||
поменять этот параметр и посмотреть, как он влияет на производительность
|
||||
случайного и линейного доступа.
|
||||
|
||||
## min_zerocopy_send_size
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 12288
|
||||
|
||||
OSD и клиенты будут пробовать использовать TCP-отправку без копирования (zero-copy) на
|
||||
основе io_uring для буферов больших, чем это число байт. Отправка без копирования
|
||||
поддерживается в io_uring, начиная с версии ядра Linux 6.1. Наличие поддержки
|
||||
проверяется автоматически, и zero-copy отключается, когда поддержки нет. Также
|
||||
её можно отключить явно, установив данный параметр в отрицательное значение.
|
||||
|
||||
## use_sync_send_recv
|
||||
|
||||
- Тип: булево (да/нет)
|
||||
|
|
|
@ -373,6 +373,21 @@
|
|||
параметра читается без дополнительного копирования. Вы можете попробовать
|
||||
поменять этот параметр и посмотреть, как он влияет на производительность
|
||||
случайного и линейного доступа.
|
||||
- name: min_zerocopy_send_size
|
||||
type: int
|
||||
default: 12288
|
||||
info: |
|
||||
OSDs and clients will attempt to use io_uring-based zero-copy TCP send
|
||||
for buffers larger than this number of bytes. Zero-copy send with io_uring is
|
||||
supported since Linux kernel version 6.1. Support is auto-detected and zero-copy is disabled
|
||||
automatically when not available. It can also be disabled explicitly by setting
|
||||
this parameter to a negative value.
|
||||
info_ru: |
|
||||
OSD и клиенты будут пробовать использовать TCP-отправку без копирования (zero-copy) на
|
||||
основе io_uring для буферов больших, чем это число байт. Отправка без копирования
|
||||
поддерживается в io_uring, начиная с версии ядра Linux 6.1. Наличие поддержки
|
||||
проверяется автоматически, и zero-copy отключается, когда поддержки нет. Также
|
||||
её можно отключить явно, установив данный параметр в отрицательное значение.
|
||||
- name: use_sync_send_recv
|
||||
type: bool
|
||||
default: false
|
||||
|
|
|
@ -167,6 +167,10 @@ void osd_messenger_t::init()
|
|||
}
|
||||
}
|
||||
#endif
|
||||
if (ringloop)
|
||||
{
|
||||
has_sendmsg_zc = ringloop->has_sendmsg_zc();
|
||||
}
|
||||
if (ringloop && iothread_count > 0)
|
||||
{
|
||||
for (int i = 0; i < iothread_count; i++)
|
||||
|
@ -329,6 +333,9 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
|||
this->receive_buffer_size = 65536;
|
||||
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
|
||||
config["use_sync_send_recv"].uint64_value();
|
||||
this->min_zerocopy_send_size = config["min_zerocopy_send_size"].is_null()
|
||||
? 12*1024
|
||||
: (int)config["min_zerocopy_send_size"].int64_value();
|
||||
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
|
||||
if (!this->peer_connect_interval)
|
||||
this->peer_connect_interval = 5;
|
||||
|
@ -896,6 +903,7 @@ static const char* local_only_params[] = {
|
|||
"tcp_header_buffer_size",
|
||||
"use_rdma",
|
||||
"use_sync_send_recv",
|
||||
"min_zerocopy_send_size",
|
||||
};
|
||||
|
||||
static const char **local_only_end = local_only_params + (sizeof(local_only_params)/sizeof(local_only_params[0]));
|
||||
|
|
|
@ -88,6 +88,7 @@ struct osd_client_t
|
|||
int write_state = 0;
|
||||
std::vector<iovec> send_list, next_send_list;
|
||||
std::vector<msgr_sendp_t> outbox, next_outbox;
|
||||
std::vector<osd_op_t*> zc_free_list;
|
||||
|
||||
~osd_client_t();
|
||||
};
|
||||
|
@ -175,6 +176,7 @@ protected:
|
|||
int osd_ping_timeout = 0;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
int min_zerocopy_send_size = 12*1024;
|
||||
int iothread_count = 0;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
|
@ -201,8 +203,9 @@ protected:
|
|||
std::vector<osd_op_t*> set_immediate_ops;
|
||||
|
||||
public:
|
||||
timerfd_manager_t *tfd;
|
||||
ring_loop_t *ringloop;
|
||||
timerfd_manager_t *tfd = NULL;
|
||||
ring_loop_t *ringloop = NULL;
|
||||
bool has_sendmsg_zc = false;
|
||||
// osd_num_t is only for logging and asserts
|
||||
osd_num_t osd_num;
|
||||
uint64_t next_subop_id = 1;
|
||||
|
@ -261,7 +264,7 @@ protected:
|
|||
void cancel_op(osd_op_t *op);
|
||||
|
||||
bool try_send(osd_client_t *cl);
|
||||
void handle_send(int result, osd_client_t *cl);
|
||||
void handle_send(int result, bool prev, bool more, osd_client_t *cl);
|
||||
|
||||
bool handle_read(int result, osd_client_t *cl);
|
||||
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
|
||||
|
|
|
@ -203,8 +203,24 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
|
||||
cl->refs++;
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, data->prev, data->more, cl); };
|
||||
bool use_zc = has_sendmsg_zc && min_zerocopy_send_size >= 0;
|
||||
if (use_zc && min_zerocopy_send_size > 0)
|
||||
{
|
||||
size_t avg_size = 0;
|
||||
for (size_t i = 0; i < cl->write_msg.msg_iovlen; i++)
|
||||
avg_size += cl->write_msg.msg_iov[i].iov_len;
|
||||
if (avg_size/cl->write_msg.msg_iovlen < min_zerocopy_send_size)
|
||||
use_zc = false;
|
||||
}
|
||||
if (use_zc)
|
||||
{
|
||||
my_uring_prep_sendmsg_zc(sqe, peer_fd, &cl->write_msg, MSG_WAITALL);
|
||||
}
|
||||
else
|
||||
{
|
||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, MSG_WAITALL);
|
||||
}
|
||||
if (iothread)
|
||||
{
|
||||
iothread->add_sqe(sqe_local);
|
||||
|
@ -220,7 +236,7 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
|||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_send(result, cl);
|
||||
handle_send(result, false, false, cl);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -240,10 +256,16 @@ void osd_messenger_t::send_replies()
|
|||
write_ready_clients.clear();
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
void osd_messenger_t::handle_send(int result, bool prev, bool more, osd_client_t *cl)
|
||||
{
|
||||
cl->write_msg.msg_iovlen = 0;
|
||||
cl->refs--;
|
||||
if (!prev)
|
||||
{
|
||||
cl->write_msg.msg_iovlen = 0;
|
||||
}
|
||||
if (!more)
|
||||
{
|
||||
cl->refs--;
|
||||
}
|
||||
if (cl->peer_state == PEER_STOPPED)
|
||||
{
|
||||
if (cl->refs <= 0)
|
||||
|
@ -261,6 +283,16 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
}
|
||||
if (result >= 0)
|
||||
{
|
||||
if (prev)
|
||||
{
|
||||
// Second notification - only free a batch of postponed ops
|
||||
int i = 0;
|
||||
for (; i < cl->zc_free_list.size() && cl->zc_free_list[i]; i++)
|
||||
delete cl->zc_free_list[i];
|
||||
if (i > 0)
|
||||
cl->zc_free_list.erase(cl->zc_free_list.begin(), cl->zc_free_list.begin()+i+1);
|
||||
return;
|
||||
}
|
||||
int done = 0;
|
||||
while (result > 0 && done < cl->send_list.size())
|
||||
{
|
||||
|
@ -270,7 +302,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
|
||||
{
|
||||
// Reply fully sent
|
||||
delete cl->outbox[done].op;
|
||||
if (more)
|
||||
cl->zc_free_list.push_back(cl->outbox[done].op);
|
||||
else
|
||||
delete cl->outbox[done].op;
|
||||
}
|
||||
result -= iov.iov_len;
|
||||
done++;
|
||||
|
@ -282,6 +317,11 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (more)
|
||||
{
|
||||
assert(done == cl->send_list.size());
|
||||
cl->zc_free_list.push_back(NULL); // end marker
|
||||
}
|
||||
if (done > 0)
|
||||
{
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
|
||||
|
|
|
@ -30,6 +30,12 @@ ring_loop_t::ring_loop_t(int qd, bool multithreaded)
|
|||
free_ring_data[i] = i;
|
||||
}
|
||||
in_loop = false;
|
||||
auto probe = io_uring_get_probe();
|
||||
if (probe)
|
||||
{
|
||||
support_zc = io_uring_opcode_supported(probe, IORING_OP_SENDMSG_ZC);
|
||||
io_uring_free_probe(probe);
|
||||
}
|
||||
}
|
||||
|
||||
ring_loop_t::~ring_loop_t()
|
||||
|
@ -108,7 +114,17 @@ void ring_loop_t::loop()
|
|||
if (mt)
|
||||
mu.lock();
|
||||
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
|
||||
if (d->callback)
|
||||
if (cqe->flags & IORING_CQE_F_MORE)
|
||||
{
|
||||
// There will be a second notification
|
||||
d->res = cqe->res;
|
||||
d->more = true;
|
||||
if (d->callback)
|
||||
d->callback(d);
|
||||
d->prev = true;
|
||||
d->more = false;
|
||||
}
|
||||
else if (d->callback)
|
||||
{
|
||||
// First free ring_data item, then call the callback
|
||||
// so it has at least 1 free slot for the next event
|
||||
|
@ -116,7 +132,10 @@ void ring_loop_t::loop()
|
|||
struct ring_data_t dl;
|
||||
dl.iov = d->iov;
|
||||
dl.res = cqe->res;
|
||||
dl.more = false;
|
||||
dl.prev = d->prev;
|
||||
dl.callback.swap(d->callback);
|
||||
d->prev = d->more = false;
|
||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||
if (mt)
|
||||
mu.unlock();
|
||||
|
|
|
@ -62,6 +62,12 @@ static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const
|
|||
sqe->msg_flags = flags;
|
||||
}
|
||||
|
||||
// Fill `sqe` for a zero-copy sendmsg request: delegates to my_uring_prep_rw()
// with opcode IORING_OP_SENDMSG_ZC, the socket `fd` and the message header `msg`,
// then stores `flags` into sqe->msg_flags.
// NOTE(review): the trailing `1, 0` arguments are presumably length/offset as in
// the sibling my_uring_prep_sendmsg() helper - confirm against my_uring_prep_rw().
// NOTE(review): `msg` is passed by pointer, so it presumably has to stay valid
// until the request completes - confirm against callers.
static inline void my_uring_prep_sendmsg_zc(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned flags)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_SENDMSG_ZC, sqe, fd, msg, 1, 0);
|
||||
sqe->msg_flags = flags;
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0);
|
||||
|
@ -112,6 +118,8 @@ struct ring_data_t
|
|||
{
|
||||
struct iovec iov; // for single-entry read/write operations
|
||||
int res;
|
||||
bool prev: 1;
|
||||
bool more: 1;
|
||||
std::function<void(ring_data_t*)> callback;
|
||||
};
|
||||
|
||||
|
@ -133,6 +141,7 @@ class ring_loop_t
|
|||
bool loop_again;
|
||||
struct io_uring ring;
|
||||
int ring_eventfd = -1;
|
||||
bool support_zc = false;
|
||||
public:
|
||||
ring_loop_t(int qd, bool multithreaded = false);
|
||||
~ring_loop_t();
|
||||
|
@ -163,6 +172,10 @@ public:
|
|||
{
|
||||
return loop_again;
|
||||
}
|
||||
// Returns the cached `support_zc` flag. It defaults to false and is set in the
// ring_loop_t constructor from io_uring_opcode_supported(probe, IORING_OP_SENDMSG_ZC),
// so it stays false when the probe could not be obtained.
inline bool has_sendmsg_zc()
|
||||
{
|
||||
return support_zc;
|
||||
}
|
||||
|
||||
void loop();
|
||||
void wakeup();
|
||||
|
|
Loading…
Reference in New Issue