forked from vitalif/vitastor
Rework continue_ops() to remove a CPU hot spot
This rework increases the fio -rw=randread -iodepth=128 result from ~120k to ~180k iops :)
rdma-zerocopy
parent
d749159585
commit
f8ff39b0ab
|
@ -108,8 +108,101 @@ cluster_op_t::~cluster_op_t()
|
|||
}
|
||||
}
|
||||
|
||||
// Recomputes op->prev_wait, i.e. the number of operations queued before <op>
// that <op> has to wait for, and starts <op> immediately when that number is
// zero and pgs_loaded is set.
//
// Ordering rules implemented here:
// - WRITE waits for every earlier SYNC; a regular (non-flush) WRITE additionally
//   waits for every earlier WRITE carrying OP_FLUSH_BUFFER.
// - SYNC waits for every earlier SYNC and every earlier WRITE.
// - Any other opcode (reads) waits for the OP_FLUSH_BUFFER writes located at
//   the head of the queue.
//
// <op> is expected to be already linked into the op queue (the visible callers
// link it before calling); the read branch walks from op_queue_head up to <op>.
void cluster_client_t::calc_wait(cluster_op_t *op)
{
    op->prev_wait = 0;
    if (op->opcode == OSD_OP_WRITE)
    {
        // Flush writes themselves are not held back by other flush writes
        const bool is_regular_write = !(op->flags & OP_FLUSH_BUFFER);
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC ||
                (prev->opcode == OSD_OP_WRITE && is_regular_write && (prev->flags & OP_FLUSH_BUFFER)))
            {
                op->prev_wait++;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_rw(op);
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        for (auto prev = op->prev; prev; prev = prev->prev)
        {
            if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE)
            {
                op->prev_wait++;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_sync(op);
    }
    else
    {
        // Flushes are always in the beginning of the queue, so the scan has to
        // start from the queue head and may stop at the first regular write or
        // read. Walking backwards from op->prev (as was done before) hits the
        // regular operations first and bails out before ever reaching the
        // flushes, leaving prev_wait at 0 while flushes are still pending.
        for (auto prev = op_queue_head; prev && prev != op; prev = prev->next)
        {
            if (prev->opcode == OSD_OP_WRITE && (prev->flags & OP_FLUSH_BUFFER))
            {
                op->prev_wait++;
            }
            else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ)
            {
                // Past the leading run of flushes - nothing more to count
                break;
            }
        }
        if (!op->prev_wait && pgs_loaded)
            continue_rw(op);
    }
}
|
||||
|
||||
void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc)
|
||||
{
|
||||
if (opcode == OSD_OP_WRITE)
|
||||
{
|
||||
while (next)
|
||||
{
|
||||
auto n2 = next->next;
|
||||
if (next->opcode == OSD_OP_SYNC ||
|
||||
next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) ||
|
||||
next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER))
|
||||
{
|
||||
next->prev_wait += inc;
|
||||
if (!next->prev_wait)
|
||||
{
|
||||
if (next->opcode == OSD_OP_SYNC)
|
||||
continue_sync(next);
|
||||
else
|
||||
continue_rw(next);
|
||||
}
|
||||
}
|
||||
next = n2;
|
||||
}
|
||||
}
|
||||
else if (opcode == OSD_OP_SYNC)
|
||||
{
|
||||
while (next)
|
||||
{
|
||||
auto n2 = next->next;
|
||||
if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
|
||||
{
|
||||
next->prev_wait += inc;
|
||||
if (!next->prev_wait)
|
||||
{
|
||||
if (next->opcode == OSD_OP_SYNC)
|
||||
continue_sync(next);
|
||||
else
|
||||
continue_rw(next);
|
||||
}
|
||||
}
|
||||
next = n2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cluster_client_t::erase_op(cluster_op_t *op)
|
||||
{
|
||||
uint64_t opcode = op->opcode, flags = op->flags;
|
||||
cluster_op_t *next = op->next;
|
||||
if (op->prev)
|
||||
op->prev->next = op->next;
|
||||
if (op->next)
|
||||
|
@ -120,6 +213,8 @@ void cluster_client_t::erase_op(cluster_op_t *op)
|
|||
op_queue_tail = op->prev;
|
||||
op->next = op->prev = NULL;
|
||||
std::function<void(cluster_op_t*)>(op->callback)(op);
|
||||
if (!immediate_commit)
|
||||
inc_wait(opcode, flags, next, -1);
|
||||
}
|
||||
|
||||
void cluster_client_t::continue_ops(bool up_retry)
|
||||
|
@ -136,46 +231,20 @@ void cluster_client_t::continue_ops(bool up_retry)
|
|||
}
|
||||
restart:
|
||||
continuing_ops = 1;
|
||||
bool has_flushes = false, has_writes = false;
|
||||
for (auto op = op_queue_head; op; )
|
||||
{
|
||||
cluster_op_t *next_op = op->next;
|
||||
bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
|
||||
auto opcode = op->opcode;
|
||||
if (!op->up_wait || up_retry)
|
||||
{
|
||||
op->up_wait = false;
|
||||
if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
|
||||
if (!op->prev_wait)
|
||||
{
|
||||
if (is_flush || !has_flushes)
|
||||
{
|
||||
// Regular writes can't proceed before buffer flushes
|
||||
rm = continue_rw(op);
|
||||
if (op->opcode == OSD_OP_SYNC)
|
||||
continue_sync(op);
|
||||
else
|
||||
continue_rw(op);
|
||||
}
|
||||
}
|
||||
else if (opcode == OSD_OP_SYNC)
|
||||
{
|
||||
if (!has_writes)
|
||||
{
|
||||
// SYNC can't proceed before previous writes
|
||||
rm = continue_sync(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (opcode == OSD_OP_WRITE)
|
||||
{
|
||||
has_writes = has_writes || !rm;
|
||||
if (is_flush)
|
||||
{
|
||||
has_flushes = has_flushes || !rm;
|
||||
}
|
||||
}
|
||||
else if (opcode == OSD_OP_SYNC)
|
||||
{
|
||||
// Postpone writes until previous SYNC completes
|
||||
// ...so dirty_writes can't contain anything newer than SYNC
|
||||
has_flushes = has_flushes || !rm;
|
||||
}
|
||||
op = next_op;
|
||||
if (continuing_ops == 2)
|
||||
{
|
||||
|
@ -218,11 +287,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
|
|||
{
|
||||
throw std::runtime_error("Bad block size");
|
||||
}
|
||||
if (config["immediate_commit"] == "all")
|
||||
{
|
||||
// Cluster-wide immediate_commit mode
|
||||
immediate_commit = true;
|
||||
}
|
||||
immediate_commit = (config["immediate_commit"] == "all");
|
||||
if (config.find("client_max_dirty_bytes") != config.end())
|
||||
{
|
||||
client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
|
||||
|
@ -377,6 +443,7 @@ void cluster_client_t::execute(cluster_op_t *op)
|
|||
op_queue_tail = op_queue_head = sync_op;
|
||||
dirty_bytes = 0;
|
||||
dirty_ops = 0;
|
||||
calc_wait(sync_op);
|
||||
}
|
||||
dirty_bytes += op->len;
|
||||
dirty_ops++;
|
||||
|
@ -394,7 +461,15 @@ void cluster_client_t::execute(cluster_op_t *op)
|
|||
}
|
||||
else
|
||||
op_queue_tail = op_queue_head = op;
|
||||
continue_ops();
|
||||
if (!immediate_commit)
|
||||
calc_wait(op);
|
||||
else if (pgs_loaded)
|
||||
{
|
||||
if (op->opcode == OSD_OP_SYNC)
|
||||
continue_sync(op);
|
||||
else
|
||||
continue_rw(op);
|
||||
}
|
||||
}
|
||||
|
||||
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
|
||||
|
@ -501,11 +576,8 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
|
|||
}
|
||||
else
|
||||
op_queue_tail = op_queue_head = op;
|
||||
if (continuing_ops)
|
||||
{
|
||||
// Rescan queue from the beginning
|
||||
continuing_ops = 2;
|
||||
}
|
||||
inc_wait(op->opcode, op->flags, op->next, 1);
|
||||
continue_rw(op);
|
||||
}
|
||||
|
||||
int cluster_client_t::continue_rw(cluster_op_t *op)
|
||||
|
@ -1034,7 +1106,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
|
|||
}
|
||||
if (op->inflight_count == 0)
|
||||
{
|
||||
continue_ops();
|
||||
if (op->opcode == OSD_OP_SYNC)
|
||||
continue_sync(op);
|
||||
else
|
||||
continue_rw(op);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ struct cluster_op_t
|
|||
std::function<void(cluster_op_t*)> callback;
|
||||
~cluster_op_t();
|
||||
protected:
|
||||
int flags = 0;
|
||||
uint64_t flags = 0;
|
||||
int state = 0;
|
||||
uint64_t cur_inode; // for snapshot reads
|
||||
void *buf = NULL;
|
||||
|
@ -48,6 +48,7 @@ protected:
|
|||
void *bitmap_buf = NULL, *part_bitmaps = NULL;
|
||||
unsigned bitmap_buf_size = 0;
|
||||
cluster_op_t *prev = NULL, *next = NULL;
|
||||
int prev_wait = 0;
|
||||
friend class cluster_client_t;
|
||||
};
|
||||
|
||||
|
@ -67,7 +68,8 @@ class cluster_client_t
|
|||
uint64_t bs_block_size = 0;
|
||||
uint32_t bs_bitmap_granularity = 0, bs_bitmap_size = 0;
|
||||
std::map<pool_id_t, uint64_t> pg_counts;
|
||||
bool immediate_commit = false;
|
||||
// WARNING: initially true so execute() doesn't create fake sync
|
||||
bool immediate_commit = true;
|
||||
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
|
||||
uint64_t client_max_dirty_bytes = 0;
|
||||
uint64_t client_max_dirty_ops = 0;
|
||||
|
@ -118,4 +120,6 @@ protected:
|
|||
void handle_op_part(cluster_op_part_t *part);
|
||||
void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part);
|
||||
void erase_op(cluster_op_t *op);
|
||||
void calc_wait(cluster_op_t *op);
|
||||
void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc);
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue