Implement client writeback cache

- Disabled by default, enable with client_enable_writeback=true
- Even then only enabled in FIO when -direct is disabled and in QEMU when
  block device cache is enabled in settings
- Can also be enabled in other clients like vitastor-cli using parameter
  client_writeback_allowed=true, but not recommended
fsync-feedback
Vitaliy Filippov 2023-08-12 20:26:40 +03:00
parent cd543a90bc
commit 38db53f5ee
14 changed files with 999 additions and 360 deletions

View File

@ -78,9 +78,15 @@ const etcd_tree = {
disk_alignment: 4096,
bitmap_granularity: 4096,
immediate_commit: false, // 'all' or 'small'
// client - configurable online
client_max_dirty_bytes: 33554432,
client_max_dirty_ops: 1024,
client_enable_writeback: false,
client_max_buffered_bytes: 33554432,
client_max_buffered_ops: 1024,
client_max_writeback_iodepth: 256,
// client and osd - configurable online
log_level: 0,
client_dirty_limit: 33554432,
peer_connect_interval: 5, // seconds. min: 1
peer_connect_timeout: 5, // seconds. min: 1
osd_idle_timeout: 5, // seconds. min: 1

View File

@ -137,6 +137,7 @@ endif (${WITH_FIO})
add_library(vitastor_client SHARED
cluster_client.cpp
cluster_client_list.cpp
cluster_client_wb.cpp
vitastor_c.cpp
cli_common.cpp
cli_alloc_osd.cpp
@ -300,7 +301,7 @@ target_link_libraries(test_crc32
add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp cluster_client_wb.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)

View File

@ -349,6 +349,7 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
p->ringloop->wait();
}
// Destroy the client
p->cli->flush();
delete p->cli;
delete p->epmgr;
delete p->ringloop;

View File

@ -3,21 +3,13 @@
#include <stdexcept>
#include <assert.h>
#include "cluster_client.h"
#define SCRAP_BUFFER_SIZE 4*1024*1024
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define PART_RETRY 8
#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 3
#define OP_FLUSH_BUFFER 0x02
#define OP_IMMEDIATE_COMMIT 0x04
#include "cluster_client_impl.h"
#include "http_client.h" // json_is_true
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
wb = new writeback_cache_t();
cli_config = config.object_items();
file_config = osd_messenger_t::read_config(config);
config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});
@ -37,30 +29,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
continue_lists();
continue_raw_ops(peer_osd);
}
else if (dirty_buffers.size())
else
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
if (wb->repeat_ops_for(this, peer_osd) > 0)
{
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
{
flush_buffers(flush_it, wr_it);
flush_it = wr_it;
}
if (end)
break;
last_it = wr_it;
wr_it++;
if (!flush_this)
flush_it = wr_it;
continue_ops();
}
continue_ops();
}
};
msgr.exec_op = [this](osd_op_t *op)
@ -88,16 +64,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
cluster_client_t::~cluster_client_t()
{
for (auto bp: dirty_buffers)
{
free(bp.second.buf);
}
dirty_buffers.clear();
msgr.repeer_pgs = [this](osd_num_t){};
if (ringloop)
{
ringloop->unregister_consumer(&consumer);
}
free(scrap_buffer);
delete wb;
wb = NULL;
}
cluster_op_t::~cluster_op_t()
@ -146,6 +120,19 @@ void cluster_client_t::init_msgr()
}
}
void cluster_client_t::unshift_op(cluster_op_t *op)
{
op->next = op_queue_head;
if (op_queue_head)
{
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
inc_wait(op->opcode, op->flags, op->next, 1);
}
void cluster_client_t::calc_wait(cluster_op_t *op)
{
op->prev_wait = 0;
@ -166,7 +153,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
{
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && !(prev->flags & OP_IMMEDIATE_COMMIT))
if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && (!(prev->flags & OP_IMMEDIATE_COMMIT) || enable_writeback))
{
op->prev_wait++;
}
@ -187,7 +174,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
while (next)
{
auto n2 = next->next;
if (next->opcode == OSD_OP_SYNC && !(flags & OP_IMMEDIATE_COMMIT) ||
if (next->opcode == OSD_OP_SYNC && (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback) ||
next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER))
{
next->prev_wait += inc;
@ -239,13 +226,37 @@ void cluster_client_t::erase_op(cluster_op_t *op)
op_queue_tail = op->prev;
op->next = op->prev = NULL;
if (flags & OP_FLUSH_BUFFER)
{
// Completed flushes change writeback buffer states,
// so the callback should be run before inc_wait()
// which may continue following SYNCs, but these SYNCs
// should know about the changed buffer state
// This is ugly but this is the way we do it
std::function<void(cluster_op_t*)>(op->callback)(op);
if (!(flags & OP_IMMEDIATE_COMMIT))
}
if (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
{
inc_wait(opcode, flags, next, -1);
// Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself
}
if (!(flags & OP_FLUSH_BUFFER))
{
// Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself
std::function<void(cluster_op_t*)>(op->callback)(op);
}
if (flags & OP_FLUSH_BUFFER)
{
int i = 0;
while (i < wb->writeback_overflow.size() && wb->writebacks_active < client_max_writeback_iodepth)
{
execute_internal(wb->writeback_overflow[i]);
i++;
}
if (i > 0)
{
wb->writeback_overflow.erase(wb->writeback_overflow.begin(), wb->writeback_overflow.begin()+i);
}
}
}
void cluster_client_t::continue_ops(bool up_retry)
@ -289,6 +300,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co
{
this->etcd_global_config = etcd_global_config;
config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});
// client_max_dirty_bytes/client_dirty_limit
if (config.find("client_max_dirty_bytes") != config.end())
{
client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
@ -304,11 +316,34 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co
{
client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
}
// client_max_dirty_ops
client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
if (!client_max_dirty_ops)
{
client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
}
// client_enable_writeback
enable_writeback = json_is_true(config["client_enable_writeback"]) &&
json_is_true(config["client_writeback_allowed"]);
// client_max_buffered_bytes
client_max_buffered_bytes = config["client_max_buffered_bytes"].uint64_value();
if (!client_max_buffered_bytes)
{
client_max_buffered_bytes = DEFAULT_CLIENT_MAX_BUFFERED_BYTES;
}
// client_max_buffered_ops
client_max_buffered_ops = config["client_max_buffered_ops"].uint64_value();
if (!client_max_buffered_ops)
{
client_max_buffered_ops = DEFAULT_CLIENT_MAX_BUFFERED_OPS;
}
// client_max_writeback_iodepth
client_max_writeback_iodepth = config["client_max_writeback_iodepth"].uint64_value();
if (!client_max_writeback_iodepth)
{
client_max_writeback_iodepth = DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH;
}
// up_wait_retry_interval
up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
if (!up_wait_retry_interval)
{
@ -368,6 +403,8 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
bool cluster_client_t::get_immediate_commit(uint64_t inode)
{
if (enable_writeback)
return false;
pool_id_t pool_id = INODE_POOL(inode);
if (!pool_id)
return true;
@ -402,6 +439,41 @@ void cluster_client_t::on_ready(std::function<void(void)> fn)
}
}
bool cluster_client_t::flush()
{
if (!ringloop)
{
if (wb->writeback_queue.size())
{
wb->start_writebacks(this, 0);
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [this](cluster_op_t *sync)
{
delete sync;
};
execute(sync);
}
return op_queue_head == NULL;
}
bool sync_done = false;
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [this, &sync_done](cluster_op_t *sync)
{
delete sync;
sync_done = true;
};
execute(sync);
while (!sync_done)
{
ringloop->loop();
if (!sync_done)
ringloop->wait();
}
return true;
}
/**
* How writes are synced when immediate_commit is false
*
@ -422,6 +494,9 @@ void cluster_client_t::on_ready(std::function<void(void)> fn)
* 3) if yes, send all SYNCs. otherwise, leave current SYNC as is.
* 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
* 5) if any of them fail due to other errors, fail the SYNC operation
*
* If writeback caching is turned on and writeback limit is not exhausted:
* data is just copied and the write is confirmed to the client.
*/
void cluster_client_t::execute(cluster_op_t *op)
{
@ -437,36 +512,73 @@ void cluster_client_t::execute(cluster_op_t *op)
offline_ops.push_back(op);
return;
}
op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag
execute_internal(op);
}
void cluster_client_t::execute_internal(cluster_op_t *op)
{
op->cur_inode = op->inode;
op->retval = 0;
op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag
// check alignment, readonly flag and so on
if (!check_rw(op))
{
return;
}
if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) &&
!op->version /* FIXME no CAS writeback */)
{
if (wb->writebacks_active >= client_max_writeback_iodepth)
{
// Writeback queue is full, postpone the operation
wb->writeback_overflow.push_back(op);
return;
}
// Just copy and acknowledge the operation
wb->copy_write(op, CACHE_DIRTY);
while (wb->writeback_bytes + op->len > client_max_buffered_bytes || wb->writeback_queue_size > client_max_buffered_ops)
{
// Initiate some writeback (asynchronously)
wb->start_writebacks(this, 1);
}
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT))
{
if (!(op->flags & OP_FLUSH_BUFFER))
{
copy_write(op, dirty_buffers);
wb->copy_write(op, CACHE_WRITTEN);
}
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
{
// Push an extra SYNC operation to flush previous writes
cluster_op_t *sync_op = new cluster_op_t;
sync_op->opcode = OSD_OP_SYNC;
sync_op->flags = OP_FLUSH_BUFFER;
sync_op->callback = [](cluster_op_t* sync_op)
{
delete sync_op;
};
execute(sync_op);
execute_internal(sync_op);
}
dirty_bytes += op->len;
dirty_ops++;
}
else if (op->opcode == OSD_OP_SYNC)
{
// Flush the whole write-back queue first
if (!(op->flags & OP_FLUSH_BUFFER) && wb->writeback_overflow.size() > 0)
{
// Writeback queue is full, postpone the operation
wb->writeback_overflow.push_back(op);
return;
}
if (wb->writeback_queue.size())
{
wb->start_writebacks(this, 0);
}
dirty_bytes = 0;
dirty_ops = 0;
}
@ -478,7 +590,7 @@ void cluster_client_t::execute(cluster_op_t *op)
}
else
op_queue_tail = op_queue_head = op;
if (!(op->flags & OP_IMMEDIATE_COMMIT))
if (!(op->flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
calc_wait(op);
else
{
@ -552,136 +664,6 @@ void cluster_client_t::execute_raw(osd_num_t osd_num, osd_op_t *op)
}
}
static std::map<object_id, cluster_buffer_t>::iterator find_dirty(uint64_t inode, uint64_t offset, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
auto dirty_it = dirty_buffers.lower_bound((object_id){
.inode = inode,
.stripe = offset,
});
while (dirty_it != dirty_buffers.begin())
{
dirty_it--;
if (dirty_it->first.inode != inode ||
(dirty_it->first.stripe + dirty_it->second.len) <= offset)
{
dirty_it++;
break;
}
}
return dirty_it;
}
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers);
uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0;
while (len > 0)
{
uint64_t new_len = 0;
if (dirty_it == dirty_buffers.end() || dirty_it->first.inode != op->inode)
{
new_len = len;
}
else if (dirty_it->first.stripe > pos)
{
new_len = dirty_it->first.stripe - pos;
if (new_len > len)
{
new_len = len;
}
}
if (new_len > 0)
{
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = pos,
}, (cluster_buffer_t){
.buf = malloc_or_die(new_len),
.len = new_len,
});
}
// FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting
dirty_it->second.state = CACHE_DIRTY;
uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos);
if (cur_len > len)
{
cur_len = len;
}
while (cur_len > 0 && iov_idx < op->iov.count)
{
unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos);
if (iov_len <= cur_len)
{
memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe,
(uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, iov_len);
pos += iov_len;
len -= iov_len;
cur_len -= iov_len;
iov_pos = 0;
iov_idx++;
}
else
{
memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe,
(uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, cur_len);
pos += cur_len;
len -= cur_len;
iov_pos += cur_len;
cur_len = 0;
}
}
dirty_it++;
}
}
void cluster_client_t::flush_buffers(std::map<object_id, cluster_buffer_t>::iterator from_it,
std::map<object_id, cluster_buffer_t>::iterator to_it)
{
auto prev_it = std::prev(to_it);
cluster_op_t *op = new cluster_op_t;
op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->cur_inode = op->inode = from_it->first.inode;
op->offset = from_it->first.stripe;
op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe;
uint32_t calc_len = 0;
uint64_t flush_id = ++last_flush_id;
for (auto it = from_it; it != to_it; it++)
{
it->second.state = CACHE_REPEATING;
it->second.flush_id = flush_id;
op->iov.push_back(it->second.buf, it->second.len);
calc_len += it->second.len;
}
assert(calc_len == op->len);
op->callback = [this, flush_id](cluster_op_t* op)
{
for (auto dirty_it = find_dirty(op->inode, op->offset, dirty_buffers);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_DIRTY;
}
}
delete op;
};
op->next = op_queue_head;
if (op_queue_head)
{
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
inc_wait(op->opcode, op->flags, op->next, 1);
continue_rw(op);
}
int cluster_client_t::continue_rw(cluster_op_t *op)
{
if (op->state == 0)
@ -785,7 +767,8 @@ resume_2:
erase_op(op);
return 1;
}
else if (op->retval != 0 && op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC)
else if (op->retval != 0 && !(op->flags & OP_FLUSH_BUFFER) &&
op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC)
{
// Fatal error (neither -EPIPE, -EIO nor -ENOSPC)
// FIXME: Add a parameter to allow to not wait for EIOs (incomplete or corrupted objects) to heal
@ -857,50 +840,6 @@ static void add_iov(int size, bool skip, cluster_op_t *op, int &iov_idx, size_t
}
}
static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t len, uint32_t bitmap_granularity)
{
if (op->opcode == OSD_OP_READ)
{
// Not OSD_OP_READ_BITMAP or OSD_OP_READ_CHAIN_BITMAP
int iov_idx = 0;
uint64_t cur_offset = op->offset;
while (iov_idx < op->iov.count && cur_offset+op->iov.buf[iov_idx].iov_len <= offset)
{
cur_offset += op->iov.buf[iov_idx].iov_len;
iov_idx++;
}
while (iov_idx < op->iov.count && cur_offset < offset+len)
{
auto & v = op->iov.buf[iov_idx];
auto begin = (cur_offset < offset ? offset : cur_offset);
auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len);
memcpy(
v.iov_base + begin - cur_offset,
buf + (cur_offset <= offset ? 0 : cur_offset-offset),
end - begin
);
cur_offset += v.iov_len;
iov_idx++;
}
}
// Set bitmap bits
int start_bit = (offset-op->offset)/bitmap_granularity;
int end_bit = (offset-op->offset+len)/bitmap_granularity;
for (int bit = start_bit; bit < end_bit;)
{
if (!(bit%8) && bit <= end_bit-8)
{
((uint8_t*)op->bitmap_buf)[bit/8] = 0xFF;
bit += 8;
}
else
{
((uint8_t*)op->bitmap_buf)[bit/8] |= (1 << (bit%8));
bit++;
}
}
}
void cluster_client_t::slice_rw(cluster_op_t *op)
{
// Slice the request into individual object stripe requests
@ -929,52 +868,11 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
int iov_idx = 0;
size_t iov_pos = 0;
int i = 0;
bool dirty_copied = false;
if (dirty_buffers.size() && (op->opcode == OSD_OP_READ ||
op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP))
{
// We also have to return reads from CACHE_REPEATING buffers - they are not
// guaranteed to be present on target OSDs at the moment of repeating
// And we're also free to return data from other cached buffers just
// because it's faster
auto dirty_it = find_dirty(op->cur_inode, op->offset, dirty_buffers);
while (dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->cur_inode &&
dirty_it->first.stripe < op->offset+op->len)
{
uint64_t begin = dirty_it->first.stripe, end = dirty_it->first.stripe + dirty_it->second.len;
if (begin < op->offset)
begin = op->offset;
if (end > op->offset+op->len)
end = op->offset+op->len;
bool skip_prev = true;
uint64_t cur = begin, prev = begin;
while (cur < end)
{
unsigned bmp_loc = (cur - op->offset)/pool_cfg.bitmap_granularity;
bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
if (skip_prev != skip)
{
if (cur > prev && !skip)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, (uint8_t*)dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, pool_cfg.bitmap_granularity);
}
skip_prev = skip;
prev = cur;
}
cur += pool_cfg.bitmap_granularity;
}
assert(cur > prev);
if (!skip_prev)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, (uint8_t*)dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, pool_cfg.bitmap_granularity);
}
dirty_it++;
}
}
// We also have to return reads from CACHE_REPEATING buffers - they are not
// guaranteed to be present on target OSDs at the moment of repeating
// And we're also free to return data from other cached buffers just
// because it's faster
bool dirty_copied = wb->read_from_cache(op, pool_cfg.bitmap_granularity);
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
@ -1146,13 +1044,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
do_it++;
}
// Post sync to affected OSDs
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_DIRTY)
{
prev_op.second.state = CACHE_FLUSHING;
}
}
wb->fsync_start();
op->parts.resize(dirty_osds.size());
op->retval = 0;
{
@ -1177,13 +1069,7 @@ resume_1:
}
if (op->retval != 0)
{
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++)
{
if (uw_it->second.state == CACHE_FLUSHING)
{
uw_it->second.state = CACHE_DIRTY;
}
}
wb->fsync_error();
if (op->retval == -EPIPE || op->retval == -EIO || op->retval == -ENOSPC)
{
// Retry later
@ -1197,16 +1083,7 @@ resume_1:
}
else
{
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
{
if (uw_it->second.state == CACHE_FLUSHING)
{
free(uw_it->second.buf);
dirty_buffers.erase(uw_it++);
}
else
uw_it++;
}
wb->fsync_ok();
}
erase_op(op);
return 1;

