1
0
Fork 0

Compare commits

...

3 Commits

Author SHA1 Message Date
Vitaliy Filippov c414a90abc TRACE 2020-05-28 12:41:08 +03:00
Vitaliy Filippov 36fe7d394b EPOLLLT 2020-05-28 12:41:08 +03:00
Vitaliy Filippov 540137dd23 Submit 2020-05-28 12:41:08 +03:00
13 changed files with 82 additions and 34 deletions

View File

@ -116,6 +116,11 @@ void journal_flusher_t::force_start()
#define await_sqe(label) \ #define await_sqe(label) \
resume_##label:\ resume_##label:\
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = bs->get_sqe();\ sqe = bs->get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@ -62,6 +62,11 @@
struct ring_data_t *data = ((ring_data_t*)sqe->user_data) struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ #define BS_SUBMIT_GET_ONLY_SQE(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
struct io_uring_sqe *sqe = get_sqe();\ struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\
@ -71,6 +76,11 @@
} }
#define BS_SUBMIT_GET_SQE_DECL(sqe) \ #define BS_SUBMIT_GET_SQE_DECL(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = get_sqe();\ sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@ -153,6 +153,11 @@ resume_2:
resume_3: resume_3:
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = get_sqe(); io_uring_sqe *sqe = get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@ -296,6 +296,11 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
return 1; return 1;
resume_2: resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry // Only for the immediate_commit mode: prepare and submit big_write journal entry
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
sqe = get_sqe(); sqe = get_sqe();
if (!sqe) if (!sqe)
{ {
@ -323,6 +328,11 @@ resume_2:
return 1; return 1;
resume_4: resume_4:
// Switch object state // Switch object state
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif #endif

View File

@ -115,7 +115,7 @@ void cluster_client_t::try_connect_peer_addr(osd_num_t peer_osd, const char *pee
.osd_num = peer_osd, .osd_num = peer_osd,
.in_buf = malloc(receive_buffer_size), .in_buf = malloc(receive_buffer_size),
}; };
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
{ {
// Either OUT (connected) or HUP // Either OUT (connected) or HUP
handle_connect_epoll(peer_fd); handle_connect_epoll(peer_fd);
@ -146,8 +146,7 @@ void cluster_client_t::handle_connect_epoll(int peer_fd)
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
cl.peer_state = PEER_CONNECTED; cl.peer_state = PEER_CONNECTED;
// FIXME Disable EPOLLOUT on this fd tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{ {
handle_peer_epoll(peer_fd, epoll_events); handle_peer_epoll(peer_fd, epoll_events);
}); });
@ -320,7 +319,7 @@ void cluster_client_t::stop_client(int peer_fd)
} }
} }
clients.erase(it); clients.erase(it);
tfd->set_fd_handler(peer_fd, NULL); tfd->set_fd_handler(peer_fd, false, NULL);
if (cl.osd_num) if (cl.osd_num)
{ {
osd_peer_fds.erase(cl.osd_num); osd_peer_fds.erase(cl.osd_num);

View File

@ -149,7 +149,7 @@ http_co_t::~http_co_t()
} }
if (peer_fd >= 0) if (peer_fd >= 0)
{ {
tfd->set_fd_handler(peer_fd, NULL); tfd->set_fd_handler(peer_fd, false, NULL);
close(peer_fd); close(peer_fd);
peer_fd = -1; peer_fd = -1;
} }
@ -204,24 +204,10 @@ void http_co_t::start_connection()
delete this; delete this;
}); });
} }
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
{ {
this->epoll_events |= epoll_events; this->epoll_events |= epoll_events;
if (state == HTTP_CO_CONNECTING) handle_connect_result();
{
handle_connect_result();
}
else
{
if (this->epoll_events & EPOLLIN)
{
submit_read();
}
else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR))
{
delete this;
}
}
}); });
epoll_events = 0; epoll_events = 0;
// Finally call connect // Finally call connect
@ -253,6 +239,18 @@ void http_co_t::handle_connect_result()
} }
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
if (this->epoll_events & EPOLLIN)
{
submit_read();
}
else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR))
{
delete this;
}
});
state = HTTP_CO_SENDING_REQUEST; state = HTTP_CO_SENDING_REQUEST;
submit_send(); submit_send();
} }

21
osd.cpp
View File

