diff --git a/Makefile b/Makefile
index 88a333cc8..08522a28d 100644
--- a/Makefile
+++ b/Makefile
@@ -30,10 +30,12 @@ osd_read.o: osd_read.cpp osd.h osd_ops.h
 	g++ $(CXXFLAGS) -c -o $@ $<
 osd_send.o: osd_send.cpp osd.h osd_ops.h
 	g++ $(CXXFLAGS) -c -o $@ $<
+osd_peering.o: osd_peering.cpp osd.h osd_ops.h
+	g++ $(CXXFLAGS) -c -o $@ $<
 osd.o: osd.cpp osd.h osd_ops.h
 	g++ $(CXXFLAGS) -c -o $@ $<
-osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o
-	g++ $(CXXFLAGS) -o osd osd_main.cpp osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring
+osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h osd.o osd_exec_secondary.o osd_read.o osd_send.o osd_peering.o json11.o
+	g++ $(CXXFLAGS) -o osd osd_main.cpp osd.o osd_exec_secondary.o osd_read.o osd_send.o osd_peering.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring
 stub_osd: stub_osd.cpp osd_ops.h
 	g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp -ltcmalloc_minimal
 
diff --git a/blockstore_impl.h b/blockstore_impl.h
index 0553dcf92..2697597f7 100644
--- a/blockstore_impl.h
+++ b/blockstore_impl.h
@@ -28,6 +28,7 @@
 #define MEM_ALIGNMENT 512
 
 // States are not stored on disk. Instead, they're deduced from the journal
+// FIXME: Rename to BS_ST_*
 
 #define ST_J_IN_FLIGHT 1
 #define ST_J_SUBMITTED 2
@@ -171,12 +172,6 @@ struct blockstore_op_private_t
 
 #include "blockstore_flush.h"
 
-struct blockstore_params_t
-{
-    uint32_t block_size;
-
-};
-
 class blockstore_impl_t
 {
     /******* OPTIONS *******/
diff --git a/object_id.h b/object_id.h
index bb353ac68..e08c7da5d 100644
--- a/object_id.h
+++ b/object_id.h
@@ -55,4 +55,17 @@ namespace std
             return seed;
         }
     };
+
+    template<> struct hash
+    {
+        inline size_t operator()(const obj_ver_id &s) const
+        {
+            size_t seed = 0;
+            // Fold inode, stripe and version into the seed; formula copy-pasted from spp::hash_combine()
+            seed ^= (s.oid.inode + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
+            seed ^= (s.oid.stripe + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
+            seed ^= (s.version + 0xc6a4a7935bd1e995 + (seed << 6) + (seed >> 2));
+            return seed;
+        }
+    };
 }
diff --git a/osd.cpp b/osd.cpp
index aa55a07b5..de2166190 100644
--- a/osd.cpp
+++ b/osd.cpp
@@ -9,16 +9,22 @@
 
 osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
 {
+    this->config = config;
+    this->bs = bs;
+    this->ringloop = ringloop;
+
     bind_address = config["bind_address"];
     if (bind_address == "")
         bind_address = "0.0.0.0";
     bind_port = strtoull(config["bind_port"].c_str(), NULL, 10);
     if (!bind_port || bind_port > 65535)
         bind_port = 11203;
-
-    this->config = config;
-    this->bs = bs;
-    this->ringloop = ringloop;
+    osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); // 0 (also what strtoull() yields for an empty/non-numeric string) is rejected below
+    if (!osd_num)
+        throw std::runtime_error("osd_num is required in the configuration");
+    run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes";
+    if (run_primary)
+        init_primary();
 
     listen_fd = socket(AF_INET, SOCK_STREAM, 0);
     if (listen_fd < 0)
@@ -61,6 +67,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
 
     epoll_event ev;
     ev.data.fd = listen_fd;
+    // FIXME: Use EPOLLET
     ev.events = EPOLLIN;
     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
     {
@@ -133,86 +140,12 @@ void osd_t::loop()
     {
         handle_epoll_events();
     }
+    handle_peers(); // peer handling (primary OSD logic), runs once per loop() pass before replies are sent
     send_replies();
     read_requests();
     ringloop->submit();
 }
 
-void osd_t::connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function callback)
-{
-    struct sockaddr_in addr;
-    int r;
-    if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
-    {
-        callback(-EINVAL);
-        return;
-    }
-    addr.sin_family = AF_INET;
-    addr.sin_port = htons(peer_port ? peer_port : 11203);
-    int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
-    if (peer_fd < 0)
-    {
-        callback(-errno);
-        return;
-    }
-    fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
-    r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
-    if (r < 0 && r != EINPROGRESS)
-    {
-        close(peer_fd);
-        callback(-errno);
-        return;
-    }
-    clients[peer_fd] = (osd_client_t){
-        .peer_addr = addr,
-        .peer_port = peer_port,
-        .peer_fd = peer_fd,
-        .peer_state = PEER_CONNECTING,
-        .connect_callback = callback,
-        .osd_num = osd_num,
-    };
-    osd_peer_fds[osd_num] = peer_fd;
-    // Add FD to epoll (EPOLLOUT for tracking connect() result)
-    epoll_event ev;
-    ev.data.fd = peer_fd;
-    ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP;
-    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
-    {
-        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
-    }
-}
-
-void osd_t::handle_connect_result(int peer_fd)
-{
-    auto & cl = clients[peer_fd];
-    std::function callback = cl.connect_callback;
-    int result = 0;
-    socklen_t result_len = sizeof(result);
-    if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
-    {
-        result = errno;
-    }
-    if (result != 0)
-    {
-        stop_client(peer_fd);
-        callback(-result);
-        return;
-    }
-    int one = 1;
-    setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
-    // Disable EPOLLOUT on this fd
-    cl.connect_callback = NULL;
-    cl.peer_state = PEER_CONNECTED;
-    epoll_event ev;
-    ev.data.fd = peer_fd;
-    ev.events = EPOLLIN | EPOLLRDHUP;
-    if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
-    {
-        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
-    }
-    callback(peer_fd);
-}
-
 int osd_t::handle_epoll_events()
 {
     epoll_event events[MAX_EPOLL_EVENTS];
diff --git a/osd.h b/osd.h
index 7879eaa10..f64bbca36 100644
--- a/osd.h
+++ b/osd.h
@@ -97,7 +97,8 @@ struct osd_client_t
 
 struct osd_pg_role_t
 {
-    int role;
+    // role = (stripe role: 1, 2, 3, ...) | (stable ? 0 : 1<<63)
+    uint64_t role;
     uint64_t osd_num;
 };
 
@@ -121,7 +122,7 @@ namespace std
     };
 }
 
-// Placement group state:
+// Placement group states
 // Exactly one of these:
 #define PG_OFFLINE (1<<0)
 #define PG_PEERING (1<<1)
@@ -132,10 +133,23 @@ namespace std
 #define PG_HAS_DEGRADED (1<<5)
 #define PG_HAS_MISPLACED (1<<6)
 
+// OSD object states (NOTE(review): 0x01-0x04 look like exclusive values and 0x10000/0x20000 like OR-able flags - confirm)
+#define OSD_CLEAN 0x01
+#define OSD_MISPLACED 0x02
+#define OSD_DEGRADED 0x03
+#define OSD_INCOMPLETE 0x04
+#define OSD_HALF_STABLE 0x10000
+#define OSD_NEEDS_ROLLBACK 0x20000
+
+struct osd_pg_peering_state_t
+{
+    std::unordered_map list_ops;
+};
+
 struct osd_pg_t
 {
     int state;
-    unsigned num;
+    uint64_t pg_num;
     uint64_t n_unfound = 0, n_degraded = 0, n_misplaced = 0;
     std::vector target_set;
     // moved object map. by default, each object is considered to reside on the target_set.
@@ -145,6 +159,15 @@ struct osd_pg_t
     std::unordered_map acting_set_ids;
     std::map acting_sets;
     spp::sparse_hash_map object_map;
+    osd_pg_peering_state_t *peering_state = NULL; // allocated in handle_peers() while the PG is in PG_PEERING
+};
+
+struct osd_peer_def_t
+{
+    uint64_t osd_num = 0;
+    std::string addr;
+    int port = 0;
+    time_t last_connect_attempt = 0; // time() of the latest connect_peer() attempt, set by handle_peers()
 };
 
 class osd_t
@@ -152,6 +175,8 @@ class osd_t
     // config
 
     uint64_t osd_num = 1; // OSD numbers start with 1
+    bool run_primary = false;
+    std::vector peers;
     blockstore_config_t config;
     std::string bind_address;
     int bind_port, listen_backlog;
@@ -162,7 +187,8 @@ class osd_t
 
     std::map osd_peer_fds;
     std::vector pgs;
-    unsigned pg_count;
+    bool needs_peering = false; // set by init_primary(); makes handle_peers() (re)connect to peers
+    unsigned pg_count = 0;
 
     // client & peer I/O
 
@@ -193,10 +219,13 @@ class osd_t
     void make_reply(osd_op_t *op);
     void handle_send(ring_data_t *data, int peer_fd);
 
-    // connect/disconnect
-    void connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function callback);
+    // peer handling (primary OSD logic)
+    void connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function callback);
     void handle_connect_result(int peer_fd);
     void stop_client(int peer_fd);
+    osd_peer_def_t parse_peer(std::string peer);
+    void init_primary();
+    void handle_peers();
 
     // op execution
     void handle_reply(osd_op_t *cur_op);
diff --git a/osd_peering.cpp b/osd_peering.cpp
new file mode 100644
index 000000000..5e72c0738
--- /dev/null
+++ b/osd_peering.cpp
@@ -0,0 +1,184 @@
+#include
+#include
+#include "osd.h"
+
+// Set up the primary-OSD test configuration: read the "peer1" and "peer2"
+// config keys, register a single hard-coded PG and request peering.
+// Throws std::runtime_error when a peer is missing or both share one osd_num.
+void osd_t::init_primary()
+{
+    // Initial test version of clustering code requires exactly 2 peers
+    if (config["peer1"] == "" || config["peer2"] == "")
+        throw std::runtime_error("run_primary requires two peers");
+    peers.push_back(parse_peer(config["peer1"]));
+    peers.push_back(parse_peer(config["peer2"]));
+    if (peers[1].osd_num == peers[0].osd_num)
+        throw std::runtime_error("peer1 and peer2 osd numbers are the same");
+    pgs.push_back((osd_pg_t){
+        .state = PG_OFFLINE,
+        .pg_num = 1,
+        .target_set = { { .role = 1, .osd_num = 1 }, { .role = 2, .osd_num = 2 }, { .role = 3, .osd_num = 3 } },
+        .object_map = spp::sparse_hash_map(),
+    });
+    pg_count = 1;
+    needs_peering = true;
+}
+
+// Parse a peer definition of the form OSD_NUM:IP:PORT.
+// Throws std::runtime_error (by value) when a separator is missing or when
+// osd_num or port does not parse to a non-zero number.
+osd_peer_def_t osd_t::parse_peer(std::string peer)
+{
+    // OSD_NUM:IP:PORT
+    size_t pos1 = peer.find(':');
+    size_t pos2 = peer.find(':', pos1+1);
+    // find() signals "not found" with npos; size_t is unsigned, so a "< 0" test can never fire
+    if (pos1 == std::string::npos || pos2 == std::string::npos)
+        throw std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
+    osd_peer_def_t r;
+    r.addr = peer.substr(pos1+1, pos2-pos1-1);
+    std::string osd_num_str = peer.substr(0, pos1);
+    std::string port_str = peer.substr(pos2+1);
+    r.osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
+    if (!r.osd_num)
+        throw std::runtime_error("Could not parse OSD peer osd_num");
+    r.port = strtoull(port_str.c_str(), NULL, 10);
+    if (!r.port)
+        throw std::runtime_error("Could not parse OSD peer port");
+    return r;
+}
+
+// Start a non-blocking TCP connection to peer OSD <osd_num>.
+// callback receives a negative errno value when the attempt fails right away;
+// otherwise the socket is registered in clients, osd_peer_fds and epoll, and
+// the stored callback is invoked by handle_connect_result().
+void osd_t::connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function callback)
+{
+    struct sockaddr_in addr;
+    int r;
+    if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
+    {
+        callback(-EINVAL);
+        return;
+    }
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(peer_port ? peer_port : 11203);
+    int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
+    if (peer_fd < 0)
+    {
+        callback(-errno);
+        return;
+    }
+    fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
+    r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
+    if (r < 0 && errno != EINPROGRESS)
+    {
+        // Save errno first: close() may overwrite it
+        int connect_errno = errno;
+        close(peer_fd);
+        callback(-connect_errno);
+        return;
+    }
+    clients[peer_fd] = (osd_client_t){
+        .peer_addr = addr,
+        .peer_port = peer_port,
+        .peer_fd = peer_fd,
+        .peer_state = PEER_CONNECTING,
+        .connect_callback = callback,
+        .osd_num = osd_num,
+    };
+    osd_peer_fds[osd_num] = peer_fd;
+    // Add FD to epoll (EPOLLOUT for tracking connect() result)
+    epoll_event ev;
+    ev.data.fd = peer_fd;
+    ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP;
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
+    {
+        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+    }
+}
+
+// Finish a connect() started by connect_peer(): read SO_ERROR for peer_fd and
+// call the stored callback with -error on failure or with peer_fd on success.
+void osd_t::handle_connect_result(int peer_fd)
+{
+    auto & cl = clients[peer_fd];
+    std::function callback = cl.connect_callback;
+    int result = 0;
+    socklen_t result_len = sizeof(result);
+    if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
+    {
+        result = errno;
+    }
+    if (result != 0)
+    {
+        stop_client(peer_fd);
+        callback(-result);
+        return;
+    }
+    int one = 1;
+    setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
+    // Disable EPOLLOUT on this fd
+    cl.connect_callback = NULL;
+    cl.peer_state = PEER_CONNECTED;
+    epoll_event ev;
+    ev.data.fd = peer_fd;
+    ev.events = EPOLLIN | EPOLLRDHUP;
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
+    {
+        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+    }
+    callback(peer_fd);
+}
+
+// Peering loop
+// Ideally: Connect -> Ask & check config -> Start PG peering
+// While needs_peering is set, try to connect every peer that has no entry in
+// osd_peer_fds and whose last attempt is more than 5 seconds old, then
+// allocate peering state for every PG that is in PG_PEERING.
+void osd_t::handle_peers()
+{
+    if (needs_peering)
+    {
+        for (size_t i = 0; i < peers.size(); i++)
+        {
+            if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() &&
+                time(NULL) - peers[i].last_connect_attempt > 5)
+            {
+                peers[i].last_connect_attempt = time(NULL);
+                connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](int peer_fd)
+                {
+                    if (peer_fd < 0)
+                    {
+                        // Negative values are -errno from a failed connect, not a descriptor
+                        printf("Failed to connect to peer OSD: %s\n", strerror(-peer_fd));
+                        return;
+                    }
+                    printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
+                    // Restart PG peering
+                    pgs[0].state = PG_PEERING;
+                    pgs[0].acting_set_ids.clear();
+                    pgs[0].acting_sets.clear();
+                    pgs[0].object_map.clear();
+                    if (pgs[0].peering_state)
+                    {
+                        delete pgs[0].peering_state;
+                        // Reset so the loop below allocates a fresh state instead of reusing freed memory
+                        pgs[0].peering_state = NULL;
+                    }
+                    ringloop->wakeup();
+                });
+            }
+        }
+    }
+    for (size_t i = 0; i < pgs.size(); i++)
+    {
+        if (pgs[i].state == PG_PEERING)
+        {
+            if (!pgs[i].peering_state)
+            {
+                pgs[i].peering_state = new osd_pg_peering_state_t();
+            }
+        }
+    }
+}
diff --git a/test.cpp b/test.cpp
index ba4fd77ed..9e35c0131 100644
--- a/test.cpp
+++ b/test.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include
 
 #include "blockstore.h"
 #include "blockstore_impl.h"