View File

@ -8,6 +8,9 @@
#define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024
#define DEFAULT_CLIENT_MAX_BUFFERED_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_BUFFERED_OPS 1024
#define DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH 256
#define INODE_LIST_DONE 1
#define INODE_LIST_HAS_UNSTABLE 2
#define OSD_OP_READ_BITMAP OSD_OP_SEC_READ_BMP
@ -64,18 +67,12 @@ protected:
cluster_op_t *prev = NULL, *next = NULL;
int prev_wait = 0;
friend class cluster_client_t;
};
struct cluster_buffer_t
{
void *buf;
uint64_t len;
int state;
uint64_t flush_id;
friend class writeback_cache_t;
};
struct inode_list_t;
struct inode_list_osd_t;
class writeback_cache_t;
// FIXME: Split into public and private interfaces
class cluster_client_t
@ -84,17 +81,23 @@ class cluster_client_t
ring_loop_t *ringloop;
std::map<pool_id_t, uint64_t> pg_counts;
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
// client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
// writeback improves (1) small consecutive writes and (2) Q1 writes without fsync
bool enable_writeback = false;
// client_max_buffered_* is the real "dirty limit" - maximum amount of writes buffered in memory
uint64_t client_max_buffered_bytes = 0;
uint64_t client_max_buffered_ops = 0;
uint64_t client_max_writeback_iodepth = 0;
int log_level;
int up_wait_retry_interval = 500; // ms
int retry_timeout_id = 0;
std::vector<cluster_op_t*> offline_ops;
cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
std::map<object_id, cluster_buffer_t> dirty_buffers;
uint64_t last_flush_id = 0;
writeback_cache_t *wb = NULL;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0, dirty_ops = 0;
@ -124,10 +127,10 @@ public:
void execute_raw(osd_num_t osd_num, osd_op_t *op);
bool is_ready();
void on_ready(std::function<void(void)> fn);
bool flush();
bool get_immediate_commit(uint64_t inode);
static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
void continue_ops(bool up_retry = false);
inode_list_t *list_inode_start(inode_t inode,
std::function<void(inode_list_t* lst, std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback);
@ -140,12 +143,12 @@ public:
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
void flush_buffers(std::map<object_id, cluster_buffer_t>::iterator from_it,
std::map<object_id, cluster_buffer_t>::iterator to_it);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
void on_change_osd_state_hook(uint64_t peer_osd);
void execute_internal(cluster_op_t *op);
void unshift_op(cluster_op_t *op);
int continue_rw(cluster_op_t *op);
bool check_rw(cluster_op_t *op);
void slice_rw(cluster_op_t *op);
@ -161,4 +164,6 @@ protected:
void continue_listing(inode_list_t *lst);
void send_list(inode_list_osd_t *cur_list);
void continue_raw_ops(osd_num_t peer_osd);
friend class writeback_cache_t;
};

57
src/cluster_client_impl.h Normal file
View File

@ -0,0 +1,57 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include "cluster_client.h"
#define SCRAP_BUFFER_SIZE 4*1024*1024
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define PART_RETRY 8
#define CACHE_DIRTY 1
#define CACHE_WRITTEN 2
#define CACHE_FLUSHING 3
#define CACHE_REPEATING 4
#define OP_FLUSH_BUFFER 0x02
#define OP_IMMEDIATE_COMMIT 0x04
struct cluster_buffer_t
{
uint8_t *buf;
uint64_t len;
int state;
uint64_t flush_id;
uint64_t *refcnt;
};
typedef std::map<object_id, cluster_buffer_t>::iterator dirty_buf_it_t;
class writeback_cache_t
{
public:
uint64_t writeback_bytes = 0;
int writeback_queue_size = 0;
int writebacks_active = 0;
uint64_t last_flush_id = 0;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::vector<cluster_op_t*> writeback_overflow;
std::vector<object_id> writeback_queue;
std::multimap<uint64_t, uint64_t*> flushed_buffers; // flush_id => refcnt
~writeback_cache_t();
dirty_buf_it_t find_dirty(uint64_t inode, uint64_t offset);
bool is_left_merged(dirty_buf_it_t dirty_it);
bool is_right_merged(dirty_buf_it_t dirty_it);
bool is_merged(const dirty_buf_it_t & dirty_it);
void copy_write(cluster_op_t *op, int state);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd);
void start_writebacks(cluster_client_t *cli, int count);
bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity);
void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it);
void fsync_start();
void fsync_error();
void fsync_ok();
};

