forked from vitalif/vitastor
Transform primary_r/w into "coroutines"
parent
4c0178f180
commit
09588a349f
4
osd.cpp
4
osd.cpp
|
@ -332,11 +332,11 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
exec_primary_read(cur_op);
|
||||
continue_primary_read(cur_op);
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
|
||||
{
|
||||
exec_primary_write(cur_op);
|
||||
continue_primary_write(cur_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
8
osd.h
8
osd.h
|
@ -236,12 +236,12 @@ class osd_t
|
|||
|
||||
// primary ops
|
||||
bool prepare_primary_rw(osd_op_t *cur_op);
|
||||
void exec_primary_read(osd_op_t *cur_op);
|
||||
void exec_primary_write(osd_op_t *cur_op);
|
||||
void continue_primary_read(osd_op_t *cur_op);
|
||||
void continue_primary_write(osd_op_t *cur_op);
|
||||
void exec_primary_sync(osd_op_t *cur_op);
|
||||
void finish_primary_op(osd_op_t *cur_op, int retval);
|
||||
void handle_primary_read_subop(osd_op_t *cur_op, int ok);
|
||||
void submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version);
|
||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
public:
|
||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||
~osd_t();
|
||||
|
|
165
osd_primary.cpp
165
osd_primary.cpp
|
@ -1,6 +1,10 @@
|
|||
#include "osd.h"
|
||||
#include "osd_rmw.h"
|
||||
|
||||
#define SUBMIT_READ 0
|
||||
#define SUBMIT_RMW_READ 1
|
||||
#define SUBMIT_WRITE 2
|
||||
|
||||
// read: read directly or read paired stripe(s), reconstruct, return
|
||||
// write: read paired stripe(s), modify, write
|
||||
//
|
||||
|
@ -12,14 +16,15 @@
|
|||
|
||||
struct osd_primary_op_data_t
|
||||
{
|
||||
int st = 0;
|
||||
pg_num_t pg_num;
|
||||
object_id oid;
|
||||
uint64_t target_ver;
|
||||
uint64_t fact_ver = 0;
|
||||
int n_subops = 0, done = 0, errors = 0;
|
||||
int degraded = 0, pg_size, pg_minsize;
|
||||
osd_rmw_stripe_t *stripes;
|
||||
osd_op_t *subops = NULL;
|
||||
bool should_read_version = false;
|
||||
};
|
||||
|
||||
void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
|
||||
|
@ -71,13 +76,16 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
return true;
|
||||
}
|
||||
|
||||
void osd_t::exec_primary_read(osd_op_t *cur_op)
|
||||
void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||
{
|
||||
if (!prepare_primary_rw(cur_op))
|
||||
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||
{
|
||||
return;
|
||||
}
|
||||
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data;
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
if (op_data->st == 1) goto resume_1;
|
||||
else if (op_data->st == 2) goto resume_2;
|
||||
{
|
||||
auto & pg = pgs[op_data->pg_num];
|
||||
for (int role = 0; role < pg.pg_minsize; role++)
|
||||
{
|
||||
|
@ -91,8 +99,9 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
{
|
||||
// Fast happy-path
|
||||
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0);
|
||||
submit_read_subops(pg.pg_minsize, pg.cur_set.data(), cur_op);
|
||||
submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op);
|
||||
cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len);
|
||||
op_data->st = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -108,25 +117,17 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
|||
return;
|
||||
}
|
||||
// Submit reads
|
||||
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
|
||||
submit_read_subops(pg.pg_size, cur_set, cur_op);
|
||||
op_data->pg_minsize = pg.pg_minsize;
|
||||
op_data->pg_size = pg.pg_size;
|
||||
op_data->degraded = 1;
|
||||
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
|
||||
submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op);
|
||||
op_data->st = 1;
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data;
|
||||
if (!ok)
|
||||
op_data->errors++;
|
||||
else
|
||||
op_data->done++;
|
||||
if ((op_data->errors + op_data->done) >= op_data->n_subops)
|
||||
{
|
||||
delete[] op_data->subops;
|
||||
op_data->subops = NULL;
|
||||
resume_1:
|
||||
return;
|
||||
resume_2:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
free(op_data);
|
||||
|
@ -149,7 +150,8 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok)
|
|||
{
|
||||
// Send buffer in parts to avoid copying
|
||||
cur_op->send_list.push_back(
|
||||
stripes[role].read_buf + (stripes[role].read_start - stripes[role].req_start), stripes[role].req_end
|
||||
stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
|
||||
stripes[role].req_end - stripes[role].req_start
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -158,40 +160,42 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok)
|
|||
cur_op->op_data = NULL;
|
||||
finish_primary_op(cur_op, cur_op->req.rw.len);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||
void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data;
|
||||
bool w = submit_type == SUBMIT_WRITE;
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
osd_rmw_stripe_t *stripes = op_data->stripes;
|
||||
// Allocate subops
|
||||
int n_subops = 0, force_read = -1;
|
||||
for (int role = 0; role < read_pg_size; role++)
|
||||
int n_subops = 0, zero_read = -1;
|
||||
for (int role = 0; role < pg_size; role++)
|
||||
{
|
||||
if (osd_set[role] == this->osd_num || osd_set[role] != 0 && force_read == -1)
|
||||
if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1)
|
||||
{
|
||||
force_read = role;
|
||||
zero_read = role;
|
||||
}
|
||||
if (osd_set[role] != 0 && stripes[role].read_end != 0)
|
||||
if (osd_set[role] != 0 &&
|
||||
(w ? stripes[role].write_end : stripes[role].read_end) != 0)
|
||||
{
|
||||
n_subops++;
|
||||
}
|
||||
}
|
||||
if (!n_subops && op_data->should_read_version)
|
||||
if (!n_subops && submit_type == SUBMIT_RMW_READ)
|
||||
{
|
||||
n_subops = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
force_read = -1;
|
||||
zero_read = -1;
|
||||
}
|
||||
osd_op_t *subops = new osd_op_t[n_subops];
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->n_subops = n_subops;
|
||||
op_data->subops = subops;
|
||||
int subop = 0;
|
||||
for (int role = 0; role < read_pg_size; role++)
|
||||
for (int role = 0; role < pg_size; role++)
|
||||
{
|
||||
if (stripes[role].read_end == 0 && force_read != role)
|
||||
if ((submit_type == SUBMIT_WRITE ? stripes[role].write_end : stripes[role].read_end) == 0 && zero_read != role)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
@ -201,19 +205,19 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op
|
|||
if (role_osd_num == this->osd_num)
|
||||
{
|
||||
subops[subop].bs_op = new blockstore_op_t({
|
||||
.opcode = BS_OP_READ,
|
||||
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
|
||||
.callback = [cur_op, this](blockstore_op_t *subop)
|
||||
{
|
||||
handle_primary_read_subop(cur_op, subop->retval == subop->len);
|
||||
handle_primary_subop(cur_op, subop->retval == subop->len, subop->version);
|
||||
},
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | role,
|
||||
},
|
||||
.version = op_data->target_ver,
|
||||
.offset = stripes[role].read_start,
|
||||
.len = stripes[role].read_end - stripes[role].read_start,
|
||||
.buf = stripes[role].read_buf,
|
||||
.version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver),
|
||||
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
|
||||
});
|
||||
bs->enqueue_op(subops[subop].bs_op);
|
||||
}
|
||||
|
@ -225,22 +229,22 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op
|
|||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = this->next_subop_id++,
|
||||
.opcode = OSD_OP_SECONDARY_READ,
|
||||
.opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ),
|
||||
},
|
||||
.oid = {
|
||||
.inode = op_data->oid.inode,
|
||||
.stripe = op_data->oid.stripe | role,
|
||||
},
|
||||
.version = op_data->target_ver,
|
||||
.offset = stripes[role].read_start,
|
||||
.len = stripes[role].read_end - stripes[role].read_start,
|
||||
.version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver),
|
||||
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||
};
|
||||
subops[subop].buf = stripes[role].read_buf;
|
||||
subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
||||
subops[subop].callback = [cur_op, this](osd_op_t *subop)
|
||||
{
|
||||
// so it doesn't get freed
|
||||
subop->buf = NULL;
|
||||
handle_primary_read_subop(cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len);
|
||||
handle_primary_subop(cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version);
|
||||
};
|
||||
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
|
||||
}
|
||||
|
@ -249,15 +253,50 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::exec_primary_write(osd_op_t *cur_op)
|
||||
void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version)
|
||||
{
|
||||
if (!prepare_primary_rw(cur_op))
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
op_data->fact_ver = version;
|
||||
if (!ok)
|
||||
{
|
||||
op_data->errors++;
|
||||
}
|
||||
else
|
||||
{
|
||||
op_data->done++;
|
||||
}
|
||||
if ((op_data->errors + op_data->done) >= op_data->n_subops)
|
||||
{
|
||||
delete[] op_data->subops;
|
||||
op_data->subops = NULL;
|
||||
op_data->st++;
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
continue_primary_read(cur_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
continue_primary_write(cur_op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||
{
|
||||
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
|
||||
{
|
||||
return;
|
||||
}
|
||||
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data;
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
// FIXME: Handle operation cancel
|
||||
auto & pg = pgs[op_data->pg_num];
|
||||
if (op_data->st == 1) goto resume_1;
|
||||
else if (op_data->st == 2) goto resume_2;
|
||||
else if (op_data->st == 3) goto resume_3;
|
||||
else if (op_data->st == 4) goto resume_4;
|
||||
else if (op_data->st == 5) goto resume_5;
|
||||
// Check if actions are pending for this object
|
||||
{
|
||||
auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){
|
||||
.oid = op_data->oid,
|
||||
.osd_num = 0,
|
||||
|
@ -271,25 +310,41 @@ void osd_t::exec_primary_write(osd_op_t *cur_op)
|
|||
finish_primary_op(cur_op, -EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
resume_1:
|
||||
// Check if there are other write requests to the same object
|
||||
|
||||
{
|
||||
auto vo_it = pg.ver_override.find(op_data->oid);
|
||||
if (vo_it != pg.ver_override.end())
|
||||
{
|
||||
op_data->st = 1;
|
||||
//pg.write_queue.push_back(cur_op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Determine blocks to read
|
||||
cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);
|
||||
op_data->should_read_version = true;
|
||||
// Read required blocks
|
||||
submit_read_subops(pg.pg_size, pg.cur_set.data(), cur_op);
|
||||
// ->>>>> Continue from the callback
|
||||
submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, pg.cur_set.data(), cur_op);
|
||||
op_data->st = 2;
|
||||
resume_2:
|
||||
return;
|
||||
resume_3:
|
||||
// Save version override
|
||||
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
||||
// Calculate parity
|
||||
calc_rmw_parity(op_data->stripes, op_data->pg_size);
|
||||
// Save version override if degraded
|
||||
|
||||
// Send writes
|
||||
|
||||
// ->>>>> Continue from the callback
|
||||
submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op);
|
||||
op_data->st = 4;
|
||||
resume_4:
|
||||
return;
|
||||
resume_5:
|
||||
// Remember version as unstable
|
||||
|
||||
// Remove version override if degraded
|
||||
|
||||
finish_primary_op(cur_op, cur_op->req.rw.len);
|
||||
}
|
||||
|
||||
void osd_t::exec_primary_sync(osd_op_t *cur_op)
|
||||
|
|
Loading…
Reference in New Issue