From 45b1c2fbf1d8b54daf6c3e7fb5bcbd8af6bc8f7d Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 1 Jun 2020 01:53:32 +0300 Subject: [PATCH] Fix canceling of write operations on PG re-peer (which led to use-after-free, too...) --- cluster_client.cpp | 34 +++++++++++++++------------- cluster_client.h | 19 ++++++++-------- osd.h | 3 ++- osd_peering.cpp | 3 +-- osd_primary.cpp | 13 ++++++----- osd_primary_subops.cpp | 51 ++++++++++++++++++++++++++++++++++++------ osd_receive.cpp | 2 ++ osd_send.cpp | 19 ++++++++++++++++ 8 files changed, 104 insertions(+), 40 deletions(-) diff --git a/cluster_client.cpp b/cluster_client.cpp index e2f6d403..41f76b91 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -9,10 +9,7 @@ osd_op_t::~osd_op_t() { assert(!bs_op); - if (op_data) - { - free(op_data); - } + assert(!op_data); if (rmw_buf) { free(rmw_buf); @@ -271,29 +268,37 @@ void cluster_client_t::cancel_osd_ops(osd_client_t & cl) { for (auto p: cl.sent_ops) { - cancel_out_op(p.second); + cancel_op(p.second); } cl.sent_ops.clear(); for (auto op: cl.outbox) { - cancel_out_op(op); + cancel_op(op); } cl.outbox.clear(); if (cl.write_op) { - cancel_out_op(cl.write_op); + cancel_op(cl.write_op); cl.write_op = NULL; } } -void cluster_client_t::cancel_out_op(osd_op_t *op) +void cluster_client_t::cancel_op(osd_op_t *op) { - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->req.hdr.id; - op->reply.hdr.opcode = op->req.hdr.opcode; - op->reply.hdr.retval = -EPIPE; - // Copy lambda to be unaffected by `delete op` - std::function(op->callback)(op); + if (op->op_type == OSD_OP_OUT) + { + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->reply.hdr.retval = -EPIPE; + // Copy lambda to be unaffected by `delete op` + std::function(op->callback)(op); + } + else + { + // This function is only called in stop_client(), so it's fine to destroy the operation + delete op; + } } void cluster_client_t::stop_client(int peer_fd) @@ -349,7 +354,6 @@ void cluster_client_t::stop_client(int peer_fd) } } free(cl.in_buf); - assert(peer_fd != 0); close(peer_fd); if (repeer_osd) { diff --git a/cluster_client.h b/cluster_client.h index 8fbc5d9f..040e4a4e 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -122,11 +122,12 @@ struct osd_client_t int read_remaining = 0; int read_state = 0; - // Outbound operations sent to this peer - std::map sent_ops; + // Incoming operations + std::vector received_ops; - // Outbound messages (replies or requests) + // Outbound operations std::deque outbox; + std::map sent_ops; // PGs dirtied by this client's primary-writes (FIXME to drop the connection) std::set dirty_pgs; @@ -180,28 +181,28 @@ struct cluster_client_t // op statistics osd_op_stats_t stats; - // public +public: void connect_peer(uint64_t osd_num, json11::Json address_list, int port); void stop_client(int peer_fd); void outbox_push(osd_op_t *cur_op); std::function exec_op; std::function repeer_pgs; + void handle_peer_epoll(int peer_fd, int epoll_events); + void read_requests(); + void send_replies(); - // private +protected: void try_connect_peer(uint64_t osd_num); void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); void handle_connect_epoll(int peer_fd); - void handle_peer_epoll(int peer_fd, int epoll_events); void on_connect_peer(osd_num_t peer_osd, int peer_fd); void check_peer_config(osd_client_t & cl); void cancel_osd_ops(osd_client_t & cl); - void cancel_out_op(osd_op_t *op); + void cancel_op(osd_op_t *op); bool try_send(osd_client_t & cl); - void send_replies(); void handle_send(int result, int peer_fd); - void read_requests(); bool handle_read(int result, int peer_fd); void handle_finished_read(osd_client_t & cl); void handle_op_hdr(osd_client_t *cl); diff --git a/osd.h b/osd.h index 1a796287..d35472a1 100644 --- a/osd.h +++ b/osd.h @@ -188,6 +188,7 @@ class osd_t bool prepare_primary_rw(osd_op_t *cur_op); void continue_primary_read(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op); + void cancel_primary_write(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op); void continue_primary_del(osd_op_t *cur_op); bool check_write_queue(osd_op_t *cur_op, pg_t & pg); @@ -196,7 +197,7 @@ class osd_t void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version); void handle_primary_bs_subop(osd_op_t *subop); void add_bs_subop_stats(osd_op_t *subop); - void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval); + void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set); void submit_primary_sync_subops(osd_op_t *cur_op); diff --git a/osd_peering.cpp b/osd_peering.cpp index c4c97427..be900d58 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -120,7 +120,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.flush_batch = NULL; for (auto p: pg.write_queue) { - finish_op(p.second, -EPIPE); + cancel_primary_write(p.second); } pg.write_queue.clear(); for (auto it = unstable_writes.begin(); it != unstable_writes.end(); ) @@ -132,7 +132,6 @@ void osd_t::start_pg_peering(pg_num_t pg_num) else it++; } - pg.inflight = 0; dirty_pgs.erase(pg.pg_num); // Calculate current write OSD set pg.pg_cursize = 0; diff --git a/osd_primary.cpp b/osd_primary.cpp index 3257152d..022cfae7 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -218,7 +218,7 @@ resume_2: resume_3: if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } // Save version override for parallel reads @@ -233,7 +233,7 @@ resume_4: resume_5: if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } if (op_data->fact_ver == 1) @@ -268,7 +268,7 @@ resume_5: resume_8: if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } } @@ -291,6 +291,7 @@ resume_7: // Continue other write operations to the same object auto next_it = pg.write_queue.find(oid); auto this_it = next_it; + assert(this_it->second == cur_op); next_it++; pg.write_queue.erase(this_it); if (next_it != pg.write_queue.end() && @@ -347,7 +348,7 @@ resume_7: op_data->unstable_write_osds = NULL; if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return false; } } @@ -618,7 +619,7 @@ resume_2: resume_3: if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } // Save version override for parallel reads @@ -632,7 +633,7 @@ resume_4: resume_5: if (op_data->errors > 0) { - pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); return; } // Remove version override diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index d763e9f0..1ad73852 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -33,15 +33,24 @@ void osd_t::autosync() void osd_t::finish_op(osd_op_t *cur_op, int retval) { inflight_ops--; - if (cur_op->op_data && cur_op->op_data->pg_num > 0) + if (cur_op->op_data) { - auto & pg = pgs[cur_op->op_data->pg_num]; - pg.inflight--; - assert(pg.inflight >= 0); - if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) + if (cur_op->op_data->pg_num > 0) { - finish_stop_pg(pg); + auto & pg = pgs[cur_op->op_data->pg_num]; + pg.inflight--; + assert(pg.inflight >= 0); + if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) + { + finish_stop_pg(pg); + } } + assert(!cur_op->op_data->subops); + assert(!cur_op->op_data->unstable_write_osds); + assert(!cur_op->op_data->unstable_writes); + assert(!cur_op->op_data->dirty_pgs); + free(cur_op->op_data); + cur_op->op_data = NULL; } if (!cur_op->peer_fd) { @@ -290,6 +299,23 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, } } +void osd_t::cancel_primary_write(osd_op_t *cur_op) +{ + if (cur_op->op_data && cur_op->op_data->subops) + { + // Primary-write operation is waiting for subops, subops + // are sent to peer OSDs, so we can't just throw them away. + // Mark them with an extra EPIPE. + cur_op->op_data->errors++; + cur_op->op_data->epipe++; + cur_op->op_data->done--; // Caution: `done` must be signed because may become -1 here + } + else + { + finish_op(cur_op, -EPIPE); + } +} + void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set) { osd_primary_op_data_t *op_data = cur_op->op_data; @@ -474,9 +500,20 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) } } -void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval) +void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval) { auto st_it = pg.write_queue.find(oid), it = st_it; + finish_op(first_op, retval); + if (it != pg.write_queue.end() && it->second == first_op) + { + it++; + } + else + { + // Write queue doesn't match the first operation. + // first_op is a leftover operation from the previous peering of the same PG. + return; + } while (it != pg.write_queue.end() && it->first == oid) { finish_op(it->second, retval); diff --git a/osd_receive.cpp b/osd_receive.cpp index 4097fc94..aa4e41eb 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -123,6 +123,7 @@ void cluster_client_t::handle_finished_read(osd_client_t & cl) else if (cl.read_state == CL_READ_DATA) { // Operation is ready + cl.received_ops.push_back(cl.read_op); exec_op(cl.read_op); cl.read_op = NULL; cl.read_state = 0; @@ -203,6 +204,7 @@ void cluster_client_t::handle_op_hdr(osd_client_t *cl) // Operation is ready cl->read_op = NULL; cl->read_state = 0; + cl->received_ops.push_back(cur_op); exec_op(cur_op); } } diff --git a/osd_send.cpp b/osd_send.cpp index cc0527b6..319698ba 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -8,6 +8,25 @@ void cluster_client_t::outbox_push(osd_op_t *cur_op) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); } + else + { + // Check that operation actually belongs to this client + bool found = false; + for (auto it = cl.received_ops.begin(); it != cl.received_ops.end(); it++) + { + if (*it == cur_op) + { + found = true; + cl.received_ops.erase(it, it+1); + break; + } + } + if (!found) + { + delete cur_op; + return; + } + } cl.outbox.push_back(cur_op); if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) {