forked from vitalif/vitastor
Compare commits
1 Commits
master
...
sec_osd_ms
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 68d4f2a481 |
|
@ -111,11 +111,12 @@ if (${WITH_FIO})
|
||||||
# libfio_vitastor_sec.so
|
# libfio_vitastor_sec.so
|
||||||
add_library(fio_vitastor_sec SHARED
|
add_library(fio_vitastor_sec SHARED
|
||||||
fio_sec_osd.cpp
|
fio_sec_osd.cpp
|
||||||
rw_blocking.cpp
|
|
||||||
addr_util.cpp
|
|
||||||
)
|
)
|
||||||
target_link_libraries(fio_vitastor_sec
|
target_link_libraries(fio_vitastor_sec
|
||||||
|
vitastor_common
|
||||||
tcmalloc_minimal
|
tcmalloc_minimal
|
||||||
|
${LIBURING_LIBRARIES}
|
||||||
|
${IBVERBS_LIBRARIES}
|
||||||
)
|
)
|
||||||
endif (${WITH_FIO})
|
endif (${WITH_FIO})
|
||||||
|
|
||||||
|
|
|
@ -29,16 +29,23 @@
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "addr_util.h"
|
#include "addr_util.h"
|
||||||
#include "rw_blocking.h"
|
#include "epoll_manager.h"
|
||||||
|
#include "ringloop.h"
|
||||||
|
#include "messenger.h"
|
||||||
#include "osd_ops.h"
|
#include "osd_ops.h"
|
||||||
#include "fio_headers.h"
|
#include "fio_headers.h"
|
||||||
|
|
||||||
struct sec_data
|
struct sec_data
|
||||||
{
|
{
|
||||||
|
epoll_manager_t *epmgr;
|
||||||
|
ring_loop_t *ringloop;
|
||||||
|
osd_messenger_t *msgr;
|
||||||
|
ring_consumer_t looper;
|
||||||
|
void *bitmap_buf;
|
||||||
int connect_fd;
|
int connect_fd;
|
||||||
|
uint64_t op_id;
|
||||||
/* block_size = 1 << block_order (128KB by default) */
|
/* block_size = 1 << block_order (128KB by default) */
|
||||||
uint64_t block_order = 17, block_size = 1 << 17;
|
uint64_t block_order = 17, block_size = 1 << 17;
|
||||||
std::unordered_map<uint64_t, io_u*> queue;
|
|
||||||
bool last_sync = false;
|
bool last_sync = false;
|
||||||
/* The list of completed io_u structs. */
|
/* The list of completed io_u structs. */
|
||||||
std::vector<io_u*> completed;
|
std::vector<io_u*> completed;
|
||||||
|
@ -111,9 +118,6 @@ static struct fio_option options[] = {
|
||||||
static int sec_setup(struct thread_data *td)
|
static int sec_setup(struct thread_data *td)
|
||||||
{
|
{
|
||||||
sec_data *bsd;
|
sec_data *bsd;
|
||||||
//fio_file *f;
|
|
||||||
//int r;
|
|
||||||
//int64_t size;
|
|
||||||
|
|
||||||
bsd = new sec_data;
|
bsd = new sec_data;
|
||||||
if (!bsd)
|
if (!bsd)
|
||||||
|
@ -130,8 +134,6 @@ static int sec_setup(struct thread_data *td)
|
||||||
td->o.open_files++;
|
td->o.open_files++;
|
||||||
}
|
}
|
||||||
|
|
||||||
//f = td->files[0];
|
|
||||||
//f->real_file_size = size;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,6 +142,10 @@ static void sec_cleanup(struct thread_data *td)
|
||||||
sec_data *bsd = (sec_data*)td->io_ops_data;
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
||||||
if (bsd)
|
if (bsd)
|
||||||
{
|
{
|
||||||
|
delete bsd->msgr;
|
||||||
|
delete bsd->epmgr;
|
||||||
|
delete bsd->ringloop;
|
||||||
|
free(bsd->bitmap_buf);
|
||||||
close(bsd->connect_fd);
|
close(bsd->connect_fd);
|
||||||
delete bsd;
|
delete bsd;
|
||||||
}
|
}
|
||||||
|
@ -174,6 +180,45 @@ static int sec_init(struct thread_data *td)
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
|
|
||||||
|
fcntl(bsd->connect_fd, F_SETFL, fcntl(bsd->connect_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
|
||||||
|
json11::Json cfg = json11::Json::object{ { "use_rdma", 0 } };
|
||||||
|
|
||||||
|
bsd->bitmap_buf = malloc(4096);
|
||||||
|
|
||||||
|
bsd->ringloop = new ring_loop_t(512);
|
||||||
|
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
|
||||||
|
bsd->msgr = new osd_messenger_t();
|
||||||
|
bsd->msgr->tfd = bsd->epmgr->tfd;
|
||||||
|
bsd->msgr->ringloop = bsd->ringloop;
|
||||||
|
bsd->msgr->repeer_pgs = [](osd_num_t){};
|
||||||
|
bsd->msgr->parse_config(cfg);
|
||||||
|
bsd->msgr->init();
|
||||||
|
|
||||||
|
bsd->looper.loop = [bsd]()
|
||||||
|
{
|
||||||
|
bsd->msgr->read_requests();
|
||||||
|
bsd->msgr->send_replies();
|
||||||
|
bsd->ringloop->submit();
|
||||||
|
};
|
||||||
|
bsd->ringloop->register_consumer(&bsd->looper);
|
||||||
|
|
||||||
|
int peer_fd = bsd->connect_fd;
|
||||||
|
bsd->msgr->clients[peer_fd] = new osd_client_t();
|
||||||
|
bsd->msgr->clients[peer_fd]->peer_addr = addr;
|
||||||
|
bsd->msgr->clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
|
||||||
|
bsd->msgr->clients[peer_fd]->peer_fd = peer_fd;
|
||||||
|
bsd->msgr->clients[peer_fd]->peer_state = PEER_CONNECTED;
|
||||||
|
bsd->msgr->clients[peer_fd]->connect_timeout_id = -1;
|
||||||
|
bsd->msgr->clients[peer_fd]->osd_num = 1;
|
||||||
|
bsd->msgr->clients[peer_fd]->in_buf = malloc_or_die(bsd->msgr->receive_buffer_size);
|
||||||
|
bsd->epmgr->tfd->set_fd_handler(peer_fd, true, [msgr = bsd->msgr](int peer_fd, int epoll_events)
|
||||||
|
{
|
||||||
|
// Either OUT (connected) or HUP
|
||||||
|
msgr->handle_peer_epoll(peer_fd, epoll_events);
|
||||||
|
});
|
||||||
|
bsd->msgr->osd_peer_fds[1] = peer_fd;
|
||||||
|
|
||||||
// FIXME: read config (block size) from OSD
|
// FIXME: read config (block size) from OSD
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -193,7 +238,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
}
|
}
|
||||||
|
|
||||||
io->engine_data = bsd;
|
io->engine_data = bsd;
|
||||||
osd_any_op_t op = { 0 };
|
|
||||||
|
osd_op_t *oo = new osd_op_t();
|
||||||
|
oo->op_type = OSD_OP_OUT;
|
||||||
|
oo->peer_fd = bsd->connect_fd;
|
||||||
|
|
||||||
|
osd_any_op_t & op = oo->req;
|
||||||
|
|
||||||
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
|
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
|
||||||
op.hdr.id = n;
|
op.hdr.id = n;
|
||||||
|
@ -210,6 +260,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
op.sec_rw.version = UINT64_MAX; // last unstable
|
op.sec_rw.version = UINT64_MAX; // last unstable
|
||||||
op.sec_rw.offset = io->offset % bsd->block_size;
|
op.sec_rw.offset = io->offset % bsd->block_size;
|
||||||
op.sec_rw.len = io->xfer_buflen;
|
op.sec_rw.len = io->xfer_buflen;
|
||||||
|
op.sec_rw.attr_len = 4;
|
||||||
|
oo->bitmap = bsd->bitmap_buf;
|
||||||
|
oo->bitmap_len = 4;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -218,6 +271,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
op.rw.offset = io->offset;
|
op.rw.offset = io->offset;
|
||||||
op.rw.len = io->xfer_buflen;
|
op.rw.len = io->xfer_buflen;
|
||||||
}
|
}
|
||||||
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
||||||
bsd->last_sync = false;
|
bsd->last_sync = false;
|
||||||
break;
|
break;
|
||||||
case DDIR_WRITE:
|
case DDIR_WRITE:
|
||||||
|
@ -239,6 +293,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
op.rw.offset = io->offset;
|
op.rw.offset = io->offset;
|
||||||
op.rw.len = io->xfer_buflen;
|
op.rw.len = io->xfer_buflen;
|
||||||
}
|
}
|
||||||
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen);
|
||||||
bsd->last_sync = false;
|
bsd->last_sync = false;
|
||||||
break;
|
break;
|
||||||
case DDIR_SYNC:
|
case DDIR_SYNC:
|
||||||
|
@ -260,6 +315,21 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
return FIO_Q_COMPLETED;
|
return FIO_Q_COMPLETED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
oo->callback = [td, io](osd_op_t *oo)
|
||||||
|
{
|
||||||
|
sec_options *opt = (sec_options*)td->eo;
|
||||||
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
||||||
|
if (opt->trace)
|
||||||
|
{
|
||||||
|
printf("--- %s # %ld %ld\n", io->ddir == DDIR_READ ? "READ" :
|
||||||
|
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), oo->reply.hdr.id, oo->reply.hdr.retval);
|
||||||
|
}
|
||||||
|
io->error = oo->reply.hdr.retval < 0 ? -oo->reply.hdr.retval : 0;
|
||||||
|
bsd->completed.push_back(io);
|
||||||
|
delete oo;
|
||||||
|
};
|
||||||
|
bsd->msgr->outbox_push(oo);
|
||||||
|
|
||||||
if (opt->trace)
|
if (opt->trace)
|
||||||
{
|
{
|
||||||
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
|
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
|
||||||
|
@ -269,21 +339,6 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
io->error = 0;
|
io->error = 0;
|
||||||
bsd->inflight++;
|
bsd->inflight++;
|
||||||
bsd->op_n++;
|
bsd->op_n++;
|
||||||
bsd->queue[n] = io;
|
|
||||||
|
|
||||||
iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } };
|
|
||||||
int iovcnt = 1, wtotal = OSD_PACKET_SIZE;
|
|
||||||
if (io->ddir == DDIR_WRITE)
|
|
||||||
{
|
|
||||||
iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
|
|
||||||
wtotal += io->xfer_buflen;
|
|
||||||
iovcnt++;
|
|
||||||
}
|
|
||||||
if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal)
|
|
||||||
{
|
|
||||||
perror("writev");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (io->error != 0)
|
if (io->error != 0)
|
||||||
return FIO_Q_COMPLETED;
|
return FIO_Q_COMPLETED;
|
||||||
|
@ -292,57 +347,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
|
||||||
|
|
||||||
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
|
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
|
||||||
{
|
{
|
||||||
sec_options *opt = (sec_options*)td->eo;
|
|
||||||
sec_data *bsd = (sec_data*)td->io_ops_data;
|
sec_data *bsd = (sec_data*)td->io_ops_data;
|
||||||
// FIXME timeout, at least poll. Now it's the stupidest implementation possible
|
|
||||||
osd_any_reply_t reply;
|
|
||||||
while (bsd->completed.size() < min)
|
while (bsd->completed.size() < min)
|
||||||
{
|
{
|
||||||
read_blocking(bsd->connect_fd, reply.buf, OSD_PACKET_SIZE);
|
bsd->ringloop->loop();
|
||||||
if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC)
|
if (bsd->completed.size() >= min)
|
||||||
{
|
break;
|
||||||
fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC);
|
bsd->ringloop->wait();
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
auto it = bsd->queue.find(reply.hdr.id);
|
|
||||||
if (it == bsd->queue.end())
|
|
||||||
{
|
|
||||||
fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
io_u* io = it->second;
|
|
||||||
bsd->queue.erase(it);
|
|
||||||
if (io->ddir == DDIR_READ)
|
|
||||||
{
|
|
||||||
if (reply.hdr.retval != io->xfer_buflen)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen);
|
|
||||||
}
|
|
||||||
else if (io->ddir == DDIR_WRITE)
|
|
||||||
{
|
|
||||||
if (reply.hdr.retval != io->xfer_buflen)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (io->ddir == DDIR_SYNC)
|
|
||||||
{
|
|
||||||
if (reply.hdr.retval != 0)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (opt->trace)
|
|
||||||
{
|
|
||||||
printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" :
|
|
||||||
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id);
|
|
||||||
}
|
|
||||||
bsd->completed.push_back(io);
|
|
||||||
}
|
}
|
||||||
return bsd->completed.size();
|
return bsd->completed.size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,6 @@ struct osd_messenger_t
|
||||||
protected:
|
protected:
|
||||||
int keepalive_timer_id = -1;
|
int keepalive_timer_id = -1;
|
||||||
|
|
||||||
uint32_t receive_buffer_size = 0;
|
|
||||||
int peer_connect_interval = 0;
|
int peer_connect_interval = 0;
|
||||||
int peer_connect_timeout = 0;
|
int peer_connect_timeout = 0;
|
||||||
int osd_idle_timeout = 0;
|
int osd_idle_timeout = 0;
|
||||||
|
@ -142,6 +141,8 @@ protected:
|
||||||
std::vector<std::function<void()>> set_immediate;
|
std::vector<std::function<void()>> set_immediate;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
uint32_t receive_buffer_size = 0;
|
||||||
|
|
||||||
timerfd_manager_t *tfd;
|
timerfd_manager_t *tfd;
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
// osd_num_t is only for logging and asserts
|
// osd_num_t is only for logging and asserts
|
||||||
|
@ -172,10 +173,11 @@ public:
|
||||||
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
|
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
void handle_peer_epoll(int peer_fd, int epoll_events);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void try_connect_peer(uint64_t osd_num);
|
void try_connect_peer(uint64_t osd_num);
|
||||||
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
||||||
void handle_peer_epoll(int peer_fd, int epoll_events);
|
|
||||||
void handle_connect_epoll(int peer_fd);
|
void handle_connect_epoll(int peer_fd);
|
||||||
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
|
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
|
||||||
void check_peer_config(osd_client_t *cl);
|
void check_peer_config(osd_client_t *cl);
|
||||||
|
|
Loading…
Reference in New Issue