forked from vitalif/vitastor
Fix crashes, print some stats
Notably: - fix the `delete op` inside lambda callback crash (it frees the lambda itself which results in use-after-free with g++) - fix stop_client() reenterability - fix a bug in the blockstore layer which resulted in always returning version=0 for zero-length reads - change error codes for blockstore_stabilizetrace-sqes
parent
92c800bb64
commit
0f43f6d3f6
|
@ -93,7 +93,7 @@ Input:
|
||||||
- buf = pre-allocated obj_ver_id array <len> units long
|
- buf = pre-allocated obj_ver_id array <len> units long
|
||||||
|
|
||||||
Output:
|
Output:
|
||||||
- retval = 0 or negative error number (-EINVAL or -EBUSY if not synced)
|
- retval = 0 or negative error number (-EINVAL, -ENOENT if no such version or -EBUSY if not synced)
|
||||||
|
|
||||||
## BS_OP_SYNC_STAB_ALL
|
## BS_OP_SYNC_STAB_ALL
|
||||||
|
|
||||||
|
|
|
@ -322,7 +322,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||||
{
|
{
|
||||||
// Basic verification not passed
|
// Basic verification not passed
|
||||||
op->retval = -EINVAL;
|
op->retval = -EINVAL;
|
||||||
op->callback(op);
|
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (op->opcode == BS_OP_SYNC_STAB_ALL)
|
if (op->opcode == BS_OP_SYNC_STAB_ALL)
|
||||||
|
@ -365,13 +365,13 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
||||||
}
|
}
|
||||||
if (op->opcode == BS_OP_WRITE && !enqueue_write(op))
|
if (op->opcode == BS_OP_WRITE && !enqueue_write(op))
|
||||||
{
|
{
|
||||||
op->callback(op);
|
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
|
if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
op->retval = 0;
|
op->retval = 0;
|
||||||
op->callback(op);
|
std::function<void (blockstore_op_t*)>(op->callback)(op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Call constructor without allocating memory. We'll call destructor before returning op back
|
// Call constructor without allocating memory. We'll call destructor before returning op back
|
||||||
|
|
|
@ -141,7 +141,7 @@ struct fulfill_read_t
|
||||||
};
|
};
|
||||||
|
|
||||||
#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
|
#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
|
||||||
#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); op->callback(op)
|
#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); std::function<void (blockstore_op_t*)>(op->callback)(op)
|
||||||
|
|
||||||
struct blockstore_op_private_t
|
struct blockstore_op_private_t
|
||||||
{
|
{
|
||||||
|
|
|
@ -131,63 +131,66 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
dirty_it--;
|
dirty_it--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (clean_it != clean_db.end() && fulfilled < read_op->len)
|
if (clean_it != clean_db.end())
|
||||||
{
|
{
|
||||||
if (!result_version)
|
if (!result_version)
|
||||||
{
|
{
|
||||||
result_version = clean_it->second.version;
|
result_version = clean_it->second.version;
|
||||||
}
|
}
|
||||||
if (!clean_entry_bitmap_size)
|
if (fulfilled < read_op->len)
|
||||||
{
|
{
|
||||||
if (!fulfill_read(read_op, fulfilled, 0, block_size, ST_CURRENT, 0, clean_it->second.location))
|
if (!clean_entry_bitmap_size)
|
||||||
{
|
{
|
||||||
// need to wait. undo added requests, don't dequeue op
|
if (!fulfill_read(read_op, fulfilled, 0, block_size, ST_CURRENT, 0, clean_it->second.location))
|
||||||
PRIV(read_op)->read_vec.clear();
|
{
|
||||||
return 0;
|
// need to wait. undo added requests, don't dequeue op
|
||||||
}
|
PRIV(read_op)->read_vec.clear();
|
||||||
}
|
return 0;
|
||||||
else
|
}
|
||||||
{
|
|
||||||
uint64_t meta_loc = clean_it->second.location >> block_order;
|
|
||||||
uint8_t *clean_entry_bitmap;
|
|
||||||
if (inmemory_meta)
|
|
||||||
{
|
|
||||||
uint64_t sector = (meta_loc / (meta_block_size / clean_entry_size)) * meta_block_size;
|
|
||||||
uint64_t pos = (meta_loc % (meta_block_size / clean_entry_size));
|
|
||||||
clean_entry_bitmap = (uint8_t*)(metadata_buffer + sector + pos*clean_entry_size + sizeof(clean_disk_entry));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
clean_entry_bitmap = (uint8_t*)(clean_bitmap + meta_loc*clean_entry_bitmap_size);
|
uint64_t meta_loc = clean_it->second.location >> block_order;
|
||||||
}
|
uint8_t *clean_entry_bitmap;
|
||||||
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/bitmap_granularity;
|
if (inmemory_meta)
|
||||||
while (bmp_start < bmp_size)
|
|
||||||
{
|
|
||||||
while (!(clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7))) && bmp_end < bmp_size)
|
|
||||||
{
|
{
|
||||||
bmp_end++;
|
uint64_t sector = (meta_loc / (meta_block_size / clean_entry_size)) * meta_block_size;
|
||||||
|
uint64_t pos = (meta_loc % (meta_block_size / clean_entry_size));
|
||||||
|
clean_entry_bitmap = (uint8_t*)(metadata_buffer + sector + pos*clean_entry_size + sizeof(clean_disk_entry));
|
||||||
}
|
}
|
||||||
if (bmp_end > bmp_start)
|
else
|
||||||
{
|
{
|
||||||
// fill with zeroes
|
clean_entry_bitmap = (uint8_t*)(clean_bitmap + meta_loc*clean_entry_bitmap_size);
|
||||||
fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
|
|
||||||
bmp_end * bitmap_granularity, ST_DEL_STABLE, 0, 0);
|
|
||||||
}
|
}
|
||||||
bmp_start = bmp_end;
|
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/bitmap_granularity;
|
||||||
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
|
while (bmp_start < bmp_size)
|
||||||
{
|
{
|
||||||
bmp_end++;
|
while (!(clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7))) && bmp_end < bmp_size)
|
||||||
}
|
|
||||||
if (bmp_end > bmp_start)
|
|
||||||
{
|
|
||||||
if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
|
|
||||||
bmp_end * bitmap_granularity, ST_CURRENT, 0, clean_it->second.location + bmp_start * bitmap_granularity))
|
|
||||||
{
|
{
|
||||||
// need to wait. undo added requests, don't dequeue op
|
bmp_end++;
|
||||||
PRIV(read_op)->read_vec.clear();
|
}
|
||||||
return 0;
|
if (bmp_end > bmp_start)
|
||||||
|
{
|
||||||
|
// fill with zeroes
|
||||||
|
fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
|
||||||
|
bmp_end * bitmap_granularity, ST_DEL_STABLE, 0, 0);
|
||||||
}
|
}
|
||||||
bmp_start = bmp_end;
|
bmp_start = bmp_end;
|
||||||
|
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
|
||||||
|
{
|
||||||
|
bmp_end++;
|
||||||
|
}
|
||||||
|
if (bmp_end > bmp_start)
|
||||||
|
{
|
||||||
|
if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
|
||||||
|
bmp_end * bitmap_granularity, ST_CURRENT, 0, clean_it->second.location + bmp_start * bitmap_granularity))
|
||||||
|
{
|
||||||
|
// need to wait. undo added requests, don't dequeue op
|
||||||
|
PRIV(read_op)->read_vec.clear();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
bmp_start = bmp_end;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
|
||||||
// FIXME Skip this object version
|
// FIXME Skip this object version
|
||||||
}
|
}
|
||||||
bad_op:
|
bad_op:
|
||||||
op->retval = -EINVAL;
|
op->retval = -ENOENT;
|
||||||
FINISH_OP(op);
|
FINISH_OP(op);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||||
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
|
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
|
||||||
{
|
{
|
||||||
// No such object version
|
// No such object version
|
||||||
op->retval = -EINVAL;
|
op->retval = -ENOENT;
|
||||||
FINISH_OP(op);
|
FINISH_OP(op);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||||
else if (op->version < version)
|
else if (op->version < version)
|
||||||
{
|
{
|
||||||
// Invalid version requested
|
// Invalid version requested
|
||||||
op->retval = -EINVAL;
|
op->retval = -EEXIST;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (deleted && is_del)
|
if (deleted && is_del)
|
||||||
|
|
42
osd.cpp
42
osd.cpp
|
@ -7,7 +7,7 @@
|
||||||
|
|
||||||
#include "osd.h"
|
#include "osd.h"
|
||||||
|
|
||||||
static const char* osd_op_names[] = {
|
const char* osd_op_names[] = {
|
||||||
"",
|
"",
|
||||||
"read",
|
"read",
|
||||||
"write",
|
"write",
|
||||||
|
@ -54,6 +54,18 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
send_stat_count = 0;
|
send_stat_count = 0;
|
||||||
send_stat_sum = 0;
|
send_stat_sum = 0;
|
||||||
}
|
}
|
||||||
|
if (incomplete_objects > 0)
|
||||||
|
{
|
||||||
|
printf("%lu object(s) incomplete\n", incomplete_objects);
|
||||||
|
}
|
||||||
|
if (degraded_objects > 0)
|
||||||
|
{
|
||||||
|
printf("%lu object(s) degraded\n", degraded_objects);
|
||||||
|
}
|
||||||
|
if (misplaced_objects > 0)
|
||||||
|
{
|
||||||
|
printf("%lu object(s) misplaced\n", misplaced_objects);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
this->bs_block_size = bs->get_block_size();
|
this->bs_block_size = bs->get_block_size();
|
||||||
// FIXME: use bitmap granularity instead
|
// FIXME: use bitmap granularity instead
|
||||||
|
@ -301,7 +313,8 @@ void osd_t::cancel_op(osd_op_t *op)
|
||||||
op->reply.hdr.id = op->req.hdr.id;
|
op->reply.hdr.id = op->req.hdr.id;
|
||||||
op->reply.hdr.opcode = op->req.hdr.opcode;
|
op->reply.hdr.opcode = op->req.hdr.opcode;
|
||||||
op->reply.hdr.retval = -EPIPE;
|
op->reply.hdr.retval = -EPIPE;
|
||||||
op->callback(op);
|
// Copy lambda to be unaffected by `delete op`
|
||||||
|
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -316,7 +329,16 @@ void osd_t::stop_client(int peer_fd)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto & cl = it->second;
|
osd_client_t cl = it->second;
|
||||||
|
if (cl.osd_num)
|
||||||
|
{
|
||||||
|
printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
printf("[%lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
|
||||||
|
}
|
||||||
|
clients.erase(it);
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||||
|
@ -350,7 +372,6 @@ void osd_t::stop_client(int peer_fd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(cl.in_buf);
|
free(cl.in_buf);
|
||||||
clients.erase(it);
|
|
||||||
close(peer_fd);
|
close(peer_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,18 +393,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
(cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % OSD_RW_ALIGN || cur_op->req.rw.offset % OSD_RW_ALIGN))
|
(cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % OSD_RW_ALIGN || cur_op->req.rw.offset % OSD_RW_ALIGN))
|
||||||
{
|
{
|
||||||
// Bad command
|
// Bad command
|
||||||
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
finish_op(cur_op, -EINVAL);
|
||||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
|
||||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
|
||||||
cur_op->reply.hdr.retval = -EINVAL;
|
|
||||||
if (cur_op->peer_fd)
|
|
||||||
{
|
|
||||||
outbox_push(this->clients[cur_op->peer_fd], cur_op);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
cur_op->callback(cur_op);
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
inflight_ops++;
|
inflight_ops++;
|
||||||
|
|
3
osd.h
3
osd.h
|
@ -44,6 +44,8 @@
|
||||||
|
|
||||||
//#define OSD_STUB
|
//#define OSD_STUB
|
||||||
|
|
||||||
|
extern const char* osd_op_names[];
|
||||||
|
|
||||||
struct osd_op_buf_list_t
|
struct osd_op_buf_list_t
|
||||||
{
|
{
|
||||||
int count = 0, alloc = 0, sent = 0;
|
int count = 0, alloc = 0, sent = 0;
|
||||||
|
@ -194,6 +196,7 @@ class osd_t
|
||||||
|
|
||||||
std::map<uint64_t, int> osd_peer_fds;
|
std::map<uint64_t, int> osd_peer_fds;
|
||||||
std::map<pg_num_t, pg_t> pgs;
|
std::map<pg_num_t, pg_t> pgs;
|
||||||
|
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
|
||||||
int peering_state = 0;
|
int peering_state = 0;
|
||||||
unsigned pg_count = 0;
|
unsigned pg_count = 0;
|
||||||
uint64_t next_subop_id = 1;
|
uint64_t next_subop_id = 1;
|
||||||
|
|
|
@ -254,6 +254,7 @@ resume_4:
|
||||||
auto st_it = pg->degraded_objects.find(recovery_state.oid);
|
auto st_it = pg->degraded_objects.find(recovery_state.oid);
|
||||||
st = st_it->second;
|
st = st_it->second;
|
||||||
pg->degraded_objects.erase(st_it);
|
pg->degraded_objects.erase(st_it);
|
||||||
|
degraded_objects--;
|
||||||
}
|
}
|
||||||
st->object_count--;
|
st->object_count--;
|
||||||
if (st->state == OBJ_DEGRADED)
|
if (st->state == OBJ_DEGRADED)
|
||||||
|
|
|
@ -176,6 +176,9 @@ void osd_t::handle_peers()
|
||||||
if (!p.second.peering_state->list_ops.size())
|
if (!p.second.peering_state->list_ops.size())
|
||||||
{
|
{
|
||||||
p.second.calc_object_states();
|
p.second.calc_object_states();
|
||||||
|
incomplete_objects += p.second.incomplete_objects.size();
|
||||||
|
misplaced_objects += p.second.misplaced_objects.size();
|
||||||
|
degraded_objects += p.second.degraded_objects.size();
|
||||||
if (p.second.state & PG_HAS_UNCLEAN)
|
if (p.second.state & PG_HAS_UNCLEAN)
|
||||||
peering_state = peering_state | OSD_FLUSHING_PGS;
|
peering_state = peering_state | OSD_FLUSHING_PGS;
|
||||||
else
|
else
|
||||||
|
@ -256,6 +259,9 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||||
pg.state = PG_PEERING;
|
pg.state = PG_PEERING;
|
||||||
pg.print_state();
|
pg.print_state();
|
||||||
pg.state_dict.clear();
|
pg.state_dict.clear();
|
||||||
|
incomplete_objects -= pg.incomplete_objects.size();
|
||||||
|
misplaced_objects -= pg.misplaced_objects.size();
|
||||||
|
degraded_objects -= pg.degraded_objects.size();
|
||||||
pg.incomplete_objects.clear();
|
pg.incomplete_objects.clear();
|
||||||
pg.misplaced_objects.clear();
|
pg.misplaced_objects.clear();
|
||||||
pg.degraded_objects.clear();
|
pg.degraded_objects.clear();
|
||||||
|
|
|
@ -204,6 +204,14 @@ void pg_obj_state_check_t::finish_object()
|
||||||
{
|
{
|
||||||
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||||
}
|
}
|
||||||
|
if (0)
|
||||||
|
{
|
||||||
|
// For future debug level
|
||||||
|
for (int i = obj_start; i < obj_end; i++)
|
||||||
|
{
|
||||||
|
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
|
||||||
|
}
|
||||||
|
}
|
||||||
state = OBJ_INCOMPLETE;
|
state = OBJ_INCOMPLETE;
|
||||||
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
pg->state = pg->state | PG_HAS_INCOMPLETE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,8 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||||
{
|
{
|
||||||
if (!cur_op->peer_fd)
|
if (!cur_op->peer_fd)
|
||||||
{
|
{
|
||||||
cur_op->callback(cur_op);
|
// Copy lambda to be unaffected by `delete op`
|
||||||
|
std::function<void(osd_op_t*)>(cur_op->callback)(cur_op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -254,7 +255,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len)
|
if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len)
|
||||||
{
|
{
|
||||||
// die
|
// die
|
||||||
throw std::runtime_error("local write operation failed");
|
throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")");
|
||||||
}
|
}
|
||||||
handle_primary_subop(
|
handle_primary_subop(
|
||||||
subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ,
|
subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ,
|
||||||
|
@ -298,17 +299,19 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
}
|
}
|
||||||
subops[subop].callback = [cur_op, this](osd_op_t *subop)
|
subops[subop].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
|
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
|
||||||
|
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
||||||
// so it doesn't get freed
|
// so it doesn't get freed
|
||||||
subop->buf = NULL;
|
subop->buf = NULL;
|
||||||
if (subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && cur_op->reply.hdr.retval != cur_op->req.sec_rw.len)
|
|
||||||
{
|
|
||||||
// write operation failed, drop the connection
|
|
||||||
stop_client(subop->peer_fd);
|
|
||||||
}
|
|
||||||
handle_primary_subop(
|
handle_primary_subop(
|
||||||
subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval,
|
subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval,
|
||||||
subop->req.sec_rw.len, subop->reply.sec_rw.version
|
subop->req.sec_rw.len, subop->reply.sec_rw.version
|
||||||
);
|
);
|
||||||
|
if (fail_fd >= 0)
|
||||||
|
{
|
||||||
|
// write operation failed, drop the connection
|
||||||
|
stop_client(fail_fd);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
|
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
|
||||||
}
|
}
|
||||||
|
@ -322,8 +325,11 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval,
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
if (retval != expected)
|
if (retval != expected)
|
||||||
{
|
{
|
||||||
|
printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected);
|
||||||
if (retval == -EPIPE)
|
if (retval == -EPIPE)
|
||||||
|
{
|
||||||
op_data->epipe++;
|
op_data->epipe++;
|
||||||
|
}
|
||||||
op_data->errors++;
|
op_data->errors++;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -565,7 +571,7 @@ resume_6:
|
||||||
op_data->st = 6;
|
op_data->st = 6;
|
||||||
return;
|
return;
|
||||||
resume_7:
|
resume_7:
|
||||||
// FIXME: Free them correctly (via a destructor or so)
|
// FIXME: Free those in the destructor?
|
||||||
delete op_data->unstable_write_osds;
|
delete op_data->unstable_write_osds;
|
||||||
delete[] op_data->unstable_writes;
|
delete[] op_data->unstable_writes;
|
||||||
op_data->unstable_writes = NULL;
|
op_data->unstable_writes = NULL;
|
||||||
|
@ -796,12 +802,13 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||||
};
|
};
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
if (cur_op->reply.hdr.retval != 0)
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0;
|
||||||
|
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0);
|
||||||
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// sync operation failed, drop the connection
|
// sync operation failed, drop the connection
|
||||||
stop_client(subop->peer_fd);
|
stop_client(fail_fd);
|
||||||
}
|
}
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0);
|
|
||||||
};
|
};
|
||||||
outbox_push(clients[subops[i].peer_fd], &subops[i]);
|
outbox_push(clients[subops[i].peer_fd], &subops[i]);
|
||||||
}
|
}
|
||||||
|
@ -853,12 +860,13 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||||
subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
|
subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
|
||||||
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
if (cur_op->reply.hdr.retval != 0)
|
int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : 0;
|
||||||
|
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0);
|
||||||
|
if (fail_fd >= 0)
|
||||||
{
|
{
|
||||||
// sync operation failed, drop the connection
|
// sync operation failed, drop the connection
|
||||||
stop_client(subop->peer_fd);
|
stop_client(fail_fd);
|
||||||
}
|
}
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0);
|
|
||||||
};
|
};
|
||||||
outbox_push(clients[subops[i].peer_fd], &subops[i]);
|
outbox_push(clients[subops[i].peer_fd], &subops[i]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -247,6 +247,7 @@ void osd_t::handle_reply_hdr(osd_client_t *cl)
|
||||||
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
|
||||||
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
|
||||||
);
|
);
|
||||||
op->callback(op);
|
// Copy lambda to be unaffected by `delete op`
|
||||||
|
std::function<void(osd_op_t*)>(op->callback)(op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue