forked from vitalif/vitastor
Use json11, add show_config operation, begin primary operations
parent
4677ace4cc
commit
8a386270bd
6
Makefile
6
Makefile
|
@ -6,10 +6,12 @@ clean:
|
|||
rm -f *.o
|
||||
crc32c.o: crc32c.c
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
json11.o: json11/json11.cpp
|
||||
g++ $(CXXFLAGS) -c -o json11.o json11/json11.cpp
|
||||
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_impl.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h object_id.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o
|
||||
g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o $(BLOCKSTORE_OBJS)
|
||||
osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o json11.o
|
||||
g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o json11.o $(BLOCKSTORE_OBJS)
|
||||
test: test.cpp
|
||||
g++ $(CXXFLAGS) -o test -luring test.cpp
|
||||
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
|
||||
|
|
69
osd.cpp
69
osd.cpp
|
@ -4,6 +4,8 @@
|
|||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include "json11/json11.hpp"
|
||||
|
||||
#include "osd.h"
|
||||
|
||||
#define CL_READ_OP 1
|
||||
|
@ -22,6 +24,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
|||
if (!bind_port || bind_port > 65535)
|
||||
bind_port = 11203;
|
||||
|
||||
this->config = config;
|
||||
this->bs = bs;
|
||||
this->ringloop = ringloop;
|
||||
|
||||
|
@ -83,6 +86,21 @@ osd_t::~osd_t()
|
|||
close(listen_fd);
|
||||
}
|
||||
|
||||
osd_op_t::~osd_op_t()
|
||||
{
|
||||
if (buf)
|
||||
{
|
||||
// Note: reusing osd_op_t WILL currently lead to memory leaks
|
||||
if (op.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||
{
|
||||
std::string *str = (std::string*)buf;
|
||||
delete str;
|
||||
}
|
||||
else
|
||||
free(buf);
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_t::shutdown()
|
||||
{
|
||||
stopping = true;
|
||||
|
@ -287,12 +305,20 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
|||
// Allocate a buffer
|
||||
cur_op->buf = memalign(512, cur_op->op.sec_rw.len);
|
||||
}
|
||||
else if (cur_op->op.hdr.opcode == OSD_OP_READ ||
|
||||
cur_op->op.hdr.opcode == OSD_OP_WRITE)
|
||||
{
|
||||
cur_op->buf = memalign(512, cur_op->op.rw.len);
|
||||
}
|
||||
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
|
||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
|
||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ||
|
||||
cur_op->op.hdr.opcode == OSD_OP_WRITE)
|
||||
{
|
||||
// Read data
|
||||
cl.read_buf = cur_op->buf;
|
||||
cl.read_remaining = cur_op->op.sec_rw.len;
|
||||
cl.read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE
|
||||
? cur_op->op.sec_rw.len
|
||||
: cur_op->op.rw.len);
|
||||
cl.read_state = CL_READ_DATA;
|
||||
}
|
||||
else
|
||||
|
@ -349,7 +375,8 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
inflight_ops++;
|
||||
if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
||||
cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX ||
|
||||
(cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
||||
(cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
|
||||
cur_op->op.hdr.opcode == OSD_OP_READ || cur_op->op.hdr.opcode == OSD_OP_WRITE) &&
|
||||
(cur_op->op.sec_rw.len > OSD_RW_MAX || cur_op->op.sec_rw.len % OSD_RW_ALIGN || cur_op->op.sec_rw.offset % OSD_RW_ALIGN))
|
||||
{
|
||||
// Bad command
|
||||
|
@ -403,6 +430,18 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
bs->enqueue_op(&cur_op->bs_op);
|
||||
return;
|
||||
}
|
||||
else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||
{
|
||||
// FIXME: Send the real config, not its source
|
||||
std::string *cfg_str = new std::string(std::move(json11::Json(config).dump()));
|
||||
cur_op->buf = cfg_str;
|
||||
auto & cl = clients[cur_op->peer_fd];
|
||||
cl.write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
cl.completions.push_back(cur_op);
|
||||
ringloop->wakeup();
|
||||
return;
|
||||
}
|
||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
||||
cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE
|
||||
|
@ -476,9 +515,17 @@ void osd_t::make_reply(osd_op_t *op)
|
|||
{
|
||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||
op->reply.hdr.id = op->op.hdr.id;
|
||||
op->reply.hdr.retval = op->bs_op.retval;
|
||||
if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||
op->reply.sec_list.stable_count = op->bs_op.version;
|
||||
if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||
{
|
||||
std::string *str = (std::string*)op->buf;
|
||||
op->reply.hdr.retval = str->size()+1;
|
||||
}
|
||||
else
|
||||
{
|
||||
op->reply.hdr.retval = op->bs_op.retval;
|
||||
if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||
op->reply.sec_list.stable_count = op->bs_op.version;
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::handle_send(ring_data_t *data, int peer_fd)
|
||||
|
@ -505,10 +552,10 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd)
|
|||
osd_op_t *cur_op = cl.write_op;
|
||||
if (cl.write_state == CL_WRITE_REPLY)
|
||||
{
|
||||
// Send data
|
||||
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
||||
cur_op->reply.hdr.retval > 0)
|
||||
{
|
||||
// Send data
|
||||
cl.write_buf = cur_op->buf;
|
||||
cl.write_remaining = cur_op->reply.hdr.retval;
|
||||
cl.write_state = CL_WRITE_DATA;
|
||||
|
@ -516,11 +563,17 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd)
|
|||
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST &&
|
||||
cur_op->reply.hdr.retval > 0)
|
||||
{
|
||||
// Send data
|
||||
cl.write_buf = cur_op->buf;
|
||||
cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id);
|
||||
cl.write_state = CL_WRITE_DATA;
|
||||
}
|
||||
else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG &&
|
||||
cur_op->reply.hdr.retval > 0)
|
||||
{
|
||||
cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str();
|
||||
cl.write_remaining = cur_op->reply.hdr.retval;
|
||||
cl.write_state = CL_WRITE_DATA;
|
||||
}
|
||||
else
|
||||
{
|
||||
goto op_done;
|
||||
|
|
7
osd.h
7
osd.h
|
@ -35,11 +35,7 @@ struct osd_op_t
|
|||
blockstore_op_t bs_op;
|
||||
void *buf = NULL;
|
||||
|
||||
~osd_op_t()
|
||||
{
|
||||
if (buf)
|
||||
free(buf);
|
||||
}
|
||||
~osd_op_t();
|
||||
};
|
||||
|
||||
struct osd_client_t
|
||||
|
@ -75,6 +71,7 @@ class osd_t
|
|||
{
|
||||
// config
|
||||
|
||||
blockstore_config_t config;
|
||||
std::string bind_address;
|
||||
int bind_port, listen_backlog;
|
||||
int client_queue_depth = 128;
|
||||
|
|
34
osd_ops.h
34
osd_ops.h
|
@ -9,15 +9,18 @@
|
|||
#define OSD_OP_PACKET_SIZE 0x80
|
||||
#define OSD_REPLY_PACKET_SIZE 0x40
|
||||
// Opcodes
|
||||
#define OSD_OP_MIN 0x01
|
||||
#define OSD_OP_SECONDARY_READ 0x01
|
||||
#define OSD_OP_SECONDARY_WRITE 0x02
|
||||
#define OSD_OP_SECONDARY_SYNC 0x03
|
||||
#define OSD_OP_SECONDARY_STABILIZE 0x04
|
||||
#define OSD_OP_SECONDARY_DELETE 0x05
|
||||
#define OSD_OP_TEST_SYNC_STAB_ALL 0x06
|
||||
#define OSD_OP_SECONDARY_LIST 0x07
|
||||
#define OSD_OP_MAX 0x07
|
||||
#define OSD_OP_MIN 1
|
||||
#define OSD_OP_SECONDARY_READ 1
|
||||
#define OSD_OP_SECONDARY_WRITE 2
|
||||
#define OSD_OP_SECONDARY_SYNC 3
|
||||
#define OSD_OP_SECONDARY_STABILIZE 4
|
||||
#define OSD_OP_SECONDARY_DELETE 5
|
||||
#define OSD_OP_TEST_SYNC_STAB_ALL 6
|
||||
#define OSD_OP_SECONDARY_LIST 7
|
||||
#define OSD_OP_SHOW_CONFIG 8
|
||||
#define OSD_OP_READ 9
|
||||
#define OSD_OP_WRITE 10
|
||||
#define OSD_OP_MAX 10
|
||||
// Alignment & limit for read/write operations
|
||||
#define OSD_RW_ALIGN 512
|
||||
#define OSD_RW_MAX 64*1024*1024
|
||||
|
@ -101,6 +104,17 @@ struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t
|
|||
osd_reply_header_t header;
|
||||
};
|
||||
|
||||
// show configuration
|
||||
struct __attribute__((__packed__)) osd_op_show_config_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_show_config_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
};
|
||||
|
||||
// list objects on replica
|
||||
struct __attribute__((__packed__)) osd_op_secondary_list_t
|
||||
{
|
||||
|
@ -141,6 +155,7 @@ union osd_any_op_t
|
|||
osd_op_secondary_sync_t sec_sync;
|
||||
osd_op_secondary_stabilize_t sec_stabilize;
|
||||
osd_op_secondary_list_t sec_list;
|
||||
osd_op_show_config_t show_conf;
|
||||
osd_op_rw_t rw;
|
||||
};
|
||||
|
||||
|
@ -152,5 +167,6 @@ union osd_any_reply_t
|
|||
osd_reply_secondary_sync_t sec_sync;
|
||||
osd_reply_secondary_stabilize_t sec_stabilize;
|
||||
osd_reply_secondary_list_t sec_list;
|
||||
osd_reply_show_config_t show_conf;
|
||||
osd_reply_rw_t rw;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue