forked from vitalif/vitastor
Only use EPOLLOUT while connecting
parent
8736b3ad32
commit
a22d9f38aa
|
@ -16,7 +16,7 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop)
|
||||||
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
|
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); });
|
||||||
|
|
||||||
handle_epoll_events();
|
handle_epoll_events();
|
||||||
}
|
}
|
||||||
|
@ -31,14 +31,14 @@ epoll_manager_t::~epoll_manager_t()
|
||||||
close(epoll_fd);
|
close(epoll_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
|
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
|
||||||
{
|
{
|
||||||
if (handler != NULL)
|
if (handler != NULL)
|
||||||
{
|
{
|
||||||
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = fd;
|
ev.data.fd = fd;
|
||||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
|
|
@ -13,7 +13,7 @@ class epoll_manager_t
|
||||||
public:
|
public:
|
||||||
epoll_manager_t(ring_loop_t *ringloop);
|
epoll_manager_t(ring_loop_t *ringloop);
|
||||||
~epoll_manager_t();
|
~epoll_manager_t();
|
||||||
void set_fd_handler(int fd, std::function<void(int, int)> handler);
|
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler);
|
||||||
void handle_epoll_events();
|
void handle_epoll_events();
|
||||||
|
|
||||||
timerfd_manager_t *tfd;
|
timerfd_manager_t *tfd;
|
||||||
|
|
|
@ -156,7 +156,7 @@ http_co_t::~http_co_t()
|
||||||
}
|
}
|
||||||
if (peer_fd >= 0)
|
if (peer_fd >= 0)
|
||||||
{
|
{
|
||||||
tfd->set_fd_handler(peer_fd, NULL);
|
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||||
close(peer_fd);
|
close(peer_fd);
|
||||||
peer_fd = -1;
|
peer_fd = -1;
|
||||||
}
|
}
|
||||||
|
@ -223,7 +223,7 @@ void http_co_t::start_connection()
|
||||||
end();
|
end();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
this->epoll_events |= epoll_events;
|
this->epoll_events |= epoll_events;
|
||||||
handle_events();
|
handle_events();
|
||||||
|
@ -276,6 +276,11 @@ void http_co_t::handle_connect_result()
|
||||||
}
|
}
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
|
{
|
||||||
|
this->epoll_events |= epoll_events;
|
||||||
|
handle_events();
|
||||||
|
});
|
||||||
state = HTTP_CO_SENDING_REQUEST;
|
state = HTTP_CO_SENDING_REQUEST;
|
||||||
submit_send();
|
submit_send();
|
||||||
stackout();
|
stackout();
|
||||||
|
|
|
@ -120,7 +120,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
|
||||||
.osd_num = peer_osd,
|
.osd_num = peer_osd,
|
||||||
.in_buf = malloc(receive_buffer_size),
|
.in_buf = malloc(receive_buffer_size),
|
||||||
};
|
};
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
// Either OUT (connected) or HUP
|
// Either OUT (connected) or HUP
|
||||||
handle_connect_epoll(peer_fd);
|
handle_connect_epoll(peer_fd);
|
||||||
|
@ -151,8 +151,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
cl.peer_state = PEER_CONNECTED;
|
cl.peer_state = PEER_CONNECTED;
|
||||||
// FIXME Disable EPOLLOUT on this fd
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
|
||||||
{
|
{
|
||||||
handle_peer_epoll(peer_fd, epoll_events);
|
handle_peer_epoll(peer_fd, epoll_events);
|
||||||
});
|
});
|
||||||
|
@ -335,7 +334,7 @@ void osd_messenger_t::stop_client(int peer_fd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clients.erase(it);
|
clients.erase(it);
|
||||||
tfd->set_fd_handler(peer_fd, NULL);
|
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||||
if (cl.osd_num)
|
if (cl.osd_num)
|
||||||
{
|
{
|
||||||
osd_peer_fds.erase(cl.osd_num);
|
osd_peer_fds.erase(cl.osd_num);
|
||||||
|
@ -394,7 +393,7 @@ void osd_messenger_t::accept_connections(int listen_fd)
|
||||||
.in_buf = malloc(receive_buffer_size),
|
.in_buf = malloc(receive_buffer_size),
|
||||||
};
|
};
|
||||||
// Add FD to epoll
|
// Add FD to epoll
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
handle_peer_epoll(peer_fd, epoll_events);
|
handle_peer_epoll(peer_fd, epoll_events);
|
||||||
});
|
});
|
||||||
|
|
2
osd.cpp
2
osd.cpp
|
@ -169,7 +169,7 @@ void osd_t::bind_socket()
|
||||||
|
|
||||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||||
|
|
||||||
epmgr->set_fd_handler(listen_fd, [this](int fd, int events)
|
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||||
{
|
{
|
||||||
c_cli.accept_connections(listen_fd);
|
c_cli.accept_connections(listen_fd);
|
||||||
});
|
});
|
||||||
|
|
|
@ -38,7 +38,7 @@ int main(int narg, char *args[])
|
||||||
msgr->exec_op = [msgr](osd_op_t *op) { stub_exec_op(msgr, op); };
|
msgr->exec_op = [msgr](osd_op_t *op) { stub_exec_op(msgr, op); };
|
||||||
// Accept new connections
|
// Accept new connections
|
||||||
int listen_fd = bind_stub("0.0.0.0", 11203);
|
int listen_fd = bind_stub("0.0.0.0", 11203);
|
||||||
epmgr->set_fd_handler(listen_fd, [listen_fd, msgr](int fd, int events)
|
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
|
||||||
{
|
{
|
||||||
msgr->accept_connections(listen_fd);
|
msgr->accept_connections(listen_fd);
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include "timerfd_manager.h"
|
#include "timerfd_manager.h"
|
||||||
|
|
||||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
|
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
|
||||||
{
|
{
|
||||||
this->set_fd_handler = set_fd_handler;
|
this->set_fd_handler = set_fd_handler;
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
|
@ -15,7 +15,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
set_fd_handler(timerfd, [this](int fd, int events)
|
set_fd_handler(timerfd, false, [this](int fd, int events)
|
||||||
{
|
{
|
||||||
handle_readable();
|
handle_readable();
|
||||||
});
|
});
|
||||||
|
@ -23,7 +23,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
|
||||||
|
|
||||||
timerfd_manager_t::~timerfd_manager_t()
|
timerfd_manager_t::~timerfd_manager_t()
|
||||||
{
|
{
|
||||||
set_fd_handler(timerfd, NULL);
|
set_fd_handler(timerfd, false, NULL);
|
||||||
close(timerfd);
|
close(timerfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,9 +26,9 @@ class timerfd_manager_t
|
||||||
void trigger_nearest();
|
void trigger_nearest();
|
||||||
void handle_readable();
|
void handle_readable();
|
||||||
public:
|
public:
|
||||||
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
|
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
|
||||||
|
|
||||||
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
|
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
|
||||||
~timerfd_manager_t();
|
~timerfd_manager_t();
|
||||||
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
||||||
void clear_timer(int timer_id);
|
void clear_timer(int timer_id);
|
||||||
|
|
Loading…
Reference in New Issue