diff --git a/Makefile b/Makefile
index 01f65e9d..6fee527b 100644
--- a/Makefile
+++ b/Makefile
@@ -27,7 +27,7 @@ libblockstore.so: $(BLOCKSTORE_OBJS)
 libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o
 	g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring
 
-OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o osd_rmw.o json11.o timerfd_interval.o
+OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o osd_primary.o osd_rmw.o json11.o timerfd_interval.o
 osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h
 	g++ $(CXXFLAGS) -c -o $@ $<
 osd_receive.o: osd_receive.cpp osd.h osd_ops.h ringloop.h
@@ -36,6 +36,8 @@ osd_send.o: osd_send.cpp osd.h osd_ops.h ringloop.h
 	g++ $(CXXFLAGS) -c -o $@ $<
 osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h
 	g++ $(CXXFLAGS) -c -o $@ $<
+osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h
+	g++ $(CXXFLAGS) -c -o $@ $<
 osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h
 	g++ $(CXXFLAGS) -c -o $@ $<
 osd_rmw.o: osd_rmw.cpp osd_rmw.h xor.h
diff --git a/osd_flush.cpp b/osd_flush.cpp
new file mode 100644
index 00000000..bb4a06ae
--- /dev/null
+++ b/osd_flush.cpp
@@ -0,0 +1,190 @@
+#include "osd.h"
+
+#define FLUSH_BATCH 512
+
+// Collect pending flush actions of a PG into a new batch (per-OSD rollback and
+// stabilize lists) and submit one flush operation for every non-empty list.
+void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
+{
+    pg_t & pg = pgs[pg_num];
+    pg_flush_batch_t *fb = new pg_flush_batch_t();
+    pg.flush_batch = fb;
+    auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
+    bool first = true;
+    while (it != pg.flush_actions.end())
+    {
+        if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
+            (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
+            (fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
+            fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH))
+        {
+            // Stop only at the object boundary, and only once one of the lists is full
+            break;
+        }
+        it->second.submitted = true;
+        if (it->second.rollback)
+        {
+            fb->flush_objects++;
+            fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
+                .oid = it->first.oid,
+                .version = it->second.rollback_to,
+            });
+        }
+        if (it->second.make_stable)
+        {
+            fb->flush_objects++;
+            fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
+                .oid = it->first.oid,
+                .version = it->second.stable_to,
+            });
+        }
+        prev_it = it;
+        first = false;
+        it++;
+    }
+    for (auto & l: fb->rollback_lists)
+    {
+        if (l.second.size() > 0)
+        {
+            fb->flush_ops++;
+            submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
+        }
+    }
+    for (auto & l: fb->stable_lists)
+    {
+        if (l.second.size() > 0)
+        {
+            fb->flush_ops++;
+            submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
+        }
+    }
+}
+
+// Completion handler for one flush operation of batch <fb>. Once every operation
+// of the batch is done, erases the submitted flush actions, clears PG_HAS_UNCLEAN
+// if none remain and continues writes taken from pg.write_queue.
+void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok)
+{
+    if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
+    {
+        // Throw the result away
+        return;
+    }
+    if (!ok)
+    {
+        if (osd_num == this->osd_num)
+            throw std::runtime_error("Error while doing local flush operation");
+        else
+        {
+            assert(osd_peer_fds.find(osd_num) != osd_peer_fds.end());
+            stop_client(osd_peer_fds[osd_num]);
+            return;
+        }
+    }
+    fb->flush_done++;
+    if (fb->flush_done == fb->flush_ops)
+    {
+        // This flush batch is done
+        std::vector<osd_op_t*> continue_ops;
+        auto & pg = pgs[pg_num];
+        auto it = pg.flush_actions.begin(), prev_it = it;
+        auto erase_start = it;
+        while (1)
+        {
+            if (it == pg.flush_actions.end() ||
+                it->first.oid.inode != prev_it->first.oid.inode ||
+                (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
+            {
+                auto wr_it = pg.write_queue.find((object_id){
+                    .inode = prev_it->first.oid.inode,
+                    .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
+                });
+                if (wr_it != pg.write_queue.end())
+                {
+                    continue_ops.push_back(wr_it->second);
+                    pg.write_queue.erase(wr_it);
+                }
+            }
+            if ((it == pg.flush_actions.end() || !it->second.submitted) &&
+                erase_start != it)
+            {
+                pg.flush_actions.erase(erase_start, it);
+            }
+            if (it == pg.flush_actions.end())
+            {
+                break;
+            }
+            prev_it = it;
+            if (!it->second.submitted)
+            {
+                it++;
+                erase_start = it;
+            }
+            else
+            {
+                it++;
+            }
+        }
+        delete fb;
+        pg.flush_batch = NULL;
+        if (!pg.flush_actions.size())
+        {
+            pg.state = pg.state & ~PG_HAS_UNCLEAN;
+            pg.print_state();
+        }
+        for (osd_op_t *op: continue_ops)
+        {
+            continue_primary_write(op);
+        }
+    }
+}
+
+// Submit a rollback (rollback == true) or stabilize request for <count> object
+// versions to the local blockstore or, for any other <osd_num>, to that peer OSD.
+void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data)
+{
+    osd_op_t *op = new osd_op_t();
+    // Copy buffer so it gets freed along with the operation
+    op->buf = malloc(sizeof(obj_ver_id) * count);
+    memcpy(op->buf, data, sizeof(obj_ver_id) * count);
+    if (osd_num == this->osd_num)
+    {
+        // local
+        op->bs_op = new blockstore_op_t({
+            .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
+            .callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
+            {
+                handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval == 0);
+                delete op;
+            },
+            .len = (uint32_t)count,
+            .buf = op->buf,
+        });
+        bs->enqueue_op(op->bs_op);
+    }
+    else
+    {
+        // Peer
+        int peer_fd = osd_peer_fds[osd_num];
+        op->op_type = OSD_OP_OUT;
+        op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
+        op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
+        op->peer_fd = peer_fd;
+        op->req = {
+            .sec_stab = {
+                .header = {
+                    .magic = SECONDARY_OSD_OP_MAGIC,
+                    .id = this->next_subop_id++,
+                    .opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE),
+                },
+                .len = count * sizeof(obj_ver_id),
+            },
+        };
+        op->callback = [this, pg_num, fb](osd_op_t *op)
+        {
+            handle_flush_op(pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval == 0);
+            delete op;
+        };
+        outbox_push(clients[peer_fd], op);
+    }
+}
diff --git a/osd_peering.cpp b/osd_peering.cpp
index 908ac25c..1076fdf0 100644
--- a/osd_peering.cpp
+++ b/osd_peering.cpp
@@ -214,196 +214,6 @@ void osd_t::handle_peers()
     }
 }
 
