forked from vitalif/vitastor
Compare commits
12 Commits
antietcd-i
...
msgr-iothr
Author | SHA1 | Date | |
---|---|---|---|
1d6f1203f8 | |||
d4dc3f4a9d | |||
0c0439b33b | |||
ea0d72289c | |||
e400a851f4 | |||
0fec7a9fea | |||
b9de2a92a9 | |||
5360a70853 | |||
4c2328eb13 | |||
313daef12d | |||
ad9c12e1b9 | |||
4473eb5512 |
@@ -34,7 +34,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
||||
{
|
||||
// peer_osd just dropped connection
|
||||
// determine WHICH dirty_buffers are now obsolete and repeat them
|
||||
if (wb->repeat_ops_for(this, peer_osd) > 0)
|
||||
if (wb->repeat_ops_for(this, peer_osd, 0, 0) > 0)
|
||||
{
|
||||
continue_ops();
|
||||
}
|
||||
@@ -52,7 +52,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
|
||||
st_cli.tfd = tfd;
|
||||
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
|
||||
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
|
||||
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
|
||||
st_cli.on_change_pool_config_hook = [this]() { on_change_pool_config_hook(); };
|
||||
st_cli.on_change_pg_state_hook = [this](pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary) { on_change_pg_state_hook(pool_id, pg_num, prev_primary); };
|
||||
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
|
||||
st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); };
|
||||
|
||||
@@ -77,11 +78,6 @@ cluster_client_t::~cluster_client_t()
|
||||
|
||||
cluster_op_t::~cluster_op_t()
|
||||
{
|
||||
if (buf)
|
||||
{
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
}
|
||||
if (bitmap_buf)
|
||||
{
|
||||
free(bitmap_buf);
|
||||
@@ -427,7 +423,7 @@ void cluster_client_t::on_load_pgs_hook(bool success)
|
||||
continue_ops();
|
||||
}
|
||||
|
||||
void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
|
||||
void cluster_client_t::on_change_pool_config_hook()
|
||||
{
|
||||
for (auto pool_item: st_cli.pool_config)
|
||||
{
|
||||
@@ -450,6 +446,19 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
|
||||
continue_ops();
|
||||
}
|
||||
|
||||
void cluster_client_t::on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary)
|
||||
{
|
||||
auto & pg_cfg = st_cli.pool_config[pool_id].pg_config[pg_num];
|
||||
if (pg_cfg.cur_primary != prev_primary)
|
||||
{
|
||||
// Repeat this PG operations because an OSD which stopped being primary may not fsync operations
|
||||
if (wb->repeat_ops_for(this, 0, pool_id, pg_num) > 0)
|
||||
{
|
||||
continue_ops();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool cluster_client_t::get_immediate_commit(uint64_t inode)
|
||||
{
|
||||
if (enable_writeback)
|
||||
@@ -570,6 +579,14 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
|
||||
{
|
||||
op->cur_inode = op->inode;
|
||||
op->retval = 0;
|
||||
op->state = 0;
|
||||
op->retry_after = 0;
|
||||
op->inflight_count = 0;
|
||||
op->done_count = 0;
|
||||
op->part_bitmaps = NULL;
|
||||
op->bitmap_buf_size = 0;
|
||||
op->prev_wait = 0;
|
||||
assert(!op->prev && !op->next);
|
||||
// check alignment, readonly flag and so on
|
||||
if (!check_rw(op))
|
||||
{
|
||||
@@ -600,7 +617,9 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
|
||||
{
|
||||
if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */)
|
||||
{
|
||||
wb->copy_write(op, CACHE_WRITTEN);
|
||||
uint64_t flush_id = ++wb->last_flush_id;
|
||||
wb->copy_write(op, CACHE_REPEATING, flush_id);
|
||||
op->flush_id = flush_id;
|
||||
}
|
||||
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
|
||||
{
|
||||
@@ -816,6 +835,10 @@ resume_2:
|
||||
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
|
||||
op->retval = op->len / pool_cfg.bitmap_granularity;
|
||||
}
|
||||
if (op->flush_id)
|
||||
{
|
||||
wb->mark_flush_written(op->inode, op->offset, op->len, op->flush_id);
|
||||
}
|
||||
erase_op(op);
|
||||
return 1;
|
||||
}
|
||||
@@ -988,6 +1011,29 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
|
||||
}
|
||||
}
|
||||
|
||||
bool cluster_client_t::affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
if (INODE_POOL(inode) != pool_id)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
|
||||
uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
|
||||
uint64_t pg_block_size = pool_cfg.data_block_size * pg_data_size;
|
||||
uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
|
||||
uint64_t last_stripe = len > 0 ? ((offset + len - 1) / pg_block_size) * pg_block_size : first_stripe;
|
||||
if ((last_stripe/pool_cfg.pg_stripe_size) - (first_stripe/pool_cfg.pg_stripe_size) + 1 >= pool_cfg.real_pg_count)
|
||||
{
|
||||
// All PGs are affected
|
||||
return true;
|
||||
}
|
||||
pg_num_t first_pg_num = (first_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
||||
pg_num_t last_pg_num = (last_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
||||
return (first_pg_num <= last_pg_num
|
||||
? (pg_num >= first_pg_num && pg_num <= last_pg_num)
|
||||
: (pg_num >= first_pg_num || pg_num <= last_pg_num));
|
||||
}
|
||||
|
||||
bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
|
||||
{
|
||||
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
|
||||
@@ -1210,7 +1256,9 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
||||
// So do all these things after modifying operation state, otherwise we may hit reenterability bugs
|
||||
// FIXME postpone such things to set_immediate here to avoid bugs
|
||||
// Set op->retry_after to retry operation after a short pause (not immediately)
|
||||
if (!op->retry_after)
|
||||
if (!op->retry_after && (op->retval == -EPIPE ||
|
||||
op->retval == -EIO && client_eio_retry_interval ||
|
||||
op->retval == -ENOSPC && client_retry_enospc))
|
||||
{
|
||||
op->retry_after = op->retval != -EPIPE ? client_eio_retry_interval : client_retry_interval;
|
||||
}
|
||||
|
@@ -56,8 +56,6 @@ struct cluster_op_t
|
||||
protected:
|
||||
int state = 0;
|
||||
uint64_t cur_inode; // for snapshot reads
|
||||
void *buf = NULL;
|
||||
cluster_op_t *orig_op = NULL;
|
||||
bool needs_reslice = false;
|
||||
int retry_after = 0;
|
||||
int inflight_count = 0, done_count = 0;
|
||||
@@ -66,6 +64,7 @@ protected:
|
||||
unsigned bitmap_buf_size = 0;
|
||||
cluster_op_t *prev = NULL, *next = NULL;
|
||||
int prev_wait = 0;
|
||||
uint64_t flush_id = 0;
|
||||
friend class cluster_client_t;
|
||||
friend class writeback_cache_t;
|
||||
};
|
||||
@@ -81,6 +80,7 @@ class cluster_client_t
|
||||
ring_loop_t *ringloop;
|
||||
|
||||
std::map<pool_id_t, uint64_t> pg_counts;
|
||||
std::map<pool_pg_num_t, osd_num_t> pg_primary;
|
||||
// client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off
|
||||
uint64_t client_max_dirty_bytes = 0;
|
||||
uint64_t client_max_dirty_ops = 0;
|
||||
@@ -146,9 +146,11 @@ public:
|
||||
|
||||
protected:
|
||||
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
|
||||
bool affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num);
|
||||
void on_load_config_hook(json11::Json::object & config);
|
||||
void on_load_pgs_hook(bool success);
|
||||
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
|
||||
void on_change_pool_config_hook();
|
||||
void on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary);
|
||||
void on_change_osd_state_hook(uint64_t peer_osd);
|
||||
void execute_internal(cluster_op_t *op);
|
||||
void unshift_op(cluster_op_t *op);
|
||||
|
@@ -46,11 +46,12 @@ public:
|
||||
bool is_left_merged(dirty_buf_it_t dirty_it);
|
||||
bool is_right_merged(dirty_buf_it_t dirty_it);
|
||||
bool is_merged(const dirty_buf_it_t & dirty_it);
|
||||
void copy_write(cluster_op_t *op, int state);
|
||||
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd);
|
||||
void copy_write(cluster_op_t *op, int state, uint64_t new_flush_id = 0);
|
||||
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num);
|
||||
void start_writebacks(cluster_client_t *cli, int count);
|
||||
bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity);
|
||||
void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it);
|
||||
void mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id);
|
||||
void fsync_start();
|
||||
void fsync_error();
|
||||
void fsync_ok();
|
||||
|
@@ -71,7 +71,7 @@ bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it)
|
||||
return is_left_merged(dirty_it) || is_right_merged(dirty_it);
|
||||
}
|
||||
|
||||
void writeback_cache_t::copy_write(cluster_op_t *op, int state)
|
||||
void writeback_cache_t::copy_write(cluster_op_t *op, int state, uint64_t new_flush_id)
|
||||
{
|
||||
// Save operation for replay when one of PGs goes out of sync
|
||||
// (primary OSD drops our connection in this case)
|
||||
@@ -180,6 +180,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
|
||||
.buf = buf,
|
||||
.len = op->len,
|
||||
.state = state,
|
||||
.flush_id = new_flush_id,
|
||||
.refcnt = refcnt,
|
||||
});
|
||||
if (state == CACHE_DIRTY)
|
||||
@@ -208,7 +209,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
|
||||
}
|
||||
}
|
||||
|
||||
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
|
||||
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num)
|
||||
{
|
||||
int repeated = 0;
|
||||
if (dirty_buffers.size())
|
||||
@@ -218,8 +219,11 @@ int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
|
||||
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
|
||||
{
|
||||
bool end = wr_it == dirty_buffers.end();
|
||||
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
|
||||
cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
|
||||
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING;
|
||||
if (peer_osd)
|
||||
flush_this = flush_this && cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
|
||||
if (pool_id && pg_num)
|
||||
flush_this = flush_this && cli->affects_pg(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, pool_id, pg_num);
|
||||
if (flush_it != wr_it && (end || !flush_this ||
|
||||
wr_it->first.inode != flush_it->first.inode ||
|
||||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
|
||||
@@ -265,7 +269,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
|
||||
writebacks_active++;
|
||||
op->callback = [this, flush_id](cluster_op_t* op)
|
||||
{
|
||||
// Buffer flushes should be always retried, regardless of the error,
|
||||
// Buffer flushes are always retried, regardless of the error,
|
||||
// so they should never result in an error here
|
||||
assert(op->retval == op->len);
|
||||
for (auto fl_it = flushed_buffers.find(flush_id);
|
||||
@@ -277,16 +281,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
|
||||
}
|
||||
flushed_buffers.erase(fl_it++);
|
||||
}
|
||||
for (auto dirty_it = find_dirty(op->inode, op->offset);
|
||||
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
|
||||
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
|
||||
{
|
||||
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
|
||||
{
|
||||
dirty_it->second.flush_id = 0;
|
||||
dirty_it->second.state = CACHE_WRITTEN;
|
||||
}
|
||||
}
|
||||
mark_flush_written(op->inode, op->offset, op->len, flush_id);
|
||||
delete op;
|
||||
writebacks_active--;
|
||||
// We can't call execute_internal because it affects an invalid copy of the list here
|
||||
@@ -304,6 +299,20 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
|
||||
}
|
||||
}
|
||||
|
||||
void writeback_cache_t::mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id)
|
||||
{
|
||||
for (auto dirty_it = find_dirty(inode, offset);
|
||||
dirty_it != dirty_buffers.end() && dirty_it->first.inode == inode &&
|
||||
dirty_it->first.stripe < offset+len; dirty_it++)
|
||||
{
|
||||
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
|
||||
{
|
||||
dirty_it->second.flush_id = 0;
|
||||
dirty_it->second.state = CACHE_WRITTEN;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count)
|
||||
{
|
||||
if (!writeback_queue.size())
|
||||
|
@@ -890,6 +890,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (on_change_pool_config_hook)
|
||||
{
|
||||
on_change_pool_config_hook();
|
||||
}
|
||||
}
|
||||
else if (key == etcd_prefix+"/config/pgs")
|
||||
{
|
||||
@@ -1028,13 +1032,19 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||
else if (value.is_null())
|
||||
{
|
||||
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
||||
auto prev_primary = pg_cfg.cur_primary;
|
||||
pg_cfg.state_exists = false;
|
||||
pg_cfg.cur_primary = 0;
|
||||
pg_cfg.cur_state = 0;
|
||||
if (on_change_pg_state_hook)
|
||||
{
|
||||
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
|
||||
auto prev_primary = pg_cfg.cur_primary;
|
||||
pg_cfg.state_exists = true;
|
||||
osd_num_t cur_primary = value["primary"].uint64_value();
|
||||
int state = 0;
|
||||
@@ -1065,6 +1075,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
|
||||
}
|
||||
pg_cfg.cur_primary = cur_primary;
|
||||
pg_cfg.cur_state = state;
|
||||
if (on_change_pg_state_hook)
|
||||
{
|
||||
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
|
||||
|
@@ -127,6 +127,8 @@ public:
|
||||
std::function<void(json11::Json::object &)> on_load_config_hook;
|
||||
std::function<json11::Json()> load_pgs_checks_hook;
|
||||
std::function<void(bool)> on_load_pgs_hook;
|
||||
std::function<void()> on_change_pool_config_hook;
|
||||
std::function<void(pool_id_t, pg_num_t, osd_num_t)> on_change_pg_state_hook;
|
||||
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
|
||||
std::function<void(osd_num_t)> on_change_osd_state_hook;
|
||||
std::function<void()> on_reload_hook;
|
||||
|
@@ -271,7 +271,7 @@ void http_co_t::close_connection()
|
||||
}
|
||||
if (peer_fd >= 0)
|
||||
{
|
||||
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(peer_fd, 0, NULL);
|
||||
close(peer_fd);
|
||||
peer_fd = -1;
|
||||
}
|
||||
@@ -314,7 +314,7 @@ void http_co_t::start_connection()
|
||||
stackout();
|
||||
return;
|
||||
}
|
||||
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLOUT, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
this->epoll_events |= epoll_events;
|
||||
handle_events();
|
||||
@@ -372,7 +372,7 @@ void http_co_t::handle_connect_result()
|
||||
}
|
||||
int one = 1;
|
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
this->epoll_events |= epoll_events;
|
||||
handle_events();
|
||||
|
@@ -15,6 +15,140 @@
|
||||
#include "msgr_rdma.h"
|
||||
#endif
|
||||
|
||||
#include <sys/poll.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
msgr_iothread_t::msgr_iothread_t()
|
||||
{
|
||||
ring = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
|
||||
epmgr = new epoll_manager_t(ring);
|
||||
submit_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
|
||||
if (submit_eventfd < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("failed to create eventfd: ")+strerror(errno));
|
||||
}
|
||||
epmgr->tfd->set_fd_handler(submit_eventfd, false, [this](int fd, int epoll_events)
|
||||
{
|
||||
// Reset eventfd counter
|
||||
uint64_t ctr = 0;
|
||||
int r = read(submit_eventfd, &ctr, 8);
|
||||
if (r < 0 && errno != EAGAIN && errno != EINTR)
|
||||
{
|
||||
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
|
||||
}
|
||||
// Submit pending events
|
||||
submit_pending_writes();
|
||||
});
|
||||
thread = new std::thread(&msgr_iothread_t::run, this);
|
||||
}
|
||||
|
||||
msgr_iothread_t::~msgr_iothread_t()
|
||||
{
|
||||
stop();
|
||||
delete thread;
|
||||
delete epmgr;
|
||||
delete ring;
|
||||
}
|
||||
|
||||
void msgr_iothread_t::stop()
|
||||
{
|
||||
mu.lock();
|
||||
if (stopped)
|
||||
{
|
||||
mu.unlock();
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
if (callback_stopped)
|
||||
{
|
||||
*callback_stopped = true;
|
||||
callback_stopped = NULL;
|
||||
}
|
||||
close(submit_eventfd);
|
||||
mu.unlock();
|
||||
thread->join();
|
||||
}
|
||||
|
||||
static uint64_t one = 1;
|
||||
|
||||
void msgr_iothread_t::wakeup(osd_client_t *cl, ring_loop_t *outer_ring)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mu);
|
||||
if (!pending_clients.size())
|
||||
{
|
||||
io_uring_sqe* sqe = outer_ring->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
write(submit_eventfd, &one, sizeof(one));
|
||||
}
|
||||
else
|
||||
{
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [](ring_data_t*){};
|
||||
my_uring_prep_write(sqe, submit_eventfd, &one, sizeof(one), 0);
|
||||
}
|
||||
}
|
||||
add_pending(cl);
|
||||
}
|
||||
|
||||
void msgr_iothread_t::add_pending(osd_client_t *cl)
|
||||
{
|
||||
auto it = std::lower_bound(pending_clients.begin(), pending_clients.end(), cl);
|
||||
if (it == pending_clients.end() || *it != cl)
|
||||
{
|
||||
pending_clients.insert(it, cl);
|
||||
}
|
||||
}
|
||||
|
||||
void msgr_iothread_t::submit_pending_writes()
|
||||
{
|
||||
mu.lock();
|
||||
std::vector<osd_client_t*> clients = std::move(pending_clients);
|
||||
mu.unlock();
|
||||
for (int i = 0; i < clients.size(); i++)
|
||||
{
|
||||
auto cl = clients[i];
|
||||
std::unique_lock<std::mutex> lock(cl->mu);
|
||||
if (cl->peer_state == PEER_STOPPED)
|
||||
{
|
||||
cl->refs--;
|
||||
if (cl->refs <= 0)
|
||||
delete cl;
|
||||
continue;
|
||||
}
|
||||
if (!cl->try_send(ring, false/*, lock*/))
|
||||
{
|
||||
// ring is full (rare but what if...)
|
||||
ring->submit();
|
||||
mu.lock();
|
||||
for (; i < clients.size(); i++)
|
||||
{
|
||||
add_pending(clients[i]);
|
||||
}
|
||||
mu.unlock();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->refs--;
|
||||
}
|
||||
}
|
||||
ring->submit();
|
||||
}
|
||||
|
||||
void msgr_iothread_t::run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
mu.lock();
|
||||
if (stopped)
|
||||
return;
|
||||
mu.unlock();
|
||||
ring->loop();
|
||||
ring->wait();
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::init()
|
||||
{
|
||||
#ifdef WITH_RDMA
|
||||
@@ -35,7 +169,7 @@ void osd_messenger_t::init()
|
||||
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
|
||||
fprintf(stderr, "[OSD %ju] RDMA initialized successfully\n", osd_num);
|
||||
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
|
||||
tfd->set_fd_handler(rdma_context->channel->fd, EPOLLIN, [this](int notify_fd, int epoll_events)
|
||||
{
|
||||
handle_rdma_events();
|
||||
});
|
||||
@@ -43,6 +177,19 @@ void osd_messenger_t::init()
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (ringloop && iothread_count > 0)
|
||||
{
|
||||
for (int i = 0; i < iothread_count; i++)
|
||||
{
|
||||
auto iot = new msgr_iothread_t();
|
||||
iothreads.push_back(iot);
|
||||
}
|
||||
completion_eventfd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
|
||||
if (completion_eventfd < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("failed to create completion eventfd: ")+strerror(errno));
|
||||
}
|
||||
}
|
||||
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
|
||||
{
|
||||
auto cl_it = clients.begin();
|
||||
@@ -120,6 +267,11 @@ void osd_messenger_t::init()
|
||||
|
||||
osd_messenger_t::~osd_messenger_t()
|
||||
{
|
||||
if (completion_eventfd >= 0)
|
||||
{
|
||||
close(completion_eventfd);
|
||||
completion_eventfd = -1;
|
||||
}
|
||||
if (keepalive_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(keepalive_timer_id);
|
||||
@@ -129,6 +281,14 @@ osd_messenger_t::~osd_messenger_t()
|
||||
{
|
||||
stop_client(clients.begin()->first, true, true);
|
||||
}
|
||||
if (iothreads.size())
|
||||
{
|
||||
for (auto iot: iothreads)
|
||||
{
|
||||
delete iot;
|
||||
}
|
||||
iothreads.clear();
|
||||
}
|
||||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
@@ -262,7 +422,8 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
|
||||
clients[peer_fd]->connect_timeout_id = -1;
|
||||
clients[peer_fd]->osd_num = peer_osd;
|
||||
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
|
||||
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
|
||||
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
|
||||
tfd->set_fd_handler(peer_fd, EPOLLOUT, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Either OUT (connected) or HUP
|
||||
handle_connect_epoll(peer_fd);
|
||||
@@ -303,7 +464,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
|
||||
int one = 1;
|
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
cl->peer_state = PEER_CONNECTED;
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
});
|
||||
@@ -327,6 +488,14 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
|
||||
{
|
||||
// Mark client as ready (i.e. some data is available)
|
||||
auto cl = clients[peer_fd];
|
||||
#ifdef WITH_RDMA
|
||||
if (cl->peer_state == PEER_RDMA)
|
||||
{
|
||||
// FIXME: Do something better than just forgetting the FD
|
||||
// FIXME: Ignore pings during RDMA state transition
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
cl->read_ready++;
|
||||
if (cl->read_ready == 1)
|
||||
{
|
||||
@@ -487,7 +656,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
fprintf(stderr, "Connected to OSD %ju using RDMA\n", cl->osd_num);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(cl->peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
@@ -527,8 +696,9 @@ void osd_messenger_t::accept_connections(int listen_fd)
|
||||
clients[peer_fd]->peer_fd = peer_fd;
|
||||
clients[peer_fd]->peer_state = PEER_CONNECTED;
|
||||
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
|
||||
clients[peer_fd]->receive_buffer_size = receive_buffer_size;
|
||||
// Add FD to epoll
|
||||
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
tfd->set_fd_handler(peer_fd, EPOLLIN, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
});
|
||||
|
@@ -11,6 +11,7 @@
|
||||
#include <map>
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
|
||||
#include "malloc_or_die.h"
|
||||
#include "json11/json11.hpp"
|
||||
@@ -47,6 +48,8 @@ struct msgr_rdma_context_t;
|
||||
|
||||
struct osd_client_t
|
||||
{
|
||||
std::mutex mu;
|
||||
|
||||
int refs = 0;
|
||||
|
||||
sockaddr_storage peer_addr;
|
||||
@@ -59,6 +62,7 @@ struct osd_client_t
|
||||
osd_num_t osd_num = 0;
|
||||
|
||||
void *in_buf = NULL;
|
||||
uint32_t receive_buffer_size = 0;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
msgr_rdma_connection_t *rdma_conn = NULL;
|
||||
@@ -89,6 +93,17 @@ struct osd_client_t
|
||||
std::vector<msgr_sendp_t> outbox, next_outbox;
|
||||
|
||||
~osd_client_t();
|
||||
|
||||
bool try_send(ring_loop_t *ringloop, bool use_sync_send_recv);
|
||||
void handle_send(int result);
|
||||
|
||||
bool try_recv(ring_loop_t *ringloop, bool use_sync_send_recv);
|
||||
bool handle_read(int result);
|
||||
bool handle_read_buffer(void *curbuf, int remain);
|
||||
bool handle_finished_read();
|
||||
void handle_op_hdr();
|
||||
bool handle_reply_hdr();
|
||||
void handle_reply_ready(osd_op_t *op);
|
||||
};
|
||||
|
||||
struct osd_wanted_peer_t
|
||||
@@ -111,6 +126,53 @@ struct osd_op_stats_t
|
||||
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
|
||||
};
|
||||
|
||||
#ifdef __MOCK__
|
||||
class msgr_iothread_t;
|
||||
#else
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "epoll_manager.h"
|
||||
|
||||
class msgr_iothread_t
|
||||
{
|
||||
protected:
|
||||
ring_loop_t *ring = NULL;
|
||||
epoll_manager_t *epmgr = NULL;
|
||||
int submit_eventfd = -1;
|
||||
bool stopped = false;
|
||||
bool pending = false;
|
||||
bool *callback_stopped = NULL;
|
||||
std::mutex mu;
|
||||
std::vector<osd_client_t*> pending_clients;
|
||||
std::thread *thread = NULL;
|
||||
|
||||
void run();
|
||||
public:
|
||||
|
||||
msgr_iothread_t();
|
||||
~msgr_iothread_t();
|
||||
|
||||
epoll_manager_t *get_epmgr()
|
||||
{
|
||||
return epmgr;
|
||||
}
|
||||
|
||||
int get_eventfd()
|
||||
{
|
||||
return submit_eventfd;
|
||||
}
|
||||
|
||||
void wakeup(osd_client_t *cl, ring_loop_t *outer_ring);
|
||||
|
||||
void add_pending(osd_client_t *cl);
|
||||
|
||||
void submit_pending_writes();
|
||||
|
||||
void stop();
|
||||
};
|
||||
#endif
|
||||
|
||||
struct osd_messenger_t
|
||||
{
|
||||
protected:
|
||||
@@ -123,6 +185,7 @@ protected:
|
||||
int osd_ping_timeout = 0;
|
||||
int log_level = 0;
|
||||
bool use_sync_send_recv = false;
|
||||
int iothread_count = 4;
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool use_rdma = true;
|
||||
@@ -134,10 +197,12 @@ protected:
|
||||
bool rdma_odp = false;
|
||||
#endif
|
||||
|
||||
std::vector<msgr_iothread_t*> iothreads;
|
||||
std::vector<int> read_ready_clients;
|
||||
std::vector<int> write_ready_clients;
|
||||
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
|
||||
std::vector<std::function<void()>> set_immediate;
|
||||
int completion_eventfd = -1;
|
||||
std::mutex completions_mu;
|
||||
std::vector<osd_op_t*> completions;
|
||||
|
||||
public:
|
||||
timerfd_manager_t *tfd;
|
||||
@@ -188,15 +253,8 @@ protected:
|
||||
void cancel_osd_ops(osd_client_t *cl);
|
||||
void cancel_op(osd_op_t *op);
|
||||
|
||||
bool try_send(osd_client_t *cl);
|
||||
void handle_send(int result, osd_client_t *cl);
|
||||
|
||||
bool handle_read(int result, osd_client_t *cl);
|
||||
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
|
||||
bool handle_finished_read(osd_client_t *cl);
|
||||
void handle_op_hdr(osd_client_t *cl);
|
||||
bool handle_reply_hdr(osd_client_t *cl);
|
||||
void handle_reply_ready(osd_op_t *op);
|
||||
void add_completion(osd_op_t *op, ring_loop_t *ringloop = NULL);
|
||||
void handle_completions();
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
void try_send_rdma(osd_client_t *cl);
|
||||
|
@@ -363,6 +363,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
|
||||
auto cl = clients.at(peer_fd);
|
||||
cl->rdma_conn = rdma_conn;
|
||||
cl->peer_state = PEER_RDMA_CONNECTING;
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -603,7 +605,7 @@ void osd_messenger_t::handle_rdma_events()
|
||||
if (!is_send)
|
||||
{
|
||||
rc->cur_recv--;
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
if (!cl->handle_read_buffer(rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
|
||||
{
|
||||
// handle_read_buffer may stop the client
|
||||
continue;
|
||||
@@ -666,9 +668,5 @@ void osd_messenger_t::handle_rdma_events()
|
||||
}
|
||||
}
|
||||
} while (event_count > 0);
|
||||
for (auto cb: set_immediate)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
set_immediate.clear();
|
||||
handle_received_ops();
|
||||
}
|
||||
|
@@ -9,53 +9,64 @@ void osd_messenger_t::read_requests()
|
||||
{
|
||||
int peer_fd = read_ready_clients[i];
|
||||
osd_client_t *cl = clients[peer_fd];
|
||||
if (cl->read_msg.msg_iovlen)
|
||||
if (!cl->try_recv(ringloop, use_sync_send_recv))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (cl->read_remaining < receive_buffer_size)
|
||||
{
|
||||
cl->read_iov.iov_base = cl->in_buf;
|
||||
cl->read_iov.iov_len = receive_buffer_size;
|
||||
cl->read_msg.msg_iov = &cl->read_iov;
|
||||
cl->read_msg.msg_iovlen = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->read_iov.iov_base = 0;
|
||||
cl->read_iov.iov_len = cl->read_remaining;
|
||||
cl->read_msg.msg_iov = cl->recv_list.get_iovec();
|
||||
cl->read_msg.msg_iovlen = cl->recv_list.get_size();
|
||||
}
|
||||
cl->refs++;
|
||||
if (ringloop && !use_sync_send_recv)
|
||||
{
|
||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||
return;
|
||||
}
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
|
||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
int result = recvmsg(peer_fd, &cl->read_msg, 0);
|
||||
if (result < 0)
|
||||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_read(result, cl);
|
||||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
read_ready_clients.clear();
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
bool osd_client_t::try_recv(ring_loop_t *ringloop, bool use_sync_send_recv)
|
||||
{
|
||||
auto cl = this;
|
||||
if (cl->read_msg.msg_iovlen)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (cl->read_remaining < cl->receive_buffer_size)
|
||||
{
|
||||
cl->read_iov.iov_base = cl->in_buf;
|
||||
cl->read_iov.iov_len = cl->receive_buffer_size;
|
||||
cl->read_msg.msg_iov = &cl->read_iov;
|
||||
cl->read_msg.msg_iovlen = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->read_iov.iov_base = 0;
|
||||
cl->read_iov.iov_len = cl->read_remaining;
|
||||
cl->read_msg.msg_iov = cl->recv_list.get_iovec();
|
||||
cl->read_msg.msg_iovlen = cl->recv_list.get_size();
|
||||
}
|
||||
cl->refs++;
|
||||
if (ringloop && !use_sync_send_recv)
|
||||
{
|
||||
io_uring_sqe* sqe = ringloop->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
return false;
|
||||
}
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this](ring_data_t *data) { handle_read(data->res); };
|
||||
my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
int result = recvmsg(peer_fd, &cl->read_msg, 0);
|
||||
if (result < 0)
|
||||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_read(result);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool osd_client_t::handle_read(int result)
|
||||
{
|
||||
auto cl = this;
|
||||
bool ret = false;
|
||||
cl->read_msg.msg_iovlen = 0;
|
||||
cl->refs--;
|
||||
@@ -74,7 +85,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
{
|
||||
fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
|
||||
}
|
||||
stop_client(cl->peer_fd);
|
||||
close(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
if (result == -EAGAIN || result == -EINTR || result < cl->read_iov.iov_len)
|
||||
@@ -91,7 +102,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
{
|
||||
if (cl->read_iov.iov_base == cl->in_buf)
|
||||
{
|
||||
if (!handle_read_buffer(cl, cl->in_buf, result))
|
||||
if (!handle_read_buffer(cl->in_buf, result))
|
||||
{
|
||||
goto fin;
|
||||
}
|
||||
@@ -103,7 +114,7 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
cl->recv_list.eat(result);
|
||||
if (cl->recv_list.done >= cl->recv_list.count)
|
||||
{
|
||||
if (!handle_finished_read(cl))
|
||||
if (!handle_finished_read())
|
||||
{
|
||||
goto fin;
|
||||
}
|
||||
@@ -115,16 +126,13 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
|
||||
}
|
||||
}
|
||||
fin:
|
||||
for (auto cb: set_immediate)
|
||||
{
|
||||
cb();
|
||||
}
|
||||
set_immediate.clear();
|
||||
msgr->handle_completions();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
|
||||
bool osd_client_t::handle_read_buffer(void *curbuf, int remain)
|
||||
{
|
||||
auto cl = this;
|
||||
// Compose operation(s) from the buffer
|
||||
while (remain > 0)
|
||||
{
|
||||
@@ -160,7 +168,7 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
|
||||
}
|
||||
if (cl->recv_list.done >= cl->recv_list.count)
|
||||
{
|
||||
if (!handle_finished_read(cl))
|
||||
if (!handle_finished_read())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@@ -169,19 +177,20 @@ bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int rem
|
||||
return true;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||
bool osd_client_t::handle_finished_read()
|
||||
{
|
||||
auto cl = this;
|
||||
cl->recv_list.reset();
|
||||
if (cl->read_state == CL_READ_HDR)
|
||||
{
|
||||
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||
return handle_reply_hdr(cl);
|
||||
return handle_reply_hdr();
|
||||
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
|
||||
handle_op_hdr(cl);
|
||||
handle_op_hdr();
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Received garbage: magic=%jx id=%ju opcode=%jx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
|
||||
stop_client(cl->peer_fd);
|
||||
close(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -189,14 +198,14 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||
{
|
||||
// Operation is ready
|
||||
cl->received_ops.push_back(cl->read_op);
|
||||
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
|
||||
msgr->add_completion(cl->read_op);
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
else if (cl->read_state == CL_READ_REPLY_DATA)
|
||||
{
|
||||
// Reply is ready
|
||||
handle_reply_ready(cl->read_op);
|
||||
msgr->add_completion(cl->read_op);
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
@@ -207,8 +216,9 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||
return true;
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||
void osd_client_t::handle_op_hdr()
|
||||
{
|
||||
auto cl = this;
|
||||
osd_op_t *cur_op = cl->read_op;
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
|
||||
{
|
||||
@@ -285,20 +295,21 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||
{
|
||||
// Operation is ready
|
||||
cl->received_ops.push_back(cur_op);
|
||||
set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
|
||||
msgr->add_completion(cur_op);
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||
bool osd_client_t::handle_reply_hdr()
|
||||
{
|
||||
auto cl = this;
|
||||
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
|
||||
if (req_it == cl->sent_ops.end())
|
||||
{
|
||||
// Command out of sync. Drop connection
|
||||
fprintf(stderr, "Client %d command out of sync: id %ju\n", cl->peer_fd, cl->read_op->req.hdr.id);
|
||||
stop_client(cl->peer_fd);
|
||||
close(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
osd_op_t *op = req_it->second;
|
||||
@@ -315,7 +326,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||
fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %jd+%u\n",
|
||||
cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
|
||||
cl->sent_ops[op->req.hdr.id] = op;
|
||||
stop_client(cl->peer_fd);
|
||||
close(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
if (bmp_len > 0)
|
||||
@@ -383,7 +394,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||
{
|
||||
reuse:
|
||||
// It's fine to reuse cl->read_op for the next reply
|
||||
handle_reply_ready(op);
|
||||
msgr->add_completion(op);
|
||||
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||
cl->read_remaining = OSD_PACKET_SIZE;
|
||||
cl->read_state = CL_READ_HDR;
|
||||
@@ -391,24 +402,65 @@ reuse:
|
||||
return true;
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_reply_ready(osd_op_t *op)
|
||||
static uint64_t one = 1;
|
||||
|
||||
void osd_messenger_t::add_completion(osd_op_t *op, ring_loop_t *ringloop)
|
||||
{
|
||||
// Measure subop latency
|
||||
timespec tv_end;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
||||
completions_mu.lock();
|
||||
bool wakeup_main_thread = !completions.size();
|
||||
completions.push_back(op);
|
||||
completions_mu.unlock();
|
||||
if (wakeup_main_thread)
|
||||
{
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
||||
io_uring_sqe* sqe = ringloop ? ringloop->get_sqe() : NULL;
|
||||
if (!sqe)
|
||||
{
|
||||
write(completion_eventfd, &one, sizeof(one));
|
||||
}
|
||||
else
|
||||
{
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [](ring_data_t*){};
|
||||
my_uring_prep_write(sqe, completion_eventfd, &one, sizeof(one), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_completions()
|
||||
{
|
||||
// Reset eventfd counter
|
||||
uint64_t ctr = 0;
|
||||
int r = read(completion_eventfd, &ctr, 8);
|
||||
if (r < 0 && errno != EAGAIN && errno != EINTR)
|
||||
{
|
||||
fprintf(stderr, "Error resetting eventfd: %s\n", strerror(errno));
|
||||
}
|
||||
completions_mu.lock();
|
||||
auto ops = std::move(completions);
|
||||
completions_mu.unlock();
|
||||
for (auto op: completions)
|
||||
{
|
||||
if (op->op_type == OSD_OP_IN)
|
||||
{
|
||||
exec_op(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Measure subop latency
|
||||
timespec tv_end;
|
||||
clock_gettime(CLOCK_REALTIME, &tv_end);
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
if (!stats.subop_stat_count[op->req.hdr.opcode])
|
||||
{
|
||||
stats.subop_stat_count[op->req.hdr.opcode]++;
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] = 0;
|
||||
}
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||
);
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||
}
|
||||
}
|
||||
stats.subop_stat_sum[op->req.hdr.opcode] += (
|
||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||
);
|
||||
set_immediate.push_back([op]()
|
||||
{
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||
});
|
||||
}
|
||||
|
@@ -15,31 +15,35 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
|
||||
}
|
||||
else
|
||||
else if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
// Check that operation actually belongs to this client
|
||||
// FIXME: Review if this is still needed
|
||||
bool found = false;
|
||||
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
|
||||
measure_exec(cur_op);
|
||||
}
|
||||
std::unique_lock<std::mutex> lock;
|
||||
if (iothreads.size())
|
||||
{
|
||||
lock = std::unique_lock<std::mutex>(cl->mu);
|
||||
}
|
||||
// Check that operation actually belongs to this client
|
||||
bool found = false;
|
||||
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
|
||||
{
|
||||
if (*it == cur_op)
|
||||
{
|
||||
if (*it == cur_op)
|
||||
{
|
||||
found = true;
|
||||
cl->received_ops.erase(it, it+1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
delete cur_op;
|
||||
return;
|
||||
found = true;
|
||||
cl->received_ops.erase(it, it+1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
delete cur_op;
|
||||
return;
|
||||
}
|
||||
auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
|
||||
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
|
||||
if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
measure_exec(cur_op);
|
||||
to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
|
||||
}
|
||||
else
|
||||
@@ -117,12 +121,18 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||
// FIXME: It's worse because it doesn't allow batching
|
||||
while (cl->outbox.size())
|
||||
{
|
||||
try_send(cl);
|
||||
cl->try_send(NULL, true);
|
||||
}
|
||||
}
|
||||
else if (iothreads.size())
|
||||
{
|
||||
auto iot = iothreads[cl->peer_fd % iothreads.size()];
|
||||
cl->refs++;
|
||||
iot->wakeup(cl, ringloop);
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((cl->write_msg.msg_iovlen > 0 || !try_send(cl)) && (cl->write_state == 0))
|
||||
if ((cl->write_msg.msg_iovlen > 0 || !cl->try_send(ringloop, use_sync_send_recv)) && (cl->write_state == 0))
|
||||
{
|
||||
cl->write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
@@ -180,8 +190,9 @@ void osd_messenger_t::measure_exec(osd_op_t *cur_op)
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||
bool osd_client_t::try_send(ring_loop_t *ringloop, bool use_sync_send_recv)
|
||||
{
|
||||
auto cl = this;
|
||||
int peer_fd = cl->peer_fd;
|
||||
if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
|
||||
{
|
||||
@@ -198,7 +209,7 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
|
||||
cl->refs++;
|
||||
ring_data_t* data = ((ring_data_t*)sqe->user_data);
|
||||
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
|
||||
data->callback = [this](ring_data_t *data) { handle_send(data->res); };
|
||||
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
|
||||
}
|
||||
else
|
||||
@@ -211,18 +222,27 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
|
||||
{
|
||||
result = -errno;
|
||||
}
|
||||
handle_send(result, cl);
|
||||
handle_send(result);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void osd_messenger_t::send_replies()
|
||||
{
|
||||
if (iothreads.size())
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < write_ready_clients.size(); i++)
|
||||
{
|
||||
int peer_fd = write_ready_clients[i];
|
||||
auto cl_it = clients.find(peer_fd);
|
||||
if (cl_it != clients.end() && !try_send(cl_it->second))
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
auto cl = cl_it->second;
|
||||
if (!cl->try_send(ringloop, use_sync_send_recv))
|
||||
{
|
||||
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
|
||||
return;
|
||||
@@ -231,8 +251,9 @@ void osd_messenger_t::send_replies()
|
||||
write_ready_clients.clear();
|
||||
}
|
||||
|
||||
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
void osd_client_t::handle_send(int result)
|
||||
{
|
||||
auto cl = this;
|
||||
cl->write_msg.msg_iovlen = 0;
|
||||
cl->refs--;
|
||||
if (cl->peer_state == PEER_STOPPED)
|
||||
@@ -247,7 +268,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
{
|
||||
// this is a client socket, so don't panic. just disconnect it
|
||||
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
|
||||
stop_client(cl->peer_fd);
|
||||
close(cl->peer_fd);
|
||||
return;
|
||||
}
|
||||
if (result >= 0)
|
||||
@@ -289,23 +310,11 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
|
||||
#ifdef WITH_RDMA
|
||||
if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
|
||||
{
|
||||
// FIXME: Do something better than just forgetting the FD
|
||||
// FIXME: Ignore pings during RDMA state transition
|
||||
if (log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
|
||||
}
|
||||
cl->peer_state = PEER_RDMA;
|
||||
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
|
||||
{
|
||||
// Do not miss the disconnection!
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
handle_peer_epoll(peer_fd, epoll_events);
|
||||
}
|
||||
});
|
||||
// Add the initial receive request
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@@ -78,7 +78,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
||||
}
|
||||
#ifndef __MOCK__
|
||||
// Then remove FD from the eventloop so we don't accidentally read something
|
||||
tfd->set_fd_handler(peer_fd, false, NULL);
|
||||
tfd->set_fd_handler(peer_fd, 0, NULL);
|
||||
if (cl->connect_timeout_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(cl->connect_timeout_id);
|
||||
|
@@ -655,7 +655,7 @@ help:
|
||||
ringloop->register_consumer(&consumer);
|
||||
// Add FD to epoll
|
||||
bool stop = false;
|
||||
epmgr->tfd->set_fd_handler(sockfd[0], false, [this, &stop](int peer_fd, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(sockfd[0], EPOLLIN, [this, &stop](int peer_fd, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
|
@@ -12,6 +12,7 @@ add_library(vitastor_cli STATIC
|
||||
cli_ls.cpp
|
||||
cli_create.cpp
|
||||
cli_modify.cpp
|
||||
cli_osd_tree.cpp
|
||||
cli_flatten.cpp
|
||||
cli_merge.cpp
|
||||
cli_rm_data.cpp
|
||||
|
@@ -118,6 +118,12 @@ static const char* help_text =
|
||||
" With --dry-run only checks if deletion is possible without data loss and\n"
|
||||
" redundancy degradation.\n"
|
||||
"\n"
|
||||
"vitastor-cli osd-tree\n"
|
||||
" Show current OSD tree.\n"
|
||||
"\n"
|
||||
"vitastor-cli osds|ls-osd|osd-ls\n"
|
||||
" Show current OSDs as list.\n"
|
||||
"\n"
|
||||
"vitastor-cli create-pool|pool-create <name> (-s <pg_size>|--ec <N>+<K>) -n <pg_count> [OPTIONS]\n"
|
||||
" Create a pool. Required parameters:\n"
|
||||
" -s|--pg_size R Number of replicas for replicated pools\n"
|
||||
@@ -389,6 +395,17 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
|
||||
// Allocate a new OSD number
|
||||
action_cb = p->start_alloc_osd(cfg);
|
||||
}
|
||||
else if (cmd[0] == "osd-tree")
|
||||
{
|
||||
// Print OSD tree
|
||||
action_cb = p->start_osd_tree(cfg);
|
||||
}
|
||||
else if (cmd[0] == "osds" || cmd[0] == "ls-osds" || cmd[0] == "ls-osd" || cmd[0] == "osd-ls")
|
||||
{
|
||||
// Print OSD list
|
||||
cfg["flat"] = true;
|
||||
action_cb = p->start_osd_tree(cfg);
|
||||
}
|
||||
else if (cmd[0] == "create-pool" || cmd[0] == "pool-create")
|
||||
{
|
||||
// Create a new pool
|
||||
|
@@ -7,6 +7,7 @@
|
||||
|
||||
#include "json11/json11.hpp"
|
||||
#include "object_id.h"
|
||||
#include "osd_id.h"
|
||||
#include "ringloop.h"
|
||||
#include <functional>
|
||||
|
||||
@@ -56,27 +57,31 @@ public:
|
||||
friend struct snap_flattener_t;
|
||||
friend struct snap_remover_t;
|
||||
|
||||
std::function<bool(cli_result_t &)> start_status(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_create(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_describe(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_fix(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_ls(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_create(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_modify(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_merge(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_flatten(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json cfg);
|
||||
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json cfg);
|
||||
std::function<bool(cli_result_t &)> start_ls(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_merge(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_modify(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_osd_tree(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_pool_create(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_pool_modify(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_pool_rm(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_pool_ls(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json);
|
||||
std::function<bool(cli_result_t &)> start_status(json11::Json);
|
||||
|
||||
// Should be called like loop_and_wait(start_status(), <completion callback>)
|
||||
void loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std::function<void(const cli_result_t &)> complete_cb);
|
||||
|
||||
void etcd_txn(json11::Json txn);
|
||||
|
||||
void iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t num, json11::Json)> cb);
|
||||
void iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb);
|
||||
};
|
||||
|
||||
std::string print_table(json11::Json items, json11::Json header, bool use_esc);
|
||||
|
@@ -72,19 +72,10 @@ struct alloc_osd_t
|
||||
if (!parent->etcd_result["succeeded"].bool_value())
|
||||
{
|
||||
std::vector<osd_num_t> used;
|
||||
for (auto kv: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
|
||||
parent->iterate_kvs_1(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
|
||||
{
|
||||
std::string key = base64_decode(kv["key"].string_value());
|
||||
osd_num_t cur_osd;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(key.c_str() + parent->cli->st_cli.etcd_prefix.length(), "/osd/stats/%ju%c", &cur_osd, &null_byte);
|
||||
if (scanned != 1 || !cur_osd)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", key.c_str());
|
||||
continue;
|
||||
}
|
||||
used.push_back(cur_osd);
|
||||
}
|
||||
});
|
||||
std::sort(used.begin(), used.end());
|
||||
if (used[used.size()-1] == used.size())
|
||||
{
|
||||
|
@@ -165,3 +165,43 @@ void cli_tool_t::loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std:
|
||||
ringloop->wakeup();
|
||||
});
|
||||
}
|
||||
|
||||
void cli_tool_t::iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t, json11::Json)> cb)
|
||||
{
|
||||
bool is_pool = prefix == "/pool/stats/";
|
||||
for (auto & kv_item: kvs.array_items())
|
||||
{
|
||||
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
|
||||
uint64_t num = 0;
|
||||
char null_byte = 0;
|
||||
// OSD or pool number
|
||||
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(), "%ju%c", &num, &null_byte);
|
||||
if (scanned != 1 || !num || is_pool && num >= POOL_ID_MAX)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
cb(num, kv.value);
|
||||
}
|
||||
}
|
||||
|
||||
void cli_tool_t::iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb)
|
||||
{
|
||||
bool is_inode = prefix == "/config/inode/" || prefix == "/inode/stats/";
|
||||
for (auto & kv_item: kvs.array_items())
|
||||
{
|
||||
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
|
||||
pool_id_t pool_id = 0;
|
||||
uint64_t num = 0;
|
||||
char null_byte = 0;
|
||||
// pool+pg or pool+inode
|
||||
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(),
|
||||
"%u/%ju%c", &pool_id, &num, &null_byte);
|
||||
if (scanned != 2 || !pool_id || is_inode && INODE_POOL(num) || !is_inode && num >= UINT32_MAX)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
cb(pool_id, num, kv.value);
|
||||
}
|
||||
}
|
||||
|
@@ -479,10 +479,14 @@ struct snap_merger_t
|
||||
{
|
||||
if (op->retval != op->len)
|
||||
{
|
||||
rwo->error_code = -op->retval;
|
||||
rwo->error_code = op->retval;
|
||||
rwo->error_offset = op->offset;
|
||||
rwo->error_read = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
rwo->error_code = 0;
|
||||
}
|
||||
continue_rwo.push_back(rwo);
|
||||
parent->ringloop->wakeup();
|
||||
};
|
||||
@@ -553,12 +557,15 @@ struct snap_merger_t
|
||||
if (use_cas && subop->retval == -EINTR)
|
||||
{
|
||||
// CAS failure - reread and repeat optimistically
|
||||
assert(rwo->todo == 1); // initial refcount from read_and_write
|
||||
rwo->error_code = -EINTR;
|
||||
rwo->start = rwo->end = 0;
|
||||
rwo->op.version = 0;
|
||||
rwo_read(rwo);
|
||||
delete subop;
|
||||
return;
|
||||
}
|
||||
rwo->error_code = -subop->retval;
|
||||
rwo->error_code = subop->retval;
|
||||
rwo->error_offset = subop->offset;
|
||||
rwo->error_read = false;
|
||||
}
|
||||
@@ -633,7 +640,7 @@ struct snap_merger_t
|
||||
{
|
||||
char buf[1024];
|
||||
snprintf(buf, 1024, "Error %s target at offset %jx: %s",
|
||||
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(rwo->error_code));
|
||||
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(-rwo->error_code));
|
||||
rwo_error = std::string(buf);
|
||||
}
|
||||
delete rwo;
|
||||
|
377
src/cmd/cli_osd_tree.cpp
Normal file
377
src/cmd/cli_osd_tree.cpp
Normal file
@@ -0,0 +1,377 @@
|
||||
// Copyright (c) Vitaliy Filippov, 2024
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
#include <ctype.h>
|
||||
#include "cli.h"
|
||||
#include "cluster_client.h"
|
||||
#include "epoll_manager.h"
|
||||
#include "pg_states.h"
|
||||
#include "str_util.h"
|
||||
|
||||
struct placement_osd_t
|
||||
{
|
||||
osd_num_t num;
|
||||
std::string parent;
|
||||
std::vector<std::string> tags;
|
||||
uint64_t size;
|
||||
uint64_t free;
|
||||
bool up;
|
||||
double reweight;
|
||||
uint32_t block_size, bitmap_granularity, immediate_commit;
|
||||
};
|
||||
|
||||
struct placement_node_t
|
||||
{
|
||||
std::string name;
|
||||
std::string parent;
|
||||
std::string level;
|
||||
std::vector<std::string> child_nodes;
|
||||
std::vector<osd_num_t> child_osds;
|
||||
};
|
||||
|
||||
struct placement_tree_t
|
||||
{
|
||||
std::map<std::string, placement_node_t> nodes;
|
||||
std::map<osd_num_t, placement_osd_t> osds;
|
||||
};
|
||||
|
||||
struct osd_tree_printer_t
|
||||
{
|
||||
cli_tool_t *parent;
|
||||
json11::Json cfg;
|
||||
bool flat = false;
|
||||
bool show_stats = false;
|
||||
|
||||
int state = 0;
|
||||
cli_result_t result;
|
||||
|
||||
json11::Json node_placement;
|
||||
std::map<uint64_t, json11::Json> osd_config;
|
||||
std::map<uint64_t, json11::Json> osd_stats;
|
||||
std::shared_ptr<placement_tree_t> placement_tree;
|
||||
|
||||
bool is_done() { return state == 100; }
|
||||
|
||||
void load_osd_tree()
|
||||
{
|
||||
if (state == 1)
|
||||
goto resume_1;
|
||||
parent->etcd_txn(json11::Json::object {
|
||||
{ "success", json11::Json::array {
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/node_placement") },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd/") },
|
||||
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd0") },
|
||||
} },
|
||||
},
|
||||
json11::Json::object {
|
||||
{ "request_range", json11::Json::object {
|
||||
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
|
||||
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
|
||||
} },
|
||||
},
|
||||
} },
|
||||
});
|
||||
state = 1;
|
||||
resume_1:
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
if (parent->etcd_err.err)
|
||||
{
|
||||
result = parent->etcd_err;
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
for (auto & item: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
|
||||
{
|
||||
node_placement = parent->cli->st_cli.parse_etcd_kv(item).value;
|
||||
}
|
||||
parent->iterate_kvs_1(parent->etcd_result["responses"][1]["response_range"]["kvs"], "/config/osd/", [&](uint64_t cur_osd, json11::Json value)
|
||||
{
|
||||
osd_config[cur_osd] = value;
|
||||
});
|
||||
parent->iterate_kvs_1(parent->etcd_result["responses"][2]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
|
||||
{
|
||||
osd_stats[cur_osd] = value;
|
||||
});
|
||||
placement_tree = make_osd_tree(node_placement, osd_config, osd_stats);
|
||||
}
|
||||
|
||||
std::shared_ptr<placement_tree_t> make_osd_tree(json11::Json node_placement_json,
|
||||
std::map<uint64_t, json11::Json> osd_config, std::map<uint64_t, json11::Json> osd_stats)
|
||||
{
|
||||
auto node_placement = node_placement_json.object_items();
|
||||
auto tree = std::make_shared<placement_tree_t>();
|
||||
tree->nodes[""] = (placement_node_t){};
|
||||
// Add non-OSD items
|
||||
for (auto & kv: node_placement)
|
||||
{
|
||||
auto osd_num = stoull_full(kv.first);
|
||||
if (!osd_num)
|
||||
{
|
||||
auto level = kv.second["level"].string_value();
|
||||
tree->nodes[kv.first] = (placement_node_t){
|
||||
.name = kv.first,
|
||||
.parent = kv.second["parent"].string_value(),
|
||||
.level = level == "" ? "unknown" : level,
|
||||
};
|
||||
}
|
||||
}
|
||||
// Add OSDs
|
||||
for (auto & kv: osd_stats)
|
||||
{
|
||||
auto & osd = tree->osds[kv.first] = (placement_osd_t){
|
||||
.num = kv.first,
|
||||
.parent = kv.second["host"].string_value(),
|
||||
.size = kv.second["size"].uint64_value(),
|
||||
.free = kv.second["free"].uint64_value(),
|
||||
.up = parent->cli->st_cli.peer_states.find(kv.first) != parent->cli->st_cli.peer_states.end(),
|
||||
.reweight = 1,
|
||||
.block_size = (uint32_t)kv.second["data_block_size"].uint64_value(),
|
||||
.bitmap_granularity = (uint32_t)kv.second["bitmap_granularity"].uint64_value(),
|
||||
.immediate_commit = etcd_state_client_t::parse_immediate_commit(kv.second["immediate_commit"].string_value()),
|
||||
};
|
||||
if (tree->nodes.find(osd.parent) == tree->nodes.end())
|
||||
{
|
||||
// Autocreate all hosts
|
||||
tree->nodes[osd.parent] = (placement_node_t){
|
||||
.name = osd.parent,
|
||||
.level = "host",
|
||||
};
|
||||
}
|
||||
auto cfg_it = osd_config.find(osd.num);
|
||||
if (cfg_it != osd_config.end())
|
||||
{
|
||||
auto & osd_cfg = cfg_it->second;
|
||||
osd.reweight = osd_cfg["reweight"].is_number() ? osd_cfg["reweight"].number_value() : 1;
|
||||
if (osd_cfg["tags"].is_array())
|
||||
{
|
||||
for (auto & jtag: osd_cfg["tags"].array_items())
|
||||
osd.tags.push_back(jtag.string_value());
|
||||
}
|
||||
}
|
||||
auto np_it = node_placement.find(std::to_string(osd.num));
|
||||
if (np_it != node_placement.end())
|
||||
{
|
||||
osd.parent = np_it->second["parent"].string_value();
|
||||
}
|
||||
tree->nodes[osd.parent].child_osds.push_back(osd.num);
|
||||
}
|
||||
// Fill child_nodes
|
||||
for (auto & ip: tree->nodes)
|
||||
{
|
||||
if (tree->nodes.find(ip.second.parent) == tree->nodes.end())
|
||||
{
|
||||
ip.second.parent = "";
|
||||
}
|
||||
if (ip.first != "")
|
||||
{
|
||||
tree->nodes[ip.second.parent].child_nodes.push_back(ip.first);
|
||||
}
|
||||
}
|
||||
// FIXME: Maybe filter out loops here
|
||||
return tree;
|
||||
}
|
||||
|
||||
std::string format_tree()
|
||||
{
|
||||
std::vector<std::string> node_seq = { "" };
|
||||
std::vector<int> indents = { -1 };
|
||||
std::map<std::string, bool> seen;
|
||||
for (int i = 0; i < node_seq.size(); i++)
|
||||
{
|
||||
if (seen[node_seq[i]])
|
||||
{
|
||||
continue;
|
||||
}
|
||||
seen[node_seq[i]] = true;
|
||||
auto & child_nodes = placement_tree->nodes.at(node_seq[i]).child_nodes;
|
||||
if (child_nodes.size())
|
||||
{
|
||||
node_seq.insert(node_seq.begin()+i+1, child_nodes.begin(), child_nodes.end());
|
||||
indents.insert(indents.begin()+i+1, child_nodes.size(), indents[i]+1);
|
||||
}
|
||||
}
|
||||
json11::Json::array fmt_items;
|
||||
for (int i = 1; i < node_seq.size(); i++)
|
||||
{
|
||||
auto & node = placement_tree->nodes.at(node_seq[i]);
|
||||
if (!flat)
|
||||
{
|
||||
fmt_items.push_back(json11::Json::object{
|
||||
{ "type", str_repeat(" ", indents[i]) + node.level },
|
||||
{ "name", node.name },
|
||||
});
|
||||
}
|
||||
std::string parent = node.name;
|
||||
if (flat)
|
||||
{
|
||||
auto cur = &placement_tree->nodes.at(node.name);
|
||||
while (cur->parent != "" && cur->parent != node.name)
|
||||
{
|
||||
parent = cur->parent+"/"+parent;
|
||||
cur = &placement_tree->nodes.at(cur->parent);
|
||||
}
|
||||
}
|
||||
for (uint64_t osd_num: node.child_osds)
|
||||
{
|
||||
auto & osd = placement_tree->osds.at(osd_num);
|
||||
auto fmt = json11::Json::object{
|
||||
{ "type", (flat ? "osd" : str_repeat(" ", indents[i]+1) + "osd") },
|
||||
{ "name", osd.num },
|
||||
{ "parent", parent },
|
||||
{ "up", osd.up ? "up" : "down" },
|
||||
{ "size", format_size(osd.size, false, true) },
|
||||
{ "used", format_q(100.0*(osd.size - osd.free)/osd.size)+" %" },
|
||||
{ "reweight", format_q(osd.reweight) },
|
||||
{ "tags", implode(",", osd.tags) },
|
||||
{ "block", format_size(osd.block_size, false, true) },
|
||||
{ "bitmap", format_size(osd.bitmap_granularity, false, true) },
|
||||
{ "commit", osd.immediate_commit == IMMEDIATE_NONE ? "none" : (osd.immediate_commit == IMMEDIATE_ALL ? "all" : "small") },
|
||||
};
|
||||
if (show_stats)
|
||||
{
|
||||
auto op_stat = osd_stats[osd_num]["op_stats"];
|
||||
fmt["read_bw"] = format_size(op_stat["primary_read"]["bps"].uint64_value())+"/s";
|
||||
fmt["write_bw"] = format_size(op_stat["primary_write"]["bps"].uint64_value())+"/s";
|
||||
fmt["delete_bw"] = format_size(op_stat["primary_delete"]["bps"].uint64_value())+"/s";
|
||||
fmt["read_iops"] = format_q(op_stat["primary_read"]["iops"].uint64_value());
|
||||
fmt["write_iops"] = format_q(op_stat["primary_write"]["iops"].uint64_value());
|
||||
fmt["delete_iops"] = format_q(op_stat["primary_delete"]["iops"].uint64_value());
|
||||
fmt["read_lat"] = format_lat(op_stat["primary_read"]["lat"].uint64_value());
|
||||
fmt["write_lat"] = format_lat(op_stat["primary_write"]["lat"].uint64_value());
|
||||
fmt["delete_lat"] = format_lat(op_stat["primary_delete"]["lat"].uint64_value());
|
||||
}
|
||||
fmt_items.push_back(std::move(fmt));
|
||||
}
|
||||
}
|
||||
json11::Json::array cols;
|
||||
if (!flat)
|
||||
{
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "type" },
|
||||
{ "title", "TYPE" },
|
||||
});
|
||||
}
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "name" },
|
||||
{ "title", flat ? "OSD" : "NAME" },
|
||||
});
|
||||
if (flat)
|
||||
{
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "parent" },
|
||||
{ "title", "PARENT" },
|
||||
});
|
||||
}
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "up" },
|
||||
{ "title", "UP" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "size" },
|
||||
{ "title", "SIZE" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "used" },
|
||||
{ "title", "USED%" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "tags" },
|
||||
{ "title", "TAGS" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "reweight" },
|
||||
{ "title", "WEIGHT" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "block" },
|
||||
{ "title", "BLOCK" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "bitmap" },
|
||||
{ "title", "BITMAP" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "commit" },
|
||||
{ "title", "IMM" },
|
||||
});
|
||||
if (show_stats)
|
||||
{
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "read_bw" },
|
||||
{ "title", "READ" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "read_iops" },
|
||||
{ "title", "IOPS" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "read_lat" },
|
||||
{ "title", "LAT" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "write_bw" },
|
||||
{ "title", "WRITE" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "write_iops" },
|
||||
{ "title", "IOPS" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "write_lat" },
|
||||
{ "title", "LAT" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "delete_bw" },
|
||||
{ "title", "DEL" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "delete_iops" },
|
||||
{ "title", "IOPS" },
|
||||
});
|
||||
cols.push_back(json11::Json::object{
|
||||
{ "key", "delete_lat" },
|
||||
{ "title", "LAT" },
|
||||
});
|
||||
}
|
||||
return print_table(fmt_items, cols, parent->color);
|
||||
}
|
||||
|
||||
void loop()
|
||||
{
|
||||
if (state == 1)
|
||||
goto resume_1;
|
||||
resume_1:
|
||||
load_osd_tree();
|
||||
if (parent->waiting > 0)
|
||||
return;
|
||||
result.text = format_tree();
|
||||
state = 100;
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(cli_result_t &)> cli_tool_t::start_osd_tree(json11::Json cfg)
|
||||
{
|
||||
auto osd_tree_printer = new osd_tree_printer_t();
|
||||
osd_tree_printer->parent = this;
|
||||
osd_tree_printer->cfg = cfg;
|
||||
osd_tree_printer->flat = cfg["flat"].bool_value();
|
||||
osd_tree_printer->show_stats = cfg["long"].bool_value();
|
||||
return [osd_tree_printer](cli_result_t & result)
|
||||
{
|
||||
osd_tree_printer->loop();
|
||||
if (osd_tree_printer->is_done())
|
||||
{
|
||||
result = osd_tree_printer->result;
|
||||
delete osd_tree_printer;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
}
|
@@ -104,37 +104,16 @@ resume_1:
|
||||
{
|
||||
config_pools = parent->cli->st_cli.parse_etcd_kv(config_pools).value;
|
||||
}
|
||||
for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items())
|
||||
parent->iterate_kvs_1(space_info["responses"][0]["response_range"]["kvs"], "/pool/stats/", [&](uint64_t pool_id, json11::Json value)
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
|
||||
// pool ID
|
||||
pool_id_t pool_id;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/pool/stats/%u%c", &pool_id, &null_byte);
|
||||
if (scanned != 1 || !pool_id || pool_id >= POOL_ID_MAX)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
// pool/stats/<N>
|
||||
pool_stats[pool_id] = kv.value.object_items();
|
||||
}
|
||||
pool_stats[pool_id] = value.object_items();
|
||||
});
|
||||
std::map<pool_id_t, uint64_t> osd_free;
|
||||
for (auto & kv_item: space_info["responses"][1]["response_range"]["kvs"].array_items())
|
||||
parent->iterate_kvs_1(space_info["responses"][1]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t osd_num, json11::Json value)
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
|
||||
// osd ID
|
||||
osd_num_t osd_num;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/osd/stats/%ju%c", &osd_num, &null_byte);
|
||||
if (scanned != 1 || !osd_num || osd_num >= POOL_ID_MAX)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
// osd/stats/<N>::free
|
||||
osd_free[osd_num] = kv.value["free"].uint64_value();
|
||||
}
|
||||
osd_free[osd_num] = value["free"].uint64_value();
|
||||
});
|
||||
// Calculate max_avail for each pool
|
||||
for (auto & pp: parent->cli->st_cli.pool_config)
|
||||
{
|
||||
@@ -254,29 +233,17 @@ resume_1:
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
auto pg_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
|
||||
// Calculate recovery percent
|
||||
std::map<pool_id_t, object_counts_t> counts;
|
||||
for (auto & kv_item: pg_stats.array_items())
|
||||
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/pg/stats/",
|
||||
[&](pool_id_t pool_id, uint64_t pg_num, json11::Json value)
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
|
||||
// pool ID & pg number
|
||||
pool_id_t pool_id;
|
||||
pg_num_t pg_num = 0;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
|
||||
"/pg/stats/%u/%u%c", &pool_id, &pg_num, &null_byte);
|
||||
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
auto & cnt = counts[pool_id];
|
||||
cnt.object_count += kv.value["object_count"].uint64_value();
|
||||
cnt.misplaced_count += kv.value["misplaced_count"].uint64_value();
|
||||
cnt.degraded_count += kv.value["degraded_count"].uint64_value();
|
||||
cnt.incomplete_count += kv.value["incomplete_count"].uint64_value();
|
||||
}
|
||||
cnt.object_count += value["object_count"].uint64_value();
|
||||
cnt.misplaced_count += value["misplaced_count"].uint64_value();
|
||||
cnt.degraded_count += value["degraded_count"].uint64_value();
|
||||
cnt.incomplete_count += value["incomplete_count"].uint64_value();
|
||||
});
|
||||
for (auto & pp: pool_stats)
|
||||
{
|
||||
auto & cnt = counts[pp.first];
|
||||
@@ -317,35 +284,23 @@ resume_1:
|
||||
state = 100;
|
||||
return;
|
||||
}
|
||||
auto inode_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
|
||||
// Performance statistics
|
||||
std::map<pool_id_t, io_stats_t> pool_io;
|
||||
for (auto & kv_item: inode_stats.array_items())
|
||||
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/inode/stats/",
|
||||
[&](pool_id_t pool_id, uint64_t inode_num, json11::Json value)
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
|
||||
// pool ID & inode number
|
||||
pool_id_t pool_id;
|
||||
inode_t only_inode_num;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
|
||||
"/inode/stats/%u/%ju%c", &pool_id, &only_inode_num, &null_byte);
|
||||
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX || INODE_POOL(only_inode_num) != 0)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
auto & io = pool_io[pool_id];
|
||||
io.read_iops += kv.value["read"]["iops"].uint64_value();
|
||||
io.read_bps += kv.value["read"]["bps"].uint64_value();
|
||||
io.read_lat += kv.value["read"]["lat"].uint64_value();
|
||||
io.write_iops += kv.value["write"]["iops"].uint64_value();
|
||||
io.write_bps += kv.value["write"]["bps"].uint64_value();
|
||||
io.write_lat += kv.value["write"]["lat"].uint64_value();
|
||||
io.delete_iops += kv.value["delete"]["iops"].uint64_value();
|
||||
io.delete_bps += kv.value["delete"]["bps"].uint64_value();
|
||||
io.delete_lat += kv.value["delete"]["lat"].uint64_value();
|
||||
io.read_iops += value["read"]["iops"].uint64_value();
|
||||
io.read_bps += value["read"]["bps"].uint64_value();
|
||||
io.read_lat += value["read"]["lat"].uint64_value();
|
||||
io.write_iops += value["write"]["iops"].uint64_value();
|
||||
io.write_bps += value["write"]["bps"].uint64_value();
|
||||
io.write_lat += value["write"]["lat"].uint64_value();
|
||||
io.delete_iops += value["delete"]["iops"].uint64_value();
|
||||
io.delete_bps += value["delete"]["bps"].uint64_value();
|
||||
io.delete_lat += value["delete"]["lat"].uint64_value();
|
||||
io.count++;
|
||||
}
|
||||
});
|
||||
for (auto & pp: pool_stats)
|
||||
{
|
||||
auto & io = pool_io[pp.first];
|
||||
|
@@ -18,7 +18,7 @@ struct status_printer_t
|
||||
cli_tool_t *parent;
|
||||
|
||||
int state = 0;
|
||||
json11::Json::array mon_members, osd_stats;
|
||||
json11::Json::array mon_members;
|
||||
json11::Json agg_stats;
|
||||
std::map<pool_id_t, json11::Json::object> pool_stats;
|
||||
json11::Json::array etcd_states;
|
||||
@@ -93,7 +93,7 @@ resume_2:
|
||||
return;
|
||||
}
|
||||
mon_members = parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items();
|
||||
osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items();
|
||||
auto osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"];
|
||||
if (parent->etcd_result["responses"][2]["response_range"]["kvs"].array_items().size() > 0)
|
||||
{
|
||||
agg_stats = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][2]["response_range"]["kvs"][0]).value;
|
||||
@@ -133,20 +133,11 @@ resume_2:
|
||||
}
|
||||
int osd_count = 0, osd_up = 0;
|
||||
uint64_t total_raw = 0, free_raw = 0, free_down_raw = 0, down_raw = 0;
|
||||
for (int i = 0; i < osd_stats.size(); i++)
|
||||
parent->iterate_kvs_1(osd_stats, "/osd/stats", [&](uint64_t stat_osd_num, json11::Json value)
|
||||
{
|
||||
auto kv = parent->cli->st_cli.parse_etcd_kv(osd_stats[i]);
|
||||
osd_num_t stat_osd_num = 0;
|
||||
char null_byte = 0;
|
||||
int scanned = sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.size(), "/osd/stats/%ju%c", &stat_osd_num, &null_byte);
|
||||
if (scanned != 1 || !stat_osd_num)
|
||||
{
|
||||
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
|
||||
continue;
|
||||
}
|
||||
osd_count++;
|
||||
auto osd_size = kv.value["size"].uint64_value();
|
||||
auto osd_free = kv.value["free"].uint64_value();
|
||||
auto osd_size = value["size"].uint64_value();
|
||||
auto osd_free = value["free"].uint64_value();
|
||||
total_raw += osd_size;
|
||||
free_raw += osd_free;
|
||||
if (!osd_free)
|
||||
@@ -164,10 +155,10 @@ resume_2:
|
||||
}
|
||||
else
|
||||
{
|
||||
down_raw += kv.value["size"].uint64_value();
|
||||
free_down_raw += kv.value["free"].uint64_value();
|
||||
down_raw += value["size"].uint64_value();
|
||||
free_down_raw += value["free"].uint64_value();
|
||||
}
|
||||
}
|
||||
});
|
||||
int pool_count = 0, pools_active = 0;
|
||||
std::map<std::string, int> pgs_by_state;
|
||||
std::string pgs_by_state_str;
|
||||
|
@@ -185,7 +185,7 @@ void kv_cli_t::run()
|
||||
fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK);
|
||||
try
|
||||
{
|
||||
epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events)
|
||||
epmgr->tfd->set_fd_handler(0, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
if (events & EPOLLIN)
|
||||
{
|
||||
@@ -193,7 +193,7 @@ void kv_cli_t::run()
|
||||
}
|
||||
if (events & EPOLLRDHUP)
|
||||
{
|
||||
epmgr->tfd->set_fd_handler(0, false, NULL);
|
||||
epmgr->tfd->set_fd_handler(0, 0, NULL);
|
||||
finished = true;
|
||||
}
|
||||
});
|
||||
|
@@ -189,6 +189,12 @@ void nfs_proxy_t::run(json11::Json cfg)
|
||||
cmd->epmgr = epmgr;
|
||||
cmd->cli = cli;
|
||||
watch_stats();
|
||||
// Init Pseudo-FS before starting client because it depends on inode_change_hook
|
||||
if (fsname == "")
|
||||
{
|
||||
blockfs = new block_fs_state_t();
|
||||
blockfs->init(this, cfg);
|
||||
}
|
||||
// Load image metadata
|
||||
while (!cli->is_ready())
|
||||
{
|
||||
@@ -199,13 +205,8 @@ void nfs_proxy_t::run(json11::Json cfg)
|
||||
}
|
||||
// Check default pool
|
||||
check_default_pool();
|
||||
// Check if we're using VitastorFS
|
||||
if (fsname == "")
|
||||
{
|
||||
blockfs = new block_fs_state_t();
|
||||
blockfs->init(this, cfg);
|
||||
}
|
||||
else
|
||||
// Init VitastorFS after starting client because it depends on loaded inode configuration
|
||||
if (fsname != "")
|
||||
{
|
||||
kvfs = new kv_fs_state_t();
|
||||
kvfs->init(this, cfg);
|
||||
@@ -242,7 +243,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
||||
// Create NFS socket and add it to epoll
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
|
||||
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, EPOLLIN, [this](int nfs_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
@@ -259,7 +260,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
||||
// Create portmap socket and add it to epoll
|
||||
int portmap_socket = create_and_bind_socket(bind_address, 111, 128, NULL);
|
||||
fcntl(portmap_socket, F_SETFL, fcntl(portmap_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(portmap_socket, false, [this](int portmap_socket, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(portmap_socket, EPOLLIN, [this](int portmap_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
@@ -465,7 +466,7 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
||||
{
|
||||
cli->proc_table.insert(fn);
|
||||
}
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, EPOLLOUT, [cli](int nfs_fd, int epoll_events)
|
||||
{
|
||||
// Handle incoming event
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
@@ -722,7 +723,7 @@ void nfs_client_t::stop()
|
||||
stopped = true;
|
||||
if (refs <= 0)
|
||||
{
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, 0, NULL);
|
||||
close(nfs_fd);
|
||||
delete this;
|
||||
}
|
||||
|
@@ -361,7 +361,7 @@ void osd_t::bind_socket()
|
||||
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
|
||||
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
|
||||
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
|
||||
epmgr->set_fd_handler(listen_fd, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
msgr.accept_connections(listen_fd);
|
||||
});
|
||||
|
@@ -199,12 +199,14 @@ class osd_t
|
||||
ring_consumer_t consumer;
|
||||
|
||||
// op statistics
|
||||
osd_op_stats_t prev_stats;
|
||||
osd_op_stats_t prev_stats, prev_report_stats;
|
||||
timespec report_stats_ts;
|
||||
std::map<uint64_t, inode_stats_t> inode_stats;
|
||||
std::map<uint64_t, timespec> vanishing_inodes;
|
||||
const char* recovery_stat_names[2] = { "degraded", "misplaced" };
|
||||
recovery_stat_t recovery_stat[2];
|
||||
recovery_stat_t recovery_print_prev[2];
|
||||
recovery_stat_t recovery_report_prev[2];
|
||||
|
||||
// recovery auto-tuning
|
||||
int rtune_timer_id = -1;
|
||||
@@ -252,6 +254,7 @@ class osd_t
|
||||
bool check_peer_config(osd_client_t *cl, json11::Json conf);
|
||||
void repeer_pgs(osd_num_t osd_num);
|
||||
void start_pg_peering(pg_t & pg);
|
||||
void drop_dirty_pg_connections(pool_pg_num_t pg);
|
||||
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
|
||||
void discard_list_subop(osd_op_t *list_op);
|
||||
bool stop_pg(pg_t & pg);
|
||||
|
@@ -180,6 +180,12 @@ json11::Json osd_t::get_statistics()
|
||||
json11::Json::object st;
|
||||
timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
uint64_t ts_diff = 0;
|
||||
if (report_stats_ts.tv_sec != 0)
|
||||
ts_diff = (ts.tv_sec - report_stats_ts.tv_sec + (ts.tv_nsec - report_stats_ts.tv_nsec) / 1000000000);
|
||||
if (!ts_diff)
|
||||
ts_diff = 1;
|
||||
report_stats_ts = ts;
|
||||
char time_str[50] = { 0 };
|
||||
sprintf(time_str, "%jd.%03ld", (uint64_t)ts.tv_sec, ts.tv_nsec/1000000);
|
||||
st["time"] = time_str;
|
||||
@@ -196,33 +202,50 @@ json11::Json osd_t::get_statistics()
|
||||
json11::Json::object op_stats, subop_stats;
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
auto n = (msgr.stats.op_stat_count[i] - prev_report_stats.op_stat_count[i]);
|
||||
op_stats[osd_op_names[i]] = json11::Json::object {
|
||||
{ "count", msgr.stats.op_stat_count[i] },
|
||||
{ "usec", msgr.stats.op_stat_sum[i] },
|
||||
{ "bytes", msgr.stats.op_stat_bytes[i] },
|
||||
{ "lat", (msgr.stats.op_stat_sum[i] - prev_report_stats.op_stat_sum[i]) / (n < 1 ? 1 : n) },
|
||||
{ "bps", (msgr.stats.op_stat_bytes[i] - prev_report_stats.op_stat_bytes[i]) / ts_diff },
|
||||
{ "iops", n / ts_diff },
|
||||
};
|
||||
}
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
{
|
||||
auto n = (msgr.stats.subop_stat_count[i] - prev_report_stats.subop_stat_count[i]);
|
||||
subop_stats[osd_op_names[i]] = json11::Json::object {
|
||||
{ "count", msgr.stats.subop_stat_count[i] },
|
||||
{ "usec", msgr.stats.subop_stat_sum[i] },
|
||||
{ "lat", (msgr.stats.subop_stat_sum[i] - prev_report_stats.subop_stat_sum[i]) / (n < 1 ? 1 : n) },
|
||||
{ "iops", n / ts_diff },
|
||||
};
|
||||
}
|
||||
st["op_stats"] = op_stats;
|
||||
st["subop_stats"] = subop_stats;
|
||||
auto n0 = recovery_stat[0].count - recovery_report_prev[0].count;
|
||||
auto n1 = recovery_stat[1].count - recovery_report_prev[1].count;
|
||||
st["recovery_stats"] = json11::Json::object {
|
||||
{ recovery_stat_names[0], json11::Json::object {
|
||||
{ "count", recovery_stat[0].count },
|
||||
{ "bytes", recovery_stat[0].bytes },
|
||||
{ "usec", recovery_stat[0].usec },
|
||||
{ "lat", (recovery_stat[0].usec - recovery_report_prev[0].usec) / (n0 < 1 ? 1 : n0) },
|
||||
{ "bps", (recovery_stat[0].bytes - recovery_report_prev[0].bytes) / ts_diff },
|
||||
{ "iops", n0 / ts_diff },
|
||||
} },
|
||||
{ recovery_stat_names[1], json11::Json::object {
|
||||
{ "count", recovery_stat[1].count },
|
||||
{ "bytes", recovery_stat[1].bytes },
|
||||
{ "usec", recovery_stat[1].usec },
|
||||
{ "lat", (recovery_stat[1].usec - recovery_report_prev[1].usec) / (n1 < 1 ? 1 : n1) },
|
||||
{ "bps", (recovery_stat[1].bytes - recovery_report_prev[1].bytes) / ts_diff },
|
||||
{ "iops", n1 / ts_diff },
|
||||
} },
|
||||
};
|
||||
prev_report_stats = msgr.stats;
|
||||
memcpy(recovery_report_prev, recovery_stat, sizeof(recovery_stat));
|
||||
return st;
|
||||
}
|
||||
|
||||
|
@@ -168,20 +168,15 @@ void osd_t::reset_pg(pg_t & pg)
|
||||
dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
}
|
||||
|
||||
// Repeer on each connect/disconnect peer event
|
||||
void osd_t::start_pg_peering(pg_t & pg)
|
||||
// Drop connections of clients who have this PG in dirty_pgs
|
||||
void osd_t::drop_dirty_pg_connections(pool_pg_num_t pg)
|
||||
{
|
||||
pg.state = PG_PEERING;
|
||||
this->peering_state |= OSD_PEERING_PGS;
|
||||
reset_pg(pg);
|
||||
report_pg_state(pg);
|
||||
// Drop connections of clients who have this PG in dirty_pgs
|
||||
if (immediate_commit != IMMEDIATE_ALL)
|
||||
{
|
||||
std::vector<int> to_stop;
|
||||
for (auto & cp: msgr.clients)
|
||||
{
|
||||
if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
|
||||
if (cp.second->dirty_pgs.find(pg) != cp.second->dirty_pgs.end())
|
||||
{
|
||||
to_stop.push_back(cp.first);
|
||||
}
|
||||
@@ -191,6 +186,16 @@ void osd_t::start_pg_peering(pg_t & pg)
|
||||
msgr.stop_client(peer_fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Repeer on each connect/disconnect peer event
|
||||
void osd_t::start_pg_peering(pg_t & pg)
|
||||
{
|
||||
pg.state = PG_PEERING;
|
||||
this->peering_state |= OSD_PEERING_PGS;
|
||||
reset_pg(pg);
|
||||
report_pg_state(pg);
|
||||
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
// Try to connect with current peers if they're up, but we don't have connections to them
|
||||
// Otherwise we may erroneously decide that the pg is incomplete :-)
|
||||
for (auto pg_osd: pg.all_peers)
|
||||
@@ -460,6 +465,7 @@ bool osd_t::stop_pg(pg_t & pg)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||
if (!(pg.state & (PG_ACTIVE | PG_REPEERING)))
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
|
@@ -247,6 +247,7 @@ resume_8:
|
||||
finish:
|
||||
if (cur_op->peer_fd)
|
||||
{
|
||||
// FIXME: Do it before executing sync
|
||||
auto it = msgr.clients.find(cur_op->peer_fd);
|
||||
if (it != msgr.clients.end())
|
||||
it->second->dirty_pgs.clear();
|
||||
|
@@ -43,8 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
|
||||
},
|
||||
});
|
||||
cli->st_cli.on_load_pgs_hook(true);
|
||||
std::map<std::string, etcd_kv_t> changes;
|
||||
cli->st_cli.on_change_hook(changes);
|
||||
cli->st_cli.on_change_pool_config_hook();
|
||||
}
|
||||
|
||||
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL, bool instant = false)
|
||||
@@ -281,7 +280,8 @@ void test1()
|
||||
uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58);
|
||||
if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c)
|
||||
{
|
||||
printf("Write replay: mismatch at %ju\n", offset-op->req.rw.offset);
|
||||
printf("Write replay: mismatch at %ju (expected %02x, have %02x)\n", offset-op->req.rw.offset,
|
||||
c, ((uint8_t*)op->iov.buf[buf_idx].iov_base)[i]);
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
@@ -290,9 +290,9 @@ void test1()
|
||||
assert(offset == op->req.rw.offset+op->req.rw.len);
|
||||
replay_ops.push_back(op);
|
||||
}
|
||||
if (replay_start != 0 || replay_end != 0x14000)
|
||||
if (replay_start != 0 || replay_end != 0x10000)
|
||||
{
|
||||
printf("Write replay: range mismatch: %jx-%jx\n", replay_start, replay_end);
|
||||
printf("Write replay: range mismatch: 0x%jx-0x%jx (expected 0-0x10000)\n", replay_start, replay_end);
|
||||
assert(0);
|
||||
}
|
||||
for (auto op: replay_ops)
|
||||
@@ -320,8 +320,6 @@ void test1()
|
||||
check_disconnected(cli, 1);
|
||||
pretend_connected(cli, 1);
|
||||
check_op_count(cli, 1, 1);
|
||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||
check_op_count(cli, 1, 1);
|
||||
can_complete(r1);
|
||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||
check_completed(r1);
|
||||
@@ -341,7 +339,7 @@ void test1()
|
||||
pretend_connected(cli, 1);
|
||||
cli->continue_ops(true);
|
||||
check_op_count(cli, 1, 1);
|
||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0);
|
||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
|
||||
check_op_count(cli, 1, 1);
|
||||
can_complete(r2);
|
||||
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
|
||||
|
@@ -54,14 +54,14 @@ int epoll_manager_t::get_fd()
|
||||
return epoll_fd;
|
||||
}
|
||||
|
||||
void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
|
||||
void epoll_manager_t::set_fd_handler(int fd, int events, std::function<void(int, int)> handler)
|
||||
{
|
||||
if (handler != NULL)
|
||||
{
|
||||
bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
|
||||
epoll_event ev;
|
||||
ev.data.fd = fd;
|
||||
ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
|
||||
ev.events = events | EPOLLRDHUP | EPOLLET;
|
||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
|
@@ -3,6 +3,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#include <map>
|
||||
|
||||
#include "ringloop.h"
|
||||
@@ -21,7 +23,7 @@ public:
|
||||
epoll_manager_t(ring_loop_t *ringloop);
|
||||
~epoll_manager_t();
|
||||
int get_fd();
|
||||
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler);
|
||||
void set_fd_handler(int fd, int events, std::function<void(int, int)> handler);
|
||||
void handle_events(int timeout);
|
||||
|
||||
timerfd_manager_t *tfd;
|
||||
|
@@ -32,12 +32,22 @@ static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const s
|
||||
my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
|
||||
sqe->buf_index = buf_index;
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_write(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
|
||||
}
|
||||
|
||||
static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
|
||||
{
|
||||
my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
|
||||
|
@@ -151,10 +151,11 @@ static uint64_t size_thresh[] = { (uint64_t)1024*1024*1024*1024, (uint64_t)1024*
|
||||
static uint64_t size_thresh_d[] = { (uint64_t)1000000000000, (uint64_t)1000000000, (uint64_t)1000000, (uint64_t)1000, 0 };
|
||||
static const int size_thresh_n = sizeof(size_thresh)/sizeof(size_thresh[0]);
|
||||
static const char *size_unit = "TGMKB";
|
||||
static const char *size_unit_ns = "TGMk ";
|
||||
|
||||
std::string format_size(uint64_t size, bool nobytes)
|
||||
std::string format_size(uint64_t size, bool nobytes, bool nospace)
|
||||
{
|
||||
uint64_t *thr = nobytes ? size_thresh_d : size_thresh;
|
||||
uint64_t *thr = (nobytes ? size_thresh_d : size_thresh);
|
||||
char buf[256];
|
||||
for (int i = 0; i < size_thresh_n; i++)
|
||||
{
|
||||
@@ -165,9 +166,19 @@ std::string format_size(uint64_t size, bool nobytes)
|
||||
assert(l < sizeof(buf)-2);
|
||||
if (buf[l-1] == '0')
|
||||
l -= 2;
|
||||
buf[l] = i == size_thresh_n-1 && nobytes ? 0 : ' ';
|
||||
buf[l+1] = i == size_thresh_n-1 && nobytes ? 0 : size_unit[i];
|
||||
buf[l+2] = 0;
|
||||
if (i == size_thresh_n-1 && nobytes)
|
||||
buf[l] = 0;
|
||||
else if (nospace)
|
||||
{
|
||||
buf[l] = size_unit_ns[i];
|
||||
buf[l+1] = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
buf[l] = ' ';
|
||||
buf[l+1] = size_unit[i];
|
||||
buf[l+2] = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@@ -16,7 +16,7 @@ std::string strtolower(const std::string & in);
|
||||
std::string trim(const std::string & in, const char *rm_chars = " \n\r\t");
|
||||
std::string str_replace(const std::string & in, const std::string & needle, const std::string & replacement);
|
||||
uint64_t stoull_full(const std::string & str, int base = 0);
|
||||
std::string format_size(uint64_t size, bool nobytes = false);
|
||||
std::string format_size(uint64_t size, bool nobytes = false, bool nospace = false);
|
||||
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
|
||||
uint64_t parse_time(std::string time_str, bool *ok = NULL);
|
||||
std::string read_all_fd(int fd);
|
||||
|
@@ -11,7 +11,7 @@
|
||||
#include <stdexcept>
|
||||
#include "timerfd_manager.h"
|
||||
|
||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler)
|
||||
timerfd_manager_t::timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler)
|
||||
{
|
||||
this->set_fd_handler = set_fd_handler;
|
||||
wait_state = 0;
|
||||
@@ -20,7 +20,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
|
||||
{
|
||||
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
|
||||
}
|
||||
set_fd_handler(timerfd, false, [this](int fd, int events)
|
||||
set_fd_handler(timerfd, EPOLLIN, [this](int fd, int events)
|
||||
{
|
||||
handle_readable();
|
||||
});
|
||||
@@ -28,7 +28,7 @@ timerfd_manager_t::timerfd_manager_t(std::function<void(int, bool, std::function
|
||||
|
||||
timerfd_manager_t::~timerfd_manager_t()
|
||||
{
|
||||
set_fd_handler(timerfd, false, NULL);
|
||||
set_fd_handler(timerfd, 0, NULL);
|
||||
close(timerfd);
|
||||
}
|
||||
|
||||
|
@@ -30,9 +30,9 @@ class timerfd_manager_t
|
||||
void trigger_nearest();
|
||||
void handle_readable();
|
||||
public:
|
||||
std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler;
|
||||
std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler;
|
||||
|
||||
timerfd_manager_t(std::function<void(int, bool, std::function<void(int, int)>)> set_fd_handler);
|
||||
timerfd_manager_t(std::function<void(int, int, std::function<void(int, int)>)> set_fd_handler);
|
||||
~timerfd_manager_t();
|
||||
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
|
||||
int set_timer_us(uint64_t micros, bool repeat, std::function<void(int)> callback);
|
||||
|
Reference in New Issue
Block a user