forked from vitalif/vitastor
Implement OSD_OP_DELETE
parent
6355b968f4
commit
e8149e5848
7
osd.cpp
7
osd.cpp
|
@ -22,6 +22,7 @@ const char* osd_op_names[] = {
|
||||||
"primary_read",
|
"primary_read",
|
||||||
"primary_write",
|
"primary_write",
|
||||||
"primary_sync",
|
"primary_sync",
|
||||||
|
"primary_delete",
|
||||||
};
|
};
|
||||||
|
|
||||||
osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
|
osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
|
||||||
|
@ -446,7 +447,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
|
cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
|
||||||
(cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
(cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
||||||
(cur_op->req.sec_rw.len > OSD_RW_MAX || cur_op->req.sec_rw.len % bs_disk_alignment || cur_op->req.sec_rw.offset % bs_disk_alignment) ||
|
(cur_op->req.sec_rw.len > OSD_RW_MAX || cur_op->req.sec_rw.len % bs_disk_alignment || cur_op->req.sec_rw.offset % bs_disk_alignment) ||
|
||||||
(cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE) &&
|
(cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE || cur_op->req.hdr.opcode == OSD_OP_DELETE) &&
|
||||||
(cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % bs_disk_alignment || cur_op->req.rw.offset % bs_disk_alignment))
|
(cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % bs_disk_alignment || cur_op->req.rw.offset % bs_disk_alignment))
|
||||||
{
|
{
|
||||||
// Bad command
|
// Bad command
|
||||||
|
@ -484,6 +485,10 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
continue_primary_sync(cur_op);
|
continue_primary_sync(cur_op);
|
||||||
}
|
}
|
||||||
|
else if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
|
||||||
|
{
|
||||||
|
continue_primary_del(cur_op);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
exec_secondary(cur_op);
|
exec_secondary(cur_op);
|
||||||
|
|
8
osd.h
8
osd.h
|
@ -361,6 +361,7 @@ class osd_t
|
||||||
|
|
||||||
// op execution
|
// op execution
|
||||||
void exec_op(osd_op_t *cur_op);
|
void exec_op(osd_op_t *cur_op);
|
||||||
|
void finish_op(osd_op_t *cur_op, int retval);
|
||||||
|
|
||||||
// secondary ops
|
// secondary ops
|
||||||
void exec_sync_stab_all(osd_op_t *cur_op);
|
void exec_sync_stab_all(osd_op_t *cur_op);
|
||||||
|
@ -374,11 +375,14 @@ class osd_t
|
||||||
void continue_primary_read(osd_op_t *cur_op);
|
void continue_primary_read(osd_op_t *cur_op);
|
||||||
void continue_primary_write(osd_op_t *cur_op);
|
void continue_primary_write(osd_op_t *cur_op);
|
||||||
void continue_primary_sync(osd_op_t *cur_op);
|
void continue_primary_sync(osd_op_t *cur_op);
|
||||||
void finish_op(osd_op_t *cur_op, int retval);
|
void continue_primary_del(osd_op_t *cur_op);
|
||||||
|
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
|
||||||
|
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
||||||
|
bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
||||||
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
||||||
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
||||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state);
|
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
|
||||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||||
void submit_primary_stab_subops(osd_op_t *cur_op);
|
void submit_primary_stab_subops(osd_op_t *cur_op);
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,8 @@
|
||||||
#define OSD_OP_READ 10
|
#define OSD_OP_READ 10
|
||||||
#define OSD_OP_WRITE 11
|
#define OSD_OP_WRITE 11
|
||||||
#define OSD_OP_SYNC 12
|
#define OSD_OP_SYNC 12
|
||||||
#define OSD_OP_MAX 12
|
#define OSD_OP_DELETE 13
|
||||||
|
#define OSD_OP_MAX 13
|
||||||
// Alignment & limit for read/write operations
|
// Alignment & limit for read/write operations
|
||||||
#ifndef MEM_ALIGNMENT
|
#ifndef MEM_ALIGNMENT
|
||||||
#define MEM_ALIGNMENT 512
|
#define MEM_ALIGNMENT 512
|
||||||
|
@ -59,6 +60,7 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t
|
||||||
// object
|
// object
|
||||||
object_id oid;
|
object_id oid;
|
||||||
// read/write version (automatic or specific)
|
// read/write version (automatic or specific)
|
||||||
|
// FIXME deny values close to UINT64_MAX
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
// offset
|
// offset
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
|
|
|
@ -287,6 +287,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
// Calculate current write OSD set
|
// Calculate current write OSD set
|
||||||
pg.pg_cursize = 0;
|
pg.pg_cursize = 0;
|
||||||
pg.cur_set.resize(pg.target_set.size());
|
pg.cur_set.resize(pg.target_set.size());
|
||||||
|
pg.cur_loc_set.clear();
|
||||||
for (int role = 0; role < pg.target_set.size(); role++)
|
for (int role = 0; role < pg.target_set.size(); role++)
|
||||||
{
|
{
|
||||||
pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
|
pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
|
||||||
|
@ -294,6 +295,11 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
if (pg.cur_set[role] != 0)
|
if (pg.cur_set[role] != 0)
|
||||||
{
|
{
|
||||||
pg.pg_cursize++;
|
pg.pg_cursize++;
|
||||||
|
pg.cur_loc_set.push_back({
|
||||||
|
.role = (uint64_t)role,
|
||||||
|
.osd_num = pg.cur_set[role],
|
||||||
|
.outdated = false,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pg.target_history.size())
|
if (pg.target_history.size())
|
||||||
|
|
|
@ -114,6 +114,8 @@ struct pg_t
|
||||||
// cur_set is the current set of connected peer OSDs for this PG
|
// cur_set is the current set of connected peer OSDs for this PG
|
||||||
// cur_set = (role => osd_num or UINT64_MAX if missing). role numbers begin with zero
|
// cur_set = (role => osd_num or UINT64_MAX if missing). role numbers begin with zero
|
||||||
std::vector<osd_num_t> cur_set;
|
std::vector<osd_num_t> cur_set;
|
||||||
|
// same thing in state_dict-like format
|
||||||
|
pg_osd_set_t cur_loc_set;
|
||||||
// moved object map. by default, each object is considered to reside on the cur_set.
|
// moved object map. by default, each object is considered to reside on the cur_set.
|
||||||
// this map stores all objects that differ.
|
// this map stores all objects that differ.
|
||||||
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
|
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
|
||||||
|
|
374
osd_primary.cpp
374
osd_primary.cpp
|
@ -155,6 +155,33 @@ resume_2:
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
|
||||||
|
{
|
||||||
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
|
// Check if actions are pending for this object
|
||||||
|
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
|
||||||
|
.oid = op_data->oid,
|
||||||
|
.osd_num = 0,
|
||||||
|
});
|
||||||
|
if (act_it != pg.flush_actions.end() &&
|
||||||
|
act_it->first.oid.inode == op_data->oid.inode &&
|
||||||
|
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
|
||||||
|
{
|
||||||
|
pg.write_queue.emplace(op_data->oid, cur_op);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Check if there are other write requests to the same object
|
||||||
|
auto vo_it = pg.write_queue.find(op_data->oid);
|
||||||
|
if (vo_it != pg.write_queue.end())
|
||||||
|
{
|
||||||
|
op_data->st = 1;
|
||||||
|
pg.write_queue.emplace(op_data->oid, cur_op);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pg.write_queue.emplace(op_data->oid, cur_op);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void osd_t::continue_primary_write(osd_op_t *cur_op)
|
void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||||
|
@ -172,30 +199,9 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||||
else if (op_data->st == 7) goto resume_7;
|
else if (op_data->st == 7) goto resume_7;
|
||||||
else if (op_data->st == 8) goto resume_8;
|
else if (op_data->st == 8) goto resume_8;
|
||||||
assert(op_data->st == 0);
|
assert(op_data->st == 0);
|
||||||
// Check if actions are pending for this object
|
if (!check_write_queue(cur_op, pg))
|
||||||
{
|
{
|
||||||
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
|
return;
|
||||||
.oid = op_data->oid,
|
|
||||||
.osd_num = 0,
|
|
||||||
});
|
|
||||||
if (act_it != pg.flush_actions.end() &&
|
|
||||||
act_it->first.oid.inode == op_data->oid.inode &&
|
|
||||||
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
|
|
||||||
{
|
|
||||||
pg.write_queue.emplace(op_data->oid, cur_op);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Check if there are other write requests to the same object
|
|
||||||
{
|
|
||||||
auto vo_it = pg.write_queue.find(op_data->oid);
|
|
||||||
if (vo_it != pg.write_queue.end())
|
|
||||||
{
|
|
||||||
op_data->st = 1;
|
|
||||||
pg.write_queue.emplace(op_data->oid, cur_op);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
pg.write_queue.emplace(op_data->oid, cur_op);
|
|
||||||
}
|
}
|
||||||
resume_1:
|
resume_1:
|
||||||
// Determine blocks to read and write
|
// Determine blocks to read and write
|
||||||
|
@ -241,7 +247,7 @@ resume_5:
|
||||||
if (op_data->object_state->state & OBJ_MISPLACED)
|
if (op_data->object_state->state & OBJ_MISPLACED)
|
||||||
{
|
{
|
||||||
// Remove extra chunks
|
// Remove extra chunks
|
||||||
submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state);
|
submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state->osd_set);
|
||||||
if (op_data->n_subops > 0)
|
if (op_data->n_subops > 0)
|
||||||
{
|
{
|
||||||
op_data->st = 8;
|
op_data->st = 8;
|
||||||
|
@ -255,118 +261,19 @@ resume_8:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Clear object state
|
// Clear object state
|
||||||
if (op_data->object_state->state & OBJ_INCOMPLETE)
|
remove_object_from_state(op_data->oid, op_data->object_state, pg);
|
||||||
{
|
|
||||||
// Successful write means that object is not incomplete anymore
|
|
||||||
incomplete_objects--;
|
|
||||||
pg.incomplete_objects.erase(op_data->oid);
|
|
||||||
if (!pg.incomplete_objects.size())
|
|
||||||
{
|
|
||||||
pg.state = pg.state & ~PG_HAS_INCOMPLETE;
|
|
||||||
report_pg_state(pg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (op_data->object_state->state & OBJ_DEGRADED)
|
|
||||||
{
|
|
||||||
degraded_objects--;
|
|
||||||
pg.degraded_objects.erase(op_data->oid);
|
|
||||||
if (!pg.degraded_objects.size())
|
|
||||||
{
|
|
||||||
pg.state = pg.state & ~PG_HAS_DEGRADED;
|
|
||||||
report_pg_state(pg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (op_data->object_state->state & OBJ_MISPLACED)
|
|
||||||
{
|
|
||||||
misplaced_objects--;
|
|
||||||
pg.misplaced_objects.erase(op_data->oid);
|
|
||||||
if (!pg.misplaced_objects.size())
|
|
||||||
{
|
|
||||||
pg.state = pg.state & ~PG_HAS_MISPLACED;
|
|
||||||
report_pg_state(pg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state));
|
|
||||||
}
|
|
||||||
pg.clean_count++;
|
pg.clean_count++;
|
||||||
op_data->object_state->object_count--;
|
|
||||||
if (!op_data->object_state->object_count)
|
|
||||||
{
|
|
||||||
pg.state_dict.erase(op_data->object_state->osd_set);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
|
|
||||||
if (immediate_commit == IMMEDIATE_ALL)
|
|
||||||
{
|
|
||||||
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
|
||||||
op_data->unstable_writes = new obj_ver_id[pg.pg_cursize];
|
|
||||||
{
|
|
||||||
int last_start = 0;
|
|
||||||
osd_num_t *osd_set = pg.cur_set.data();
|
|
||||||
for (int role = 0; role < pg.pg_size; role++)
|
|
||||||
{
|
|
||||||
if (osd_set[role] != 0)
|
|
||||||
{
|
|
||||||
op_data->unstable_writes[last_start] = (obj_ver_id){
|
|
||||||
.oid = {
|
|
||||||
.inode = op_data->oid.inode,
|
|
||||||
.stripe = op_data->oid.stripe | role,
|
|
||||||
},
|
|
||||||
.version = op_data->fact_ver,
|
|
||||||
};
|
|
||||||
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
|
|
||||||
.osd_num = osd_set[role],
|
|
||||||
.start = last_start,
|
|
||||||
.len = 1,
|
|
||||||
});
|
|
||||||
last_start++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Stabilize version sets
|
|
||||||
submit_primary_stab_subops(cur_op);
|
|
||||||
resume_6:
|
|
||||||
op_data->st = 6;
|
|
||||||
return;
|
|
||||||
resume_7:
|
|
||||||
// FIXME: Free those in the destructor?
|
|
||||||
delete op_data->unstable_write_osds;
|
|
||||||
delete[] op_data->unstable_writes;
|
|
||||||
op_data->unstable_writes = NULL;
|
|
||||||
op_data->unstable_write_osds = NULL;
|
|
||||||
if (op_data->errors > 0)
|
|
||||||
{
|
|
||||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Remember version as unstable
|
|
||||||
osd_num_t *osd_set = pg.cur_set.data();
|
|
||||||
for (int role = 0; role < pg.pg_size; role++)
|
|
||||||
{
|
|
||||||
if (osd_set[role] != 0)
|
|
||||||
{
|
|
||||||
this->unstable_writes[(osd_object_id_t){
|
|
||||||
.osd_num = osd_set[role],
|
|
||||||
.oid = {
|
|
||||||
.inode = op_data->oid.inode,
|
|
||||||
.stripe = op_data->oid.stripe | role,
|
|
||||||
},
|
|
||||||
}] = op_data->fact_ver;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Remember PG as dirty to drop the connection when PG goes offline
|
|
||||||
// (this is required because of the "lazy sync")
|
|
||||||
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
|
|
||||||
dirty_pgs.insert(op_data->pg_num);
|
|
||||||
}
|
}
|
||||||
// Remove version override
|
// Remove version override
|
||||||
|
pg.ver_override.erase(op_data->oid);
|
||||||
|
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
|
||||||
|
resume_6:
|
||||||
|
resume_7:
|
||||||
|
if (!finalize_primary_write(cur_op, pg, pg.cur_loc_set, 6))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
object_id oid = op_data->oid;
|
object_id oid = op_data->oid;
|
||||||
pg.ver_override.erase(oid);
|
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
// Continue other write operations to the same object
|
// Continue other write operations to the same object
|
||||||
auto next_it = pg.write_queue.find(oid);
|
auto next_it = pg.write_queue.find(oid);
|
||||||
|
@ -381,6 +288,77 @@ resume_7:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool osd_t::finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
|
||||||
|
{
|
||||||
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
|
if (op_data->st == base_state)
|
||||||
|
{
|
||||||
|
goto resume_6;
|
||||||
|
}
|
||||||
|
else if (op_data->st == base_state+1)
|
||||||
|
{
|
||||||
|
goto resume_7;
|
||||||
|
}
|
||||||
|
if (immediate_commit == IMMEDIATE_ALL)
|
||||||
|
{
|
||||||
|
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
|
||||||
|
op_data->unstable_writes = new obj_ver_id[loc_set.size()];
|
||||||
|
{
|
||||||
|
int last_start = 0;
|
||||||
|
for (auto & chunk: loc_set)
|
||||||
|
{
|
||||||
|
op_data->unstable_writes[last_start] = (obj_ver_id){
|
||||||
|
.oid = {
|
||||||
|
.inode = op_data->oid.inode,
|
||||||
|
.stripe = op_data->oid.stripe | chunk.role,
|
||||||
|
},
|
||||||
|
.version = op_data->fact_ver,
|
||||||
|
};
|
||||||
|
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
|
||||||
|
.osd_num = chunk.osd_num,
|
||||||
|
.start = last_start,
|
||||||
|
.len = 1,
|
||||||
|
});
|
||||||
|
last_start++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
submit_primary_stab_subops(cur_op);
|
||||||
|
resume_6:
|
||||||
|
op_data->st = 6;
|
||||||
|
return false;
|
||||||
|
resume_7:
|
||||||
|
// FIXME: Free those in the destructor?
|
||||||
|
delete op_data->unstable_write_osds;
|
||||||
|
delete[] op_data->unstable_writes;
|
||||||
|
op_data->unstable_writes = NULL;
|
||||||
|
op_data->unstable_write_osds = NULL;
|
||||||
|
if (op_data->errors > 0)
|
||||||
|
{
|
||||||
|
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Remember version as unstable
|
||||||
|
for (auto & chunk: loc_set)
|
||||||
|
{
|
||||||
|
this->unstable_writes[(osd_object_id_t){
|
||||||
|
.osd_num = chunk.osd_num,
|
||||||
|
.oid = {
|
||||||
|
.inode = op_data->oid.inode,
|
||||||
|
.stripe = op_data->oid.stripe | chunk.role,
|
||||||
|
},
|
||||||
|
}] = op_data->fact_ver;
|
||||||
|
}
|
||||||
|
// Remember PG as dirty to drop the connection when PG goes offline
|
||||||
|
// (this is required because of the "lazy sync")
|
||||||
|
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
|
||||||
|
dirty_pgs.insert(op_data->pg_num);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// Save and clear unstable_writes -> SYNC all -> STABLE all
|
// Save and clear unstable_writes -> SYNC all -> STABLE all
|
||||||
void osd_t::continue_primary_sync(osd_op_t *cur_op)
|
void osd_t::continue_primary_sync(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
|
@ -543,3 +521,135 @@ finish:
|
||||||
goto resume_2;
|
goto resume_2;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
|
||||||
|
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
|
||||||
|
{
|
||||||
|
if (object_state->state & OBJ_INCOMPLETE)
|
||||||
|
{
|
||||||
|
// Successful write means that object is not incomplete anymore
|
||||||
|
this->incomplete_objects--;
|
||||||
|
pg.incomplete_objects.erase(oid);
|
||||||
|
if (!pg.incomplete_objects.size())
|
||||||
|
{
|
||||||
|
pg.state = pg.state & ~PG_HAS_INCOMPLETE;
|
||||||
|
report_pg_state(pg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (object_state->state & OBJ_DEGRADED)
|
||||||
|
{
|
||||||
|
this->degraded_objects--;
|
||||||
|
pg.degraded_objects.erase(oid);
|
||||||
|
if (!pg.degraded_objects.size())
|
||||||
|
{
|
||||||
|
pg.state = pg.state & ~PG_HAS_DEGRADED;
|
||||||
|
report_pg_state(pg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (object_state->state & OBJ_MISPLACED)
|
||||||
|
{
|
||||||
|
this->misplaced_objects--;
|
||||||
|
pg.misplaced_objects.erase(oid);
|
||||||
|
if (!pg.misplaced_objects.size())
|
||||||
|
{
|
||||||
|
pg.state = pg.state & ~PG_HAS_MISPLACED;
|
||||||
|
report_pg_state(pg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state));
|
||||||
|
}
|
||||||
|
object_state->object_count--;
|
||||||
|
if (!object_state->object_count)
|
||||||
|
{
|
||||||
|
pg.state_dict.erase(object_state->osd_set);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void osd_t::continue_primary_del(osd_op_t *cur_op)
|
||||||
|
{
|
||||||
|
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
|
auto & pg = pgs[op_data->pg_num];
|
||||||
|
if (op_data->st == 1) goto resume_1;
|
||||||
|
else if (op_data->st == 2) goto resume_2;
|
||||||
|
else if (op_data->st == 3) goto resume_3;
|
||||||
|
else if (op_data->st == 4) goto resume_4;
|
||||||
|
else if (op_data->st == 5) goto resume_5;
|
||||||
|
else if (op_data->st == 6) goto resume_6;
|
||||||
|
else if (op_data->st == 7) goto resume_7;
|
||||||
|
assert(op_data->st == 0);
|
||||||
|
// Delete is forbidden even in active PGs if they're also degraded or have previous dead OSDs
|
||||||
|
if (pg.state & (PG_DEGRADED | PG_LEFT_ON_DEAD))
|
||||||
|
{
|
||||||
|
finish_op(cur_op, -EBUSY);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!check_write_queue(cur_op, pg))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resume_1:
|
||||||
|
// Determine which OSDs contain this object and delete it
|
||||||
|
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
|
||||||
|
// Submit 1 read to determine the actual version number
|
||||||
|
submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, op_data->prev_set, cur_op);
|
||||||
|
resume_2:
|
||||||
|
op_data->st = 2;
|
||||||
|
return;
|
||||||
|
resume_3:
|
||||||
|
if (op_data->errors > 0)
|
||||||
|
{
|
||||||
|
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Save version override for parallel reads
|
||||||
|
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
||||||
|
// Submit deletes
|
||||||
|
op_data->fact_ver++;
|
||||||
|
submit_primary_del_subops(cur_op, NULL, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set);
|
||||||
|
resume_4:
|
||||||
|
op_data->st = 4;
|
||||||
|
return;
|
||||||
|
resume_5:
|
||||||
|
if (op_data->errors > 0)
|
||||||
|
{
|
||||||
|
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Remove version override
|
||||||
|
pg.ver_override.erase(op_data->oid);
|
||||||
|
resume_6:
|
||||||
|
resume_7:
|
||||||
|
if (!finalize_primary_write(cur_op, pg, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set, 6))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Adjust PG stats after "instant stabilize", because we need object_state above
|
||||||
|
if (!op_data->object_state)
|
||||||
|
{
|
||||||
|
pg.clean_count--;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
remove_object_from_state(op_data->oid, op_data->object_state, pg);
|
||||||
|
}
|
||||||
|
pg.total_count--;
|
||||||
|
object_id oid = op_data->oid;
|
||||||
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
|
// Continue other write operations to the same object
|
||||||
|
auto next_it = pg.write_queue.find(oid);
|
||||||
|
auto this_it = next_it;
|
||||||
|
next_it++;
|
||||||
|
pg.write_queue.erase(this_it);
|
||||||
|
if (next_it != pg.write_queue.end() &&
|
||||||
|
next_it->first == oid)
|
||||||
|
{
|
||||||
|
osd_op_t *next_op = next_it->second;
|
||||||
|
continue_primary_write(next_op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
#include "osd_primary.h"
|
#include "osd_primary.h"
|
||||||
|
|
||||||
#define SUBMIT_READ 0
|
|
||||||
#define SUBMIT_RMW_READ 1
|
|
||||||
#define SUBMIT_WRITE 2
|
|
||||||
|
|
||||||
void osd_t::autosync()
|
void osd_t::autosync()
|
||||||
{
|
{
|
||||||
if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
|
if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
|
||||||
|
@ -227,6 +223,10 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval,
|
||||||
{
|
{
|
||||||
continue_primary_sync(cur_op);
|
continue_primary_sync(cur_op);
|
||||||
}
|
}
|
||||||
|
else if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
|
||||||
|
{
|
||||||
|
continue_primary_del(cur_op);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
throw std::runtime_error("BUG: unknown opcode");
|
throw std::runtime_error("BUG: unknown opcode");
|
||||||
|
@ -234,13 +234,13 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state)
|
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set)
|
||||||
{
|
{
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
int extra_chunks = 0;
|
int extra_chunks = 0;
|
||||||
for (auto chunk: object_state->osd_set)
|
for (auto & chunk: loc_set)
|
||||||
{
|
{
|
||||||
if (chunk.osd_num != cur_set[chunk.role])
|
if (!cur_set || chunk.osd_num != cur_set[chunk.role])
|
||||||
{
|
{
|
||||||
extra_chunks++;
|
extra_chunks++;
|
||||||
}
|
}
|
||||||
|
@ -254,9 +254,9 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
|
||||||
osd_op_t *subops = new osd_op_t[extra_chunks];
|
osd_op_t *subops = new osd_op_t[extra_chunks];
|
||||||
op_data->subops = subops;
|
op_data->subops = subops;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (auto chunk: object_state->osd_set)
|
for (auto & chunk: loc_set)
|
||||||
{
|
{
|
||||||
if (chunk.osd_num != cur_set[chunk.role])
|
if (!cur_set || chunk.osd_num != cur_set[chunk.role])
|
||||||
{
|
{
|
||||||
if (chunk.osd_num == this->osd_num)
|
if (chunk.osd_num == this->osd_num)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue