forked from vitalif/vitastor
Fix canceling of write operations on PG re-peer (which led to use-after-free, too...)
parent
3469bead67
commit
45b1c2fbf1
|
@ -9,10 +9,7 @@
|
|||
osd_op_t::~osd_op_t()
|
||||
{
|
||||
assert(!bs_op);
|
||||
if (op_data)
|
||||
{
|
||||
free(op_data);
|
||||
}
|
||||
assert(!op_data);
|
||||
if (rmw_buf)
|
||||
{
|
||||
free(rmw_buf);
|
||||
|
@ -271,29 +268,37 @@ void cluster_client_t::cancel_osd_ops(osd_client_t & cl)
|
|||
{
|
||||
for (auto p: cl.sent_ops)
|
||||
{
|
||||
cancel_out_op(p.second);
|
||||
cancel_op(p.second);
|
||||
}
|
||||
cl.sent_ops.clear();
|
||||
for (auto op: cl.outbox)
|
||||
{
|
||||
cancel_out_op(op);
|
||||
cancel_op(op);
|
||||
}
|
||||
cl.outbox.clear();
|
||||
if (cl.write_op)
|
||||
{
|
||||
cancel_out_op(cl.write_op);
|
||||
cancel_op(cl.write_op);
|
||||
cl.write_op = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void cluster_client_t::cancel_out_op(osd_op_t *op)
|
||||
void cluster_client_t::cancel_op(osd_op_t *op)
|
||||
{
|
||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||
op->reply.hdr.id = op->req.hdr.id;
|
||||
op->reply.hdr.opcode = op->req.hdr.opcode;
|
||||
op->reply.hdr.retval = -EPIPE;
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||
if (op->op_type == OSD_OP_OUT)
|
||||
{
|
||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||
op->reply.hdr.id = op->req.hdr.id;
|
||||
op->reply.hdr.opcode = op->req.hdr.opcode;
|
||||
op->reply.hdr.retval = -EPIPE;
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
// This function is only called in stop_client(), so it's fine to destroy the operation
|
||||
delete op;
|
||||
}
|
||||
}
|
||||
|
||||
void cluster_client_t::stop_client(int peer_fd)
|
||||
|
@ -349,7 +354,6 @@ void cluster_client_t::stop_client(int peer_fd)
|
|||
}
|
||||
}
|
||||
free(cl.in_buf);
|
||||
assert(peer_fd != 0);
|
||||
close(peer_fd);
|
||||
if (repeer_osd)
|
||||
{
|
||||
|
|
|
@ -122,11 +122,12 @@ struct osd_client_t
|
|||
int read_remaining = 0;
|
||||
int read_state = 0;
|
||||
|
||||
// Outbound operations sent to this peer
|
||||
std::map<int, osd_op_t*> sent_ops;
|
||||
// Incoming operations
|
||||
std::vector<osd_op_t*> received_ops;
|
||||
|
||||
// Outbound messages (replies or requests)
|
||||
// Outbound operations
|
||||
std::deque<osd_op_t*> outbox;
|
||||
std::map<int, osd_op_t*> sent_ops;
|
||||
|
||||
// PGs dirtied by this client's primary-writes (FIXME to drop the connection)
|
||||
std::set<pg_num_t> dirty_pgs;
|
||||
|
@ -180,28 +181,28 @@ struct cluster_client_t
|
|||
// op statistics
|
||||
osd_op_stats_t stats;
|
||||
|
||||
// public
|
||||
public:
|
||||
void connect_peer(uint64_t osd_num, json11::Json address_list, int port);
|
||||
void stop_client(int peer_fd);
|
||||
void outbox_push(osd_op_t *cur_op);
|
||||
std::function<void(osd_op_t*)> exec_op;
|
||||
std::function<void(osd_num_t)> repeer_pgs;
|
||||
void handle_peer_epoll(int peer_fd, int epoll_events);
|
||||
void read_requests();
|
||||
void send_replies();
|
||||
|
||||
// private
|
||||
protected:
|
||||
void try_connect_peer(uint64_t osd_num);
|
||||
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
|
||||
void handle_connect_epoll(int peer_fd);
|
||||
void handle_peer_epoll(int peer_fd, int epoll_events);
|
||||
void on_connect_peer(osd_num_t peer_osd, int peer_fd);
|
||||
void check_peer_config(osd_client_t & cl);
|
||||
void cancel_osd_ops(osd_client_t & cl);
|
||||
void cancel_out_op(osd_op_t *op);
|
||||
void cancel_op(osd_op_t *op);
|
||||
|
||||
bool try_send(osd_client_t & cl);
|
||||
void send_replies();
|
||||
void handle_send(int result, int peer_fd);
|
||||
|
||||
void read_requests();
|
||||
bool handle_read(int result, int peer_fd);
|
||||
void handle_finished_read(osd_client_t & cl);
|
||||
void handle_op_hdr(osd_client_t *cl);
|
||||
|
|
3
osd.h
3
osd.h
|
@ -188,6 +188,7 @@ class osd_t
|
|||
bool prepare_primary_rw(osd_op_t *cur_op);
|
||||
void continue_primary_read(osd_op_t *cur_op);
|
||||
void continue_primary_write(osd_op_t *cur_op);
|
||||
void cancel_primary_write(osd_op_t *cur_op);
|
||||
void continue_primary_sync(osd_op_t *cur_op);
|
||||
void continue_primary_del(osd_op_t *cur_op);
|
||||
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
|
||||
|
@ -196,7 +197,7 @@ class osd_t
|
|||
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
||||
void handle_primary_bs_subop(osd_op_t *subop);
|
||||
void add_bs_subop_stats(osd_op_t *subop);
|
||||
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
||||
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
|
||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
|
||||
void submit_primary_sync_subops(osd_op_t *cur_op);
|
||||
|
|
|
@ -120,7 +120,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
pg.flush_batch = NULL;
|
||||
for (auto p: pg.write_queue)
|
||||
{
|
||||
finish_op(p.second, -EPIPE);
|
||||
cancel_primary_write(p.second);
|
||||
}
|
||||
pg.write_queue.clear();
|
||||
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
|
||||
|
@ -132,7 +132,6 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
|||
else
|
||||
it++;
|
||||
}
|
||||
pg.inflight = 0;
|
||||
dirty_pgs.erase(pg.pg_num);
|
||||
// Calculate current write OSD set
|
||||
pg.pg_cursize = 0;
|
||||
|
|
|
@ -218,7 +218,7 @@ resume_2:
|
|||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
// Save version override for parallel reads
|
||||
|
@ -233,7 +233,7 @@ resume_4:
|
|||
resume_5:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
if (op_data->fact_ver == 1)
|
||||
|
@ -268,7 +268,7 @@ resume_5:
|
|||
resume_8:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -291,6 +291,7 @@ resume_7:
|
|||
// Continue other write operations to the same object
|
||||
auto next_it = pg.write_queue.find(oid);
|
||||
auto this_it = next_it;
|
||||
assert(this_it->second == cur_op);
|
||||
next_it++;
|
||||
pg.write_queue.erase(this_it);
|
||||
if (next_it != pg.write_queue.end() &&
|
||||
|
@ -347,7 +348,7 @@ resume_7:
|
|||
op_data->unstable_write_osds = NULL;
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -618,7 +619,7 @@ resume_2:
|
|||
resume_3:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
// Save version override for parallel reads
|
||||
|
@ -632,7 +633,7 @@ resume_4:
|
|||
resume_5:
|
||||
if (op_data->errors > 0)
|
||||
{
|
||||
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
|
||||
return;
|
||||
}
|
||||
// Remove version override
|
||||
|
|
|
@ -33,15 +33,24 @@ void osd_t::autosync()
|
|||
void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||
{
|
||||
inflight_ops--;
|
||||
if (cur_op->op_data && cur_op->op_data->pg_num > 0)
|
||||
if (cur_op->op_data)
|
||||
{
|
||||
auto & pg = pgs[cur_op->op_data->pg_num];
|
||||
pg.inflight--;
|
||||
assert(pg.inflight >= 0);
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
|
||||
if (cur_op->op_data->pg_num > 0)
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
auto & pg = pgs[cur_op->op_data->pg_num];
|
||||
pg.inflight--;
|
||||
assert(pg.inflight >= 0);
|
||||
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
|
||||
{
|
||||
finish_stop_pg(pg);
|
||||
}
|
||||
}
|
||||
assert(!cur_op->op_data->subops);
|
||||
assert(!cur_op->op_data->unstable_write_osds);
|
||||
assert(!cur_op->op_data->unstable_writes);
|
||||
assert(!cur_op->op_data->dirty_pgs);
|
||||
free(cur_op->op_data);
|
||||
cur_op->op_data = NULL;
|
||||
}
|
||||
if (!cur_op->peer_fd)
|
||||
{
|
||||
|
@ -290,6 +299,23 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval,
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::cancel_primary_write(osd_op_t *cur_op)
|
||||
{
|
||||
if (cur_op->op_data && cur_op->op_data->subops)
|
||||
{
|
||||
// Primary-write operation is waiting for subops, subops
|
||||
// are sent to peer OSDs, so we can't just throw them away.
|
||||
// Mark them with an extra EPIPE.
|
||||
cur_op->op_data->errors++;
|
||||
cur_op->op_data->epipe++;
|
||||
cur_op->op_data->done--; // Caution: `done` must be signed because may become -1 here
|
||||
}
|
||||
else
|
||||
{
|
||||
finish_op(cur_op, -EPIPE);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
@ -474,9 +500,20 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval)
|
||||
void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval)
|
||||
{
|
||||
auto st_it = pg.write_queue.find(oid), it = st_it;
|
||||
finish_op(first_op, retval);
|
||||
if (it != pg.write_queue.end() && it->second == first_op)
|
||||
{
|
||||
it++;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Write queue doesn't match the first operation.
|
||||
// first_op is a leftover operation from the previous peering of the same PG.
|
||||
return;
|
||||
}
|
||||
while (it != pg.write_queue.end() && it->first == oid)
|
||||
{
|
||||
finish_op(it->second, retval);
|
||||
|
|
|
@ -123,6 +123,7 @@ void cluster_client_t::handle_finished_read(osd_client_t & cl)
|
|||
else if (cl.read_state == CL_READ_DATA)
|
||||
{
|
||||
// Operation is ready
|
||||
cl.received_ops.push_back(cl.read_op);
|
||||
exec_op(cl.read_op);
|
||||
cl.read_op = NULL;
|
||||
cl.read_state = 0;
|
||||
|
@ -203,6 +204,7 @@ void cluster_client_t::handle_op_hdr(osd_client_t *cl)
|
|||
// Operation is ready
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
cl->received_ops.push_back(cur_op);
|
||||
exec_op(cur_op);
|
||||
}
|
||||
}
|
||||
|
|
19
osd_send.cpp
19
osd_send.cpp
|
@ -8,6 +8,25 @@ void cluster_client_t::outbox_push(osd_op_t *cur_op)
|
|||
{
|
||||
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Check that operation actually belongs to this client
|
||||
bool found = false;
|
||||
for (auto it = cl.received_ops.begin(); it != cl.received_ops.end(); it++)
|
||||
{
|
||||
if (*it == cur_op)
|
||||
{
|
||||
found = true;
|
||||
cl.received_ops.erase(it, it+1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
{
|
||||
delete cur_op;
|
||||
return;
|
||||
}
|
||||
}
|
||||
cl.outbox.push_back(cur_op);
|
||||
if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl))
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue