From a56f8cd14e3da39e7bf0c0cb495085d2c3f43aa6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 2 Jun 2020 18:44:23 +0300 Subject: [PATCH] Simplify handle_primary_subop() arguments --- osd.h | 2 +- osd_peering_pg.cpp | 43 ++++++++++++------------- osd_primary_subops.cpp | 49 +++++++++++++++++++++------- osd_test.cpp | 73 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 131 insertions(+), 36 deletions(-) diff --git a/osd.h b/osd.h index d35472a1..e52e5b6e 100644 --- a/osd.h +++ b/osd.h @@ -194,7 +194,7 @@ class osd_t bool check_write_queue(osd_op_t *cur_op, pg_t & pg); void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg); bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state); - void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version); + void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op); void handle_primary_bs_subop(osd_op_t *subop); void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 56f544a1..b2ce66ea 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -206,17 +206,6 @@ void pg_obj_state_check_t::finish_object() if (log_level > 1) { printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) - { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); - } - } - if (log_level > 2) - { - for (int i = obj_start; i < obj_end; i++) - { - printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); - } } state = OBJ_INCOMPLETE; pg->state = pg->state | PG_HAS_INCOMPLETE; @@ -226,11 +215,21 @@ void pg_obj_state_check_t::finish_object() if (log_level > 1) { printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) - { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); - } } + state = OBJ_DEGRADED; + pg->state = pg->state | PG_HAS_DEGRADED; + } + if (n_mismatched > 0) + { + if (n_roles >= pg->pg_cursize && log_level > 1) + { + printf("Object is misplaced: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); + } + state |= OBJ_MISPLACED; + pg->state = pg->state | PG_HAS_MISPLACED; + } + if (log_level > 1 && (n_roles < pg->pg_cursize || n_mismatched > 0)) + { if (log_level > 2) { for (int i = obj_start; i < obj_end; i++) @@ -238,13 +237,13 @@ void pg_obj_state_check_t::finish_object() printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); } } - state = OBJ_DEGRADED; - pg->state = pg->state | PG_HAS_DEGRADED; - } - if (n_mismatched > 0) - { - state |= OBJ_MISPLACED; - pg->state = pg->state | PG_HAS_MISPLACED; + else + { + for (int i = ver_start; i < ver_end; i++) + { + printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + } + } } pg->total_count++; if (state != 0 || ver_end < obj_end) diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index 1ad73852..ccff4dc9 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -138,6 +138,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, .buf = w ? stripes[role].write_buf : stripes[role].read_buf, }); +#ifdef OSD_DEBUG + printf( + "Submit %s to local: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", + op_data->oid.inode, op_data->oid.stripe | role, op_version, + subops[i].bs_op->offset, subops[i].bs_op->len + ); +#endif bs->enqueue_op(subops[i].bs_op); } else @@ -159,6 +166,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .offset = w ? stripes[role].write_start : stripes[role].read_start, .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, }; +#ifdef OSD_DEBUG + printf( + "Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", role_osd_num, + op_data->oid.inode, op_data->oid.stripe | role, op_version, + subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len + ); +#endif subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf; if (w && stripes[role].write_end > 0) { @@ -170,10 +184,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1; // so it doesn't get freed subop->buf = NULL; - handle_primary_subop( - subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval, - subop->req.sec_rw.len, subop->reply.sec_rw.version - ); + handle_primary_subop(subop, cur_op); if (fail_fd >= 0) { // write operation failed, drop the connection @@ -213,12 +224,16 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop) ); } add_bs_subop_stats(subop); - uint64_t opcode = bs_op_to_osd_op[bs_op->opcode]; - int retval = bs_op->retval; - uint64_t version = bs_op->version; + subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode]; + subop->reply.hdr.retval = bs_op->retval; + if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE) + { + subop->req.sec_rw.len = bs_op->len; + subop->reply.sec_rw.version = bs_op->version; + } delete bs_op; subop->bs_op = NULL; - handle_primary_subop(opcode, cur_op, retval, expected, version); + handle_primary_subop(subop, cur_op); } void osd_t::add_bs_subop_stats(osd_op_t *subop) @@ -244,8 +259,12 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop) } } -void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version) +void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) { + uint64_t opcode = subop->req.hdr.opcode; + int retval = subop->reply.hdr.retval; + int expected = opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE + ? subop->req.sec_rw.len : 0; osd_primary_op_data_t *op_data = cur_op->op_data; if (retval != expected) { @@ -261,6 +280,12 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, op_data->done++; if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) { + uint64_t version = subop->reply.sec_rw.version; +#ifdef OSD_DEBUG + uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end() + ? c_cli.clients[subop->peer_fd].osd_num : osd_num; + printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version); +#endif if (op_data->fact_ver != 0 && op_data->fact_ver != version) { throw std::runtime_error( @@ -380,7 +405,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os subops[i].callback = [cur_op, this](osd_op_t *subop) { int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; - handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->reply.hdr.retval, 0, 0); + handle_primary_subop(subop, cur_op); if (fail_fd >= 0) { // delete operation failed, drop the connection @@ -433,7 +458,7 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) subops[i].callback = [cur_op, this](osd_op_t *subop) { int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0); + handle_primary_subop(subop, cur_op); if (fail_fd >= 0) { // sync operation failed, drop the connection @@ -488,7 +513,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) subops[i].callback = [cur_op, this](osd_op_t *subop) { int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0); + handle_primary_subop(subop, cur_op); if (fail_fd >= 0) { // sync operation failed, drop the connection diff --git a/osd_test.cpp b/osd_test.cpp index 140934af..e61cb78e 100644 --- a/osd_test.cpp +++ b/osd_test.cpp @@ -19,6 +19,8 @@ int connect_osd(const char *osd_address, int osd_port); +uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len); + uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern); void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len); @@ -105,7 +107,7 @@ int main3(int narg, char *args[]) return 0; } -int main(int narg, char *args[]) +int main4(int narg, char *args[]) { int connect_fd; // Cluster write (sync not implemented yet) @@ -117,6 +119,15 @@ int main(int narg, char *args[]) return 0; } +int main(int narg, char *args[]) +{ + int connect_fd; + connect_fd = connect_osd("192.168.7.2", 43051); + test_read(connect_fd, 1, 1039663104, UINT64_MAX, 0, 128*1024); + close(connect_fd); + return 0; +} + int connect_osd(const char *osd_address, int osd_port) { struct sockaddr_in addr; @@ -167,6 +178,66 @@ bool check_reply(int r, osd_any_op_t & op, osd_any_reply_t & reply, int expected return true; } +uint64_t test_read(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t offset, uint64_t len) +{ + osd_any_op_t op; + osd_any_reply_t reply; + op.hdr.magic = SECONDARY_OSD_OP_MAGIC; + op.hdr.id = 1; + op.hdr.opcode = OSD_OP_SECONDARY_READ; + op.sec_rw.oid = { + .inode = inode, + .stripe = stripe, + }; + op.sec_rw.version = version; + op.sec_rw.offset = offset; + op.sec_rw.len = len; + void *data = memalign(MEM_ALIGNMENT, op.sec_rw.len); + write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE); + int r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE); + if (!check_reply(r, op, reply, op.sec_rw.len)) + { + free(data); + return 0; + } + r = read_blocking(connect_fd, data, len); + if (r != len) + { + free(data); + perror("read data"); + return 0; + } + free(data); + printf("Read %lu:%lu v%lu = v%lu\n", inode, stripe, version, reply.sec_rw.version); + op.hdr.opcode = OSD_OP_SECONDARY_LIST; + op.sec_list.list_pg = 1; + op.sec_list.pg_count = 1; + op.sec_list.pg_stripe_size = 4*1024*1024; + write_blocking(connect_fd, op.buf, OSD_PACKET_SIZE); + r = read_blocking(connect_fd, reply.buf, OSD_PACKET_SIZE); + if (reply.hdr.retval < 0 || !check_reply(r, op, reply, reply.hdr.retval)) + { + return 0; + } + data = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id)*reply.hdr.retval); + r = read_blocking(connect_fd, data, sizeof(obj_ver_id)*reply.hdr.retval); + if (r != sizeof(obj_ver_id)*reply.hdr.retval) + { + free(data); + perror("read data"); + return 0; + } + obj_ver_id *ov = (obj_ver_id*)data; + for (int i = 0; i < reply.hdr.retval; i++) + { + if (ov[i].oid.inode == inode && (ov[i].oid.stripe & ~(4096-1)) == (stripe & ~(4096-1))) + { + printf("list: %lu:%lu v%lu stable=%d\n", ov[i].oid.inode, ov[i].oid.stripe, ov[i].version, i < reply.sec_list.stable_count ? 1 : 0); + } + } + return 0; +} + uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern) { osd_any_op_t op;