Forget unstable writes when re-peering, rename parity_block_size -> pg_stripe_size, pg_parity_size -> pg_block_size
parent
8a8b619875
commit
92c800bb64
|
@ -395,8 +395,8 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
// Count objects
|
// Count objects
|
||||||
uint32_t list_pg = op->offset;
|
uint32_t list_pg = op->offset;
|
||||||
uint32_t pg_count = op->len;
|
uint32_t pg_count = op->len;
|
||||||
uint64_t parity_block_size = op->oid.stripe;
|
uint64_t pg_stripe_size = op->oid.stripe;
|
||||||
if (pg_count != 0 && (parity_block_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
|
if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
|
||||||
{
|
{
|
||||||
op->retval = -EINVAL;
|
op->retval = -EINVAL;
|
||||||
FINISH_OP(op);
|
FINISH_OP(op);
|
||||||
|
@ -407,7 +407,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
||||||
{
|
{
|
||||||
uint32_t pg = (it->first.inode + it->first.stripe / parity_block_size) % pg_count;
|
uint32_t pg = (it->first.inode + it->first.stripe / pg_stripe_size) % pg_count;
|
||||||
if (pg == list_pg)
|
if (pg == list_pg)
|
||||||
{
|
{
|
||||||
stable_count++;
|
stable_count++;
|
||||||
|
@ -421,7 +421,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
uint64_t total_count = stable_count;
|
uint64_t total_count = stable_count;
|
||||||
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
||||||
{
|
{
|
||||||
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg)
|
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||||
{
|
{
|
||||||
if (IS_STABLE(it->second.state))
|
if (IS_STABLE(it->second.state))
|
||||||
{
|
{
|
||||||
|
@ -444,7 +444,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
|
||||||
{
|
{
|
||||||
if (!pg_count || ((it->first.inode + it->first.stripe / parity_block_size) % pg_count) == list_pg)
|
if (!pg_count || ((it->first.inode + it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||||
{
|
{
|
||||||
vers[i++] = {
|
vers[i++] = {
|
||||||
.oid = it->first,
|
.oid = it->first,
|
||||||
|
@ -455,7 +455,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
int j = stable_count;
|
int j = stable_count;
|
||||||
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
|
||||||
{
|
{
|
||||||
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg)
|
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||||
{
|
{
|
||||||
if (IS_STABLE(it->second.state))
|
if (IS_STABLE(it->second.state))
|
||||||
{
|
{
|
||||||
|
|
7
osd.h
7
osd.h
|
@ -209,7 +209,7 @@ class osd_t
|
||||||
int inflight_ops = 0;
|
int inflight_ops = 0;
|
||||||
blockstore_t *bs;
|
blockstore_t *bs;
|
||||||
uint32_t bs_block_size, bs_disk_alignment;
|
uint32_t bs_block_size, bs_disk_alignment;
|
||||||
uint64_t parity_block_size = 4*1024*1024; // 4 MB by default
|
uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
timerfd_interval *tick_tfd;
|
timerfd_interval *tick_tfd;
|
||||||
|
|
||||||
|
@ -281,6 +281,11 @@ class osd_t
|
||||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||||
|
|
||||||
|
inline pg_num_t map_to_pg(object_id oid)
|
||||||
|
{
|
||||||
|
return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
|
||||||
|
}
|
||||||
public:
|
public:
|
||||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||||
~osd_t();
|
~osd_t();
|
||||||
|
|
|
@ -130,7 +130,7 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
// placement group total number and total count
|
// placement group total number and total count
|
||||||
pg_num_t list_pg, pg_count;
|
pg_num_t list_pg, pg_count;
|
||||||
uint64_t parity_block_size;
|
uint64_t pg_stripe_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
||||||
|
|
|
@ -271,6 +271,15 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
cancel_op(p.second);
|
cancel_op(p.second);
|
||||||
}
|
}
|
||||||
pg.write_queue.clear();
|
pg.write_queue.clear();
|
||||||
|
// Forget this PG's unstable writes
|
||||||
|
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
|
||||||
|
{
|
||||||
|
pg_num_t n = (it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count + 1;
|
||||||
|
if (n == pg.pg_num)
|
||||||
|
unstable_writes.erase(it++);
|
||||||
|
else
|
||||||
|
it++;
|
||||||
|
}
|
||||||
pg.pg_cursize = 0;
|
pg.pg_cursize = 0;
|
||||||
for (int role = 0; role < pg.cur_set.size(); role++)
|
for (int role = 0; role < pg.cur_set.size(); role++)
|
||||||
{
|
{
|
||||||
|
@ -374,7 +383,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
op->peer_fd = 0;
|
op->peer_fd = 0;
|
||||||
op->bs_op = new blockstore_op_t();
|
op->bs_op = new blockstore_op_t();
|
||||||
op->bs_op->opcode = BS_OP_LIST;
|
op->bs_op->opcode = BS_OP_LIST;
|
||||||
op->bs_op->oid.stripe = parity_block_size;
|
op->bs_op->oid.stripe = pg_stripe_size;
|
||||||
op->bs_op->len = pg_count;
|
op->bs_op->len = pg_count;
|
||||||
op->bs_op->offset = pg.pg_num-1;
|
op->bs_op->offset = pg.pg_num-1;
|
||||||
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
|
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
|
||||||
|
@ -416,7 +425,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
},
|
},
|
||||||
.list_pg = pg.pg_num,
|
.list_pg = pg.pg_num,
|
||||||
.pg_count = pg_count,
|
.pg_count = pg_count,
|
||||||
.parity_block_size = parity_block_size,
|
.pg_stripe_size = pg_stripe_size,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
op->callback = [this, ps, role_osd](osd_op_t *op)
|
op->callback = [this, ps, role_osd](osd_op_t *op)
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
#define PG_HAS_MISPLACED (1<<8)
|
#define PG_HAS_MISPLACED (1<<8)
|
||||||
#define PG_HAS_UNCLEAN (1<<9)
|
#define PG_HAS_UNCLEAN (1<<9)
|
||||||
|
|
||||||
// FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size
|
// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size
|
||||||
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
#define STRIPE_MASK ((uint64_t)4096 - 1)
|
||||||
|
|
||||||
// OSD object states
|
// OSD object states
|
||||||
|
|
|
@ -69,21 +69,21 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
||||||
// Our EC scheme stores data in fixed chunks equal to (K*block size)
|
// Our EC scheme stores data in fixed chunks equal to (K*block size)
|
||||||
// But we must not use K in the process of calculating the PG number
|
// But we must not use K in the process of calculating the PG number
|
||||||
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
|
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
|
||||||
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / parity_block_size) % pg_count + 1;
|
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1;
|
||||||
// FIXME: Postpone operations in inactive PGs
|
// FIXME: Postpone operations in inactive PGs
|
||||||
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
|
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
|
||||||
{
|
{
|
||||||
finish_op(cur_op, -EINVAL);
|
finish_op(cur_op, -EINVAL);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize;
|
uint64_t pg_block_size = bs_block_size * pgs[pg_num].pg_minsize;
|
||||||
object_id oid = {
|
object_id oid = {
|
||||||
.inode = cur_op->req.rw.inode,
|
.inode = cur_op->req.rw.inode,
|
||||||
// oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG
|
// oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG
|
||||||
.stripe = (cur_op->req.rw.offset / parity_block_size) * parity_block_size +
|
.stripe = (cur_op->req.rw.offset / pg_stripe_size) * pg_stripe_size +
|
||||||
((cur_op->req.rw.offset % parity_block_size) / pg_parity_size) * pg_parity_size
|
((cur_op->req.rw.offset % pg_stripe_size) / pg_block_size) * pg_block_size
|
||||||
};
|
};
|
||||||
if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_parity_size) ||
|
if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_block_size) ||
|
||||||
(cur_op->req.rw.offset % bs_disk_alignment) != 0 ||
|
(cur_op->req.rw.offset % bs_disk_alignment) != 0 ||
|
||||||
(cur_op->req.rw.len % bs_disk_alignment) != 0)
|
(cur_op->req.rw.len % bs_disk_alignment) != 0)
|
||||||
{
|
{
|
||||||
|
@ -708,7 +708,6 @@ resume_5:
|
||||||
op_data->st = 5;
|
op_data->st = 5;
|
||||||
return;
|
return;
|
||||||
resume_6:
|
resume_6:
|
||||||
// FIXME: Free them correctly (via a destructor or so)
|
|
||||||
if (op_data->errors > 0)
|
if (op_data->errors > 0)
|
||||||
{
|
{
|
||||||
// Return objects back into the unstable write set
|
// Return objects back into the unstable write set
|
||||||
|
@ -716,15 +715,20 @@ resume_6:
|
||||||
{
|
{
|
||||||
for (int i = 0; i < unstable_osd.len; i++)
|
for (int i = 0; i < unstable_osd.len; i++)
|
||||||
{
|
{
|
||||||
uint64_t & uv = this->unstable_writes[(osd_object_id_t){
|
// Expect those from peered PGs
|
||||||
|
auto & w = op_data->unstable_writes[i];
|
||||||
|
if (pgs[map_to_pg(w.oid)].state & PG_ACTIVE)
|
||||||
|
{
|
||||||
|
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
|
||||||
.osd_num = unstable_osd.osd_num,
|
.osd_num = unstable_osd.osd_num,
|
||||||
.oid = op_data->unstable_writes[i].oid,
|
.oid = w.oid,
|
||||||
}];
|
}];
|
||||||
uv = uv < op_data->unstable_writes[i].version ? op_data->unstable_writes[i].version : uv;
|
dest = dest < w.version ? w.version : dest;
|
||||||
}
|
|
||||||
// FIXME: But filter those from peered PGs
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// FIXME: Free those in the destructor?
|
||||||
delete op_data->unstable_write_osds;
|
delete op_data->unstable_write_osds;
|
||||||
delete[] op_data->unstable_writes;
|
delete[] op_data->unstable_writes;
|
||||||
op_data->unstable_writes = NULL;
|
op_data->unstable_writes = NULL;
|
||||||
|
|
|
@ -82,7 +82,7 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||||
secondary_op_callback(cur_op);
|
secondary_op_callback(cur_op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
cur_op->bs_op->oid.stripe = cur_op->req.sec_list.parity_block_size;
|
cur_op->bs_op->oid.stripe = cur_op->req.sec_list.pg_stripe_size;
|
||||||
cur_op->bs_op->len = cur_op->req.sec_list.pg_count;
|
cur_op->bs_op->len = cur_op->req.sec_list.pg_count;
|
||||||
cur_op->bs_op->offset = cur_op->req.sec_list.list_pg - 1;
|
cur_op->bs_op->offset = cur_op->req.sec_list.list_pg - 1;
|
||||||
#ifdef OSD_STUB
|
#ifdef OSD_STUB
|
||||||
|
|
Loading…
Reference in New Issue