forked from vitalif/vitastor
Compare commits
1 Commits
master
...
fsync-feed
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | a39e8acc88 |
|
@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity()
|
|||
{
|
||||
return impl->get_bitmap_granularity();
|
||||
}
|
||||
|
||||
bool blockstore_t::wants_fsync()
|
||||
{
|
||||
return impl->wants_fsync();
|
||||
}
|
||||
|
|
|
@ -226,4 +226,7 @@ public:
|
|||
uint64_t get_journal_size();
|
||||
|
||||
uint32_t get_bitmap_granularity();
|
||||
|
||||
// Returns true if writing can stall due to a lack of fsync
|
||||
bool wants_fsync();
|
||||
};
|
||||
|
|
|
@ -167,7 +167,7 @@ void blockstore_impl_t::loop()
|
|||
// wait for all big writes to complete, submit data device fsync
|
||||
// wait for the data device fsync to complete, then submit journal writes for big writes
|
||||
// then submit an fsync operation
|
||||
if (has_writes)
|
||||
if (0 && has_writes)
|
||||
{
|
||||
// Can't submit SYNC before previous writes
|
||||
continue;
|
||||
|
@ -734,3 +734,15 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
|
|||
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
bool blockstore_impl_t::wants_fsync()
|
||||
{
|
||||
if (!unstable_writes.size())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
uint64_t journal_free_space = journal.next_free < journal.used_start
|
||||
? (journal.used_start - journal.next_free)
|
||||
: (journal.len - journal.next_free + journal.used_start - journal.block_size);
|
||||
return journal_fsync_feedback_limit > 0 && journal.len-journal_free_space >= journal_fsync_feedback_limit;
|
||||
}
|
||||
|
|
|
@ -264,6 +264,8 @@ class blockstore_impl_t
|
|||
int throttle_threshold_us = 50;
|
||||
// Maximum writes between automatically added fsync operations
|
||||
uint64_t autosync_writes = 128;
|
||||
// Maximum free space in the journal in bytes to start sending fsync feedback to primary OSDs
|
||||
uint64_t journal_fsync_feedback_limit = 0;
|
||||
/******* END OF OPTIONS *******/
|
||||
|
||||
struct ring_consumer_t ring_consumer;
|
||||
|
@ -433,4 +435,6 @@ public:
|
|||
inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
|
||||
inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; }
|
||||
inline uint64_t get_journal_size() { return dsk.journal_len; }
|
||||
|
||||
bool wants_fsync();
|
||||
};
|
||||
|
|
|
@ -4,6 +4,25 @@
|
|||
#include <sys/file.h>
|
||||
#include "blockstore_impl.h"
|
||||
|
||||
static uint64_t parse_fsync_feedback(blockstore_config_t & config, uint64_t journal_len)
|
||||
{
|
||||
uint64_t journal_fsync_feedback_limit = 0;
|
||||
if (config.find("journal_min_free_bytes") == config.end() &&
|
||||
config.find("journal_min_free_percent") == config.end())
|
||||
{
|
||||
journal_fsync_feedback_limit = 90 * journal_len / 100;
|
||||
}
|
||||
else
|
||||
{
|
||||
journal_fsync_feedback_limit = strtoull(config["journal_min_free_bytes"].c_str(), NULL, 10);
|
||||
if (!journal_fsync_feedback_limit)
|
||||
{
|
||||
journal_fsync_feedback_limit = strtoull(config["journal_min_free_percent"].c_str(), NULL, 10) * journal_len / 100;
|
||||
}
|
||||
}
|
||||
return journal_fsync_feedback_limit;
|
||||
}
|
||||
|
||||
void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
||||
{
|
||||
// Online-configurable options:
|
||||
|
@ -53,6 +72,8 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
|||
}
|
||||
if (!init)
|
||||
{
|
||||
// has to be parsed after dsk.parse_config(), thus repeated here for online update
|
||||
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
|
||||
return;
|
||||
}
|
||||
// Offline-configurable options:
|
||||
|
@ -96,6 +117,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
|||
config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes";
|
||||
journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" &&
|
||||
config["inmemory_journal"] != "no";
|
||||
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
|
||||
// Validate
|
||||
if (journal.sector_count < 2)
|
||||
{
|
||||
|
|
|
@ -16,7 +16,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
|||
{
|
||||
if (immediate_commit == IMMEDIATE_ALL)
|
||||
{
|
||||
// We can return immediately because sync is only dequeued after all previous writes
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return 2;
|
||||
|
|
|
@ -395,24 +395,27 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
|||
},
|
||||
},
|
||||
};
|
||||
json11::Json::object payload;
|
||||
if (this->osd_num)
|
||||
{
|
||||
payload["osd_num"] = this->osd_num;
|
||||
}
|
||||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
||||
if (cl->rdma_conn)
|
||||
{
|
||||
json11::Json payload = json11::Json::object {
|
||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
||||
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
||||
};
|
||||
std::string payload_str = payload.dump();
|
||||
op->req.show_conf.json_len = payload_str.size();
|
||||
op->buf = malloc_or_die(payload_str.size());
|
||||
op->iov.push_back(op->buf, payload_str.size());
|
||||
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
||||
payload["connect_rdma"] = cl->rdma_conn->addr.to_string();
|
||||
payload["rdma_max_msg"] = cl->rdma_conn->max_msg;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
std::string payload_str = json11::Json(payload).dump();
|
||||
op->req.show_conf.json_len = payload_str.size();
|
||||
op->buf = malloc_or_die(payload_str.size());
|
||||
op->iov.push_back(op->buf, payload_str.size());
|
||||
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
||||
op->callback = [this, cl](osd_op_t *op)
|
||||
{
|
||||
std::string json_err;
|
||||
|
|
|
@ -184,6 +184,14 @@ void osd_t::parse_config(bool init)
|
|||
// Allow to set it to 0
|
||||
autosync_writes = config["autosync_writes"].uint64_value();
|
||||
}
|
||||
if (!config["fsync_feedback_repeat_interval"].is_null())
|
||||
{
|
||||
fsync_feedback_repeat_interval = config["fsync_feedback_repeat_interval"].uint64_value();
|
||||
}
|
||||
if (!fsync_feedback_repeat_interval)
|
||||
{
|
||||
fsync_feedback_repeat_interval = 500; // ms
|
||||
}
|
||||
if (!config["client_queue_depth"].is_null())
|
||||
{
|
||||
client_queue_depth = config["client_queue_depth"].uint64_value();
|
||||
|
|
|
@ -122,6 +122,7 @@ class osd_t
|
|||
uint32_t scrub_list_limit = 1000;
|
||||
bool scrub_find_best = true;
|
||||
uint64_t scrub_ec_max_bruteforce = 100;
|
||||
uint64_t fsync_feedback_repeat_interval = 500;
|
||||
|
||||
// cluster state
|
||||
|
||||
|
@ -166,6 +167,8 @@ class osd_t
|
|||
uint64_t unstable_write_count = 0;
|
||||
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
||||
std::deque<osd_op_t*> syncs_in_progress;
|
||||
std::map<int, timespec> unstable_write_osds;
|
||||
int fsync_feedback_timer_id = -1;
|
||||
|
||||
// client & peer I/O
|
||||
|
||||
|
@ -257,6 +260,7 @@ class osd_t
|
|||
void exec_show_config(osd_op_t *cur_op);
|
||||
void exec_secondary(osd_op_t *cur_op);
|
||||
void secondary_op_callback(osd_op_t *cur_op);
|
||||
void fsync_feedback();
|
||||
|
||||
// primary ops
|
||||
void autosync();
|
||||
|
|
|
@ -29,6 +29,23 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
|||
if (op->bs_op->retval > 0)
|
||||
op->iov.push_back(op->buf, op->bs_op->retval);
|
||||
}
|
||||
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
#ifndef OSD_STUB
|
||||
fsync_feedback();
|
||||
#endif
|
||||
if (op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
{
|
||||
auto & u = unstable_write_osds[op->peer_fd];
|
||||
u = u;
|
||||
}
|
||||
}
|
||||
else if (op->req.hdr.opcode == OSD_OP_SEC_SYNC)
|
||||
{
|
||||
// FIXME It would be more correct to track STABILIZE ops, not just reset on SYNC
|
||||
unstable_write_osds.erase(op->peer_fd);
|
||||
}
|
||||
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
|
||||
{
|
||||
// allocated by blockstore
|
||||
|
@ -45,6 +62,71 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
|||
finish_op(op, retval);
|
||||
}
|
||||
|
||||
void osd_t::fsync_feedback()
|
||||
{
|
||||
if (!unstable_write_osds.size() || !bs->wants_fsync())
|
||||
{
|
||||
return;
|
||||
}
|
||||
bool postpone = false;
|
||||
// Broadcast fsync feedback
|
||||
timespec now;
|
||||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
for (auto up_it = unstable_write_osds.begin(); up_it != unstable_write_osds.end(); )
|
||||
{
|
||||
auto & peer_fd = up_it->first;
|
||||
auto & last_feedback = up_it->second;
|
||||
if (msgr.clients.find(peer_fd) == msgr.clients.end() ||
|
||||
!msgr.clients.at(peer_fd)->osd_num)
|
||||
{
|
||||
unstable_write_osds.erase(up_it++);
|
||||
continue;
|
||||
}
|
||||
auto diff = (now.tv_sec-last_feedback.tv_sec)*1000 + (now.tv_nsec-last_feedback.tv_nsec)/1000000;
|
||||
if (diff > fsync_feedback_repeat_interval)
|
||||
{
|
||||
last_feedback = now;
|
||||
// Request fsync from the primary OSD
|
||||
// Note: primary OSD should NOT divide syncs by clients or this logic will break
|
||||
osd_op_t *fb_op = new osd_op_t();
|
||||
fb_op->op_type = OSD_OP_OUT;
|
||||
fb_op->req = (osd_any_op_t){
|
||||
.sync = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = msgr.next_subop_id++,
|
||||
.opcode = OSD_OP_SYNC,
|
||||
},
|
||||
},
|
||||
};
|
||||
fb_op->callback = [this](osd_op_t *op)
|
||||
{
|
||||
delete op;
|
||||
};
|
||||
fb_op->peer_fd = peer_fd;
|
||||
msgr.outbox_push(fb_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
postpone = true;
|
||||
}
|
||||
up_it++;
|
||||
}
|
||||
if (fsync_feedback_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(fsync_feedback_timer_id);
|
||||
fsync_feedback_timer_id = -1;
|
||||
}
|
||||
if (postpone)
|
||||
{
|
||||
fsync_feedback_timer_id = tfd->set_timer(fsync_feedback_repeat_interval, false, [this](int timer_id)
|
||||
{
|
||||
fsync_feedback_timer_id = -1;
|
||||
fsync_feedback();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||
{
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||
|
@ -158,6 +240,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
|||
json11::Json req_json = cur_op->req.show_conf.json_len > 0
|
||||
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
|
||||
: json11::Json();
|
||||
msgr.clients.at(cur_op->peer_fd)->osd_num = req_json["osd_num"].uint64_value();
|
||||
// Expose sensitive configuration values so peers can check them
|
||||
json11::Json::object wire_config = json11::Json::object {
|
||||
{ "osd_num", osd_num },
|
||||
|
|
Loading…
Reference in New Issue