forked from vitalif/vitastor
Allow to use lazy sync with replicated pools
parent
352caeba14
commit
53832d184a
|
@ -51,13 +51,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||||
op->retval = 0;
|
op->retval = 0;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (op->opcode == BS_OP_WRITE_STABLE && immediate_commit != IMMEDIATE_ALL &&
|
|
||||||
(op->len == block_size || deleted || immediate_commit != IMMEDIATE_SMALL))
|
|
||||||
{
|
|
||||||
// WRITE_STABLE only works with immediate commit by now
|
|
||||||
op->retval = -EINVAL;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (is_inflight_big && !is_del && !deleted && op->len < block_size &&
|
if (is_inflight_big && !is_del && !deleted && op->len < block_size &&
|
||||||
immediate_commit != IMMEDIATE_ALL)
|
immediate_commit != IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
|
|
1
osd.h
1
osd.h
|
@ -91,6 +91,7 @@ class osd_t
|
||||||
std::map<pool_id_t, pg_num_t> pg_counts;
|
std::map<pool_id_t, pg_num_t> pg_counts;
|
||||||
std::map<pool_pg_num_t, pg_t> pgs;
|
std::map<pool_pg_num_t, pg_t> pgs;
|
||||||
std::set<pool_pg_num_t> dirty_pgs;
|
std::set<pool_pg_num_t> dirty_pgs;
|
||||||
|
std::set<osd_num_t> dirty_osds;
|
||||||
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
|
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
|
||||||
int peering_state = 0;
|
int peering_state = 0;
|
||||||
std::map<object_id, osd_recovery_op_t> recovery_ops;
|
std::map<object_id, osd_recovery_op_t> recovery_ops;
|
||||||
|
|
|
@ -357,9 +357,7 @@ resume_9:
|
||||||
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
|
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
|
||||||
resume_6:
|
resume_6:
|
||||||
resume_7:
|
resume_7:
|
||||||
// FIXME: Replicated writes are always "immediate"
|
if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
|
||||||
if (op_data->scheme != POOL_SCHEME_REPLICATED &&
|
|
||||||
!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
|
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -393,6 +391,9 @@ bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t &
|
||||||
}
|
}
|
||||||
if (immediate_commit == IMMEDIATE_ALL)
|
if (immediate_commit == IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
|
if (op_data->scheme != POOL_SCHEME_REPLICATED)
|
||||||
|
{
|
||||||
|
// Send STABILIZE ops immediately
|
||||||
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
||||||
op_data->unstable_writes = new obj_ver_id[loc_set.size()];
|
op_data->unstable_writes = new obj_ver_id[loc_set.size()];
|
||||||
{
|
{
|
||||||
|
@ -430,11 +431,15 @@ resume_7:
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Remember version as unstable
|
if (op_data->scheme != POOL_SCHEME_REPLICATED)
|
||||||
|
{
|
||||||
|
// Remember version as unstable for EC/XOR
|
||||||
for (auto & chunk: loc_set)
|
for (auto & chunk: loc_set)
|
||||||
{
|
{
|
||||||
|
this->dirty_osds.insert(chunk.osd_num);
|
||||||
this->unstable_writes[(osd_object_id_t){
|
this->unstable_writes[(osd_object_id_t){
|
||||||
.osd_num = chunk.osd_num,
|
.osd_num = chunk.osd_num,
|
||||||
.oid = {
|
.oid = {
|
||||||
|
@ -443,6 +448,15 @@ resume_7:
|
||||||
},
|
},
|
||||||
}] = op_data->fact_ver;
|
}] = op_data->fact_ver;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Only remember to sync OSDs for replicated pools
|
||||||
|
for (auto & chunk: loc_set)
|
||||||
|
{
|
||||||
|
this->dirty_osds.insert(chunk.osd_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Remember PG as dirty to drop the connection when PG goes offline
|
// Remember PG as dirty to drop the connection when PG goes offline
|
||||||
// (this is required because of the "lazy sync")
|
// (this is required because of the "lazy sync")
|
||||||
c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
|
||||||
|
@ -480,7 +494,7 @@ resume_1:
|
||||||
syncs_in_progress.push_back(cur_op);
|
syncs_in_progress.push_back(cur_op);
|
||||||
}
|
}
|
||||||
resume_2:
|
resume_2:
|
||||||
if (unstable_writes.size() == 0)
|
if (dirty_osds.size() == 0)
|
||||||
{
|
{
|
||||||
// Nothing to sync
|
// Nothing to sync
|
||||||
goto finish;
|
goto finish;
|
||||||
|
@ -488,11 +502,10 @@ resume_2:
|
||||||
// Save and clear unstable_writes
|
// Save and clear unstable_writes
|
||||||
// In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
|
// In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
|
||||||
// It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
|
// It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
|
||||||
|
if (unstable_writes.size() > 0)
|
||||||
{
|
{
|
||||||
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
||||||
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
|
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
|
||||||
op_data->dirty_pgs = new pool_pg_num_t[dirty_pgs.size()];
|
|
||||||
op_data->dirty_pg_count = dirty_pgs.size();
|
|
||||||
osd_num_t last_osd = 0;
|
osd_num_t last_osd = 0;
|
||||||
int last_start = 0, last_end = 0;
|
int last_start = 0, last_end = 0;
|
||||||
for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
|
for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
|
||||||
|
@ -524,6 +537,14 @@ resume_2:
|
||||||
.len = last_end - last_start,
|
.len = last_end - last_start,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
this->unstable_writes.clear();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size());
|
||||||
|
op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
|
||||||
|
op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
|
||||||
|
op_data->dirty_pg_count = dirty_pgs.size();
|
||||||
|
op_data->dirty_osd_count = dirty_osds.size();
|
||||||
int dpg = 0;
|
int dpg = 0;
|
||||||
for (auto dirty_pg_num: dirty_pgs)
|
for (auto dirty_pg_num: dirty_pgs)
|
||||||
{
|
{
|
||||||
|
@ -531,7 +552,12 @@ resume_2:
|
||||||
op_data->dirty_pgs[dpg++] = dirty_pg_num;
|
op_data->dirty_pgs[dpg++] = dirty_pg_num;
|
||||||
}
|
}
|
||||||
dirty_pgs.clear();
|
dirty_pgs.clear();
|
||||||
this->unstable_writes.clear();
|
dpg = 0;
|
||||||
|
for (auto osd_num: dirty_osds)
|
||||||
|
{
|
||||||
|
op_data->dirty_osds[dpg++] = osd_num;
|
||||||
|
}
|
||||||
|
dirty_osds.clear();
|
||||||
}
|
}
|
||||||
if (immediate_commit != IMMEDIATE_ALL)
|
if (immediate_commit != IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
|
@ -546,13 +572,27 @@ resume_4:
|
||||||
goto resume_6;
|
goto resume_6;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Stabilize version sets
|
if (op_data->unstable_writes)
|
||||||
|
{
|
||||||
|
// Stabilize version sets, if any
|
||||||
submit_primary_stab_subops(cur_op);
|
submit_primary_stab_subops(cur_op);
|
||||||
resume_5:
|
resume_5:
|
||||||
op_data->st = 5;
|
op_data->st = 5;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
resume_6:
|
resume_6:
|
||||||
if (op_data->errors > 0)
|
if (op_data->errors > 0)
|
||||||
|
{
|
||||||
|
// Return PGs and OSDs back into their dirty sets
|
||||||
|
for (int i = 0; i < op_data->dirty_pg_count; i++)
|
||||||
|
{
|
||||||
|
dirty_pgs.insert(op_data->dirty_pgs[i]);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < op_data->dirty_osd_count; i++)
|
||||||
|
{
|
||||||
|
dirty_osds.insert(op_data->dirty_osds[i]);
|
||||||
|
}
|
||||||
|
if (op_data->unstable_writes)
|
||||||
{
|
{
|
||||||
// Return objects back into the unstable write set
|
// Return objects back into the unstable write set
|
||||||
for (auto unstable_osd: *(op_data->unstable_write_osds))
|
for (auto unstable_osd: *(op_data->unstable_write_osds))
|
||||||
|
@ -574,6 +614,7 @@ resume_6:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for (int i = 0; i < op_data->dirty_pg_count; i++)
|
for (int i = 0; i < op_data->dirty_pg_count; i++)
|
||||||
{
|
{
|
||||||
auto & pg = pgs.at(op_data->dirty_pgs[i]);
|
auto & pg = pgs.at(op_data->dirty_pgs[i]);
|
||||||
|
@ -584,11 +625,16 @@ resume_6:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// FIXME: Free those in the destructor?
|
// FIXME: Free those in the destructor?
|
||||||
delete op_data->dirty_pgs;
|
free(op_data->dirty_pgs);
|
||||||
|
op_data->dirty_pgs = NULL;
|
||||||
|
op_data->dirty_osds = NULL;
|
||||||
|
if (op_data->unstable_writes)
|
||||||
|
{
|
||||||
delete op_data->unstable_write_osds;
|
delete op_data->unstable_write_osds;
|
||||||
delete[] op_data->unstable_writes;
|
delete[] op_data->unstable_writes;
|
||||||
op_data->unstable_writes = NULL;
|
op_data->unstable_writes = NULL;
|
||||||
op_data->unstable_write_osds = NULL;
|
op_data->unstable_write_osds = NULL;
|
||||||
|
}
|
||||||
if (op_data->errors > 0)
|
if (op_data->errors > 0)
|
||||||
{
|
{
|
||||||
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
|
|
|
@ -32,5 +32,7 @@ struct osd_primary_op_data_t
|
||||||
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
|
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
|
||||||
pool_pg_num_t *dirty_pgs = NULL;
|
pool_pg_num_t *dirty_pgs = NULL;
|
||||||
int dirty_pg_count = 0;
|
int dirty_pg_count = 0;
|
||||||
|
osd_num_t *dirty_osds = NULL;
|
||||||
|
int dirty_osd_count = 0;
|
||||||
obj_ver_id *unstable_writes = NULL;
|
obj_ver_id *unstable_writes = NULL;
|
||||||
};
|
};
|
||||||
|
|
|
@ -445,14 +445,14 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint
|
||||||
void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
int n_osds = op_data->unstable_write_osds->size();
|
int n_osds = op_data->dirty_osd_count;
|
||||||
osd_op_t *subops = new osd_op_t[n_osds];
|
osd_op_t *subops = new osd_op_t[n_osds];
|
||||||
op_data->done = op_data->errors = 0;
|
op_data->done = op_data->errors = 0;
|
||||||
op_data->n_subops = n_osds;
|
op_data->n_subops = n_osds;
|
||||||
op_data->subops = subops;
|
op_data->subops = subops;
|
||||||
for (int i = 0; i < n_osds; i++)
|
for (int i = 0; i < n_osds; i++)
|
||||||
{
|
{
|
||||||
osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num;
|
osd_num_t sync_osd = op_data->dirty_osds[i];
|
||||||
if (sync_osd == this->osd_num)
|
if (sync_osd == this->osd_num)
|
||||||
{
|
{
|
||||||
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||||
|
|
Loading…
Reference in New Issue