-#define FLUSH_BATCH 512
-
-struct pg_flush_batch_t
-{
-    std::map<osd_num_t, std::vector<obj_ver_id>> rollback_lists;
-    std::map<osd_num_t, std::vector<obj_ver_id>> stable_lists;
-    int flush_ops = 0, flush_done = 0;
-    int flush_objects = 0;
-};
-
-void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
-{
-    pg_t & pg = pgs[pg_num];
-    pg_flush_batch_t *fb = new pg_flush_batch_t();
-    pg.flush_batch = fb;
-    auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
-    bool first = true;
-    while (it != pg.flush_actions.end())
-    {
-        if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
-            (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
-            fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
-            fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH)
-        {
-            // Stop only at the object boundary
-            break;
-        }
-        it->second.submitted = true;
-        if (it->second.rollback)
-        {
-            fb->flush_objects++;
-            fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
-                .oid = it->first.oid,
-                .version = it->second.rollback_to,
-            });
-        }
-        if (it->second.make_stable)
-        {
-            fb->flush_objects++;
-            fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
-                .oid = it->first.oid,
-                .version = it->second.stable_to,
-            });
-        }
-        prev_it = it;
-        first = false;
-        it++;
-    }
-    for (auto & l: fb->rollback_lists)
-    {
-        if (l.second.size() > 0)
-        {
-            fb->flush_ops++;
-            submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
-        }
-    }
-    for (auto & l: fb->stable_lists)
-    {
-        if (l.second.size() > 0)
-        {
-            fb->flush_ops++;
-            submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
-        }
-    }
-}
-
-void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok)
-{
-    if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
-    {
-        // Throw the result away
-        return;
-    }
-    if (!ok)
-    {
-        if (osd_num == this->osd_num)
-            throw std::runtime_error("Error while doing local flush operation");
-        else
-        {
-            assert(osd_peer_fds.find(osd_num) != osd_peer_fds.end());
-            stop_client(osd_peer_fds[osd_num]);
-            return;
-        }
-    }
-    fb->flush_done++;
-    if (fb->flush_done == fb->flush_ops)
-    {
-        // This flush batch is done
-        std::vector<osd_op_t*> continue_ops;
-        auto & pg = pgs[pg_num];
-        auto it = pg.flush_actions.begin(), prev_it = it;
-        auto erase_start = it;
-        while (1)
-        {
-            if (it == pg.flush_actions.end() ||
-                it->first.oid.inode != prev_it->first.oid.inode ||
-                (it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
-            {
-                auto wr_it = pg.write_queue.find((object_id){
-                    .inode = prev_it->first.oid.inode,
-                    .stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
-                });
-                if (wr_it != pg.write_queue.end())
-                {
-                    continue_ops.push_back(wr_it->second);
-                    pg.write_queue.erase(wr_it);
-                }
-            }
-            if ((it == pg.flush_actions.end() || !it->second.submitted) &&
-                erase_start != it)
-            {
-                pg.flush_actions.erase(erase_start, it);
-            }
-            if (it == pg.flush_actions.end())
-            {
-                break;
-            }
-            prev_it = it;
-            if (!it->second.submitted)
-            {
-                it++;
-                erase_start = it;
-            }
-            else
-            {
-                it++;
-            }
-        }
-        delete fb;
-        pg.flush_batch = NULL;
-        if (!pg.flush_actions.size())
-        {
-            pg.state = pg.state & ~PG_HAS_UNCLEAN;
-            pg.print_state();
-        }
-        for (osd_op_t *op: continue_ops)
-        {
-            continue_primary_write(op);
-        }
-    }
-}
-
-void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data)
-{
-    osd_op_t *op = new osd_op_t();
-    // Copy buffer so it gets freed along with the operation
-    op->buf = malloc(sizeof(obj_ver_id) * count);
-    memcpy(op->buf, data, sizeof(obj_ver_id) * count);
-    if (osd_num == this->osd_num)
-    {
-        // local
-        op->bs_op = new blockstore_op_t({
-            .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
-            .callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
-            {
-                handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval == 0);
-                delete op;
-            },
-            .len = (uint32_t)count,
-            .buf = op->buf,
-        });
-        bs->enqueue_op(op->bs_op);
-    }
-    else
-    {
-        // Peer
-        int peer_fd = osd_peer_fds[osd_num];
-        op->op_type = OSD_OP_OUT;
-        op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
-        op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
-        op->peer_fd = peer_fd;
-        op->req = {
-            .sec_stab = {
-                .header = {
-                    .magic = SECONDARY_OSD_OP_MAGIC,
-                    .id = this->next_subop_id++,
-                    .opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE),
-                },
-                .len = count * sizeof(obj_ver_id),
-            },
-        };
-        op->callback = [this, pg_num, fb](osd_op_t *op)
-        {
-            handle_flush_op(pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval == 0);
-            delete op;
-        };
-        outbox_push(clients[peer_fd], op);
-    }
-}
-
 void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
 {
     // Re-peer affected PGs
diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp
index 65d9a809..f3f64fda 100644
--- a/osd_peering_pg.cpp
+++ b/osd_peering_pg.cpp
@@ -27,13 +27,13 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector &
     }
     else if (st.n_roles < pg.pg_minsize)
     {
-        printf("Object is unfound: inode=%lu stripe=%lu version=%lu/%lu\n", st.oid.inode, st.oid.stripe, st.target_ver, st.max_ver);
+        printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", st.oid.inode, st.oid.stripe, st.target_ver, st.max_ver);
         for (int i = st.ver_start; i < st.ver_end; i++)
         {
             printf("Present on: osd %lu, role %ld%s\n", all[i].osd_num, (all[i].oid.stripe & STRIPE_MASK), all[i].is_stable ? " (stable)" : "");
         }
         state = OBJ_INCOMPLETE;
-        pg.state = pg.state | PG_HAS_UNFOUND;
+        pg.state = pg.state | PG_HAS_INCOMPLETE;
     }
     else
     {
@@ -48,7 +48,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector &
         if (st.n_copies > pg.pg_size)
         {
             state |= OBJ_OVERCOPIED;
-            pg.state = pg.state | PG_HAS_UNCLEAN;
+            pg.state = pg.state | PG_HAS_MISPLACED;
         }
         if (st.n_stable < st.n_copies)
         {
@@ -288,7 +288,7 @@ void pg_t::print_state()
         (state & PG_INCOMPLETE) ? "incomplete" : "",
         (state & PG_ACTIVE) ? "active" : "",
         (state & PG_DEGRADED) ? " + degraded" : "",
-        (state & PG_HAS_UNFOUND) ? " + has_unfound" : "",
+        (state & PG_HAS_INCOMPLETE) ? " + has_incomplete" : "",
         (state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
         (state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
         (state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
diff --git a/osd_peering_pg.h b/osd_peering_pg.h
index 0dac9396..e820d0a7 100644
--- a/osd_peering_pg.h
+++ b/osd_peering_pg.h
@@ -16,12 +16,12 @@
 #define PG_ACTIVE (1<<3)
 // Plus any of these:
 #define PG_DEGRADED (1<<4)
-#define PG_HAS_UNFOUND (1<<5)
+#define PG_HAS_INCOMPLETE (1<<5)
 #define PG_HAS_DEGRADED (1<<6)
 #define PG_HAS_MISPLACED (1<<7)
 #define PG_HAS_UNCLEAN (1<<8)
 
-// FIXME: Safe default that doesn't depend on parity_block_size of pg_parity_size
+// FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size
 #define STRIPE_MASK ((uint64_t)4096 - 1)
 
 // OSD object states
@@ -109,7 +109,13 @@ struct flush_action_t
     bool submitted = false;
 };
 
-struct pg_flush_batch_t;
+struct pg_flush_batch_t
+{
+    std::map<osd_num_t, std::vector<obj_ver_id>> rollback_lists;
+    std::map<osd_num_t, std::vector<obj_ver_id>> stable_lists;
+    int flush_ops = 0, flush_done = 0;
+    int flush_objects = 0;
+};
 
 struct pg_t
 {