forked from vitalif/vitastor
Support iovecs for read operations
parent
0f6d193d73
commit
badf68c039
|
@ -446,15 +446,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
|
||||||
handle_op_part(part);
|
handle_op_part(part);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
part->op.send_list.push_back(part->op.req.buf, OSD_PACKET_SIZE);
|
part->op.iov.push_back(part->buf, part->len);
|
||||||
if (op->opcode == OSD_OP_WRITE)
|
|
||||||
{
|
|
||||||
part->op.send_list.push_back(part->buf, part->len);
|
|
||||||
}
|
|
||||||
else /* if (op->opcode == OSD_OP_READ) */
|
|
||||||
{
|
|
||||||
part->op.buf = part->buf;
|
|
||||||
}
|
|
||||||
msgr.outbox_push(&part->op);
|
msgr.outbox_push(&part->op);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -606,7 +598,6 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
|
||||||
handle_op_part(part);
|
handle_op_part(part);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
part->op.send_list.push_back(part->op.req.buf, OSD_PACKET_SIZE);
|
|
||||||
msgr.outbox_push(&part->op);
|
msgr.outbox_push(&part->op);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -214,7 +214,6 @@ void osd_messenger_t::check_peer_config(osd_client_t & cl)
|
||||||
{
|
{
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = OSD_OP_OUT;
|
op->op_type = OSD_OP_OUT;
|
||||||
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
|
||||||
op->peer_fd = cl.peer_fd;
|
op->peer_fd = cl.peer_fd;
|
||||||
op->req = {
|
op->req = {
|
||||||
.show_conf = {
|
.show_conf = {
|
||||||
|
|
89
messenger.h
89
messenger.h
|
@ -31,13 +31,32 @@
|
||||||
#define DEFAULT_PEER_CONNECT_INTERVAL 5
|
#define DEFAULT_PEER_CONNECT_INTERVAL 5
|
||||||
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
|
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
|
||||||
|
|
||||||
|
// Kind of a vector with small-list-optimisation
|
||||||
struct osd_op_buf_list_t
|
struct osd_op_buf_list_t
|
||||||
{
|
{
|
||||||
int count = 0, alloc = 0, sent = 0;
|
int count = 0, alloc = OSD_OP_INLINE_BUF_COUNT, done = 0;
|
||||||
iovec *buf = NULL;
|
iovec *buf = NULL;
|
||||||
iovec inline_buf[OSD_OP_INLINE_BUF_COUNT];
|
iovec inline_buf[OSD_OP_INLINE_BUF_COUNT];
|
||||||
|
|
||||||
~osd_op_buf_list_t()
|
inline osd_op_buf_list_t()
|
||||||
|
{
|
||||||
|
buf = inline_buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline osd_op_buf_list_t(const osd_op_buf_list_t & other)
|
||||||
|
{
|
||||||
|
buf = inline_buf;
|
||||||
|
append(other);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline osd_op_buf_list_t & operator = (const osd_op_buf_list_t & other)
|
||||||
|
{
|
||||||
|
reset();
|
||||||
|
append(other);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline ~osd_op_buf_list_t()
|
||||||
{
|
{
|
||||||
if (buf && buf != inline_buf)
|
if (buf && buf != inline_buf)
|
||||||
{
|
{
|
||||||
|
@ -45,26 +64,50 @@ struct osd_op_buf_list_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void reset()
|
||||||
|
{
|
||||||
|
count = 0;
|
||||||
|
done = 0;
|
||||||
|
}
|
||||||
|
|
||||||
inline iovec* get_iovec()
|
inline iovec* get_iovec()
|
||||||
{
|
{
|
||||||
return (buf ? buf : inline_buf) + sent;
|
return buf + done;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int get_size()
|
inline int get_size()
|
||||||
{
|
{
|
||||||
return count - sent;
|
return count - done;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void append(const osd_op_buf_list_t & other)
|
||||||
|
{
|
||||||
|
if (count+other.count > alloc)
|
||||||
|
{
|
||||||
|
if (buf == inline_buf)
|
||||||
|
{
|
||||||
|
int old = alloc;
|
||||||
|
alloc = (((count+other.count+15)/16)*16);
|
||||||
|
buf = (iovec*)malloc(sizeof(iovec) * alloc);
|
||||||
|
memcpy(buf, inline_buf, sizeof(iovec) * old);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
alloc = (((count+other.count+15)/16)*16);
|
||||||
|
buf = (iovec*)realloc(buf, sizeof(iovec) * alloc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < other.count; i++)
|
||||||
|
{
|
||||||
|
buf[count++] = other.buf[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void push_back(void *nbuf, size_t len)
|
inline void push_back(void *nbuf, size_t len)
|
||||||
{
|
{
|
||||||
if (count >= alloc)
|
if (count >= alloc)
|
||||||
{
|
{
|
||||||
if (!alloc)
|
if (buf == inline_buf)
|
||||||
{
|
|
||||||
alloc = OSD_OP_INLINE_BUF_COUNT;
|
|
||||||
buf = inline_buf;
|
|
||||||
}
|
|
||||||
else if (buf == inline_buf)
|
|
||||||
{
|
{
|
||||||
int old = alloc;
|
int old = alloc;
|
||||||
alloc = ((alloc/16)*16 + 1);
|
alloc = ((alloc/16)*16 + 1);
|
||||||
|
@ -79,6 +122,25 @@ struct osd_op_buf_list_t
|
||||||
}
|
}
|
||||||
buf[count++] = { .iov_base = nbuf, .iov_len = len };
|
buf[count++] = { .iov_base = nbuf, .iov_len = len };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void eat(int result)
|
||||||
|
{
|
||||||
|
while (result > 0 && done < count)
|
||||||
|
{
|
||||||
|
iovec & iov = buf[done];
|
||||||
|
if (iov.iov_len <= result)
|
||||||
|
{
|
||||||
|
result -= iov.iov_len;
|
||||||
|
done++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
iov.iov_len -= result;
|
||||||
|
iov.iov_base += result;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct blockstore_op_t;
|
struct blockstore_op_t;
|
||||||
|
@ -98,7 +160,7 @@ struct osd_op_t
|
||||||
osd_primary_op_data_t* op_data = NULL;
|
osd_primary_op_data_t* op_data = NULL;
|
||||||
std::function<void(osd_op_t*)> callback;
|
std::function<void(osd_op_t*)> callback;
|
||||||
|
|
||||||
osd_op_buf_list_t send_list;
|
osd_op_buf_list_t iov;
|
||||||
|
|
||||||
~osd_op_t();
|
~osd_op_t();
|
||||||
};
|
};
|
||||||
|
@ -117,12 +179,11 @@ struct osd_client_t
|
||||||
// Read state
|
// Read state
|
||||||
int read_ready = 0;
|
int read_ready = 0;
|
||||||
osd_op_t *read_op = NULL;
|
osd_op_t *read_op = NULL;
|
||||||
int read_reply_id = 0;
|
|
||||||
iovec read_iov;
|
iovec read_iov;
|
||||||
msghdr read_msg;
|
msghdr read_msg;
|
||||||
void *read_buf = NULL;
|
|
||||||
int read_remaining = 0;
|
int read_remaining = 0;
|
||||||
int read_state = 0;
|
int read_state = 0;
|
||||||
|
osd_op_buf_list_t recv_list;
|
||||||
|
|
||||||
// Incoming operations
|
// Incoming operations
|
||||||
std::vector<osd_op_t*> received_ops;
|
std::vector<osd_op_t*> received_ops;
|
||||||
|
@ -138,6 +199,7 @@ struct osd_client_t
|
||||||
osd_op_t *write_op = NULL;
|
osd_op_t *write_op = NULL;
|
||||||
msghdr write_msg;
|
msghdr write_msg;
|
||||||
int write_state = 0;
|
int write_state = 0;
|
||||||
|
osd_op_buf_list_t send_list;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct osd_wanted_peer_t
|
struct osd_wanted_peer_t
|
||||||
|
@ -211,4 +273,5 @@ protected:
|
||||||
bool handle_finished_read(osd_client_t & cl);
|
bool handle_finished_read(osd_client_t & cl);
|
||||||
void handle_op_hdr(osd_client_t *cl);
|
void handle_op_hdr(osd_client_t *cl);
|
||||||
bool handle_reply_hdr(osd_client_t *cl);
|
bool handle_reply_hdr(osd_client_t *cl);
|
||||||
|
void handle_reply_ready(osd_op_t *op);
|
||||||
};
|
};
|
||||||
|
|
160
msgr_receive.cpp
160
msgr_receive.cpp
|
@ -13,18 +13,20 @@ void osd_messenger_t::read_requests()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||||
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
|
if (cl.read_remaining < receive_buffer_size)
|
||||||
{
|
{
|
||||||
cl.read_iov.iov_base = cl.in_buf;
|
cl.read_iov.iov_base = cl.in_buf;
|
||||||
cl.read_iov.iov_len = receive_buffer_size;
|
cl.read_iov.iov_len = receive_buffer_size;
|
||||||
|
cl.read_msg.msg_iov = &cl.read_iov;
|
||||||
|
cl.read_msg.msg_iovlen = 1;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cl.read_iov.iov_base = cl.read_buf;
|
cl.read_iov.iov_base = 0;
|
||||||
cl.read_iov.iov_len = cl.read_remaining;
|
cl.read_iov.iov_len = cl.read_remaining;
|
||||||
|
cl.read_msg.msg_iov = cl.recv_list.get_iovec();
|
||||||
|
cl.read_msg.msg_iovlen = cl.recv_list.get_size();
|
||||||
}
|
}
|
||||||
cl.read_msg.msg_iov = &cl.read_iov;
|
|
||||||
cl.read_msg.msg_iovlen = 1;
|
|
||||||
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
|
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
|
||||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
|
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
|
||||||
}
|
}
|
||||||
|
@ -69,31 +71,33 @@ bool osd_messenger_t::handle_read(int result, int peer_fd)
|
||||||
cl.read_op = new osd_op_t;
|
cl.read_op = new osd_op_t;
|
||||||
cl.read_op->peer_fd = peer_fd;
|
cl.read_op->peer_fd = peer_fd;
|
||||||
cl.read_op->op_type = OSD_OP_IN;
|
cl.read_op->op_type = OSD_OP_IN;
|
||||||
cl.read_buf = cl.read_op->req.buf;
|
cl.recv_list.push_back(cl.read_op->req.buf, OSD_PACKET_SIZE);
|
||||||
cl.read_remaining = OSD_PACKET_SIZE;
|
cl.read_remaining = OSD_PACKET_SIZE;
|
||||||
cl.read_state = CL_READ_HDR;
|
cl.read_state = CL_READ_HDR;
|
||||||
}
|
}
|
||||||
if (cl.read_remaining > remain)
|
while (cl.recv_list.done < cl.recv_list.count && remain > 0)
|
||||||
{
|
{
|
||||||
memcpy(cl.read_buf, curbuf, remain);
|
iovec* cur = cl.recv_list.get_iovec();
|
||||||
cl.read_remaining -= remain;
|
if (cur->iov_len > remain)
|
||||||
cl.read_buf += remain;
|
|
||||||
remain = 0;
|
|
||||||
if (cl.read_remaining <= 0)
|
|
||||||
{
|
{
|
||||||
if (!handle_finished_read(cl))
|
memcpy(cur->iov_base, curbuf, remain);
|
||||||
{
|
cl.read_remaining -= remain;
|
||||||
goto fin;
|
cur->iov_len -= remain;
|
||||||
}
|
cur->iov_base += remain;
|
||||||
|
remain = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memcpy(cur->iov_base, curbuf, cur->iov_len);
|
||||||
|
curbuf += cur->iov_len;
|
||||||
|
cl.read_remaining -= cur->iov_len;
|
||||||
|
remain -= cur->iov_len;
|
||||||
|
cur->iov_len = 0;
|
||||||
|
cl.recv_list.done++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
if (cl.recv_list.done >= cl.recv_list.count)
|
||||||
{
|
{
|
||||||
memcpy(cl.read_buf, curbuf, cl.read_remaining);
|
|
||||||
curbuf += cl.read_remaining;
|
|
||||||
remain -= cl.read_remaining;
|
|
||||||
cl.read_remaining = 0;
|
|
||||||
cl.read_buf = NULL;
|
|
||||||
if (!handle_finished_read(cl))
|
if (!handle_finished_read(cl))
|
||||||
{
|
{
|
||||||
goto fin;
|
goto fin;
|
||||||
|
@ -105,8 +109,8 @@ bool osd_messenger_t::handle_read(int result, int peer_fd)
|
||||||
{
|
{
|
||||||
// Long data
|
// Long data
|
||||||
cl.read_remaining -= result;
|
cl.read_remaining -= result;
|
||||||
cl.read_buf += result;
|
cl.recv_list.eat(result);
|
||||||
if (cl.read_remaining <= 0)
|
if (cl.recv_list.done >= cl.recv_list.count)
|
||||||
{
|
{
|
||||||
handle_finished_read(cl);
|
handle_finished_read(cl);
|
||||||
}
|
}
|
||||||
|
@ -128,6 +132,7 @@ fin:
|
||||||
|
|
||||||
bool osd_messenger_t::handle_finished_read(osd_client_t & cl)
|
bool osd_messenger_t::handle_finished_read(osd_client_t & cl)
|
||||||
{
|
{
|
||||||
|
cl.recv_list.reset();
|
||||||
if (cl.read_state == CL_READ_HDR)
|
if (cl.read_state == CL_READ_HDR)
|
||||||
{
|
{
|
||||||
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||||
|
@ -146,30 +151,9 @@ bool osd_messenger_t::handle_finished_read(osd_client_t & cl)
|
||||||
else if (cl.read_state == CL_READ_REPLY_DATA)
|
else if (cl.read_state == CL_READ_REPLY_DATA)
|
||||||
{
|
{
|
||||||
// Reply is ready
|
// Reply is ready
|
||||||
auto req_it = cl.sent_ops.find(cl.read_reply_id);
|
handle_reply_ready(cl.read_op);
|
||||||
osd_op_t *request = req_it->second;
|
|
||||||
cl.sent_ops.erase(req_it);
|
|
||||||
cl.read_reply_id = 0;
|
|
||||||
delete cl.read_op;
|
|
||||||
cl.read_op = NULL;
|
cl.read_op = NULL;
|
||||||
cl.read_state = 0;
|
cl.read_state = 0;
|
||||||
// Measure subop latency
|
|
||||||
timespec tv_end;
|
|
||||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
|
||||||
stats.subop_stat_count[request->req.hdr.opcode]++;
|
|
||||||
if (!stats.subop_stat_count[request->req.hdr.opcode])
|
|
||||||
{
|
|
||||||
stats.subop_stat_count[request->req.hdr.opcode]++;
|
|
||||||
stats.subop_stat_sum[request->req.hdr.opcode] = 0;
|
|
||||||
}
|
|
||||||
stats.subop_stat_sum[request->req.hdr.opcode] += (
|
|
||||||
(tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 +
|
|
||||||
(tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000
|
|
||||||
);
|
|
||||||
set_immediate.push_back([this, request]()
|
|
||||||
{
|
|
||||||
std::function<void(osd_op_t*)>(request->callback)(request);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -215,82 +199,92 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||||
if (cl->read_remaining > 0)
|
if (cl->read_remaining > 0)
|
||||||
{
|
{
|
||||||
// Read data
|
// Read data
|
||||||
cl->read_buf = cur_op->buf;
|
cl->recv_list.push_back(cur_op->buf, cl->read_remaining);
|
||||||
cl->read_state = CL_READ_DATA;
|
cl->read_state = CL_READ_DATA;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Operation is ready
|
// Operation is ready
|
||||||
cl->read_op = NULL;
|
|
||||||
cl->read_state = 0;
|
|
||||||
cl->received_ops.push_back(cur_op);
|
cl->received_ops.push_back(cur_op);
|
||||||
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
|
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
|
||||||
|
cl->read_op = NULL;
|
||||||
|
cl->read_state = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||||
{
|
{
|
||||||
osd_op_t *cur_op = cl->read_op;
|
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
|
||||||
auto req_it = cl->sent_ops.find(cur_op->req.hdr.id);
|
|
||||||
if (req_it == cl->sent_ops.end())
|
if (req_it == cl->sent_ops.end())
|
||||||
{
|
{
|
||||||
// Command out of sync. Drop connection
|
// Command out of sync. Drop connection
|
||||||
printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cur_op->req.hdr.id);
|
printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cl->read_op->req.hdr.id);
|
||||||
stop_client(cl->peer_fd);
|
stop_client(cl->peer_fd);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
osd_op_t *op = req_it->second;
|
osd_op_t *op = req_it->second;
|
||||||
memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE);
|
memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||||
|
cl->sent_ops.erase(req_it);
|
||||||
if ((op->reply.hdr.opcode == OSD_OP_SECONDARY_READ || op->reply.hdr.opcode == OSD_OP_READ) &&
|
if ((op->reply.hdr.opcode == OSD_OP_SECONDARY_READ || op->reply.hdr.opcode == OSD_OP_READ) &&
|
||||||
op->reply.hdr.retval > 0)
|
op->reply.hdr.retval > 0)
|
||||||
{
|
{
|
||||||
// Read data. In this case we assume that the buffer is preallocated by the caller (!)
|
// Read data. In this case we assume that the buffer is preallocated by the caller (!)
|
||||||
assert(op->buf);
|
assert(op->iov.count > 0);
|
||||||
|
cl->recv_list.append(op->iov);
|
||||||
|
delete cl->read_op;
|
||||||
|
cl->read_op = op;
|
||||||
cl->read_state = CL_READ_REPLY_DATA;
|
cl->read_state = CL_READ_REPLY_DATA;
|
||||||
cl->read_reply_id = op->req.hdr.id;
|
|
||||||
cl->read_buf = op->buf;
|
|
||||||
cl->read_remaining = op->reply.hdr.retval;
|
cl->read_remaining = op->reply.hdr.retval;
|
||||||
}
|
}
|
||||||
else if (op->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && op->reply.hdr.retval > 0)
|
else if (op->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && op->reply.hdr.retval > 0)
|
||||||
{
|
{
|
||||||
op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * op->reply.hdr.retval);
|
assert(!op->iov.count);
|
||||||
|
delete cl->read_op;
|
||||||
|
cl->read_op = op;
|
||||||
cl->read_state = CL_READ_REPLY_DATA;
|
cl->read_state = CL_READ_REPLY_DATA;
|
||||||
cl->read_reply_id = op->req.hdr.id;
|
|
||||||
cl->read_buf = op->buf;
|
|
||||||
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
|
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
|
||||||
|
op->buf = memalign(MEM_ALIGNMENT, cl->read_remaining);
|
||||||
|
cl->recv_list.push_back(op->buf, cl->read_remaining);
|
||||||
}
|
}
|
||||||
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
|
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
|
||||||
{
|
{
|
||||||
op->buf = malloc(op->reply.hdr.retval);
|
assert(!op->iov.count);
|
||||||
|
delete cl->read_op;
|
||||||
|
cl->read_op = op;
|
||||||
cl->read_state = CL_READ_REPLY_DATA;
|
cl->read_state = CL_READ_REPLY_DATA;
|
||||||
cl->read_reply_id = op->req.hdr.id;
|
|
||||||
cl->read_buf = op->buf;
|
|
||||||
cl->read_remaining = op->reply.hdr.retval;
|
cl->read_remaining = op->reply.hdr.retval;
|
||||||
|
op->buf = malloc(op->reply.hdr.retval);
|
||||||
|
cl->recv_list.push_back(op->buf, op->reply.hdr.retval);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
delete cl->read_op;
|
// It's fine to reuse cl->read_op for the next reply
|
||||||
cl->read_state = 0;
|
handle_reply_ready(op);
|
||||||
cl->read_op = NULL;
|
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||||
cl->sent_ops.erase(req_it);
|
cl->read_remaining = OSD_PACKET_SIZE;
|
||||||
// Measure subop latency
|
cl->read_state = CL_READ_HDR;
|
||||||
timespec tv_end;
|
|
||||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
|
||||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
|
||||||
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
|
||||||
{
|
|
||||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
|
||||||
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
|
||||||
}
|
|
||||||
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
|
||||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
|
||||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
|
||||||
);
|
|
||||||
set_immediate.push_back([this, op]()
|
|
||||||
{
|
|
||||||
// Copy lambda to be unaffected by `delete op`
|
|
||||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void osd_messenger_t::handle_reply_ready(osd_op_t *op)
|
||||||
|
{
|
||||||
|
// Measure subop latency
|
||||||
|
timespec tv_end;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||||
|
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||||
|
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
||||||
|
{
|
||||||
|
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||||
|
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
||||||
|
}
|
||||||
|
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
||||||
|
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||||
|
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||||
|
);
|
||||||
|
set_immediate.push_back([this, op]()
|
||||||
|
{
|
||||||
|
// Copy lambda to be unaffected by `delete op`
|
||||||
|
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
|
@ -80,10 +80,29 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
|
||||||
{
|
{
|
||||||
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len;
|
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len;
|
||||||
}
|
}
|
||||||
|
cl.send_list.push_back(cl.write_op->reply.buf, OSD_PACKET_SIZE);
|
||||||
|
if (cl.write_op->req.hdr.opcode == OSD_OP_READ ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_LIST ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||||
|
{
|
||||||
|
cl.send_list.append(cl.write_op->iov);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cl.send_list.push_back(cl.write_op->req.buf, OSD_PACKET_SIZE);
|
||||||
|
if (cl.write_op->req.hdr.opcode == OSD_OP_WRITE ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK)
|
||||||
|
{
|
||||||
|
cl.send_list.append(cl.write_op->iov);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
|
cl.write_msg.msg_iov = cl.send_list.get_iovec();
|
||||||
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
|
cl.write_msg.msg_iovlen = cl.send_list.get_size();
|
||||||
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
|
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
|
||||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
|
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
|
||||||
return true;
|
return true;
|
||||||
|
@ -118,28 +137,14 @@ void osd_messenger_t::handle_send(int result, int peer_fd)
|
||||||
}
|
}
|
||||||
if (result >= 0)
|
if (result >= 0)
|
||||||
{
|
{
|
||||||
osd_op_t *cur_op = cl.write_op;
|
cl.send_list.eat(result);
|
||||||
while (result > 0 && cur_op->send_list.sent < cur_op->send_list.count)
|
if (cl.send_list.done >= cl.send_list.count)
|
||||||
{
|
|
||||||
iovec & iov = cur_op->send_list.buf[cur_op->send_list.sent];
|
|
||||||
if (iov.iov_len <= result)
|
|
||||||
{
|
|
||||||
result -= iov.iov_len;
|
|
||||||
cur_op->send_list.sent++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
iov.iov_len -= result;
|
|
||||||
iov.iov_base += result;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (cur_op->send_list.sent >= cur_op->send_list.count)
|
|
||||||
{
|
{
|
||||||
// Done
|
// Done
|
||||||
if (cur_op->op_type == OSD_OP_IN)
|
cl.send_list.reset();
|
||||||
|
if (cl.write_op->op_type == OSD_OP_IN)
|
||||||
{
|
{
|
||||||
delete cur_op;
|
delete cl.write_op;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
1
osd.cpp
1
osd.cpp
|
@ -289,7 +289,6 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
inflight_ops++;
|
inflight_ops++;
|
||||||
cur_op->send_list.push_back(cur_op->reply.buf, OSD_PACKET_SIZE);
|
|
||||||
if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
||||||
cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
|
cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
|
||||||
(cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
(cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
||||||
|
|
|
@ -183,8 +183,7 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
|
||||||
// Peer
|
// Peer
|
||||||
int peer_fd = c_cli.osd_peer_fds[peer_osd];
|
int peer_fd = c_cli.osd_peer_fds[peer_osd];
|
||||||
op->op_type = OSD_OP_OUT;
|
op->op_type = OSD_OP_OUT;
|
||||||
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
|
||||||
op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
|
|
||||||
op->peer_fd = peer_fd;
|
op->peer_fd = peer_fd;
|
||||||
op->req = {
|
op->req = {
|
||||||
.sec_stab = {
|
.sec_stab = {
|
||||||
|
|
|
@ -303,7 +303,6 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
|
||||||
auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]);
|
auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]);
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = OSD_OP_OUT;
|
op->op_type = OSD_OP_OUT;
|
||||||
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
|
||||||
op->peer_fd = cl.peer_fd;
|
op->peer_fd = cl.peer_fd;
|
||||||
op->req = {
|
op->req = {
|
||||||
.sec_sync = {
|
.sec_sync = {
|
||||||
|
@ -377,7 +376,6 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
|
||||||
// Peer
|
// Peer
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = OSD_OP_OUT;
|
op->op_type = OSD_OP_OUT;
|
||||||
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
|
||||||
op->peer_fd = c_cli.osd_peer_fds[role_osd];
|
op->peer_fd = c_cli.osd_peer_fds[role_osd];
|
||||||
op->req = {
|
op->req = {
|
||||||
.sec_list = {
|
.sec_list = {
|
||||||
|
|
|
@ -142,7 +142,7 @@ resume_2:
|
||||||
if (stripes[role].req_end != 0)
|
if (stripes[role].req_end != 0)
|
||||||
{
|
{
|
||||||
// Send buffer in parts to avoid copying
|
// Send buffer in parts to avoid copying
|
||||||
cur_op->send_list.push_back(
|
cur_op->iov.push_back(
|
||||||
stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
|
stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
|
||||||
stripes[role].req_end - stripes[role].req_start
|
stripes[role].req_end - stripes[role].req_start
|
||||||
);
|
);
|
||||||
|
@ -151,7 +151,7 @@ resume_2:
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len);
|
cur_op->iov.push_back(cur_op->buf, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,7 +150,6 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
subops[i].op_type = OSD_OP_OUT;
|
subops[i].op_type = OSD_OP_OUT;
|
||||||
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
|
|
||||||
subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
|
subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
|
||||||
subops[i].req.sec_rw = {
|
subops[i].req.sec_rw = {
|
||||||
.header = {
|
.header = {
|
||||||
|
@ -173,17 +172,24 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
|
subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
|
||||||
);
|
);
|
||||||
#endif
|
#endif
|
||||||
subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
if (w)
|
||||||
if (w && stripes[role].write_end > 0)
|
|
||||||
{
|
{
|
||||||
subops[i].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
|
if (stripes[role].write_end > stripes[role].write_start)
|
||||||
|
{
|
||||||
|
subops[i].iov.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (stripes[role].read_end > stripes[role].read_start)
|
||||||
|
{
|
||||||
|
subops[i].iov.push_back(stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
|
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
|
||||||
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
||||||
// so it doesn't get freed
|
|
||||||
subop->buf = NULL;
|
|
||||||
handle_primary_subop(subop, cur_op);
|
handle_primary_subop(subop, cur_op);
|
||||||
if (fail_fd >= 0)
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
|
@ -387,7 +393,6 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
subops[i].op_type = OSD_OP_OUT;
|
subops[i].op_type = OSD_OP_OUT;
|
||||||
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
|
|
||||||
subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
|
subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
|
||||||
subops[i].req.sec_del = {
|
subops[i].req.sec_del = {
|
||||||
.header = {
|
.header = {
|
||||||
|
@ -446,7 +451,6 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
subops[i].op_type = OSD_OP_OUT;
|
subops[i].op_type = OSD_OP_OUT;
|
||||||
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
|
|
||||||
subops[i].peer_fd = c_cli.osd_peer_fds.at(sync_osd);
|
subops[i].peer_fd = c_cli.osd_peer_fds.at(sync_osd);
|
||||||
subops[i].req.sec_sync = {
|
subops[i].req.sec_sync = {
|
||||||
.header = {
|
.header = {
|
||||||
|
@ -499,7 +503,6 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
subops[i].op_type = OSD_OP_OUT;
|
subops[i].op_type = OSD_OP_OUT;
|
||||||
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
|
|
||||||
subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num);
|
subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num);
|
||||||
subops[i].req.sec_stab = {
|
subops[i].req.sec_stab = {
|
||||||
.header = {
|
.header = {
|
||||||
|
@ -509,7 +512,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||||
},
|
},
|
||||||
.len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
|
.len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
|
||||||
};
|
};
|
||||||
subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
|
subops[i].iov.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
|
||||||
|
|
|
@ -16,7 +16,7 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
||||||
op->bs_op->retval > 0)
|
op->bs_op->retval > 0)
|
||||||
{
|
{
|
||||||
op->send_list.push_back(op->buf, op->bs_op->retval);
|
op->iov.push_back(op->buf, op->bs_op->retval);
|
||||||
}
|
}
|
||||||
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||||
{
|
{
|
||||||
|
@ -24,7 +24,7 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
op->buf = op->bs_op->buf;
|
op->buf = op->bs_op->buf;
|
||||||
if (op->bs_op->retval > 0)
|
if (op->bs_op->retval > 0)
|
||||||
{
|
{
|
||||||
op->send_list.push_back(op->buf, op->bs_op->retval * sizeof(obj_ver_id));
|
op->iov.push_back(op->buf, op->bs_op->retval * sizeof(obj_ver_id));
|
||||||
}
|
}
|
||||||
op->reply.sec_list.stable_count = op->bs_op->version;
|
op->reply.sec_list.stable_count = op->bs_op->version;
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||||
std::string cfg_str = json11::Json(config).dump();
|
std::string cfg_str = json11::Json(config).dump();
|
||||||
cur_op->buf = malloc(cfg_str.size()+1);
|
cur_op->buf = malloc(cfg_str.size()+1);
|
||||||
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
||||||
cur_op->send_list.push_back(cur_op->buf, cfg_str.size()+1);
|
cur_op->iov.push_back(cur_op->buf, cfg_str.size()+1);
|
||||||
finish_op(cur_op, cfg_str.size()+1);
|
finish_op(cur_op, cfg_str.size()+1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,12 +105,11 @@ void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
|
||||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||||
op->reply.hdr.id = op->req.hdr.id;
|
op->reply.hdr.id = op->req.hdr.id;
|
||||||
op->reply.hdr.opcode = op->req.hdr.opcode;
|
op->reply.hdr.opcode = op->req.hdr.opcode;
|
||||||
op->send_list.push_back(op->reply.buf, OSD_PACKET_SIZE);
|
|
||||||
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ)
|
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ)
|
||||||
{
|
{
|
||||||
op->reply.hdr.retval = op->req.sec_rw.len;
|
op->reply.hdr.retval = op->req.sec_rw.len;
|
||||||
op->buf = malloc(op->req.sec_rw.len);
|
op->buf = malloc(op->req.sec_rw.len);
|
||||||
op->send_list.push_back(op->buf, op->req.sec_rw.len);
|
op->iov.push_back(op->buf, op->req.sec_rw.len);
|
||||||
}
|
}
|
||||||
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue