vitastor/osd_primary.cpp

#include "osd.h"
#include "osd_rmw.h"
// FIXME: Split into more files
#define SUBMIT_READ 0
#define SUBMIT_RMW_READ 1
#define SUBMIT_WRITE 2
// read: read directly or read paired stripe(s), reconstruct, return
// write: read paired stripe(s), reconstruct, modify, calculate parity, write
//
// nuance: take care to read the same version from paired stripes!
// to do so, we remember the "last readable" version until a write request completes
// and postpone other write requests to the same stripe until the previous ones complete
//
// sync: sync peers, get unstable versions, stabilize them
struct unstable_osd_num_t
{
osd_num_t osd_num;
int start, len;
};
struct osd_primary_op_data_t
{
    int st = 0;
    pg_num_t pg_num;
    object_id oid;
    uint64_t target_ver;
    uint64_t fact_ver = 0;
    int n_subops = 0, done = 0, errors = 0, epipe = 0;
    int degraded = 0, pg_size, pg_minsize;
    osd_rmw_stripe_t *stripes;
    osd_op_t *subops = NULL;
    uint64_t *prev_set = NULL;
    pg_osd_set_state_t *object_state = NULL;
    // for sync: allocated on demand, must be freed when the operation completes
    std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
    pg_num_t *dirty_pgs = NULL;
    int dirty_pg_count = 0;
    obj_ver_id *unstable_writes = NULL;
};
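// Complete a primary operation: decrease the global and per-PG inflight counters,
// then either invoke the callback (for internally generated ops) or fill the reply
// header and send it back to the client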
void osd_t::finish_op(osd_op_t *cur_op, int retval)
{
inflight_ops--;
if (cur_op->op_data && cur_op->op_data->pg_num > 0)
{
auto & pg = pgs[cur_op->op_data->pg_num];
int n = --pg.inflight;
assert(n >= 0);
if (n == 0 && (pg.state & PG_STOPPING))
{
finish_stop_pg(pg);
}
}
if (!cur_op->peer_fd)
{
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(cur_op->callback)(cur_op);
}
else
{
// FIXME add separate magic number
auto cl_it = clients.find(cur_op->peer_fd);
if (cl_it != clients.end())
{
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
cur_op->reply.hdr.id = cur_op->req.hdr.id;
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
cur_op->reply.hdr.retval = retval;
outbox_push(cl_it->second, cur_op);
}
else
{
delete cur_op;
}
}
}
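// Parse and validate a primary read/write request: locate the PG, check that it is
// active on this OSD, verify offset/length alignment, and allocate per-op data with
// one osd_rmw_stripe_t per PG chunk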
bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
{
// PG number is calculated from the offset
// Our EC scheme stores data in fixed chunks equal to (K*block size)
// But we must not use K in the process of calculating the PG number
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1;
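    // Example (hypothetical values): with pg_stripe_size = 4 MB and pg_count = 100,
    // a request to inode 1 at offset 12 MB falls into stripe 3, so pg_num = (1 + 3) % 100 + 1 = 5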
auto pg_it = pgs.find(pg_num);
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
{
// This OSD is not primary for this PG or the PG is inactive
finish_op(cur_op, -EPIPE);
return false;
}
    uint64_t pg_block_size = bs_block_size * pg_it->second.pg_minsize;
    object_id oid = {
        .inode = cur_op->req.rw.inode,
        // oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG
        .stripe = (cur_op->req.rw.offset / pg_stripe_size) * pg_stripe_size +
            ((cur_op->req.rw.offset % pg_stripe_size) / pg_block_size) * pg_block_size
    };
    if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_block_size) ||
        (cur_op->req.rw.offset % bs_disk_alignment) != 0 ||
        (cur_op->req.rw.len % bs_disk_alignment) != 0)
    {
        finish_op(cur_op, -EINVAL);
        return false;
    }
    osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc(
        sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pg_it->second.pg_size, 1
    );
    op_data->pg_num = pg_num;
    op_data->oid = oid;
    op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
    cur_op->op_data = op_data;
    split_stripes(pg_it->second.pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
pg_it->second.inflight++;
return true;
}
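// Return the OSD set to read the object from: the per-object target set for
// incomplete, degraded or misplaced objects, or the default PG OSD set otherwise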
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
{
if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
{
*object_state = NULL;
return def;
}
auto st_it = pg.incomplete_objects.find(oid);
if (st_it != pg.incomplete_objects.end())
{
*object_state = st_it->second;
return st_it->second->read_target.data();
}
st_it = pg.degraded_objects.find(oid);
if (st_it != pg.degraded_objects.end())
{
*object_state = st_it->second;
return st_it->second->read_target.data();
}
st_it = pg.misplaced_objects.find(oid);
if (st_it != pg.misplaced_objects.end())
{
*object_state = st_it->second;
return st_it->second->read_target.data();
}
*object_state = NULL;
return def;
}
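// Primary read state machine: st=0 submits reads (from paired stripes if the object
// is degraded), st=1 waits for them, st=2 reconstructs missing stripes and replies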
void osd_t::continue_primary_read(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
{
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
{
auto & pg = pgs[op_data->pg_num];
for (int role = 0; role < pg.pg_minsize; role++)
{
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
op_data->stripes[role].read_end = op_data->stripes[role].req_end;
}
// Determine version
auto vo_it = pg.ver_override.find(op_data->oid);
op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
if (pg.state == PG_ACTIVE)
{
// Fast happy-path
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0);
submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op);
cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len);
op_data->st = 1;
}
else
{
// PG may be degraded or have misplaced objects
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
{
finish_op(cur_op, -EIO);
return;
}
// Submit reads
op_data->pg_minsize = pg.pg_minsize;
op_data->pg_size = pg.pg_size;
op_data->degraded = 1;
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op);
op_data->st = 1;
}
}
resume_1:
return;
resume_2:
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
if (op_data->degraded)
{
// Reconstruct missing stripes
        // FIXME: Only EC(k+1) is supported for now. Add other coding schemes
osd_rmw_stripe_t *stripes = op_data->stripes;
for (int role = 0; role < op_data->pg_minsize; role++)
{
if (stripes[role].read_end != 0 && stripes[role].missing)
{
reconstruct_stripe(stripes, op_data->pg_size, role);
}
if (stripes[role].req_end != 0)
{
// Send buffer in parts to avoid copying
cur_op->send_list.push_back(
stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
stripes[role].req_end - stripes[role].req_start
);
}
}
}
finish_op(cur_op, cur_op->req.rw.len);
}
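// Submit read or write subops for every chunk of the object, either to the local
// blockstore or to the peer OSDs listed in osd_set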
void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
{
bool w = submit_type == SUBMIT_WRITE;
osd_primary_op_data_t *op_data = cur_op->op_data;
osd_rmw_stripe_t *stripes = op_data->stripes;
// Allocate subops
int n_subops = 0, zero_read = -1;
for (int role = 0; role < pg_size; role++)
{
        if (osd_set[role] == this->osd_num || (osd_set[role] != 0 && zero_read == -1))
{
zero_read = role;
}
if (osd_set[role] != 0 && (w || stripes[role].read_end != 0))
{
n_subops++;
}
}
if (!n_subops && submit_type == SUBMIT_RMW_READ)
{
n_subops = 1;
}
else
{
zero_read = -1;
}
uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver);
osd_op_t *subops = new osd_op_t[n_subops];
op_data->fact_ver = 0;
op_data->done = op_data->errors = 0;
op_data->n_subops = n_subops;
op_data->subops = subops;
int subop = 0;
for (int role = 0; role < pg_size; role++)
{
// We always submit zero-length writes to all replicas, even if the stripe is not modified
if (!(w || stripes[role].read_end != 0 || zero_read == role))
{
continue;
}
osd_num_t role_osd_num = osd_set[role];
if (role_osd_num != 0)
{
if (role_osd_num == this->osd_num)
{
subops[subop].bs_op = new blockstore_op_t({
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
.callback = [cur_op, this](blockstore_op_t *subop)
{
if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len)
{
// die
throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")");
}
handle_primary_subop(
subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ,
cur_op, subop->retval, subop->len, subop->version
);
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
.version = op_version,
.offset = w ? stripes[role].write_start : stripes[role].read_start,
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
});
bs->enqueue_op(subops[subop].bs_op);
}
else
{
subops[subop].op_type = OSD_OP_OUT;
subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE);
subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num);
subops[subop].req.sec_rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ),
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
.version = op_version,
.offset = w ? stripes[role].write_start : stripes[role].read_start,
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
};
subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
if (w && stripes[role].write_end > 0)
{
subops[subop].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
}
subops[subop].callback = [cur_op, this](osd_op_t *subop)
{
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
                    // Clear the buffer pointer so the subop destructor doesn't free memory owned by cur_op
subop->buf = NULL;
handle_primary_subop(
subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval,
subop->req.sec_rw.len, subop->reply.sec_rw.version
);
if (fail_fd >= 0)
{
// write operation failed, drop the connection
stop_client(fail_fd);
}
};
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
}
subop++;
}
}
}
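// Common subop completion handler: count successes and errors, remember the object
// version reported by read/write subops, and resume the parent operation's state
// machine once every subop has finished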
void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
if (retval != expected)
{
printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected);
if (retval == -EPIPE)
{
op_data->epipe++;
}
op_data->errors++;
}
else
{
op_data->done++;
if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE)
{
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
{
throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver));
}
op_data->fact_ver = version;
}
}
if ((op_data->errors + op_data->done) >= op_data->n_subops)
{
delete[] op_data->subops;
op_data->subops = NULL;
op_data->st++;
if (cur_op->req.hdr.opcode == OSD_OP_READ)
{
continue_primary_read(cur_op);
}
else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
{
continue_primary_write(cur_op);
}
else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
{
continue_primary_sync(cur_op);
}
else
{
throw std::runtime_error("BUG: unknown opcode");
}
}
}
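// Fail all writes queued for the given object with the given error code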
void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval)
{
auto st_it = pg.write_queue.find(oid), it = st_it;
while (it != pg.write_queue.end() && it->first == oid)
{
finish_op(it->second, retval);
it++;
}
if (st_it != it)
{
pg.write_queue.erase(st_it, it);
}
}
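// Primary write state machine:
//  st=1 - queued behind a previous write to the same object
//  st=2 - waiting for RMW reads, st=4 - waiting for writes
//  st=8 - waiting for deletion of extra chunks of a misplaced object
//  st=6 - waiting for stabilization when immediate_commit is enabled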
void osd_t::continue_primary_write(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
{
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs[op_data->pg_num];
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
assert(op_data->st == 0);
// Check if actions are pending for this object
{
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
.oid = op_data->oid,
.osd_num = 0,
});
if (act_it != pg.flush_actions.end() &&
act_it->first.oid.inode == op_data->oid.inode &&
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
{
pg.write_queue.emplace(op_data->oid, cur_op);
return;
}
}
// Check if there are other write requests to the same object
{
auto vo_it = pg.write_queue.find(op_data->oid);
if (vo_it != pg.write_queue.end())
{
op_data->st = 1;
pg.write_queue.emplace(op_data->oid, cur_op);
return;
}
pg.write_queue.emplace(op_data->oid, cur_op);
}
resume_1:
// Determine blocks to read and write
// Missing chunks are allowed to be overwritten even in incomplete objects
    // FIXME: Allow small writes to the old (degraded/misplaced) OSD set to reduce the performance impact
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, pg.cur_set.data(), cur_op);
resume_2:
op_data->st = 2;
return;
resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
// Save version override for parallel reads
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Recover missing stripes, calculate parity
calc_rmw_parity(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
// Send writes
submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op);
resume_4:
op_data->st = 4;
return;
resume_5:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
if (op_data->object_state)
{
if (op_data->object_state->state & OBJ_MISPLACED)
{
// Remove extra chunks
submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state);
if (op_data->n_subops > 0)
{
op_data->st = 8;
return;
resume_8:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
}
}
// Clear object state
if (op_data->object_state->state & OBJ_INCOMPLETE)
{
// Successful write means that object is not incomplete anymore
incomplete_objects--;
pg.incomplete_objects.erase(op_data->oid);
if (!pg.incomplete_objects.size())
{
pg.state = pg.state & ~PG_HAS_INCOMPLETE;
pg.print_state();
}
}
else if (op_data->object_state->state & OBJ_DEGRADED)
{
degraded_objects--;
pg.degraded_objects.erase(op_data->oid);
if (!pg.degraded_objects.size())
{
pg.state = pg.state & ~PG_HAS_DEGRADED;
pg.print_state();
}
}
else if (op_data->object_state->state & OBJ_MISPLACED)
{
misplaced_objects--;
pg.misplaced_objects.erase(op_data->oid);
if (!pg.misplaced_objects.size())
{
pg.state = pg.state & ~PG_HAS_MISPLACED;
pg.print_state();
}
}
else
{
throw std::runtime_error("Invalid object state during recovery: "+std::to_string(op_data->object_state->state));
}
// FIXME: Track object count during normal writes, too
pg.clean_count++;
op_data->object_state->object_count--;
if (!op_data->object_state->object_count)
{
pg.state_dict.erase(op_data->object_state->osd_set);
}
}
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
if (immediate_commit == IMMEDIATE_ALL)
{
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[pg.pg_cursize];
{
int last_start = 0;
osd_num_t *osd_set = pg.cur_set.data();
for (int role = 0; role < pg.pg_size; role++)
{
if (osd_set[role] != 0)
{
op_data->unstable_writes[last_start] = (obj_ver_id){
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
.version = op_data->fact_ver,
};
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = osd_set[role],
.start = last_start,
.len = 1,
});
last_start++;
}
}
}
// Stabilize version sets
submit_primary_stab_subops(cur_op);
resume_6:
op_data->st = 6;
return;
resume_7:
// FIXME: Free those in the destructor?
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
}
else
{
// Remember version as unstable
osd_num_t *osd_set = pg.cur_set.data();
for (int role = 0; role < pg.pg_size; role++)
{
if (osd_set[role] != 0)
{
this->unstable_writes[(osd_object_id_t){
.osd_num = osd_set[role],
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
}] = op_data->fact_ver;
}
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
dirty_pgs.insert(op_data->pg_num);
}
// Remove version override
object_id oid = op_data->oid;
pg.ver_override.erase(oid);
finish_op(cur_op, cur_op->req.rw.len);
// Continue other write operations to the same object
auto next_it = pg.write_queue.find(oid);
auto this_it = next_it;
next_it++;
pg.write_queue.erase(this_it);
if (next_it != pg.write_queue.end() &&
next_it->first == oid)
{
osd_op_t *next_op = next_it->second;
continue_primary_write(next_op);
}
}
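// Submit an internal OSD_OP_SYNC to flush accumulated unstable writes,
// unless one is already in progress or all writes are committed immediately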
void osd_t::autosync()
{
if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
{
autosync_op = new osd_op_t();
autosync_op->op_type = OSD_OP_IN;
autosync_op->req = {
.sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = 1,
.opcode = OSD_OP_SYNC,
},
},
};
autosync_op->callback = [this](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
}
delete autosync_op;
autosync_op = NULL;
};
exec_op(autosync_op);
}
}
// Save and clear unstable_writes -> SYNC all -> STABLE all
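// Sync state machine: st=1 - queued behind a previous sync,
// st=3 - waiting for SYNC subops, st=5 - waiting for STABILIZE subops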
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
if (!cur_op->op_data)
{
cur_op->op_data = (osd_primary_op_data_t*)calloc(sizeof(osd_primary_op_data_t), 1);
}
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0);
if (syncs_in_progress.size() > 0)
{
// Wait for previous syncs, if any
// FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
syncs_in_progress.push_back(cur_op);
op_data->st = 1;
resume_1:
return;
}
else
{
syncs_in_progress.push_back(cur_op);
}
resume_2:
if (unstable_writes.size() == 0)
{
// Nothing to sync
goto finish;
}
// Save and clear unstable_writes
    // In theory this could be done on a per-client basis, but that seems an unnecessary complication
// It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
{
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
op_data->dirty_pgs = new pg_num_t[dirty_pgs.size()];
op_data->dirty_pg_count = dirty_pgs.size();
osd_num_t last_osd = 0;
int last_start = 0, last_end = 0;
for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
{
if (last_osd != it->first.osd_num)
{
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
last_osd = it->first.osd_num;
last_start = last_end;
}
op_data->unstable_writes[last_end] = (obj_ver_id){
.oid = it->first.oid,
.version = it->second,
};
last_end++;
}
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
int dpg = 0;
for (auto dirty_pg_num: dirty_pgs)
{
pgs[dirty_pg_num].inflight++;
op_data->dirty_pgs[dpg++] = dirty_pg_num;
}
dirty_pgs.clear();
this->unstable_writes.clear();
}
if (immediate_commit != IMMEDIATE_ALL)
{
// SYNC
submit_primary_sync_subops(cur_op);
resume_3:
op_data->st = 3;
return;
resume_4:
if (op_data->errors > 0)
{
goto resume_6;
}
}
// Stabilize version sets
submit_primary_stab_subops(cur_op);
resume_5:
op_data->st = 5;
return;
resume_6:
for (int i = 0; i < op_data->dirty_pg_count; i++)
{
pgs[op_data->dirty_pgs[i]].inflight--;
}
if (op_data->errors > 0)
{
// Return objects back into the unstable write set
for (auto unstable_osd: *(op_data->unstable_write_osds))
{
for (int i = 0; i < unstable_osd.len; i++)
{
                // Except objects from PGs that are no longer active
                auto & w = op_data->unstable_writes[unstable_osd.start + i];
pg_num_t wpg = map_to_pg(w.oid);
if (pgs[wpg].state & PG_ACTIVE)
{
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
.osd_num = unstable_osd.osd_num,
.oid = w.oid,
}];
dest = dest < w.version ? w.version : dest;
dirty_pgs.insert(wpg);
}
}
}
}
// FIXME: Free those in the destructor?
    delete[] op_data->dirty_pgs;
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
}
else
{
finish:
if (cur_op->peer_fd)
{
auto it = clients.find(cur_op->peer_fd);
if (it != clients.end())
it->second.dirty_pgs.clear();
}
finish_op(cur_op, 0);
}
assert(syncs_in_progress.front() == cur_op);
syncs_in_progress.pop_front();
if (syncs_in_progress.size() > 0)
{
cur_op = syncs_in_progress.front();
        op_data = cur_op->op_data;
        op_data->st++;
goto resume_2;
}
}
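// Delete object chunks that are no longer part of the current OSD set
// (used after writing to a misplaced object)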
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_state_t *object_state)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
int extra_chunks = 0;
for (auto chunk: object_state->osd_set)
{
if (chunk.osd_num != cur_set[chunk.role])
{
extra_chunks++;
}
}
op_data->n_subops = extra_chunks;
op_data->done = op_data->errors = 0;
if (!extra_chunks)
{
return;
}
osd_op_t *subops = new osd_op_t[extra_chunks];
op_data->subops = subops;
int i = 0;
for (auto chunk: object_state->osd_set)
{
if (chunk.osd_num != cur_set[chunk.role])
{
if (chunk.osd_num == this->osd_num)
{
subops[i].bs_op = new blockstore_op_t({
.opcode = BS_OP_DELETE,
.callback = [cur_op, this](blockstore_op_t *subop)
{
if (subop->retval != 0)
{
// die
throw std::runtime_error("local delete operation failed");
}
handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0);
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
// Same version as write
.version = op_data->fact_ver,
});
bs->enqueue_op(subops[i].bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num);
subops[i].req.sec_del = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_DELETE,
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
// Same version as write
.version = op_data->fact_ver,
};
subops[i].callback = [cur_op, this](osd_op_t *subop)
{
                    int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0);
if (fail_fd >= 0)
{
// delete operation failed, drop the connection
stop_client(fail_fd);
}
};
outbox_push(clients[subops[i].peer_fd], &subops[i]);
}
i++;
}
}
}
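// Send a SYNC subop (or a local blockstore sync) to every OSD holding unstable writes saved by this operation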
void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
int n_osds = op_data->unstable_write_osds->size();
osd_op_t *subops = new osd_op_t[n_osds];
op_data->done = op_data->errors = 0;
op_data->n_subops = n_osds;
op_data->subops = subops;
for (int i = 0; i < n_osds; i++)
{
osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num;
if (sync_osd == this->osd_num)
{
subops[i].bs_op = new blockstore_op_t({
.opcode = BS_OP_SYNC,
.callback = [cur_op, this](blockstore_op_t *subop)
{
if (subop->retval != 0)
{
// die
throw std::runtime_error("local sync operation failed");
}
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0);
},
});
bs->enqueue_op(subops[i].bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
subops[i].peer_fd = osd_peer_fds.at(sync_osd);
subops[i].req.sec_sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_SYNC,
},
};
subops[i].callback = [cur_op, this](osd_op_t *subop)
{
                int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0);
if (fail_fd >= 0)
{
// sync operation failed, drop the connection
stop_client(fail_fd);
}
};
outbox_push(clients[subops[i].peer_fd], &subops[i]);
}
}
}
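// Send a STABILIZE subop with the saved version lists to every participating OSD
// (or stabilize them in the local blockstore)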
void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
int n_osds = op_data->unstable_write_osds->size();
osd_op_t *subops = new osd_op_t[n_osds];
op_data->done = op_data->errors = 0;
op_data->n_subops = n_osds;
op_data->subops = subops;
for (int i = 0; i < n_osds; i++)
{
auto & stab_osd = (*(op_data->unstable_write_osds))[i];
if (stab_osd.osd_num == this->osd_num)
{
subops[i].bs_op = new blockstore_op_t({
.opcode = BS_OP_STABLE,
.callback = [cur_op, this](blockstore_op_t *subop)
{
if (subop->retval != 0)
{
// die
throw std::runtime_error("local stabilize operation failed");
}
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0);
},
.len = (uint32_t)stab_osd.len,
.buf = (void*)(op_data->unstable_writes + stab_osd.start),
});
bs->enqueue_op(subops[i].bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num);
subops[i].req.sec_stab = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_STABILIZE,
},
.len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
};
subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
subops[i].callback = [cur_op, this](osd_op_t *subop)
{
                int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0);
if (fail_fd >= 0)
{
                    // stabilize operation failed, drop the connection
stop_client(fail_fd);
}
};
outbox_push(clients[subops[i].peer_fd], &subops[i]);
}
}
}