diff --git a/osd.cpp b/osd.cpp
index 59a78a8a..be8560bd 100644
--- a/osd.cpp
+++ b/osd.cpp
@@ -82,6 +82,7 @@ osd_op_t::~osd_op_t()
     if (buf)
     {
         // Note: reusing osd_op_t WILL currently lead to memory leaks
+        // So we don't reuse it, but free it every time
         if (op_type == OSD_OP_IN &&
             op.hdr.opcode == OSD_OP_SHOW_CONFIG)
         {
@@ -134,6 +135,92 @@ void osd_t::loop()
     ringloop->submit();
 }
 
+// Start a non-blocking TCP connection to peer OSD <osd_num> at <peer_host>:<peer_port>.
+// <peer_host> must be a numeric IPv4 address; port 11203 is used when <peer_port> is 0.
+// If the connection can't be started, <callback> is called with a negative errno value.
+// Otherwise the socket is registered in clients, osd_peer_fds and epoll, and the result
+// is reported to <callback> later by handle_connect_result().
+void osd_t::connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function<void(int)> callback)
+{
+    struct sockaddr_in addr = {};
+    int r;
+    if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
+    {
+        callback(-EINVAL);
+        return;
+    }
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(peer_port ? peer_port : 11203);
+    int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
+    if (peer_fd < 0)
+    {
+        callback(-errno);
+        return;
+    }
+    fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
+    r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
+    // connect() returns -1 and reports the reason in errno; EINPROGRESS is
+    // the normal result for a non-blocking socket and is not a failure
+    if (r < 0 && errno != EINPROGRESS)
+    {
+        // Save errno first because close() may overwrite it
+        int connect_errno = errno;
+        close(peer_fd);
+        callback(-connect_errno);
+        return;
+    }
+    clients[peer_fd] = (osd_client_t){
+        .peer_addr = addr,
+        .peer_port = peer_port,
+        .peer_fd = peer_fd,
+        .peer_state = PEER_CONNECTING,
+        .connect_callback = callback,
+        .osd_num = osd_num,
+    };
+    osd_peer_fds[osd_num] = peer_fd;
+    // Add FD to epoll (EPOLLOUT for tracking connect() result)
+    epoll_event ev;
+    ev.data.fd = peer_fd;
+    ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP;
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
+    {
+        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+    }
+}
+
+// Complete a connection started by connect_peer(); called on EPOLLOUT for a client in
+// PEER_CONNECTING state. Reads the pending socket error (SO_ERROR): on failure stops the
+// client and calls the connect callback with the negative error code, on success switches
+// the client to PEER_CONNECTED, stops polling for EPOLLOUT and calls the callback with <peer_fd>.
+void osd_t::handle_connect_result(int peer_fd)
+{
+    auto & cl = clients[peer_fd];
+    std::function<void(int)> callback = cl.connect_callback;
+    int result = 0;
+    socklen_t result_len = sizeof(result);
+    if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
+    {
+        result = errno;
+    }
+    if (result != 0)
+    {
+        stop_client(peer_fd);
+        callback(-result);
+        return;
+    }
+    // Disable EPOLLOUT on this fd
+    cl.connect_callback = nullptr;
+    cl.peer_state = PEER_CONNECTED;
+    epoll_event ev;
+    ev.data.fd = peer_fd;
+    ev.events = EPOLLIN | EPOLLRDHUP;
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
+    {
+        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+    }
+    callback(peer_fd);
+}
+
 int osd_t::handle_epoll_events()
 {
     epoll_event events[MAX_EPOLL_EVENTS];
@@ -153,8 +240,9 @@ int osd_t::handle_epoll_events()
                 fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
                 clients[peer_fd] = {
                     .peer_addr = addr,
-                    .peer_addr_size = peer_addr_size,
+                    .peer_port = ntohs(addr.sin_port),
                     .peer_fd = peer_fd,
+                    .peer_state = PEER_CONNECTED,
                 };
                 // Add FD to epoll
                 epoll_event ev;
@@ -181,6 +269,13 @@ int osd_t::handle_epoll_events()
                 printf("osd: client %d disconnected\n", cl.peer_fd);
                 stop_client(cl.peer_fd);
             }
