forked from vitalif/vitastor
Compare commits
1 Commits
master
...
fsync-feed
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | a39e8acc88 |
|
@ -82,3 +82,8 @@ uint32_t blockstore_t::get_bitmap_granularity()
|
||||||
{
|
{
|
||||||
return impl->get_bitmap_granularity();
|
return impl->get_bitmap_granularity();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool blockstore_t::wants_fsync()
|
||||||
|
{
|
||||||
|
return impl->wants_fsync();
|
||||||
|
}
|
||||||
|
|
|
@ -226,4 +226,7 @@ public:
|
||||||
uint64_t get_journal_size();
|
uint64_t get_journal_size();
|
||||||
|
|
||||||
uint32_t get_bitmap_granularity();
|
uint32_t get_bitmap_granularity();
|
||||||
|
|
||||||
|
// Returns true if writing can stall due to a lack of fsync
|
||||||
|
bool wants_fsync();
|
||||||
};
|
};
|
||||||
|
|
|
@ -167,7 +167,7 @@ void blockstore_impl_t::loop()
|
||||||
// wait for all big writes to complete, submit data device fsync
|
// wait for all big writes to complete, submit data device fsync
|
||||||
// wait for the data device fsync to complete, then submit journal writes for big writes
|
// wait for the data device fsync to complete, then submit journal writes for big writes
|
||||||
// then submit an fsync operation
|
// then submit an fsync operation
|
||||||
if (has_writes)
|
if (0 && has_writes)
|
||||||
{
|
{
|
||||||
// Can't submit SYNC before previous writes
|
// Can't submit SYNC before previous writes
|
||||||
continue;
|
continue;
|
||||||
|
@ -734,3 +734,15 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
|
||||||
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
|
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool blockstore_impl_t::wants_fsync()
|
||||||
|
{
|
||||||
|
if (!unstable_writes.size())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
uint64_t journal_free_space = journal.next_free < journal.used_start
|
||||||
|
? (journal.used_start - journal.next_free)
|
||||||
|
: (journal.len - journal.next_free + journal.used_start - journal.block_size);
|
||||||
|
return journal_fsync_feedback_limit > 0 && journal.len-journal_free_space >= journal_fsync_feedback_limit;
|
||||||
|
}
|
||||||
|
|
|
@ -264,6 +264,8 @@ class blockstore_impl_t
|
||||||
int throttle_threshold_us = 50;
|
int throttle_threshold_us = 50;
|
||||||
// Maximum writes between automatically added fsync operations
|
// Maximum writes between automatically added fsync operations
|
||||||
uint64_t autosync_writes = 128;
|
uint64_t autosync_writes = 128;
|
||||||
|
// Maximum free space in the journal in bytes to start sending fsync feedback to primary OSDs
|
||||||
|
uint64_t journal_fsync_feedback_limit = 0;
|
||||||
/******* END OF OPTIONS *******/
|
/******* END OF OPTIONS *******/
|
||||||
|
|
||||||
struct ring_consumer_t ring_consumer;
|
struct ring_consumer_t ring_consumer;
|
||||||
|
@ -433,4 +435,6 @@ public:
|
||||||
inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
|
inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
|
||||||
inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; }
|
inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; }
|
||||||
inline uint64_t get_journal_size() { return dsk.journal_len; }
|
inline uint64_t get_journal_size() { return dsk.journal_len; }
|
||||||
|
|
||||||
|
bool wants_fsync();
|
||||||
};
|
};
|
||||||
|
|
|
@ -4,6 +4,25 @@
|
||||||
#include <sys/file.h>
|
#include <sys/file.h>
|
||||||
#include "blockstore_impl.h"
|
#include "blockstore_impl.h"
|
||||||
|
|
||||||
|
// Computes the journal fsync feedback threshold in bytes from the configuration.
//
// - If neither "journal_min_free_bytes" nor "journal_min_free_percent" is present,
//   the result is 90% of journal_len.
// - Otherwise "journal_min_free_bytes" is used when it parses to a non-zero number;
//   if it parses to zero, "journal_min_free_percent" percent of journal_len is used.
//
// Values are read through config[...], so (presumably, if blockstore_config_t is
// a std::map-like type - verify) a missing key gets inserted with an empty value.
static uint64_t parse_fsync_feedback(blockstore_config_t & config, uint64_t journal_len)
{
    const bool has_bytes = config.find("journal_min_free_bytes") != config.end();
    const bool has_percent = config.find("journal_min_free_percent") != config.end();
    if (!has_bytes && !has_percent)
    {
        // Nothing configured: use the default of 90% of the journal
        return 90 * journal_len / 100;
    }
    const uint64_t bytes_limit = strtoull(config["journal_min_free_bytes"].c_str(), nullptr, 10);
    if (bytes_limit)
    {
        return bytes_limit;
    }
    // Absolute value is absent or zero: fall back to the percentage
    const uint64_t percent = strtoull(config["journal_min_free_percent"].c_str(), nullptr, 10);
    return percent * journal_len / 100;
}
|
||||||
|
|
||||||
void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
||||||
{
|
{
|
||||||
// Online-configurable options:
|
// Online-configurable options:
|
||||||
|
@ -53,6 +72,8 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
||||||
}
|
}
|
||||||
if (!init)
|
if (!init)
|
||||||
{
|
{
|
||||||
|
// has to be parsed after dsk.parse_config(), thus repeated here for online update
|
||||||
|
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Offline-configurable options:
|
// Offline-configurable options:
|
||||||
|
@ -96,6 +117,7 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
|
||||||
config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes";
|
config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes";
|
||||||
journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" &&
|
journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" &&
|
||||||
config["inmemory_journal"] != "no";
|
config["inmemory_journal"] != "no";
|
||||||
|
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
|
||||||
// Validate
|
// Validate
|
||||||
if (journal.sector_count < 2)
|
if (journal.sector_count < 2)
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,7 +16,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
if (immediate_commit == IMMEDIATE_ALL)
|
if (immediate_commit == IMMEDIATE_ALL)
|
||||||
{
|
{
|
||||||
// We can return immediately because sync is only dequeued after all previous writes
|
|
||||||
op->retval = 0;
|
op->retval = 0;
|
||||||
FINISH_OP(op);
|
FINISH_OP(op);
|
||||||
return 2;
|
return 2;
|
||||||
|
|
|
@ -395,24 +395,27 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
json11::Json::object payload;
|
||||||
|
if (this->osd_num)
|
||||||
|
{
|
||||||
|
payload["osd_num"] = this->osd_num;
|
||||||
|
}
|
||||||
#ifdef WITH_RDMA
|
#ifdef WITH_RDMA
|
||||||
if (rdma_context)
|
if (rdma_context)
|
||||||
{
|
{
|
||||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
||||||
if (cl->rdma_conn)
|
if (cl->rdma_conn)
|
||||||
{
|
{
|
||||||
json11::Json payload = json11::Json::object {
|
payload["connect_rdma"] = cl->rdma_conn->addr.to_string();
|
||||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
payload["rdma_max_msg"] = cl->rdma_conn->max_msg;
|
||||||
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
}
|
||||||
};
|
}
|
||||||
std::string payload_str = payload.dump();
|
#endif
|
||||||
|
std::string payload_str = json11::Json(payload).dump();
|
||||||
op->req.show_conf.json_len = payload_str.size();
|
op->req.show_conf.json_len = payload_str.size();
|
||||||
op->buf = malloc_or_die(payload_str.size());
|
op->buf = malloc_or_die(payload_str.size());
|
||||||
op->iov.push_back(op->buf, payload_str.size());
|
op->iov.push_back(op->buf, payload_str.size());
|
||||||
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
memcpy(op->buf, payload_str.c_str(), payload_str.size());
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
op->callback = [this, cl](osd_op_t *op)
|
op->callback = [this, cl](osd_op_t *op)
|
||||||
{
|
{
|
||||||
std::string json_err;
|
std::string json_err;
|
||||||
|
|
|
@ -184,6 +184,14 @@ void osd_t::parse_config(bool init)
|
||||||
// Allow to set it to 0
|
// Allow to set it to 0
|
||||||
autosync_writes = config["autosync_writes"].uint64_value();
|
autosync_writes = config["autosync_writes"].uint64_value();
|
||||||
}
|
}
|
||||||
|
if (!config["fsync_feedback_repeat_interval"].is_null())
|
||||||
|
{
|
||||||
|
fsync_feedback_repeat_interval = config["fsync_feedback_repeat_interval"].uint64_value();
|
||||||
|
}
|
||||||
|
if (!fsync_feedback_repeat_interval)
|
||||||
|
{
|
||||||
|
fsync_feedback_repeat_interval = 500; // ms
|
||||||
|
}
|
||||||
if (!config["client_queue_depth"].is_null())
|
if (!config["client_queue_depth"].is_null())
|
||||||
{
|
{
|
||||||
client_queue_depth = config["client_queue_depth"].uint64_value();
|
client_queue_depth = config["client_queue_depth"].uint64_value();
|
||||||
|
|
|
@ -122,6 +122,7 @@ class osd_t
|
||||||
uint32_t scrub_list_limit = 1000;
|
uint32_t scrub_list_limit = 1000;
|
||||||
bool scrub_find_best = true;
|
bool scrub_find_best = true;
|
||||||
uint64_t scrub_ec_max_bruteforce = 100;
|
uint64_t scrub_ec_max_bruteforce = 100;
|
||||||
|
uint64_t fsync_feedback_repeat_interval = 500;
|
||||||
|
|
||||||
// cluster state
|
// cluster state
|
||||||
|
|
||||||
|
@ -166,6 +167,8 @@ class osd_t
|
||||||
uint64_t unstable_write_count = 0;
|
uint64_t unstable_write_count = 0;
|
||||||
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
std::map<osd_object_id_t, uint64_t> unstable_writes;
|
||||||
std::deque<osd_op_t*> syncs_in_progress;
|
std::deque<osd_op_t*> syncs_in_progress;
|
||||||
|
std::map<int, timespec> unstable_write_osds;
|
||||||
|
int fsync_feedback_timer_id = -1;
|
||||||
|
|
||||||
// client & peer I/O
|
// client & peer I/O
|
||||||
|
|
||||||
|
@ -257,6 +260,7 @@ class osd_t
|
||||||
void exec_show_config(osd_op_t *cur_op);
|
void exec_show_config(osd_op_t *cur_op);
|
||||||
void exec_secondary(osd_op_t *cur_op);
|
void exec_secondary(osd_op_t *cur_op);
|
||||||
void secondary_op_callback(osd_op_t *cur_op);
|
void secondary_op_callback(osd_op_t *cur_op);
|
||||||
|
void fsync_feedback();
|
||||||
|
|
||||||
// primary ops
|
// primary ops
|
||||||
void autosync();
|
void autosync();
|
||||||
|
|
|
@ -29,6 +29,23 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
if (op->bs_op->retval > 0)
|
if (op->bs_op->retval > 0)
|
||||||
op->iov.push_back(op->buf, op->bs_op->retval);
|
op->iov.push_back(op->buf, op->bs_op->retval);
|
||||||
}
|
}
|
||||||
|
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||||
|
op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||||
|
{
|
||||||
|
#ifndef OSD_STUB
|
||||||
|
fsync_feedback();
|
||||||
|
#endif
|
||||||
|
if (op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||||
|
{
|
||||||
|
auto & u = unstable_write_osds[op->peer_fd];
|
||||||
|
u = u;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (op->req.hdr.opcode == OSD_OP_SEC_SYNC)
|
||||||
|
{
|
||||||
|
// FIXME It would be more correct to track STABILIZE ops, not just reset on SYNC
|
||||||
|
unstable_write_osds.erase(op->peer_fd);
|
||||||
|
}
|
||||||
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
|
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
|
||||||
{
|
{
|
||||||
// allocated by blockstore
|
// allocated by blockstore
|
||||||
|
@ -45,6 +62,71 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
finish_op(op, retval);
|
finish_op(op, retval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void osd_t::fsync_feedback()
|
||||||
|
{
|
||||||
|
if (!unstable_write_osds.size() || !bs->wants_fsync())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
bool postpone = false;
|
||||||
|
// Broadcast fsync feedback
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &now);
|
||||||
|
for (auto up_it = unstable_write_osds.begin(); up_it != unstable_write_osds.end(); )
|
||||||
|
{
|
||||||
|
auto & peer_fd = up_it->first;
|
||||||
|
auto & last_feedback = up_it->second;
|
||||||
|
if (msgr.clients.find(peer_fd) == msgr.clients.end() ||
|
||||||
|
!msgr.clients.at(peer_fd)->osd_num)
|
||||||
|
{
|
||||||
|
unstable_write_osds.erase(up_it++);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto diff = (now.tv_sec-last_feedback.tv_sec)*1000 + (now.tv_nsec-last_feedback.tv_nsec)/1000000;
|
||||||
|
if (diff > fsync_feedback_repeat_interval)
|
||||||
|
{
|
||||||
|
last_feedback = now;
|
||||||
|
// Request fsync from the primary OSD
|
||||||
|
// Note: primary OSD should NOT divide syncs by clients or this logic will break
|
||||||
|
osd_op_t *fb_op = new osd_op_t();
|
||||||
|
fb_op->op_type = OSD_OP_OUT;
|
||||||
|
fb_op->req = (osd_any_op_t){
|
||||||
|
.sync = {
|
||||||
|
.header = {
|
||||||
|
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||||
|
.id = msgr.next_subop_id++,
|
||||||
|
.opcode = OSD_OP_SYNC,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
fb_op->callback = [this](osd_op_t *op)
|
||||||
|
{
|
||||||
|
delete op;
|
||||||
|
};
|
||||||
|
fb_op->peer_fd = peer_fd;
|
||||||
|
msgr.outbox_push(fb_op);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
postpone = true;
|
||||||
|
}
|
||||||
|
up_it++;
|
||||||
|
}
|
||||||
|
if (fsync_feedback_timer_id >= 0)
|
||||||
|
{
|
||||||
|
tfd->clear_timer(fsync_feedback_timer_id);
|
||||||
|
fsync_feedback_timer_id = -1;
|
||||||
|
}
|
||||||
|
if (postpone)
|
||||||
|
{
|
||||||
|
fsync_feedback_timer_id = tfd->set_timer(fsync_feedback_repeat_interval, false, [this](int timer_id)
|
||||||
|
{
|
||||||
|
fsync_feedback_timer_id = -1;
|
||||||
|
fsync_feedback();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void osd_t::exec_secondary(osd_op_t *cur_op)
|
void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||||
|
@ -158,6 +240,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||||
json11::Json req_json = cur_op->req.show_conf.json_len > 0
|
json11::Json req_json = cur_op->req.show_conf.json_len > 0
|
||||||
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
|
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
|
||||||
: json11::Json();
|
: json11::Json();
|
||||||
|
msgr.clients.at(cur_op->peer_fd)->osd_num = req_json["osd_num"].uint64_value();
|
||||||
// Expose sensitive configuration values so peers can check them
|
// Expose sensitive configuration values so peers can check them
|
||||||
json11::Json::object wire_config = json11::Json::object {
|
json11::Json::object wire_config = json11::Json::object {
|
||||||
{ "osd_num", osd_num },
|
{ "osd_num", osd_num },
|
||||||
|
|
Loading…
Reference in New Issue