forked from vitalif/vitastor
Delete objects only after a SYNC during rebalance in the non-immediate_commit mode
Previously OSDs could commit deletes before writes during recovery or rebalance in the "lazy fsync" (immediate_commit=off) mode which could result in lost objectsrel-0.5
parent
c5fb1d5987
commit
435045751d
|
@ -66,6 +66,7 @@ const etcd_tree = {
|
|||
autosync_interval: 5,
|
||||
client_queue_depth: 128, // unused
|
||||
recovery_queue_depth: 4,
|
||||
recovery_sync_batch: 16,
|
||||
readonly: false,
|
||||
no_recovery: false,
|
||||
no_rebalance: false,
|
||||
|
|
|
@ -96,6 +96,9 @@ void osd_t::parse_config(blockstore_config_t & config)
|
|||
recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
|
||||
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
|
||||
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10);
|
||||
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
|
||||
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
|
||||
readonly = true;
|
||||
print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
#define DEFAULT_AUTOSYNC_INTERVAL 5
|
||||
#define MAX_RECOVERY_QUEUE 2048
|
||||
#define DEFAULT_RECOVERY_QUEUE 4
|
||||
#define DEFAULT_RECOVERY_BATCH 16
|
||||
|
||||
//#define OSD_STUB
|
||||
|
||||
|
@ -76,6 +77,7 @@ class osd_t
|
|||
int immediate_commit = IMMEDIATE_NONE;
|
||||
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
|
||||
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
int log_level = 0;
|
||||
|
||||
// cluster state
|
||||
|
@ -97,9 +99,11 @@ class osd_t
|
|||
std::map<pool_pg_num_t, pg_t> pgs;
|
||||
std::set<pool_pg_num_t> dirty_pgs;
|
||||
std::set<osd_num_t> dirty_osds;
|
||||
int copies_to_delete_after_sync_count = 0;
|
||||
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
|
||||
int peering_state = 0;
|
||||
std::map<object_id, osd_recovery_op_t> recovery_ops;
|
||||
int recovery_done = 0;
|
||||
osd_op_t *autosync_op = NULL;
|
||||
|
||||
// Unstable writes
|
||||
|
@ -201,6 +205,7 @@ class osd_t
|
|||
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
||||
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
|
||||
void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||
|
||||
|
|
|
@ -270,7 +270,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
|||
}
|
||||
op->osd_op->callback = [this, op](osd_op_t *osd_op)
|
||||
{
|
||||
// Don't sync the write, it will be synced by our regular sync coroutine
|
||||
if (osd_op->reply.hdr.retval < 0)
|
||||
{
|
||||
// Error recovering object
|
||||
|
@ -292,6 +291,17 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
|||
op->osd_op = NULL;
|
||||
recovery_ops.erase(op->oid);
|
||||
delete osd_op;
|
||||
if (immediate_commit != IMMEDIATE_ALL)
|
||||
{
|
||||
recovery_done++;
|
||||
if (recovery_done >= recovery_sync_batch)
|
||||
{
|
||||
// Force sync every <recovery_sync_batch> operations
|
||||
// This is required not to pile up an excessive amount of delete operations
|
||||
autosync();
|
||||
recovery_done = 0;
|
||||
}
|
||||
}
|
||||
continue_recovery();
|
||||
};
|
||||
exec_op(op->osd_op);
|
||||
|
|
|
@ -103,6 +103,8 @@ void osd_t::reset_pg(pg_t & pg)
|
|||
{
|
||||
pg.cur_peers.clear();
|
||||
pg.state_dict.clear();
|
||||
copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
|
||||
pg.copies_to_delete_after_sync.clear();
|
||||
incomplete_objects -= pg.incomplete_objects.size();
|
||||
misplaced_objects -= pg.misplaced_objects.size();
|
||||
degraded_objects -= pg.degraded_objects.size();
|
||||
|
|
|
@ -56,6 +56,13 @@ struct obj_piece_id_t
|
|||
uint64_t osd_num;
|
||||
};
|
||||
|
||||
struct obj_ver_osd_t
|
||||
{
|
||||
uint64_t osd_num;
|
||||
object_id oid;
|
||||
uint64_t version;
|
||||
};
|
||||
|
||||
struct flush_action_t
|
||||
{
|
||||
bool rollback = false, make_stable = false;
|
||||
|
@ -101,6 +108,7 @@ struct pg_t
|
|||
std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
|
||||
btree::btree_map<object_id, pg_osd_set_state_t*> incomplete_objects, misplaced_objects, degraded_objects;
|
||||
std::map<obj_piece_id_t, flush_action_t> flush_actions;
|
||||
std::vector<obj_ver_osd_t> copies_to_delete_after_sync;
|
||||
btree::btree_map<object_id, uint64_t> ver_override;
|
||||
pg_peering_state_t *peering_state = NULL;
|
||||
pg_flush_batch_t *flush_batch = NULL;
|
||||
|
|
|
@ -367,6 +367,32 @@ resume_7:
|
|||
}
|
||||
// Any kind of a non-clean object can have extra chunks, because we don't record objects
|
||||
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
|
||||
if (immediate_commit != IMMEDIATE_ALL)
|
||||
{
|
||||
// We can't remove extra chunks yet if fsyncs are explicit, because
|
||||
// new copies may not be committed to stable storage yet
|
||||
// We can only remove extra chunks after a successful SYNC for this PG
|
||||
for (auto & chunk: op_data->object_state->osd_set)
|
||||
{
|
||||
// Check is the same as in submit_primary_del_subops()
|
||||
if (op_data->scheme == POOL_SCHEME_REPLICATED
|
||||
? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
|
||||
: (chunk.osd_num != pg.cur_set[chunk.role]))
|
||||
{
|
||||
pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
|
||||
.osd_num = chunk.osd_num,
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
|
||||
},
|
||||
.version = op_data->fact_ver,
|
||||
});
|
||||
copies_to_delete_after_sync_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
|
||||
if (op_data->n_subops > 0)
|
||||
{
|
||||
|
@ -380,6 +406,7 @@ resume_9:
|
|||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Clear object state
|
||||
remove_object_from_state(op_data->oid, op_data->object_state, pg);
|
||||
pg.clean_count++;
|
||||
|
@ -511,6 +538,8 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
|
|||
else if (op_data->st == 4) goto resume_4;
|
||||
else if (op_data->st == 5) goto resume_5;
|
||||
else if (op_data->st == 6) goto resume_6;
|
||||
else if (op_data->st == 7) goto resume_7;
|
||||
else if (op_data->st == 8) goto resume_8;
|
||||
assert(op_data->st == 0);
|
||||
if (syncs_in_progress.size() > 0)
|
||||
{
|
||||
|
@ -572,11 +601,34 @@ resume_2:
|
|||
this->unstable_writes.clear();
|
||||
}
|
||||
{
|
||||
void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size());
|
||||
void *dirty_buf = malloc_or_die(
|
||||
sizeof(pool_pg_num_t)*dirty_pgs.size() +
|
||||
sizeof(osd_num_t)*dirty_osds.size() +
|
||||
sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
|
||||
);
|
||||
op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
|
||||
op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
|
||||
op_data->dirty_pg_count = dirty_pgs.size();
|
||||
op_data->dirty_osd_count = dirty_osds.size();
|
||||
if (this->copies_to_delete_after_sync_count)
|
||||
{
|
||||
op_data->copies_to_delete_count = 0;
|
||||
op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
|
||||
for (auto dirty_pg_num: dirty_pgs)
|
||||
{
|
||||
auto & pg = pgs.at(dirty_pg_num);
|
||||
assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
|
||||
memcpy(
|
||||
op_data->copies_to_delete + op_data->copies_to_delete_count,
|
||||
pg.copies_to_delete_after_sync.data(),
|
||||
sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
|
||||
);
|
||||
op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
|
||||
this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
|
||||
pg.copies_to_delete_after_sync.clear();
|
||||
}
|
||||
assert(this->copies_to_delete_after_sync_count == 0);
|
||||
}
|
||||
int dpg = 0;
|
||||
for (auto dirty_pg_num: dirty_pgs)
|
||||
{
|
||||
|
@ -649,6 +701,36 @@ resume_6:
|
|||
}
|
||||
}
|
||||
}
|
||||
if (op_data->copies_to_delete)
|
||||
{
|
||||
// Return 'copies to delete' back into respective PGs
|
||||
for (int i = 0; i < op_data->copies_to_delete_count; i++)
|
||||
{
|
||||
auto & w = op_data->copies_to_delete[i];
|
||||
auto & pg = pgs.at((pool_pg_num_t){
|
||||
.pool_id = INODE_POOL(w.oid.inode),
|
||||
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
|
||||
});
|
||||
if (pg.state & PG_ACTIVE)
|
||||
{
|
||||
pg.copies_to_delete_after_sync.push_back(w);
|
||||
copies_to_delete_after_sync_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (op_data->copies_to_delete)
|
||||
{
|
||||
// Actually delete copies which we wanted to delete
|
||||
submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
|
||||
resume_7:
|
||||
op_data->st = 7;
|
||||
return;
|
||||
resume_8:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
goto resume_6;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < op_data->dirty_pg_count; i++)
|
||||
{
|
||||
|
|
|
@ -38,4 +38,8 @@ struct osd_primary_op_data_t
|
|||
osd_num_t *dirty_osds = NULL;
|
||||
int dirty_osd_count = 0;
|
||||
obj_ver_id *unstable_writes = NULL;
|
||||
obj_ver_osd_t *copies_to_delete = NULL;
|
||||
int copies_to_delete_count = 0;
|
||||
};
|
||||
|
||||
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num);
|
||||
|
|
|
@ -355,7 +355,7 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
|
|||
}
|
||||
}
|
||||
|
||||
static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
|
||||
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
|
||||
{
|
||||
for (uint64_t i = 0; i < size; i++)
|
||||
{
|
||||
|
@ -371,29 +371,43 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
|
||||
int extra_chunks = 0;
|
||||
// ordered comparison for EC/XOR, unordered for replicated pools
|
||||
obj_ver_osd_t extra_chunks[loc_set.size()];
|
||||
int chunks_to_del = 0;
|
||||
for (auto & chunk: loc_set)
|
||||
{
|
||||
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
// ordered comparison for EC/XOR, unordered for replicated pools
|
||||
if (!cur_set || (rep
|
||||
? !contains_osd(cur_set, set_size, chunk.osd_num)
|
||||
: (chunk.osd_num != cur_set[chunk.role])))
|
||||
{
|
||||
extra_chunks++;
|
||||
extra_chunks[chunks_to_del++] = (obj_ver_osd_t){
|
||||
.osd_num = chunk.osd_num,
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | (rep ? 0 : chunk.role),
|
||||
},
|
||||
// Same version as write
|
||||
.version = op_data->fact_ver,
|
||||
};
|
||||
}
|
||||
}
|
||||
op_data->n_subops = extra_chunks;
|
||||
submit_primary_del_batch(cur_op, extra_chunks, chunks_to_del);
|
||||
}
|
||||
|
||||
void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
op_data->n_subops = chunks_to_delete_count;
|
||||
op_data->done = op_data->errors = 0;
|
||||
if (!extra_chunks)
|
||||
if (!op_data->n_subops)
|
||||
{
|
||||
return;
|
||||
}
|
||||
osd_op_t *subops = new osd_op_t[extra_chunks];
|
||||
osd_op_t *subops = new osd_op_t[chunks_to_delete_count];
|
||||
op_data->subops = subops;
|
||||
int i = 0;
|
||||
for (auto & chunk: loc_set)
|
||||
for (int i = 0; i < chunks_to_delete_count; i++)
|
||||
{
|
||||
if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
|
||||
{
|
||||
int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
|
||||
auto & chunk = chunks_to_delete[i];
|
||||
if (chunk.osd_num == this->osd_num)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||
|
@ -404,12 +418,8 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
{
|
||||
handle_primary_bs_subop(subop);
|
||||
},
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
// Same version as write
|
||||
.version = op_data->fact_ver,
|
||||
.oid = chunk.oid,
|
||||
.version = chunk.version,
|
||||
});
|
||||
bs->enqueue_op(subops[i].bs_op);
|
||||
}
|
||||
|
@ -423,12 +433,8 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
.id = c_cli.next_subop_id++,
|
||||
.opcode = OSD_OP_SEC_DELETE,
|
||||
},
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
// Same version as write
|
||||
.version = op_data->fact_ver,
|
||||
.oid = chunk.oid,
|
||||
.version = chunk.version,
|
||||
};
|
||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||
{
|
||||
|
@ -442,8 +448,6 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
|||
};
|
||||
c_cli.outbox_push(&subops[i]);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue