In theory, implement syncs and replay for the non-immediate commit mode

Vitaliy Filippov 2020-06-14 23:57:58 +03:00
parent 4dde8b8a42
commit 64afec03ec
4 changed files with 422 additions and 115 deletions


@@ -9,10 +9,32 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
msgr.ringloop = ringloop;
msgr.repeer_pgs = [this](osd_num_t peer_osd)
{
if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
{
// peer_osd just connected
continue_ops();
}
else if (unsynced_writes.size())
{
// peer_osd just dropped connection
for (auto op: unsynced_writes)
{
for (auto & part: op->parts)
{
if (part.osd_num == peer_osd && part.done)
{
// repeat this operation
part.osd_num = 0;
part.done = false;
assert(!part.sent);
op->done_count--;
}
}
if (op->done_count < op->parts.size())
{
cur_ops.insert(op);
}
}
continue_ops();
}
};
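For readers following the replay logic above, here is the per-part reset written out as a standalone sketch (editor's illustration: the field names come from this diff, but the helper itself is hypothetical and would need the same member access cluster_client_t has):

    // Conceptual helper mirroring what the lambda does inline for every part
    // that had already completed on the dropped OSD.
    void reset_part_for_replay(cluster_op_part_t & part)
    {
        part.osd_num = 0;            // forget which OSD the part was sent to
        part.done = false;           // it has to be sent and completed again
        assert(!part.sent);          // nothing can still be in flight on a dropped connection
        part.parent->done_count--;   // the parent operation is no longer that close to completion
    }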
@@ -30,40 +52,19 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
void cluster_client_t::continue_ops()
{
if (retry_timeout_id)
{
tfd->clear_timer(retry_timeout_id);
retry_timeout_id = 0;
}
if (!pg_count)
{
// Config is not loaded yet
return;
}
for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); )
{
continue_rw(*op_it++);
}
}
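The retry_timeout_id cleared at the top of continue_ops() pairs with the timer armed further down in handle_op_part(); together they form a "retry after a delay, but cancel the delay if progress happens first" pattern. A condensed sketch (both halves are copied from this diff, only the arrangement is the editor's):

    // arm (in handle_op_part, when a part fails):
    if (!retry_timeout_id)
    {
        retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
        {
            retry_timeout_id = 0;
            continue_ops();
        });
    }
    // cancel (at the top of continue_ops, as above):
    if (retry_timeout_id)
    {
        tfd->clear_timer(retry_timeout_id);
        retry_timeout_id = 0;
    }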
@@ -88,33 +89,53 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
bs_disk_alignment = config["disk_alignment"].uint64_value();
bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
if (!bs_block_size)
{
bs_block_size = DEFAULT_BLOCK_SIZE;
}
if (!bs_disk_alignment)
{
bs_disk_alignment = DEFAULT_DISK_ALIGNMENT;
}
if (!bs_bitmap_granularity)
{
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
}
uint32_t block_order;
if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE)
{
throw std::runtime_error("Bad block size");
}
if (config.find("pg_stripe_size") != config.end())
{
pg_stripe_size = config["pg_stripe_size"].uint64_value();
if (!pg_stripe_size)
{
pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
}
}
if (config["immediate_commit"] == "all")
{
// Cluster-wide immediate_commit mode
immediate_commit = true;
}
else if (config.find("client_dirty_limit") != config.end())
{
client_dirty_limit = config["client_dirty_limit"].uint64_value();
if (!client_dirty_limit)
{
client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
}
}
msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
if (!msgr.peer_connect_interval)
{
msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
}
msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
if (!msgr.peer_connect_timeout)
{
msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
}
}
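For reference, a hedged example of the configuration this hook consumes, written as the json11 object it receives (editor's illustration; values are arbitrary, and the "block_size" key is assumed from the surrounding code rather than shown in this hunk):

    json11::Json::object config {
        { "block_size", 131072 },            // bs_block_size
        { "disk_alignment", 4096 },          // bs_disk_alignment
        { "bitmap_granularity", 4096 },      // bs_bitmap_granularity
        { "pg_stripe_size", 4194304 },       // falls back to DEFAULT_PG_STRIPE_SIZE if zero
        { "immediate_commit", "all" },       // cluster-wide immediate commit: client SYNCs become no-ops
        { "client_dirty_limit", 33554432 },  // only read when immediate_commit is not "all"
        { "peer_connect_interval", 5 },
        { "peer_connect_timeout", 5 },
    };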
void cluster_client_t::on_load_pgs_hook(bool success)
@@ -122,7 +143,11 @@ void cluster_client_t::on_load_pgs_hook(bool success)
if (success)
{
pg_count = st_cli.pg_config.size();
continue_ops();
for (auto op: offline_ops)
{
execute(op);
}
offline_ops.clear();
}
}
@@ -131,16 +156,24 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes)
if (pg_count != st_cli.pg_config.size())
{
// At this point, all operations should be suspended
// And they have to be resliced!
for (auto op: cur_ops)
{
op->needs_reslice = true;
}
for (auto op: unsynced_writes)
{
op->needs_reslice = true;
}
pg_count = st_cli.pg_config.size();
if (pg_count)
{
for (auto op: offline_ops)
{
execute(op);
}
offline_ops.clear();
}
}
continue_ops();
}
@@ -153,53 +186,176 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
}
}
/**
* How writes are synced when immediate_commit is false
*
* WRITE:
* 1) copy op to the queue
* 2) slice op
* 3) connect & send all parts
* 4) if any of them fail due to disconnected peers, repeat after reconnecting
* 5) if any of them fail due to other errors, fail the operation
* 6) if PG count changes before all parts are done, wait for all in-progress parts to finish,
* throw all results away, reslice and resubmit op
* 7) when all parts are done, try to continue SYNCs
*
* SYNC:
* 1) add SYNC to the queue after writes
* 2) try to continue SYNCs
*
* Continue SYNCs:
* 1) check the queue for a SYNC that is ready for submission. If there is one:
* 2) slice it
* 3) connect & send all SYNC parts
* 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
* 5) if any of them fail due to other errors, fail the SYNC operation
*/
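To make the contract above concrete, a minimal usage sketch from the caller's point of view (editor's illustration, not code from this commit; `cli` is an assumed cluster_client_t*, and the public cluster_op_t fields are the ones declared in the header below):

    cluster_op_t *wr = new cluster_op_t();
    wr->opcode = OSD_OP_WRITE;
    wr->inode = 1;
    wr->offset = 0;
    wr->len = 4096;
    wr->buf = malloc(4096);
    memset(wr->buf, 0xAA, 4096);
    wr->callback = [](cluster_op_t *op)
    {
        // Called when all parts are done: the write is acknowledged,
        // but without immediate_commit it is NOT yet durable.
    };
    cli->execute(wr);

    cluster_op_t *sync = new cluster_op_t();
    sync->opcode = OSD_OP_SYNC;
    sync->callback = [](cluster_op_t *op)
    {
        // retval == 0 here means every unsynced write above has been
        // fsynced on all OSDs it touched.
    };
    cli->execute(sync);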
void cluster_client_t::execute(cluster_op_t *op)
{
if (op->opcode == OSD_OP_SYNC && immediate_commit)
{
// Syncs are not required in the immediate_commit mode
op->retval = 0;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
op->retval = 0;
if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE ||
(op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
op->offset % bs_disk_alignment || op->len % bs_disk_alignment))
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
if (!pg_count)
{
// Config is not loaded yet, retry after connecting to etcd
offline_ops.push_back(op);
return;
}
if (op->opcode == OSD_OP_SYNC)
{
execute_sync(op);
return;
}
if (op->opcode == OSD_OP_WRITE && !immediate_commit)
{
// Copy operation
if (next_writes.size() > 0)
{
next_writes.push_back(op);
return;
}
if (queued_bytes >= client_dirty_limit)
{
// Push an extra SYNC operation to flush previous writes
cluster_op_t *sync_op = new cluster_op_t;
sync_op->is_internal = true;
sync_op->opcode = OSD_OP_SYNC;
sync_op->callback = [](cluster_op_t* sync_op) {};
execute_sync(sync_op);
next_writes.push_back(op);
queued_bytes = 0;
return;
}
queued_bytes += op->len;
}
cur_ops.insert(op);
continue_rw(op);
}
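A quick worked example of the dirty-limit check above (editor's arithmetic, assuming the 32 MiB default and fixed-size 4 MiB writes):

    // queued_bytes is compared against client_dirty_limit BEFORE the current
    // write is counted, so:
    //   writes 1..8 : queued_bytes grows 0 -> 32 MiB, each write is submitted directly
    //   write 9     : queued_bytes (32 MiB) >= limit, an internal OSD_OP_SYNC is queued
    //                 first and the write is parked in next_writes until the SYNC
    //                 completes (finish_sync() then re-executes it)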
void cluster_client_t::continue_rw(cluster_op_t *op)
{
if (!pg_count)
{
return;
}
if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal)
{
// Save operation for replay when PG goes out of sync
// (primary OSD drops our connection in this case)
cluster_op_t *op_copy = new cluster_op_t();
op_copy->is_internal = true;
op_copy->orig_op = op;
op_copy->opcode = op->opcode;
op_copy->inode = op->inode;
op_copy->offset = op->offset;
op_copy->len = op->len;
op_copy->buf = malloc(op->len);
op_copy->callback = [](cluster_op_t* op_copy)
{
if (op_copy->orig_op)
{
// Acknowledge write and forget the original pointer
op_copy->orig_op->retval = op_copy->retval;
std::function<void(cluster_op_t*)>(op_copy->orig_op->callback)(op_copy->orig_op);
op_copy->orig_op = NULL;
}
};
memcpy(op_copy->buf, op->buf, op->len);
unsynced_writes.push_back(op_copy);
cur_ops.erase(op);
cur_ops.insert(op_copy);
op = op_copy;
}
if (!op->parts.size())
{
// Slice the operation into parts
slice_rw(op);
}
if (!op->needs_reslice)
{
// Send unsent parts, if they're not subject to change
for (auto & op_part: op->parts)
{
if (!op_part.sent && !op_part.done)
{
try_send(op, &op_part);
}
}
}
if (!op->sent_count)
{
if (op->done_count >= op->parts.size())
{
// Finished successfully
// Even if the PG count has changed in the meantime, we treat it as a success
// because if some operations were invalid for the new PG count we'd get errors
cur_ops.erase(op);
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
continue_sync();
}
else if (op->retval != 0 && op->retval != -EPIPE)
{
// Fatal error (not -EPIPE)
cur_ops.erase(op);
if (!immediate_commit && op->opcode == OSD_OP_WRITE)
{
for (int i = 0; i < unsynced_writes.size(); i++)
{
if (unsynced_writes[i] == op)
{
unsynced_writes.erase(unsynced_writes.begin()+i, unsynced_writes.begin()+i+1);
break;
}
}
}
bool del = op->is_internal;
std::function<void(cluster_op_t*)>(op->callback)(op);
if (del)
{
delete op;
}
continue_sync();
}
else if (op->needs_reslice)
{
op->parts.clear();
op->done_count = 0;
op->needs_reslice = false;
continue_rw(op);
}
// else -EPIPE
}
}
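Putting the copy-and-replay pieces of this function together, the life of one non-immediate write looks roughly like this (editor's trace, derived from the code above and from the repeer_pgs hook at the top of the file):

    // 1. execute(user_op)      -> continue_rw() clones it into op_copy with its own
    //                             malloc'd buffer and puts op_copy into unsynced_writes
    // 2. op_copy finishes       -> its callback forwards retval to user_op and fires
    //                             user_op's callback; op_copy stays in unsynced_writes
    // 3. SYNC succeeds          -> finish_sync() deletes the internal copies
    // 3'. an OSD drops instead  -> repeer_pgs marks the affected parts as not done and
    //                             continue_ops() resubmits them before any SYNC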
void cluster_client_t::slice_rw(cluster_op_t *op)
{
// Slice the request into individual object stripe requests
// Primary OSDs still operate on individual stripes, but their size is multiplied by PG minsize in the case of EC
uint64_t pg_block_size = bs_block_size * pg_part_count;
@@ -213,8 +369,8 @@ void cluster_client_t::execute(cluster_op_t *op)
part_count++;
}
}
op->parts.resize(part_count);
int i = 0;
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
@@ -231,22 +387,9 @@ void cluster_client_t::execute(cluster_op_t *op)
.sent = false,
.done = false,
};
i++;
}
}
}
bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
@@ -286,7 +429,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
{
part->op.send_list.push_back(part->buf, part->len);
}
else /* if (op->opcode == OSD_OP_READ) */
{
part->op.buf = part->buf;
}
@@ -301,36 +444,163 @@ bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
return false;
}
void cluster_client_t::execute_sync(cluster_op_t *op)
{
if (immediate_commit)
{
// Syncs are not required in the immediate_commit mode
op->retval = 0;
std::function<void(cluster_op_t*)>(op->callback)(op);
}
else if (next_writes.size() > 0 || cur_sync != NULL)
{
// There are some writes postponed to be executed after SYNC
// Push this SYNC after them
next_writes.push_back(op);
}
else
{
cur_sync = op;
continue_sync();
}
}
void cluster_client_t::continue_sync()
{
if (!cur_sync || cur_sync->parts.size() > 0)
{
// Already submitted
return;
}
cur_sync->retval = 0;
std::set<osd_num_t> sync_osds;
for (auto prev_op: unsynced_writes)
{
if (prev_op->done_count < prev_op->parts.size())
{
// Writes not finished yet
return;
}
for (auto & part: prev_op->parts)
{
if (part.osd_num)
{
sync_osds.insert(part.osd_num);
}
}
}
if (!sync_osds.size())
{
// No dirty writes
finish_sync();
return;
}
// Check that all OSD connections are still alive
for (auto sync_osd: sync_osds)
{
auto peer_it = msgr.osd_peer_fds.find(sync_osd);
if (peer_it == msgr.osd_peer_fds.end())
{
// SYNC is pointless to send to a disconnected OSD
return;
}
}
// Post sync to affected OSDs
cur_sync->parts.resize(sync_osds.size());
int i = 0;
for (auto sync_osd: sync_osds)
{
cur_sync->parts[i] = {
.parent = cur_sync,
.osd_num = sync_osd,
.sent = false,
.done = false,
};
send_sync(cur_sync, &cur_sync->parts[i]);
i++;
}
}
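A small worked example of the OSD fan-out above (editor's illustration): the std::set collapses duplicate OSDs, so the SYNC gets exactly one part per distinct OSD that holds unsynced data.

    std::set<osd_num_t> sync_osds;
    for (osd_num_t osd: { 1, 2, 2, 3, 1 })  // OSDs where parts of the unsynced writes landed
        sync_osds.insert(osd);
    // sync_osds == {1, 2, 3} -> cur_sync->parts.resize(3), one OSD_OP_SYNC per OSD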
void cluster_client_t::finish_sync()
{
if (cur_sync->retval == -EPIPE)
{
// Retry later
cur_sync->parts.clear();
cur_sync->retval = 0;
cur_sync->sent_count = 0;
cur_sync->done_count = 0;
return;
}
std::function<void(cluster_op_t*)>(cur_sync->callback)(cur_sync);
if (!cur_sync->retval)
{
for (auto op: unsynced_writes)
{
if (op->is_internal)
{
delete op;
}
}
unsynced_writes.clear();
}
cur_sync = NULL;
int i;
for (i = 0; i < next_writes.size() && !cur_sync; i++)
{
execute(next_writes[i]);
}
next_writes.erase(next_writes.begin(), next_writes.begin()+i);
}
void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
assert(peer_it != msgr.osd_peer_fds.end());
part->sent = true;
op->sent_count++;
part->op = {
.op_type = OSD_OP_OUT,
.peer_fd = peer_it->second,
.req = {
.hdr = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = op_id++,
.opcode = OSD_OP_SYNC,
},
},
.callback = [this, part](osd_op_t *op_part)
{
handle_op_part(part);
},
};
part->op.send_list.push_back(part->op.req.buf, OSD_PACKET_SIZE);
msgr.outbox_push(&part->op);
}
void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
cluster_op_t *op = part->parent;
part->sent = false;
op->sent_count--;
part->op.buf = NULL;
int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
if (part->op.reply.hdr.retval != expected)
{
// Operation failed, retry
printf(
"Operation part failed on OSD %lu: retval=%ld (expected %u), reconnecting\n",
part->osd_num, part->op.reply.hdr.retval, part->op.req.rw.len
"Operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
part->osd_num, part->op.reply.hdr.retval, expected
);
msgr.stop_client(part->op.peer_fd);
if (part->op.reply.hdr.retval && !retry_timeout_id)
{
// Resend later when OSDs come up
// FIXME: Check for different types of errors
retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int) { retry_timeout_id = 0; continue_ops(); });
}
if (!op->retval || op->retval == -EPIPE)
{
// Don't overwrite other errors with -EPIPE
op->retval = part->op.reply.hdr.retval;
}
}
else
@@ -338,12 +608,17 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
// OK
part->done = true;
op->done_count++;
}
if (op->sent_count == 0)
{
if (op->opcode == OSD_OP_SYNC)
{
assert(op == cur_sync);
finish_sync();
}
else
{
continue_rw(op);
}
}
}


@@ -9,6 +9,7 @@
#define DEFAULT_PG_STRIPE_SIZE 4*1024*1024
#define DEFAULT_DISK_ALIGNMENT 4096
#define DEFAULT_BITMAP_GRANULARITY 4096
#define DEFAULT_CLIENT_DIRTY_LIMIT 32*1024*1024
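One hedged aside on the new constant: like its neighbours it expands to an unparenthesized expression, which is fine in the plain assignments above but would silently change meaning inside a larger expression; a parenthesized form avoids the surprise (editor's suggestion, not part of the commit):

    #define DEFAULT_CLIENT_DIRTY_LIMIT (32*1024*1024)
    // e.g. "x % DEFAULT_CLIENT_DIRTY_LIMIT" would otherwise parse as "((x % 32) * 1024) * 1024"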
struct cluster_op_t;
@@ -35,6 +36,8 @@ struct cluster_op_t
void *buf;
std::function<void(cluster_op_t*)> callback;
protected:
cluster_op_t *orig_op = NULL;
bool is_internal = false;
bool needs_reslice = false;
int sent_count = 0, done_count = 0;
std::vector<cluster_op_part_t> parts;
@@ -53,17 +56,24 @@ class cluster_client_t
uint64_t bs_bitmap_granularity = 0;
uint64_t pg_count = 0;
bool immediate_commit = false;
// FIXME: Implement the inmemory_commit mode. Note that it requires returning overlapping reads from memory.
uint64_t client_dirty_limit = 0;
int log_level;
int up_wait_retry_interval = 500; // ms
uint64_t op_id = 1;
etcd_state_client_t st_cli;
osd_messenger_t msgr;
// operations currently in progress
std::set<cluster_op_t*> cur_ops;
int retry_timeout_id = 0;
// unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode
// unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering)
std::vector<cluster_op_t*> unsynced_writes;
cluster_op_t* cur_sync = NULL;
std::vector<cluster_op_t*> next_writes;
std::vector<cluster_op_t*> offline_ops;
uint64_t queued_bytes = 0;
public:
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
@@ -71,10 +81,16 @@ public:
protected:
void continue_ops();
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(json11::Json::object & changes);
void on_change_osd_state_hook(uint64_t peer_osd);
void continue_rw(cluster_op_t *op);
void slice_rw(cluster_op_t *op);
bool try_send(cluster_op_t *op, cluster_op_part_t *part);
void execute_sync(cluster_op_t *op);
void continue_sync();
void finish_sync();
void send_sync(cluster_op_t *op, cluster_op_part_t *part);
void handle_op_part(cluster_op_part_t *part);
};
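As an orientation aid, the call flow among the new methods declared above (editor's summary of this commit, not documentation from the source):

    // execute(READ/WRITE) -> continue_rw() -> slice_rw(), then try_send() per part
    // execute(SYNC)       -> execute_sync() -> continue_sync() -> send_sync() per OSD
    // handle_op_part()    -> on the last outstanding part: continue_rw() again
    //                        (finish, reslice or wait) or finish_sync() for SYNCs
    // finish_sync()       -> frees the internal write copies and re-executes next_writes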


@@ -131,7 +131,7 @@ struct osd_client_t
std::deque<osd_op_t*> outbox;
std::map<int, osd_op_t*> sent_ops;
// PGs dirtied by this client's primary-writes
std::set<pg_num_t> dirty_pgs;
// Write state


@@ -133,6 +133,22 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
it++;
}
dirty_pgs.erase(pg.pg_num);
// Drop connections of clients who have this PG in dirty_pgs
if (immediate_commit != IMMEDIATE_ALL)
{
std::vector<int> to_stop;
for (auto & cp: c_cli.clients)
{
if (cp.second.dirty_pgs.find(pg_num) != cp.second.dirty_pgs.end())
{
to_stop.push_back(cp.first);
}
}
for (auto peer_fd: to_stop)
{
c_cli.stop_client(peer_fd);
}
}
// Calculate current write OSD set
pg.pg_cursize = 0;
pg.cur_set.resize(pg.target_set.size());
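A brief editor's note on how this OSD-side change closes the loop with the client-side replay added in cluster_client.cpp:

    // 1. A PG starts re-peering and immediate_commit != IMMEDIATE_ALL:
    //    start_pg_peering() drops every client connection that dirtied this PG.
    // 2. On the client, osd_messenger_t reports the dropped connection through
    //    the repeer_pgs hook (first hunk of this commit).
    // 3. The client marks the dropped OSD's parts of unsynced_writes as not done
    //    and replays them (and only afterwards re-issues SYNC), so a write that
    //    was acknowledged but never synced cannot be lost by the re-peered PG.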