|
|
|
@ -29,16 +29,23 @@
|
|
|
|
|
#include <unordered_map>
|
|
|
|
|
|
|
|
|
|
#include "addr_util.h"
|
|
|
|
|
#include "rw_blocking.h"
|
|
|
|
|
#include "epoll_manager.h"
|
|
|
|
|
#include "ringloop.h"
|
|
|
|
|
#include "messenger.h"
|
|
|
|
|
#include "osd_ops.h"
|
|
|
|
|
#include "fio_headers.h"
|
|
|
|
|
|
|
|
|
|
struct sec_data
|
|
|
|
|
{
|
|
|
|
|
epoll_manager_t *epmgr;
|
|
|
|
|
ring_loop_t *ringloop;
|
|
|
|
|
osd_messenger_t *msgr;
|
|
|
|
|
ring_consumer_t looper;
|
|
|
|
|
void *bitmap_buf;
|
|
|
|
|
int connect_fd;
|
|
|
|
|
uint64_t op_id;
|
|
|
|
|
/* block_size = 1 << block_order (128KB by default) */
|
|
|
|
|
uint64_t block_order = 17, block_size = 1 << 17;
|
|
|
|
|
std::unordered_map<uint64_t, io_u*> queue;
|
|
|
|
|
bool last_sync = false;
|
|
|
|
|
/* The list of completed io_u structs. */
|
|
|
|
|
std::vector<io_u*> completed;
|
|
|
|
@ -111,9 +118,6 @@ static struct fio_option options[] = {
|
|
|
|
|
static int sec_setup(struct thread_data *td)
|
|
|
|
|
{
|
|
|
|
|
sec_data *bsd;
|
|
|
|
|
//fio_file *f;
|
|
|
|
|
//int r;
|
|
|
|
|
//int64_t size;
|
|
|
|
|
|
|
|
|
|
bsd = new sec_data;
|
|
|
|
|
if (!bsd)
|
|
|
|
@ -130,8 +134,6 @@ static int sec_setup(struct thread_data *td)
|
|
|
|
|
td->o.open_files++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//f = td->files[0];
|
|
|
|
|
//f->real_file_size = size;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -140,6 +142,10 @@ static void sec_cleanup(struct thread_data *td)
|
|
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
|
|
|
|
if (bsd)
|
|
|
|
|
{
|
|
|
|
|
delete bsd->msgr;
|
|
|
|
|
delete bsd->epmgr;
|
|
|
|
|
delete bsd->ringloop;
|
|
|
|
|
free(bsd->bitmap_buf);
|
|
|
|
|
close(bsd->connect_fd);
|
|
|
|
|
delete bsd;
|
|
|
|
|
}
|
|
|
|
@ -174,6 +180,45 @@ static int sec_init(struct thread_data *td)
|
|
|
|
|
int one = 1;
|
|
|
|
|
setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
|
|
|
|
|
|
|
|
|
fcntl(bsd->connect_fd, F_SETFL, fcntl(bsd->connect_fd, F_GETFL, 0) | O_NONBLOCK);
|
|
|
|
|
|
|
|
|
|
json11::Json cfg = json11::Json::object{ { "use_rdma", 0 } };
|
|
|
|
|
|
|
|
|
|
bsd->bitmap_buf = malloc(4096);
|
|
|
|
|
|
|
|
|
|
bsd->ringloop = new ring_loop_t(512);
|
|
|
|
|
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
|
|
|
|
|
bsd->msgr = new osd_messenger_t();
|
|
|
|
|
bsd->msgr->tfd = bsd->epmgr->tfd;
|
|
|
|
|
bsd->msgr->ringloop = bsd->ringloop;
|
|
|
|
|
bsd->msgr->repeer_pgs = [](osd_num_t){};
|
|
|
|
|
bsd->msgr->parse_config(cfg);
|
|
|
|
|
bsd->msgr->init();
|
|
|
|
|
|
|
|
|
|
bsd->looper.loop = [bsd]()
|
|
|
|
|
{
|
|
|
|
|
bsd->msgr->read_requests();
|
|
|
|
|
bsd->msgr->send_replies();
|
|
|
|
|
bsd->ringloop->submit();
|
|
|
|
|
};
|
|
|
|
|
bsd->ringloop->register_consumer(&bsd->looper);
|
|
|
|
|
|
|
|
|
|
int peer_fd = bsd->connect_fd;
|
|
|
|
|
bsd->msgr->clients[peer_fd] = new osd_client_t();
|
|
|
|
|
bsd->msgr->clients[peer_fd]->peer_addr = addr;
|
|
|
|
|
bsd->msgr->clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
|
|
|
|
|
bsd->msgr->clients[peer_fd]->peer_fd = peer_fd;
|
|
|
|
|
bsd->msgr->clients[peer_fd]->peer_state = PEER_CONNECTED;
|
|
|
|
|
bsd->msgr->clients[peer_fd]->connect_timeout_id = -1;
|
|
|
|
|
bsd->msgr->clients[peer_fd]->osd_num = 1;
|
|
|
|
|
bsd->msgr->clients[peer_fd]->in_buf = malloc_or_die(bsd->msgr->receive_buffer_size);
|
|
|
|
|
bsd->epmgr->tfd->set_fd_handler(peer_fd, true, [msgr = bsd->msgr](int peer_fd, int epoll_events)
|
|
|
|
|
{
|
|
|
|
|
// Either OUT (connected) or HUP
|
|
|
|
|
msgr->handle_peer_epoll(peer_fd, epoll_events);
|
|
|
|
|
});
|
|
|
|
|
bsd->msgr->osd_peer_fds[1] = peer_fd;
|
|
|
|
|
|
|
|
|
|
// FIXME: read config (block size) from OSD
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
@ -193,7 +238,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
io->engine_data = bsd;
|
|
|
|
|
osd_any_op_t op = { 0 };
|
|
|
|
|
|
|
|
|
|
osd_op_t *oo = new osd_op_t();
|
|
|
|
|
oo->op_type = OSD_OP_OUT;
|
|
|
|
|
oo->peer_fd = bsd->connect_fd;
|
|
|
|
|
|
|
|
|
|
osd_any_op_t & op = oo->req;
|
|
|
|
|
|
|
|
|
|
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
|
|
|
|
|
op.hdr.id = n;
|
|
|
|
@ -210,6 +260,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
op.sec_rw.version = UINT64_MAX; // last unstable
|
|
|
|
|
op.sec_rw.offset = io->offset % bsd->block_size;
|
|
|
|
|
op.sec_rw.len = io->xfer_buflen;
|
|
|
|
|
op.sec_rw.attr_len = 4;
|
|
|
|
|
oo->bitmap = bsd->bitmap_buf;
|
|
|
|
|
oo->bitmap_len = 4;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
@ -218,6 +271,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
op.rw.offset = io->offset;
|
|
|
|
|
op.rw.len = io->xfer_buflen;
|
|
|
|
|
}
|
|
|
|
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
|
|
|
|
bsd->last_sync = false;
|
|
|
|
|
break;
|
|
|
|
|
case DDIR_WRITE:
|
|
|
|
@ -239,6 +293,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
op.rw.offset = io->offset;
|
|
|
|
|
op.rw.len = io->xfer_buflen;
|
|
|
|
|
}
|
|
|
|
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
|
|
|
|
bsd->last_sync = false;
|
|
|
|
|
break;
|
|
|
|
|
case DDIR_SYNC:
|
|
|
|
@ -260,6 +315,21 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
return FIO_Q_COMPLETED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
oo->callback = [td, io](osd_op_t *oo)
|
|
|
|
|
{
|
|
|
|
|
sec_options *opt = (sec_options*)td->eo;
|
|
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
|
|
|
|
if (opt->trace)
|
|
|
|
|
{
|
|
|
|
|
printf("--- %s # %ld %ld\n", io->ddir == DDIR_READ ? "READ" :
|
|
|
|
|
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), oo->reply.hdr.id, oo->reply.hdr.retval);
|
|
|
|
|
}
|
|
|
|
|
io->error = oo->reply.hdr.retval < 0 ? -oo->reply.hdr.retval : 0;
|
|
|
|
|
bsd->completed.push_back(io);
|
|
|
|
|
delete oo;
|
|
|
|
|
};
|
|
|
|
|
bsd->msgr->outbox_push(oo);
|
|
|
|
|
|
|
|
|
|
if (opt->trace)
|
|
|
|
|
{
|
|
|
|
|
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
|
|
|
|
@ -269,21 +339,6 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
io->error = 0;
|
|
|
|
|
bsd->inflight++;
|
|
|
|
|
bsd->op_n++;
|
|
|
|
|
bsd->queue[n] = io;
|
|
|
|
|
|
|
|
|
|
iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } };
|
|
|
|
|
int iovcnt = 1, wtotal = OSD_PACKET_SIZE;
|
|
|
|
|
if (io->ddir == DDIR_WRITE)
|
|
|
|
|
{
|
|
|
|
|
iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
|
|
|
|
|
wtotal += io->xfer_buflen;
|
|
|
|
|
iovcnt++;
|
|
|
|
|
}
|
|
|
|
|
if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal)
|
|
|
|
|
{
|
|
|
|
|
perror("writev");
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (io->error != 0)
|
|
|
|
|
return FIO_Q_COMPLETED;
|
|
|
|
@ -292,57 +347,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
|
|
|
|
|
|
|
|
|
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
|
|
|
|
|
{
|
|
|
|
|
sec_options *opt = (sec_options*)td->eo;
|
|
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
|
|
|
|
// FIXME timeout, at least poll. Now it's the stupidest implementation possible
|
|
|
|
|
osd_any_reply_t reply;
|
|
|
|
|
while (bsd->completed.size() < min)
|
|
|
|
|
{
|
|
|
|
|
read_blocking(bsd->connect_fd, reply.buf, OSD_PACKET_SIZE);
|
|
|
|
|
if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC);
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
auto it = bsd->queue.find(reply.hdr.id);
|
|
|
|
|
if (it == bsd->queue.end())
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id);
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
io_u* io = it->second;
|
|
|
|
|
bsd->queue.erase(it);
|
|
|
|
|
if (io->ddir == DDIR_READ)
|
|
|
|
|
{
|
|
|
|
|
if (reply.hdr.retval != io->xfer_buflen)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen);
|
|
|
|
|
}
|
|
|
|
|
else if (io->ddir == DDIR_WRITE)
|
|
|
|
|
{
|
|
|
|
|
if (reply.hdr.retval != io->xfer_buflen)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (io->ddir == DDIR_SYNC)
|
|
|
|
|
{
|
|
|
|
|
if (reply.hdr.retval != 0)
|
|
|
|
|
{
|
|
|
|
|
fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval);
|
|
|
|
|
exit(1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (opt->trace)
|
|
|
|
|
{
|
|
|
|
|
printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" :
|
|
|
|
|
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id);
|
|
|
|
|
}
|
|
|
|
|
bsd->completed.push_back(io);
|
|
|
|
|
bsd->ringloop->loop();
|
|
|
|
|
if (bsd->completed.size() >= min)
|
|
|
|
|
break;
|
|
|
|
|
bsd->ringloop->wait();
|
|
|
|
|
}
|
|
|
|
|
return bsd->completed.size();
|
|
|
|
|
}
|
|
|
|
|