diff --git a/Makefile b/Makefile index 4fd0c726..dcafd956 100644 --- a/Makefile +++ b/Makefile @@ -6,10 +6,12 @@ clean: rm -f *.o crc32c.o: crc32c.c g++ $(CXXFLAGS) -c -o $@ $< +json11.o: json11/json11.cpp + g++ $(CXXFLAGS) -c -o json11.o json11/json11.cpp %.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_impl.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h object_id.h g++ $(CXXFLAGS) -c -o $@ $< -osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o - g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o $(BLOCKSTORE_OBJS) +osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h osd.o json11.o + g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp osd.o json11.o $(BLOCKSTORE_OBJS) test: test.cpp g++ $(CXXFLAGS) -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp diff --git a/osd.cpp b/osd.cpp index f1d67dd6..087bfad1 100644 --- a/osd.cpp +++ b/osd.cpp @@ -4,6 +4,8 @@ #include #include +#include "json11/json11.hpp" + #include "osd.h" #define CL_READ_OP 1 @@ -22,6 +24,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo if (!bind_port || bind_port > 65535) bind_port = 11203; + this->config = config; this->bs = bs; this->ringloop = ringloop; @@ -83,6 +86,21 @@ osd_t::~osd_t() close(listen_fd); } +osd_op_t::~osd_op_t() +{ + if (buf) + { + // Note: reusing osd_op_t WILL currently lead to memory leaks + if (op.hdr.opcode == OSD_OP_SHOW_CONFIG) + { + std::string *str = (std::string*)buf; + delete str; + } + else + free(buf); + } +} + bool osd_t::shutdown() { stopping = true; @@ -287,12 +305,20 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) // Allocate a buffer cur_op->buf = memalign(512, cur_op->op.sec_rw.len); } + else if (cur_op->op.hdr.opcode == OSD_OP_READ || + cur_op->op.hdr.opcode == OSD_OP_WRITE) + { + cur_op->buf = memalign(512, cur_op->op.rw.len); + } if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || - cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || + cur_op->op.hdr.opcode == OSD_OP_WRITE) { // Read data cl.read_buf = cur_op->buf; - cl.read_remaining = cur_op->op.sec_rw.len; + cl.read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE + ? cur_op->op.sec_rw.len + : cur_op->op.rw.len); cl.read_state = CL_READ_DATA; } else @@ -349,7 +375,8 @@ void osd_t::enqueue_op(osd_op_t *cur_op) inflight_ops++; if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC || cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX || - (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) && + (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || + cur_op->op.hdr.opcode == OSD_OP_READ || cur_op->op.hdr.opcode == OSD_OP_WRITE) && (cur_op->op.sec_rw.len > OSD_RW_MAX || cur_op->op.sec_rw.len % OSD_RW_ALIGN || cur_op->op.sec_rw.offset % OSD_RW_ALIGN)) { // Bad command @@ -403,6 +430,18 @@ void osd_t::enqueue_op(osd_op_t *cur_op) bs->enqueue_op(&cur_op->bs_op); return; } + else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG) + { + // FIXME: Send the real config, not its source + std::string *cfg_str = new std::string(std::move(json11::Json(config).dump())); + cur_op->buf = cfg_str; + auto & cl = clients[cur_op->peer_fd]; + cl.write_state = CL_WRITE_READY; + write_ready_clients.push_back(cur_op->peer_fd); + cl.completions.push_back(cur_op); + ringloop->wakeup(); + return; + } cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); }; cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE @@ -476,9 +515,17 @@ void osd_t::make_reply(osd_op_t *op) { op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; op->reply.hdr.id = op->op.hdr.id; - op->reply.hdr.retval = op->bs_op.retval; - if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) - op->reply.sec_list.stable_count = op->bs_op.version; + if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG) + { + std::string *str = (std::string*)op->buf; + op->reply.hdr.retval = str->size()+1; + } + else + { + op->reply.hdr.retval = op->bs_op.retval; + if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) + op->reply.sec_list.stable_count = op->bs_op.version; + } } void osd_t::handle_send(ring_data_t *data, int peer_fd) @@ -505,10 +552,10 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) osd_op_t *cur_op = cl.write_op; if (cl.write_state == CL_WRITE_REPLY) { + // Send data if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && cur_op->reply.hdr.retval > 0) { - // Send data cl.write_buf = cur_op->buf; cl.write_remaining = cur_op->reply.hdr.retval; cl.write_state = CL_WRITE_DATA; @@ -516,11 +563,17 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && cur_op->reply.hdr.retval > 0) { - // Send data cl.write_buf = cur_op->buf; cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id); cl.write_state = CL_WRITE_DATA; } + else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG && + cur_op->reply.hdr.retval > 0) + { + cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str(); + cl.write_remaining = cur_op->reply.hdr.retval; + cl.write_state = CL_WRITE_DATA; + } else { goto op_done; diff --git a/osd.h b/osd.h index 0b0ab8bc..a4693c38 100644 --- a/osd.h +++ b/osd.h @@ -35,11 +35,7 @@ struct osd_op_t blockstore_op_t bs_op; void *buf = NULL; - ~osd_op_t() - { - if (buf) - free(buf); - } + ~osd_op_t(); }; struct osd_client_t @@ -75,6 +71,7 @@ class osd_t { // config + blockstore_config_t config; std::string bind_address; int bind_port, listen_backlog; int client_queue_depth = 128; diff --git a/osd_ops.h b/osd_ops.h index 75d7fe0d..4464e116 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -9,15 +9,18 @@ #define OSD_OP_PACKET_SIZE 0x80 #define OSD_REPLY_PACKET_SIZE 0x40 // Opcodes -#define OSD_OP_MIN 0x01 -#define OSD_OP_SECONDARY_READ 0x01 -#define OSD_OP_SECONDARY_WRITE 0x02 -#define OSD_OP_SECONDARY_SYNC 0x03 -#define OSD_OP_SECONDARY_STABILIZE 0x04 -#define OSD_OP_SECONDARY_DELETE 0x05 -#define OSD_OP_TEST_SYNC_STAB_ALL 0x06 -#define OSD_OP_SECONDARY_LIST 0x07 -#define OSD_OP_MAX 0x07 +#define OSD_OP_MIN 1 +#define OSD_OP_SECONDARY_READ 1 +#define OSD_OP_SECONDARY_WRITE 2 +#define OSD_OP_SECONDARY_SYNC 3 +#define OSD_OP_SECONDARY_STABILIZE 4 +#define OSD_OP_SECONDARY_DELETE 5 +#define OSD_OP_TEST_SYNC_STAB_ALL 6 +#define OSD_OP_SECONDARY_LIST 7 +#define OSD_OP_SHOW_CONFIG 8 +#define OSD_OP_READ 9 +#define OSD_OP_WRITE 10 +#define OSD_OP_MAX 10 // Alignment & limit for read/write operations #define OSD_RW_ALIGN 512 #define OSD_RW_MAX 64*1024*1024 @@ -101,6 +104,17 @@ struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t osd_reply_header_t header; }; +// show configuration +struct __attribute__((__packed__)) osd_op_show_config_t +{ + osd_op_header_t header; +}; + +struct __attribute__((__packed__)) osd_reply_show_config_t +{ + osd_reply_header_t header; +}; + // list objects on replica struct __attribute__((__packed__)) osd_op_secondary_list_t { @@ -141,6 +155,7 @@ union osd_any_op_t osd_op_secondary_sync_t sec_sync; osd_op_secondary_stabilize_t sec_stabilize; osd_op_secondary_list_t sec_list; + osd_op_show_config_t show_conf; osd_op_rw_t rw; }; @@ -152,5 +167,6 @@ union osd_any_reply_t osd_reply_secondary_sync_t sec_sync; osd_reply_secondary_stabilize_t sec_stabilize; osd_reply_secondary_list_t sec_list; + osd_reply_show_config_t show_conf; osd_reply_rw_t rw; };