From 43fe1d88e73ab32cca7d6f8e6dcdd033a1ba1f74 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 28 Mar 2020 19:09:20 +0300 Subject: [PATCH] Fix memory leaks with subops, fix recovery crashes --- osd_primary.cpp | 64 +++++++++++++++++++++++-------------------------- osd_receive.cpp | 6 +++++ 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/osd_primary.cpp b/osd_primary.cpp index c16e3f0a..5fa28163 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -40,27 +40,26 @@ struct osd_primary_op_data_t void osd_t::finish_op(osd_op_t *cur_op, int retval) { - // FIXME add separate magic number - auto cl_it = clients.find(cur_op->peer_fd); - if (cl_it != clients.end()) + if (!cur_op->peer_fd) { - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->req.hdr.id; - cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; - cur_op->reply.hdr.retval = retval; - if (!cur_op->peer_fd) - { - cur_op->callback(cur_op); - delete cur_op; - } - else - { - outbox_push(cl_it->second, cur_op); - } + cur_op->callback(cur_op); } else { - delete cur_op; + // FIXME add separate magic number + auto cl_it = clients.find(cur_op->peer_fd); + if (cl_it != clients.end()) + { + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->req.hdr.id; + cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; + cur_op->reply.hdr.retval = retval; + outbox_push(cl_it->second, cur_op); + } + else + { + delete cur_op; + } } } @@ -159,7 +158,6 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data()); if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) { - free(op_data); finish_op(cur_op, -EIO); return; } @@ -177,8 +175,6 @@ resume_1: resume_2: if (op_data->errors > 0) { - free(op_data); - cur_op->op_data = NULL; finish_op(cur_op, -EIO); return; } @@ -203,8 +199,6 @@ resume_2: } } } - free(op_data); - cur_op->op_data = NULL; finish_op(cur_op, cur_op->req.rw.len); } @@ -415,6 +409,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) op_data->stripes[i].read_start = 0; op_data->stripes[i].read_end = bs_block_size; op_data->stripes[i].missing = op_data->prev_set[i] == 0; + op_data->stripes[i].write_end = 0; } op_data->degraded = 1; submit_primary_subops(SUBMIT_READ, pg.pg_size, op_data->prev_set, cur_op); @@ -429,6 +424,12 @@ resume_9: op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe, cur_op->buf, cur_op->req.rw.len ); + } + free(cur_op->buf); + cur_op->buf = op_data->recovery_buf; + op_data->recovery_buf = NULL; + if (cur_op->req.rw.len > 0) + { // Write modified parts uint32_t start = 0, end = 0; for (int role = 0; role < pg.pg_minsize; role++) @@ -439,22 +440,16 @@ resume_9: end = std::max(op_data->stripes[role].req_end, end); op_data->stripes[role].write_start = op_data->stripes[role].req_start; op_data->stripes[role].write_end = op_data->stripes[role].req_end; - op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start; + op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size + op_data->stripes[role].write_start; } } for (int role = pg.pg_minsize; role < pg.pg_size; role++) { op_data->stripes[role].write_start = start; op_data->stripes[role].write_end = end; - op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start; + op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size + op_data->stripes[role].write_start; } } - if (cur_op->buf) - { - free(cur_op->buf); - } - cur_op->buf = op_data->recovery_buf; - op_data->recovery_buf = NULL; // Also write recovered parts uint64_t *cur_set = pg.cur_set.data(); for (int role = 0; role < pg.pg_size; role++) @@ -463,7 +458,7 @@ resume_9: { op_data->stripes[role].write_start = 0; op_data->stripes[role].write_end = bs_block_size; - op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size; + op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size; } } pg.ver_override[op_data->oid] = op_data->fact_ver; @@ -554,16 +549,17 @@ resume_7: this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); } // Remove version override - pg.ver_override.erase(op_data->oid); + object_id oid = op_data->oid; + pg.ver_override.erase(oid); finish_op(cur_op, cur_op->req.rw.len); // Continue other write operations to the same object { - auto next_it = pg.write_queue.find(op_data->oid); + auto next_it = pg.write_queue.find(oid); auto this_it = next_it; next_it++; pg.write_queue.erase(this_it); if (next_it != pg.write_queue.end() && - next_it->first == op_data->oid) + next_it->first == oid) { osd_op_t *next_op = next_it->second; continue_primary_write(next_op); diff --git a/osd_receive.cpp b/osd_receive.cpp index 2acd4901..d68e7045 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -133,6 +133,7 @@ void osd_t::handle_finished_read(osd_client_t & cl) osd_op_t *request = req_it->second; cl.sent_ops.erase(req_it); cl.read_reply_id = 0; + delete cl.read_op; cl.read_op = NULL; cl.read_state = 0; // Measure subop latency @@ -145,6 +146,10 @@ void osd_t::handle_finished_read(osd_client_t & cl) ); request->callback(request); } + else + { + assert(0); + } } void osd_t::handle_op_hdr(osd_client_t *cl) @@ -230,6 +235,7 @@ void osd_t::handle_reply_hdr(osd_client_t *cl) } else { + delete cl->read_op; cl->read_state = 0; cl->read_op = NULL; cl->sent_ops.erase(req_it);