Aggregate buffer flushes
Test / buildenv (push) Successful in 11s Details
Test / build (push) Successful in 2m45s Details
Test / make_test (push) Successful in 36s Details
Test / test_add_osd (push) Successful in 2m47s Details
Test / test_cas (push) Successful in 10s Details
Test / test_change_pg_count (push) Successful in 41s Details
Test / test_change_pg_count_ec (push) Successful in 35s Details
Test / test_change_pg_size (push) Successful in 9s Details
Test / test_create_nomaxid (push) Successful in 8s Details
Test / test_etcd_fail (push) Successful in 1m47s Details
Test / test_interrupted_rebalance (push) Successful in 2m8s Details
Test / test_interrupted_rebalance_imm (push) Successful in 2m58s Details
Test / test_interrupted_rebalance_ec (push) Successful in 1m47s Details
Test / test_interrupted_rebalance_ec_imm (push) Successful in 1m30s Details
Test / test_failure_domain (push) Successful in 8s Details
Test / test_snapshot (push) Successful in 24s Details
Test / test_snapshot_ec (push) Successful in 30s Details
Test / test_minsize_1 (push) Successful in 13s Details
Test / test_move_reappear (push) Successful in 17s Details
Test / test_rm (push) Successful in 14s Details
Test / test_snapshot_chain (push) Successful in 2m21s Details
Test / test_snapshot_chain_ec (push) Successful in 2m56s Details
Test / test_snapshot_down (push) Successful in 25s Details
Test / test_snapshot_down_ec (push) Successful in 23s Details
Test / test_splitbrain (push) Successful in 19s Details
Test / test_rebalance_verify (push) Successful in 3m39s Details
Test / test_rebalance_verify_imm (push) Successful in 3m35s Details
Test / test_rebalance_verify_ec (push) Successful in 5m6s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 6m8s Details
Test / test_write (push) Successful in 50s Details
Test / test_write_xor (push) Successful in 55s Details
Test / test_write_no_same (push) Successful in 14s Details
Test / test_heal_pg_size_2 (push) Successful in 4m14s Details
Test / test_heal_ec (push) Successful in 5m2s Details
Test / test_heal_csum_32k_dmj (push) Successful in 5m1s Details
Test / test_heal_csum_32k_dj (push) Successful in 6m12s Details
Test / test_heal_csum_32k (push) Successful in 6m35s Details
Test / test_heal_csum_4k_dmj (push) Successful in 7m18s Details
Test / test_heal_csum_4k_dj (push) Successful in 6m24s Details
Test / test_heal_csum_4k (push) Successful in 5m12s Details
Test / test_scrub (push) Successful in 1m39s Details
Test / test_scrub_zero_osd_2 (push) Successful in 49s Details
Test / test_scrub_xor (push) Successful in 45s Details
Test / test_scrub_pg_size_3 (push) Successful in 1m19s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 57s Details
Test / test_scrub_ec (push) Successful in 59s Details

fsync-feedback
Vitaliy Filippov 2023-08-11 02:28:49 +03:00
parent 85b6134910
commit b1a0afd10a
3 changed files with 52 additions and 19 deletions

View File

@ -41,14 +41,24 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
for (auto & wr: dirty_buffers)
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
{
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
wr.second.state != CACHE_REPEATING)
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
{
// FIXME: Flush in larger parts
flush_buffer(wr.first, &wr.second);
flush_buffers(flush_it, wr_it);
flush_it = wr_it;
}
if (end)
break;
last_it = wr_it;
wr_it++;
if (!flush_this)
flush_it = wr_it;
}
continue_ops();
}
@ -627,21 +637,37 @@ void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_
}
}
void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
void cluster_client_t::flush_buffers(std::map<object_id, cluster_buffer_t>::iterator from_it,
std::map<object_id, cluster_buffer_t>::iterator to_it)
{
wr->state = CACHE_REPEATING;
auto prev_it = std::prev(to_it);
cluster_op_t *op = new cluster_op_t;
op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->cur_inode = op->inode = oid.inode;
op->offset = oid.stripe;
op->len = wr->len;
op->iov.push_back(wr->buf, wr->len);
op->callback = [wr](cluster_op_t* op)
op->cur_inode = op->inode = from_it->first.inode;
op->offset = from_it->first.stripe;
op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe;
uint32_t calc_len = 0;
uint64_t flush_id = ++last_flush_id;
for (auto it = from_it; it != to_it; it++)
{
if (wr->state == CACHE_REPEATING)
it->second.state = CACHE_REPEATING;
it->second.flush_id = flush_id;
op->iov.push_back(it->second.buf, it->second.len);
calc_len += it->second.len;
}
assert(calc_len == op->len);
op->callback = [this, flush_id](cluster_op_t* op)
{
for (auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
wr->state = CACHE_DIRTY;
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_DIRTY;
}
}
delete op;
};

View File

@ -71,6 +71,7 @@ struct cluster_buffer_t
void *buf;
uint64_t len;
int state;
uint64_t flush_id;
};
struct inode_list_t;
@ -93,6 +94,7 @@ class cluster_client_t
std::vector<cluster_op_t*> offline_ops;
cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
std::map<object_id, cluster_buffer_t> dirty_buffers;
uint64_t last_flush_id = 0;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0, dirty_ops = 0;
@ -138,7 +140,8 @@ public:
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
void flush_buffers(std::map<object_id, cluster_buffer_t>::iterator from_it,
std::map<object_id, cluster_buffer_t>::iterator to_it);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);

View File

@ -192,11 +192,16 @@ void test1()
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_completed(r1);
r1 = test_write(cli, 4096, 4096, 0x56);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4096, 4096), 0);
check_completed(r1);
pretend_disconnected(cli, 1);
int *r2 = test_sync(cli);
pretend_connected(cli, 1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 8192), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
@ -321,9 +326,8 @@ void test1()
check_disconnected(cli, 1);
pretend_connected(cli, 1);
cli->continue_ops(true);
check_op_count(cli, 1, 2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);