Use epoll_manager in osd

Vitaliy Filippov 2020-06-20 01:28:18 +03:00
parent badf68c039
commit 9abaf5b735
3 changed files with 17 additions and 106 deletions

View File

@ -16,7 +16,7 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.o json11.o
OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_flush.o osd_peering_pg.o \
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
osd_rmw.o json11.o base64.o timerfd_manager.o osd_rmw.o json11.o base64.o timerfd_manager.o epoll_manager.o
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
@ -103,29 +103,29 @@ msgr_receive.o: msgr_receive.cpp json11/json11.hpp messenger.h object_id.h osd_i
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
msgr_send.o: msgr_send.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h msgr_send.o: msgr_send.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd.o: osd.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd.o: osd.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_cluster.o: osd_cluster.cpp base64.h blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd_cluster.o: osd_cluster.cpp base64.h blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_flush.o: osd_flush.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd_flush.o: osd_flush.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_main.o: osd_main.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd_main.o: osd_main.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_peering.o: osd_peering.cpp base64.h blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd_peering.o: osd_peering.cpp base64.h blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_peering_pg.o: osd_peering_pg.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h osd_peering_pg.o: osd_peering_pg.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_peering_pg_test.o: osd_peering_pg_test.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h osd_peering_pg_test.o: osd_peering_pg_test.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_primary.o: osd_primary.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h osd_primary.o: osd_primary.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_primary_subops.o: osd_primary_subops.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h osd_primary_subops.o: osd_primary_subops.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_rmw.o: osd_rmw.cpp object_id.h osd_id.h osd_rmw.h xor.h osd_rmw.o: osd_rmw.cpp object_id.h osd_id.h osd_rmw.h xor.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_rmw_test.o: osd_rmw_test.cpp object_id.h osd_id.h osd_rmw.cpp osd_rmw.h test_pattern.h xor.h osd_rmw_test.o: osd_rmw_test.cpp object_id.h osd_id.h osd_rmw.cpp osd_rmw.h test_pattern.h xor.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_secondary.o: osd_secondary.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h osd_secondary.o: osd_secondary.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
osd_test.o: osd_test.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h test_pattern.h osd_test.o: osd_test.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h test_pattern.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<

98
osd.cpp
View File

@ -1,5 +1,4 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/poll.h> #include <sys/poll.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
@ -7,8 +6,6 @@
#include "osd.h" #include "osd.h"
#define MAX_EPOLL_EVENTS 64
const char* osd_op_names[] = { const char* osd_op_names[] = {
"", "",
"read", "read",
@ -38,13 +35,9 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
parse_config(config); parse_config(config);
epoll_fd = epoll_create(1); epmgr = new epoll_manager_t(ringloop);
if (epoll_fd < 0) this->tfd = epmgr->tfd;
{
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{ {
print_stats(); print_stats();
@ -63,13 +56,8 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
osd_t::~osd_t() osd_t::~osd_t()
{ {
if (tfd)
{
delete tfd;
tfd = NULL;
}
ringloop->unregister_consumer(&consumer); ringloop->unregister_consumer(&consumer);
close(epoll_fd); delete epmgr;
close(listen_fd); close(listen_fd);
} }
@ -181,15 +169,10 @@ void osd_t::bind_socket()
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epoll_event ev; epmgr->set_fd_handler(listen_fd, [this](int fd, int events)
ev.data.fd = listen_fd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
{ {
close(listen_fd); c_cli.accept_connections(listen_fd);
close(epoll_fd); });
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
} }
bool osd_t::shutdown() bool osd_t::shutdown()
@ -204,81 +187,12 @@ bool osd_t::shutdown()
void osd_t::loop() void osd_t::loop()
{ {
if (!wait_state)
{
handle_epoll_events();
wait_state = 1;
}
handle_peers(); handle_peers();
c_cli.read_requests(); c_cli.read_requests();
c_cli.send_replies(); c_cli.send_replies();
ringloop->submit(); ringloop->submit();
} }
void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{
if (handler != NULL)
{
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers[fd] = handler;
}
else
{
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers.erase(fd);
}
}
void osd_t::handle_epoll_events()
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
}
handle_epoll_events();
};
ringloop->submit();
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
restart:
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
for (int i = 0; i < nfds; i++)
{
if (events[i].data.fd == listen_fd)
{
c_cli.accept_connections(listen_fd);
}
else
{
auto & cb = epoll_handlers[events[i].data.fd];
cb(events[i].data.fd, events[i].events);
}
}
if (nfds == MAX_EPOLL_EVENTS)
{
goto restart;
}
}
void osd_t::exec_op(osd_op_t *cur_op) void osd_t::exec_op(osd_op_t *cur_op)
{ {
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);

7
osd.h
View File

@ -16,6 +16,7 @@
#include "blockstore.h" #include "blockstore.h"
#include "ringloop.h" #include "ringloop.h"
#include "timerfd_manager.h" #include "timerfd_manager.h"
#include "epoll_manager.h"
#include "osd_peering_pg.h" #include "osd_peering_pg.h"
#include "messenger.h" #include "messenger.h"
#include "etcd_state_client.h" #include "etcd_state_client.h"
@ -111,13 +112,11 @@ class osd_t
uint64_t pg_stripe_size = DEFAULT_PG_STRIPE_SIZE; uint64_t pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
ring_loop_t *ringloop; ring_loop_t *ringloop;
timerfd_manager_t *tfd = NULL; timerfd_manager_t *tfd = NULL;
epoll_manager_t *epmgr = NULL;
int wait_state = 0;
int epoll_fd = 0;
int listening_port = 0; int listening_port = 0;
int listen_fd = 0; int listen_fd = 0;
ring_consumer_t consumer; ring_consumer_t consumer;
std::map<int, std::function<void(int, int)>> epoll_handlers;
// op statistics // op statistics
osd_op_stats_t prev_stats; osd_op_stats_t prev_stats;
@ -149,8 +148,6 @@ class osd_t
// event loop, socket read/write // event loop, socket read/write
void loop(); void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events();
// peer handling (primary OSD logic) // peer handling (primary OSD logic)
void parse_test_peer(std::string peer); void parse_test_peer(std::string peer);