From b1a0afd10a12580eba25d593e09f8be437c34a79 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 11 Aug 2023 02:28:49 +0300 Subject: [PATCH] Aggregate buffer flushes --- src/cluster_client.cpp | 54 +++++++++++++++++++++++++++---------- src/cluster_client.h | 5 +++- src/test_cluster_client.cpp | 12 ++++++--- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index eb8ab680..596ebd3c 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -41,14 +41,24 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd { // peer_osd just dropped connection // determine WHICH dirty_buffers are now obsolete and repeat them - for (auto & wr: dirty_buffers) + for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; ) { - if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) && - wr.second.state != CACHE_REPEATING) + bool end = wr_it == dirty_buffers.end(); + bool flush_this = !end && wr_it->second.state != CACHE_REPEATING && + affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd); + if (flush_it != wr_it && (end || !flush_this || + wr_it->first.inode != flush_it->first.inode || + wr_it->first.stripe != last_it->first.stripe+last_it->second.len)) { - // FIXME: Flush in larger parts - flush_buffer(wr.first, &wr.second); + flush_buffers(flush_it, wr_it); + flush_it = wr_it; } + if (end) + break; + last_it = wr_it; + wr_it++; + if (!flush_this) + flush_it = wr_it; } continue_ops(); } @@ -627,21 +637,37 @@ void cluster_client_t::copy_write(cluster_op_t *op, std::map::iterator from_it, + std::map::iterator to_it) { - wr->state = CACHE_REPEATING; + auto prev_it = std::prev(to_it); cluster_op_t *op = new cluster_op_t; op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER; op->opcode = OSD_OP_WRITE; - op->cur_inode = op->inode = oid.inode; - op->offset = oid.stripe; - op->len = wr->len; - op->iov.push_back(wr->buf, wr->len); - op->callback = [wr](cluster_op_t* op) + op->cur_inode = op->inode = from_it->first.inode; + op->offset = from_it->first.stripe; + op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe; + uint32_t calc_len = 0; + uint64_t flush_id = ++last_flush_id; + for (auto it = from_it; it != to_it; it++) { - if (wr->state == CACHE_REPEATING) + it->second.state = CACHE_REPEATING; + it->second.flush_id = flush_id; + op->iov.push_back(it->second.buf, it->second.len); + calc_len += it->second.len; + } + assert(calc_len == op->len); + op->callback = [this, flush_id](cluster_op_t* op) + { + for (auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers); + dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode && + dirty_it->first.stripe < op->offset+op->len; dirty_it++) { - wr->state = CACHE_DIRTY; + if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING) + { + dirty_it->second.flush_id = 0; + dirty_it->second.state = CACHE_DIRTY; + } } delete op; }; diff --git a/src/cluster_client.h b/src/cluster_client.h index acc95df4..385a7804 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -71,6 +71,7 @@ struct cluster_buffer_t void *buf; uint64_t len; int state; + uint64_t flush_id; }; struct inode_list_t; @@ -93,6 +94,7 @@ class cluster_client_t std::vector offline_ops; cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL; std::map dirty_buffers; + uint64_t last_flush_id = 0; std::set dirty_osds; uint64_t dirty_bytes = 0, dirty_ops = 0; @@ -138,7 +140,8 @@ public: protected: bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd); - void flush_buffer(const object_id & oid, cluster_buffer_t *wr); + void flush_buffers(std::map::iterator from_it, + std::map::iterator to_it); void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); void on_change_hook(std::map & changes); diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp index cece51fb..3865d7d4 100644 --- a/src/test_cluster_client.cpp +++ b/src/test_cluster_client.cpp @@ -192,11 +192,16 @@ void test1() check_op_count(cli, 1, 1); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); check_completed(r1); + r1 = test_write(cli, 4096, 4096, 0x56); + can_complete(r1); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4096, 4096), 0); + check_completed(r1); pretend_disconnected(cli, 1); int *r2 = test_sync(cli); pretend_connected(cli, 1); check_op_count(cli, 1, 1); - pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 8192), 0); check_op_count(cli, 1, 1); can_complete(r2); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); @@ -321,9 +326,8 @@ void test1() check_disconnected(cli, 1); pretend_connected(cli, 1); cli->continue_ops(true); - check_op_count(cli, 1, 2); - pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); - pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0); check_op_count(cli, 1, 1); can_complete(r2); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);