@ -42,7 +42,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
} }
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); }); this->tfd = new timerfd_manager_t([this](int fd, bool out, std::function<void(int, int)> handler) { set_fd_handler(fd, out, handler); });
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{ {
print_stats(); print_stats();
@ -202,7 +202,7 @@ void osd_t::bind_socket()
epoll_event ev; epoll_event ev;
ev.data.fd = listen_fd; ev.data.fd = listen_fd;
ev.events = EPOLLIN | EPOLLET; ev.events = EPOLLIN;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
{ {
close(listen_fd); close(listen_fd);
@ -234,14 +234,14 @@ void osd_t::loop()
ringloop->submit(); ringloop->submit();
} }
void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler) void osd_t::set_fd_handler(int fd, bool out, std::function<void(int, int)> handler)
{ {
if (handler != NULL) if (handler != NULL)
{ {
bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
epoll_event ev; epoll_event ev;
ev.data.fd = fd; ev.data.fd = fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = EPOLLIN | (out ? EPOLLOUT : 0) | EPOLLRDHUP;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{ {
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
@ -260,11 +260,18 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
void osd_t::handle_epoll_events() void osd_t::handle_epoll_events()
{ {
wait_state = 0;
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); return;
} }
wait_state = 1;
ring_data_t *data = ((ring_data_t*)sqe->user_data); ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
data->callback = [this](ring_data_t *data) data->callback = [this](ring_data_t *data)
@ -275,7 +282,6 @@ void osd_t::handle_epoll_events()
} }
handle_epoll_events(); handle_epoll_events();
}; };
ringloop->submit();
int nfds; int nfds;
epoll_event events[MAX_EPOLL_EVENTS]; epoll_event events[MAX_EPOLL_EVENTS];
restart: restart:
@ -305,7 +311,7 @@ restart:
.in_buf = malloc(c_cli.receive_buffer_size), .in_buf = malloc(c_cli.receive_buffer_size),
}; };
// Add FD to epoll // Add FD to epoll
set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{ {
c_cli.handle_peer_epoll(peer_fd, epoll_events); c_cli.handle_peer_epoll(peer_fd, epoll_events);
}); });
@ -323,6 +329,7 @@ restart:
cb(events[i].data.fd, events[i].events); cb(events[i].data.fd, events[i].events);
} }
} }
printf("%d events\n", nfds);
if (nfds == MAX_EPOLL_EVENTS) if (nfds == MAX_EPOLL_EVENTS)
{ {
goto restart; goto restart;

2
osd.h
View File

@ -149,7 +149,7 @@ class osd_t
// event loop, socket read/write // event loop, socket read/write
void loop(); void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler); void set_fd_handler(int fd, bool out, std::function<void(int, int)> handler);
void handle_epoll_events(); void handle_epoll_events();
// peer handling (primary OSD logic) // peer handling (primary OSD logic)

View File

@ -199,6 +199,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 7) goto resume_7; else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8; else if (op_data->st == 8) goto resume_8;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_write\n");
if (!check_write_queue(cur_op, pg)) if (!check_write_queue(cur_op, pg))
{ {
return; return;
@ -387,6 +388,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (op_data->st == 5) goto resume_5; else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6; else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_sync\n");
if (syncs_in_progress.size() > 0) if (syncs_in_progress.size() > 0)
{ {
// Wait for previous syncs, if any // Wait for previous syncs, if any

View File

@ -6,6 +6,11 @@ void cluster_client_t::read_requests()
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[i];
auto & cl = clients[peer_fd]; auto & cl = clients[peer_fd];
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe* sqe = ringloop->get_sqe(); io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@ -18,11 +18,18 @@ void cluster_client_t::outbox_push(osd_op_t *cur_op)
} }
ringloop->wakeup(); ringloop->wakeup();
} }
else
ringloop->submit();
} }
bool cluster_client_t::try_send(osd_client_t & cl) bool cluster_client_t::try_send(osd_client_t & cl)
{ {
int peer_fd = cl.peer_fd; int peer_fd = cl.peer_fd;
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe* sqe = ringloop->get_sqe(); io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@ -6,7 +6,7 @@
#include <string.h> #include <string.h>
#include "timerfd_manager.h" #include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler) timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
{ {
this->set_fd_handler = set_fd_handler; this->set_fd_handler = set_fd_handler;
wait_state = 0; wait_state = 0;
@ -15,7 +15,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
{ {
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
} }
set_fd_handler(timerfd, [this](int fd, int events) set_fd_handler(timerfd, false, [this](int fd, int events)
{ {
handle_readable(); handle_readable();
}); });
@ -23,7 +23,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
timerfd_manager_t::~timerfd_manager_t() timerfd_manager_t::~timerfd_manager_t()
{ {
set_fd_handler(timerfd, NULL); set_fd_handler(timerfd, false, NULL);
close(timerfd); close(timerfd);
} }

View File

@ -26,9 +26,9 @@ class timerfd_manager_t
void trigger_nearest(); void trigger_nearest();
void handle_readable(); void handle_readable();
public: public:
std::function<void(int, std::function<void(int, int)>)> set_fd_handler; std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler); timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
~timerfd_manager_t(); ~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback); int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id); void clear_timer(int timer_id);