Reply using a single finish_op() method, allow to call OSD ops from inside the OSD
parent
036f4c5bf3
commit
dbd8418798
9
osd.cpp
9
osd.cpp
|
@ -376,7 +376,14 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
||||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
||||||
cur_op->reply.hdr.retval = -EINVAL;
|
cur_op->reply.hdr.retval = -EINVAL;
|
||||||
outbox_push(this->clients[cur_op->peer_fd], cur_op);
|
if (cur_op->peer_fd)
|
||||||
|
{
|
||||||
|
outbox_push(this->clients[cur_op->peer_fd], cur_op);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cur_op->callback(cur_op);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
inflight_ops++;
|
inflight_ops++;
|
||||||
|
|
2
osd.h
2
osd.h
|
@ -262,7 +262,7 @@ class osd_t
|
||||||
void continue_primary_read(osd_op_t *cur_op);
|
void continue_primary_read(osd_op_t *cur_op);
|
||||||
void continue_primary_write(osd_op_t *cur_op);
|
void continue_primary_write(osd_op_t *cur_op);
|
||||||
void continue_primary_sync(osd_op_t *cur_op);
|
void continue_primary_sync(osd_op_t *cur_op);
|
||||||
void finish_primary_op(osd_op_t *cur_op, int retval);
|
void finish_op(osd_op_t *cur_op, int retval);
|
||||||
void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version);
|
void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version);
|
||||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||||
|
|
|
@ -36,7 +36,7 @@ struct osd_primary_op_data_t
|
||||||
obj_ver_id *unstable_writes = NULL;
|
obj_ver_id *unstable_writes = NULL;
|
||||||
};
|
};
|
||||||
|
|
||||||
void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
|
void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||||
{
|
{
|
||||||
// FIXME add separate magic number
|
// FIXME add separate magic number
|
||||||
auto cl_it = clients.find(cur_op->peer_fd);
|
auto cl_it = clients.find(cur_op->peer_fd);
|
||||||
|
@ -46,7 +46,15 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
|
||||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
||||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
||||||
cur_op->reply.hdr.retval = retval;
|
cur_op->reply.hdr.retval = retval;
|
||||||
outbox_push(cl_it->second, cur_op);
|
if (!cur_op->peer_fd)
|
||||||
|
{
|
||||||
|
cur_op->callback(cur_op);
|
||||||
|
delete cur_op;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
outbox_push(cl_it->second, cur_op);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -64,7 +72,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
||||||
// FIXME: Postpone operations in inactive PGs
|
// FIXME: Postpone operations in inactive PGs
|
||||||
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
|
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
|
||||||
{
|
{
|
||||||
finish_primary_op(cur_op, -EINVAL);
|
finish_op(cur_op, -EINVAL);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize;
|
uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize;
|
||||||
|
@ -78,7 +86,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
||||||
(cur_op->req.rw.offset % bs_disk_alignment) != 0 ||
|
(cur_op->req.rw.offset % bs_disk_alignment) != 0 ||
|
||||||
(cur_op->req.rw.len % bs_disk_alignment) != 0)
|
(cur_op->req.rw.len % bs_disk_alignment) != 0)
|
||||||
{
|
{
|
||||||
finish_primary_op(cur_op, -EINVAL);
|
finish_op(cur_op, -EINVAL);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc(
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc(
|
||||||
|
@ -129,7 +137,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||||
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
||||||
{
|
{
|
||||||
free(op_data);
|
free(op_data);
|
||||||
finish_primary_op(cur_op, -EIO);
|
finish_op(cur_op, -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Submit reads
|
// Submit reads
|
||||||
|
@ -148,7 +156,7 @@ resume_2:
|
||||||
{
|
{
|
||||||
free(op_data);
|
free(op_data);
|
||||||
cur_op->op_data = NULL;
|
cur_op->op_data = NULL;
|
||||||
finish_primary_op(cur_op, -EIO);
|
finish_op(cur_op, -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (op_data->degraded)
|
if (op_data->degraded)
|
||||||
|
@ -174,7 +182,7 @@ resume_2:
|
||||||
}
|
}
|
||||||
free(op_data);
|
free(op_data);
|
||||||
cur_op->op_data = NULL;
|
cur_op->op_data = NULL;
|
||||||
finish_primary_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||||
|
@ -435,7 +443,7 @@ resume_7:
|
||||||
}
|
}
|
||||||
// Remove version override
|
// Remove version override
|
||||||
pg.ver_override.erase(op_data->oid);
|
pg.ver_override.erase(op_data->oid);
|
||||||
finish_primary_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
// Continue other write operations to the same object
|
// Continue other write operations to the same object
|
||||||
{
|
{
|
||||||
auto next_it = pg.write_queue.find(op_data->oid);
|
auto next_it = pg.write_queue.find(op_data->oid);
|
||||||
|
@ -548,7 +556,7 @@ resume_6:
|
||||||
finish:
|
finish:
|
||||||
assert(syncs_in_progress.front() == cur_op);
|
assert(syncs_in_progress.front() == cur_op);
|
||||||
syncs_in_progress.pop_front();
|
syncs_in_progress.pop_front();
|
||||||
finish_primary_op(cur_op, 0);
|
finish_op(cur_op, 0);
|
||||||
if (syncs_in_progress.size() > 0)
|
if (syncs_in_progress.size() > 0)
|
||||||
{
|
{
|
||||||
cur_op = syncs_in_progress.front();
|
cur_op = syncs_in_progress.front();
|
||||||
|
|
|
@ -5,44 +5,31 @@
|
||||||
void osd_t::secondary_op_callback(osd_op_t *op)
|
void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
{
|
{
|
||||||
inflight_ops--;
|
inflight_ops--;
|
||||||
auto cl_it = clients.find(op->peer_fd);
|
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||||
if (cl_it != clients.end())
|
op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
||||||
{
|
{
|
||||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
op->reply.sec_rw.version = op->bs_op->version;
|
||||||
op->reply.hdr.id = op->req.hdr.id;
|
|
||||||
op->reply.hdr.opcode = op->req.hdr.opcode;
|
|
||||||
op->reply.hdr.retval = op->bs_op->retval;
|
|
||||||
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
|
||||||
op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
|
||||||
{
|
|
||||||
op->reply.sec_rw.version = op->bs_op->version;
|
|
||||||
}
|
|
||||||
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE)
|
|
||||||
{
|
|
||||||
op->reply.sec_del.version = op->bs_op->version;
|
|
||||||
}
|
|
||||||
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
|
||||||
op->reply.hdr.retval > 0)
|
|
||||||
{
|
|
||||||
op->send_list.push_back(op->buf, op->reply.hdr.retval);
|
|
||||||
}
|
|
||||||
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
|
||||||
{
|
|
||||||
// allocated by blockstore
|
|
||||||
op->buf = op->bs_op->buf;
|
|
||||||
if (op->reply.hdr.retval > 0)
|
|
||||||
{
|
|
||||||
op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id));
|
|
||||||
}
|
|
||||||
op->reply.sec_list.stable_count = op->bs_op->version;
|
|
||||||
}
|
|
||||||
auto & cl = cl_it->second;
|
|
||||||
outbox_push(cl, op);
|
|
||||||
}
|
}
|
||||||
else
|
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE)
|
||||||
{
|
{
|
||||||
delete op;
|
op->reply.sec_del.version = op->bs_op->version;
|
||||||
}
|
}
|
||||||
|
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ &&
|
||||||
|
op->bs_op->retval > 0)
|
||||||
|
{
|
||||||
|
op->send_list.push_back(op->buf, op->bs_op->retval);
|
||||||
|
}
|
||||||
|
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||||
|
{
|
||||||
|
// allocated by blockstore
|
||||||
|
op->buf = op->bs_op->buf;
|
||||||
|
if (op->bs_op->retval > 0)
|
||||||
|
{
|
||||||
|
op->send_list.push_back(op->buf, op->bs_op->retval * sizeof(obj_ver_id));
|
||||||
|
}
|
||||||
|
op->reply.sec_list.stable_count = op->bs_op->version;
|
||||||
|
}
|
||||||
|
finish_op(op, op->bs_op->retval);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::exec_secondary(osd_op_t *cur_op)
|
void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||||
|
@ -114,15 +101,10 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
// FIXME: Send the real config, not its source
|
// FIXME: Send the real config, not its source
|
||||||
std::string cfg_str = json11::Json(config).dump();
|
std::string cfg_str = json11::Json(config).dump();
|
||||||
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
|
||||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
|
||||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
|
||||||
cur_op->reply.hdr.retval = cfg_str.size()+1;
|
|
||||||
cur_op->buf = malloc(cfg_str.size()+1);
|
cur_op->buf = malloc(cfg_str.size()+1);
|
||||||
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
|
||||||
auto & cl = clients[cur_op->peer_fd];
|
|
||||||
cur_op->send_list.push_back(cur_op->buf, cur_op->reply.hdr.retval);
|
cur_op->send_list.push_back(cur_op->buf, cur_op->reply.hdr.retval);
|
||||||
outbox_push(cl, cur_op);
|
finish_op(cur_op, cfg_str.size()+1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
|
void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
|
||||||
|
|
Loading…
Reference in New Issue