forked from vitalif/vitastor
Fill out the rest of the degraded read logic; now we need to make it a "coroutine"
parent
206c4eb655
commit
9fb2d3f840
3
osd.h
3
osd.h
|
@ -74,8 +74,8 @@ struct osd_client_t
|
|||
int peer_fd;
|
||||
int peer_state;
|
||||
std::function<void(int)> connect_callback;
|
||||
// osd numbers begin with 1
|
||||
uint64_t osd_num = 0;
|
||||
//int in_flight_ops = 0;
|
||||
|
||||
// Read state
|
||||
bool read_ready = false;
|
||||
|
@ -122,6 +122,7 @@ class osd_t
|
|||
std::vector<pg_t> pgs;
|
||||
int peering_state = 0;
|
||||
unsigned pg_count = 0;
|
||||
uint64_t next_subop_id = 1;
|
||||
|
||||
// client & peer I/O
|
||||
|
||||
|
|
|
@ -62,7 +62,18 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector<obj_ver_role> &
|
|||
auto it = pg.state_dict.find(st.osd_set);
|
||||
if (it == pg.state_dict.end())
|
||||
{
|
||||
std::vector<uint64_t> read_target;
|
||||
read_target.resize(pg.pg_size);
|
||||
for (int i = 0; i < pg.pg_size; i++)
|
||||
{
|
||||
read_target[i] = 0;
|
||||
}
|
||||
for (auto & o: st.osd_set)
|
||||
{
|
||||
read_target[o.role] = o.osd_num;
|
||||
}
|
||||
pg.state_dict[st.osd_set] = {
|
||||
.read_target = read_target,
|
||||
.osd_set = st.osd_set,
|
||||
.state = state,
|
||||
.object_count = 1,
|
||||
|
|
|
@ -39,6 +39,7 @@ typedef std::vector<pg_obj_loc_t> pg_osd_set_t;
|
|||
|
||||
struct pg_osd_set_state_t
|
||||
{
|
||||
std::vector<uint64_t> read_target;
|
||||
pg_osd_set_t osd_set;
|
||||
uint64_t state = 0;
|
||||
uint64_t object_count = 0;
|
||||
|
|
|
@ -40,7 +40,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
}
|
||||
}
|
||||
auto vo_it = pgs[pg_num].ver_override.find(oid);
|
||||
uint64_t target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it.second : UINT64_MAX;
|
||||
uint64_t target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX;
|
||||
if (pgs[pg_num].pg_cursize == 3)
|
||||
{
|
||||
// Fast happy-path
|
||||
|
@ -50,18 +50,22 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
else
|
||||
{
|
||||
// PG is degraded
|
||||
auto it = pgs[pg_num].obj_states.find(oid);
|
||||
std::vector<uint64_t> & target_set = (it != pgs[pg_num].obj_states.end()
|
||||
? it->second->read_target
|
||||
: pgs[pg_num].target_set);
|
||||
uint64_t real_reads[pgs[pg_num].pg_size*2] = { 0 };
|
||||
memcpy(real_reads, reads, sizeof(uint64_t)*pgs[pg_num].pg_minsize*2);
|
||||
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
|
||||
{
|
||||
if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX)
|
||||
if (reads[role*2+1] != 0 && target_set[role] == 0)
|
||||
{
|
||||
// Stripe is missing. Extend read to other stripes.
|
||||
// We need at least pg_minsize stripes to recover the lost part.
|
||||
int exist = 0;
|
||||
for (int j = 0; j < pgs[pg_num].pg_size; j++)
|
||||
{
|
||||
if (pgs[pg_num].target_set[j] != UINT64_MAX)
|
||||
if (target_set[j] != 0)
|
||||
{
|
||||
if (real_reads[j*2+1] == 0 || j >= pgs[pg_num].pg_minsize)
|
||||
{
|
||||
|
@ -80,6 +84,16 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
}
|
||||
}
|
||||
}
|
||||
if (exist < pgs[pg_num].pg_minsize)
|
||||
{
|
||||
// Object is unreadable
|
||||
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||
cur_op->reply.hdr.id = cur_op->op.hdr.id;
|
||||
cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode;
|
||||
cur_op->reply.hdr.retval = -EIO;
|
||||
outbox_push(clients[cur_op->peer_fd], cur_op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
uint64_t pos[pgs[pg_num].pg_size];
|
||||
|
@ -98,23 +112,43 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
// Submit reads
|
||||
osd_op_t read_ops[n_subops];
|
||||
int subop = 0;
|
||||
int errors = 0, done = 0;
|
||||
for (int role = 0; role < pgs[pg_num].pg_size; role++)
|
||||
{
|
||||
// FIXME Take remapped objects into account
|
||||
uint64_t role_osd_num = pgs[pg_num].target_set[role];
|
||||
if (role_osd_num != UINT64_MAX)
|
||||
uint64_t role_osd_num = target_set[role];
|
||||
if (role_osd_num != 0)
|
||||
{
|
||||
if (role_osd_num == this->osd_num)
|
||||
{
|
||||
read_ops[subop].bs_op = {
|
||||
.opcode = BS_OP_READ,
|
||||
.callback = [&](blockstore_op_t *op)
|
||||
{
|
||||
if (op->retval < op->len)
|
||||
errors++;
|
||||
else
|
||||
done++;
|
||||
// continue op
|
||||
},
|
||||
.oid = {
|
||||
.inode = oid.inode,
|
||||
.stripe = oid.stripe | role,
|
||||
},
|
||||
.version = target_ver,
|
||||
.offset = real_reads[role*2],
|
||||
.len = real_reads[role*2+1] - real_reads[role*2],
|
||||
.buf = buf + pos[role],
|
||||
};
|
||||
bs->enqueue_op(&read_ops[subop].bs_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
read_ops[subop].op_type = OSD_OP_OUT;
|
||||
read_ops[subop].peer_fd = osd_peer_fds.get(role_osd_num);
|
||||
read_ops[subop].peer_fd = osd_peer_fds.at(role_osd_num);
|
||||
read_ops[subop].op.sec_rw = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = next_op_id++,
|
||||
.id = next_subop_id++,
|
||||
.opcode = OSD_OP_SECONDARY_READ,
|
||||
},
|
||||
.oid = {
|
||||
|
@ -126,7 +160,14 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
.len = real_reads[role*2+1] - real_reads[role*2],
|
||||
};
|
||||
read_ops[subop].buf = buf + pos[role];
|
||||
read_ops[subop].callback = NULL;
|
||||
read_ops[subop].callback = [&](osd_op_t *osd_subop)
|
||||
{
|
||||
if (osd_subop->reply.hdr.retval < osd_subop->op.sec_rw.len)
|
||||
errors++;
|
||||
else
|
||||
done++;
|
||||
// continue op
|
||||
};
|
||||
}
|
||||
subop++;
|
||||
}
|
||||
|
@ -134,7 +175,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
// Reconstruct missing stripes
|
||||
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
|
||||
{
|
||||
if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX)
|
||||
if (reads[role*2+1] != 0 && target_set[role] == 0)
|
||||
{
|
||||
int other = role == 0 ? 1 : 0;
|
||||
int parity = pgs[pg_num].pg_size-1;
|
||||
|
|
Loading…
Reference in New Issue