From ffe073473a8a5288ccf04b9a63168b728ba18cae Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 13 Feb 2020 19:13:16 +0300 Subject: [PATCH] Remove hardcode of the EC(2+1) scheme, now it supports EC(k+1), fix some bugs --- blockstore.h | 5 +- blockstore_impl.cpp | 29 ++++++--- blockstore_impl.h | 2 +- blockstore_write.cpp | 7 ++- object_id.h | 12 +--- osd.h | 6 +- osd_exec_secondary.cpp | 7 ++- osd_ops.h | 3 +- osd_peering.cpp | 24 ++++---- osd_peering_pg.cpp | 8 +-- osd_peering_pg.h | 3 + osd_primary.cpp | 130 +++++++++++++++++++++++++++++------------ test_osd.cpp | 42 ++++++++----- 13 files changed, 183 insertions(+), 95 deletions(-) diff --git a/blockstore.h b/blockstore.h index 824422bd..0bdab53a 100644 --- a/blockstore.h +++ b/blockstore.h @@ -37,8 +37,9 @@ /* BS_OP_LIST: Input: -- len = divisor -- offset = modulo. object is listed if (object_id % len) == offset. +- oid.stripe = parity block size +- len = PG count or 0 to list all objects +- offset = PG number Output: - retval = total obj_ver_id count diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 98cf43f6..d478a052 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -312,6 +312,10 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) op->callback(op); return; } + if (op->opcode == BS_OP_WRITE && !enqueue_write(op)) + { + return; + } // Call constructor without allocating memory. 
We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; @@ -325,22 +329,28 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) { submit_queue.push_front(op); } - if (op->opcode == BS_OP_WRITE) - { - enqueue_write(op); - } ringloop->wakeup(); } void blockstore_impl_t::process_list(blockstore_op_t *op) { // Count objects + uint32_t list_pg = op->offset; + uint32_t pg_count = op->len; + uint64_t parity_block_size = op->oid.stripe; + if (pg_count != 0 && (parity_block_size < MIN_BLOCK_SIZE || list_pg >= pg_count)) + { + op->retval = -EINVAL; + FINISH_OP(op); + return; + } uint64_t stable_count = 0; - if (op->len) + if (pg_count > 0) { for (auto it = clean_db.begin(); it != clean_db.end(); it++) { - if ((it->first % op->len) == op->offset) + uint32_t pg = (it->first.inode + it->first.stripe / parity_block_size) % pg_count; + if (pg == list_pg) { stable_count++; } @@ -353,7 +363,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) uint64_t total_count = stable_count; for (auto it = dirty_db.begin(); it != dirty_db.end(); it++) { - if (!op->len || (it->first.oid % op->len) == op->offset) + if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg) { if (IS_STABLE(it->second.state)) { @@ -369,13 +379,14 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) if (!op->buf) { op->retval = -ENOMEM; + FINISH_OP(op); return; } obj_ver_id *vers = (obj_ver_id*)op->buf; int i = 0; for (auto it = clean_db.begin(); it != clean_db.end(); it++) { - if (!op->len || (it->first % op->len) == op->offset) + if (!pg_count || ((it->first.inode + it->first.stripe / parity_block_size) % pg_count) == list_pg) { vers[i++] = { .oid = it->first, @@ -386,7 +397,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) int j = stable_count; for (auto it = dirty_db.begin(); it != dirty_db.end(); it++) { - if (!op->len || (it->first.oid % op->len) 
== op->offset) + if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg) { if (IS_STABLE(it->second.state)) { diff --git a/blockstore_impl.h b/blockstore_impl.h index 1ea4f366..351163dc 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -258,7 +258,7 @@ class blockstore_impl_t void handle_read_event(ring_data_t *data, blockstore_op_t *op); // Write - void enqueue_write(blockstore_op_t *op); + bool enqueue_write(blockstore_op_t *op); int dequeue_write(blockstore_op_t *op); int dequeue_del(blockstore_op_t *op); void ack_write(blockstore_op_t *op); diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 00ca6d0c..d2761e89 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -1,6 +1,6 @@ #include "blockstore_impl.h" -void blockstore_impl_t::enqueue_write(blockstore_op_t *op) +bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Check or assign version number bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE); @@ -40,14 +40,14 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op) // Invalid version requested op->retval = -EINVAL; FINISH_OP(op); - return; + return false; } if (deleted && is_del) { // Already deleted op->retval = 0; FINISH_OP(op); - return; + return false; } // Immediately add the operation into dirty_db, so subsequent reads could see it #ifdef BLOCKSTORE_DEBUG @@ -68,6 +68,7 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op) .len = is_del ? 
0 : op->len, .journal_sector = 0, }); + return true; } // First step of the write algorithm: dequeue operation and submit initial write(s) diff --git a/object_id.h b/object_id.h index ccc14c7d..5264a3e5 100644 --- a/object_id.h +++ b/object_id.h @@ -2,23 +2,15 @@ #include -// Max 64 replicas -#define STRIPE_MASK 0x3F -#define STRIPE_SHIFT 6 - // 16 bytes per object/stripe id -// stripe includes replica number in 6 (or maybe 4, see above) least significant bits +// stripe = (start of the parity stripe + peer role) +// i.e. for example (256KB + one of 0,1,2) struct __attribute__((__packed__)) object_id { uint64_t inode; uint64_t stripe; }; -inline uint64_t operator % (const object_id & a, const uint64_t b) -{ - return ((a.inode % b) * (0x100000000 % b) * (0x100000000 % b) + (a.stripe >> STRIPE_SHIFT) % b) % b; -} - inline bool operator == (const object_id & a, const object_id & b) { return a.inode == b.inode && a.stripe == b.stripe; diff --git a/osd.h b/osd.h index 8abf0648..eaf9c1a9 100644 --- a/osd.h +++ b/osd.h @@ -10,6 +10,7 @@ #include #include +#include #include #include "blockstore.h" @@ -149,6 +150,9 @@ struct osd_client_t // Outbound messages (replies or requests) std::deque outbox; + // PGs dirtied by this client's primary-writes + std::set dirty_pgs; + // Write state osd_op_t *write_op = NULL; iovec write_iov; @@ -188,6 +192,7 @@ class osd_t int inflight_ops = 0; blockstore_t *bs; uint32_t bs_block_size, bs_disk_alignment; + uint64_t parity_block_size = 4*1024*1024; // 4 MB by default ring_loop_t *ringloop; timerfd_interval *tick_tfd; @@ -239,7 +244,6 @@ class osd_t void exec_primary_read(osd_op_t *cur_op); void exec_primary_write(osd_op_t *cur_op); void exec_primary_sync(osd_op_t *cur_op); - void make_primary_reply(osd_op_t *op); void finish_primary_op(osd_op_t *cur_op, int retval); void handle_primary_read_subop(osd_op_t *cur_op, int ok); int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size); diff --git 
a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index 75f2ccb6..e4cf48ac 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -51,15 +51,16 @@ void osd_t::exec_secondary(osd_op_t *cur_op) } else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) { - if (cur_op->op.sec_list.pgtotal < cur_op->op.sec_list.pgnum) + if (cur_op->op.sec_list.pg_count < cur_op->op.sec_list.list_pg) { // requested pg number is greater than total pg count cur_op->bs_op.retval = -EINVAL; secondary_op_callback(cur_op); return; } - cur_op->bs_op.len = cur_op->op.sec_list.pgtotal; - cur_op->bs_op.offset = cur_op->op.sec_list.pgnum - 1; + cur_op->bs_op.oid.stripe = cur_op->op.sec_list.parity_block_size; + cur_op->bs_op.len = cur_op->op.sec_list.pg_count; + cur_op->bs_op.offset = cur_op->op.sec_list.list_pg - 1; } #ifdef OSD_STUB cur_op->bs_op.retval = cur_op->bs_op.len; diff --git a/osd_ops.h b/osd_ops.h index 0feb105f..8309e10c 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -131,7 +131,8 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t { osd_op_header_t header; // placement group total number and total count - pg_num_t pgnum, pgtotal; + pg_num_t list_pg, pg_count; + uint64_t parity_block_size; }; struct __attribute__((__packed__)) osd_reply_secondary_list_t diff --git a/osd_peering.cpp b/osd_peering.cpp index fd61ea65..b5e8c780 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -234,18 +234,18 @@ void osd_t::start_pg_peering(int pg_idx) if (pg.peering_state) { // Adjust the peering operation that's still in progress - for (auto & p: pg.peering_state->list_ops) + for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++) { int role; for (role = 0; role < pg.cur_set.size(); role++) { - if (pg.cur_set[role] == p.first) + if (pg.cur_set[role] == it->first) break; } if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) { // Discard the result after completion, which, chances are, will be unsuccessful - auto list_op = p.second; + 
auto list_op = it->second; if (list_op->peer_fd == 0) { // Self @@ -264,7 +264,8 @@ void osd_t::start_pg_peering(int pg_idx) delete list_op; }; } - pg.peering_state->list_ops.erase(p.first); + pg.peering_state->list_ops.erase(it); + it = pg.peering_state->list_ops.begin(); } } for (auto & p: pg.peering_state->list_results) @@ -315,6 +316,9 @@ void osd_t::start_pg_peering(int pg_idx) op->op_type = 0; op->peer_fd = 0; op->bs_op.opcode = BS_OP_LIST; + op->bs_op.oid.stripe = parity_block_size; + op->bs_op.len = pg_count, + op->bs_op.offset = pg.pg_num-1, op->bs_op.callback = [ps, op, role_osd](blockstore_op_t *bs_op) { if (op->bs_op.retval < 0) @@ -351,19 +355,19 @@ void osd_t::start_pg_peering(int pg_idx) .id = this->next_subop_id++, .opcode = OSD_OP_SECONDARY_LIST, }, - .pgnum = pg.pg_num, - .pgtotal = pg_count, + .list_pg = pg.pg_num, + .pg_count = pg_count, + .parity_block_size = parity_block_size, }, }; op->callback = [this, ps, role_osd](osd_op_t *op) { if (op->reply.hdr.retval < 0) { - int peer_fd = op->peer_fd; - printf("Failed to get object list from OSD %lu, disconnecting peer\n", role_osd); - delete op; + printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval); ps->list_ops.erase(role_osd); - stop_client(peer_fd); + stop_client(op->peer_fd); + delete op; return; } printf( diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 8ac3dd7f..e34f2e42 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -170,7 +170,7 @@ void pg_t::calc_object_states() for (int i = 0; i < all.size(); i++) { if (st.oid.inode != all[i].oid.inode || - st.oid.stripe != (all[i].oid.stripe >> STRIPE_SHIFT)) + st.oid.stripe != (all[i].oid.stripe & ~STRIPE_MASK)) { if (st.oid.inode != 0) { @@ -179,7 +179,7 @@ void pg_t::calc_object_states() remember_object(st, all); } st.obj_start = st.ver_start = i; - st.oid = { .inode = all[i].oid.inode, .stripe = all[i].oid.stripe >> STRIPE_SHIFT }; + st.oid = { .inode = 
all[i].oid.inode, .stripe = all[i].oid.stripe & ~STRIPE_MASK }; st.max_ver = st.target_ver = all[i].version; st.has_roles = st.n_copies = st.n_roles = st.n_stable = st.n_matched = 0; st.is_buggy = st.has_old_unstable = false; @@ -192,7 +192,7 @@ void pg_t::calc_object_states() st.ver_end = i; i++; while (i < all.size() && st.oid.inode == all[i].oid.inode && - st.oid.stripe == (all[i].oid.stripe >> STRIPE_SHIFT)) + st.oid.stripe == (all[i].oid.stripe & ~STRIPE_MASK)) { if (!all[i].is_stable) { @@ -248,7 +248,7 @@ void pg_t::calc_object_states() pg.state = pg.state | PG_DEGRADED; } printf( - "PG %u is active%s%s%s%s\n", pg.pg_num, + "PG %u is active%s%s%s%s%s\n", pg.pg_num, (pg.state & PG_DEGRADED) ? " + degraded" : "", (pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "", (pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "", diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 9ade5320..b515e640 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -20,6 +20,9 @@ #define PG_HAS_MISPLACED (1<<7) #define PG_HAS_UNCLEAN (1<<8) +// FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size +#define STRIPE_MASK ((uint64_t)4096 - 1) + // OSD object states #define OBJ_CLEAN 0x01 #define OBJ_MISPLACED 0x02 diff --git a/osd_primary.cpp b/osd_primary.cpp index 698da1ed..f918db27 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -43,20 +43,46 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) outbox_push(this->clients[cur_op->peer_fd], cur_op); } +inline void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint64_t start, uint64_t end, osd_read_stripe_t *stripes) +{ + for (int role = 0; role < pg_minsize; role++) + { + if (start < (1+role)*bs_block_size && end > role*bs_block_size) + { + stripes[role].real_start = stripes[role].start + = start < role*bs_block_size ? 0 : start-role*bs_block_size; + stripes[role].real_end = stripes[role].end + = end > (role+1)*bs_block_size ? 
bs_block_size : end-role*bs_block_size; + } + } +} + void osd_t::exec_primary_read(osd_op_t *cur_op) { - object_id oid = { - .inode = cur_op->op.rw.inode, - .stripe = (cur_op->op.rw.offset / (bs_block_size*2)) << STRIPE_SHIFT, - }; + // PG number is calculated from the offset + // Our EC scheme stores data in fixed chunks equal to (K*block size) + // But we must not use K in the process of calculating the PG number + // So we calculate the PG number using a separate setting which should be per-inode (FIXME) uint64_t start = cur_op->op.rw.offset; uint64_t end = cur_op->op.rw.offset + cur_op->op.rw.len; - pg_num_t pg_num = (oid % pg_count); // FIXME +1 - if (((end - 1) / (bs_block_size*2)) != oid.stripe || - (start % bs_disk_alignment) || (end % bs_disk_alignment) || - pg_num > pgs.size() || - // FIXME: Postpone operations in inactive PGs - !(pgs[pg_num].state & PG_ACTIVE)) + // FIXME Real pg_num should equal the below expression + 1 + pg_num_t pg_num = (cur_op->op.rw.inode + cur_op->op.rw.offset / parity_block_size) % pg_count; + // FIXME: Postpone operations in inactive PGs + if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE)) + { + finish_primary_op(cur_op, -EINVAL); + return; + } + uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize; + object_id oid = { + .inode = cur_op->op.rw.inode, + // oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG + .stripe = (cur_op->op.rw.offset / parity_block_size) * parity_block_size + + ((cur_op->op.rw.offset % parity_block_size) / pg_parity_size) * pg_parity_size + }; + if (end > (oid.stripe + pg_parity_size) || + (start % bs_disk_alignment) != 0 || + (end % bs_disk_alignment) != 0) { finish_primary_op(cur_op, -EINVAL); return; @@ -65,18 +91,10 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 ); op_data->oid = oid; - osd_read_stripe_t *stripes = (op_data->stripes = 
((osd_read_stripe_t*)(op_data+1))); + op_data->stripes = ((osd_read_stripe_t*)(op_data+1)); cur_op->op_data = op_data; - for (int role = 0; role < pgs[pg_num].pg_minsize; role++) - { - if (start < (1+role)*bs_block_size && end > role*bs_block_size) - { - stripes[role].real_start = stripes[role].start - = start < role*bs_block_size ? 0 : start-role*bs_block_size; - stripes[role].end = stripes[role].real_end - = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; - } - } + split_stripes(pgs[pg_num].pg_minsize, bs_block_size, start, end, op_data->stripes); + // Determine version { auto vo_it = pgs[pg_num].ver_override.find(oid); op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; @@ -95,7 +113,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end() ? st_it->second->read_target.data() : pgs[pg_num].cur_set.data()); - if (extend_missing_stripes(stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) + if (extend_missing_stripes(op_data->stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) { free(op_data); finish_primary_op(cur_op, -EIO); @@ -130,18 +148,40 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) if (op_data->degraded) { // Reconstruct missing stripes + // FIXME: Always EC(k+1) by now. Add different coding schemes osd_read_stripe_t *stripes = op_data->stripes; for (int role = 0; role < op_data->pg_minsize; role++) { if (stripes[role].end != 0 && stripes[role].real_end == 0) { - int other = role == 0 ? 
1 : 0; - int parity = op_data->pg_size-1; - memxor( - cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start), - cur_op->buf + stripes[parity].pos + (stripes[parity].real_start - stripes[role].start), - cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start - ); + int prev = -2; + for (int other = 0; other < op_data->pg_size; other++) + { + if (other != role) + { + if (prev == -2) + { + prev = other; + } + else if (prev >= 0) + { + memxor( + cur_op->buf + stripes[prev].pos + (stripes[prev].real_start - stripes[role].start), + cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[other].start), + cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start + ); + prev = -1; + } + else + { + memxor( + cur_op->buf + stripes[role].pos, + cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start), + cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start + ); + } + } + } } if (stripes[role].end != 0) { @@ -286,6 +326,31 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op void osd_t::exec_primary_write(osd_op_t *cur_op) { + // "RAID5" EC(k+1) parity modification variants (Px = previous, Nx = new): + // 1,2,3 write N1 -> read P2 -> write N3 = N1^P2 + // _,2,3 write N1 -> read P2 -> write N3 = N1^P2 + // 1,_,3 write N1 -> read P1,P3 -> write N3 = N1^P3^P1 + // 1,2,_ write N1 -> read nothing + // 1,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P2^P3 + // (or read P1,P4 -> write N4 = N1^P4^P1) + // 1,_,3,4 write N1 -> read P1,P4 -> write N4 = N1^P4^P1 + // _,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P3^P2 + // 1,2,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1 + // 1,_,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1 + // _,2,3,4,5 write N1 -> read P2,P3,P4 -> write N5 = N1^P2^P3^P4 + // + // I.e, when we write a part: + // 1) If parity is missing and all other parts are available: + // just overwrite the part + 
// 2) If the modified part is missing and all other parts are available: + // read all other parts except parity, xor them all with the new data + // 3) If all parts are available and size=3: + // read the paired data stripe, xor it with the new data + // 4) Otherwise: + // read old parity and old data of the modified part, xor them both with the new data + // Ouch. Scary. But faster than the generic variant. + // + // Generic variant for jerasure is a simple RMW process: read all -> decode -> modify -> encode -> write } @@ -293,10 +358,3 @@ void osd_t::exec_primary_sync(osd_op_t *cur_op) { } - -void osd_t::make_primary_reply(osd_op_t *op) -{ - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->op.hdr.id; - op->reply.hdr.opcode = op->op.hdr.opcode; -} diff --git a/test_osd.cpp b/test_osd.cpp index 8fa2b2c6..336a63a8 100644 --- a/test_osd.cpp +++ b/test_osd.cpp @@ -94,6 +94,27 @@ int connect_osd(const char *osd_address, int osd_port) return connect_fd; } +bool check_reply(int r, osd_any_op_t & op, osd_any_reply_t & reply, int expected) +{ + if (r != OSD_PACKET_SIZE) + { + printf("read failed\n"); + return false; + } + if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || + reply.hdr.id != op.hdr.id || reply.hdr.opcode != op.hdr.opcode) + { + printf("bad reply: magic, id or opcode does not match request\n"); + return false; + } + if (reply.hdr.retval != expected) + { + printf("operation failed, retval=%ld\n", reply.hdr.retval); + return false; + } + return true; +} + uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern) { union @@ -116,18 +137,15 @@ uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t ve op.sec_rw.version = version; op.sec_rw.offset = 0; op.sec_rw.len = 128*1024; - void *data = memalign(512, 128*1024); - for (int i = 0; i < 128*1024/sizeof(uint64_t); i++) + void *data = memalign(512, op.sec_rw.len); + for (int i = 0; i < (op.sec_rw.len)/sizeof(uint64_t); 
i++) ((uint64_t*)data)[i] = pattern; write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); - write_blocking(connect_fd, data, 128*1024); + write_blocking(connect_fd, data, op.sec_rw.len); int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); - if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || - reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_SECONDARY_WRITE || - reply.hdr.retval != 128*1024) + if (!check_reply(r, op, reply, op.sec_rw.len)) { free(data); - perror("read"); return 0; } version = reply.sec_rw.version; @@ -135,12 +153,9 @@ uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t ve op.hdr.id = 2; write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); - if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || - reply.hdr.id != 2 || reply.hdr.opcode != OSD_OP_TEST_SYNC_STAB_ALL || - reply.hdr.retval != 0) + if (!check_reply(r, op, reply, 0)) { free(data); - perror("read"); return 0; } free(data); @@ -168,12 +183,9 @@ void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_ void *data = memalign(512, len); write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); - if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || - reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_READ || - reply.hdr.retval != len) + if (!check_reply(r, op, reply, len)) { free(data); - perror("read"); return NULL; } r = read_blocking(connect_fd, data, len);