diff --git a/osd.cpp b/osd.cpp index 25b53136..233b4139 100644 --- a/osd.cpp +++ b/osd.cpp @@ -376,7 +376,14 @@ void osd_t::exec_op(osd_op_t *cur_op) cur_op->reply.hdr.id = cur_op->req.hdr.id; cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; cur_op->reply.hdr.retval = -EINVAL; - outbox_push(this->clients[cur_op->peer_fd], cur_op); + if (cur_op->peer_fd) + { + outbox_push(this->clients[cur_op->peer_fd], cur_op); + } + else + { + cur_op->callback(cur_op); + } return; } inflight_ops++; diff --git a/osd.h b/osd.h index f53e46d2..7e13428b 100644 --- a/osd.h +++ b/osd.h @@ -262,7 +262,7 @@ class osd_t void continue_primary_read(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op); - void finish_primary_op(osd_op_t *cur_op, int retval); + void finish_op(osd_op_t *cur_op, int retval); void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_sync_subops(osd_op_t *cur_op); diff --git a/osd_primary.cpp b/osd_primary.cpp index 12749109..b3cd58b8 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -36,7 +36,7 @@ struct osd_primary_op_data_t obj_ver_id *unstable_writes = NULL; }; -void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) +void osd_t::finish_op(osd_op_t *cur_op, int retval) { // FIXME add separate magic number auto cl_it = clients.find(cur_op->peer_fd); @@ -46,7 +46,15 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) cur_op->reply.hdr.id = cur_op->req.hdr.id; cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; cur_op->reply.hdr.retval = retval; - outbox_push(cl_it->second, cur_op); + if (!cur_op->peer_fd) + { + cur_op->callback(cur_op); + delete cur_op; + } + else + { + outbox_push(cl_it->second, cur_op); + } } else { @@ -64,7 +72,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) // FIXME: Postpone operations in inactive PGs if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE)) { - finish_primary_op(cur_op, -EINVAL); + finish_op(cur_op, -EINVAL); return false; } uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize; @@ -78,7 +86,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) (cur_op->req.rw.offset % bs_disk_alignment) != 0 || (cur_op->req.rw.len % bs_disk_alignment) != 0) { - finish_primary_op(cur_op, -EINVAL); + finish_op(cur_op, -EINVAL); return false; } osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc( @@ -129,7 +137,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) { free(op_data); - finish_primary_op(cur_op, -EIO); + finish_op(cur_op, -EIO); return; } // Submit reads @@ -148,7 +156,7 @@ resume_2: { free(op_data); cur_op->op_data = NULL; - finish_primary_op(cur_op, -EIO); + finish_op(cur_op, -EIO); return; } if (op_data->degraded) @@ -174,7 +182,7 @@ resume_2: } free(op_data); cur_op->op_data = NULL; - finish_primary_op(cur_op, cur_op->req.rw.len); + finish_op(cur_op, cur_op->req.rw.len); } void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) @@ -435,7 +443,7 @@ resume_7: } // Remove version override pg.ver_override.erase(op_data->oid); - finish_primary_op(cur_op, cur_op->req.rw.len); + finish_op(cur_op, cur_op->req.rw.len); // Continue other write operations to the same object { auto next_it = pg.write_queue.find(op_data->oid); @@ -548,7 +556,7 @@ resume_6: finish: assert(syncs_in_progress.front() == cur_op); syncs_in_progress.pop_front(); - finish_primary_op(cur_op, 0); + finish_op(cur_op, 0); if (syncs_in_progress.size() > 0) { cur_op = syncs_in_progress.front(); diff --git a/osd_secondary.cpp b/osd_secondary.cpp index e3b17468..6b4c0604 100644 --- a/osd_secondary.cpp +++ b/osd_secondary.cpp @@ -5,44 +5,31 @@ void osd_t::secondary_op_callback(osd_op_t *op) { inflight_ops--; - auto cl_it = clients.find(op->peer_fd); - if (cl_it != clients.end()) + if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ || + op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) { - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->req.hdr.id; - op->reply.hdr.opcode = op->req.hdr.opcode; - op->reply.hdr.retval = op->bs_op->retval; - if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ || - op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) - { - op->reply.sec_rw.version = op->bs_op->version; - } - else if (op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE) - { - op->reply.sec_del.version = op->bs_op->version; - } - if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ && - op->reply.hdr.retval > 0) - { - op->send_list.push_back(op->buf, op->reply.hdr.retval); - } - else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST) - { - // allocated by blockstore - op->buf = op->bs_op->buf; - if (op->reply.hdr.retval > 0) - { - op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id)); - } - op->reply.sec_list.stable_count = op->bs_op->version; - } - auto & cl = cl_it->second; - outbox_push(cl, op); + op->reply.sec_rw.version = op->bs_op->version; } - else + else if (op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE) { - delete op; + op->reply.sec_del.version = op->bs_op->version; } + if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ && + op->bs_op->retval > 0) + { + op->send_list.push_back(op->buf, op->bs_op->retval); + } + else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST) + { + // allocated by blockstore + op->buf = op->bs_op->buf; + if (op->bs_op->retval > 0) + { + op->send_list.push_back(op->buf, op->bs_op->retval * sizeof(obj_ver_id)); + } + op->reply.sec_list.stable_count = op->bs_op->version; + } + finish_op(op, op->bs_op->retval); } void osd_t::exec_secondary(osd_op_t *cur_op) @@ -114,15 +101,10 @@ void osd_t::exec_show_config(osd_op_t *cur_op) { // FIXME: Send the real config, not its source std::string cfg_str = json11::Json(config).dump(); - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->req.hdr.id; - cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; - cur_op->reply.hdr.retval = cfg_str.size()+1; cur_op->buf = malloc(cfg_str.size()+1); memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1); - auto & cl = clients[cur_op->peer_fd]; cur_op->send_list.push_back(cur_op->buf, cur_op->reply.hdr.retval); - outbox_push(cl, cur_op); + finish_op(cur_op, cfg_str.size()+1); } void osd_t::exec_sync_stab_all(osd_op_t *cur_op)