diff --git a/Makefile b/Makefile index 093736a6..731c1224 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,15 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o osd.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so +all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd clean: rm -f *.o crc32c.o: crc32c.c g++ $(CXXFLAGS) -c -o $@ $< %.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h g++ $(CXXFLAGS) -c -o $@ $< +osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h + g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp $(BLOCKSTORE_OBJS) test: test.cpp g++ $(CXXFLAGS) -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp diff --git a/osd.cpp b/osd.cpp index 8dd717b4..8125304a 100644 --- a/osd.cpp +++ b/osd.cpp @@ -4,10 +4,7 @@ #include #include -#include - -#include "osd_ops.h" -#include "ringloop.h" +#include "osd.h" #define CL_READ_OP 1 #define CL_READ_DATA 2 @@ -16,97 +13,15 @@ #define CL_WRITE_REPLY 2 #define CL_WRITE_DATA 3 -struct osd_op_t +osd_t::osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop) { - int peer_fd; - union - { - osd_any_op_t op; - uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; - }; - union - { - osd_any_reply_t reply; - uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; - }; - blockstore_operation bs_op; - void *buf = NULL; + bind_address = config["bind_address"]; + if (bind_address == "") + bind_address = "0.0.0.0"; + bind_port = strtoull(config["bind_port"].c_str(), NULL, 10); + if (!bind_port || bind_port > 65535) + bind_port = 11203; - ~osd_op_t() - { - if (buf) - free(buf); - } -}; - -struct osd_client_t -{ - sockaddr_in peer_addr; - socklen_t peer_addr_size; - int peer_fd; - //int in_flight_ops = 0; - - // Read state - bool read_ready = false; - bool reading = false; - osd_op_t *read_op = NULL; - iovec read_iov; - msghdr read_msg; - void *read_buf = NULL; - int read_remaining = 0; - int read_state = 0; - - // Completed operations to send replies back to the client - std::deque completions; - - // Write state - osd_op_t *write_op = NULL; - iovec write_iov; - msghdr write_msg; - void *write_buf = NULL; - int write_remaining = 0; - int write_state = 0; -}; - -class osd_t -{ - // config - - int client_queue_depth = 128; - - // fields - - blockstore *bs; - ring_loop_t *ringloop; - - int wait_state = 0; - int epoll_fd = 0; - int listen_fd = 0; - ring_consumer_t consumer; - - std::string bind_address; - int bind_port, listen_backlog; - - std::unordered_map clients; - std::vector read_ready_clients; - std::vector write_ready_clients; - - void loop(); - int handle_epoll_events(); - void stop_client(int peer_fd); - void read_requests(); - void handle_read(ring_data_t *data, int peer_fd); - void enqueue_op(osd_op_t *cur_op); - void send_replies(); - void make_reply(osd_op_t *op); - void handle_send(ring_data_t *data, int peer_fd); -public: - osd_t(blockstore *bs, ring_loop_t *ringloop); - ~osd_t(); -}; - -osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) -{ this->bs = bs; this->ringloop = ringloop; diff --git a/osd.h b/osd.h new file mode 100644 index 00000000..5e38f34a --- /dev/null +++ b/osd.h @@ -0,0 +1,97 @@ +#pragma once + +#include + +#include + +#include "ringloop.h" +#include "osd_ops.h" + +struct osd_op_t +{ + int peer_fd; + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; + }; + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; + }; + blockstore_operation bs_op; + void *buf = NULL; + + ~osd_op_t() + { + if (buf) + free(buf); + } +}; + +struct osd_client_t +{ + sockaddr_in peer_addr; + socklen_t peer_addr_size; + int peer_fd; + //int in_flight_ops = 0; + + // Read state + bool read_ready = false; + bool reading = false; + osd_op_t *read_op = NULL; + iovec read_iov; + msghdr read_msg; + void *read_buf = NULL; + int read_remaining = 0; + int read_state = 0; + + // Completed operations to send replies back to the client + std::deque completions; + + // Write state + osd_op_t *write_op = NULL; + iovec write_iov; + msghdr write_msg; + void *write_buf = NULL; + int write_remaining = 0; + int write_state = 0; +}; + +class osd_t +{ + // config + + int client_queue_depth = 128; + + // fields + + blockstore *bs; + ring_loop_t *ringloop; + + int wait_state = 0; + int epoll_fd = 0; + int listen_fd = 0; + ring_consumer_t consumer; + + std::string bind_address; + int bind_port, listen_backlog; + + std::unordered_map clients; + std::vector read_ready_clients; + std::vector write_ready_clients; + + void loop(); + int handle_epoll_events(); + void stop_client(int peer_fd); + void read_requests(); + void handle_read(ring_data_t *data, int peer_fd); + void enqueue_op(osd_op_t *cur_op); + void send_replies(); + void make_reply(osd_op_t *op); + void handle_send(ring_data_t *data, int peer_fd); +public: + osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop); + ~osd_t(); +}; diff --git a/osd_main.cpp b/osd_main.cpp new file mode 100644 index 00000000..065202cd --- /dev/null +++ b/osd_main.cpp @@ -0,0 +1,27 @@ +#include "osd.h" + +int main(int narg, char *args[]) +{ + if (sizeof(osd_any_op_t) >= OSD_OP_PACKET_SIZE || + sizeof(osd_any_reply_t) >= OSD_REPLY_PACKET_SIZE) + { + perror("BUG: too small packet size"); + return 1; + } + blockstore_config_t config; + config["meta_device"] = "./test_meta.bin"; + config["journal_device"] = "./test_journal.bin"; + config["data_device"] = "./test_data.bin"; + ring_loop_t *ringloop = new ring_loop_t(512); + blockstore *bs = new blockstore(config, ringloop); + osd_t *osd = new osd_t(config, bs, ringloop); + while (1) + { + ringloop->loop(); + ringloop->wait(); + } + delete osd; + delete bs; + delete ringloop; + return 0; +} diff --git a/osd_ops.h b/osd_ops.h index f2027284..fca97ad1 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -61,8 +61,6 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t struct __attribute__((__packed__)) osd_reply_secondary_rw_t { osd_reply_header_t header; - // buffer size - uint64_t len; }; // delete object on the secondary OSD @@ -115,8 +113,6 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t struct __attribute__((__packed__)) osd_reply_secondary_list_t { osd_reply_header_t header; - // oid array length - uint64_t len; }; union osd_any_op_t @@ -138,7 +134,3 @@ union osd_any_reply_t osd_reply_secondary_stabilize_t sec_stabilize; osd_reply_secondary_list_t sec_list; }; - -static int size_ok = sizeof(osd_any_op_t) < OSD_OP_PACKET_SIZE && - sizeof(osd_any_reply_t) < OSD_REPLY_PACKET_SIZE - ? (perror("BUG: too small packet size"), 0) : 1;