1
0
Fork 0

Compare commits

...

1 Commits

10 changed files with 154 additions and 11 deletions

View File

@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity()
{ {
return impl->get_bitmap_granularity(); return impl->get_bitmap_granularity();
} }
bool blockstore_t::wants_fsync()
{
return impl->wants_fsync();
}

View File

@ -226,4 +226,7 @@ public:
uint64_t get_journal_size(); uint64_t get_journal_size();
uint32_t get_bitmap_granularity(); uint32_t get_bitmap_granularity();
// Returns true if writing can stall due to a lack of fsync
bool wants_fsync();
}; };

View File

@ -167,7 +167,7 @@ void blockstore_impl_t::loop()
// wait for all big writes to complete, submit data device fsync // wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes // wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation // then submit an fsync operation
if (has_writes) if (0 && has_writes)
{ {
// Can't submit SYNC before previous writes // Can't submit SYNC before previous writes
continue; continue;
@ -734,3 +734,15 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected); fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
exit(1); exit(1);
} }
bool blockstore_impl_t::wants_fsync()
{
if (!unstable_writes.size())
{
return false;
}
uint64_t journal_free_space = journal.next_free < journal.used_start
? (journal.used_start - journal.next_free)
: (journal.len - journal.next_free + journal.used_start - journal.block_size);
return journal_fsync_feedback_limit > 0 && journal.len-journal_free_space >= journal_fsync_feedback_limit;
}

View File

