From f71d0c117b31cb905ee0857ff46973363675e7cc Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 11 May 2020 02:58:13 +0300 Subject: [PATCH] Measure & report op bandwidth, include local blockstore ops in stats --- osd.cpp | 8 ++- osd.h | 3 ++ osd_cluster.cpp | 6 ++- osd_flush.cpp | 2 + osd_peering.cpp | 6 ++- osd_primary_subops.cpp | 117 ++++++++++++++++++++++++++--------------- osd_send.cpp | 10 ++++ 7 files changed, 106 insertions(+), 46 deletions(-) diff --git a/osd.cpp b/osd.cpp index a048e8fa..b6faa1ac 100644 --- a/osd.cpp +++ b/osd.cpp @@ -525,9 +525,15 @@ void osd_t::print_stats() if (op_stat_count[0][i] != op_stat_count[1][i]) { uint64_t avg = (op_stat_sum[0][i] - op_stat_sum[1][i])/(op_stat_count[0][i] - op_stat_count[1][i]); - printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], avg); + uint64_t bw = (op_stat_bytes[0][i] - op_stat_bytes[1][i]) / print_stats_interval; + printf( + "avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", i, osd_op_names[i], avg, + (bw > 1024*1024*1024 ? bw/1024.0/1024/1024 : (bw > 1024*1024 ? bw/1024.0/1024 : bw/1024.0)), + (bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s")) + ); op_stat_count[1][i] = op_stat_count[0][i]; op_stat_sum[1][i] = op_stat_sum[0][i]; + op_stat_bytes[1][i] = op_stat_bytes[0][i]; } } for (int i = 0; i <= OSD_OP_MAX; i++) diff --git a/osd.h b/osd.h index 4f5c880c..2338a893 100644 --- a/osd.h +++ b/osd.h @@ -285,6 +285,7 @@ class osd_t // op statistics uint64_t op_stat_sum[2][OSD_OP_MAX+1] = { 0 }; uint64_t op_stat_count[2][OSD_OP_MAX+1] = { 0 }; + uint64_t op_stat_bytes[2][OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 }; uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; @@ -380,6 +381,8 @@ class osd_t void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg); bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state); void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version); + void handle_primary_bs_subop(osd_op_t *subop); + void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 7d02c0c7..990d48e4 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -152,12 +152,15 @@ json11::Json osd_t::get_statistics() st["size"] = bs->get_block_count() * bs->get_block_size(); st["free"] = bs->get_free_block_count() * bs->get_block_size(); } + // FIXME: report recovery ops and bandwidth + // FIXME: handle integer overflow json11::Json::object op_stats, subop_stats; for (int i = 0; i <= OSD_OP_MAX; i++) { op_stats[osd_op_names[i]] = json11::Json::object { { "count", op_stat_count[0][i] }, { "sum", op_stat_sum[0][i] }, + { "bytes", op_stat_bytes[0][i] }, }; } for (int i = 0; i <= OSD_OP_MAX; i++) @@ -297,6 +300,7 @@ void osd_t::start_etcd_watcher() } } }); + // FIXME apply config changes in runtime etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/") }, @@ -334,7 +338,7 @@ void osd_t::start_etcd_watcher() void osd_t::load_global_config() { etcd_call("/kv/range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/config/osd/all") } + { "key", base64_encode(etcd_prefix+"/config/global") } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) { if (err != "") diff --git a/osd_flush.cpp b/osd_flush.cpp index 5a56eb3b..62f19a52 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -152,10 +152,12 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback if (peer_osd == this->osd_num) { // local + clock_gettime(CLOCK_REALTIME, &op->tv_begin); op->bs_op = new blockstore_op_t({ .opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE), .callback = [this, op, pg_num, fb](blockstore_op_t *bs_op) { + add_bs_subop_stats(op); handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval); delete op; }, diff --git a/osd_peering.cpp b/osd_peering.cpp index 37e8963b..35650361 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -412,6 +412,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p osd_op_t *op = new osd_op_t(); op->op_type = 0; op->peer_fd = 0; + clock_gettime(CLOCK_REALTIME, &op->tv_begin); op->bs_op = new blockstore_op_t(); op->bs_op->opcode = BS_OP_SYNC; op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op) @@ -422,6 +423,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p force_stop(1); return; } + add_bs_subop_stats(op); delete op; ps->list_ops.erase(role_osd); submit_list_subop(role_osd, ps); @@ -474,17 +476,19 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) osd_op_t *op = new osd_op_t(); op->op_type = 0; op->peer_fd = 0; + clock_gettime(CLOCK_REALTIME, &op->tv_begin); op->bs_op = new blockstore_op_t(); op->bs_op->opcode = BS_OP_LIST; op->bs_op->oid.stripe = pg_stripe_size; op->bs_op->len = pg_count; op->bs_op->offset = ps->pg_num-1; - op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op) + op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op) { if (op->bs_op->retval < 0) { throw std::runtime_error("local OP_LIST failed"); } + add_bs_subop_stats(op); printf( "[PG %u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n", ps->pg_num, role_osd, bs_op->retval, bs_op->version diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index eea5e7fc..0e1a97d8 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -97,7 +97,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* op_data->done = op_data->errors = 0; op_data->n_subops = n_subops; op_data->subops = subops; - int subop = 0; + int i = 0; for (int role = 0; role < pg_size; role++) { // We always submit zero-length writes to all replicas, even if the stripe is not modified @@ -110,19 +110,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { if (role_osd_num == this->osd_num) { - subops[subop].bs_op = new blockstore_op_t({ + clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); + subops[i].op_type = (long)cur_op; + subops[i].bs_op = new blockstore_op_t({ .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), - .callback = [cur_op, this](blockstore_op_t *subop) + .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { - if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len) - { - // die - throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")"); - } - handle_primary_subop( - subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, - cur_op, subop->retval, subop->len, subop->version - ); + handle_primary_bs_subop(subop); }, .oid = { .inode = op_data->oid.inode, @@ -133,14 +127,14 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, .buf = w ? stripes[role].write_buf : stripes[role].read_buf, }); - bs->enqueue_op(subops[subop].bs_op); + bs->enqueue_op(subops[i].bs_op); } else { - subops[subop].op_type = OSD_OP_OUT; - subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE); - subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num); - subops[subop].req.sec_rw = { + subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); + subops[i].peer_fd = this->osd_peer_fds.at(role_osd_num); + subops[i].req.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = this->next_subop_id++, @@ -154,12 +148,12 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .offset = w ? stripes[role].write_start : stripes[role].read_start, .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, }; - subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf; + subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf; if (w && stripes[role].write_end > 0) { - subops[subop].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start); + subops[i].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start); } - subops[subop].callback = [cur_op, this](osd_op_t *subop) + subops[i].callback = [cur_op, this](osd_op_t *subop) { int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1; @@ -175,13 +169,59 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* stop_client(fail_fd); } }; - outbox_push(clients[subops[subop].peer_fd], &subops[subop]); + outbox_push(clients[subops[i].peer_fd], &subops[i]); } - subop++; + i++; } } } +static uint64_t bs_op_to_osd_op[] = { + 0, + OSD_OP_SECONDARY_READ, // BS_OP_READ + OSD_OP_SECONDARY_WRITE, // BS_OP_WRITE + OSD_OP_SECONDARY_SYNC, // BS_OP_SYNC + OSD_OP_SECONDARY_STABILIZE, // BS_OP_STABLE + OSD_OP_SECONDARY_DELETE, // BS_OP_DELETE + OSD_OP_SECONDARY_LIST, // BS_OP_LIST + OSD_OP_SECONDARY_ROLLBACK, // BS_OP_ROLLBACK + OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL +}; + +void osd_t::handle_primary_bs_subop(osd_op_t *subop) +{ + osd_op_t *cur_op = (osd_op_t*)(long)subop->op_type; + blockstore_op_t *bs_op = subop->bs_op; + int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE ? bs_op->len : 0; + if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ) + { + // die + throw std::runtime_error( + "local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+ + " retval = "+std::to_string(bs_op->retval)+")" + ); + } + add_bs_subop_stats(subop); + handle_primary_subop(bs_op_to_osd_op[bs_op->opcode], cur_op, bs_op->retval, expected, bs_op->version); +} + +void osd_t::add_bs_subop_stats(osd_op_t *subop) +{ + // Include local blockstore ops in statistics + uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode]; + timespec tv_end; + clock_gettime(CLOCK_REALTIME, &tv_end); + op_stat_count[0][opcode]++; + op_stat_sum[0][opcode] += ( + (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000 + ); + if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) + { + op_stat_bytes[0][opcode] += subop->bs_op->len; + } +} + void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version) { osd_primary_op_data_t *op_data = cur_op->op_data; @@ -260,16 +300,13 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os { if (chunk.osd_num == this->osd_num) { + clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); + subops[i].op_type = (long)cur_op; subops[i].bs_op = new blockstore_op_t({ .opcode = BS_OP_DELETE, - .callback = [cur_op, this](blockstore_op_t *subop) + .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local delete operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0); + handle_primary_bs_subop(subop); }, .oid = { .inode = op_data->oid.inode, @@ -328,16 +365,13 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num; if (sync_osd == this->osd_num) { + clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); + subops[i].op_type = (long)cur_op; subops[i].bs_op = new blockstore_op_t({ .opcode = BS_OP_SYNC, - .callback = [cur_op, this](blockstore_op_t *subop) + .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local sync operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0); + handle_primary_bs_subop(subop); }, }); bs->enqueue_op(subops[i].bs_op); @@ -382,16 +416,13 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) auto & stab_osd = (*(op_data->unstable_write_osds))[i]; if (stab_osd.osd_num == this->osd_num) { + clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); + subops[i].op_type = (long)cur_op; subops[i].bs_op = new blockstore_op_t({ .opcode = BS_OP_STABLE, - .callback = [cur_op, this](blockstore_op_t *subop) + .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { - if (subop->retval != 0) - { - // die - throw std::runtime_error("local stabilize operation failed"); - } - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0); + handle_primary_bs_subop(subop); }, .len = (uint32_t)stab_osd.len, .buf = (void*)(op_data->unstable_writes + stab_osd.start), diff --git a/osd_send.cpp b/osd_send.cpp index 991c1fc3..82eeda57 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -44,6 +44,16 @@ bool osd_t::try_send(osd_client_t & cl) (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000 ); + if (cl.write_op->req.hdr.opcode == OSD_OP_READ || + cl.write_op->req.hdr.opcode == OSD_OP_WRITE) + { + op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len; + } + else if (cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || + cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len; + } } } cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();