+            else if (cl.peer_state == PEER_CONNECTING)
+            {
+                if (events[i].events & EPOLLOUT)
+                {
+                    handle_connect_result(cl.peer_fd);
+                }
+            }
             else if (!cl.read_ready)
             {
                 // Mark client as ready (i.e. some data is available)
@@ -207,6 +302,10 @@ void osd_t::stop_client(int peer_fd)
         throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
     }
     auto it = clients.find(peer_fd);
+    if (it->second.osd_num)
+    {
+        osd_peer_fds.erase(it->second.osd_num);
+    }
     for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
     {
         if (*rit == peer_fd)
diff --git a/osd.h b/osd.h
index a231999e..804a13c7 100644
--- a/osd.h
+++ b/osd.h
@@ -53,11 +53,18 @@ struct osd_op_t
     ~osd_op_t();
 };
 
+// Values of osd_client_t::peer_state
+#define PEER_CONNECTING 1
+#define PEER_CONNECTED 2
+
 struct osd_client_t
 {
     sockaddr_in peer_addr;
-    socklen_t peer_addr_size;
+    int peer_port;
     int peer_fd;
+    int peer_state; // PEER_CONNECTING or PEER_CONNECTED
+    std::function<void(int)> connect_callback; // receives peer_fd on success or a negative errno value on failure
+    uint64_t osd_num = 0; // peer OSD number set by connect_peer(); stays 0 for accepted connections
     //int in_flight_ops = 0;
 
     // Read state
@@ -112,17 +119,22 @@ namespace std
     };
 }
 
-#define PG_ST_OFFLINE 1
-#define PG_ST_PEERING 2
-#define PG_ST_INCOMPLETE 3
-#define PG_ST_DEGRADED 4
-#define PG_ST_MISPLACED 5
-#define PG_ST_ACTIVE 6
+// Placement group state:
+// Exactly one of these:
+#define PG_OFFLINE (1<<0)
+#define PG_PEERING (1<<1)
+#define PG_INCOMPLETE (1<<2)
+#define PG_ACTIVE (1<<3)
+// Plus any of these:
+#define PG_HAS_UNFOUND (1<<4)
+#define PG_HAS_DEGRADED (1<<5)
+#define PG_HAS_MISPLACED (1<<6)
 
 struct osd_pg_t
 {
     int state;
     unsigned num;
+    uint64_t n_unfound = 0, n_degraded = 0, n_misplaced = 0;
     std::vector target_set;
     // moved object map. by default, each object is considered to reside on the target_set.
     // this map stores all objects that differ.
@@ -137,7 +149,7 @@ class osd_t
 {
     // config
 
-    uint64_t osd_num = 0;
+    uint64_t osd_num = 1; // OSD numbers start with 1
     blockstore_config_t config;
     std::string bind_address;
     int bind_port, listen_backlog;
@@ -168,9 +180,9 @@ class osd_t
 
     // methods
 
+    // event loop, socket read/write
     void loop();
     int handle_epoll_events();
-    void stop_client(int peer_fd);
     void read_requests();
     void handle_read(ring_data_t *data, int peer_fd);
     void handle_read_op(osd_client_t *cl);
@@ -179,6 +191,12 @@ class osd_t
     void make_reply(osd_op_t *op);
     void handle_send(ring_data_t *data, int peer_fd);
 
+    // connect/disconnect
+    void connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function<void(int)> callback);
+    void handle_connect_result(int peer_fd);
+    void stop_client(int peer_fd);
+
+    // op execution
     void handle_reply(osd_op_t *cur_op);
     void exec_op(osd_op_t *cur_op);
     void exec_sync_stab_all(osd_op_t *cur_op);