@@ -236,7 +237,7 @@ int main02(int argc, char *argv[])
     return 0;
 }
 
-int main(int argc, char *argv[])
+int main03(int argc, char *argv[])
 {
     int listen_fd = socket(AF_INET, SOCK_STREAM, 0), enable = 1;
     assert(listen_fd >= 0);
@@ -283,3 +284,70 @@ int main(int argc, char *argv[])
 
     return 0;
 }
+
+// Record type sorted by the benchmark in main() below
+struct obj_ver_role
+{
+    object_id oid;
+    uint64_t version;
+    uint32_t osd_num;
+    uint32_t is_stable;
+};
+
+// Order by (oid, version, osd_num). This has to be a strict weak ordering
+// (never true for equal keys), otherwise std::sort() has undefined behaviour.
+inline bool operator < (const obj_ver_role & a, const obj_ver_role & b)
+{
+    return a.oid < b.oid ||
+        a.oid == b.oid && a.version < b.version ||
+        a.oid == b.oid && a.version == b.version && a.osd_num < b.osd_num;
+}
+
+// Benchmark: fill a vector with random obj_ver_role records and std::sort() it
+int main(int argc, char *argv[])
+{
+    /*spp::sparse_hash_set osd1, osd2;
+    // fill takes 18.9 s
+    for (int i = 0; i < 1024*1024*8*2; i++)
+    {
+        obj_ver_id ovid = { { rand() % 500, rand() }, rand() };
+        osd1.insert(ovid);
+        osd2.insert(ovid);
+    }
+    for (int i = 0; i < 50000; i++)
+    {
+        obj_ver_id ovid = { { rand() % 500, rand() }, rand() };
+        osd1.insert(ovid);
+        ovid = { { rand() % 500, rand() }, rand() };
+        osd2.insert(ovid);
+    }
+    // diff takes only 2.3 s
+    spp::sparse_hash_set osd1diff;
+    for (obj_ver_id e: osd1)
+    {
+        auto it = osd2.find(e);
+        if (it != osd2.end())
+            osd2.erase(it);
+        else
+            osd1diff.insert(e);
+    }*/
+    // fill vector takes 2 s
+    std::vector to_sort;
+    to_sort.resize(1024*1024*8*2*3);
+    printf("Filling\n");
+    for (int i = 0; i < 1024*1024*8*2*3; i++)
+    {
+        to_sort[i] = {
+            .oid = (object_id){
+                .inode = rand() % 500,
+                .stripe = rand(),
+            },
+            .version = rand(),
+            .osd_num = rand() % 16,
+        };
+    }
+    printf("Sorting\n");
+    // sort takes 7 s
+    std::sort(to_sort.begin(), to_sort.end());
+    return 0;
+}