// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)

#include <stdexcept>
#include <assert.h>
#include "cluster_client.h"

#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 3
#define OP_FLUSH_BUFFER 2
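
// Flags of individual parts of a sliced operation: PART_SENT means the part
// was sent to an OSD, PART_DONE means it completed, PART_ERROR means it failed.
// States of entries in dirty_buffers: CACHE_DIRTY means not synced yet,
// CACHE_FLUSHING means a SYNC covering the buffer is in flight, CACHE_REPEATING
// means the buffer is being replayed after its OSD dropped the connection.
// OP_FLUSH_BUFFER marks internal writes generated by flush_buffer().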

cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
    this->ringloop = ringloop;
    this->tfd = tfd;
    this->config = config;

    msgr.osd_num = 0;
    msgr.tfd = tfd;
    msgr.ringloop = ringloop;
    msgr.repeer_pgs = [this](osd_num_t peer_osd)
    {
        if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
        {
            // peer_osd just connected
            continue_ops();
        }
        else if (dirty_buffers.size())
        {
            // peer_osd just dropped connection
            // determine WHICH dirty_buffers are now obsolete and repeat them
            for (auto & wr: dirty_buffers)
            {
                if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
                    wr.second.state != CACHE_REPEATING)
                {
                    // FIXME: Flush in larger parts
                    flush_buffer(wr.first, &wr.second);
                }
            }
            continue_ops();
        }
    };
    msgr.exec_op = [this](osd_op_t *op)
    {
        // Garbage in
        printf("Incoming garbage from peer %d\n", op->peer_fd);
        msgr.stop_client(op->peer_fd);
        delete op;
    };
    msgr.init();

    st_cli.tfd = tfd;
    st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
    st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
    st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_hook(changes); };
    st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };

    st_cli.parse_config(config);
    st_cli.load_global_config();

    if (ringloop)
    {
        consumer.loop = [this]()
        {
            msgr.read_requests();
            msgr.send_replies();
            this->ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
    }
}

cluster_client_t::~cluster_client_t()
{
    for (auto bp: dirty_buffers)
    {
        free(bp.second.buf);
    }
    dirty_buffers.clear();
    if (ringloop)
    {
        ringloop->unregister_consumer(&consumer);
    }
}
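
// continue_ops() is protected against reentry: continuing_ops == 1 means the
// queue scan below is running, continuing_ops == 2 means a nested call (e.g.
// from a completion callback or flush_buffer()) requested a restart, in which
// case the scan starts over from the beginning of op_queue instead of recursing.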
void cluster_client_t::continue_ops(bool up_retry)
{
    if (!pgs_loaded)
    {
        // We're offline
        return;
    }
    if (continuing_ops)
    {
        // Attempt to reenter the function
        continuing_ops = 2;
        return;
    }
restart:
    continuing_ops = 1;
    op_queue_pos = 0;
    bool has_flushes = false, has_writes = false;
    while (op_queue_pos < op_queue.size())
    {
        auto op = op_queue[op_queue_pos];
        bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
        auto opcode = op->opcode;
        if (!op->up_wait || up_retry)
        {
            op->up_wait = false;
            if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
            {
                if (is_flush || !has_flushes)
                {
                    // Regular writes can't proceed before buffer flushes
                    rm = continue_rw(op);
                }
            }
            else if (opcode == OSD_OP_SYNC)
            {
                if (!has_writes)
                {
                    // SYNC can't proceed before previous writes
                    rm = continue_sync(op);
                }
            }
        }
        if (opcode == OSD_OP_WRITE)
        {
            has_writes = has_writes || !rm;
            if (is_flush)
            {
                has_flushes = has_writes || !rm;
            }
        }
        else if (opcode == OSD_OP_SYNC)
        {
            // Postpone writes until previous SYNC completes
            // ...so dirty_buffers can't contain anything newer than SYNC
            has_flushes = has_writes || !rm;
        }
        if (rm)
        {
            op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1);
        }
        else
        {
            op_queue_pos++;
        }
        if (continuing_ops == 2)
        {
            goto restart;
        }
    }
    continuing_ops = 0;
}
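
// Returns log2(value) for powers of two and 64 for other values greater than 1,
// e.g. is_power_of_two(131072) == 17, while is_power_of_two(100000) == 64.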
static uint32_t is_power_of_two(uint64_t value)
{
    uint32_t l = 0;
    while (value > 1)
    {
        if (value & 1)
        {
            return 64;
        }
        value = value >> 1;
        l++;
    }
    return l;
}

void cluster_client_t::on_load_config_hook(json11::Json::object & config)
{
    bs_block_size = config["block_size"].uint64_value();
    bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
    if (!bs_block_size)
    {
        bs_block_size = DEFAULT_BLOCK_SIZE;
    }
    if (!bs_bitmap_granularity)
    {
        bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
    }
    uint32_t block_order;
    if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE)
    {
        throw std::runtime_error("Bad block size");
    }
    if (config["immediate_commit"] == "all")
    {
        // Cluster-wide immediate_commit mode
        immediate_commit = true;
    }
    if (config.find("client_max_dirty_bytes") != config.end())
    {
        client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
    }
    else if (config.find("client_dirty_limit") != config.end())
    {
        // Old name for client_max_dirty_bytes
        client_max_dirty_bytes = config["client_dirty_limit"].uint64_value();
    }
    if (config.find("client_max_dirty_ops") != config.end())
    {
        client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
    }
    if (!client_max_dirty_bytes)
    {
        client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
    }
    if (!client_max_dirty_ops)
    {
        client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
    }
    up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
    if (!up_wait_retry_interval)
    {
        up_wait_retry_interval = 500;
    }
    else if (up_wait_retry_interval < 50)
    {
        up_wait_retry_interval = 50;
    }
    msgr.parse_config(config);
    msgr.parse_config(this->config);
    st_cli.load_pgs();
}
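
// A sketch of the global configuration keys consumed by this hook (values are
// illustrative, not defaults; only the key names are taken from the code above):
//   {
//     "block_size": 131072,
//     "bitmap_granularity": 4096,
//     "immediate_commit": "all",
//     "client_max_dirty_bytes": 33554432,
//     "client_max_dirty_ops": 1024,
//     "up_wait_retry_interval": 500
//   }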

void cluster_client_t::on_load_pgs_hook(bool success)
{
    for (auto pool_item: st_cli.pool_config)
    {
        pg_counts[pool_item.first] = pool_item.second.real_pg_count;
    }
    pgs_loaded = true;
    for (auto fn: on_ready_hooks)
    {
        fn();
    }
    on_ready_hooks.clear();
    for (auto op: offline_ops)
    {
        execute(op);
    }
    offline_ops.clear();
    continue_ops();
}

void cluster_client_t::on_change_hook(json11::Json::object & changes)
{
    for (auto pool_item: st_cli.pool_config)
    {
        if (pg_counts[pool_item.first] != pool_item.second.real_pg_count)
        {
            // At this point, all pool operations should have been suspended
            // And now they have to be resliced!
            for (auto op: op_queue)
            {
                if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
                    INODE_POOL(op->inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            pg_counts[pool_item.first] = pool_item.second.real_pg_count;
        }
    }
    continue_ops();
}

void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
{
    if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())
    {
        msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
    }
}

bool cluster_client_t::is_ready()
{
    return pgs_loaded;
}

void cluster_client_t::on_ready(std::function<void(void)> fn)
{
    if (pgs_loaded)
    {
        fn();
    }
    else
    {
        on_ready_hooks.push_back(fn);
    }
}

/**
 * How writes are synced when immediate_commit is false
 *
 * "Continue" WRITE:
 * 1) if the operation is not sliced yet - slice it
 * 2) if the operation doesn't require reslice - try to connect & send all remaining parts
 * 3) if any of them fail due to disconnected peers or PGs not being up, repeat after reconnecting or after a short timeout
 * 4) if any of them fail due to other errors, fail the operation and remove it from the current "unsynced batch"
 * 5) if the PG count changes before all parts are done, wait for all in-progress parts to finish,
 *    throw all results away, reslice and resubmit the op
 * 6) when all parts are done, try to "continue" the current SYNC
 * 7) if the operation succeeds, but then some OSDs drop their connections, repeat
 *    the parts of the current "unsynced batch" previously sent to those OSDs, in any order
 *
 * "Continue" the current SYNC:
 * 1) take all unsynced operations from the current batch
 * 2) check if all affected OSDs are still alive
 * 3) if yes, send all SYNCs; otherwise, leave the current SYNC as is
 * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
 * 5) if any of them fail due to other errors, fail the SYNC operation
 */
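
// A minimal usage sketch (hypothetical caller, not part of this file): submit a
// WRITE through execute() and make it durable with a SYNC. The fields match how
// flush_buffer() in this file builds its own internal write operations; the
// inode value is illustrative (the pool id lives in the high bits, see INODE_POOL()).
//
//     cluster_op_t *op = new cluster_op_t;
//     op->opcode = OSD_OP_WRITE;
//     op->inode = ((uint64_t)1 << 48) | 1; // pool 1, inode 1 (assumed layout)
//     op->offset = 0;
//     op->len = 4096;
//     op->iov.push_back(buf, 4096);        // buf points to 4096 bytes of data
//     op->callback = [](cluster_op_t *op)
//     {
//         // On success, op->retval == op->len
//         delete op;
//     };
//     cli->execute(op);
//     // ...then submit an op with opcode OSD_OP_SYNC the same way to flush
//     // and commit the write on all affected OSDs.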

void cluster_client_t::execute(cluster_op_t *op)
{
    if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE)
    {
        op->retval = -EINVAL;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return;
    }
    op->retval = 0;
    if (op->opcode == OSD_OP_WRITE && !immediate_commit)
    {
        if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
        {
            // Push an extra SYNC operation to flush previous writes
            cluster_op_t *sync_op = new cluster_op_t;
            sync_op->opcode = OSD_OP_SYNC;
            sync_op->callback = [](cluster_op_t* sync_op)
            {
                delete sync_op;
            };
            op_queue.push_back(sync_op);
            dirty_bytes = 0;
            dirty_ops = 0;
        }
        dirty_bytes += op->len;
        dirty_ops++;
    }
    else if (op->opcode == OSD_OP_SYNC)
    {
        dirty_bytes = 0;
        dirty_ops = 0;
    }
    op_queue.push_back(op);
    continue_ops();
}

void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
    // Save operation for replay when one of PGs goes out of sync
    // (primary OSD drops our connection in this case)
    auto dirty_it = dirty_buffers.lower_bound((object_id){
        .inode = op->inode,
        .stripe = op->offset,
    });
    while (dirty_it != dirty_buffers.begin())
    {
        dirty_it--;
        if (dirty_it->first.inode != op->inode ||
            (dirty_it->first.stripe + dirty_it->second.len) <= op->offset)
        {
            dirty_it++;
            break;
        }
    }
    uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0;
    while (len > 0)
    {
        uint64_t new_len = 0;
        if (dirty_it == dirty_buffers.end())
        {
            new_len = len;
        }
        else if (dirty_it->first.inode != op->inode || dirty_it->first.stripe > pos)
        {
            new_len = dirty_it->first.stripe - pos;
            if (new_len > len)
            {
                new_len = len;
            }
        }
        if (new_len > 0)
        {
            dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
                .inode = op->inode,
                .stripe = pos,
            }, (cluster_buffer_t){
                .buf = malloc_or_die(new_len),
                .len = new_len,
            });
        }
        // FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting
        dirty_it->second.state = CACHE_DIRTY;
        uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos);
        if (cur_len > len)
        {
            cur_len = len;
        }
        while (cur_len > 0 && iov_idx < op->iov.count)
        {
            unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos);
            if (iov_len <= cur_len)
            {
                memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
                    op->iov.buf[iov_idx].iov_base + iov_pos, iov_len);
                pos += iov_len;
                len -= iov_len;
                cur_len -= iov_len;
                iov_pos = 0;
                iov_idx++;
            }
            else
            {
                memcpy(dirty_it->second.buf + pos - dirty_it->first.stripe,
                    op->iov.buf[iov_idx].iov_base + iov_pos, cur_len);
                pos += cur_len;
                len -= cur_len;
                iov_pos += cur_len;
                cur_len = 0;
            }
        }
        dirty_it++;
    }
}
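
// Example of the merge logic above (illustrative numbers): with an existing
// dirty buffer covering [0..4096) of an inode, a write of [4096..8192)
// allocates a new adjacent buffer, while a write of [2048..6144) copies its
// first 2048 bytes into the existing buffer and allocates a new one for
// [4096..6144).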

void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
{
    wr->state = CACHE_REPEATING;
    cluster_op_t *op = new cluster_op_t;
    op->flags = OP_FLUSH_BUFFER;
    op->opcode = OSD_OP_WRITE;
    op->inode = oid.inode;
    op->offset = oid.stripe;
    op->len = wr->len;
    op->iov.push_back(wr->buf, wr->len);
    op->callback = [wr](cluster_op_t* op)
    {
        if (wr->state == CACHE_REPEATING)
        {
            wr->state = CACHE_DIRTY;
        }
        delete op;
    };
    op_queue.insert(op_queue.begin(), op);
    if (continuing_ops)
    {
        continuing_ops = 2;
        op_queue_pos++;
    }
}
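
// continue_rw() drives a READ/WRITE through a goto-based state machine:
// state 0 validates the request and copies WRITE data into dirty_buffers,
// state 1 slices the request into per-PG parts, state 2 sends the unsent
// parts (arming a retry timer when a PG or its primary OSD is unavailable),
// state 3 waits for in-flight parts and then completes, fails, or retries.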
int cluster_client_t::continue_rw(cluster_op_t *op)
{
    if (op->state == 0)
        goto resume_0;
    else if (op->state == 1)
        goto resume_1;
    else if (op->state == 2)
        goto resume_2;
    else if (op->state == 3)
        goto resume_3;
resume_0:
    if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
    {
        op->retval = -EINVAL;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return 1;
    }
    {
        pool_id_t pool_id = INODE_POOL(op->inode);
        if (!pool_id)
        {
            op->retval = -EINVAL;
            std::function<void(cluster_op_t*)>(op->callback)(op);
            return 1;
        }
        if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
            st_cli.pool_config[pool_id].real_pg_count == 0)
        {
            // Postpone operations to unknown pools
            return 0;
        }
    }
    if (op->opcode == OSD_OP_WRITE)
    {
        if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
        {
            copy_write(op, dirty_buffers);
        }
    }
resume_1:
    // Slice the operation into parts
    slice_rw(op);
    op->needs_reslice = false;
resume_2:
    // Send unsent parts, if they're not subject to change
    op->state = 3;
    if (op->needs_reslice)
    {
        for (int i = 0; i < op->parts.size(); i++)
        {
            if (!(op->parts[i].flags & PART_SENT) && op->retval)
            {
                op->retval = -EPIPE;
            }
        }
        goto resume_3;
    }
    for (int i = 0; i < op->parts.size(); i++)
    {
        if (!(op->parts[i].flags & PART_SENT))
        {
            if (!try_send(op, i))
            {
                // We'll need to retry again
                op->up_wait = true;
                if (!retry_timeout_id)
                {
                    retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
                    {
                        retry_timeout_id = 0;
                        continue_ops(true);
                    });
                }
                op->state = 2;
            }
        }
    }
    if (op->state == 2)
    {
        return 0;
    }
resume_3:
    if (op->inflight_count > 0)
    {
        op->state = 3;
        return 0;
    }
    if (op->done_count >= op->parts.size())
    {
        // Finished successfully
        // Even if the PG count has changed in the meantime, we treat it as success,
        // because if some parts were invalid for the new PG count we would have received errors
        op->retval = op->len;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return 1;
    }
    else if (op->retval != 0 && op->retval != -EPIPE)
    {
        // Fatal error (not -EPIPE)
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return 1;
    }
    else
    {
        // -EPIPE - clear the error and retry
        op->retval = 0;
        if (op->needs_reslice)
        {
            op->parts.clear();
            op->done_count = 0;
            goto resume_1;
        }
        else
        {
            for (int i = 0; i < op->parts.size(); i++)
            {
                op->parts[i].flags = 0;
            }
            goto resume_2;
        }
    }
    return 0;
}

void cluster_client_t::slice_rw(cluster_op_t *op)
{
    // Slice the request into individual object stripe requests
    // Primary OSDs still operate on individual stripes, but their size is multiplied by PG minsize in case of EC
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = bs_block_size * pg_data_size;
    uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    op->retval = 0;
    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
    int iov_idx = 0;
    size_t iov_pos = 0;
    int i = 0;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (op->inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1;
        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (op->offset + op->len);
        op->parts[i] = (cluster_op_part_t){
            .parent = op,
            .offset = begin,
            .len = (uint32_t)(end - begin),
            .pg_num = pg_num,
            .flags = 0,
        };
        int left = end-begin;
        while (left > 0 && iov_idx < op->iov.count)
        {
            if (op->iov.buf[iov_idx].iov_len - iov_pos < left)
            {
                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos);
                left -= (op->iov.buf[iov_idx].iov_len - iov_pos);
                iov_pos = 0;
                iov_idx++;
            }
            else
            {
                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
                iov_pos += left;
                left = 0;
            }
        }
        assert(left == 0);
        i++;
    }
}
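
// Worked example (illustrative numbers): with 128 KB blocks and 2+1 EC
// (pg_data_size = 2), pg_block_size is 256 KB. A write at offset 192 KB of
// length 128 KB touches stripes 0 and 256 KB, so it is sliced into two 64 KB
// parts, [192 KB..256 KB) and [256 KB..320 KB), each mapped to its own PG.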

bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
    auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
    uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = bs_block_size * pg_data_size;
    uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        // Must use the same PG mapping as slice_rw() (including the inode term)
        pg_num_t pg_num = (inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
        auto pg_it = pool_cfg.pg_config.find(pg_num);
        if (pg_it != pool_cfg.pg_config.end() && pg_it->second.cur_primary == osd)
        {
            return true;
        }
    }
    return false;
}

bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
    auto part = &op->parts[i];
    auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
    auto pg_it = pool_cfg.pg_config.find(part->pg_num);
    if (pg_it != pool_cfg.pg_config.end() &&
        !pg_it->second.pause && pg_it->second.cur_primary)
    {
        osd_num_t primary_osd = pg_it->second.cur_primary;
        auto peer_it = msgr.osd_peer_fds.find(primary_osd);
        if (peer_it != msgr.osd_peer_fds.end())
        {
            int peer_fd = peer_it->second;
            part->osd_num = primary_osd;
            part->flags |= PART_SENT;
            op->inflight_count++;
            part->op = (osd_op_t){
                .op_type = OSD_OP_OUT,
                .peer_fd = peer_fd,
                .req = { .rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = op_id++,
                        .opcode = op->opcode,
                    },
                    .inode = op->inode,
                    .offset = part->offset,
                    .len = part->len,
                } },
                .callback = [this, part](osd_op_t *op_part)
                {
                    handle_op_part(part);
                },
            };
            part->op.iov = part->iov;
            msgr.outbox_push(&part->op);
            return true;
        }
        else if (msgr.wanted_peers.find(primary_osd) == msgr.wanted_peers.end())
        {
            msgr.connect_peer(primary_osd, st_cli.peer_states[primary_osd]);
        }
    }
    return false;
}

int cluster_client_t::continue_sync(cluster_op_t *op)
{
    if (op->state == 1)
        goto resume_1;
    if (immediate_commit || !dirty_osds.size())
    {
        // Sync is not required in the immediate_commit mode or if there are no dirty_osds
        op->retval = 0;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return 1;
    }
    // Check that all OSD connections are still alive
    for (auto sync_osd: dirty_osds)
    {
        auto peer_it = msgr.osd_peer_fds.find(sync_osd);
        if (peer_it == msgr.osd_peer_fds.end())
        {
            return 0;
        }
    }
    // Post sync to affected OSDs
    for (auto & prev_op: dirty_buffers)
    {
        if (prev_op.second.state == CACHE_DIRTY)
        {
            prev_op.second.state = CACHE_FLUSHING;
        }
    }
    op->parts.resize(dirty_osds.size());
    op->retval = 0;
    {
        int i = 0;
        for (auto sync_osd: dirty_osds)
        {
            op->parts[i] = {
                .parent = op,
                .osd_num = sync_osd,
                .flags = 0,
            };
            send_sync(op, &op->parts[i]);
            i++;
        }
    }
    dirty_osds.clear();
resume_1:
    if (op->inflight_count > 0)
    {
        op->state = 1;
        return 0;
    }
    if (op->retval != 0)
    {
        for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++)
        {
            if (uw_it->second.state == CACHE_FLUSHING)
            {
                uw_it->second.state = CACHE_DIRTY;
            }
        }
        if (op->retval == -EPIPE)
        {
            // Retry later
            op->parts.clear();
            op->retval = 0;
            op->inflight_count = 0;
            op->done_count = 0;
            op->state = 0;
            return 0;
        }
    }
    else
    {
        for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
        {
            if (uw_it->second.state == CACHE_FLUSHING)
            {
                free(uw_it->second.buf);
                dirty_buffers.erase(uw_it++);
            }
            else
                uw_it++;
        }
    }
    std::function<void(cluster_op_t*)>(op->callback)(op);
    return 1;
}

void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
    auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
    assert(peer_it != msgr.osd_peer_fds.end());
    part->flags |= PART_SENT;
    op->inflight_count++;
    part->op = (osd_op_t){
        .op_type = OSD_OP_OUT,
        .peer_fd = peer_it->second,
        .req = {
            .hdr = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = op_id++,
                .opcode = OSD_OP_SYNC,
            },
        },
        .callback = [this, part](osd_op_t *op_part)
        {
            handle_op_part(part);
        },
    };
    msgr.outbox_push(&part->op);
}

void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
    cluster_op_t *op = part->parent;
    op->inflight_count--;
    int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;