diff --git a/src/blockstore.cpp b/src/blockstore.cpp index 2bd729de..453ac7e7 100644 --- a/src/blockstore.cpp +++ b/src/blockstore.cpp @@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity() { return impl->get_bitmap_granularity(); } + +bool blockstore_t::wants_fsync() +{ + return impl->wants_fsync(); +} diff --git a/src/blockstore.h b/src/blockstore.h index 201c259f..71b7f081 100644 --- a/src/blockstore.h +++ b/src/blockstore.h @@ -226,4 +226,7 @@ public: uint64_t get_journal_size(); uint32_t get_bitmap_granularity(); + + // Returns true if writing can stall due to a lack of fsync + bool wants_fsync(); }; diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index d8ccff9c..a81e246e 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -167,7 +167,7 @@ void blockstore_impl_t::loop() // wait for all big writes to complete, submit data device fsync // wait for the data device fsync to complete, then submit journal writes for big writes // then submit an fsync operation - if (has_writes) { // Can't submit SYNC before previous writes continue; } @@ -734,3 +734,15 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected); exit(1); } + +bool blockstore_impl_t::wants_fsync() +{ + if (!unstable_writes.size()) + { + return false; + } + uint64_t journal_free_space = journal.next_free < journal.used_start + ? 
(journal.used_start - journal.next_free) + : (journal.len - journal.next_free + journal.used_start - journal.block_size); + return journal_fsync_feedback_limit > 0 && journal.len-journal_free_space >= journal_fsync_feedback_limit; +} diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index d833b8a8..8533b474 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -264,6 +264,8 @@ class blockstore_impl_t int throttle_threshold_us = 50; // Maximum writes between automatically added fsync operations uint64_t autosync_writes = 128; + // Journal used-space threshold (bytes) at which fsync feedback is sent to primary OSDs, derived from journal_min_free_bytes/percent + uint64_t journal_fsync_feedback_limit = 0; /******* END OF OPTIONS *******/ struct ring_consumer_t ring_consumer; @@ -433,4 +435,6 @@ public: inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); } inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; } inline uint64_t get_journal_size() { return dsk.journal_len; } + + bool wants_fsync(); }; diff --git a/src/blockstore_open.cpp b/src/blockstore_open.cpp index 7c57dbde..26a0b254 100644 --- a/src/blockstore_open.cpp +++ b/src/blockstore_open.cpp @@ -4,6 +4,27 @@ #include #include "blockstore_impl.h" +static uint64_t parse_fsync_feedback(blockstore_config_t & config, uint64_t journal_len) +{ + uint64_t journal_fsync_feedback_limit = 0; + if (config.find("journal_min_free_bytes") == config.end() && + config.find("journal_min_free_percent") == config.end()) + { + journal_fsync_feedback_limit = 90 * journal_len / 100; + } + else + { + // journal_min_free_* configure the desired FREE space - convert to a used-space threshold + uint64_t min_free = strtoull(config["journal_min_free_bytes"].c_str(), NULL, 10); + if (!min_free) + { + min_free = strtoull(config["journal_min_free_percent"].c_str(), NULL, 10) * journal_len / 100; + } + journal_fsync_feedback_limit = min_free < journal_len ? journal_len - min_free : 0; + } + return journal_fsync_feedback_limit; +} + void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init) { // Online-configurable options: @@ 
-53,6 +72,8 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init) } if (!init) { + // has to be parsed after dsk.parse_config(), thus repeated here for online update + journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len); return; } // Offline-configurable options: @@ -96,6 +117,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init) config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes"; journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" && config["inmemory_journal"] != "no"; + journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len); // Validate if (journal.sector_count < 2) { diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 918eb90c..f52084f3 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -16,7 +16,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) { if (immediate_commit == IMMEDIATE_ALL) { - // We can return immediately because sync is only dequeued after all previous writes op->retval = 0; FINISH_OP(op); return 2; diff --git a/src/messenger.cpp b/src/messenger.cpp index d08d5e9a..5fb32e62 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -395,24 +395,27 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) }, }, }; + json11::Json::object payload; + if (this->osd_num) + { + payload["osd_num"] = this->osd_num; + } #ifdef WITH_RDMA if (rdma_context) { cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); if (cl->rdma_conn) { - json11::Json payload = json11::Json::object { - { "connect_rdma", cl->rdma_conn->addr.to_string() }, - { "rdma_max_msg", cl->rdma_conn->max_msg }, - }; - std::string payload_str = payload.dump(); - op->req.show_conf.json_len = payload_str.size(); - op->buf = malloc_or_die(payload_str.size()); - op->iov.push_back(op->buf, 
payload_str.size()); - memcpy(op->buf, payload_str.c_str(), payload_str.size()); + payload["connect_rdma"] = cl->rdma_conn->addr.to_string(); + payload["rdma_max_msg"] = cl->rdma_conn->max_msg; } } #endif + std::string payload_str = json11::Json(payload).dump(); + op->req.show_conf.json_len = payload_str.size(); + op->buf = malloc_or_die(payload_str.size()); + op->iov.push_back(op->buf, payload_str.size()); + memcpy(op->buf, payload_str.c_str(), payload_str.size()); op->callback = [this, cl](osd_op_t *op) { std::string json_err; diff --git a/src/osd.cpp b/src/osd.cpp index 32157ab1..57e05831 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -184,6 +184,14 @@ void osd_t::parse_config(bool init) // Allow to set it to 0 autosync_writes = config["autosync_writes"].uint64_value(); } + if (!config["fsync_feedback_repeat_interval"].is_null()) + { + fsync_feedback_repeat_interval = config["fsync_feedback_repeat_interval"].uint64_value(); + } + if (!fsync_feedback_repeat_interval) + { + fsync_feedback_repeat_interval = 500; // ms + } if (!config["client_queue_depth"].is_null()) { client_queue_depth = config["client_queue_depth"].uint64_value(); diff --git a/src/osd.h b/src/osd.h index c7a67de9..13953e3c 100644 --- a/src/osd.h +++ b/src/osd.h @@ -122,6 +122,7 @@ class osd_t uint32_t scrub_list_limit = 1000; bool scrub_find_best = true; uint64_t scrub_ec_max_bruteforce = 100; + uint64_t fsync_feedback_repeat_interval = 500; // cluster state @@ -166,6 +167,8 @@ class osd_t uint64_t unstable_write_count = 0; std::map unstable_writes; std::deque syncs_in_progress; + std::map<int, timespec> unstable_write_osds; // peer_fd -> time of last fsync feedback + int fsync_feedback_timer_id = -1; // client & peer I/O @@ -257,6 +260,7 @@ class osd_t void exec_show_config(osd_op_t *cur_op); void exec_secondary(osd_op_t *cur_op); void secondary_op_callback(osd_op_t *cur_op); + void fsync_feedback(); // primary ops void autosync(); diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp index 91939e04..132f208a 100644 --- a/src/osd_secondary.cpp +++ 
b/src/osd_secondary.cpp @@ -29,6 +29,23 @@ void osd_t::secondary_op_callback(osd_op_t *op) if (op->bs_op->retval > 0) op->iov.push_back(op->buf, op->bs_op->retval); } + else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE || + op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) + { +#ifndef OSD_STUB + fsync_feedback(); +#endif + if (op->req.hdr.opcode == OSD_OP_SEC_WRITE) + { + // register the peer; zero-initialized timestamp means feedback is due immediately + unstable_write_osds[op->peer_fd]; + } + } + else if (op->req.hdr.opcode == OSD_OP_SEC_SYNC) + { + // FIXME It would be more correct to track STABILIZE ops, not just reset on SYNC + unstable_write_osds.erase(op->peer_fd); + } else if (op->req.hdr.opcode == OSD_OP_SEC_LIST) { // allocated by blockstore @@ -45,6 +62,71 @@ void osd_t::secondary_op_callback(osd_op_t *op) finish_op(op, retval); } +void osd_t::fsync_feedback() +{ + if (!unstable_write_osds.size() || !bs->wants_fsync()) + { + return; + } + bool postpone = false; + // Broadcast fsync feedback + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + for (auto up_it = unstable_write_osds.begin(); up_it != unstable_write_osds.end(); ) + { + auto & peer_fd = up_it->first; + auto & last_feedback = up_it->second; + if (msgr.clients.find(peer_fd) == msgr.clients.end() || + !msgr.clients.at(peer_fd)->osd_num) + { + unstable_write_osds.erase(up_it++); + continue; + } + auto diff = (now.tv_sec-last_feedback.tv_sec)*1000 + (now.tv_nsec-last_feedback.tv_nsec)/1000000; + if (diff > fsync_feedback_repeat_interval) + { + last_feedback = now; + // Request fsync from the primary OSD + // Note: primary OSD should NOT divide syncs by clients or this logic will break + osd_op_t *fb_op = new osd_op_t(); + fb_op->op_type = OSD_OP_OUT; + fb_op->req = (osd_any_op_t){ + .sync = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = msgr.next_subop_id++, + .opcode = OSD_OP_SYNC, + }, + }, + }; + fb_op->callback = [this](osd_op_t *op) + { + delete op; + }; + fb_op->peer_fd = peer_fd; + msgr.outbox_push(fb_op); + } + else + { + postpone = true; + 
} + up_it++; + } + if (fsync_feedback_timer_id >= 0) + { + tfd->clear_timer(fsync_feedback_timer_id); + fsync_feedback_timer_id = -1; + } + if (postpone) + { + fsync_feedback_timer_id = tfd->set_timer(fsync_feedback_repeat_interval, false, [this](int timer_id) + { + fsync_feedback_timer_id = -1; + fsync_feedback(); + }); + } +} + void osd_t::exec_secondary(osd_op_t *cur_op) { if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) @@ -158,6 +240,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op) json11::Json req_json = cur_op->req.show_conf.json_len > 0 ? json11::Json::parse(std::string((char *)cur_op->buf), json_err) : json11::Json(); + if (req_json["osd_num"].uint64_value() != 0) msgr.clients.at(cur_op->peer_fd)->osd_num = req_json["osd_num"].uint64_value(); // Expose sensitive configuration values so peers can check them json11::Json::object wire_config = json11::Json::object { { "osd_num", osd_num },