Support iovecs in cluster_client_t
parent
a22d9f38aa
commit
d68370304e
|
@ -312,6 +312,7 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
|
||||||
op_copy->offset = op->offset;
|
op_copy->offset = op->offset;
|
||||||
op_copy->len = op->len;
|
op_copy->len = op->len;
|
||||||
op_copy->buf = malloc(op->len);
|
op_copy->buf = malloc(op->len);
|
||||||
|
op_copy->iov.push_back(op_copy->buf, op->len);
|
||||||
op_copy->callback = [](cluster_op_t* op_copy)
|
op_copy->callback = [](cluster_op_t* op_copy)
|
||||||
{
|
{
|
||||||
if (op_copy->orig_op)
|
if (op_copy->orig_op)
|
||||||
|
@ -322,7 +323,12 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
|
||||||
op_copy->orig_op = NULL;
|
op_copy->orig_op = NULL;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
memcpy(op_copy->buf, op->buf, op->len);
|
void *cur_buf = op_copy->buf;
|
||||||
|
for (int i = 0; i < op->iov.count; i++)
|
||||||
|
{
|
||||||
|
memcpy(cur_buf, op->iov.buf[i].iov_base, op->iov.buf[i].iov_len);
|
||||||
|
cur_buf += op->iov.buf[i].iov_len;
|
||||||
|
}
|
||||||
unsynced_writes.push_back(op_copy);
|
unsynced_writes.push_back(op_copy);
|
||||||
cur_ops.erase(op);
|
cur_ops.erase(op);
|
||||||
cur_ops.insert(op_copy);
|
cur_ops.insert(op_copy);
|
||||||
|
@ -407,6 +413,8 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
|
||||||
uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
|
uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
|
||||||
op->retval = 0;
|
op->retval = 0;
|
||||||
op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
|
op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
|
||||||
|
int iov_idx = 0;
|
||||||
|
size_t iov_pos = 0;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
|
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
|
||||||
{
|
{
|
||||||
|
@ -419,10 +427,27 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
|
||||||
.offset = begin,
|
.offset = begin,
|
||||||
.len = (uint32_t)(end - begin),
|
.len = (uint32_t)(end - begin),
|
||||||
.pg_num = pg_num,
|
.pg_num = pg_num,
|
||||||
.buf = op->buf + begin - op->offset,
|
|
||||||
.sent = false,
|
.sent = false,
|
||||||
.done = false,
|
.done = false,
|
||||||
};
|
};
|
||||||
|
int left = end-begin;
|
||||||
|
while (left > 0 && iov_idx < op->iov.count)
|
||||||
|
{
|
||||||
|
if (op->iov.buf[iov_idx].iov_len - iov_pos > left)
|
||||||
|
{
|
||||||
|
op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos);
|
||||||
|
left -= (op->iov.buf[iov_idx].iov_len - iov_pos);
|
||||||
|
iov_pos = 0;
|
||||||
|
iov_idx++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
|
||||||
|
iov_pos += left;
|
||||||
|
left = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(left == 0);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -459,7 +484,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
|
||||||
handle_op_part(part);
|
handle_op_part(part);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
part->op.iov.push_back(part->buf, part->len);
|
part->op.iov = part->iov;
|
||||||
msgr.outbox_push(&part->op);
|
msgr.outbox_push(&part->op);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -619,7 +644,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
||||||
cluster_op_t *op = part->parent;
|
cluster_op_t *op = part->parent;
|
||||||
part->sent = false;
|
part->sent = false;
|
||||||
op->sent_count--;
|
op->sent_count--;
|
||||||
part->op.buf = NULL;
|
|
||||||
int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
|
int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
|
||||||
if (part->op.reply.hdr.retval != expected)
|
if (part->op.reply.hdr.retval != expected)
|
||||||
{
|
{
|
||||||
|
|
|
@ -20,7 +20,7 @@ struct cluster_op_part_t
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
pg_num_t pg_num;
|
pg_num_t pg_num;
|
||||||
osd_num_t osd_num;
|
osd_num_t osd_num;
|
||||||
void *buf;
|
osd_op_buf_list_t iov;
|
||||||
bool sent;
|
bool sent;
|
||||||
bool done;
|
bool done;
|
||||||
osd_op_t op;
|
osd_op_t op;
|
||||||
|
@ -33,9 +33,10 @@ struct cluster_op_t
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
uint64_t len;
|
uint64_t len;
|
||||||
int retval;
|
int retval;
|
||||||
void *buf;
|
osd_op_buf_list_t iov;
|
||||||
std::function<void(cluster_op_t*)> callback;
|
std::function<void(cluster_op_t*)> callback;
|
||||||
protected:
|
protected:
|
||||||
|
void *buf = NULL;
|
||||||
cluster_op_t *orig_op = NULL;
|
cluster_op_t *orig_op = NULL;
|
||||||
bool is_internal = false;
|
bool is_internal = false;
|
||||||
bool needs_reslice = false;
|
bool needs_reslice = false;
|
||||||
|
|
|
@ -175,7 +175,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
op->inode = opt->inode;
|
op->inode = opt->inode;
|
||||||
op->offset = io->offset;
|
op->offset = io->offset;
|
||||||
op->len = io->xfer_buflen;
|
op->len = io->xfer_buflen;
|
||||||
op->buf = io->xfer_buf;
|
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
||||||
bsd->last_sync = false;
|
bsd->last_sync = false;
|
||||||
break;
|
break;
|
||||||
case DDIR_WRITE:
|
case DDIR_WRITE:
|
||||||
|
@ -183,7 +183,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
op->inode = opt->inode;
|
op->inode = opt->inode;
|
||||||
op->offset = io->offset;
|
op->offset = io->offset;
|
||||||
op->len = io->xfer_buflen;
|
op->len = io->xfer_buflen;
|
||||||
op->buf = io->xfer_buf;
|
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
||||||
bsd->last_sync = false;
|
bsd->last_sync = false;
|
||||||
break;
|
break;
|
||||||
case DDIR_SYNC:
|
case DDIR_SYNC:
|
||||||
|
|
Loading…
Reference in New Issue