diff --git a/mon/mon.js b/mon/mon.js index 9843fb1d..c6b981bf 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -78,9 +78,15 @@ const etcd_tree = { disk_alignment: 4096, bitmap_granularity: 4096, immediate_commit: false, // 'all' or 'small' + // client - configurable online + client_max_dirty_bytes: 33554432, + client_max_dirty_ops: 1024, + client_enable_writeback: false, + client_max_buffered_bytes: 33554432, + client_max_buffered_ops: 1024, + client_max_writeback_iodepth: 256, // client and osd - configurable online log_level: 0, - client_dirty_limit: 33554432, peer_connect_interval: 5, // seconds. min: 1 peer_connect_timeout: 5, // seconds. min: 1 osd_idle_timeout: 5, // seconds. min: 1 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d0c4fcb3..47b3c847 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -137,6 +137,7 @@ endif (${WITH_FIO}) add_library(vitastor_client SHARED cluster_client.cpp cluster_client_list.cpp + cluster_client_wb.cpp vitastor_c.cpp cli_common.cpp cli_alloc_osd.cpp @@ -300,7 +301,7 @@ target_link_libraries(test_crc32 add_executable(test_cluster_client EXCLUDE_FROM_ALL test_cluster_client.cpp - pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp + pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp cluster_client_wb.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp ) target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__) diff --git a/src/cli.cpp b/src/cli.cpp index f35febb7..f0d138e3 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -349,6 +349,7 @@ static int run(cli_tool_t *p, json11::Json::object cfg) p->ringloop->wait(); } // Destroy the client + p->cli->flush(); delete p->cli; delete p->epmgr; delete p->ringloop; diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index e627f95b..261f8127 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -3,21 +3,13 @@ #include #include -#include "cluster_client.h" - -#define SCRAP_BUFFER_SIZE 4*1024*1024 -#define PART_SENT 1 -#define PART_DONE 2 -#define PART_ERROR 4 -#define PART_RETRY 8 -#define CACHE_DIRTY 1 -#define CACHE_FLUSHING 2 -#define CACHE_REPEATING 3 -#define OP_FLUSH_BUFFER 0x02 -#define OP_IMMEDIATE_COMMIT 0x04 +#include "cluster_client_impl.h" +#include "http_client.h" // json_is_true cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config) { + wb = new writeback_cache_t(); + cli_config = config.object_items(); file_config = osd_messenger_t::read_config(config); config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {}); @@ -37,30 +29,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd continue_lists(); continue_raw_ops(peer_osd); } - else if (dirty_buffers.size()) + else { // peer_osd just dropped connection // determine WHICH dirty_buffers are now obsolete and repeat them - for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; ) + if (wb->repeat_ops_for(this, peer_osd) > 0) { - bool end = wr_it == dirty_buffers.end(); - bool flush_this = !end && wr_it->second.state != CACHE_REPEATING && - affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd); - if (flush_it != wr_it && (end || !flush_this || - wr_it->first.inode != flush_it->first.inode || - wr_it->first.stripe != last_it->first.stripe+last_it->second.len)) - { - flush_buffers(flush_it, wr_it); - 
flush_it = wr_it; - } - if (end) - break; - last_it = wr_it; - wr_it++; - if (!flush_this) - flush_it = wr_it; + continue_ops(); } - continue_ops(); } }; msgr.exec_op = [this](osd_op_t *op) @@ -88,16 +64,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd cluster_client_t::~cluster_client_t() { - for (auto bp: dirty_buffers) - { - free(bp.second.buf); - } - dirty_buffers.clear(); + msgr.repeer_pgs = [this](osd_num_t){}; if (ringloop) { ringloop->unregister_consumer(&consumer); } free(scrap_buffer); + delete wb; + wb = NULL; } cluster_op_t::~cluster_op_t() @@ -146,6 +120,19 @@ void cluster_client_t::init_msgr() } } +void cluster_client_t::unshift_op(cluster_op_t *op) +{ + op->next = op_queue_head; + if (op_queue_head) + { + op_queue_head->prev = op; + op_queue_head = op; + } + else + op_queue_tail = op_queue_head = op; + inc_wait(op->opcode, op->flags, op->next, 1); +} + void cluster_client_t::calc_wait(cluster_op_t *op) { op->prev_wait = 0; @@ -166,7 +153,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op) { for (auto prev = op->prev; prev; prev = prev->prev) { - if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && !(prev->flags & OP_IMMEDIATE_COMMIT)) + if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && (!(prev->flags & OP_IMMEDIATE_COMMIT) || enable_writeback)) { op->prev_wait++; } @@ -187,7 +174,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n while (next) { auto n2 = next->next; - if (next->opcode == OSD_OP_SYNC && !(flags & OP_IMMEDIATE_COMMIT) || + if (next->opcode == OSD_OP_SYNC && (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback) || next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER)) { next->prev_wait += inc; @@ -239,13 +226,37 @@ void cluster_client_t::erase_op(cluster_op_t *op) op_queue_tail = op->prev; op->next = op->prev = NULL; if (flags & OP_FLUSH_BUFFER) + { + // Completed flushes change writeback buffer states, + // so the callback should be run before inc_wait() + // which may continue following SYNCs, but these SYNCs + // should know about the changed buffer state + // This is ugly but this is the way we do it std::function(op->callback)(op); - if (!(flags & OP_IMMEDIATE_COMMIT)) + } + if (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback) + { inc_wait(opcode, flags, next, -1); - // Call callback at the end to avoid inconsistencies in prev_wait - // if the callback adds more operations itself + } if (!(flags & OP_FLUSH_BUFFER)) + { + // Call callback at the end to avoid inconsistencies in prev_wait + // if the callback adds more operations itself std::function(op->callback)(op); + } + if (flags & OP_FLUSH_BUFFER) + { + int i = 0; + while (i < wb->writeback_overflow.size() && wb->writebacks_active < client_max_writeback_iodepth) + { + execute_internal(wb->writeback_overflow[i]); + i++; + } + if (i > 0) + { + wb->writeback_overflow.erase(wb->writeback_overflow.begin(), wb->writeback_overflow.begin()+i); + } + } } void cluster_client_t::continue_ops(bool up_retry) @@ -289,6 +300,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co { this->etcd_global_config = etcd_global_config; config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {}); + // client_max_dirty_bytes/client_dirty_limit if (config.find("client_max_dirty_bytes") != config.end()) { client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value(); @@ -304,11 +316,34 @@ void 
cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co { client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES; } + // client_max_dirty_ops client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value(); if (!client_max_dirty_ops) { client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS; } + // client_enable_writeback + enable_writeback = json_is_true(config["client_enable_writeback"]) && + json_is_true(config["client_writeback_allowed"]); + // client_max_buffered_bytes + client_max_buffered_bytes = config["client_max_buffered_bytes"].uint64_value(); + if (!client_max_buffered_bytes) + { + client_max_buffered_bytes = DEFAULT_CLIENT_MAX_BUFFERED_BYTES; + } + // client_max_buffered_ops + client_max_buffered_ops = config["client_max_buffered_ops"].uint64_value(); + if (!client_max_buffered_ops) + { + client_max_buffered_ops = DEFAULT_CLIENT_MAX_BUFFERED_OPS; + } + // client_max_writeback_iodepth + client_max_writeback_iodepth = config["client_max_writeback_iodepth"].uint64_value(); + if (!client_max_writeback_iodepth) + { + client_max_writeback_iodepth = DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH; + } + // up_wait_retry_interval up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value(); if (!up_wait_retry_interval) { @@ -368,6 +403,8 @@ void cluster_client_t::on_change_hook(std::map & changes bool cluster_client_t::get_immediate_commit(uint64_t inode) { + if (enable_writeback) + return false; pool_id_t pool_id = INODE_POOL(inode); if (!pool_id) return true; @@ -402,6 +439,41 @@ void cluster_client_t::on_ready(std::function fn) } } +bool cluster_client_t::flush() +{ + if (!ringloop) + { + if (wb->writeback_queue.size()) + { + wb->start_writebacks(this, 0); + cluster_op_t *sync = new cluster_op_t; + sync->opcode = OSD_OP_SYNC; + sync->callback = [this](cluster_op_t *sync) + { + delete sync; + }; + execute(sync); + } + return op_queue_head == NULL; + } + bool sync_done = false; + cluster_op_t *sync = new cluster_op_t; + sync->opcode = OSD_OP_SYNC; + sync->callback = [this, &sync_done](cluster_op_t *sync) + { + delete sync; + sync_done = true; + }; + execute(sync); + while (!sync_done) + { + ringloop->loop(); + if (!sync_done) + ringloop->wait(); + } + return true; +} + /** * How writes are synced when immediate_commit is false * @@ -422,6 +494,9 @@ void cluster_client_t::on_ready(std::function fn) * 3) if yes, send all SYNCs. otherwise, leave current SYNC as is. * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes * 5) if any of them fail due to other errors, fail the SYNC operation + * + * If writeback caching is turned on and writeback limit is not exhausted: + * data is just copied and the write is confirmed to the client. 
*/ void cluster_client_t::execute(cluster_op_t *op) { @@ -437,36 +512,73 @@ void cluster_client_t::execute(cluster_op_t *op) offline_ops.push_back(op); return; } + op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag + execute_internal(op); +} + +void cluster_client_t::execute_internal(cluster_op_t *op) +{ op->cur_inode = op->inode; op->retval = 0; - op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag // check alignment, readonly flag and so on if (!check_rw(op)) { return; } + if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) && + !op->version /* FIXME no CAS writeback */) + { + if (wb->writebacks_active >= client_max_writeback_iodepth) + { + // Writeback queue is full, postpone the operation + wb->writeback_overflow.push_back(op); + return; + } + // Just copy and acknowledge the operation + wb->copy_write(op, CACHE_DIRTY); + while (wb->writeback_bytes + op->len > client_max_buffered_bytes || wb->writeback_queue_size > client_max_buffered_ops) + { + // Initiate some writeback (asynchronously) + wb->start_writebacks(this, 1); + } + op->retval = op->len; + std::function(op->callback)(op); + return; + } if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT)) { if (!(op->flags & OP_FLUSH_BUFFER)) { - copy_write(op, dirty_buffers); + wb->copy_write(op, CACHE_WRITTEN); } if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops) { // Push an extra SYNC operation to flush previous writes cluster_op_t *sync_op = new cluster_op_t; sync_op->opcode = OSD_OP_SYNC; + sync_op->flags = OP_FLUSH_BUFFER; sync_op->callback = [](cluster_op_t* sync_op) { delete sync_op; }; - execute(sync_op); + execute_internal(sync_op); } dirty_bytes += op->len; dirty_ops++; } else if (op->opcode == OSD_OP_SYNC) { + // Flush the whole write-back queue first + if (!(op->flags & OP_FLUSH_BUFFER) && wb->writeback_overflow.size() > 0) + { + // Writeback queue is full, postpone the operation + wb->writeback_overflow.push_back(op); + return; + } + if (wb->writeback_queue.size()) + { + wb->start_writebacks(this, 0); + } dirty_bytes = 0; dirty_ops = 0; } @@ -478,7 +590,7 @@ void cluster_client_t::execute(cluster_op_t *op) } else op_queue_tail = op_queue_head = op; - if (!(op->flags & OP_IMMEDIATE_COMMIT)) + if (!(op->flags & OP_IMMEDIATE_COMMIT) || enable_writeback) calc_wait(op); else { @@ -552,136 +664,6 @@ void cluster_client_t::execute_raw(osd_num_t osd_num, osd_op_t *op) } } -static std::map::iterator find_dirty(uint64_t inode, uint64_t offset, std::map & dirty_buffers) -{ - auto dirty_it = dirty_buffers.lower_bound((object_id){ - .inode = inode, - .stripe = offset, - }); - while (dirty_it != dirty_buffers.begin()) - { - dirty_it--; - if (dirty_it->first.inode != inode || - (dirty_it->first.stripe + dirty_it->second.len) <= offset) - { - dirty_it++; - break; - } - } - return dirty_it; -} - -void cluster_client_t::copy_write(cluster_op_t *op, std::map & dirty_buffers) -{ - // Save operation for replay when one of PGs goes out of sync - // (primary OSD drops our connection in this case) - auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers); - uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0; - while (len > 0) - { - uint64_t new_len = 0; - if (dirty_it == dirty_buffers.end() || dirty_it->first.inode != op->inode) - { - new_len = len; - } - else if (dirty_it->first.stripe > pos) - { - new_len = dirty_it->first.stripe - pos; - if (new_len > len) - { - new_len = len; - } - } - if (new_len > 0) 
- { - dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){ - .inode = op->inode, - .stripe = pos, - }, (cluster_buffer_t){ - .buf = malloc_or_die(new_len), - .len = new_len, - }); - } - // FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting - dirty_it->second.state = CACHE_DIRTY; - uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos); - if (cur_len > len) - { - cur_len = len; - } - while (cur_len > 0 && iov_idx < op->iov.count) - { - unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos); - if (iov_len <= cur_len) - { - memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe, - (uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, iov_len); - pos += iov_len; - len -= iov_len; - cur_len -= iov_len; - iov_pos = 0; - iov_idx++; - } - else - { - memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe, - (uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, cur_len); - pos += cur_len; - len -= cur_len; - iov_pos += cur_len; - cur_len = 0; - } - } - dirty_it++; - } -} - -void cluster_client_t::flush_buffers(std::map::iterator from_it, - std::map::iterator to_it) -{ - auto prev_it = std::prev(to_it); - cluster_op_t *op = new cluster_op_t; - op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER; - op->opcode = OSD_OP_WRITE; - op->cur_inode = op->inode = from_it->first.inode; - op->offset = from_it->first.stripe; - op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe; - uint32_t calc_len = 0; - uint64_t flush_id = ++last_flush_id; - for (auto it = from_it; it != to_it; it++) - { - it->second.state = CACHE_REPEATING; - it->second.flush_id = flush_id; - op->iov.push_back(it->second.buf, it->second.len); - calc_len += it->second.len; - } - assert(calc_len == op->len); - op->callback = [this, flush_id](cluster_op_t* op) - { - for (auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers); - dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode && - dirty_it->first.stripe < op->offset+op->len; dirty_it++) - { - if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING) - { - dirty_it->second.flush_id = 0; - dirty_it->second.state = CACHE_DIRTY; - } - } - delete op; - }; - op->next = op_queue_head; - if (op_queue_head) - { - op_queue_head->prev = op; - op_queue_head = op; - } - else - op_queue_tail = op_queue_head = op; - inc_wait(op->opcode, op->flags, op->next, 1); - continue_rw(op); -} - int cluster_client_t::continue_rw(cluster_op_t *op) { if (op->state == 0) @@ -785,7 +767,8 @@ resume_2: erase_op(op); return 1; } - else if (op->retval != 0 && op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC) + else if (op->retval != 0 && !(op->flags & OP_FLUSH_BUFFER) && + op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC) { // Fatal error (neither -EPIPE, -EIO nor -ENOSPC) // FIXME: Add a parameter to allow to not wait for EIOs (incomplete or corrupted objects) to heal @@ -857,50 +840,6 @@ static void add_iov(int size, bool skip, cluster_op_t *op, int &iov_idx, size_t } } -static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t len, uint32_t bitmap_granularity) -{ - if (op->opcode == OSD_OP_READ) - { - // Not OSD_OP_READ_BITMAP or OSD_OP_READ_CHAIN_BITMAP - int iov_idx = 0; - uint64_t cur_offset = op->offset; - while (iov_idx < op->iov.count && cur_offset+op->iov.buf[iov_idx].iov_len <= offset) - { - cur_offset += op->iov.buf[iov_idx].iov_len; - iov_idx++; - } - while (iov_idx < 
op->iov.count && cur_offset < offset+len) - { - auto & v = op->iov.buf[iov_idx]; - auto begin = (cur_offset < offset ? offset : cur_offset); - auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len); - memcpy( - v.iov_base + begin - cur_offset, - buf + (cur_offset <= offset ? 0 : cur_offset-offset), - end - begin - ); - cur_offset += v.iov_len; - iov_idx++; - } - } - // Set bitmap bits - int start_bit = (offset-op->offset)/bitmap_granularity; - int end_bit = (offset-op->offset+len)/bitmap_granularity; - for (int bit = start_bit; bit < end_bit;) - { - if (!(bit%8) && bit <= end_bit-8) - { - ((uint8_t*)op->bitmap_buf)[bit/8] = 0xFF; - bit += 8; - } - else - { - ((uint8_t*)op->bitmap_buf)[bit/8] |= (1 << (bit%8)); - bit++; - } - } -} - void cluster_client_t::slice_rw(cluster_op_t *op) { // Slice the request into individual object stripe requests @@ -929,52 +868,11 @@ void cluster_client_t::slice_rw(cluster_op_t *op) int iov_idx = 0; size_t iov_pos = 0; int i = 0; - bool dirty_copied = false; - if (dirty_buffers.size() && (op->opcode == OSD_OP_READ || - op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP)) - { - // We also have to return reads from CACHE_REPEATING buffers - they are not - // guaranteed to be present on target OSDs at the moment of repeating - // And we're also free to return data from other cached buffers just - // because it's faster - auto dirty_it = find_dirty(op->cur_inode, op->offset, dirty_buffers); - while (dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->cur_inode && - dirty_it->first.stripe < op->offset+op->len) - { - uint64_t begin = dirty_it->first.stripe, end = dirty_it->first.stripe + dirty_it->second.len; - if (begin < op->offset) - begin = op->offset; - if (end > op->offset+op->len) - end = op->offset+op->len; - bool skip_prev = true; - uint64_t cur = begin, prev = begin; - while (cur < end) - { - unsigned bmp_loc = (cur - op->offset)/pool_cfg.bitmap_granularity; - bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1); - if (skip_prev != skip) - { - if (cur > prev && !skip) - { - // Copy data - dirty_copied = true; - copy_to_op(op, prev, (uint8_t*)dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, pool_cfg.bitmap_granularity); - } - skip_prev = skip; - prev = cur; - } - cur += pool_cfg.bitmap_granularity; - } - assert(cur > prev); - if (!skip_prev) - { - // Copy data - dirty_copied = true; - copy_to_op(op, prev, (uint8_t*)dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, pool_cfg.bitmap_granularity); - } - dirty_it++; - } - } + // We also have to return reads from CACHE_REPEATING buffers - they are not + // guaranteed to be present on target OSDs at the moment of repeating + // And we're also free to return data from other cached buffers just + // because it's faster + bool dirty_copied = wb->read_from_cache(op, pool_cfg.bitmap_granularity); for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) { pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg() @@ -1146,13 +1044,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op) do_it++; } // Post sync to affected OSDs - for (auto & prev_op: dirty_buffers) - { - if (prev_op.second.state == CACHE_DIRTY) - { - prev_op.second.state = CACHE_FLUSHING; - } - } + wb->fsync_start(); op->parts.resize(dirty_osds.size()); op->retval = 0; { @@ -1177,13 +1069,7 @@ resume_1: } if (op->retval != 0) { - for (auto uw_it = 
dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++) - { - if (uw_it->second.state == CACHE_FLUSHING) - { - uw_it->second.state = CACHE_DIRTY; - } - } + wb->fsync_error(); if (op->retval == -EPIPE || op->retval == -EIO || op->retval == -ENOSPC) { // Retry later @@ -1197,16 +1083,7 @@ resume_1: } else { - for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); ) - { - if (uw_it->second.state == CACHE_FLUSHING) - { - free(uw_it->second.buf); - dirty_buffers.erase(uw_it++); - } - else - uw_it++; - } + wb->fsync_ok(); } erase_op(op); return 1; diff --git a/src/cluster_client.h b/src/cluster_client.h index 1ef7ecaf..a12a7855 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -8,6 +8,9 @@ #define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024 #define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024 +#define DEFAULT_CLIENT_MAX_BUFFERED_BYTES 32*1024*1024 +#define DEFAULT_CLIENT_MAX_BUFFERED_OPS 1024 +#define DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH 256 #define INODE_LIST_DONE 1 #define INODE_LIST_HAS_UNSTABLE 2 #define OSD_OP_READ_BITMAP OSD_OP_SEC_READ_BMP @@ -64,18 +67,12 @@ protected: cluster_op_t *prev = NULL, *next = NULL; int prev_wait = 0; friend class cluster_client_t; -}; - -struct cluster_buffer_t -{ - void *buf; - uint64_t len; - int state; - uint64_t flush_id; + friend class writeback_cache_t; }; struct inode_list_t; struct inode_list_osd_t; +class writeback_cache_t; // FIXME: Split into public and private interfaces class cluster_client_t @@ -84,17 +81,23 @@ class cluster_client_t ring_loop_t *ringloop; std::map pg_counts; - // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. + // client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off uint64_t client_max_dirty_bytes = 0; uint64_t client_max_dirty_ops = 0; + // writeback improves (1) small consecutive writes and (2) Q1 writes without fsync + bool enable_writeback = false; + // client_max_buffered_* is the real "dirty limit" - maximum amount of writes buffered in memory + uint64_t client_max_buffered_bytes = 0; + uint64_t client_max_buffered_ops = 0; + uint64_t client_max_writeback_iodepth = 0; + int log_level; int up_wait_retry_interval = 500; // ms int retry_timeout_id = 0; std::vector offline_ops; cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL; - std::map dirty_buffers; - uint64_t last_flush_id = 0; + writeback_cache_t *wb = NULL; std::set dirty_osds; uint64_t dirty_bytes = 0, dirty_ops = 0; @@ -124,10 +127,10 @@ public: void execute_raw(osd_num_t osd_num, osd_op_t *op); bool is_ready(); void on_ready(std::function fn); + bool flush(); bool get_immediate_commit(uint64_t inode); - static void copy_write(cluster_op_t *op, std::map & dirty_buffers); void continue_ops(bool up_retry = false); inode_list_t *list_inode_start(inode_t inode, std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback); @@ -140,12 +143,12 @@ public: protected: bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd); - void flush_buffers(std::map::iterator from_it, - std::map::iterator to_it); void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); void on_change_hook(std::map & changes); void on_change_osd_state_hook(uint64_t peer_osd); + void execute_internal(cluster_op_t *op); + void unshift_op(cluster_op_t *op); int continue_rw(cluster_op_t *op); bool check_rw(cluster_op_t *op); void slice_rw(cluster_op_t *op); @@ -161,4 +164,6 @@ protected: void 
continue_listing(inode_list_t *lst); void send_list(inode_list_osd_t *cur_list); void continue_raw_ops(osd_num_t peer_osd); + + friend class writeback_cache_t; }; diff --git a/src/cluster_client_impl.h b/src/cluster_client_impl.h new file mode 100644 index 00000000..2f8954b4 --- /dev/null +++ b/src/cluster_client_impl.h @@ -0,0 +1,57 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#pragma once + +#include "cluster_client.h" + +#define SCRAP_BUFFER_SIZE 4*1024*1024 +#define PART_SENT 1 +#define PART_DONE 2 +#define PART_ERROR 4 +#define PART_RETRY 8 +#define CACHE_DIRTY 1 +#define CACHE_WRITTEN 2 +#define CACHE_FLUSHING 3 +#define CACHE_REPEATING 4 +#define OP_FLUSH_BUFFER 0x02 +#define OP_IMMEDIATE_COMMIT 0x04 + +struct cluster_buffer_t +{ + uint8_t *buf; + uint64_t len; + int state; + uint64_t flush_id; + uint64_t *refcnt; +}; + +typedef std::map::iterator dirty_buf_it_t; + +class writeback_cache_t +{ +public: + uint64_t writeback_bytes = 0; + int writeback_queue_size = 0; + int writebacks_active = 0; + uint64_t last_flush_id = 0; + + std::map dirty_buffers; + std::vector writeback_overflow; + std::vector writeback_queue; + std::multimap flushed_buffers; // flush_id => refcnt + + ~writeback_cache_t(); + dirty_buf_it_t find_dirty(uint64_t inode, uint64_t offset); + bool is_left_merged(dirty_buf_it_t dirty_it); + bool is_right_merged(dirty_buf_it_t dirty_it); + bool is_merged(const dirty_buf_it_t & dirty_it); + void copy_write(cluster_op_t *op, int state); + int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd); + void start_writebacks(cluster_client_t *cli, int count); + bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity); + void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it); + void fsync_start(); + void fsync_error(); + void fsync_ok(); +}; diff --git a/src/cluster_client_wb.cpp b/src/cluster_client_wb.cpp new file mode 100644 index 00000000..66d4a457 --- /dev/null +++ b/src/cluster_client_wb.cpp @@ -0,0 +1,498 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include + +#include "cluster_client_impl.h" + +writeback_cache_t::~writeback_cache_t() +{ + for (auto & bp: dirty_buffers) + { + if (!--(*bp.second.refcnt)) + { + free(bp.second.refcnt); // refcnt is allocated with the buffer + } + } + dirty_buffers.clear(); +} + +dirty_buf_it_t writeback_cache_t::find_dirty(uint64_t inode, uint64_t offset) +{ + auto dirty_it = dirty_buffers.lower_bound((object_id){ + .inode = inode, + .stripe = offset, + }); + while (dirty_it != dirty_buffers.begin()) + { + dirty_it--; + if (dirty_it->first.inode != inode || + (dirty_it->first.stripe + dirty_it->second.len) <= offset) + { + dirty_it++; + break; + } + } + return dirty_it; +} + +bool writeback_cache_t::is_left_merged(dirty_buf_it_t dirty_it) +{ + if (dirty_it != dirty_buffers.begin()) + { + auto prev_it = dirty_it; + prev_it--; + if (prev_it->first.inode == dirty_it->first.inode && + prev_it->first.stripe+prev_it->second.len == dirty_it->first.stripe && + prev_it->second.state == CACHE_DIRTY) + { + return true; + } + } + return false; +} + +bool writeback_cache_t::is_right_merged(dirty_buf_it_t dirty_it) +{ + auto next_it = dirty_it; + next_it++; + if (next_it != dirty_buffers.end() && + next_it->first.inode == dirty_it->first.inode && + next_it->first.stripe == dirty_it->first.stripe+dirty_it->second.len && + next_it->second.state == CACHE_DIRTY) + { + return 
true; + } + return false; +} + +bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it) +{ + return is_left_merged(dirty_it) || is_right_merged(dirty_it); +} + +void writeback_cache_t::copy_write(cluster_op_t *op, int state) +{ + // Save operation for replay when one of PGs goes out of sync + // (primary OSD drops our connection in this case) + // ...or just save it for writeback if write buffering is enabled + if (op->len == 0) + { + return; + } + auto dirty_it = find_dirty(op->inode, op->offset); + auto new_end = op->offset + op->len; + while (dirty_it != dirty_buffers.end() && + dirty_it->first.inode == op->inode && + dirty_it->first.stripe < op->offset+op->len) + { + assert(dirty_it->first.stripe + dirty_it->second.len > op->offset); + // Remove overlapping part(s) of buffers + auto old_end = dirty_it->first.stripe + dirty_it->second.len; + if (dirty_it->first.stripe < op->offset) + { + if (old_end > new_end) + { + // Split into end and start + dirty_it->second.len = op->offset - dirty_it->first.stripe; + dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){ + .inode = op->inode, + .stripe = new_end, + }, (cluster_buffer_t){ + .buf = dirty_it->second.buf + new_end - dirty_it->first.stripe, + .len = old_end - new_end, + .state = dirty_it->second.state, + .flush_id = dirty_it->second.flush_id, + .refcnt = dirty_it->second.refcnt, + }); + (*dirty_it->second.refcnt)++; + if (dirty_it->second.state == CACHE_DIRTY) + { + writeback_bytes -= op->len; + writeback_queue_size++; + } + break; + } + else + { + // Only leave the beginning + if (dirty_it->second.state == CACHE_DIRTY) + { + writeback_bytes -= old_end - op->offset; + if (is_left_merged(dirty_it) && !is_right_merged(dirty_it)) + { + writeback_queue_size++; + } + } + dirty_it->second.len = op->offset - dirty_it->first.stripe; + dirty_it++; + } + } + else if (old_end > new_end) + { + // Only leave the end + if (dirty_it->second.state == CACHE_DIRTY) + { + writeback_bytes -= new_end - dirty_it->first.stripe; + if (!is_left_merged(dirty_it) && is_right_merged(dirty_it)) + { + writeback_queue_size++; + } + } + auto new_dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){ + .inode = op->inode, + .stripe = new_end, + }, (cluster_buffer_t){ + .buf = dirty_it->second.buf + new_end - dirty_it->first.stripe, + .len = old_end - new_end, + .state = dirty_it->second.state, + .flush_id = dirty_it->second.flush_id, + .refcnt = dirty_it->second.refcnt, + }); + dirty_buffers.erase(dirty_it); + dirty_it = new_dirty_it; + break; + } + else + { + // Remove the whole buffer + if (dirty_it->second.state == CACHE_DIRTY && !is_merged(dirty_it)) + { + writeback_bytes -= dirty_it->second.len; + assert(writeback_queue_size > 0); + writeback_queue_size--; + } + if (!--(*dirty_it->second.refcnt)) + { + free(dirty_it->second.refcnt); + } + dirty_buffers.erase(dirty_it++); + } + } + // Overlapping buffers are removed, just insert the new one + uint64_t *refcnt = (uint64_t*)malloc_or_die(sizeof(uint64_t) + op->len); + uint8_t *buf = (uint8_t*)refcnt + sizeof(uint64_t); + *refcnt = 1; + dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){ + .inode = op->inode, + .stripe = op->offset, + }, (cluster_buffer_t){ + .buf = buf, + .len = op->len, + .state = state, + .refcnt = refcnt, + }); + if (state == CACHE_DIRTY) + { + writeback_bytes += op->len; + // Track consecutive write-back operations + if (!is_merged(dirty_it)) + { + // is OK to contain more than actual number of consecutive + // requests as long as it doesn't miss anything. 
But + // is always calculated correctly. + writeback_queue_size++; + writeback_queue.push_back((object_id){ + .inode = op->inode, + .stripe = op->offset, + }); + } + } + uint64_t pos = 0, len = op->len, iov_idx = 0; + while (len > 0 && iov_idx < op->iov.count) + { + auto & iov = op->iov.buf[iov_idx]; + memcpy(buf + pos, iov.iov_base, iov.iov_len); + pos += iov.iov_len; + iov_idx++; + } +} + +int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd) +{ + int repeated = 0; + if (dirty_buffers.size()) + { + // peer_osd just dropped connection + // determine WHICH dirty_buffers are now obsolete and repeat them + for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; ) + { + bool end = wr_it == dirty_buffers.end(); + bool flush_this = !end && wr_it->second.state != CACHE_REPEATING && + cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd); + if (flush_it != wr_it && (end || !flush_this || + wr_it->first.inode != flush_it->first.inode || + wr_it->first.stripe != last_it->first.stripe+last_it->second.len)) + { + repeated++; + flush_buffers(cli, flush_it, wr_it); + flush_it = wr_it; + } + if (end) + break; + last_it = wr_it; + wr_it++; + if (!flush_this) + flush_it = wr_it; + } + } + return repeated; +} + +void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it) +{ + auto prev_it = to_it; + prev_it--; + bool is_writeback = from_it->second.state == CACHE_DIRTY; + cluster_op_t *op = new cluster_op_t; + op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER; + op->opcode = OSD_OP_WRITE; + op->cur_inode = op->inode = from_it->first.inode; + op->offset = from_it->first.stripe; + op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe; + uint32_t calc_len = 0; + uint64_t flush_id = ++last_flush_id; + for (auto it = from_it; it != to_it; it++) + { + it->second.state = CACHE_REPEATING; + it->second.flush_id = flush_id; + (*it->second.refcnt)++; + flushed_buffers.emplace(flush_id, it->second.refcnt); + op->iov.push_back(it->second.buf, it->second.len); + calc_len += it->second.len; + } + assert(calc_len == op->len); + writebacks_active++; + op->callback = [this, cli, flush_id](cluster_op_t* op) + { + // Buffer flushes should be always retried, regardless of the error, + // so they should never result in an error here + assert(op->retval == op->len); + for (auto fl_it = flushed_buffers.find(flush_id); + fl_it != flushed_buffers.end() && fl_it->first == flush_id; ) + { + if (!--(*fl_it->second)) // refcnt + { + free(fl_it->second); + } + flushed_buffers.erase(fl_it++); + } + for (auto dirty_it = find_dirty(op->inode, op->offset); + dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode && + dirty_it->first.stripe < op->offset+op->len; dirty_it++) + { + if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING) + { + dirty_it->second.flush_id = 0; + dirty_it->second.state = CACHE_WRITTEN; + } + } + delete op; + writebacks_active--; + // We can't call execute_internal because it affects an invalid copy of the list here + // (erase_op remembers `next` after writeback callback) + }; + if (is_writeback) + { + cli->execute_internal(op); + } + else + { + // Insert repeated flushes into the beginning + cli->unshift_op(op); + cli->continue_rw(op); + } +} + +void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count) +{ + if (!writeback_queue.size()) + { + return; + } + std::vector queue_copy; + 
queue_copy.swap(writeback_queue); + int started = 0, i = 0; + for (i = 0; i < queue_copy.size() && (!count || started < count); i++) + { + object_id & req = queue_copy[i]; + auto dirty_it = find_dirty(req.inode, req.stripe); + if (dirty_it == dirty_buffers.end() || + dirty_it->first.inode != req.inode || + dirty_it->second.state != CACHE_DIRTY) + { + continue; + } + auto from_it = dirty_it; + uint64_t off = dirty_it->first.stripe; + while (from_it != dirty_buffers.begin()) + { + from_it--; + if (from_it->second.state != CACHE_DIRTY || + from_it->first.inode != req.inode || + from_it->first.stripe+from_it->second.len != off) + { + from_it++; + break; + } + off = from_it->first.stripe; + } + off = dirty_it->first.stripe + dirty_it->second.len; + auto to_it = dirty_it; + to_it++; + while (to_it != dirty_buffers.end()) + { + if (to_it->second.state != CACHE_DIRTY || + to_it->first.inode != req.inode || + to_it->first.stripe != off) + { + break; + } + off = to_it->first.stripe + to_it->second.len; + to_it++; + } + started++; + assert(writeback_queue_size > 0); + writeback_queue_size--; + writeback_bytes -= off - from_it->first.stripe; + flush_buffers(cli, from_it, to_it); + } + queue_copy.erase(queue_copy.begin(), queue_copy.begin()+i); + if (writeback_queue.size()) + { + queue_copy.insert(queue_copy.end(), writeback_queue.begin(), writeback_queue.end()); + } + queue_copy.swap(writeback_queue); +} + +static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t len, uint32_t bitmap_granularity) +{ + if (op->opcode == OSD_OP_READ) + { + // Not OSD_OP_READ_BITMAP or OSD_OP_READ_CHAIN_BITMAP + int iov_idx = 0; + uint64_t cur_offset = op->offset; + while (iov_idx < op->iov.count && cur_offset+op->iov.buf[iov_idx].iov_len <= offset) + { + cur_offset += op->iov.buf[iov_idx].iov_len; + iov_idx++; + } + while (iov_idx < op->iov.count && cur_offset < offset+len) + { + auto & v = op->iov.buf[iov_idx]; + auto begin = (cur_offset < offset ? offset : cur_offset); + auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len); + memcpy( + v.iov_base + begin - cur_offset, + buf + (cur_offset <= offset ? 
0 : cur_offset-offset), + end - begin + ); + cur_offset += v.iov_len; + iov_idx++; + } + } + // Set bitmap bits + int start_bit = (offset-op->offset)/bitmap_granularity; + int end_bit = (offset-op->offset+len)/bitmap_granularity; + for (int bit = start_bit; bit < end_bit;) + { + if (!(bit%8) && bit <= end_bit-8) + { + ((uint8_t*)op->bitmap_buf)[bit/8] = 0xFF; + bit += 8; + } + else + { + ((uint8_t*)op->bitmap_buf)[bit/8] |= (1 << (bit%8)); + bit++; + } + } +} + +bool writeback_cache_t::read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity) +{ + bool dirty_copied = false; + if (dirty_buffers.size() && (op->opcode == OSD_OP_READ || + op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP)) + { + // We also have to return reads from CACHE_REPEATING buffers - they are not + // guaranteed to be present on target OSDs at the moment of repeating + // And we're also free to return data from other cached buffers just + // because it's faster + auto dirty_it = find_dirty(op->cur_inode, op->offset); + while (dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->cur_inode && + dirty_it->first.stripe < op->offset+op->len) + { + uint64_t begin = dirty_it->first.stripe, end = dirty_it->first.stripe + dirty_it->second.len; + if (begin < op->offset) + begin = op->offset; + if (end > op->offset+op->len) + end = op->offset+op->len; + bool skip_prev = true; + uint64_t cur = begin, prev = begin; + while (cur < end) + { + unsigned bmp_loc = (cur - op->offset)/bitmap_granularity; + bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1); + if (skip_prev != skip) + { + if (cur > prev && !skip) + { + // Copy data + dirty_copied = true; + copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity); + } + skip_prev = skip; + prev = cur; + } + cur += bitmap_granularity; + } + assert(cur > prev); + if (!skip_prev) + { + // Copy data + dirty_copied = true; + copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity); + } + dirty_it++; + } + } + return dirty_copied; +} + +void writeback_cache_t::fsync_start() +{ + for (auto & prev_op: dirty_buffers) + { + if (prev_op.second.state == CACHE_WRITTEN) + { + prev_op.second.state = CACHE_FLUSHING; + } + } +} + +void writeback_cache_t::fsync_error() +{ + for (auto & prev_op: dirty_buffers) + { + if (prev_op.second.state == CACHE_FLUSHING) + { + prev_op.second.state = CACHE_WRITTEN; + } + } +} + +void writeback_cache_t::fsync_ok() +{ + for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); ) + { + if (uw_it->second.state == CACHE_FLUSHING) + { + if (!--(*uw_it->second.refcnt)) + free(uw_it->second.refcnt); + dirty_buffers.erase(uw_it++); + } + else + uw_it++; + } +} diff --git a/src/fio_cluster.cpp b/src/fio_cluster.cpp index 87739c1f..6626cecb 100644 --- a/src/fio_cluster.cpp +++ b/src/fio_cluster.cpp @@ -24,6 +24,7 @@ #include #include +#include #include "vitastor_c.h" #include "fio_headers.h" @@ -203,6 +204,15 @@ static void watch_callback(void *opaque, long watch) bsd->watch = (void*)watch; } +static void opt_push(std::vector & options, const char *opt, const char *value) +{ + if (value) + { + options.push_back(strdup(opt)); + options.push_back(strdup(value)); + } +} + static int sec_setup(struct thread_data *td) { sec_options *o = (sec_options*)td->eo; @@ -254,8 +264,27 @@ static int sec_setup(struct thread_data *td) { o->inode = 0; } - bsd->cli = vitastor_c_create_uring(o->config_path, o->etcd_host, 
o->etcd_prefix, - o->use_rdma, o->rdma_device, o->rdma_port_num, o->rdma_gid_index, o->rdma_mtu, o->cluster_log); + std::vector options; + opt_push(options, "config_path", o->config_path); + opt_push(options, "etcd_address", o->etcd_host); + opt_push(options, "etcd_prefix", o->etcd_prefix); + if (o->use_rdma != -1) + opt_push(options, "use_rdma", std::to_string(o->use_rdma).c_str()); + opt_push(options, "rdma_device", o->rdma_device); + if (o->rdma_port_num) + opt_push(options, "rdma_port_num", std::to_string(o->rdma_port_num).c_str()); + if (o->rdma_gid_index) + opt_push(options, "rdma_gid_index", std::to_string(o->rdma_gid_index).c_str()); + if (o->rdma_mtu) + opt_push(options, "rdma_mtu", std::to_string(o->rdma_mtu).c_str()); + if (o->cluster_log) + opt_push(options, "log_level", std::to_string(o->cluster_log).c_str()); + // allow writeback caching if -direct is not set + opt_push(options, "client_writeback_allowed", td->o.odirect ? "0" : "1"); + bsd->cli = vitastor_c_create_uring_json((const char**)options.data(), options.size()); + for (auto opt: options) + free(opt); + options.clear(); if (o->image) { bsd->watch = NULL; diff --git a/src/mock/messenger.cpp b/src/mock/messenger.cpp index 2d5004b5..32150cef 100644 --- a/src/mock/messenger.cpp +++ b/src/mock/messenger.cpp @@ -55,3 +55,10 @@ json11::Json::object osd_messenger_t::merge_configs(const json11::Json::object & { return cli_config; } + +bool json_is_true(const json11::Json & val) +{ + if (val.is_string()) + return val == "true" || val == "yes" || val == "1"; + return val.bool_value(); +} diff --git a/src/mock/ringloop.h b/src/mock/ringloop.h index 37b29050..6ed6ed0b 100644 --- a/src/mock/ringloop.h +++ b/src/mock/ringloop.h @@ -22,4 +22,10 @@ public: void submit() { } + void wait() + { + } + void loop() + { + } }; diff --git a/src/nbd_proxy.cpp b/src/nbd_proxy.cpp index c333b54b..c87592ca 100644 --- a/src/nbd_proxy.cpp +++ b/src/nbd_proxy.cpp @@ -341,6 +341,7 @@ public: ringloop->loop(); ringloop->wait(); } + cli->flush(); delete cli; delete epmgr; delete ringloop; diff --git a/src/nfs_proxy.cpp b/src/nfs_proxy.cpp index a0c3bb53..04a64291 100644 --- a/src/nfs_proxy.cpp +++ b/src/nfs_proxy.cpp @@ -34,7 +34,10 @@ nfs_proxy_t::~nfs_proxy_t() if (cmd) delete cmd; if (cli) + { + cli->flush(); delete cli; + } if (epmgr) delete epmgr; if (ringloop) @@ -261,16 +264,8 @@ void nfs_proxy_t::run(json11::Json cfg) ringloop->loop(); ringloop->wait(); } - /*// Sync at the end - cluster_op_t *close_sync = new cluster_op_t; - close_sync->opcode = OSD_OP_SYNC; - close_sync->callback = [&stop](cluster_op_t *op) - { - stop = true; - delete op; - }; - cli->execute(close_sync);*/ // Destroy the client + cli->flush(); delete cli; delete epmgr; delete ringloop; diff --git a/src/qemu_driver.c b/src/qemu_driver.c index 28eaf9b5..edc42432 100644 --- a/src/qemu_driver.c +++ b/src/qemu_driver.c @@ -388,6 +388,43 @@ static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandl ); } +typedef struct str_array +{ + const char **items; + int len, alloc; +} str_array; + +static void strarray_push(str_array *a, const char *str) +{ + if (a->len >= a->alloc) + { + a->alloc = !a->alloc ? 
4 : 2*a->alloc; + a->items = (const char**)realloc(a->items, a->alloc*sizeof(char*)); + if (!a->items) + { + fprintf(stderr, "bad alloc\n"); + abort(); + } + } + a->items[a->len++] = str; +} + +static void strarray_push_kv(str_array *a, const char *key, const char *value) +{ + if (key && value) + { + strarray_push(a, key); + strarray_push(a, value); + } +} + +static void strarray_free(str_array *a) +{ + free(a->items); + a->items = NULL; + a->len = a->alloc = 0; +} + static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp) { VitastorRPC task; @@ -406,23 +443,19 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E client->rdma_gid_index = qdict_get_try_int(options, "rdma-gid-index", 0); client->rdma_mtu = qdict_get_try_int(options, "rdma-mtu", 0); client->ctx = bdrv_get_aio_context(bs); -#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2 - client->proxy = vitastor_c_create_qemu_uring( - vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix, - client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0 - ); - if (!client->proxy) - { - fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno)); - client->uring_eventfd = -1; -#endif - client->proxy = vitastor_c_create_qemu( - vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix, - client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0 - ); -#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2 - } - else + str_array opt = {}; + strarray_push_kv(&opt, "config_path", qdict_get_try_str(options, "config-path")); + strarray_push_kv(&opt, "etcd_address", qdict_get_try_str(options, "etcd-host")); + strarray_push_kv(&opt, "etcd_prefix", qdict_get_try_str(options, "etcd-prefix")); + strarray_push_kv(&opt, "use_rdma", qdict_get_try_str(options, "use-rdma")); + strarray_push_kv(&opt, "rdma_device", qdict_get_try_str(options, "rdma-device")); + strarray_push_kv(&opt, "rdma_port_num", qdict_get_try_str(options, "rdma-port-num")); + strarray_push_kv(&opt, "rdma_gid_index", qdict_get_try_str(options, "rdma-gid-index")); + strarray_push_kv(&opt, "rdma_mtu", qdict_get_try_str(options, "rdma-mtu")); + strarray_push_kv(&opt, "client_writeback_allowed", (flags & BDRV_O_NOCACHE) ? "0" : "1"); + client->proxy = vitastor_c_create_uring_json(opt.items, opt.len); + strarray_free(&opt); + if (client->proxy) { client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy); if (client->uring_eventfd < 0) @@ -434,7 +467,45 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E } universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client); } + else + { + // Writeback cache is unusable without io_uring because the client can't correctly flush on exit + fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower%s\n", + strerror(errno), (flags & BDRV_O_NOCACHE ? 
"" : " and writeback cache will be disabled")); + client->uring_eventfd = -1; +#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2 + client->proxy = vitastor_c_create_qemu_uring( + vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix, + client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0 + ); + if (!client->proxy) + { + fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno)); + client->uring_eventfd = -1; + client->proxy = vitastor_c_create_qemu( + vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix, + client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0 + ); + } + else + { + client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy); + if (client->uring_eventfd < 0) + { + fprintf(stderr, "vitastor: failed to create io_uring eventfd: %s\n", strerror(errno)); + error_setg(errp, "failed to create io_uring eventfd"); + vitastor_close(bs); + return -1; + } + universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client); + } +#else + client->proxy = vitastor_c_create_qemu( + vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix, + client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0 + ); #endif + } image = client->image = g_strdup(qdict_get_try_str(options, "image")); client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0; // Get image metadata (size and readonly flag) or just wait until the client is ready diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp index 3865d7d4..f7ed646a 100644 --- a/src/test_cluster_client.cpp +++ b/src/test_cluster_client.cpp @@ -4,7 +4,7 @@ #include #include #include -#include "cluster_client.h" +#include "cluster_client_impl.h" void configure_single_pg_pool(cluster_client_t *cli) { @@ -47,11 +47,11 @@ void configure_single_pg_pool(cluster_client_t *cli) cli->st_cli.on_change_hook(changes); } -int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function cb = NULL) +int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function cb = NULL, bool instant = false) { printf("Post write %lx+%lx\n", offset, len); int *r = new int; - *r = -1; + *r = instant ? 
-2 : -1; cluster_op_t *op = new cluster_op_t(); op->opcode = OSD_OP_WRITE; op->inode = 0x1000000000001; @@ -72,6 +72,13 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, cb(); }; cli->execute(op); + if (instant) + { + long res = *r; + assert(*r >= 0); + delete r; + return (int*)res; + } return r; } @@ -160,6 +167,13 @@ osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uin } op_it++; } + op_it = cli->msgr.clients[peer_fd]->sent_ops.begin(); + while (op_it != cli->msgr.clients[peer_fd]->sent_ops.end()) + { + printf("Found opcode %lu offset %lx size %x\n", op_it->second->req.hdr.opcode, op_it->second->req.rw.offset, op_it->second->req.rw.len); + op_it++; + } + printf("Not found opcode %lu offset %lx size %lx\n", opcode, offset, len); return NULL; } @@ -341,7 +355,7 @@ void test1() void test2() { - std::map unsynced_writes; + writeback_cache_t *wb = new writeback_cache_t(); cluster_op_t *op = new cluster_op_t(); op->opcode = OSD_OP_WRITE; op->inode = 1; @@ -350,19 +364,19 @@ void test2() op->iov.push_back(malloc_or_die(4096*1024), 4096); // 0-4k = 0x55 memset(op->iov.buf[0].iov_base, 0x55, op->iov.buf[0].iov_len); - cluster_client_t::copy_write(op, unsynced_writes); + wb->copy_write(op, CACHE_WRITTEN); // 8k-12k = 0x66 op->offset = 8192; memset(op->iov.buf[0].iov_base, 0x66, op->iov.buf[0].iov_len); - cluster_client_t::copy_write(op, unsynced_writes); + wb->copy_write(op, CACHE_WRITTEN); // 4k-1M+4k = 0x77 op->len = op->iov.buf[0].iov_len = 1048576; op->offset = 4096; memset(op->iov.buf[0].iov_base, 0x77, op->iov.buf[0].iov_len); - cluster_client_t::copy_write(op, unsynced_writes); + wb->copy_write(op, CACHE_WRITTEN); // check it - assert(unsynced_writes.size() == 4); - auto uit = unsynced_writes.begin(); + assert(wb->dirty_buffers.size() == 2); + auto uit = wb->dirty_buffers.begin(); int i; assert(uit->first.inode == 1); assert(uit->first.stripe == 0); @@ -372,35 +386,106 @@ void test2() uit++; assert(uit->first.inode == 1); assert(uit->first.stripe == 4096); - assert(uit->second.len == 4096); - for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} - assert(i == uit->second.len); - uit++; - assert(uit->first.inode == 1); - assert(uit->first.stripe == 8192); - assert(uit->second.len == 4096); - for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} - assert(i == uit->second.len); - uit++; - assert(uit->first.inode == 1); - assert(uit->first.stripe == 12*1024); - assert(uit->second.len == 1016*1024); + assert(uit->second.len == 1048576); for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {} assert(i == uit->second.len); uit++; // free memory free(op->iov.buf[0].iov_base); delete op; - for (auto p: unsynced_writes) - { - free(p.second.buf); - } + delete wb; printf("[ok] copy_write test\n"); } +void test_writeback() +{ + json11::Json config = json11::Json::object { + { "client_enable_writeback", true }, + { "client_writeback_allowed", true }, + { "client_max_buffered_bytes", 1024*1024 }, + { "client_max_buffered_ops", 2 }, + { "client_max_writeback_iodepth", 2 }, + { "client_max_dirty_bytes", 1024*1024 }, + { "client_max_dirty_ops", 2 }, + }; + timerfd_manager_t *tfd = new timerfd_manager_t([](int fd, bool wr, std::function callback){}); + cluster_client_t *cli = new cluster_client_t(NULL, tfd, config); + + configure_single_pg_pool(cli); + pretend_connected(cli, 1); + + // Check that 3 consecutive writes are merged by writeback + 
assert((long)test_write(cli, 0, 4096, 0x55, NULL, true) == 1); + check_op_count(cli, 1, 0); + assert((long)test_write(cli, 4096, 4096, 0x55, NULL, true) == 1); + check_op_count(cli, 1, 0); + assert((long)test_write(cli, 8192, 4096, 0x55, NULL, true) == 1); + check_op_count(cli, 1, 0); + + assert((long)test_write(cli, 1024*1024, 4096, 0x66, NULL, true) == 1); + check_op_count(cli, 1, 0); + + // 3rd and 4th writes should trigger 1 writeback each + assert((long)test_write(cli, 2*1024*1024, 4096, 0x66, NULL, true) == 1); + check_op_count(cli, 1, 1); + assert((long)test_write(cli, 3*1024*1024, 4096, 0x66, NULL, true) == 1); + check_op_count(cli, 1, 2); + + // 5th write should be postponed until at least 1 writeback is completed + int *r1 = test_write(cli, 4*1024*1024, 4096, 0x67, NULL); + check_op_count(cli, 1, 2); + can_complete(r1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 3*4096), 0); + check_completed(r1); + // autosync because max_dirty_ops=2, flush waits for sync + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 1024*1024, 4096), 0); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 2*1024*1024, 4096), 0); + check_op_count(cli, 1, 0); + + int *r2 = test_sync(cli); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 3*1024*1024, 4096), 0); + check_op_count(cli, 1, 1); + // autosync because max_dirty_ops=2, flush waits for sync + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4*1024*1024, 4096), 0); + check_op_count(cli, 1, 1); + can_complete(r2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_completed(r2); + + // Check cutting of the beginning and end + assert((long)test_write(cli, 0, 32768, 0x55, NULL, true) == 1); + check_op_count(cli, 1, 0); + assert((long)test_write(cli, 32768, 32768, 0x56, NULL, true) == 1); + check_op_count(cli, 1, 0); + assert((long)test_write(cli, 16384, 32768, 0x57, NULL, true) == 1); + check_op_count(cli, 1, 0); + assert((long)test_write(cli, 16384+4096, 32768-4096, 0x58, NULL, true) == 1); + check_op_count(cli, 1, 0); + r2 = test_sync(cli); + check_op_count(cli, 1, 1); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 65536), 0); + check_op_count(cli, 1, 1); + can_complete(r2); + pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0); + check_completed(r2); + + // Free client + delete cli; + delete tfd; + printf("[ok] writeback test\n"); +} + int main(int narg, char *args[]) { test1(); test2(); + test_writeback(); return 0; }
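Note on the trickiest part of this change: writeback_cache_t::copy_write() must trim, split or drop any previously buffered intervals that intersect a new write before inserting the new buffer. The stand-alone sketch below models only that interval logic against the same scenario as test2(); the buf_t/overwrite() names are illustrative and are not part of the Vitastor API, and the `gen` field merely stands in for the real data buffer and its refcount.

#include <cstdint>
#include <cstdio>
#include <cassert>
#include <map>
#include <iterator>

struct buf_t { uint64_t len; int gen; }; // gen stands in for the buffer data/refcnt

static std::map<uint64_t, buf_t> bufs;   // offset -> buffer, single inode for simplicity

static void overwrite(uint64_t off, uint64_t len, int gen)
{
    uint64_t end = off + len;
    // Find the first buffer that may overlap [off, end). Buffers never overlap each
    // other, so only the immediate predecessor of lower_bound() can reach into the range.
    auto it = bufs.lower_bound(off);
    if (it != bufs.begin())
    {
        auto prev = std::prev(it);
        if (prev->first + prev->second.len > off)
            it = prev;
    }
    while (it != bufs.end() && it->first < end)
    {
        uint64_t b_off = it->first, b_end = it->first + it->second.len;
        if (b_off < off && b_end > end)
        {
            // New write falls strictly inside an old buffer: keep the head, re-add the tail
            it->second.len = off - b_off;
            bufs[end] = buf_t{ b_end - end, it->second.gen };
            break;
        }
        else if (b_off < off)
        {
            // Overlap on the right side of the old buffer: keep only its head
            it->second.len = off - b_off;
            it++;
        }
        else if (b_end > end)
        {
            // Overlap on the left side of the old buffer: keep only its tail
            buf_t tail = { b_end - end, it->second.gen };
            bufs.erase(it);
            bufs[end] = tail;
            break;
        }
        else
        {
            // Old buffer is fully covered by the new write: drop it
            it = bufs.erase(it);
        }
    }
    bufs[off] = buf_t{ len, gen };
}

int main()
{
    // Same sequence as test2(): 0-4k, then 8k-12k, then 4k-1M+4k
    overwrite(0, 4096, 1);
    overwrite(8192, 4096, 2);
    overwrite(4096, 1048576, 3);
    assert(bufs.size() == 2); // 0-4k (old) and 4k-1M+4k (new), matching the assertion in test2()
    for (auto & b: bufs)
        printf("buffer at %lu, len %lu, gen %d\n", b.first, b.second.len, b.second.gen);
    return 0;
}

In the real implementation the head and tail produced by a split keep pointing into the same allocation, which is why cluster_buffer_t carries a refcnt pointer that is incremented on split and freed only when the last piece (or flushed_buffers entry) releases it.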