diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 0098465e..6425674e 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -440,7 +440,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
     {
         journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
         if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
-            je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
+            je->type < JE_MIN || je->type > JE_MAX || started && je->crc32_prev != crc32_last)
         {
             if (pos == 0)
             {
diff --git a/blockstore_journal.h b/blockstore_journal.h
index 94169847..6851e9cb 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -12,12 +12,14 @@
 // Journal entries
 // Journal entries are linked to each other by their crc32 value
 // The journal is almost a blockchain, because object versions constantly increase
+#define JE_MIN 0x01
 #define JE_START 0x01
 #define JE_SMALL_WRITE 0x02
 #define JE_BIG_WRITE 0x03
 #define JE_STABLE 0x04
 #define JE_DELETE 0x05
 #define JE_ROLLBACK 0x06
+#define JE_MAX 0x06
 
 // crc32c comes first to ease calculation and is equal to crc32()
 struct __attribute__((__packed__)) journal_entry_start
diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp
index bdb8b812..d7821e2b 100644
--- a/blockstore_rollback.cpp
+++ b/blockstore_rollback.cpp
@@ -21,6 +21,7 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
         if (v->version == 0)
         {
             // Already rolled back
+            // FIXME Skip this object version
         }
     bad_op:
         op->retval = -EINVAL;
@@ -147,7 +148,7 @@ resume_3:
         ring_data_t *data = ((ring_data_t*)sqe->user_data);
         my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
         data->iov = { 0 };
-        data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
+        data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
         PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
         PRIV(op)->pending_ops = 1;
         PRIV(op)->op_state = 4;
@@ -163,18 +164,18 @@ resume_5:
             .oid = v->oid,
             .version = UINT64_MAX,
         });
-        rm_end--;
         auto rm_start = rm_end;
+        assert(rm_start != dirty_db.begin());
+        rm_start--;
         while (1)
         {
-            if (rm_end->first.oid != v->oid)
+            if (rm_start->first.oid != v->oid)
                 break;
-            else if (rm_end->first.version <= v->version)
+            else if (rm_start->first.version <= v->version)
                 break;
-            rm_start = rm_end;
-            if (rm_end == dirty_db.begin())
+            if (rm_start == dirty_db.begin())
                 break;
-            rm_end--;
+            rm_start--;
         }
         if (rm_end != rm_start)
             erase_dirty(rm_start, rm_end, UINT64_MAX);
@@ -202,7 +203,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
     if (PRIV(op)->pending_ops == 0)
     {
         PRIV(op)->op_state++;
-        if (!continue_stable(op))
+        if (!continue_rollback(op))
         {
             submit_queue.push_front(op);
         }
diff --git a/osd.cpp b/osd.cpp
index 720aca6d..25b53136 100644
--- a/osd.cpp
+++ b/osd.cpp
@@ -327,7 +327,7 @@ void osd_t::stop_client(int peer_fd)
         cancel_osd_ops(cl);
         osd_peer_fds.erase(cl.osd_num);
         repeer_pgs(cl.osd_num, false);
-        peering_state |= OSD_PEERING_PEERS;
+        peering_state |= OSD_CONNECTING_PEERS;
     }
     if (cl.read_op)
     {
@@ -388,6 +388,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
     {
         exec_show_config(cur_op);
     }
+    // FIXME: Do not handle operations immediately, manage some sort of a queue instead
     else if (cur_op->req.hdr.opcode == OSD_OP_READ)
     {
         continue_primary_read(cur_op);
diff --git a/osd.h b/osd.h
index a640c2ac..f53e46d2 100644
--- a/osd.h
+++ b/osd.h
@@ -32,8 +32,10 @@
 #define PEER_CONNECTING 1
 #define PEER_CONNECTED 2
 
-#define OSD_PEERING_PEERS 1
+
+#define OSD_CONNECTING_PEERS 1
 #define OSD_PEERING_PGS 2
+#define OSD_FLUSHING_PGS 4
 
 #define IMMEDIATE_NONE 0
 #define IMMEDIATE_SMALL 1
@@ -182,7 +184,7 @@ class osd_t
 
     // peer OSDs
     std::map<osd_num_t, int> osd_peer_fds;
-    std::vector<pg_t> pgs;
+    std::map<pg_num_t, pg_t> pgs;
     int peering_state = 0;
     unsigned pg_count = 0;
     uint64_t next_subop_id = 1;
@@ -241,7 +243,10 @@ class osd_t
     void init_primary();
     void handle_peers();
    void repeer_pgs(osd_num_t osd_num, bool is_connected);
-    void start_pg_peering(int i);
+    void start_pg_peering(pg_num_t pg_num);
+    void submit_pg_flush_ops(pg_num_t pg_num);
+    void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok);
+    void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data);
 
     // op execution
     void exec_op(osd_op_t *cur_op);
diff --git a/osd_ops.h b/osd_ops.h
index 7155fda4..863ab2e4 100644
--- a/osd_ops.h
+++ b/osd_ops.h
@@ -169,6 +169,7 @@ struct __attribute__((__packed__)) osd_reply_sync_t
     osd_reply_header_t header;
 };
 
+// FIXME it would be interesting to try to unify blockstore_op and osd_op formats
 union osd_any_op_t
 {
     osd_op_header_t hdr;
diff --git a/osd_peering.cpp b/osd_peering.cpp
index ae4b89b8..15976dea 100644
--- a/osd_peering.cpp
+++ b/osd_peering.cpp
@@ -21,15 +21,15 @@ void osd_t::init_primary()
     }
     if (peers.size() < 2)
         throw std::runtime_error("run_primary requires at least 2 peers");
-    pgs.push_back((pg_t){
+    pgs[1] = (pg_t){
         .state = PG_OFFLINE,
         .pg_cursize = 0,
         .pg_num = 1,
         .target_set = { 1, 2, 3 },
         .cur_set = { 1, 0, 0 },
-    });
+    };
     pg_count = 1;
-    peering_state = OSD_PEERING_PEERS;
+    peering_state = OSD_CONNECTING_PEERS;
 }
 
 osd_peer_def_t osd_t::parse_peer(std::string peer)
@@ -132,7 +132,7 @@ void osd_t::handle_connect_result(int peer_fd)
 // Peering loop
 void osd_t::handle_peers()
 {
-    if (peering_state & OSD_PEERING_PEERS)
+    if (peering_state & OSD_CONNECTING_PEERS)
     {
         for (int i = 0; i < peers.size(); i++)
         {
@@ -158,7 +158,7 @@ void osd_t::handle_peers()
                 if (i >= peers.size())
                 {
                     // Connected to all peers
-                    peering_state = peering_state & ~OSD_PEERING_PEERS;
+                    peering_state = peering_state & ~OSD_CONNECTING_PEERS;
                 }
                 repeer_pgs(osd_num, true);
             });
@@ -167,27 +167,239 @@ void osd_t::handle_peers()
     }
     if (peering_state & OSD_PEERING_PGS)
     {
-        bool still_doing_pgs = false;
-        for (int i = 0; i < pgs.size(); i++)
+        bool still = false;
+        for (auto & p: pgs)
         {
-            if (pgs[i].state == PG_PEERING)
+            if (p.second.state == PG_PEERING)
             {
-                if (!pgs[i].peering_state->list_ops.size())
+                if (!p.second.peering_state->list_ops.size())
                 {
-                    pgs[i].calc_object_states();
+                    p.second.calc_object_states();
+                    if (p.second.state & PG_HAS_UNCLEAN)
+                    {
+                        peering_state = peering_state | OSD_FLUSHING_PGS;
+                    }
                 }
                 else
                 {
-                    still_doing_pgs = true;
+                    still = true;
                 }
             }
         }
-        if (!still_doing_pgs)
+        if (!still)
         {
             // Done all PGs
             peering_state = peering_state & ~OSD_PEERING_PGS;
         }
     }
+    if (peering_state & OSD_FLUSHING_PGS)
+    {
+        bool still = false;
+        for (auto & p: pgs)
+        {
+            if (p.second.state & PG_HAS_UNCLEAN)
+            {
+                if (!p.second.flush_batch)
+                {
+                    submit_pg_flush_ops(p.first);
+                }
+                still = true;
+            }
+        }
+        if (!still)
+        {
+            peering_state = peering_state & ~OSD_FLUSHING_PGS;
+        }
+    }
+}
+
+#define FLUSH_BATCH 512
+
+struct pg_flush_batch_t
+{
+    std::map<osd_num_t, std::vector<obj_ver_id>> rollback_lists;
+    std::map<osd_num_t, std::vector<obj_ver_id>> stable_lists;
+    int flush_ops = 0, flush_done = 0;
+    int flush_objects = 0;
+};
+
+void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
+{
+    pg_t & pg = pgs[pg_num];
+    pg_flush_batch_t *fb = new pg_flush_batch_t();
+    pg.flush_batch = fb;
+    auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
+    bool first = true;
+    while (it != pg.flush_actions.end())
+    {
+        if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
+            (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
+            fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
+            fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH)
+        {
+            // Stop only at the object boundary
+            break;
+        }
+        it->second.submitted = true;
+        if (it->second.rollback)
+        {
+            fb->flush_objects++;
+            fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
+                .oid = it->first.oid,
+                .version = it->second.rollback_to,
+            });
+        }
+        if (it->second.make_stable)
+        {
+            fb->flush_objects++;
+            fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
+                .oid = it->first.oid,
+                .version = it->second.stable_to,
+            });
+        }
+        prev_it = it;
+        first = false;
+        it++;
+    }
+    for (auto & l: fb->rollback_lists)
+    {
+        if (l.second.size() > 0)
+        {
+            fb->flush_ops++;
+            submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
+        }
+    }
+    for (auto & l: fb->stable_lists)
+    {
+        if (l.second.size() > 0)
+        {
+            fb->flush_ops++;
+            submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
+        }
+    }
+}
+
+void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok)
+{
+    if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
+    {
+        // Throw the result away
+        return;
+    }
+    if (!ok)
+    {
+        if (osd_num == this->osd_num)
+            throw std::runtime_error("Error while doing local flush operation");
+        else
+        {
+            assert(osd_peer_fds.find(osd_num) != osd_peer_fds.end());
+            stop_client(osd_peer_fds[osd_num]);
+            return;
+        }
+    }
+    fb->flush_done++;
+    if (fb->flush_done == fb->flush_ops)
+    {
+        // This flush batch is done
+        std::vector<osd_op_t*> continue_ops;
+        auto & pg = pgs[pg_num];
+        auto it = pg.flush_actions.begin(), prev_it = it;
+        auto erase_start = it;
+        while (1)
+        {
+            if (it == pg.flush_actions.end() ||
+                it->first.oid.inode != prev_it->first.oid.inode ||
+                (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
+            {
+                auto wr_it = pg.write_queue.find((object_id){
+                    .inode = prev_it->first.oid.inode,
+                    .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
+                });
+                if (wr_it != pg.write_queue.end())
+                {
+                    continue_ops.push_back(wr_it->second);
+                    pg.write_queue.erase(wr_it);
+                }
+            }
+            if ((it == pg.flush_actions.end() || !it->second.submitted) &&
+                erase_start != it)
+            {
+                pg.flush_actions.erase(erase_start, it);
+            }
+            if (it == pg.flush_actions.end())
+            {
+                break;
+            }
+            prev_it = it;
+            if (!it->second.submitted)
+            {
+                it++;
+                erase_start = it;
+            }
+            else
+            {
+                it++;
+            }
+        }
+        delete fb;
+        pg.flush_batch = NULL;
+        if (!pg.flush_actions.size())
+        {
+            pg.state = pg.state & ~PG_HAS_UNCLEAN;
+        }
+        for (osd_op_t *op: continue_ops)
+        {
+            continue_primary_write(op);
+        }
+    }
+}
+
+void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data)
+{
+    osd_op_t *op = new osd_op_t();
+    // Copy buffer so it gets freed along with the operation
+    op->buf = malloc(sizeof(obj_ver_id) * count);
+    memcpy(op->buf, data, sizeof(obj_ver_id) * count);
+    if (osd_num == this->osd_num)
+    {
+        // local
+        op->bs_op = new blockstore_op_t({
+            .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
+            .callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
+            {
+                handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval == 0);
+                delete op;
+            },
+            .len = (uint32_t)count,
+            .buf = op->buf,
+        });
+        bs->enqueue_op(op->bs_op);
+    }
+    else
+    {
+        // Peer
+        int peer_fd = osd_peer_fds[osd_num];
+        op->op_type = OSD_OP_OUT;
+        op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
+        op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
+        op->peer_fd = peer_fd;
+        op->req = {
+            .sec_stab = {
+                .header = {
+                    .magic = SECONDARY_OSD_OP_MAGIC,
+                    .id = this->next_subop_id++,
+                    .opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE),
+                },
+                .len = count * sizeof(obj_ver_id),
+            },
+        };
+        op->callback = [this, pg_num, fb](osd_op_t *op)
+        {
+            handle_flush_op(pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval == 0);
+            delete op;
+        };
+        outbox_push(clients[peer_fd], op);
+    }
 }
 
 void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
@@ -195,15 +407,15 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
     // Re-peer affected PGs
     // FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data.
     osd_num_t real_osd = (is_connected ? osd_num : 0);
-    for (int i = 0; i < pgs.size(); i++)
+    for (auto & p: pgs)
     {
         bool repeer = false;
-        for (int r = 0; r < pgs[i].target_set.size(); r++)
+        for (int r = 0; r < p.second.target_set.size(); r++)
         {
-            if (pgs[i].target_set[r] == osd_num &&
-                pgs[i].cur_set[r] != real_osd)
+            if (p.second.target_set[r] == osd_num &&
+                p.second.cur_set[r] != real_osd)
             {
-                pgs[i].cur_set[r] = real_osd;
+                p.second.cur_set[r] = real_osd;
                 repeer = true;
                 break;
             }
@@ -211,21 +423,25 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
         if (repeer)
         {
             // Repeer this pg
-            printf("Repeer PG %d because of OSD %lu\n", i, osd_num);
-            start_pg_peering(i);
+            printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num);
+            start_pg_peering(p.second.pg_num);
             peering_state |= OSD_PEERING_PGS;
         }
     }
 }
 
 // Repeer on each connect/disconnect peer event
-void osd_t::start_pg_peering(int pg_idx)
+void osd_t::start_pg_peering(pg_num_t pg_num)
 {
-    auto & pg = pgs[pg_idx];
+    auto & pg = pgs[pg_num];
     pg.state = PG_PEERING;
     pg.state_dict.clear();
     pg.obj_states.clear();
     pg.ver_override.clear();
+    pg.flush_actions.clear();
+    if (pg.flush_batch)
+        delete pg.flush_batch;
+    pg.flush_batch = NULL;
     pg.pg_cursize = 0;
     for (int role = 0; role < pg.cur_set.size(); role++)
     {
@@ -330,8 +546,8 @@
             op->bs_op = new blockstore_op_t();
             op->bs_op->opcode = BS_OP_LIST;
             op->bs_op->oid.stripe = parity_block_size;
-            op->bs_op->len = pg_count,
-            op->bs_op->offset = pg.pg_num-1,
+            op->bs_op->len = pg_count;
+            op->bs_op->offset = pg.pg_num-1;
             op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
             {
                 if (op->bs_op->retval < 0)
diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp
index 4d9dcc47..e39fe8a1 100644
--- a/osd_peering_pg.cpp
+++ b/osd_peering_pg.cpp
@@ -134,7 +134,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector<obj_ver_role> &
         auto & pcs = pp.second;
         if (pcs.stable_ver < pcs.max_ver)
         {
-            auto & act = obj_stab_actions[pp.first];
+            auto & act = flush_actions[pp.first];
             if (pcs.max_ver > st.target_ver)
             {
                 act.rollback = true;
diff --git a/osd_peering_pg.h b/osd_peering_pg.h
index 344d4ec6..8c0c0fa0 100644
--- a/osd_peering_pg.h
+++ b/osd_peering_pg.h
@@ -102,12 +102,15 @@ struct obj_piece_ver_t
     uint64_t stable_ver = 0;
 };
 
-struct obj_stab_action_t
+struct flush_action_t
 {
     bool rollback = false, make_stable = false;
     uint64_t stable_to = 0, rollback_to = 0;
+    bool submitted = false;
 };
 
+struct pg_flush_batch_t;
+
 struct pg_t
 {
     int state;
@@ -125,9 +128,11 @@ struct pg_t
     // which is up to ~192 MB per 1 TB in the worst case scenario
     std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
     btree::btree_map<object_id, pg_osd_set_state_t*> obj_states;
-    std::map<obj_piece_id_t, obj_stab_action_t> obj_stab_actions;
+    std::map<obj_piece_id_t, flush_action_t> flush_actions;
     btree::btree_map<object_id, uint64_t> ver_override;
     pg_peering_state_t *peering_state = NULL;
+    pg_flush_batch_t *flush_batch = NULL;
+    int flush_actions_in_progress = 0;
 
     std::multimap<object_id, osd_op_t*> write_queue;
 
diff --git a/osd_primary.cpp b/osd_primary.cpp
index deb1e967..12749109 100644
--- a/osd_primary.cpp
+++ b/osd_primary.cpp
@@ -60,10 +60,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
     // Our EC scheme stores data in fixed chunks equal to (K*block size)
     // But we must not use K in the process of calculating the PG number
     // So we calculate the PG number using a separate setting which should be per-inode (FIXME)
-    // FIXME Real pg_num should equal the below expression + 1
-    pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / parity_block_size) % pg_count;
+    pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / parity_block_size) % pg_count + 1;
     // FIXME: Postpone operations in inactive PGs
-    if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE))
+    if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
     {
         finish_primary_op(cur_op, -EINVAL);
         return false;
@@ -331,17 +330,15 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
     assert(op_data->st == 0);
     // Check if actions are pending for this object
     {
-        auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){
+        auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
             .oid = op_data->oid,
             .osd_num = 0,
         });
-        if (act_it != pg.obj_stab_actions.end() &&
+        if (act_it != pg.flush_actions.end() &&
             act_it->first.oid.inode == op_data->oid.inode &&
             (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
         {
-            // FIXME postpone the request until actions are done
-            free(op_data);
-            finish_primary_op(cur_op, -EIO);
+            pg.write_queue.emplace(op_data->oid, cur_op);
             return;
         }
     }
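
Note on the PG numbering change in osd_primary.cpp: pgs is now a std::map keyed by a 1-based pg_num_t, so prepare_primary_rw() adds "+ 1" to the modulo result and checks PG existence with find() instead of comparing against pgs.size(). The following standalone sketch illustrates that lookup; the types, parity_block_size and PG state values are simplified assumptions for illustration, not the real project structures.

#include <cstdint>
#include <cstdio>
#include <map>

typedef uint64_t pg_num_t;          // stand-in for the real typedef
struct pg_t { int state = 0; };     // simplified: only the state flag matters here
#define PG_ACTIVE 1

int main()
{
    const uint64_t parity_block_size = 4*1024*1024;  // assumed example value
    const uint64_t pg_count = 3;
    std::map<pg_num_t, pg_t> pgs;
    // PGs are keyed by 1-based number, as in init_primary(): pgs[1] = ...
    pgs[1].state = PG_ACTIVE;
    pgs[2].state = PG_ACTIVE;
    pgs[3].state = 0;
    uint64_t inode = 5, offset = 12*1024*1024;
    // Same formula as prepare_primary_rw(): the modulo yields 0..pg_count-1, +1 shifts it to 1..pg_count
    pg_num_t pg_num = (inode + offset / parity_block_size) % pg_count + 1;
    auto it = pgs.find(pg_num);
    if (it == pgs.end() || !(it->second.state & PG_ACTIVE))
        printf("PG %lu missing or inactive -> the op would fail with -EINVAL\n", pg_num);
    else
        printf("PG %lu active -> the op continues\n", pg_num);
    return 0;
}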
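
Similarly, continue_primary_write() now parks a write that hits an object with pending flush actions in pg.write_queue instead of failing it with -EIO, and handle_flush_op() looks up one parked write per flushed object, removes it from the queue and resumes it via continue_primary_write(). A rough sketch of that park/resume pattern follows; the object_id here is a simplified assumption and a string stands in for the real osd_op_t*.

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

// Simplified object key; the real object_id also involves stripe masking
struct object_id
{
    uint64_t inode, stripe;
    bool operator<(const object_id & o) const
    {
        return inode < o.inode || (inode == o.inode && stripe < o.stripe);
    }
};

int main()
{
    // Same container shape as the write_queue member added to pg_t
    std::multimap<object_id, std::string> write_queue;
    object_id oid = { 1, 0 };
    // A write arrives while flush actions for this object are still pending: park it
    write_queue.emplace(oid, "postponed primary write");
    // Later, when the flush batch covering this object completes (handle_flush_op),
    // one parked write is found, removed from the queue and resumed
    auto wr_it = write_queue.find(oid);
    if (wr_it != write_queue.end())
    {
        printf("resuming: %s\n", wr_it->second.c_str());
        write_queue.erase(wr_it);
    }
    return 0;
}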