498
src/cluster_client_wb.cpp Normal file
View File

@ -0,0 +1,498 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <cassert>
#include "cluster_client_impl.h"
writeback_cache_t::~writeback_cache_t()
{
for (auto & bp: dirty_buffers)
{
if (!--(*bp.second.refcnt))
{
free(bp.second.refcnt); // refcnt is allocated with the buffer
}
}
dirty_buffers.clear();
}
dirty_buf_it_t writeback_cache_t::find_dirty(uint64_t inode, uint64_t offset)
{
auto dirty_it = dirty_buffers.lower_bound((object_id){
.inode = inode,
.stripe = offset,
});
while (dirty_it != dirty_buffers.begin())
{
dirty_it--;
if (dirty_it->first.inode != inode ||
(dirty_it->first.stripe + dirty_it->second.len) <= offset)
{
dirty_it++;
break;
}
}
return dirty_it;
}
bool writeback_cache_t::is_left_merged(dirty_buf_it_t dirty_it)
{
if (dirty_it != dirty_buffers.begin())
{
auto prev_it = dirty_it;
prev_it--;
if (prev_it->first.inode == dirty_it->first.inode &&
prev_it->first.stripe+prev_it->second.len == dirty_it->first.stripe &&
prev_it->second.state == CACHE_DIRTY)
{
return true;
}
}
return false;
}
bool writeback_cache_t::is_right_merged(dirty_buf_it_t dirty_it)
{
auto next_it = dirty_it;
next_it++;
if (next_it != dirty_buffers.end() &&
next_it->first.inode == dirty_it->first.inode &&
next_it->first.stripe == dirty_it->first.stripe+dirty_it->second.len &&
next_it->second.state == CACHE_DIRTY)
{
return true;
}
return false;
}
bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it)
{
return is_left_merged(dirty_it) || is_right_merged(dirty_it);
}
void writeback_cache_t::copy_write(cluster_op_t *op, int state)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
// ...or just save it for writeback if write buffering is enabled
if (op->len == 0)
{
return;
}
auto dirty_it = find_dirty(op->inode, op->offset);
auto new_end = op->offset + op->len;
while (dirty_it != dirty_buffers.end() &&
dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len)
{
assert(dirty_it->first.stripe + dirty_it->second.len > op->offset);
// Remove overlapping part(s) of buffers
auto old_end = dirty_it->first.stripe + dirty_it->second.len;
if (dirty_it->first.stripe < op->offset)
{
if (old_end > new_end)
{
// Split into end and start
dirty_it->second.len = op->offset - dirty_it->first.stripe;
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = new_end,
}, (cluster_buffer_t){
.buf = dirty_it->second.buf + new_end - dirty_it->first.stripe,
.len = old_end - new_end,
.state = dirty_it->second.state,
.flush_id = dirty_it->second.flush_id,
.refcnt = dirty_it->second.refcnt,
});
(*dirty_it->second.refcnt)++;
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= op->len;
writeback_queue_size++;
}
break;
}
else
{
// Only leave the beginning
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= old_end - op->offset;
if (is_left_merged(dirty_it) && !is_right_merged(dirty_it))
{
writeback_queue_size++;
}
}
dirty_it->second.len = op->offset - dirty_it->first.stripe;
dirty_it++;
}
}
else if (old_end > new_end)
{
// Only leave the end
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= new_end - dirty_it->first.stripe;
if (!is_left_merged(dirty_it) && is_right_merged(dirty_it))
{
writeback_queue_size++;
}
}
auto new_dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = new_end,
}, (cluster_buffer_t){
.buf = dirty_it->second.buf + new_end - dirty_it->first.stripe,
.len = old_end - new_end,
.state = dirty_it->second.state,
.flush_id = dirty_it->second.flush_id,
.refcnt = dirty_it->second.refcnt,
});
dirty_buffers.erase(dirty_it);
dirty_it = new_dirty_it;
break;
}
else
{
// Remove the whole buffer
if (dirty_it->second.state == CACHE_DIRTY && !is_merged(dirty_it))
{
writeback_bytes -= dirty_it->second.len;
assert(writeback_queue_size > 0);
writeback_queue_size--;
}
if (!--(*dirty_it->second.refcnt))
{
free(dirty_it->second.refcnt);
}
dirty_buffers.erase(dirty_it++);
}
}
// Overlapping buffers are removed, just insert the new one
uint64_t *refcnt = (uint64_t*)malloc_or_die(sizeof(uint64_t) + op->len);
uint8_t *buf = (uint8_t*)refcnt + sizeof(uint64_t);
*refcnt = 1;
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = op->offset,
}, (cluster_buffer_t){
.buf = buf,
.len = op->len,
.state = state,
.refcnt = refcnt,
});
if (state == CACHE_DIRTY)
{
writeback_bytes += op->len;
// Track consecutive write-back operations
if (!is_merged(dirty_it))
{
// <writeback_queue> is OK to contain more than actual number of consecutive
// requests as long as it doesn't miss anything. But <writeback_queue_size>
// is always calculated correctly.
writeback_queue_size++;
writeback_queue.push_back((object_id){
.inode = op->inode,
.stripe = op->offset,
});
}
}
uint64_t pos = 0, len = op->len, iov_idx = 0;
while (len > 0 && iov_idx < op->iov.count)
{
auto & iov = op->iov.buf[iov_idx];
memcpy(buf + pos, iov.iov_base, iov.iov_len);
pos += iov.iov_len;
iov_idx++;
}
}
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
{
int repeated = 0;
if (dirty_buffers.size())
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
{
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
{
repeated++;
flush_buffers(cli, flush_it, wr_it);
flush_it = wr_it;
}
if (end)
break;
last_it = wr_it;
wr_it++;
if (!flush_this)
flush_it = wr_it;
}
}
return repeated;
}
void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it)
{
auto prev_it = to_it;
prev_it--;
bool is_writeback = from_it->second.state == CACHE_DIRTY;
cluster_op_t *op = new cluster_op_t;
op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->cur_inode = op->inode = from_it->first.inode;
op->offset = from_it->first.stripe;
op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe;
uint32_t calc_len = 0;
uint64_t flush_id = ++last_flush_id;
for (auto it = from_it; it != to_it; it++)
{
it->second.state = CACHE_REPEATING;
it->second.flush_id = flush_id;
(*it->second.refcnt)++;
flushed_buffers.emplace(flush_id, it->second.refcnt);
op->iov.push_back(it->second.buf, it->second.len);
calc_len += it->second.len;
}
assert(calc_len == op->len);
writebacks_active++;
op->callback = [this, cli, flush_id](cluster_op_t* op)
{
// Buffer flushes should be always retried, regardless of the error,
// so they should never result in an error here
assert(op->retval == op->len);
for (auto fl_it = flushed_buffers.find(flush_id);
fl_it != flushed_buffers.end() && fl_it->first == flush_id; )
{
if (!--(*fl_it->second)) // refcnt
{
free(fl_it->second);
}
flushed_buffers.erase(fl_it++);
}
for (auto dirty_it = find_dirty(op->inode, op->offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
delete op;
writebacks_active--;
// We can't call execute_internal because it affects an invalid copy of the list here
// (erase_op remembers `next` after writeback callback)
};
if (is_writeback)
{
cli->execute_internal(op);
}
else
{
// Insert repeated flushes into the beginning
cli->unshift_op(op);
cli->continue_rw(op);
}
}
void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count)
{
if (!writeback_queue.size())
{
return;
}
std::vector<object_id> queue_copy;
queue_copy.swap(writeback_queue);
int started = 0, i = 0;
for (i = 0; i < queue_copy.size() && (!count || started < count); i++)
{
object_id & req = queue_copy[i];
auto dirty_it = find_dirty(req.inode, req.stripe);
if (dirty_it == dirty_buffers.end() ||
dirty_it->first.inode != req.inode ||
dirty_it->second.state != CACHE_DIRTY)
{
continue;
}
auto from_it = dirty_it;
uint64_t off = dirty_it->first.stripe;
while (from_it != dirty_buffers.begin())
{
from_it--;
if (from_it->second.state != CACHE_DIRTY ||
from_it->first.inode != req.inode ||
from_it->first.stripe+from_it->second.len != off)
{
from_it++;
break;
}
off = from_it->first.stripe;
}
off = dirty_it->first.stripe + dirty_it->second.len;
auto to_it = dirty_it;
to_it++;
while (to_it != dirty_buffers.end())
{
if (to_it->second.state != CACHE_DIRTY ||
to_it->first.inode != req.inode ||
to_it->first.stripe != off)
{
break;
}
off = to_it->first.stripe + to_it->second.len;
to_it++;
}
started++;
assert(writeback_queue_size > 0);
writeback_queue_size--;
writeback_bytes -= off - from_it->first.stripe;
flush_buffers(cli, from_it, to_it);
}
queue_copy.erase(queue_copy.begin(), queue_copy.begin()+i);
if (writeback_queue.size())
{
queue_copy.insert(queue_copy.end(), writeback_queue.begin(), writeback_queue.end());
}
queue_copy.swap(writeback_queue);
}
static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t len, uint32_t bitmap_granularity)
{
if (op->opcode == OSD_OP_READ)
{
// Not OSD_OP_READ_BITMAP or OSD_OP_READ_CHAIN_BITMAP
int iov_idx = 0;
uint64_t cur_offset = op->offset;
while (iov_idx < op->iov.count && cur_offset+op->iov.buf[iov_idx].iov_len <= offset)
{
cur_offset += op->iov.buf[iov_idx].iov_len;
iov_idx++;
}
while (iov_idx < op->iov.count && cur_offset < offset+len)
{
auto & v = op->iov.buf[iov_idx];
auto begin = (cur_offset < offset ? offset : cur_offset);
auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len);
memcpy(
v.iov_base + begin - cur_offset,
buf + (cur_offset <= offset ? 0 : cur_offset-offset),
end - begin
);
cur_offset += v.iov_len;
iov_idx++;
}
}
// Set bitmap bits
int start_bit = (offset-op->offset)/bitmap_granularity;
int end_bit = (offset-op->offset+len)/bitmap_granularity;
for (int bit = start_bit; bit < end_bit;)
{
if (!(bit%8) && bit <= end_bit-8)
{
((uint8_t*)op->bitmap_buf)[bit/8] = 0xFF;
bit += 8;
}
else
{
((uint8_t*)op->bitmap_buf)[bit/8] |= (1 << (bit%8));
bit++;
}
}
}
bool writeback_cache_t::read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity)
{
bool dirty_copied = false;
if (dirty_buffers.size() && (op->opcode == OSD_OP_READ ||
op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP))
{
// We also have to return reads from CACHE_REPEATING buffers - they are not
// guaranteed to be present on target OSDs at the moment of repeating
// And we're also free to return data from other cached buffers just
// because it's faster
auto dirty_it = find_dirty(op->cur_inode, op->offset);
while (dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->cur_inode &&
dirty_it->first.stripe < op->offset+op->len)
{
uint64_t begin = dirty_it->first.stripe, end = dirty_it->first.stripe + dirty_it->second.len;
if (begin < op->offset)
begin = op->offset;
if (end > op->offset+op->len)
end = op->offset+op->len;
bool skip_prev = true;
uint64_t cur = begin, prev = begin;
while (cur < end)
{
unsigned bmp_loc = (cur - op->offset)/bitmap_granularity;
bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
if (skip_prev != skip)
{
if (cur > prev && !skip)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity);
}
skip_prev = skip;
prev = cur;
}
cur += bitmap_granularity;
}
assert(cur > prev);
if (!skip_prev)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity);
}
dirty_it++;
}
}
return dirty_copied;
}
void writeback_cache_t::fsync_start()
{
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_WRITTEN)
{
prev_op.second.state = CACHE_FLUSHING;
}
}
}
void writeback_cache_t::fsync_error()
{
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_FLUSHING)
{
prev_op.second.state = CACHE_WRITTEN;
}
}
}
void writeback_cache_t::fsync_ok()
{
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
{
if (uw_it->second.state == CACHE_FLUSHING)
{
if (!--(*uw_it->second.refcnt))
free(uw_it->second.refcnt);
dirty_buffers.erase(uw_it++);
}
else
uw_it++;
}
}

View File

@ -24,6 +24,7 @@
#include <netinet/tcp.h>
#include <vector>
#include <string>
#include "vitastor_c.h"
#include "fio_headers.h"
@ -203,6 +204,15 @@ static void watch_callback(void *opaque, long watch)
bsd->watch = (void*)watch;
}
static void opt_push(std::vector<char *> & options, const char *opt, const char *value)
{
if (value)
{
options.push_back(strdup(opt));
options.push_back(strdup(value));
}
}
static int sec_setup(struct thread_data *td)
{
sec_options *o = (sec_options*)td->eo;
@ -254,8 +264,27 @@ static int sec_setup(struct thread_data *td)
{
o->inode = 0;
}
bsd->cli = vitastor_c_create_uring(o->config_path, o->etcd_host, o->etcd_prefix,
o->use_rdma, o->rdma_device, o->rdma_port_num, o->rdma_gid_index, o->rdma_mtu, o->cluster_log);
std::vector<char *> options;
opt_push(options, "config_path", o->config_path);
opt_push(options, "etcd_address", o->etcd_host);
opt_push(options, "etcd_prefix", o->etcd_prefix);
if (o->use_rdma != -1)
opt_push(options, "use_rdma", std::to_string(o->use_rdma).c_str());
opt_push(options, "rdma_device", o->rdma_device);
if (o->rdma_port_num)
opt_push(options, "rdma_port_num", std::to_string(o->rdma_port_num).c_str());
if (o->rdma_gid_index)
opt_push(options, "rdma_gid_index", std::to_string(o->rdma_gid_index).c_str());
if (o->rdma_mtu)
opt_push(options, "rdma_mtu", std::to_string(o->rdma_mtu).c_str());
if (o->cluster_log)
opt_push(options, "log_level", std::to_string(o->cluster_log).c_str());
// allow writeback caching if -direct is not set
opt_push(options, "client_writeback_allowed", td->o.odirect ? "0" : "1");
bsd->cli = vitastor_c_create_uring_json((const char**)options.data(), options.size());
for (auto opt: options)
free(opt);
options.clear();
if (o->image)
{
bsd->watch = NULL;

View File

@ -55,3 +55,10 @@ json11::Json::object osd_messenger_t::merge_configs(const json11::Json::object &
{
return cli_config;
}
bool json_is_true(const json11::Json & val)
{
if (val.is_string())
return val == "true" || val == "yes" || val == "1";
return val.bool_value();
}

View File

@ -22,4 +22,10 @@ public:
void submit()
{
}
void wait()
{
}
void loop()
{
}
};

View File

@ -341,6 +341,7 @@ public:
ringloop->loop();
ringloop->wait();
}
cli->flush();
delete cli;
delete epmgr;
delete ringloop;

View File

@ -34,7 +34,10 @@ nfs_proxy_t::~nfs_proxy_t()
if (cmd)
delete cmd;
if (cli)
{
cli->flush();
delete cli;
}
if (epmgr)
delete epmgr;
if (ringloop)
@ -261,16 +264,8 @@ void nfs_proxy_t::run(json11::Json cfg)
ringloop->loop();
ringloop->wait();
}
/*// Sync at the end
cluster_op_t *close_sync = new cluster_op_t;
close_sync->opcode = OSD_OP_SYNC;
close_sync->callback = [&stop](cluster_op_t *op)
{
stop = true;
delete op;
};
cli->execute(close_sync);*/
// Destroy the client
cli->flush();
delete cli;
delete epmgr;
delete ringloop;

