forked from vitalif/vitastor
Implement flushing (stabilize/rollback) of unstable entries on start of the PG
parent
46f9bd2a69
commit
21d0b06959
|
@ -440,7 +440,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
{
|
||||
journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
|
||||
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
||||
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
|
||||
je->type < JE_MIN || je->type > JE_MAX || started && je->crc32_prev != crc32_last)
|
||||
{
|
||||
if (pos == 0)
|
||||
{
|
||||
|
|
|
@ -12,12 +12,14 @@
|
|||
// Journal entries
|
||||
// Journal entries are linked to each other by their crc32 value
|
||||
// The journal is almost a blockchain, because object versions constantly increase
|
||||
#define JE_MIN 0x01
|
||||
#define JE_START 0x01
|
||||
#define JE_SMALL_WRITE 0x02
|
||||
#define JE_BIG_WRITE 0x03
|
||||
#define JE_STABLE 0x04
|
||||
#define JE_DELETE 0x05
|
||||
#define JE_ROLLBACK 0x06
|
||||
#define JE_MAX 0x06
|
||||
|
||||
// crc32c comes first to ease calculation and is equal to crc32()
|
||||
struct __attribute__((__packed__)) journal_entry_start
|
||||
|
|
|
@ -21,6 +21,7 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
|
|||
if (v->version == 0)
|
||||
{
|
||||
// Already rolled back
|
||||
// FIXME Skip this object version
|
||||
}
|
||||
bad_op:
|
||||
op->retval = -EINVAL;
|
||||
|
@ -147,7 +148,7 @@ resume_3:
|
|||
ring_data_t *data = ((ring_data_t*)sqe->user_data);
|
||||
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
|
||||
data->iov = { 0 };
|
||||
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
|
||||
data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
|
||||
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
|
||||
PRIV(op)->pending_ops = 1;
|
||||
PRIV(op)->op_state = 4;
|
||||
|
@ -163,18 +164,18 @@ resume_5:
|
|||
.oid = v->oid,
|
||||
.version = UINT64_MAX,
|
||||
});
|
||||
rm_end--;
|
||||
auto rm_start = rm_end;
|
||||
assert(rm_start != dirty_db.begin());
|
||||
rm_start--;
|
||||
while (1)
|
||||
{
|
||||
if (rm_end->first.oid != v->oid)
|
||||
if (rm_start->first.oid != v->oid)
|
||||
break;
|
||||
else if (rm_end->first.version <= v->version)
|
||||
else if (rm_start->first.version <= v->version)
|
||||
break;
|
||||
rm_start = rm_end;
|
||||
if (rm_end == dirty_db.begin())
|
||||
if (rm_start == dirty_db.begin())
|
||||
break;
|
||||
rm_end--;
|
||||
rm_start--;
|
||||
}
|
||||
if (rm_end != rm_start)
|
||||
erase_dirty(rm_start, rm_end, UINT64_MAX);
|
||||
|
@ -202,7 +203,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
|
|||
if (PRIV(op)->pending_ops == 0)
|
||||
{
|
||||
PRIV(op)->op_state++;
|
||||
if (!continue_stable(op))
|
||||
if (!continue_rollback(op))
|
||||
{
|
||||
submit_queue.push_front(op);
|
||||
}
|
||||
|
|
3
osd.cpp
3
osd.cpp
|
@ -327,7 +327,7 @@ void osd_t::stop_client(int peer_fd)
|
|||
cancel_osd_ops(cl);
|
||||
osd_peer_fds.erase(cl.osd_num);
|
||||
repeer_pgs(cl.osd_num, false);
|
||||
peering_state |= OSD_PEERING_PEERS;
|
||||
peering_state |= OSD_CONNECTING_PEERS;
|
||||
}
|
||||
if (cl.read_op)
|
||||
{
|
||||
|
@ -388,6 +388,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
{
|
||||
exec_show_config(cur_op);
|
||||
}
|
||||
// FIXME: Do not handle operations immediately, manage some sort of a queue instead
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
continue_primary_read(cur_op);
|
||||
|
|
11
osd.h
11
osd.h
|
@ -32,8 +32,10 @@
|
|||
|
||||
#define PEER_CONNECTING 1
|
||||
#define PEER_CONNECTED 2
|
||||
#define OSD_PEERING_PEERS 1
|
||||
|
||||
#define OSD_CONNECTING_PEERS 1
|
||||
#define OSD_PEERING_PGS 2
|
||||
#define OSD_FLUSHING_PGS 4
|
||||
|
||||
#define IMMEDIATE_NONE 0
|
||||
#define IMMEDIATE_SMALL 1
|
||||
|
@ -182,7 +184,7 @@ class osd_t
|
|||
// peer OSDs
|
||||
|
||||
std::map<uint64_t, int> osd_peer_fds;
|
||||
std::vector<pg_t> pgs;
|
||||
std::map<pg_num_t, pg_t> pgs;
|
||||
int peering_state = 0;
|
||||
unsigned pg_count = 0;
|
||||
uint64_t next_subop_id = 1;
|
||||
|
@ -241,7 +243,10 @@ class osd_t
|
|||
void init_primary();
|
||||
void handle_peers();
|
||||
void repeer_pgs(osd_num_t osd_num, bool is_connected);
|
||||
void start_pg_peering(int i);
|
||||
void start_pg_peering(pg_num_t pg_num);
|
||||
void submit_pg_flush_ops(pg_num_t pg_num);
|
||||
void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok);
|
||||
void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data);
|
||||
|
||||
// op execution
|
||||
void exec_op(osd_op_t *cur_op);
|
||||
|
|
|
@ -169,6 +169,7 @@ struct __attribute__((__packed__)) osd_reply_sync_t
|
|||
osd_reply_header_t header;
|
||||
};
|
||||
|
||||
// FIXME it would be interesting to try to unify blockstore_op and osd_op formats
|
||||
union osd_any_op_t
|
||||
{
|
||||
osd_op_header_t hdr;
|
||||
|
|
262
osd_peering.cpp
262
osd_peering.cpp
|
@ -21,15 +21,15 @@ void osd_t::init_primary()
|
|||
}
|
||||
if (peers.size() < 2)
|
||||
throw std::runtime_error("run_primary requires at least 2 peers");
|
||||
pgs.push_back((pg_t){
|
||||
pgs[1] = (pg_t){
|
||||
.state = PG_OFFLINE,
|
||||
.pg_cursize = 0,
|
||||
.pg_num = 1,
|
||||
.target_set = { 1, 2, 3 },
|
||||
.cur_set = { 1, 0, 0 },
|
||||
});
|
||||
};
|
||||
pg_count = 1;
|
||||
peering_state = OSD_PEERING_PEERS;
|
||||
peering_state = OSD_CONNECTING_PEERS;
|
||||
}
|
||||
|
||||
osd_peer_def_t osd_t::parse_peer(std::string peer)
|
||||
|
@ -132,7 +132,7 @@ void osd_t::handle_connect_result(int peer_fd)
|
|||
// Peering loop
|
||||
void osd_t::handle_peers()
|
||||
{
|
||||
if (peering_state & OSD_PEERING_PEERS)
|
||||
if (peering_state & OSD_CONNECTING_PEERS)
|
||||
{
|
||||
for (int i = 0; i < peers.size(); i++)
|
||||
{
|
||||
|
@ -158,7 +158,7 @@ void osd_t::handle_peers()
|
|||
if (i >= peers.size())
|
||||
{
|
||||
// Connected to all peers
|
||||
peering_state = peering_state & ~OSD_PEERING_PEERS;
|
||||
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
|
||||
}
|
||||
repeer_pgs(osd_num, true);
|
||||
});
|
||||
|
@ -167,27 +167,239 @@ void osd_t::handle_peers()
|
|||
}
|
||||
if (peering_state & OSD_PEERING_PGS)
|
||||
{
|
||||
bool still_doing_pgs = false;
|
||||
for (int i = 0; i < pgs.size(); i++)
|
||||
bool still = false;
|
||||
for (auto & p: pgs)
|
||||
{
|
||||
if (pgs[i].state == PG_PEERING)
|
||||
if (p.second.state == PG_PEERING)
|
||||
{
|
||||
if (!pgs[i].peering_state->list_ops.size())
|
||||
if (!p.second.peering_state->list_ops.size())
|
||||
{
|
||||
pgs[i].calc_object_states();
|
||||
p.second.calc_object_states();
|
||||
if (p.second.state & PG_HAS_UNCLEAN)
|
||||
{
|
||||
peering_state = peering_state | OSD_FLUSHING_PGS;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
still_doing_pgs = true;
|
||||
still = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!still_doing_pgs)
|
||||
if (!still)
|
||||
{
|
||||
// Done all PGs
|
||||
peering_state = peering_state & ~OSD_PEERING_PGS;
|
||||
}
|
||||
}
|
||||
if (peering_state & OSD_FLUSHING_PGS)
|
||||
{
|
||||
bool still = false;
|
||||
for (auto & p: pgs)
|
||||
{
|
||||
if (p.second.state & PG_HAS_UNCLEAN)
|
||||
{
|
||||
if (!p.second.flush_batch)
|
||||
{
|
||||
submit_pg_flush_ops(p.first);
|
||||
}
|
||||
still = true;
|
||||
}
|
||||
}
|
||||
if (!still)
|
||||
{
|
||||
peering_state = peering_state & ~OSD_FLUSHING_PGS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#define FLUSH_BATCH 512
|
||||
|
||||
struct pg_flush_batch_t
|
||||
{
|
||||
std::map<osd_num_t, std::vector<obj_ver_id>> rollback_lists;
|
||||
std::map<osd_num_t, std::vector<obj_ver_id>> stable_lists;
|
||||
int flush_ops = 0, flush_done = 0;
|
||||
int flush_objects = 0;
|
||||
};
|
||||
|
||||
void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
|
||||
{
|
||||
pg_t & pg = pgs[pg_num];
|
||||
pg_flush_batch_t *fb = new pg_flush_batch_t();
|
||||
pg.flush_batch = fb;
|
||||
auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
|
||||
bool first = true;
|
||||
while (it != pg.flush_actions.end())
|
||||
{
|
||||
if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
|
||||
(it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
|
||||
fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
|
||||
fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH)
|
||||
{
|
||||
// Stop only at the object boundary
|
||||
break;
|
||||
}
|
||||
it->second.submitted = true;
|
||||
if (it->second.rollback)
|
||||
{
|
||||
fb->flush_objects++;
|
||||
fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
|
||||
.oid = it->first.oid,
|
||||
.version = it->second.rollback_to,
|
||||
});
|
||||
}
|
||||
if (it->second.make_stable)
|
||||
{
|
||||
fb->flush_objects++;
|
||||
fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
|
||||
.oid = it->first.oid,
|
||||
.version = it->second.stable_to,
|
||||
});
|
||||
}
|
||||
prev_it = it;
|
||||
first = false;
|
||||
it++;
|
||||
}
|
||||
for (auto & l: fb->rollback_lists)
|
||||
{
|
||||
if (l.second.size() > 0)
|
||||
{
|
||||
fb->flush_ops++;
|
||||
submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
|
||||
}
|
||||
}
|
||||
for (auto & l: fb->stable_lists)
|
||||
{
|
||||
if (l.second.size() > 0)
|
||||
{
|
||||
fb->flush_ops++;
|
||||
submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok)
|
||||
{
|
||||
if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
|
||||
{
|
||||
// Throw the result away
|
||||
return;
|
||||
}
|
||||
if (!ok)
|
||||
{
|
||||
if (osd_num == this->osd_num)
|
||||
throw std::runtime_error("Error while doing local flush operation");
|
||||
else
|
||||
{
|
||||
assert(osd_peer_fds.find(osd_num) != osd_peer_fds.end());
|
||||
stop_client(osd_peer_fds[osd_num]);
|
||||
return;
|
||||
}
|
||||
}
|
||||
fb->flush_done++;
|
||||
if (fb->flush_done == fb->flush_ops)
|
||||
{
|
||||
// This flush batch is done
|
||||
std::vector<osd_op_t*> continue_ops;
|
||||
auto & pg = pgs[pg_num];
|
||||
auto it = pg.flush_actions.begin(), prev_it = it;
|
||||
auto erase_start = it;
|
||||
while (1)
|
||||
{
|
||||
if (it == pg.flush_actions.end() ||
|
||||
it->first.oid.inode != prev_it->first.oid.inode ||
|
||||
(it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
|
||||
{
|
||||
auto wr_it = pg.write_queue.find((object_id){
|
||||
.inode = prev_it->first.oid.inode,
|
||||
.stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
|
||||
});
|
||||
if (wr_it != pg.write_queue.end())
|
||||
{
|
||||
continue_ops.push_back(wr_it->second);
|
||||
pg.write_queue.erase(wr_it);
|
||||
}
|
||||
}
|
||||
if ((it == pg.flush_actions.end() || !it->second.submitted) &&
|
||||
erase_start != it)
|
||||
{
|
||||
pg.flush_actions.erase(erase_start, it);
|
||||
}
|
||||
if (it == pg.flush_actions.end())
|
||||
{
|
||||
break;
|
||||
}
|
||||
prev_it = it;
|
||||
if (!it->second.submitted)
|
||||
{
|
||||
it++;
|
||||
erase_start = it;
|
||||
}
|
||||
else
|
||||
{
|
||||
it++;
|
||||
}
|
||||
}
|
||||
delete fb;
|
||||
pg.flush_batch = NULL;
|
||||
if (!pg.flush_actions.size())
|
||||
{
|
||||
pg.state = pg.state & ~PG_HAS_UNCLEAN;
|
||||
}
|
||||
for (osd_op_t *op: continue_ops)
|
||||
{
|
||||
continue_primary_write(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data)
|
||||
{
|
||||
osd_op_t *op = new osd_op_t();
|
||||
// Copy buffer so it gets freed along with the operation
|
||||
op->buf = malloc(sizeof(obj_ver_id) * count);
|
||||
memcpy(op->buf, data, sizeof(obj_ver_id) * count);
|
||||
if (osd_num == this->osd_num)
|
||||
{
|
||||
// local
|
||||
op->bs_op = new blockstore_op_t({
|
||||
.opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
|
||||
.callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
|
||||
{
|
||||
handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval == 0);
|
||||
delete op;
|
||||
},
|
||||
.len = (uint32_t)count,
|
||||
.buf = op->buf,
|
||||
});
|
||||
bs->enqueue_op(op->bs_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Peer
|
||||
int peer_fd = osd_peer_fds[osd_num];
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
|
||||
op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
|
||||
op->peer_fd = peer_fd;
|
||||
op->req = {
|
||||
.sec_stab = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = this->next_subop_id++,
|
||||
.opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE),
|
||||
},
|
||||
.len = count * sizeof(obj_ver_id),
|
||||
},
|
||||
};
|
||||
op->callback = [this, pg_num, fb](osd_op_t *op)
|
||||
{
|
||||
handle_flush_op(pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval == 0);
|
||||
delete op;
|
||||
};
|
||||
outbox_push(clients[peer_fd], op);
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
|
||||
|
@ -195,15 +407,15 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
|
|||
// Re-peer affected PGs
|
||||
// FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data.
|
||||
osd_num_t real_osd = (is_connected ? osd_num : 0);
|
||||
for (int i = 0; i < pgs.size(); i++)
|
||||
for (auto & p: pgs)
|
||||
{
|
||||
bool repeer = false;
|
||||
for (int r = 0; r < pgs[i].target_set.size(); r++)
|
||||
for (int r = 0; r < p.second.target_set.size(); r++)
|
||||
{
|
||||
if (pgs[i].target_set[r] == osd_num &&
|
||||
pgs[i].cur_set[r] != real_osd)
|
||||
if (p.second.target_set[r] == osd_num &&
|
||||
p.second.cur_set[r] != real_osd)
|
||||
{
|
||||
pgs[i].cur_set[r] = real_osd;
|
||||
p.second.cur_set[r] = real_osd;
|
||||
repeer = true;
|
||||
break;
|
||||
}
|
||||
|
@ -211,21 +423,25 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
|
|||
if (repeer)
|
||||
{
|
||||
// Repeer this pg
|
||||
printf("Repeer PG %d because of OSD %lu\n", i, osd_num);
|
||||
start_pg_peering(i);
|
||||
printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num);
|
||||
start_pg_peering(p.second.pg_num);
|
||||
peering_state |= OSD_PEERING_PGS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Repeer on each connect/disconnect peer event
|
||||
void osd_t::start_pg_peering(int pg_idx)
|
||||
void osd_t::start_pg_peering(pg_num_t pg_num)
|
||||
{
|
||||
auto & pg = pgs[pg_idx];
|
||||
auto & pg = pgs[pg_num];
|
||||
pg.state = PG_PEERING;
|
||||
pg.state_dict.clear();
|
||||
pg.obj_states.clear();
|
||||
pg.ver_override.clear();
|
||||
pg.flush_actions.clear();
|
||||
if (pg.flush_batch)
|
||||
delete pg.flush_batch;
|
||||
pg.flush_batch = NULL;
|
||||
pg.pg_cursize = 0;
|
||||
for (int role = 0; role < pg.cur_set.size(); role++)
|
||||
{
|
||||
|
@ -330,8 +546,8 @@ void osd_t::start_pg_peering(int pg_idx)
|
|||
op->bs_op = new blockstore_op_t();
|
||||
op->bs_op->opcode = BS_OP_LIST;
|
||||
op->bs_op->oid.stripe = parity_block_size;
|
||||
op->bs_op->len = pg_count,
|
||||
op->bs_op->offset = pg.pg_num-1,
|
||||
op->bs_op->len = pg_count;
|
||||
op->bs_op->offset = pg.pg_num-1;
|
||||
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
|
||||
{
|
||||
if (op->bs_op->retval < 0)
|
||||
|
|
|
@ -134,7 +134,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector<obj_ver_role> &
|
|||
auto & pcs = pp.second;
|
||||
if (pcs.stable_ver < pcs.max_ver)
|
||||
{
|
||||
auto & act = obj_stab_actions[pp.first];
|
||||
auto & act = flush_actions[pp.first];
|
||||
if (pcs.max_ver > st.target_ver)
|
||||
{
|
||||
act.rollback = true;
|
||||
|
|
|
@ -102,12 +102,15 @@ struct obj_piece_ver_t
|
|||
uint64_t stable_ver = 0;
|
||||
};
|
||||
|
||||
struct obj_stab_action_t
|
||||
struct flush_action_t
|
||||
{
|
||||
bool rollback = false, make_stable = false;
|
||||
uint64_t stable_to = 0, rollback_to = 0;
|
||||
bool submitted = false;
|
||||
};
|
||||
|
||||
struct pg_flush_batch_t;
|
||||
|
||||
struct pg_t
|
||||
{
|
||||
int state;
|
||||
|
@ -125,9 +128,11 @@ struct pg_t
|
|||
// which is up to ~192 MB per 1 TB in the worst case scenario
|
||||
std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
|
||||
btree::btree_map<object_id, pg_osd_set_state_t*> obj_states;
|
||||
std::map<obj_piece_id_t, obj_stab_action_t> obj_stab_actions;
|
||||
std::map<obj_piece_id_t, flush_action_t> flush_actions;
|
||||
btree::btree_map<object_id, uint64_t> ver_override;
|
||||
pg_peering_state_t *peering_state = NULL;
|
||||
pg_flush_batch_t *flush_batch = NULL;
|
||||
int flush_actions_in_progress = 0;
|
||||
|
||||
std::multimap<object_id, osd_op_t*> write_queue;
|
||||
|
||||
|
|
|
@ -60,10 +60,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
// Our EC scheme stores data in fixed chunks equal to (K*block size)
|
||||
// But we must not use K in the process of calculating the PG number
|
||||
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
|
||||
// FIXME Real pg_num should equal the below expression + 1
|
||||
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / parity_block_size) % pg_count;
|
||||
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / parity_block_size) % pg_count + 1;
|
||||
// FIXME: Postpone operations in inactive PGs
|
||||
if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE))
|
||||
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
|
||||
{
|
||||
finish_primary_op(cur_op, -EINVAL);
|
||||
return false;
|
||||
|
@ -331,17 +330,15 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
|||
assert(op_data->st == 0);
|
||||
// Check if actions are pending for this object
|
||||
{
|
||||
auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){
|
||||
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
|
||||
.oid = op_data->oid,
|
||||
.osd_num = 0,
|
||||
});
|
||||
if (act_it != pg.obj_stab_actions.end() &&
|
||||
if (act_it != pg.flush_actions.end() &&
|
||||
act_it->first.oid.inode == op_data->oid.inode &&
|
||||
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
|
||||
{
|
||||
// FIXME postpone the request until actions are done
|
||||
free(op_data);
|
||||
finish_primary_op(cur_op, -EIO);
|
||||
pg.write_queue.emplace(op_data->oid, cur_op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue