forked from vitalif/vitastor
Compare commits
3 Commits
master
...
trace-sqes
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | c414a90abc | |
Vitaliy Filippov | 36fe7d394b | |
Vitaliy Filippov | 540137dd23 |
|
@ -116,6 +116,11 @@ void journal_flusher_t::force_start()
|
||||||
|
|
||||||
#define await_sqe(label) \
|
#define await_sqe(label) \
|
||||||
resume_##label:\
|
resume_##label:\
|
||||||
|
{\
|
||||||
|
timespec now;\
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);\
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
|
||||||
|
}\
|
||||||
sqe = bs->get_sqe();\
|
sqe = bs->get_sqe();\
|
||||||
if (!sqe)\
|
if (!sqe)\
|
||||||
{\
|
{\
|
||||||
|
|
|
@ -62,6 +62,11 @@
|
||||||
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
|
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
|
||||||
|
|
||||||
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
|
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
|
||||||
|
{\
|
||||||
|
timespec now;\
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);\
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
|
||||||
|
}\
|
||||||
struct io_uring_sqe *sqe = get_sqe();\
|
struct io_uring_sqe *sqe = get_sqe();\
|
||||||
if (!sqe)\
|
if (!sqe)\
|
||||||
{\
|
{\
|
||||||
|
@ -71,6 +76,11 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
|
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
|
||||||
|
{\
|
||||||
|
timespec now;\
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);\
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
|
||||||
|
}\
|
||||||
sqe = get_sqe();\
|
sqe = get_sqe();\
|
||||||
if (!sqe)\
|
if (!sqe)\
|
||||||
{\
|
{\
|
||||||
|
|
|
@ -153,6 +153,11 @@ resume_2:
|
||||||
resume_3:
|
resume_3:
|
||||||
if (!disable_journal_fsync)
|
if (!disable_journal_fsync)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
io_uring_sqe *sqe = get_sqe();
|
io_uring_sqe *sqe = get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
|
|
@ -296,6 +296,11 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
|
||||||
return 1;
|
return 1;
|
||||||
resume_2:
|
resume_2:
|
||||||
// Only for the immediate_commit mode: prepare and submit big_write journal entry
|
// Only for the immediate_commit mode: prepare and submit big_write journal entry
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
sqe = get_sqe();
|
sqe = get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
@ -323,6 +328,11 @@ resume_2:
|
||||||
return 1;
|
return 1;
|
||||||
resume_4:
|
resume_4:
|
||||||
// Switch object state
|
// Switch object state
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -115,7 +115,7 @@ void cluster_client_t::try_connect_peer_addr(osd_num_t peer_osd, const char *pee
|
||||||
.osd_num = peer_osd,
|
.osd_num = peer_osd,
|
||||||
.in_buf = malloc(receive_buffer_size),
|
.in_buf = malloc(receive_buffer_size),
|
||||||
};
|
};
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
// Either OUT (connected) or HUP
|
// Either OUT (connected) or HUP
|
||||||
handle_connect_epoll(peer_fd);
|
handle_connect_epoll(peer_fd);
|
||||||
|
@ -146,8 +146,7 @@ void cluster_client_t::handle_connect_epoll(int peer_fd)
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
cl.peer_state = PEER_CONNECTED;
|
cl.peer_state = PEER_CONNECTED;
|
||||||
// FIXME Disable EPOLLOUT on this fd
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
|
||||||
{
|
{
|
||||||
handle_peer_epoll(peer_fd, epoll_events);
|
handle_peer_epoll(peer_fd, epoll_events);
|
||||||
});
|
});
|
||||||
|
@ -320,7 +319,7 @@ void cluster_client_t::stop_client(int peer_fd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clients.erase(it);
|
clients.erase(it);
|
||||||
tfd->set_fd_handler(peer_fd, NULL);
|
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||||
if (cl.osd_num)
|
if (cl.osd_num)
|
||||||
{
|
{
|
||||||
osd_peer_fds.erase(cl.osd_num);
|
osd_peer_fds.erase(cl.osd_num);
|
||||||
|
|
|
@ -149,7 +149,7 @@ http_co_t::~http_co_t()
|
||||||
}
|
}
|
||||||
if (peer_fd >= 0)
|
if (peer_fd >= 0)
|
||||||
{
|
{
|
||||||
tfd->set_fd_handler(peer_fd, NULL);
|
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||||
close(peer_fd);
|
close(peer_fd);
|
||||||
peer_fd = -1;
|
peer_fd = -1;
|
||||||
}
|
}
|
||||||
|
@ -204,24 +204,10 @@ void http_co_t::start_connection()
|
||||||
delete this;
|
delete this;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
this->epoll_events |= epoll_events;
|
this->epoll_events |= epoll_events;
|
||||||
if (state == HTTP_CO_CONNECTING)
|
handle_connect_result();
|
||||||
{
|
|
||||||
handle_connect_result();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (this->epoll_events & EPOLLIN)
|
|
||||||
{
|
|
||||||
submit_read();
|
|
||||||
}
|
|
||||||
else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR))
|
|
||||||
{
|
|
||||||
delete this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
epoll_events = 0;
|
epoll_events = 0;
|
||||||
// Finally call connect
|
// Finally call connect
|
||||||
|
@ -253,6 +239,18 @@ void http_co_t::handle_connect_result()
|
||||||
}
|
}
|
||||||
int one = 1;
|
int one = 1;
|
||||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||||
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
|
{
|
||||||
|
this->epoll_events |= epoll_events;
|
||||||
|
if (this->epoll_events & EPOLLIN)
|
||||||
|
{
|
||||||
|
submit_read();
|
||||||
|
}
|
||||||
|
else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR))
|
||||||
|
{
|
||||||
|
delete this;
|
||||||
|
}
|
||||||
|
});
|
||||||
state = HTTP_CO_SENDING_REQUEST;
|
state = HTTP_CO_SENDING_REQUEST;
|
||||||
submit_send();
|
submit_send();
|
||||||
}
|
}
|
||||||
|
|
21
osd.cpp
21
osd.cpp
|
@ -42,7 +42,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
|
this->tfd = new timerfd_manager_t([this](int fd, bool out, std::function<void(int, int)> handler) { set_fd_handler(fd, out, handler); });
|
||||||
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
||||||
{
|
{
|
||||||
print_stats();
|
print_stats();
|
||||||
|
@ -202,7 +202,7 @@ void osd_t::bind_socket()
|
||||||
|
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = listen_fd;
|
ev.data.fd = listen_fd;
|
||||||
ev.events = EPOLLIN | EPOLLET;
|
ev.events = EPOLLIN;
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
|
@ -234,14 +234,14 @@ void osd_t::loop()
|
||||||
ringloop->submit();
|
ringloop->submit();
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
|
void osd_t::set_fd_handler(int fd, bool out, std::function<void(int, int)> handler)
|
||||||
{
|
{
|
||||||
if (handler != NULL)
|
if (handler != NULL)
|
||||||
{
|
{
|
||||||
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
||||||
epoll_event ev;
|
epoll_event ev;
|
||||||
ev.data.fd = fd;
|
ev.data.fd = fd;
|
||||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
ev.events = EPOLLIN | (out ? EPOLLOUT : 0) | EPOLLRDHUP;
|
||||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
@ -260,11 +260,18 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
|
||||||
|
|
||||||
void osd_t::handle_epoll_events()
|
void osd_t::handle_epoll_events()
|
||||||
{
|
{
|
||||||
|
wait_state = 0;
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
io_uring_sqe *sqe = ringloop->get_sqe();
|
io_uring_sqe *sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
|
return;
|
||||||
}
|
}
|
||||||
|
wait_state = 1;
|
||||||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||||
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
|
||||||
data->callback = [this](ring_data_t *data)
|
data->callback = [this](ring_data_t *data)
|
||||||
|
@ -275,7 +282,6 @@ void osd_t::handle_epoll_events()
|
||||||
}
|
}
|
||||||
handle_epoll_events();
|
handle_epoll_events();
|
||||||
};
|
};
|
||||||
ringloop->submit();
|
|
||||||
int nfds;
|
int nfds;
|
||||||
epoll_event events[MAX_EPOLL_EVENTS];
|
epoll_event events[MAX_EPOLL_EVENTS];
|
||||||
restart:
|
restart:
|
||||||
|
@ -305,7 +311,7 @@ restart:
|
||||||
.in_buf = malloc(c_cli.receive_buffer_size),
|
.in_buf = malloc(c_cli.receive_buffer_size),
|
||||||
};
|
};
|
||||||
// Add FD to epoll
|
// Add FD to epoll
|
||||||
set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
|
set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||||
{
|
{
|
||||||
c_cli.handle_peer_epoll(peer_fd, epoll_events);
|
c_cli.handle_peer_epoll(peer_fd, epoll_events);
|
||||||
});
|
});
|
||||||
|
@ -323,6 +329,7 @@ restart:
|
||||||
cb(events[i].data.fd, events[i].events);
|
cb(events[i].data.fd, events[i].events);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
printf("%d events\n", nfds);
|
||||||
if (nfds == MAX_EPOLL_EVENTS)
|
if (nfds == MAX_EPOLL_EVENTS)
|
||||||
{
|
{
|
||||||
goto restart;
|
goto restart;
|
||||||
|
|
2
osd.h
2
osd.h
|
@ -149,7 +149,7 @@ class osd_t
|
||||||
|
|
||||||
// event loop, socket read/write
|
// event loop, socket read/write
|
||||||
void loop();
|
void loop();
|
||||||
void set_fd_handler(int fd, std::function<void(int, int)> handler);
|
void set_fd_handler(int fd, bool out, std::function<void(int, int)> handler);
|
||||||
void handle_epoll_events();
|
void handle_epoll_events();
|
||||||
|
|
||||||
// peer handling (primary OSD logic)
|
// peer handling (primary OSD logic)
|
||||||
|
|
|
@ -199,6 +199,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||||
else if (op_data->st == 7) goto resume_7;
|
else if (op_data->st == 7) goto resume_7;
|
||||||
else if (op_data->st == 8) goto resume_8;
|
else if (op_data->st == 8) goto resume_8;
|
||||||
assert(op_data->st == 0);
|
assert(op_data->st == 0);
|
||||||
|
printf("primary_write\n");
|
||||||
if (!check_write_queue(cur_op, pg))
|
if (!check_write_queue(cur_op, pg))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
@ -387,6 +388,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
|
||||||
else if (op_data->st == 5) goto resume_5;
|
else if (op_data->st == 5) goto resume_5;
|
||||||
else if (op_data->st == 6) goto resume_6;
|
else if (op_data->st == 6) goto resume_6;
|
||||||
assert(op_data->st == 0);
|
assert(op_data->st == 0);
|
||||||
|
printf("primary_sync\n");
|
||||||
if (syncs_in_progress.size() > 0)
|
if (syncs_in_progress.size() > 0)
|
||||||
{
|
{
|
||||||
// Wait for previous syncs, if any
|
// Wait for previous syncs, if any
|
||||||
|
|
|
@ -6,6 +6,11 @@ void cluster_client_t::read_requests()
|
||||||
{
|
{
|
||||||
int peer_fd = read_ready_clients[i];
|
int peer_fd = read_ready_clients[i];
|
||||||
auto & cl = clients[peer_fd];
|
auto & cl = clients[peer_fd];
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,11 +18,18 @@ void cluster_client_t::outbox_push(osd_op_t *cur_op)
|
||||||
}
|
}
|
||||||
ringloop->wakeup();
|
ringloop->wakeup();
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
ringloop->submit();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cluster_client_t::try_send(osd_client_t & cl)
|
bool cluster_client_t::try_send(osd_client_t & cl)
|
||||||
{
|
{
|
||||||
int peer_fd = cl.peer_fd;
|
int peer_fd = cl.peer_fd;
|
||||||
|
{
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
|
||||||
|
}
|
||||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||||
if (!sqe)
|
if (!sqe)
|
||||||
{
|
{
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include "timerfd_manager.h"
|
#include "timerfd_manager.h"
|
||||||
|
|
||||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
|
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
|
||||||
{
|
{
|
||||||
this->set_fd_handler = set_fd_handler;
|
this->set_fd_handler = set_fd_handler;
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
|
@ -15,7 +15,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
||||||
}
|
}
|
||||||
set_fd_handler(timerfd, [this](int fd, int events)
|
set_fd_handler(timerfd, false, [this](int fd, int events)
|
||||||
{
|
{
|
||||||
handle_readable();
|
handle_readable();
|
||||||
});
|
});
|
||||||
|
@ -23,7 +23,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(
|
||||||
|
|
||||||
timerfd_manager_t::~timerfd_manager_t()
|
timerfd_manager_t::~timerfd_manager_t()
|
||||||
{
|
{
|
||||||
set_fd_handler(timerfd, NULL);
|
set_fd_handler(timerfd, false, NULL);
|
||||||
close(timerfd);
|
close(timerfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,9 +26,9 @@ class timerfd_manager_t
|
||||||
void trigger_nearest();
|
void trigger_nearest();
|
||||||
void handle_readable();
|
void handle_readable();
|
||||||
public:
|
public:
|
||||||
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
|
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
|
||||||
|
|
||||||
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
|
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
|
||||||
~timerfd_manager_t();
|
~timerfd_manager_t();
|
||||||
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
||||||
void clear_timer(int timer_id);
|
void clear_timer(int timer_id);
|
||||||
|
|
Loading…
Reference in New Issue