View File

@ -388,6 +388,43 @@ static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandl
);
}
typedef struct str_array
{
const char **items;
int len, alloc;
} str_array;
static void strarray_push(str_array *a, const char *str)
{
if (a->len >= a->alloc)
{
a->alloc = !a->alloc ? 4 : 2*a->alloc;
a->items = (const char**)realloc(a->items, a->alloc*sizeof(char*));
if (!a->items)
{
fprintf(stderr, "bad alloc\n");
abort();
}
}
a->items[a->len++] = str;
}
static void strarray_push_kv(str_array *a, const char *key, const char *value)
{
if (key && value)
{
strarray_push(a, key);
strarray_push(a, value);
}
}
static void strarray_free(str_array *a)
{
free(a->items);
a->items = NULL;
a->len = a->alloc = 0;
}
static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp)
{
VitastorRPC task;
@ -406,23 +443,19 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
client->rdma_gid_index = qdict_get_try_int(options, "rdma-gid-index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma-mtu", 0);
client->ctx = bdrv_get_aio_context(bs);
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
client->proxy = vitastor_c_create_qemu_uring(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
if (!client->proxy)
{
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno));
client->uring_eventfd = -1;
#endif
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
}
else
str_array opt = {};
strarray_push_kv(&opt, "config_path", qdict_get_try_str(options, "config-path"));
strarray_push_kv(&opt, "etcd_address", qdict_get_try_str(options, "etcd-host"));
strarray_push_kv(&opt, "etcd_prefix", qdict_get_try_str(options, "etcd-prefix"));
strarray_push_kv(&opt, "use_rdma", qdict_get_try_str(options, "use-rdma"));
strarray_push_kv(&opt, "rdma_device", qdict_get_try_str(options, "rdma-device"));
strarray_push_kv(&opt, "rdma_port_num", qdict_get_try_str(options, "rdma-port-num"));
strarray_push_kv(&opt, "rdma_gid_index", qdict_get_try_str(options, "rdma-gid-index"));
strarray_push_kv(&opt, "rdma_mtu", qdict_get_try_str(options, "rdma-mtu"));
strarray_push_kv(&opt, "client_writeback_allowed", (flags & BDRV_O_NOCACHE) ? "0" : "1");
client->proxy = vitastor_c_create_uring_json(opt.items, opt.len);
strarray_free(&opt);
if (client->proxy)
{
client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy);
if (client->uring_eventfd < 0)
@ -434,7 +467,45 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client);
}
else
{
// Writeback cache is unusable without io_uring because the client can't correctly flush on exit
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower%s\n",
strerror(errno), (flags & BDRV_O_NOCACHE ? "" : " and writeback cache will be disabled"));
client->uring_eventfd = -1;
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
client->proxy = vitastor_c_create_qemu_uring(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
if (!client->proxy)
{
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno));
client->uring_eventfd = -1;
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
}
else
{
client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy);
if (client->uring_eventfd < 0)
{
fprintf(stderr, "vitastor: failed to create io_uring eventfd: %s\n", strerror(errno));
error_setg(errp, "failed to create io_uring eventfd");
vitastor_close(bs);
return -1;
}
universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client);
}
#else
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
#endif
}
image = client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
// Get image metadata (size and readonly flag) or just wait until the client is ready

View File

@ -4,7 +4,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "cluster_client.h"
#include "cluster_client_impl.h"
void configure_single_pg_pool(cluster_client_t *cli)
{
@ -47,11 +47,11 @@ void configure_single_pg_pool(cluster_client_t *cli)
cli->st_cli.on_change_hook(changes);
}
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL)
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL, bool instant = false)
{
printf("Post write %lx+%lx\n", offset, len);
int *r = new int;
*r = -1;
*r = instant ? -2 : -1;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 0x1000000000001;
@ -72,6 +72,13 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c,
cb();
};
cli->execute(op);
if (instant)
{
long res = *r;
assert(*r >= 0);
delete r;
return (int*)res;
}
return r;
}
@ -160,6 +167,13 @@ osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uin
}
op_it++;
}
op_it = cli->msgr.clients[peer_fd]->sent_ops.begin();
while (op_it != cli->msgr.clients[peer_fd]->sent_ops.end())
{
printf("Found opcode %lu offset %lx size %x\n", op_it->second->req.hdr.opcode, op_it->second->req.rw.offset, op_it->second->req.rw.len);
op_it++;
}
printf("Not found opcode %lu offset %lx size %lx\n", opcode, offset, len);
return NULL;
}
@ -341,7 +355,7 @@ void test1()
void test2()
{
std::map<object_id, cluster_buffer_t> unsynced_writes;
writeback_cache_t *wb = new writeback_cache_t();
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 1;
@ -350,19 +364,19 @@ void test2()
op->iov.push_back(malloc_or_die(4096*1024), 4096);
// 0-4k = 0x55
memset(op->iov.buf[0].iov_base, 0x55, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
wb->copy_write(op, CACHE_WRITTEN);
// 8k-12k = 0x66
op->offset = 8192;
memset(op->iov.buf[0].iov_base, 0x66, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
wb->copy_write(op, CACHE_WRITTEN);
// 4k-1M+4k = 0x77
op->len = op->iov.buf[0].iov_len = 1048576;
op->offset = 4096;
memset(op->iov.buf[0].iov_base, 0x77, op->iov.buf[0].iov_len);
cluster_client_t::copy_write(op, unsynced_writes);
wb->copy_write(op, CACHE_WRITTEN);
// check it
assert(unsynced_writes.size() == 4);
auto uit = unsynced_writes.begin();
assert(wb->dirty_buffers.size() == 2);
auto uit = wb->dirty_buffers.begin();
int i;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 0);
@ -372,35 +386,106 @@ void test2()
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 4096);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 8192);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 12*1024);
assert(uit->second.len == 1016*1024);
assert(uit->second.len == 1048576);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
// free memory
free(op->iov.buf[0].iov_base);
delete op;
for (auto p: unsynced_writes)
{
free(p.second.buf);
}
delete wb;
printf("[ok] copy_write test\n");
}
void test_writeback()
{
json11::Json config = json11::Json::object {
{ "client_enable_writeback", true },
{ "client_writeback_allowed", true },
{ "client_max_buffered_bytes", 1024*1024 },
{ "client_max_buffered_ops", 2 },
{ "client_max_writeback_iodepth", 2 },
{ "client_max_dirty_bytes", 1024*1024 },
{ "client_max_dirty_ops", 2 },
};
timerfd_manager_t *tfd = new timerfd_manager_t([](int fd, bool wr, std::function<void(int, int)> callback){});
cluster_client_t *cli = new cluster_client_t(NULL, tfd, config);
configure_single_pg_pool(cli);
pretend_connected(cli, 1);
// Check that 3 consecutive writes are merged by writeback
assert((long)test_write(cli, 0, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 4096, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 8192, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 0);
// 3rd and 4th writes should trigger 1 writeback each
assert((long)test_write(cli, 2*1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 1);
assert((long)test_write(cli, 3*1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 2);
// 5th write should be postponed until at least 1 writeback is completed
int *r1 = test_write(cli, 4*1024*1024, 4096, 0x67, NULL);
check_op_count(cli, 1, 2);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 3*4096), 0);
check_completed(r1);
// autosync because max_dirty_ops=2, flush waits for sync
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 2*1024*1024, 4096), 0);
check_op_count(cli, 1, 0);
int *r2 = test_sync(cli);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 3*1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
// autosync because max_dirty_ops=2, flush waits for sync
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4*1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Check cutting of the beginning and end
assert((long)test_write(cli, 0, 32768, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 32768, 32768, 0x56, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 16384, 32768, 0x57, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 16384+4096, 32768-4096, 0x58, NULL, true) == 1);
check_op_count(cli, 1, 0);
r2 = test_sync(cli);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 65536), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Free client
delete cli;
delete tfd;
printf("[ok] writeback test\n");
}
int main(int narg, char *args[])
{
test1();
test2();
test_writeback();
return 0;
}