@ -264,6 +264,8 @@ class blockstore_impl_t
int throttle_threshold_us = 50; int throttle_threshold_us = 50;
// Maximum writes between automatically added fsync operations // Maximum writes between automatically added fsync operations
uint64_t autosync_writes = 128; uint64_t autosync_writes = 128;
// Maximum free space in the journal in bytes to start sending fsync feedback to primary OSDs
uint64_t journal_fsync_feedback_limit = 0;
/******* END OF OPTIONS *******/ /******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer; struct ring_consumer_t ring_consumer;
@ -433,4 +435,6 @@ public:
inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); } inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; } inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; }
inline uint64_t get_journal_size() { return dsk.journal_len; } inline uint64_t get_journal_size() { return dsk.journal_len; }
bool wants_fsync();
}; };

View File

@ -4,6 +4,25 @@
#include <sys/file.h> #include <sys/file.h>
#include "blockstore_impl.h" #include "blockstore_impl.h"
static uint64_t parse_fsync_feedback(blockstore_config_t & config, uint64_t journal_len)
{
uint64_t journal_fsync_feedback_limit = 0;
if (config.find("journal_min_free_bytes") == config.end() &&
config.find("journal_min_free_percent") == config.end())
{
journal_fsync_feedback_limit = 90 * journal_len / 100;
}
else
{
journal_fsync_feedback_limit = strtoull(config["journal_min_free_bytes"].c_str(), NULL, 10);
if (!journal_fsync_feedback_limit)
{
journal_fsync_feedback_limit = strtoull(config["journal_min_free_percent"].c_str(), NULL, 10) * journal_len / 100;
}
}
return journal_fsync_feedback_limit;
}
void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init) void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
{ {
// Online-configurable options: // Online-configurable options:
@ -53,6 +72,8 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
} }
if (!init) if (!init)
{ {
// has to be parsed after dsk.parse_config(), thus repeated here for online update
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
return; return;
} }
// Offline-configurable options: // Offline-configurable options:
@ -96,6 +117,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes"; config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes";
journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" && journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" &&
config["inmemory_journal"] != "no"; config["inmemory_journal"] != "no";
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
// Validate // Validate
if (journal.sector_count < 2) if (journal.sector_count < 2)
{ {

View File

@ -16,7 +16,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{ {
if (immediate_commit == IMMEDIATE_ALL) if (immediate_commit == IMMEDIATE_ALL)
{ {
// We can return immediately because sync is only dequeued after all previous writes
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
return 2; return 2;

View File

@ -395,24 +395,27 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
}, },
}, },
}; };
json11::Json::object payload;
if (this->osd_num)
{
payload["osd_num"] = this->osd_num;
}
#ifdef WITH_RDMA #ifdef WITH_RDMA
if (rdma_context) if (rdma_context)
{ {
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn) if (cl->rdma_conn)
{ {
json11::Json payload = json11::Json::object { payload["connect_rdma"] = cl->rdma_conn->addr.to_string();
{ "connect_rdma", cl->rdma_conn->addr.to_string() }, payload["rdma_max_msg"] = cl->rdma_conn->max_msg;
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
} }
} }
#endif #endif
std::string payload_str = json11::Json(payload).dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
op->callback = [this, cl](osd_op_t *op) op->callback = [this, cl](osd_op_t *op)
{ {
std::string json_err; std::string json_err;

View File

@ -184,6 +184,14 @@ void osd_t::parse_config(bool init)
// Allow to set it to 0 // Allow to set it to 0
autosync_writes = config["autosync_writes"].uint64_value(); autosync_writes = config["autosync_writes"].uint64_value();
} }
if (!config["fsync_feedback_repeat_interval"].is_null())
{
fsync_feedback_repeat_interval = config["fsync_feedback_repeat_interval"].uint64_value();
}
if (!fsync_feedback_repeat_interval)
{
fsync_feedback_repeat_interval = 500; // ms
}
if (!config["client_queue_depth"].is_null()) if (!config["client_queue_depth"].is_null())
{ {
client_queue_depth = config["client_queue_depth"].uint64_value(); client_queue_depth = config["client_queue_depth"].uint64_value();

View File

@ -122,6 +122,7 @@ class osd_t
uint32_t scrub_list_limit = 1000; uint32_t scrub_list_limit = 1000;
bool scrub_find_best = true; bool scrub_find_best = true;
uint64_t scrub_ec_max_bruteforce = 100; uint64_t scrub_ec_max_bruteforce = 100;
uint64_t fsync_feedback_repeat_interval = 500;
// cluster state // cluster state
@ -166,6 +167,8 @@ class osd_t
uint64_t unstable_write_count = 0; uint64_t unstable_write_count = 0;
std::map<osd_object_id_t, uint64_t> unstable_writes; std::map<osd_object_id_t, uint64_t> unstable_writes;
std::deque<osd_op_t*> syncs_in_progress; std::deque<osd_op_t*> syncs_in_progress;
std::map<int, timespec> unstable_write_osds;
int fsync_feedback_timer_id = -1;
// client & peer I/O // client & peer I/O
@ -257,6 +260,7 @@ class osd_t
void exec_show_config(osd_op_t *cur_op); void exec_show_config(osd_op_t *cur_op);
void exec_secondary(osd_op_t *cur_op); void exec_secondary(osd_op_t *cur_op);
void secondary_op_callback(osd_op_t *cur_op); void secondary_op_callback(osd_op_t *cur_op);
void fsync_feedback();
// primary ops // primary ops
void autosync(); void autosync();

View File

@ -29,6 +29,23 @@ void osd_t::secondary_op_callback(osd_op_t *op)
if (op->bs_op->retval > 0) if (op->bs_op->retval > 0)
op->iov.push_back(op->buf, op->bs_op->retval); op->iov.push_back(op->buf, op->bs_op->retval);
} }
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{
#ifndef OSD_STUB
fsync_feedback();
#endif
if (op->req.hdr.opcode == OSD_OP_SEC_WRITE)
{
auto & u = unstable_write_osds[op->peer_fd];
u = u;
}
}
else if (op->req.hdr.opcode == OSD_OP_SEC_SYNC)
{
// FIXME It would be more correct to track STABILIZE ops, not just reset on SYNC
unstable_write_osds.erase(op->peer_fd);
}
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST) else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
{ {
// allocated by blockstore // allocated by blockstore
@ -45,6 +62,71 @@ void osd_t::secondary_op_callback(osd_op_t *op)
finish_op(op, retval); finish_op(op, retval);
} }
void osd_t::fsync_feedback()
{
if (!unstable_write_osds.size() || !bs->wants_fsync())
{
return;
}
bool postpone = false;
// Broadcast fsync feedback
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
for (auto up_it = unstable_write_osds.begin(); up_it != unstable_write_osds.end(); )
{
auto & peer_fd = up_it->first;
auto & last_feedback = up_it->second;
if (msgr.clients.find(peer_fd) == msgr.clients.end() ||
!msgr.clients.at(peer_fd)->osd_num)
{
unstable_write_osds.erase(up_it++);
continue;
}
auto diff = (now.tv_sec-last_feedback.tv_sec)*1000 + (now.tv_nsec-last_feedback.tv_nsec)/1000000;
if (diff > fsync_feedback_repeat_interval)
{
last_feedback = now;
// Request fsync from the primary OSD
// Note: primary OSD should NOT divide syncs by clients or this logic will break
osd_op_t *fb_op = new osd_op_t();
fb_op->op_type = OSD_OP_OUT;
fb_op->req = (osd_any_op_t){
.sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SYNC,
},
},
};
fb_op->callback = [this](osd_op_t *op)
{
delete op;
};
fb_op->peer_fd = peer_fd;
msgr.outbox_push(fb_op);
}
else
{
postpone = true;
}
up_it++;
}
if (fsync_feedback_timer_id >= 0)
{
tfd->clear_timer(fsync_feedback_timer_id);
fsync_feedback_timer_id = -1;
}
if (postpone)
{
fsync_feedback_timer_id = tfd->set_timer(fsync_feedback_repeat_interval, false, [this](int timer_id)
{
fsync_feedback_timer_id = -1;
fsync_feedback();
});
}
}
void osd_t::exec_secondary(osd_op_t *cur_op) void osd_t::exec_secondary(osd_op_t *cur_op)
{ {
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
@ -158,6 +240,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
json11::Json req_json = cur_op->req.show_conf.json_len > 0 json11::Json req_json = cur_op->req.show_conf.json_len > 0
? json11::Json::parse(std::string((char *)cur_op->buf), json_err) ? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
: json11::Json(); : json11::Json();
msgr.clients.at(cur_op->peer_fd)->osd_num = req_json["osd_num"].uint64_value();
// Expose sensitive configuration values so peers can check them // Expose sensitive configuration values so peers can check them
json11::Json::object wire_config = json11::Json::object { json11::Json::object wire_config = json11::Json::object {
{ "osd_num", osd_num }, { "osd_num", osd_num },