Finish reply code, make it compile
parent
02a0eb49c2
commit
d3c6314d01
15
Makefile
15
Makefile
|
@ -1,17 +1,18 @@
|
||||||
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
|
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
|
||||||
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
|
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o osd.o
|
||||||
|
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
|
||||||
all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so
|
all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so
|
||||||
clean:
|
clean:
|
||||||
rm -f *.o
|
rm -f *.o
|
||||||
crc32c.o: crc32c.c
|
crc32c.o: crc32c.c
|
||||||
g++ -g -O3 -fPIC -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
|
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
|
||||||
g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -Wno-pointer-arith -fPIC -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
test: test.cpp
|
test: test.cpp
|
||||||
g++ -g -O3 -o test -luring test.cpp
|
g++ $(CXXFLAGS) -o test -luring test.cpp
|
||||||
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
|
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
|
||||||
g++ -g -o test_blockstore -ltcmalloc_minimal -luring test_blockstore.cpp $(BLOCKSTORE_OBJS)
|
g++ $(CXXFLAGS) -o test_blockstore -ltcmalloc_minimal -luring test_blockstore.cpp $(BLOCKSTORE_OBJS)
|
||||||
test_allocator: test_allocator.cpp allocator.o
|
test_allocator: test_allocator.cpp allocator.o
|
||||||
g++ -g -o test_allocator test_allocator.cpp allocator.o
|
g++ $(CXXFLAGS) -o test_allocator test_allocator.cpp allocator.o
|
||||||
libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS)
|
libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS)
|
||||||
g++ -g -O3 -ltcmalloc_minimal -Wno-pointer-arith -fPIC -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS)
|
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS)
|
||||||
|
|
|
@ -91,8 +91,8 @@ static struct fio_option options[] = {
|
||||||
static int bs_setup(struct thread_data *td)
|
static int bs_setup(struct thread_data *td)
|
||||||
{
|
{
|
||||||
bs_data *bsd;
|
bs_data *bsd;
|
||||||
fio_file *f;
|
//fio_file *f;
|
||||||
int r;
|
//int r;
|
||||||
//int64_t size;
|
//int64_t size;
|
||||||
|
|
||||||
bsd = new bs_data;
|
bsd = new bs_data;
|
||||||
|
@ -109,8 +109,8 @@ static int bs_setup(struct thread_data *td)
|
||||||
td->o.nr_files = td->o.nr_files ? : 1;
|
td->o.nr_files = td->o.nr_files ? : 1;
|
||||||
td->o.open_files++;
|
td->o.open_files++;
|
||||||
}
|
}
|
||||||
f = td->files[0];
|
|
||||||
|
|
||||||
|
//f = td->files[0];
|
||||||
//f->real_file_size = size;
|
//f->real_file_size = size;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
260
osd.cpp
260
osd.cpp
|
@ -1,13 +1,20 @@
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <sys/poll.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "osd_ops.h"
|
#include "osd_ops.h"
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
|
|
||||||
#define CL_READ_OP 1
|
#define CL_READ_OP 1
|
||||||
#define CL_READ_DATA 2
|
#define CL_READ_DATA 2
|
||||||
|
#define SQE_SENT 0x100l
|
||||||
|
#define CL_WRITE_READY 1
|
||||||
|
#define CL_WRITE_REPLY 2
|
||||||
|
#define CL_WRITE_DATA 3
|
||||||
|
|
||||||
struct osd_op_t
|
struct osd_op_t
|
||||||
{
|
{
|
||||||
|
@ -24,6 +31,12 @@ struct osd_op_t
|
||||||
};
|
};
|
||||||
blockstore_operation bs_op;
|
blockstore_operation bs_op;
|
||||||
void *buf = NULL;
|
void *buf = NULL;
|
||||||
|
|
||||||
|
~osd_op_t()
|
||||||
|
{
|
||||||
|
if (buf)
|
||||||
|
free(buf);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct osd_client_t
|
struct osd_client_t
|
||||||
|
@ -48,7 +61,6 @@ struct osd_client_t
|
||||||
|
|
||||||
// Write state
|
// Write state
|
||||||
osd_op_t *write_op = NULL;
|
osd_op_t *write_op = NULL;
|
||||||
int write_state = 0;
|
|
||||||
iovec write_iov;
|
iovec write_iov;
|
||||||
msghdr write_msg;
|
msghdr write_msg;
|
||||||
void *write_buf = NULL;
|
void *write_buf = NULL;
|
||||||
|
@ -76,14 +88,21 @@ class osd_t
|
||||||
int bind_port, listen_backlog;
|
int bind_port, listen_backlog;
|
||||||
|
|
||||||
std::unordered_map<int,osd_client_t> clients;
|
std::unordered_map<int,osd_client_t> clients;
|
||||||
std::deque<int> read_ready_clients;
|
std::vector<int> read_ready_clients;
|
||||||
std::list<int> write_ready_clients;
|
std::vector<int> write_ready_clients;
|
||||||
|
|
||||||
void handle_epoll_events();
|
void loop();
|
||||||
|
int handle_epoll_events();
|
||||||
|
void stop_client(int peer_fd);
|
||||||
|
void read_requests();
|
||||||
|
void handle_read(ring_data_t *data, int peer_fd);
|
||||||
|
void enqueue_op(osd_op_t *cur_op);
|
||||||
|
void send_replies();
|
||||||
|
void make_reply(osd_op_t *op);
|
||||||
|
void handle_send(ring_data_t *data, int peer_fd);
|
||||||
public:
|
public:
|
||||||
osd_t(blockstore *bs, ring_loop_t *ringloop);
|
osd_t(blockstore *bs, ring_loop_t *ringloop);
|
||||||
~osd_t();
|
~osd_t();
|
||||||
void loop();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
|
osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
|
||||||
|
@ -100,7 +119,8 @@ osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
|
||||||
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
|
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
|
||||||
|
|
||||||
sockaddr_in addr;
|
sockaddr_in addr;
|
||||||
if ((int r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
|
int r;
|
||||||
|
if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
|
||||||
{
|
{
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
|
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
|
||||||
|
@ -108,7 +128,7 @@ osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
|
||||||
addr.sin_family = AF_INET;
|
addr.sin_family = AF_INET;
|
||||||
addr.sin_port = htons(bind_port);
|
addr.sin_port = htons(bind_port);
|
||||||
|
|
||||||
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
|
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
|
||||||
{
|
{
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
throw std::runtime_error(std::string("bind: ") + strerror(errno));
|
throw std::runtime_error(std::string("bind: ") + strerror(errno));
|
||||||
|
@ -150,10 +170,8 @@ osd_t::~osd_t()
|
||||||
|
|
||||||
void osd_t::loop()
|
void osd_t::loop()
|
||||||
{
|
{
|
||||||
if (wait_state == 1)
|
if (wait_state == 0)
|
||||||
{
|
{
|
||||||
return;
|
|
||||||
}
|
|
||||||
io_uring_sqe *sqe = ringloop->get_sqe();
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
@ -172,6 +190,9 @@ void osd_t::loop()
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
};
|
};
|
||||||
wait_state = 1;
|
wait_state = 1;
|
||||||
|
}
|
||||||
|
send_replies();
|
||||||
|
read_requests();
|
||||||
ringloop->submit();
|
ringloop->submit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +215,7 @@ int osd_t::handle_epoll_events()
|
||||||
sockaddr_in addr;
|
sockaddr_in addr;
|
||||||
socklen_t peer_addr_size = sizeof(addr);
|
socklen_t peer_addr_size = sizeof(addr);
|
||||||
int peer_fd;
|
int peer_fd;
|
||||||
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
|
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
|
||||||
{
|
{
|
||||||
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
clients[peer_fd] = {
|
clients[peer_fd] = {
|
||||||
|
@ -250,7 +271,7 @@ void osd_t::stop_client(int peer_fd)
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
auto it = clients.find(peer_fd);
|
auto it = clients.find(peer_fd);
|
||||||
if (it->read_ready)
|
if (it->second.read_ready)
|
||||||
{
|
{
|
||||||
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
||||||
{
|
{
|
||||||
|
@ -265,7 +286,7 @@ void osd_t::stop_client(int peer_fd)
|
||||||
close(peer_fd);
|
close(peer_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::read_commands()
|
void osd_t::read_requests()
|
||||||
{
|
{
|
||||||
for (int i = 0; i < read_ready_clients.size(); i++)
|
for (int i = 0; i < read_ready_clients.size(); i++)
|
||||||
{
|
{
|
||||||
|
@ -274,8 +295,7 @@ void osd_t::read_commands()
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients().begin() + i);
|
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||||
ringloop->submit();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||||
|
@ -297,14 +317,14 @@ void osd_t::read_commands()
|
||||||
cl.read_ready = false;
|
cl.read_ready = false;
|
||||||
}
|
}
|
||||||
read_ready_clients.clear();
|
read_ready_clients.clear();
|
||||||
ringloop->submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
{
|
{
|
||||||
auto cl = clients.find(peer_fd);
|
auto cl_it = clients.find(peer_fd);
|
||||||
if (cl != clients.end())
|
if (cl_it != clients.end())
|
||||||
{
|
{
|
||||||
|
auto & cl = cl_it->second;
|
||||||
if (data->res < 0 && data->res != -EAGAIN)
|
if (data->res < 0 && data->res != -EAGAIN)
|
||||||
{
|
{
|
||||||
// this is a client socket, so don't panic. just disconnect it
|
// this is a client socket, so don't panic. just disconnect it
|
||||||
|
@ -312,21 +332,21 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
stop_client(peer_fd);
|
stop_client(peer_fd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cl->reading = false;
|
cl.reading = false;
|
||||||
if (cl->read_ready)
|
if (cl.read_ready)
|
||||||
{
|
{
|
||||||
read_ready_clients.push_back(peer_fd);
|
read_ready_clients.push_back(peer_fd);
|
||||||
}
|
}
|
||||||
if (data->res > 0)
|
if (data->res > 0)
|
||||||
{
|
{
|
||||||
cl->read_remaining -= data->res;
|
cl.read_remaining -= data->res;
|
||||||
cl->read_buf += data->res;
|
cl.read_buf += data->res;
|
||||||
if (cl->read_remaining <= 0)
|
if (cl.read_remaining <= 0)
|
||||||
{
|
{
|
||||||
cl->read_buf = NULL;
|
osd_op_t *cur_op = cl.read_op;
|
||||||
if (cl->read_state == CL_READ_OP)
|
cl.read_buf = NULL;
|
||||||
|
if (cl.read_state == CL_READ_OP)
|
||||||
{
|
{
|
||||||
osd_op_t *cur_op = cl->read_op;
|
|
||||||
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
|
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
|
||||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
||||||
|
@ -338,26 +358,26 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
||||||
{
|
{
|
||||||
// Read data
|
// Read data
|
||||||
cl->read_buf = cur_op->buf;
|
cl.read_buf = cur_op->buf;
|
||||||
cl->read_remaining = cur_op->op.sec_rw.len;
|
cl.read_remaining = cur_op->op.sec_rw.len;
|
||||||
cl->read_state = CL_READ_DATA;
|
cl.read_state = CL_READ_DATA;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Command is ready
|
// Command is ready
|
||||||
cur_op->peer_fd = peer_fd;
|
cur_op->peer_fd = peer_fd;
|
||||||
enqueue_op(cur_op);
|
enqueue_op(cur_op);
|
||||||
cl->read_op = NULL;
|
cl.read_op = NULL;
|
||||||
cl->read_state = 0;
|
cl.read_state = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (cl->read_state == CL_READ_DATA)
|
else if (cl.read_state == CL_READ_DATA)
|
||||||
{
|
{
|
||||||
// Command is ready
|
// Command is ready
|
||||||
cur_op->peer_fd = peer_fd;
|
cur_op->peer_fd = peer_fd;
|
||||||
enqueue_op(cur_op);
|
enqueue_op(cur_op);
|
||||||
cl->read_op = NULL;
|
cl.read_op = NULL;
|
||||||
cl->read_state = 0;
|
cl.read_state = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -366,59 +386,159 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
|
|
||||||
void osd_t::enqueue_op(osd_op_t *cur_op)
|
void osd_t::enqueue_op(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
cur_op->bs_op->callback = [this, cur_op](blockstore_operation* bs_op)
|
cur_op->bs_op.callback = [this, cur_op](blockstore_operation* bs_op)
|
||||||
{
|
{
|
||||||
auto cl = clients.find(cur_op->peer_fd);
|
auto cl_it = clients.find(cur_op->peer_fd);
|
||||||
if (cl != clients.end())
|
if (cl_it != clients.end())
|
||||||
{
|
{
|
||||||
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
auto & cl = cl_it->second;
|
||||||
cur_op->reply.hdr.id = cur_op->op.hdr.id;
|
if (cl.write_state == 0)
|
||||||
cur_op->reply.hdr.retval = bs_op->retval;
|
{
|
||||||
cl->completions.push(cur_op);
|
cl.write_state = CL_WRITE_READY;
|
||||||
|
write_ready_clients.push_back(cur_op->peer_fd);
|
||||||
|
}
|
||||||
|
cl.completions.push_back(cur_op);
|
||||||
ringloop->wakeup();
|
ringloop->wakeup();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (cur_op->buf)
|
|
||||||
free(cur_op->buf);
|
|
||||||
delete cur_op;
|
delete cur_op;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (cur_op->op->hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
||||||
cur_op->op->hdr.opcode < OSD_OP_MIN || cur_op->op->hdr.opcode > OSD_OP_MAX ||
|
cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX ||
|
||||||
(cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
(cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
||||||
(cur_op->op->sec_rw.len > OSD_RW_MAX || cur_op->op->sec_rw.len % OSD_RW_ALIGN || cur_op->op->sec_rw.offset % OSD_RW_ALIGN))
|
(cur_op->op.sec_rw.len > OSD_RW_MAX || cur_op->op.sec_rw.len % OSD_RW_ALIGN || cur_op->op.sec_rw.offset % OSD_RW_ALIGN))
|
||||||
{
|
{
|
||||||
// Bad command
|
// Bad command
|
||||||
cur_op->bs_op->retval = -EINVAL;
|
cur_op->bs_op.retval = -EINVAL;
|
||||||
cur_op->bs_op->callback();
|
cur_op->bs_op.callback(&cur_op->bs_op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// FIXME: list op is not a blockstore op yet
|
// FIXME: LIST is not a blockstore op yet
|
||||||
cur_op->bs_op->flags = (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
|
cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
|
||||||
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
|
||||||
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
|
||||||
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE
|
||||||
: (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE
|
||||||
: -1))))));
|
: -1)))));
|
||||||
if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ||
|
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||||
cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
||||||
{
|
{
|
||||||
cur_op->bs_op->oid = cur_op->op->sec_rw.oid;
|
cur_op->bs_op.oid = cur_op->op.sec_rw.oid;
|
||||||
cur_op->bs_op->version = cur_op->op->sec_rw.version;
|
cur_op->bs_op.version = cur_op->op.sec_rw.version;
|
||||||
cur_op->bs_op->offset = cur_op->op->sec_rw.offset;
|
cur_op->bs_op.offset = cur_op->op.sec_rw.offset;
|
||||||
cur_op->bs_op->len = cur_op->op->sec_rw.len;
|
cur_op->bs_op.len = cur_op->op.sec_rw.len;
|
||||||
cur_op->bs_op->buf = cur_op->buf;
|
cur_op->bs_op.buf = cur_op->buf;
|
||||||
}
|
}
|
||||||
else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE)
|
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE)
|
||||||
{
|
{
|
||||||
cur_op->bs_op->oid = cur_op->op->sec_del.oid;
|
cur_op->bs_op.oid = cur_op->op.sec_del.oid;
|
||||||
cur_op->bs_op->version = cur_op->op->sec_del.version;
|
cur_op->bs_op.version = cur_op->op.sec_del.version;
|
||||||
}
|
}
|
||||||
else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
||||||
{
|
{
|
||||||
cur_op->bs_op->len = cur_op->op->len/sizeof(obj_ver_id);
|
cur_op->bs_op.len = cur_op->op.sec_stabilize.len/sizeof(obj_ver_id);
|
||||||
cur_op->bs_op->buf = cur_op->buf;
|
cur_op->bs_op.buf = cur_op->buf;
|
||||||
|
}
|
||||||
|
bs->enqueue_op(&cur_op->bs_op);
|
||||||
|
}
|
||||||
|
|
||||||
|
void osd_t::send_replies()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < write_ready_clients.size(); i++)
|
||||||
|
{
|
||||||
|
int peer_fd = write_ready_clients[i];
|
||||||
|
auto & cl = clients[peer_fd];
|
||||||
|
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||||
|
if (!cl.write_buf)
|
||||||
|
{
|
||||||
|
// pick next command
|
||||||
|
cl.write_op = cl.completions.front();
|
||||||
|
cl.completions.pop_front();
|
||||||
|
make_reply(cl.write_op);
|
||||||
|
cl.write_buf = &cl.write_op->reply_buf;
|
||||||
|
cl.write_remaining = OSD_REPLY_PACKET_SIZE;
|
||||||
|
cl.write_state = CL_WRITE_REPLY;
|
||||||
|
}
|
||||||
|
cl.write_iov.iov_base = cl.write_buf;
|
||||||
|
cl.write_iov.iov_len = cl.write_remaining;
|
||||||
|
cl.write_msg.msg_iov = &cl.write_iov;
|
||||||
|
cl.write_msg.msg_iovlen = 1;
|
||||||
|
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); };
|
||||||
|
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
|
||||||
|
cl.write_state = cl.write_state | SQE_SENT;
|
||||||
|
}
|
||||||
|
write_ready_clients.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void osd_t::make_reply(osd_op_t *op)
|
||||||
|
{
|
||||||
|
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||||
|
op->reply.hdr.id = op->op.hdr.id;
|
||||||
|
op->reply.hdr.retval = op->bs_op.retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
void osd_t::handle_send(ring_data_t *data, int peer_fd)
|
||||||
|
{
|
||||||
|
auto cl_it = clients.find(peer_fd);
|
||||||
|
if (cl_it != clients.end())
|
||||||
|
{
|
||||||
|
auto & cl = cl_it->second;
|
||||||
|
if (data->res < 0 && data->res != -EAGAIN)
|
||||||
|
{
|
||||||
|
// this is a client socket, so don't panic. just disconnect it
|
||||||
|
printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
|
||||||
|
stop_client(peer_fd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cl.write_state = cl.write_state & ~SQE_SENT;
|
||||||
|
if (data->res > 0)
|
||||||
|
{
|
||||||
|
cl.write_remaining -= data->res;
|
||||||
|
cl.write_buf += data->res;
|
||||||
|
if (cl.write_remaining <= 0)
|
||||||
|
{
|
||||||
|
cl.write_buf = NULL;
|
||||||
|
osd_op_t *cur_op = cl.write_op;
|
||||||
|
if (cl.write_state == CL_WRITE_REPLY)
|
||||||
|
{
|
||||||
|
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
||||||
|
cur_op->reply.hdr.retval > 0)
|
||||||
|
{
|
||||||
|
// Send data
|
||||||
|
cl.write_buf = cur_op->buf;
|
||||||
|
cl.write_remaining = cur_op->reply.hdr.retval;
|
||||||
|
cl.write_state = CL_WRITE_DATA;
|
||||||
|
}
|
||||||
|
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||||
|
{
|
||||||
|
// FIXME
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
goto op_done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (cl.write_state == CL_WRITE_DATA)
|
||||||
|
{
|
||||||
|
op_done:
|
||||||
|
// Done
|
||||||
|
delete cur_op;
|
||||||
|
cl.write_op = NULL;
|
||||||
|
cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cl.write_state != 0)
|
||||||
|
{
|
||||||
|
write_ready_clients.push_back(peer_fd);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bs->enqueue_op(cur_op->bs_op);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,8 +4,9 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
// Magic numbers
|
// Magic numbers
|
||||||
#define SECONDARY_OSD_OP_MAGIC 0xf3f003b966ace9ab2bd7b10325434553
|
|
||||||
#define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553
|
#define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l
|
||||||
|
#define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l
|
||||||
// Operation request headers and operation reply headers have fixed size after which comes data
|
// Operation request headers and operation reply headers have fixed size after which comes data
|
||||||
#define OSD_OP_PACKET_SIZE 0x80
|
#define OSD_OP_PACKET_SIZE 0x80
|
||||||
#define OSD_REPLY_PACKET_SIZE 0x40
|
#define OSD_REPLY_PACKET_SIZE 0x40
|
||||||
|
|
|
@ -49,7 +49,7 @@ int main(int narg, char *args[])
|
||||||
}
|
}
|
||||||
else if (main_state == 2)
|
else if (main_state == 2)
|
||||||
{
|
{
|
||||||
printf("version %u written, syncing\n", op.version);
|
printf("version %lu written, syncing\n", op.version);
|
||||||
version = op.version;
|
version = op.version;
|
||||||
op.flags = OP_SYNC;
|
op.flags = OP_SYNC;
|
||||||
bs->enqueue_op(&op);
|
bs->enqueue_op(&op);
|
||||||
|
@ -57,7 +57,7 @@ int main(int narg, char *args[])
|
||||||
}
|
}
|
||||||
else if (main_state == 4)
|
else if (main_state == 4)
|
||||||
{
|
{
|
||||||
printf("stabilizing version %u\n", version);
|
printf("stabilizing version %lu\n", version);
|
||||||
op.flags = OP_STABLE;
|
op.flags = OP_STABLE;
|
||||||
op.len = 1;
|
op.len = 1;
|
||||||
*((obj_ver_id*)op.buf) = {
|
*((obj_ver_id*)op.buf) = {
|
||||||
|
|
Loading…
Reference in New Issue