diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index b27980557..1a76789ea 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -154,6 +154,7 @@ bool journal_flusher_co::loop() goto resume_17; else if (wait_state == 18) goto resume_18; +resume_0: if (!flusher->flush_queue.size() || !flusher->start_forced && !flusher->active_flushers && flusher->flush_queue.size() < flusher->sync_threshold) { @@ -181,7 +182,7 @@ bool journal_flusher_co::loop() if (repeat_it->second < cur.version) repeat_it->second = cur.version; wait_state = 0; - return true; + goto resume_0; } else flusher->sync_to_repeat[cur.oid] = 0; @@ -196,7 +197,7 @@ resume_1: wait_state += 1; return false; } - if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete) + if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete && !has_empty) { // Nothing to flush flusher->active_flushers--; @@ -208,7 +209,7 @@ resume_1: } flusher->sync_to_repeat.erase(repeat_it); wait_state = 0; - return true; + goto resume_0; } // Find it in clean_db clean_it = bs->clean_db.find(cur.oid); @@ -427,7 +428,7 @@ resume_1: } flusher->sync_to_repeat.erase(repeat_it); wait_state = 0; - return true; + goto resume_0; } return true; } @@ -444,6 +445,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) copy_count = 0; clean_loc = UINT64_MAX; has_delete = false; + has_empty = false; skip_copy = false; clean_init_bitmap = false; while (1) @@ -451,40 +453,47 @@ bool journal_flusher_co::scan_dirty(int wait_base) if (dirty_it->second.state == ST_J_STABLE && !skip_copy) { // First we submit all reads - offset = dirty_it->second.offset; - end_offset = dirty_it->second.offset + dirty_it->second.len; - it = v.begin(); - while (1) + if (dirty_it->second.len == 0) { - for (; it != v.end(); it++) - if (it->offset >= offset) - break; - if (it == v.end() || it->offset > offset && it->len > 0) + has_empty = true; + } + else + { + offset = dirty_it->second.offset; + end_offset = dirty_it->second.offset + dirty_it->second.len; + it = v.begin(); + while (1) { - submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; - submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset; - it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) }); - copy_count++; - if (bs->journal.inmemory) + for (; it != v.end(); it++) + if (it->offset >= offset) + break; + if (it == v.end() || it->offset > offset && it->len > 0) { - // Take it from memory - memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len); - } - else - { - // Read it from disk - await_sqe(0); - data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; - data->callback = simple_callback_r; - my_uring_prep_readv( - sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset - ); - wait_count++; + submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; + submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset; + it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) }); + copy_count++; + if (bs->journal.inmemory) + { + // Take it from memory + memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len); + } + else + { + // Read it from disk + await_sqe(0); + data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; + data->callback = simple_callback_r; + my_uring_prep_readv( + sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset + ); + wait_count++; + } } + offset = it->offset+it->len; + if (it == v.end() || offset >= end_offset) + break; } - offset = it->offset+it->len; - if (it == v.end() || offset >= end_offset) - break; } } else if (dirty_it->second.state == ST_D_STABLE && !skip_copy) diff --git a/blockstore_flush.h b/blockstore_flush.h index cdf726e42..294bd0a54 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -45,7 +45,7 @@ class journal_flusher_co std::map::iterator repeat_it; std::function simple_callback_r, simple_callback_w; - bool skip_copy, has_delete; + bool skip_copy, has_delete, has_empty; spp::sparse_hash_map::iterator clean_it; std::vector v; std::vector::iterator it; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 8d36ad853..284d15677 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -402,7 +402,15 @@ resume_1: } // Trim journal on start so we don't stall when all entries are older bs->journal.trim(); - printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count); + printf( + "Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n", + entries_loaded, + (bs->journal.next_free >= bs->journal.used_start + ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start) + : bs->journal.used_start - bs->journal.next_free), + bs->journal.used_start, bs->journal.next_free, + bs->data_alloc->get_free_count(), bs->block_count + ); bs->journal.crc32_last = crc32_last; return 0; } diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index ce2fd9fc7..0d0950746 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -67,6 +67,12 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require if (!right_dir && next_pos >= bs->journal.used_start-bs->journal.block_size) { // No space in the journal. Wait until used_start changes. + printf( + "Ran out of journal space (free space: %lu bytes)\n", + (bs->journal.next_free >= bs->journal.used_start + ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start) + : bs->journal.used_start - bs->journal.next_free) + ); PRIV(op)->wait_for = WAIT_JOURNAL; bs->flusher->force_start(); PRIV(op)->wait_detail = bs->journal.used_start; diff --git a/fio_sec_osd.cpp b/fio_sec_osd.cpp index b97300079..d32bb5c3a 100644 --- a/fio_sec_osd.cpp +++ b/fio_sec_osd.cpp @@ -5,7 +5,7 @@ // Random write: // // fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \ -// -host=127.0.0.1 -port=11203 -size=1000M +// -host=127.0.0.1 -port=11203 [-single_primary=1] -size=1000M // // Linear write: // @@ -50,6 +50,7 @@ struct sec_options int __pad; char *host = NULL; int port = 0; + bool single_primary = false; }; static struct fio_option options[] = { @@ -71,6 +72,16 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, }, + { + .name = "single_primary", + .lname = "Single Primary", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, single_primary), + .help = "Test single Primary OSD (one PG) instead of Secondary", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = NULL, }, @@ -150,6 +161,7 @@ static int sec_init(struct thread_data *td) /* Begin read or write request. */ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) { + sec_options *opt = (sec_options*)td->eo; sec_data *bsd = (sec_data*)td->io_ops_data; int n = bsd->op_n; @@ -167,31 +179,59 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) switch (io->ddir) { case DDIR_READ: - op.hdr.opcode = OSD_OP_SECONDARY_READ; - op.sec_rw.oid = { - .inode = 1, - .stripe = io->offset >> bsd->block_order, - }; - op.sec_rw.version = UINT64_MAX; // last unstable - op.sec_rw.offset = io->offset % bsd->block_size; - op.sec_rw.len = io->xfer_buflen; + if (!opt->single_primary) + { + op.hdr.opcode = OSD_OP_SECONDARY_READ; + op.sec_rw.oid = { + .inode = 1, + .stripe = io->offset >> bsd->block_order, + }; + op.sec_rw.version = UINT64_MAX; // last unstable + op.sec_rw.offset = io->offset % bsd->block_size; + op.sec_rw.len = io->xfer_buflen; + } + else + { + op.hdr.opcode = OSD_OP_READ; + op.rw.inode = 1; + op.rw.offset = io->offset; + op.rw.len = io->xfer_buflen; + } bsd->last_sync = false; break; case DDIR_WRITE: - op.hdr.opcode = OSD_OP_SECONDARY_WRITE; - op.sec_rw.oid = { - .inode = 1, - .stripe = io->offset >> bsd->block_order, - }; - op.sec_rw.version = 0; // assign automatically - op.sec_rw.offset = io->offset % bsd->block_size; - op.sec_rw.len = io->xfer_buflen; + if (!opt->single_primary) + { + op.hdr.opcode = OSD_OP_SECONDARY_WRITE; + op.sec_rw.oid = { + .inode = 1, + .stripe = io->offset >> bsd->block_order, + }; + op.sec_rw.version = 0; // assign automatically + op.sec_rw.offset = io->offset % bsd->block_size; + op.sec_rw.len = io->xfer_buflen; + } + else + { + op.hdr.opcode = OSD_OP_WRITE; + op.rw.inode = 1; + op.rw.offset = io->offset; + op.rw.len = io->xfer_buflen; + } bsd->last_sync = false; break; case DDIR_SYNC: - // Allowed only for testing: sync & stabilize all unstable object versions - op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL; - // fio sends 32 syncs with -fsync=32. we omit 31 of them even though it's not 100% fine (FIXME: fix fio itself) + if (!opt->single_primary) + { + // Allowed only for testing: sync & stabilize all unstable object versions + op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL; + } + else + { + op.hdr.opcode = OSD_OP_SYNC; + } + // fio sends 32 syncs with -fsync=32. we omit 31 of them even though + // generally it may not be 100% correct (FIXME: fix fio itself) bsd->last_sync = true; break; default: diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 22276792a..0db329a29 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -133,6 +133,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector & } else pg.clean_count++; + pg.total_count++; } // FIXME: Write at least some tests for this function @@ -167,6 +168,7 @@ void pg_t::calc_object_states() std::sort(all.begin(), all.end()); // Walk over it and check object states pg.clean_count = 0; + pg.total_count = 0; pg.state = 0; int replica = 0; pg_obj_state_check_t st; @@ -251,12 +253,13 @@ void pg_t::calc_object_states() pg.state = pg.state | PG_DEGRADED; } printf( - "PG %u is active%s%s%s%s%s\n", pg.pg_num, + "PG %u is active%s%s%s%s%s (%lu objects)\n", pg.pg_num, (pg.state & PG_DEGRADED) ? " + degraded" : "", (pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "", (pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "", (pg.state & PG_HAS_MISPLACED) ? " + has_misplaced" : "", - (pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : "" + (pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : "", + pg.total_count ); pg.state = pg.state | PG_ACTIVE; } diff --git a/osd_peering_pg.h b/osd_peering_pg.h index da79bfcb6..62ba3d6b7 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -111,7 +111,7 @@ struct pg_t int state; uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; - uint64_t clean_count = 0; + uint64_t clean_count = 0, total_count = 0; // target_set is the "correct" peer OSD set for this PG std::vector target_set; // cur_set is the current set of connected peer OSDs for this PG diff --git a/osd_primary.cpp b/osd_primary.cpp index 77ab81056..511444894 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -328,17 +328,18 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) return; } } -resume_1: // Check if there are other write requests to the same object { - auto vo_it = pg.ver_override.find(op_data->oid); - if (vo_it != pg.ver_override.end()) + auto vo_it = pg.write_queue.find(op_data->oid); + if (vo_it != pg.write_queue.end()) { op_data->st = 1; pg.write_queue.emplace(op_data->oid, cur_op); return; } + pg.write_queue.emplace(op_data->oid, cur_op); } +resume_1: // Determine blocks to read cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); // Read required blocks @@ -381,10 +382,13 @@ resume_5: // Continue other write operations to the same object { auto next_it = pg.write_queue.find(op_data->oid); - if (next_it != pg.write_queue.end()) + auto this_it = next_it; + next_it++; + pg.write_queue.erase(this_it); + if (next_it != pg.write_queue.end() && + next_it->first == op_data->oid) { osd_op_t *next_op = next_it->second; - pg.write_queue.erase(next_it); continue_primary_write(next_op); } }