diff --git a/src/client/cluster_client.cpp b/src/client/cluster_client.cpp index 4f356d69..80819823 100644 --- a/src/client/cluster_client.cpp +++ b/src/client/cluster_client.cpp @@ -617,7 +617,9 @@ void cluster_client_t::execute_internal(cluster_op_t *op) { if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */) { - wb->copy_write(op, CACHE_WRITTEN); + uint64_t flush_id = ++wb->last_flush_id; + wb->copy_write(op, CACHE_REPEATING, flush_id); + op->flush_id = flush_id; } if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops) { @@ -833,6 +835,10 @@ resume_2: auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode)); op->retval = op->len / pool_cfg.bitmap_granularity; } + if (op->flush_id) + { + wb->mark_flush_written(op->inode, op->offset, op->len, op->flush_id); + } erase_op(op); return 1; } diff --git a/src/client/cluster_client.h b/src/client/cluster_client.h index fca9fb32..ea4dd9d2 100644 --- a/src/client/cluster_client.h +++ b/src/client/cluster_client.h @@ -64,6 +64,7 @@ protected: unsigned bitmap_buf_size = 0; cluster_op_t *prev = NULL, *next = NULL; int prev_wait = 0; + uint64_t flush_id = 0; friend class cluster_client_t; friend class writeback_cache_t; }; diff --git a/src/client/cluster_client_impl.h b/src/client/cluster_client_impl.h index 78972c26..a3b53343 100644 --- a/src/client/cluster_client_impl.h +++ b/src/client/cluster_client_impl.h @@ -46,11 +46,12 @@ public: bool is_left_merged(dirty_buf_it_t dirty_it); bool is_right_merged(dirty_buf_it_t dirty_it); bool is_merged(const dirty_buf_it_t & dirty_it); - void copy_write(cluster_op_t *op, int state); + void copy_write(cluster_op_t *op, int state, uint64_t new_flush_id = 0); int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num); void start_writebacks(cluster_client_t *cli, int count); bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity); void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it); + void mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id); void fsync_start(); void fsync_error(); void fsync_ok(); diff --git a/src/client/cluster_client_wb.cpp b/src/client/cluster_client_wb.cpp index 86aea2bf..6d842e70 100644 --- a/src/client/cluster_client_wb.cpp +++ b/src/client/cluster_client_wb.cpp @@ -71,7 +71,7 @@ bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it) return is_left_merged(dirty_it) || is_right_merged(dirty_it); } -void writeback_cache_t::copy_write(cluster_op_t *op, int state) +void writeback_cache_t::copy_write(cluster_op_t *op, int state, uint64_t new_flush_id) { // Save operation for replay when one of PGs goes out of sync // (primary OSD drops our connection in this case) @@ -180,6 +180,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state) .buf = buf, .len = op->len, .state = state, + .flush_id = new_flush_id, .refcnt = refcnt, }); if (state == CACHE_DIRTY) @@ -268,7 +269,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from writebacks_active++; op->callback = [this, flush_id](cluster_op_t* op) { - // Buffer flushes should be always retried, regardless of the error, + // Buffer flushes are always retried, regardless of the error, // so they should never result in an error here assert(op->retval == op->len); for (auto fl_it = flushed_buffers.find(flush_id); @@ -280,16 +281,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from } flushed_buffers.erase(fl_it++); } - for (auto dirty_it = find_dirty(op->inode, op->offset); - dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode && - dirty_it->first.stripe < op->offset+op->len; dirty_it++) - { - if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING) - { - dirty_it->second.flush_id = 0; - dirty_it->second.state = CACHE_WRITTEN; - } - } + mark_flush_written(op->inode, op->offset, op->len, flush_id); delete op; writebacks_active--; // We can't call execute_internal because it affects an invalid copy of the list here @@ -307,6 +299,20 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from } } +void writeback_cache_t::mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id) +{ + for (auto dirty_it = find_dirty(inode, offset); + dirty_it != dirty_buffers.end() && dirty_it->first.inode == inode && + dirty_it->first.stripe < offset+len; dirty_it++) + { + if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING) + { + dirty_it->second.flush_id = 0; + dirty_it->second.state = CACHE_WRITTEN; + } + } +} + void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count) { if (!writeback_queue.size()) diff --git a/src/test/test_cluster_client.cpp b/src/test/test_cluster_client.cpp index 447d4817..db552159 100644 --- a/src/test/test_cluster_client.cpp +++ b/src/test/test_cluster_client.cpp @@ -43,8 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli) }, }); cli->st_cli.on_load_pgs_hook(true); - std::map changes; - cli->st_cli.on_change_hook(changes); + cli->st_cli.on_change_pool_config_hook(); } int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function cb = NULL, bool instant = false) @@ -281,7 +280,8 @@ void test1() uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58); if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c) { - printf("Write replay: mismatch at %ju\n", offset-op->req.rw.offset); + printf("Write replay: mismatch at %ju (expected %02x, have %02x)\n", offset-op->req.rw.offset, + c, ((uint8_t*)op->iov.buf[buf_idx].iov_base)[i]); goto fail; } } @@ -290,9 +290,9 @@ void test1() assert(offset == op->req.rw.offset+op->req.rw.len); replay_ops.push_back(op); } - if (replay_start != 0 || replay_end != 0x14000) + if (replay_start != 0 || replay_end != 0x10000) { - printf("Write replay: range mismatch: %jx-%jx\n", replay_start, replay_end); + printf("Write replay: range mismatch: 0x%jx-0x%jx (expected 0-0x10000)\n", replay_start, replay_end); assert(0); } for (auto op: replay_ops) @@ -320,8 +320,6 @@ void test1() check_disconnected(cli, 1); pretend_connected(cli, 1); check_op_count(cli, 1, 1); - pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); - check_op_count(cli, 1, 1); can_complete(r1); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); check_completed(r1); @@ -341,7 +339,7 @@ void test1() pretend_connected(cli, 1); cli->continue_ops(true); check_op_count(cli, 1, 1); - pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); check_op_count(cli, 1, 1); can_complete(r2); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);