Compare commits

..

1 Commit

Author SHA1 Message Date
735b97fe33 Trace I/O operations (SQEs, recvmsg/sendmsg, uring_submit) 2020-06-09 00:52:29 +03:00
14 changed files with 126 additions and 124 deletions

View File

@@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
osd_rmw.o json11.o base64.o timerfd_manager.o
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
stub_osd: stub_osd.o rw_blocking.o
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
@@ -87,7 +87,7 @@ dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blo
g++ $(CXXFLAGS) -c -o $@ $<
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h ringloop.h timerfd_manager.h
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
@@ -95,7 +95,7 @@ fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11
g++ $(CXXFLAGS) -c -o $@ $<
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
g++ $(CXXFLAGS) -c -o $@ $<
http_client.o: http_client.cpp http_client.h json11/json11.hpp ringloop.h timerfd_manager.h
http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
@@ -149,5 +149,5 @@ test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timer
g++ $(CXXFLAGS) -c -o $@ $<
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
g++ $(CXXFLAGS) -c -o $@ $<
timerfd_manager.o: timerfd_manager.cpp ringloop.h timerfd_manager.h
timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<

View File

@@ -33,6 +33,12 @@ journal_flusher_co::journal_flusher_co()
);
}
wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
};
simple_callback_w = [this](ring_data_t* data)
{
@@ -45,6 +51,12 @@ journal_flusher_co::journal_flusher_co()
);
}
wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
};
}
@@ -122,6 +134,11 @@ void journal_flusher_t::release_trim()
#define await_sqe(label) \
resume_##label:\
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = bs->get_sqe();\
if (!sqe)\
{\

View File

@@ -62,6 +62,11 @@
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\
{\
@@ -71,6 +76,11 @@
}
#define BS_SUBMIT_GET_SQE_DECL(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = get_sqe();\
if (!sqe)\
{\

View File

@@ -147,6 +147,11 @@ resume_2:
resume_3:
if (!disable_journal_fsync)
{
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
@@ -237,6 +242,11 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
PRIV(op)->op_state++;
if (!continue_stable(op))
{

View File

@@ -302,6 +302,11 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
return 1;
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
sqe = get_sqe();
if (!sqe)
{
@@ -333,6 +338,11 @@ resume_2:
return 1;
resume_4:
// Switch object state
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif

View File

@@ -57,6 +57,11 @@ void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handl
void epoll_manager_t::handle_epoll_events()
{
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{

View File

@@ -24,6 +24,11 @@ void osd_messenger_t::read_requests()
{
result = -errno;
}
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("recvmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_read(result, peer_fd);
}
}

View File

@@ -81,6 +81,11 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
if (result < 0)
result = -errno;
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("sendmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_send(result, peer_fd);
return true;
}

100
osd.cpp
View File

@@ -1,6 +1,5 @@
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -44,14 +43,8 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
{
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
}
event_fd = eventfd(0, EFD_NONBLOCK);
if (event_fd < 0)
{
throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t(ringloop);
this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
print_stats();
@@ -66,40 +59,17 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
epoll_thread = new std::thread([this]()
{
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
while (1)
{
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
{
std::lock_guard<std::mutex> guard(epoll_mutex);
for (int i = 0; i < nfds; i++)
{
int fd = events[i].data.fd;
int ev = events[i].events;
epoll_ready[fd] |= ev;
}
uint64_t n = 1;
write(event_fd, &n, 8);
}
}
});
}
osd_t::~osd_t()
{
close(epoll_fd);
epoll_thread->join();
delete epoll_thread;
if (tfd)
{
delete tfd;
tfd = NULL;
}
ringloop->unregister_consumer(&consumer);
close(event_fd);
close(epoll_fd);
close(listen_fd);
}
@@ -218,13 +188,8 @@ void osd_t::bind_socket()
{
close(listen_fd);
close(epoll_fd);
throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
{
c_cli.accept_connections(listen_fd);
};
}
bool osd_t::shutdown()
@@ -239,23 +204,10 @@ bool osd_t::shutdown()
void osd_t::loop()
{
std::map<int,int> cur_epoll;
if (!wait_state)
{
std::lock_guard<std::mutex> guard(epoll_mutex);
cur_epoll.swap(epoll_ready);
}
for (auto p: cur_epoll)
{
auto cb_it = epoll_handlers.find(p.first);
if (cb_it != epoll_handlers.end())
{
cb_it->second(p.first, p.second);
}
}
if (!(wait_state & 2))
{
handle_eventfd();
wait_state = wait_state | 2;
handle_epoll_events();
wait_state = 1;
}
handle_peers();
c_cli.read_requests();
@@ -273,7 +225,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{
throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers[fd] = handler;
}
@@ -281,36 +233,54 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
{
throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers.erase(fd);
}
}
void osd_t::handle_eventfd()
void osd_t::handle_epoll_events()
{
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, event_fd, POLLIN);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
}
handle_eventfd();
handle_epoll_events();
};
ringloop->submit();
uint64_t n = 0;
size_t res = read(event_fd, &n, 8);
if (res == 8)
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
restart:
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
for (int i = 0; i < nfds; i++)
{
// No need to do anything, the loop has already woken up
ringloop->wakeup();
if (events[i].data.fd == listen_fd)
{
c_cli.accept_connections(listen_fd);
}
else
{
auto & cb = epoll_handlers[events[i].data.fd];
cb(events[i].data.fd, events[i].events);
}
}
if (nfds == MAX_EPOLL_EVENTS)
{
goto restart;
}
}

8
osd.h
View File

@@ -12,8 +12,6 @@
#include <set>
#include <deque>
#include <mutex>
#include <thread>
#include "blockstore.h"
#include "ringloop.h"
@@ -116,10 +114,6 @@ class osd_t
int wait_state = 0;
int epoll_fd = 0;
int event_fd = 0;
std::thread *epoll_thread = NULL;
std::mutex epoll_mutex;
std::map<int, int> epoll_ready;
int listening_port = 0;
int listen_fd = 0;
ring_consumer_t consumer;
@@ -156,7 +150,7 @@ class osd_t
// event loop, socket read/write
void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_eventfd();
void handle_epoll_events();
// peer handling (primary OSD logic)
void parse_test_peer(std::string peer);

View File

@@ -198,6 +198,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 8) goto resume_8;
else if (op_data->st == 9) goto resume_9;
assert(op_data->st == 0);
printf("primary_write\n");
if (!check_write_queue(cur_op, pg))
{
return;
@@ -389,6 +390,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0);
printf("primary_sync\n");
if (syncs_in_progress.size() > 0)
{
// Wait for previous syncs, if any

View File

@@ -4,6 +4,8 @@
#define _LARGEFILE64_SOURCE
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <assert.h>
#include <liburing.h>
@@ -158,7 +160,13 @@ public:
}
inline int submit()
{
return io_uring_submit(&ring);
int r = io_uring_submit(&ring);
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("submit %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
return r;
}
inline int wait()
{

View File

@@ -1,24 +1,29 @@
#include <sys/timerfd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
{
this->set_fd_handler = set_fd_handler;
wait_state = 0;
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timerfd < 0)
{
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
}
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
this->ringloop = ringloop;
set_fd_handler(timerfd, [this](int fd, int events)
{
handle_readable();
});
}
timerfd_manager_t::~timerfd_manager_t()
{
ringloop->unregister_consumer(&consumer);
set_fd_handler(timerfd, NULL);
close(timerfd);
}
@@ -48,7 +53,6 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
});
inc_timer(timers[timers.size()-1]);
set_nearest();
set_wait();
return timer_id;
}
@@ -69,7 +73,6 @@ void timerfd_manager_t::clear_timer(int timer_id)
nearest--;
}
set_nearest();
set_wait();
break;
}
}
@@ -154,36 +157,3 @@ void timerfd_manager_t::trigger_nearest()
cb(nearest_id);
nearest = -1;
}
// Ring-loop consumer hook for the timer manager: the ring_loop_t-based
// constructor assigns this method to consumer.loop and registers the consumer.
void timerfd_manager_t::loop()
{
// While bit 0 of wait_state is clear and at least one timer exists, recompute
// the nearest timer.
// NOTE(review): bit 0 presumably means "timerfd is armed" - confirm in set_nearest().
if (!(wait_state & 1) && timers.size())
{
set_nearest();
}
// Always try to prepare the timerfd poll; set_wait() does nothing unless
// (wait_state & 3) == 1.
set_wait();
}
// Prepares a poll-add SQE for POLLIN on timerfd and attaches a completion
// callback to it. This function does not call submit itself.
// It acts only when the low two bits of wait_state equal 1 (bit 0 set, bit 1
// clear). After preparing the SQE it sets wait_state to 3, so further calls are
// no-ops until code outside this function changes wait_state again.
// NOTE(review): bit 1 presumably means "poll already in flight" - confirm.
void timerfd_manager_t::set_wait()
{
if ((wait_state & 3) == 1)
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
// No SQE available: return without touching wait_state, so a later call
// can try again.
return;
}
// The per-request ring_data_t is reached through the SQE's user_data field.
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [this](ring_data_t *data)
{
// A negative result is reported as a fatal error; -res is passed to strerror().
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
handle_readable();
// Attempt to prepare the next poll. This only takes effect if
// (wait_state & 3) == 1 at this point; what handle_readable() does to
// wait_state is not visible in this block.
set_wait();
};
wait_state = 3;
}
}

View File

@@ -1,7 +1,8 @@
#pragma once
#include <time.h>
#include "ringloop.h"
#include <vector>
#include <functional>
struct timerfd_timer_t
{
@@ -19,20 +20,15 @@ class timerfd_manager_t
int nearest = -1;
int id = 1;
std::vector<timerfd_timer_t> timers;
ring_loop_t *ringloop;
ring_consumer_t consumer;
void inc_timer(timerfd_timer_t & t);
void set_nearest();
void trigger_nearest();
void handle_readable();
void set_wait();
void loop();
public:
// FIXME shouldn't be here
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(ring_loop_t *ringloop);
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id);