From d4fd9d982aa0d51e223a456b4f87cd1d34fe20b3 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 23 Feb 2020 02:11:43 +0300 Subject: [PATCH] Implement read-modify-write calculation and extract it into a separate file --- Makefile | 6 +- blockstore_impl.cpp | 6 + blockstore_impl.h | 3 + object_id.h | 1 + osd.h | 6 +- osd_id.h | 4 + osd_ops.h | 6 +- osd_primary.cpp | 282 ++++++++++++++-------------------- osd_rmw.cpp | 365 ++++++++++++++++++++++++++++++++++++++++++++ osd_rmw.h | 36 +++++ osd_rmw_test.cpp | 100 ++++++++++++ 11 files changed, 637 insertions(+), 178 deletions(-) create mode 100644 osd_id.h create mode 100644 osd_rmw.cpp create mode 100644 osd_rmw.h create mode 100644 osd_rmw_test.cpp diff --git a/Makefile b/Makefile index c9430e5a3..1659c87af 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ libblockstore.so: $(BLOCKSTORE_OBJS) libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring -OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o json11.o timerfd_interval.o +OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o osd_rmw.o json11.o timerfd_interval.o osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h g++ $(CXXFLAGS) -c -o $@ $< osd_receive.o: osd_receive.cpp osd.h osd_ops.h @@ -35,6 +35,10 @@ osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h g++ $(CXXFLAGS) -c -o $@ $< +osd_rmw.o: osd_rmw.cpp osd_rmw.h xor.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_rmw_test: osd_rmw_test.cpp osd_rmw.cpp osd_rmw.h xor.h + g++ $(CXXFLAGS) -o $@ $< osd_primary.o: osd_primary.cpp osd.h osd_ops.h osd_peering_pg.h xor.h g++ $(CXXFLAGS) -c -o $@ $< osd.o: osd.cpp osd.h osd_ops.h osd_peering_pg.h diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index d478a0525..2eb76cca7 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -316,6 +316,12 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) { return; } + if (0 && op->opcode == BS_OP_SYNC && immediate_commit) + { + op->retval = 0; + op->callback(op); + return; + } // Call constructor without allocating memory. We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; diff --git a/blockstore_impl.h b/blockstore_impl.h index 351163dc5..34ef453d9 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -189,6 +189,9 @@ class blockstore_impl_t bool readonly = false; // It is safe to disable fsync() if drive write cache is writethrough bool disable_data_fsync = false, disable_meta_fsync = false, disable_journal_fsync = false; + // Enable if you want every operation to be executed with an "implicit fsync" + // FIXME Not implemented yet + bool immediate_commit = false; bool inmemory_meta = false; int flusher_count; /******* END OF OPTIONS *******/ diff --git a/object_id.h b/object_id.h index 5264a3e59..5846ba6a2 100644 --- a/object_id.h +++ b/object_id.h @@ -1,6 +1,7 @@ #pragma once #include +#include // 16 bytes per object/stripe id // stripe = (start of the parity stripe + peer role) diff --git a/osd.h b/osd.h index eaf9c1a95..3d9666f9b 100644 --- a/osd.h +++ b/osd.h @@ -92,6 +92,8 @@ struct osd_op_buf_list_t } }; +struct osd_primary_op_data_t; + struct osd_op_t { int op_type; @@ -108,7 +110,7 @@ struct osd_op_t }; blockstore_op_t bs_op; void *buf = NULL; - void *op_data = NULL; + osd_primary_op_data_t* op_data = NULL; std::function callback; osd_op_buf_list_t send_list; @@ -241,12 +243,12 @@ class osd_t void secondary_op_callback(osd_op_t *cur_op); // primary ops + bool prepare_primary_rw(osd_op_t *cur_op); void exec_primary_read(osd_op_t *cur_op); void exec_primary_write(osd_op_t *cur_op); void exec_primary_sync(osd_op_t *cur_op); void finish_primary_op(osd_op_t *cur_op, int retval); void handle_primary_read_subop(osd_op_t *cur_op, int ok); - int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size); void submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); diff --git a/osd_id.h b/osd_id.h new file mode 100644 index 000000000..69720521e --- /dev/null +++ b/osd_id.h @@ -0,0 +1,4 @@ +#pragma once + +typedef uint64_t osd_num_t; +typedef uint32_t pg_num_t; diff --git a/osd_ops.h b/osd_ops.h index 8309e10ce..47b17bc3a 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -1,6 +1,7 @@ #pragma once #include "object_id.h" +#include "osd_id.h" // Magic numbers #define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l @@ -26,9 +27,6 @@ #define OSD_RW_ALIGN 512 #define OSD_RW_MAX 64*1024*1024 -typedef uint64_t osd_num_t; -typedef uint32_t pg_num_t; - // common request and reply headers struct __attribute__((__packed__)) osd_op_header_t { @@ -152,7 +150,7 @@ struct __attribute__((__packed__)) osd_op_rw_t // offset uint64_t offset; // length - uint64_t len; + uint32_t len; }; struct __attribute__((__packed__)) osd_reply_rw_t diff --git a/osd_primary.cpp b/osd_primary.cpp index f918db270..3471e8198 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -1,5 +1,5 @@ #include "osd.h" -#include "xor.h" +#include "osd_rmw.h" // read: read directly or read paired stripe(s), reconstruct, return // write: read paired stripe(s), modify, write @@ -10,19 +10,7 @@ // // sync: sync peers, get unstable versions from somewhere, stabilize them -struct off_len_t -{ - uint64_t offset, len; -}; - -struct osd_read_stripe_t -{ - uint64_t pos; - uint32_t start, end; - uint32_t real_start, real_end; -}; - -struct osd_primary_read_t +struct osd_primary_op_data_t { pg_num_t pg_num; object_id oid; @@ -31,6 +19,9 @@ struct osd_primary_read_t int degraded = 0, pg_size, pg_minsize; osd_read_stripe_t *stripes; osd_op_t *subops = NULL; + void *rmw_buf = NULL; + + bool should_read_version = false; }; void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) @@ -43,35 +34,19 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) outbox_push(this->clients[cur_op->peer_fd], cur_op); } -inline void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint64_t start, uint64_t end, osd_read_stripe_t *stripes) -{ - for (int role = 0; role < pg_minsize; role++) - { - if (start < (1+role)*bs_block_size && end > role*bs_block_size) - { - stripes[role].real_start = stripes[role].start - = start < role*bs_block_size ? 0 : start-role*bs_block_size; - stripes[role].real_end = stripes[role].end - = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; - } - } -} - -void osd_t::exec_primary_read(osd_op_t *cur_op) +bool osd_t::prepare_primary_rw(osd_op_t *cur_op) { // PG number is calculated from the offset // Our EC scheme stores data in fixed chunks equal to (K*block size) // But we must not use K in the process of calculating the PG number // So we calculate the PG number using a separate setting which should be per-inode (FIXME) - uint64_t start = cur_op->op.rw.offset; - uint64_t end = cur_op->op.rw.offset + cur_op->op.rw.len; // FIXME Real pg_num should equal the below expression + 1 pg_num_t pg_num = (cur_op->op.rw.inode + cur_op->op.rw.offset / parity_block_size) % pg_count; // FIXME: Postpone operations in inactive PGs if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE)) { finish_primary_op(cur_op, -EINVAL); - return; + return false; } uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize; object_id oid = { @@ -80,56 +55,72 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) .stripe = (cur_op->op.rw.offset / parity_block_size) * parity_block_size + ((cur_op->op.rw.offset % parity_block_size) / pg_parity_size) * pg_parity_size }; - if (end > (oid.stripe + pg_parity_size) || - (start % bs_disk_alignment) != 0 || - (end % bs_disk_alignment) != 0) + if ((cur_op->op.rw.offset + cur_op->op.rw.len) > (oid.stripe + pg_parity_size) || + (cur_op->op.rw.offset % bs_disk_alignment) != 0 || + (cur_op->op.rw.len % bs_disk_alignment) != 0) { finish_primary_op(cur_op, -EINVAL); - return; + return false; } - osd_primary_read_t *op_data = (osd_primary_read_t*)calloc( - sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 + osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc( + sizeof(osd_primary_op_data_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 ); + op_data->pg_num = pg_num; op_data->oid = oid; op_data->stripes = ((osd_read_stripe_t*)(op_data+1)); cur_op->op_data = op_data; - split_stripes(pgs[pg_num].pg_minsize, bs_block_size, start, end, op_data->stripes); - // Determine version + split_stripes(pgs[pg_num].pg_minsize, bs_block_size, (uint32_t)(cur_op->op.rw.offset - oid.stripe), cur_op->op.rw.len, op_data->stripes); + return true; +} + +void osd_t::exec_primary_read(osd_op_t *cur_op) +{ + if (!prepare_primary_rw(cur_op)) { - auto vo_it = pgs[pg_num].ver_override.find(oid); - op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; + return; } - if (pgs[pg_num].state == PG_ACTIVE) + osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; + auto & pg = pgs[op_data->pg_num]; + for (int role = 0; role < pg.pg_minsize; role++) + { + op_data->stripes[role].read_start = op_data->stripes[role].req_start; + op_data->stripes[role].read_end = op_data->stripes[role].req_end; + } + // Determine version + auto vo_it = pg.ver_override.find(op_data->oid); + op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; + if (pg.state == PG_ACTIVE) { // Fast happy-path - submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].cur_set.data(), cur_op); + cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); + submit_read_subops(pg.pg_minsize, pg.cur_set.data(), cur_op); cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len); } else { // PG may be degraded or have misplaced objects - spp::sparse_hash_map obj_states; - auto st_it = pgs[pg_num].obj_states.find(oid); - uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end() + auto st_it = pg.obj_states.find(op_data->oid); + uint64_t* cur_set = (st_it != pg.obj_states.end() ? st_it->second->read_target.data() - : pgs[pg_num].cur_set.data()); - if (extend_missing_stripes(op_data->stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) + : pg.cur_set.data()); + if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) { free(op_data); finish_primary_op(cur_op, -EIO); return; } // Submit reads - submit_read_subops(pgs[pg_num].pg_size, cur_set, cur_op); - op_data->pg_minsize = pgs[pg_num].pg_minsize; - op_data->pg_size = pgs[pg_num].pg_size; + cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); + submit_read_subops(pg.pg_size, cur_set, cur_op); + op_data->pg_minsize = pg.pg_minsize; + op_data->pg_size = pg.pg_size; op_data->degraded = 1; } } void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) { - osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; + osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; if (!ok) op_data->errors++; else @@ -152,42 +143,15 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) osd_read_stripe_t *stripes = op_data->stripes; for (int role = 0; role < op_data->pg_minsize; role++) { - if (stripes[role].end != 0 && stripes[role].real_end == 0) + if (stripes[role].read_end != 0 && stripes[role].missing) { - int prev = -2; - for (int other = 0; other < op_data->pg_size; other++) - { - if (other != role) - { - if (prev == -2) - { - prev = other; - } - else if (prev >= 0) - { - memxor( - cur_op->buf + stripes[prev].pos + (stripes[prev].real_start - stripes[role].start), - cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[other].start), - cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start - ); - prev = -1; - } - else - { - memxor( - cur_op->buf + stripes[role].pos, - cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start), - cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start - ); - } - } - } + reconstruct_stripe(stripes, op_data->pg_size, role); } - if (stripes[role].end != 0) + if (stripes[role].req_end != 0) { // Send buffer in parts to avoid copying cur_op->send_list.push_back( - cur_op->buf + stripes[role].pos + (stripes[role].real_start - stripes[role].start), stripes[role].end + stripes[role].read_buf + (stripes[role].read_start - stripes[role].req_start), stripes[role].req_end ); } } @@ -198,75 +162,38 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) } } -int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size) -{ - for (int role = 0; role < minsize; role++) - { - if (stripes[role].end != 0 && osd_set[role] == 0) - { - stripes[role].real_start = stripes[role].real_end = 0; - // Stripe is missing. Extend read to other stripes. - // We need at least pg_minsize stripes to recover the lost part. - int exist = 0; - for (int j = 0; j < size; j++) - { - if (osd_set[j] != 0) - { - if (stripes[j].real_end == 0 || j >= minsize) - { - stripes[j].real_start = stripes[role].start; - stripes[j].real_end = stripes[role].end; - } - else - { - stripes[j].real_start = stripes[j].start < stripes[role].start ? stripes[j].start : stripes[role].start; - stripes[j].real_end = stripes[j].end > stripes[role].end ? stripes[j].end : stripes[role].end; - } - exist++; - if (exist >= minsize) - { - break; - } - } - } - if (exist < minsize) - { - // Less than minsize stripes are available for this object - return -1; - } - } - } - return 0; -} - void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op) { - osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; + osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; osd_read_stripe_t *stripes = op_data->stripes; - uint64_t buf_size = 0; - int n_subops = 0; + // Allocate subops + int n_subops = 0, force_read = -1; for (int role = 0; role < read_pg_size; role++) { - if (stripes[role].real_end != 0) + if (osd_set[role] == this->osd_num || osd_set[role] != 0 && force_read == -1) + { + force_read = role; + } + if (osd_set[role] != 0 && stripes[role].read_end != 0) { n_subops++; - stripes[role].pos = buf_size; - buf_size += stripes[role].real_end - stripes[role].real_start; - } - else if (stripes[role].end != 0) - { - stripes[role].pos = buf_size; - buf_size += stripes[role].end - stripes[role].start; } } + if (!n_subops && op_data->should_read_version) + { + n_subops = 1; + } + else + { + force_read = -1; + } osd_op_t *subops = new osd_op_t[n_subops]; - cur_op->buf = memalign(MEM_ALIGNMENT, buf_size); op_data->n_subops = n_subops; op_data->subops = subops; int subop = 0; for (int role = 0; role < read_pg_size; role++) { - if (stripes[role].real_end == 0) + if (stripes[role].read_end == 0 && force_read != role) { continue; } @@ -277,7 +204,7 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op { subops[subop].bs_op = { .opcode = BS_OP_READ, - .callback = [this, cur_op](blockstore_op_t *subop) + .callback = [cur_op, this](blockstore_op_t *subop) { handle_primary_read_subop(cur_op, subop->retval == subop->len); }, @@ -286,9 +213,9 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op .stripe = op_data->oid.stripe | role, }, .version = op_data->target_ver, - .offset = stripes[role].real_start, - .len = stripes[role].real_end - stripes[role].real_start, - .buf = cur_op->buf + stripes[role].pos, + .offset = stripes[role].read_start, + .len = stripes[role].read_end - stripes[role].read_start, + .buf = stripes[role].read_buf, }; bs->enqueue_op(&subops[subop].bs_op); } @@ -307,11 +234,11 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op .stripe = op_data->oid.stripe | role, }, .version = op_data->target_ver, - .offset = stripes[role].real_start, - .len = stripes[role].real_end - stripes[role].real_start, + .offset = stripes[role].read_start, + .len = stripes[role].read_end - stripes[role].read_start, }; - subops[subop].buf = cur_op->buf + stripes[role].pos; - subops[subop].callback = [this, cur_op](osd_op_t *subop) + subops[subop].buf = stripes[role].read_buf; + subops[subop].callback = [cur_op, this](osd_op_t *subop) { // so it doesn't get freed. FIXME: do it better subop->buf = NULL; @@ -326,31 +253,44 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op void osd_t::exec_primary_write(osd_op_t *cur_op) { - // "RAID5" EC(k+1) parity modification variants (Px = previous, Nx = new): - // 1,2,3 write N1 -> read P2 -> write N3 = N1^P2 - // _,2,3 write N1 -> read P2 -> write N3 = N1^P2 - // 1,_,3 write N1 -> read P1,P3 -> write N3 = N1^P3^P1 - // 1,2,_ write N1 -> read nothing - // 1,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P2^P3 - // (or read P1,P4 -> write N4 = N1^P4^P1) - // 1,_,3,4 write N1 -> read P1,P4 -> write N4 = N1^P4^P1 - // _,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P3^P2 - // 1,2,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1 - // 1,_,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1 - // _,2,3,4,5 write N1 -> read P2,P3,P4 -> write N5 = N1^P2^P3^P4 - // - // I.e, when we write a part: - // 1) If parity is missing and all other parts are available: - // just overwrite the part - // 2) If the modified part is missing and all other parts are available: - // read all other parts except parity, xor them all with the new data - // 3) If all parts are available and size=3: - // read the paired data stripe, xor it with the new data - // 4) Otherwise: - // read old parity and old data of the modified part, xor them both with the new data - // Ouсh. Scary. But faster than the generic variant. - // - // Generic variant for jerasure is a simple RMW process: read all -> decode -> modify -> encode -> write + if (!prepare_primary_rw(cur_op)) + { + return; + } + osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; + auto & pg = pgs[op_data->pg_num]; + // Check if actions are pending for this object + auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){ + .oid = op_data->oid, + .osd_num = 0, + }); + if (act_it != pg.obj_stab_actions.end() && + act_it->first.oid.inode == op_data->oid.inode && + (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) + { + // FIXME postpone the request until actions are done + free(op_data); + finish_primary_op(cur_op, -EIO); + return; + } + // Check if there are other write requests to the same object + + // Determine blocks to read + op_data->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); + op_data->should_read_version = true; + // Read required blocks + submit_read_subops(pg.pg_size, pg.cur_set.data(), cur_op); + // ->>>>> Continue from the callback + // Calculate parity + calc_rmw_parity(op_data->stripes, op_data->pg_size); + // Save version override if degraded + + // Send writes + + // ->>>>> Continue from the callback + // Remember version as unstable + + // Remove version override if degraded } diff --git a/osd_rmw.cpp b/osd_rmw.cpp new file mode 100644 index 000000000..911d6f06a --- /dev/null +++ b/osd_rmw.cpp @@ -0,0 +1,365 @@ +#include +#include +#include "xor.h" +#include "osd_rmw.h" + +static inline void extend_read(uint32_t start, uint32_t end, osd_read_stripe_t & stripe) +{ + if (stripe.read_end == 0) + { + stripe.read_start = start; + stripe.read_end = end; + } + else + { + if (stripe.read_end < end) + stripe.read_end = end; + if (stripe.read_start > start) + stripe.read_start = start; + } +} + +static inline void cover_read(uint32_t start, uint32_t end, osd_read_stripe_t & stripe) +{ + // Subtract write request from request + if (start >= stripe.req_start && + end <= stripe.req_end) + { + return; + } + if (start <= stripe.req_start && + end >= stripe.req_start && + end <= stripe.req_end) + { + end = stripe.req_start; + } + else if (start >= stripe.req_start && + start <= stripe.req_end && + end >= stripe.req_end) + { + start = stripe.req_end; + } + if (stripe.read_end == 0) + { + stripe.read_start = start; + stripe.read_end = end; + } + else + { + if (stripe.read_end < end) + stripe.read_end = end; + if (stripe.read_start > start) + stripe.read_start = start; + } +} + +void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t end, osd_read_stripe_t *stripes) +{ + end = start+end; + for (int role = 0; role < pg_minsize; role++) + { + if (start < (1+role)*bs_block_size && end > role*bs_block_size) + { + stripes[role].req_start = start < role*bs_block_size ? 0 : start-role*bs_block_size; + stripes[role].req_end = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; + } + } +} + +void reconstruct_stripe(osd_read_stripe_t *stripes, int pg_size, int role) +{ + int prev = -2; + for (int other = 0; other < pg_size; other++) + { + if (other != role) + { + if (prev == -2) + { + prev = other; + } + else if (prev >= 0) + { + memxor( + stripes[prev].read_buf + (stripes[prev].read_start - stripes[role].read_start), + stripes[other].read_buf + (stripes[other].read_start - stripes[other].read_start), + stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start + ); + prev = -1; + } + else + { + memxor( + stripes[role].read_buf, + stripes[other].read_buf + (stripes[other].read_start - stripes[role].read_start), + stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start + ); + } + } + } +} + +int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size) +{ + for (int role = 0; role < minsize; role++) + { + if (stripes[role].read_end != 0 && osd_set[role] == 0) + { + stripes[role].missing = true; + // Stripe is missing. Extend read to other stripes. + // We need at least pg_minsize stripes to recover the lost part. + // FIXME: LRC EC and similar don't require to read all other stripes. + int exist = 0; + for (int j = 0; j < size; j++) + { + if (osd_set[j] != 0) + { + extend_read(stripes[role].read_start, stripes[role].read_end, stripes[j]); + exist++; + if (exist >= minsize) + { + break; + } + } + } + if (exist < minsize) + { + // Less than minsize stripes are available for this object + return -1; + } + } + } + return 0; +} + +void* alloc_read_buffer(osd_read_stripe_t *stripes, int read_pg_size, uint64_t add_size) +{ + // Calculate buffer size + uint64_t buf_size = add_size; + for (int role = 0; role < read_pg_size; role++) + { + if (stripes[role].read_end != 0) + { + buf_size += stripes[role].read_end - stripes[role].read_start; + } + } + // Allocate buffer + void *buf = memalign(MEM_ALIGNMENT, buf_size); + uint64_t buf_pos = add_size; + for (int role = 0; role < read_pg_size; role++) + { + if (stripes[role].read_end != 0) + { + stripes[role].read_buf = buf + buf_pos; + buf_pos += stripes[role].read_end - stripes[role].read_start; + } + } + return buf; +} + +void* calc_rmw_reads(void *write_buf, osd_read_stripe_t *stripes, uint64_t *osd_set, uint64_t pg_size, uint64_t pg_minsize, uint64_t pg_cursize) +{ + // Generic parity modification (read-modify-write) algorithm + // Reconstruct -> Read -> Calc parity -> Write + // Now we always read continuous ranges. This means that an update of the beginning + // of one data stripe and the end of another will lead to a read of full paired stripes. + // FIXME: (Maybe) read small individual ranges in that case instead. + uint32_t start = 0, end = 0; + for (int role = 0; role < pg_minsize; role++) + { + if (stripes[role].req_end != 0) + { + start = !end || stripes[role].req_start < start ? stripes[role].req_start : start; + end = std::max(stripes[role].req_end, end); + } + } + for (int role = 0; role < pg_minsize; role++) + { + cover_read(start, end, stripes[role]); + } + int has_parity = 0; + for (int role = pg_minsize; role < pg_size; role++) + { + if (osd_set[role] != 0) + { + has_parity++; + stripes[role].write_start = start; + stripes[role].write_end = end; + } + else + stripes[role].missing = true; + } + if (pg_cursize < pg_size) + { + if (has_parity == 0) + { + // Parity is missing, we don't need to read anything + for (int role = 0; role < pg_minsize; role++) + { + stripes[role].read_end = 0; + } + } + else + { + // Other stripe(s) are missing + for (int role = 0; role < pg_minsize; role++) + { + if (osd_set[role] == 0 && stripes[role].read_end != 0) + { + stripes[role].missing = true; + for (int r2 = 0; r2 < pg_size; r2++) + { + // Read the non-covered range of from all other stripes to reconstruct it + if (r2 != role && osd_set[r2] != 0) + { + extend_read(stripes[role].read_start, stripes[role].read_end, stripes[r2]); + } + } + } + } + } + } + // Allocate read buffers + void *rmw_buf = alloc_read_buffer(stripes, pg_size, has_parity * (end - start)); + // Position parity & write buffers + uint64_t buf_pos = 0, in_pos = 0; + for (int role = 0; role < pg_size; role++) + { + if (stripes[role].req_end != 0) + { + stripes[role].write_buf = write_buf + in_pos; + in_pos += stripes[role].req_end - stripes[role].req_start; + } + else if (role >= pg_minsize && osd_set[role] != 0) + { + stripes[role].write_buf = rmw_buf + buf_pos; + buf_pos += end - start; + } + } + return rmw_buf; +} + +static void get_old_new_buffers(osd_read_stripe_t & stripe, uint32_t wr_start, uint32_t wr_end, buf_len_t *bufs, int & nbufs) +{ + uint32_t ns = 0, ne = 0, os = 0, oe = 0; + if (stripe.req_end > wr_start && + stripe.req_start < wr_end) + { + ns = std::max(stripe.req_start, wr_start); + ne = std::min(stripe.req_end, wr_end); + } + if (stripe.read_end > wr_start && + stripe.read_start < wr_end) + { + os = std::max(stripe.read_start, wr_start); + oe = std::min(stripe.req_end, wr_end); + } + if (ne && (!oe || ns <= os)) + { + // NEW or NEW->OLD + bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns }; + if (os < ne) + os = ne; + if (oe > os) + { + // NEW->OLD + bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os }; + } + } + else if (oe) + { + // OLD or OLD->NEW or OLD->NEW->OLD + if (ne) + { + // OLD->NEW or OLD->NEW->OLD + bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = ns-os }; + bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns }; + if (oe > ne) + { + // OLD->NEW->OLD + bufs[nbufs++] = { .buf = stripe.read_buf + ne - stripe.read_start, .len = oe-ne }; + } + } + else + { + // OLD + bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os }; + } + } +} + +static void xor_multiple_buffers(buf_len_t *xor1, int n1, buf_len_t *xor2, int n2, void *dest, uint32_t len) +{ + assert(n1 > 0 && n2 > 0); + int i1 = 0, i2 = 0; + uint32_t start1 = 0, start2 = 0, end1 = xor1[0].len, end2 = xor2[0].len; + uint32_t pos = 0; + while (pos < len) + { + // We know for sure that ranges overlap + uint32_t end = std::min(end1, end2); + memxor(xor1[i1].buf + pos-start1, xor2[i2].buf + pos-start2, dest+pos, end-pos); + pos = end; + if (pos >= end1) + { + i1++; + if (i1 >= n1) + { + assert(pos >= end2); + return; + } + start1 = end1; + end1 += xor1[i1].len; + } + if (pos >= end2) + { + i2++; + start2 = end2; + end2 += xor2[i2].len; + } + } +} + +void calc_rmw_parity(osd_read_stripe_t *stripes, int pg_size) +{ + if (stripes[pg_size-1].missing) + { + // Parity OSD is unavailable + return; + } + for (int role = 0; role < pg_size; role++) + { + if (stripes[role].read_end != 0 && stripes[role].missing) + { + // Reconstruct missing stripe (EC k+1) + reconstruct_stripe(stripes, pg_size, role); + break; + } + } + // Calculate new parity (EC k+1) + int parity = pg_size-1, prev = -2; + auto wr_end = stripes[parity].write_end; + auto wr_start = stripes[parity].write_start; + for (int other = 0; other < pg_size-1; other++) + { + if (prev == -2) + { + prev = other; + } + else + { + int n1 = 0, n2 = 0; + buf_len_t xor1[3], xor2[3]; + if (prev == -1) + { + xor1[n1++] = { .buf = stripes[parity].write_buf, .len = wr_end-wr_start }; + } + else + { + get_old_new_buffers(stripes[prev], wr_start, wr_end, xor1, n1); + prev = -1; + } + get_old_new_buffers(stripes[other], wr_start, wr_end, xor2, n2); + xor_multiple_buffers(xor1, n1, xor2, n2, stripes[parity].write_buf, wr_end-wr_start); + } + } +} diff --git a/osd_rmw.h b/osd_rmw.h new file mode 100644 index 000000000..7f2393e35 --- /dev/null +++ b/osd_rmw.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include "object_id.h" +#include "osd_id.h" + +#ifndef MEM_ALIGNMENT +#define MEM_ALIGNMENT 512 +#endif + +struct buf_len_t +{ + void *buf; + uint64_t len; +}; + +struct osd_read_stripe_t +{ + void *read_buf, *write_buf; + uint32_t req_start, req_end; + uint32_t read_start, read_end; + uint32_t write_start, write_end; + bool missing; +}; + +void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t len, osd_read_stripe_t *stripes); + +void reconstruct_stripe(osd_read_stripe_t *stripes, int pg_size, int role); + +int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size); + +void* alloc_read_buffer(osd_read_stripe_t *stripes, int read_pg_size, uint64_t add_size); + +void* calc_rmw_reads(void *write_buf, osd_read_stripe_t *stripes, uint64_t *osd_set, uint64_t pg_size, uint64_t pg_minsize, uint64_t pg_cursize); + +void calc_rmw_parity(osd_read_stripe_t *stripes, int pg_size); diff --git a/osd_rmw_test.cpp b/osd_rmw_test.cpp new file mode 100644 index 000000000..a04e348a6 --- /dev/null +++ b/osd_rmw_test.cpp @@ -0,0 +1,100 @@ +#include +#include "osd_rmw.cpp" + +#define PATTERN0 0x8c4641acc762840e +#define PATTERN1 0x70a549add9a2280a +#define PATTERN2 0xffe3bad5f578a78e +#define PATTERN3 0x426bd7854eb08509 + +#define set_pattern(buf, len, pattern) for (uint64_t i = 0; i < len; i += 8) { *(uint64_t*)((void*)buf + i) = pattern; } +#define check_pattern(buf, len, pattern) for (uint64_t i = 0; i < len; i += 8) { assert(*(uint64_t*)(buf + i) == pattern); } + +int main(int narg, char *args[]) +{ + osd_num_t osd_set[3] = { 1, 0, 3 }; + osd_read_stripe_t stripes[3] = { 0 }; + // Test 1 + split_stripes(2, 128*1024, 128*1024-4096, 8192, stripes); + assert(stripes[0].req_start == 128*1024-4096 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 4096); + assert(stripes[2].req_end == 0); + // Test 2 + for (int i = 0; i < 3; i++) + { + stripes[i].read_start = stripes[i].req_start; + stripes[i].read_end = stripes[i].req_end; + } + assert(extend_missing_stripes(stripes, osd_set, 2, 3) == 0); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 4096); + // Test 3 + stripes[0] = { .req_start = 128*1024-4096, .req_end = 128*1024 }; + cover_read(0, 128*1024, stripes[0]); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024-4096); + // Test 4.1 + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 128*1024-4096, 8192, stripes); + void* write_buf = malloc(8192); + void* rmw_buf = calc_rmw_reads(write_buf, stripes, osd_set, 3, 2, 2); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); + assert(stripes[1].read_start == 4096 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 4096 && stripes[2].read_end == 128*1024); + assert(stripes[0].read_buf == rmw_buf+128*1024); + assert(stripes[1].read_buf == rmw_buf+128*1024*2); + assert(stripes[2].read_buf == rmw_buf+128*1024*3-4096); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+4096); + assert(stripes[2].write_buf == rmw_buf); + // Test 4.2 + set_pattern(write_buf, 8192, PATTERN0); + set_pattern(stripes[0].read_buf, 128*1024, PATTERN1); // old data + set_pattern(stripes[1].read_buf, 128*1024-4096, UINT64_MAX); // didn't read it, it's missing + set_pattern(stripes[2].read_buf, 128*1024-4096, 0); // old parity = 0 + calc_rmw_parity(stripes, 3); + check_pattern(stripes[2].write_buf, 4096, PATTERN0^PATTERN1); // new parity + check_pattern(stripes[2].write_buf+4096, 128*1024-4096*2, 0); // new parity + check_pattern(stripes[2].write_buf+128*1024-4096, 4096, PATTERN0^PATTERN1); // new parity + free(rmw_buf); + free(write_buf); + // Test 5.1 + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 0, 64*1024*3, stripes); + assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 64*1024); + assert(stripes[2].req_end == 0); + // Test 5.2 + write_buf = malloc(64*1024*3); + rmw_buf = calc_rmw_reads(write_buf, stripes, osd_set, 3, 2, 2); + assert(stripes[0].read_start == 64*1024 && stripes[0].read_end == 128*1024); + assert(stripes[1].read_start == 64*1024 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 64*1024 && stripes[2].read_end == 128*1024); + assert(stripes[0].read_buf == rmw_buf+128*1024); + assert(stripes[1].read_buf == rmw_buf+64*3*1024); + assert(stripes[2].read_buf == rmw_buf+64*4*1024); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+128*1024); + assert(stripes[2].write_buf == rmw_buf); + free(rmw_buf); + free(write_buf); + // Test 6.1 + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 0, 64*1024*3, stripes); + osd_set[1] = 2; + write_buf = malloc(64*1024*3); + rmw_buf = calc_rmw_reads(write_buf, stripes, osd_set, 3, 2, 3); + assert(stripes[0].read_end == 0); + assert(stripes[1].read_start == 64*1024 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_end == 0); + assert(stripes[0].read_buf == 0); + assert(stripes[1].read_buf == rmw_buf+128*1024); + assert(stripes[2].read_buf == 0); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+128*1024); + assert(stripes[2].write_buf == rmw_buf); + free(rmw_buf); + free(write_buf); + osd_set[1] = 0; + // End + printf("all ok\n"); + return 0; +}