forked from vitalif/vitastor
Delete misplaced chunks after moving the object, reset object state in primary_write
parent
cf7de0f181
commit
d59be0e8b4
1
osd.h
1
osd.h
|
@ -301,6 +301,7 @@ class osd_t
|
|||
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
||||
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||
|
||||
|
|
|
@ -254,41 +254,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
|||
throw std::runtime_error("Failed to recover an object");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pg_t *pg = &pgs[op->pg_num];
|
||||
pg_osd_set_state_t *st;
|
||||
if (op->degraded)
|
||||
{
|
||||
auto st_it = pg->degraded_objects.find(op->oid);
|
||||
st = st_it->second;
|
||||
pg->degraded_objects.erase(st_it);
|
||||
degraded_objects--;
|
||||
if (!pg->degraded_objects.size())
|
||||
{
|
||||
pg->state = pg->state & ~PG_HAS_DEGRADED;
|
||||
pg->print_state();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto st_it = pg->misplaced_objects.find(op->oid);
|
||||
st = st_it->second;
|
||||
pg->misplaced_objects.erase(st_it);
|
||||
misplaced_objects--;
|
||||
if (!pg->misplaced_objects.size())
|
||||
{
|
||||
pg->state = pg->state & ~PG_HAS_MISPLACED;
|
||||
pg->print_state();
|
||||
}
|
||||
}
|
||||
pg->clean_count++;
|
||||
st->object_count--;
|
||||
if (!st->object_count)
|
||||
{
|
||||
pg->state_dict.erase(st->osd_set);
|
||||
}
|
||||
}
|
||||
recovery_ops.erase(op->oid);
|
||||
delete osd_op;
|
||||
op->osd_op = NULL;
|
||||
|
|
168
osd_primary.cpp
168
osd_primary.cpp
|
@ -1,18 +1,20 @@
|
|||
#include "osd.h"
|
||||
#include "osd_rmw.h"
|
||||
|
||||
// FIXME: Split into more files
|
||||
|
||||
#define SUBMIT_READ 0
|
||||
#define SUBMIT_RMW_READ 1
|
||||
#define SUBMIT_WRITE 2
|
||||
|
||||
// read: read directly or read paired stripe(s), reconstruct, return
|
||||
// write: read paired stripe(s), modify, write
|
||||
// write: read paired stripe(s), reconstruct, modify, calculate parity, write
|
||||
//
|
||||
// nuance: take care to read the same version from paired stripes!
|
||||
// to do so, we remember "last readable" version until a write request completes
|
||||
// and we postpone other write requests to the same stripe until completion of previous ones
|
||||
//
|
||||
// sync: sync peers, get unstable versions from somewhere, stabilize them
|
||||
// sync: sync peers, get unstable versions, stabilize them
|
||||
|
||||
struct unstable_osd_num_t
|
||||
{
|
||||
|
@ -32,7 +34,7 @@ struct osd_primary_op_data_t
|
|||
osd_rmw_stripe_t *stripes;
|
||||
osd_op_t *subops = NULL;
|
||||
uint64_t *prev_set = NULL;
|
||||
uint64_t object_state = 0;
|
||||
pg_osd_set_state_t *object_state = NULL;
|
||||
|
||||
// for sync. oops, requires freeing
|
||||
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
|
||||
|
@ -118,32 +120,32 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
return true;
|
||||
}
|
||||
|
||||
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, uint64_t &object_state)
|
||||
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
|
||||
{
|
||||
if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
|
||||
{
|
||||
object_state = 0;
|
||||
*object_state = NULL;
|
||||
return def;
|
||||
}
|
||||
auto st_it = pg.incomplete_objects.find(oid);
|
||||
if (st_it != pg.incomplete_objects.end())
|
||||
{
|
||||
object_state = st_it->second->state;
|
||||
*object_state = st_it->second;
|
||||
return st_it->second->read_target.data();
|
||||
}
|
||||
st_it = pg.degraded_objects.find(oid);
|
||||
if (st_it != pg.degraded_objects.end())
|
||||
{
|
||||
object_state = st_it->second->state;
|
||||
*object_state = st_it->second;
|
||||
return st_it->second->read_target.data();
|
||||
}
|
||||
st_it = pg.misplaced_objects.find(oid);
|
||||
if (st_it != pg.misplaced_objects.end())
|
||||
{
|
||||
object_state = st_it->second->state;
|
||||
*object_state = st_it->second;
|
||||
return st_it->second->read_target.data();
|
||||
}
|
||||
object_state = 0;
|
||||
*object_state = NULL;
|
||||
return def;
|
||||
}
|
||||
|
||||
|
@ -177,7 +179,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
|||
else
|
||||
{
|
||||
// PG may be degraded or have misplaced objects
|
||||
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), op_data->object_state);
|
||||
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
||||
{
|
||||
finish_op(cur_op, -EIO);
|
||||
|
@ -418,6 +420,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
else if (op_data->st == 5) goto resume_5;
|
||||
else if (op_data->st == 6) goto resume_6;
|
||||
else if (op_data->st == 7) goto resume_7;
|
||||
else if (op_data->st == 8) goto resume_8;
|
||||
assert(op_data->st == 0);
|
||||
// Check if actions are pending for this object
|
||||
{
|
||||
|
@ -447,7 +450,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
resume_1:
|
||||
// Determine blocks to read and write
|
||||
// Missing chunks are allowed to be overwritten even in incomplete objects
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), op_data->object_state);
|
||||
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
|
||||
pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
|
||||
// Read required blocks
|
||||
|
@ -476,6 +479,68 @@ resume_5:
|
|||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
if (op_data->object_state)
|
||||
{
|
||||
if (op_data->object_state->state & OBJ_MISPLACED)
|
||||
{
|
||||
// Remove extra chunks
|
||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state);
|
||||
if (op_data->n_subops > 0)
|
||||
{
|
||||
op_data->st = 8;
|
||||
return;
|
||||
resume_8:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Clear object state
|
||||
if (op_data->object_state->state & OBJ_INCOMPLETE)
|
||||
{
|
||||
// Successful write means that object is not incomplete anymore
|
||||
incomplete_objects--;
|
||||
pg.incomplete_objects.erase(op_data->oid);
|
||||
if (!pg.incomplete_objects.size())
|
||||
{
|
||||
pg.state = pg.state & ~PG_HAS_INCOMPLETE;
|
||||
pg.print_state();
|
||||
}
|
||||
}
|
||||
else if (op_data->object_state->state & OBJ_DEGRADED)
|
||||
{
|
||||
degraded_objects--;
|
||||
pg.degraded_objects.erase(op_data->oid);
|
||||
if (!pg.degraded_objects.size())
|
||||
{
|
||||
pg.state = pg.state & ~PG_HAS_DEGRADED;
|
||||
pg.print_state();
|
||||
}
|
||||
}
|
||||
else if (op_data->object_state->state & OBJ_MISPLACED)
|
||||
{
|
||||
misplaced_objects--;
|
||||
pg.misplaced_objects.erase(op_data->oid);
|
||||
if (!pg.misplaced_objects.size())
|
||||
{
|
||||
pg.state = pg.state & ~PG_HAS_MISPLACED;
|
||||
pg.print_state();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state));
|
||||
}
|
||||
// FIXME: Track object count during normal writes, too
|
||||
pg.clean_count++;
|
||||
op_data->object_state->object_count--;
|
||||
if (!op_data->object_state->object_count)
|
||||
{
|
||||
pg.state_dict.erase(op_data->object_state->osd_set);
|
||||
}
|
||||
}
|
||||
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
|
||||
if (immediate_commit == IMMEDIATE_ALL)
|
||||
{
|
||||
|
@ -746,6 +811,87 @@ finish:
|
|||
}
|
||||
}
|
||||
|
||||
// Submit delete sub-operations for the "extra" chunks of an object, i.e. for every
// chunk listed in object_state->osd_set whose OSD differs from cur_set[chunk.role].
//
// cur_op       - primary operation on whose behalf the chunks are deleted
// cur_set      - target OSD set, indexed by chunk role
// object_state - state entry describing where the object's chunks are stored now
//
// On return op_data->n_subops holds the number of submitted sub-operations and
// op_data->done / op_data->errors are reset to 0. When there is nothing to delete
// (n_subops == 0) nothing is allocated or submitted, so the caller must check
// n_subops before waiting for completion.
//
// Chunks stored on this OSD are deleted through the local blockstore, the others
// via OSD_OP_SECONDARY_DELETE requests sent to peers. Both kinds of completion are
// reported through handle_primary_subop().
//
// NOTE(review): the subops array is stored in op_data->subops; where it is freed
// is not visible from this function.
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    // First pass: count the chunks to delete to size the subop array
    int extra_chunks = 0;
    for (const auto & chunk: object_state->osd_set)
    {
        if (chunk.osd_num != cur_set[chunk.role])
        {
            extra_chunks++;
        }
    }
    op_data->n_subops = extra_chunks;
    op_data->done = op_data->errors = 0;
    if (!extra_chunks)
    {
        return;
    }
    osd_op_t *subops = new osd_op_t[extra_chunks];
    op_data->subops = subops;
    // Second pass: fill and submit one subop per extra chunk
    int i = 0;
    for (const auto & chunk: object_state->osd_set)
    {
        if (chunk.osd_num != cur_set[chunk.role])
        {
            if (chunk.osd_num == this->osd_num)
            {
                // The chunk is stored locally - delete it directly from the blockstore
                subops[i].bs_op = new blockstore_op_t({
                    .opcode = BS_OP_DELETE,
                    .callback = [cur_op, this](blockstore_op_t *subop)
                    {
                        if (subop->retval != 0)
                        {
                            // die
                            throw std::runtime_error("local delete operation failed");
                        }
                        handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0);
                    },
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | chunk.role,
                    },
                    // Same version as write
                    .version = op_data->fact_ver,
                });
                bs->enqueue_op(subops[i].bs_op);
            }
            else
            {
                // The chunk is stored on a peer - send it a secondary delete request
                subops[i].op_type = OSD_OP_OUT;
                subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
                subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num);
                subops[i].req.sec_del = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = this->next_subop_id++,
                        .opcode = OSD_OP_SECONDARY_DELETE,
                    },
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | chunk.role,
                    },
                    // Same version as write
                    .version = op_data->fact_ver,
                };
                subops[i].callback = [cur_op, this](osd_op_t *subop)
                {
                    // -1 means "no failure". 0 must not be used as the sentinel because
                    // it passes the ">= 0" check below and would make every successful
                    // delete call stop_client(0).
                    // The fd is read before handle_primary_subop() is called, as in the
                    // original code; presumably subop may not be usable afterwards - confirm.
                    int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
                    handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0);
                    if (fail_fd >= 0)
                    {
                        // delete operation failed, drop the connection
                        stop_client(fail_fd);
                    }
                };
                outbox_push(clients[subops[i].peer_fd], &subops[i]);
            }
            i++;
        }
    }
}
|
||||
|
||||
void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
|
Loading…
Reference in New Issue