diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index de28da93..bbf77396 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -124,6 +124,7 @@ void journal_flusher_t::force_start() }\ data = ((ring_data_t*)sqe->user_data); +// FIXME: Implement batch flushing bool journal_flusher_co::loop() { // This is much better than implementing the whole function as an FSM diff --git a/osd.cpp b/osd.cpp index 23124554..4b509b92 100644 --- a/osd.cpp +++ b/osd.cpp @@ -28,49 +28,64 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo this->config = config; this->bs = bs; this->ringloop = ringloop; - this->tick_tfd = new timerfd_interval(ringloop, 3, [this]() - { - for (int i = 0; i <= OSD_OP_MAX; i++) - { - if (op_stat_count[i] != 0) - { - printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]); - op_stat_count[i] = 0; - op_stat_sum[i] = 0; - } - } - for (int i = 0; i <= OSD_OP_MAX; i++) - { - if (subop_stat_count[i] != 0) - { - printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]); - subop_stat_count[i] = 0; - subop_stat_sum[i] = 0; - } - } - if (send_stat_count != 0) - { - printf("avg latency to send stabilize subop: %ld us\n", send_stat_sum/send_stat_count); - send_stat_count = 0; - send_stat_sum = 0; - } - if (incomplete_objects > 0) - { - printf("%lu object(s) incomplete\n", incomplete_objects); - } - if (degraded_objects > 0) - { - printf("%lu object(s) degraded\n", degraded_objects); - } - if (misplaced_objects > 0) - { - printf("%lu object(s) misplaced\n", misplaced_objects); - } - }); + this->bs_block_size = bs->get_block_size(); // FIXME: use bitmap granularity instead this->bs_disk_alignment = bs->get_disk_alignment(); + parse_config(config); + + bind_socket(); + + this->stats_tfd = new timerfd_interval(ringloop, 3, [this]() + { + print_stats(); + }); + + if (run_primary) + init_primary(); + + consumer.loop = [this]() { loop(); }; + ringloop->register_consumer(&consumer); +} + +osd_t::~osd_t() +{ + delete stats_tfd; + if (sync_tfd) + { + delete sync_tfd; + sync_tfd = NULL; + } + ringloop->unregister_consumer(&consumer); + close(epoll_fd); + close(listen_fd); +} + +osd_op_t::~osd_op_t() +{ + if (bs_op) + { + delete bs_op; + } + if (op_data) + { + free(op_data); + } + if (rmw_buf) + { + free(rmw_buf); + } + if (buf) + { + // Note: reusing osd_op_t WILL currently lead to memory leaks + // So we don't reuse it, but free it every time + free(buf); + } +} + +void osd_t::parse_config(blockstore_config_t & config) +{ bind_address = config["bind_address"]; if (bind_address == "") bind_address = "0.0.0.0"; @@ -85,9 +100,13 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo else if (config["immediate_commit"] == "small") immediate_commit = IMMEDIATE_SMALL; run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes"; - if (run_primary) - init_primary(); + autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10); + if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL) + autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; +} +void osd_t::bind_socket() +{ listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { @@ -136,39 +155,6 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo close(epoll_fd); throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } - - consumer.loop = [this]() { loop(); }; - ringloop->register_consumer(&consumer); -} - -osd_t::~osd_t() -{ - delete tick_tfd; - ringloop->unregister_consumer(&consumer); - close(epoll_fd); - close(listen_fd); -} - -osd_op_t::~osd_op_t() -{ - if (bs_op) - { - delete bs_op; - } - if (op_data) - { - free(op_data); - } - if (rmw_buf) - { - free(rmw_buf); - } - if (buf) - { - // Note: reusing osd_op_t WILL currently lead to memory leaks - // So we don't reuse it, but free it every time - free(buf); - } } bool osd_t::shutdown() @@ -423,3 +409,43 @@ void osd_t::exec_op(osd_op_t *cur_op) exec_secondary(cur_op); } } + +void osd_t::print_stats() +{ + for (int i = 0; i <= OSD_OP_MAX; i++) + { + if (op_stat_count[i] != 0) + { + printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]); + op_stat_count[i] = 0; + op_stat_sum[i] = 0; + } + } + for (int i = 0; i <= OSD_OP_MAX; i++) + { + if (subop_stat_count[i] != 0) + { + printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]); + subop_stat_count[i] = 0; + subop_stat_sum[i] = 0; + } + } + if (send_stat_count != 0) + { + printf("avg latency to send stabilize subop: %ld us\n", send_stat_sum/send_stat_count); + send_stat_count = 0; + send_stat_sum = 0; + } + if (incomplete_objects > 0) + { + printf("%lu object(s) incomplete\n", incomplete_objects); + } + if (degraded_objects > 0) + { + printf("%lu object(s) degraded\n", degraded_objects); + } + if (misplaced_objects > 0) + { + printf("%lu object(s) misplaced\n", misplaced_objects); + } +} diff --git a/osd.h b/osd.h index 004bee6f..b9f86385 100644 --- a/osd.h +++ b/osd.h @@ -42,6 +42,9 @@ #define IMMEDIATE_SMALL 1 #define IMMEDIATE_ALL 2 +#define MAX_AUTOSYNC_INTERVAL 3600 +#define DEFAULT_AUTOSYNC_INTERVAL 5 + //#define OSD_STUB extern const char* osd_op_names[]; @@ -191,6 +194,7 @@ class osd_t bool allow_test_ops = true; int receive_buffer_size = 9000; int immediate_commit = IMMEDIATE_NONE; + int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds // peer OSDs @@ -201,6 +205,7 @@ class osd_t unsigned pg_count = 0; uint64_t next_subop_id = 1; osd_recovery_state_t recovery_state; + osd_op_t *autosync_op = NULL; // Unstable writes std::map unstable_writes; @@ -214,7 +219,7 @@ class osd_t uint32_t bs_block_size, bs_disk_alignment; uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default ring_loop_t *ringloop; - timerfd_interval *tick_tfd; + timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL; int wait_state = 0; int epoll_fd = 0; @@ -232,6 +237,9 @@ class osd_t uint64_t send_stat_count = 0; // methods + void parse_config(blockstore_config_t & config); + void bind_socket(); + void print_stats(); // event loop, socket read/write void loop(); @@ -274,6 +282,7 @@ class osd_t void secondary_op_callback(osd_op_t *cur_op); // primary ops + void autosync(); bool prepare_primary_rw(osd_op_t *cur_op); void continue_primary_read(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op); diff --git a/osd_peering.cpp b/osd_peering.cpp index f8b09597..f662dad5 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -31,6 +31,13 @@ void osd_t::init_primary() pgs[1].print_state(); pg_count = 1; peering_state = OSD_CONNECTING_PEERS; + if (autosync_interval > 0) + { + this->sync_tfd = new timerfd_interval(ringloop, 3, [this]() + { + autosync(); + }); + } } osd_peer_def_t osd_t::parse_peer(std::string peer) diff --git a/osd_primary.cpp b/osd_primary.cpp index ba79ef2e..2ecc3341 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -71,10 +71,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) // But we must not use K in the process of calculating the PG number // So we calculate the PG number using a separate setting which should be per-inode (FIXME) pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1; - // FIXME: Postpone operations in inactive PGs if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE)) { - finish_op(cur_op, -EINVAL); + finish_op(cur_op, -EPIPE); return false; } uint64_t pg_block_size = bs_block_size * pgs[pg_num].pg_minsize; @@ -620,8 +619,35 @@ resume_7: } } +void osd_t::autosync() +{ + if (immediate_commit != IMMEDIATE_ALL && !autosync_op) + { + autosync_op = new osd_op_t(); + autosync_op->op_type = OSD_OP_IN; + autosync_op->req = { + .sync = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = 1, + .opcode = OSD_OP_SYNC, + }, + }, + }; + autosync_op->callback = [this](osd_op_t *op) + { + if (op->reply.hdr.retval < 0) + { + printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); + } + delete autosync_op; + autosync_op = NULL; + }; + exec_op(autosync_op); + } +} + // Save and clear unstable_writes -> SYNC all -> STABLE all -// FIXME: Run regular automatic syncs based on the number of unstable writes and/or system time void osd_t::continue_primary_sync(osd_op_t *cur_op) { if (!cur_op->op_data)