From a4c46ba7450682c48a50af37cf7824eafb999ef0 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 30 Nov 2020 00:08:25 +0300 Subject: [PATCH] Add jerasure EC support (reed_sol_van, others are slower) (not tested yet) --- Makefile | 6 +- etcd_state_client.cpp | 115 +++++++++----- etcd_state_client.h | 2 +- mon/mon.js | 34 ++-- osd_cluster.cpp | 24 +++ osd_id.h | 1 + osd_peering_pg.h | 2 +- osd_primary.cpp | 48 +++--- osd_primary.h | 2 +- osd_rmw.cpp | 320 +++++++++++++++++++++++++++++--------- osd_rmw.h | 13 +- osd_rmw_test.cpp | 354 +++++++++++++++++++++++++++++------------- test_pattern.h | 2 +- 13 files changed, 669 insertions(+), 254 deletions(-) diff --git a/Makefile b/Makefile index 23d0cb51..9447967e 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ QEMU_PLUGINDIR ?= /usr/lib/x86_64-linux-gnu/qemu BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o # -fsanitize=address -CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always +CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -I/usr/include/jerasure all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_uring_osd stub_bench osd_test dump_journal qemu_driver.so nbd_proxy rm_inode clean: rm -f *.o libblockstore.so libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_uring_osd stub_bench osd_test dump_journal qemu_driver.so nbd_proxy rm_inode @@ -36,13 +36,13 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o osd_ops.o pg_states.o \ osd_rmw.o json11.o base64.o timerfd_manager.o epoll_manager.o osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) - g++ $(CXXFLAGS) -Wl,-rpath,'$(LIBDIR)/vitastor' -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring + g++ $(CXXFLAGS) -Wl,-rpath,'$(LIBDIR)/vitastor' -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lJerasure stub_osd: stub_osd.o rw_blocking.o g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal osd_rmw_test: osd_rmw_test.o - g++ $(CXXFLAGS) -o $@ osd_rmw_test.o + g++ $(CXXFLAGS) -o $@ osd_rmw_test.o -lJerasure -fsanitize=address STUB_URING_OSD_OBJS := stub_uring_osd.o epoll_manager.o messenger.o msgr_send.o msgr_receive.o ringloop.o timerfd_manager.o json11.o stub_uring_osd: $(STUB_URING_OSD_OBJS) diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index c50a252b..e4528bd0 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -319,67 +319,98 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso } for (auto & pool_item: value.object_items()) { + pool_config_t pc; + // ID pool_id_t pool_id = stoull_full(pool_item.first); if (!pool_id || pool_id >= POOL_ID_MAX) { printf("Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX); continue; } - if (pool_item.second["pg_size"].uint64_value() < 1 || - pool_item.second["scheme"] == "xor" && pool_item.second["pg_size"].uint64_value() < 3) - { - printf("Pool %u has invalid pg_size, skipping pool\n", pool_id); - continue; - } - if (pool_item.second["pg_minsize"].uint64_value() < 1 || - pool_item.second["pg_minsize"].uint64_value() > pool_item.second["pg_size"].uint64_value() || - pool_item.second["pg_minsize"].uint64_value() < (pool_item.second["pg_size"].uint64_value() - 1)) - { - printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id); - continue; - } - if (pool_item.second["pg_count"].uint64_value() < 1) - { - printf("Pool %u has invalid pg_count, skipping pool\n", pool_id); - continue; - } - if (pool_item.second["name"].string_value() == "") + pc.id = pool_id; + // Pool Name + pc.name = pool_item.second["name"].string_value(); + if (pc.name == "") { printf("Pool %u has empty name, skipping pool\n", pool_id); continue; } - if (pool_item.second["scheme"] != "replicated" && pool_item.second["scheme"] != "xor") + // Failure Domain + pc.failure_domain = pool_item.second["failure_domain"].string_value(); + // Coding Scheme + if (pool_item.second["scheme"] == "replicated") + pc.scheme = POOL_SCHEME_REPLICATED; + else if (pool_item.second["scheme"] == "xor") + pc.scheme = POOL_SCHEME_XOR; + else if (pool_item.second["scheme"] == "jerasure") + pc.scheme = POOL_SCHEME_JERASURE; + else { - printf("Pool %u has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id); + printf("Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id); continue; } - if (pool_item.second["max_osd_combinations"].uint64_value() > 0 && - pool_item.second["max_osd_combinations"].uint64_value() < 100) + // PG Size + pc.pg_size = pool_item.second["pg_size"].uint64_value(); + if (pc.pg_size < 1 || + pool_item.second["pg_size"].uint64_value() < 3 && + (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) || + pool_item.second["pg_size"].uint64_value() > 256) + { + printf("Pool %u has invalid pg_size, skipping pool\n", pool_id); + continue; + } + // Parity Chunks + pc.parity_chunks = pool_item.second["parity_chunks"].uint64_value(); + if (pc.scheme == POOL_SCHEME_XOR) + { + if (pc.parity_chunks > 1) + { + printf("Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id); + continue; + } + pc.parity_chunks = 1; + } + if (pc.scheme == POOL_SCHEME_JERASURE && + (pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2)) + { + printf("Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id); + continue; + } + // PG MinSize + pc.pg_minsize = pool_item.second["pg_minsize"].uint64_value(); + if (pc.pg_minsize < 1 || pc.pg_minsize > pc.pg_size || + (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) && + pc.pg_minsize < (pc.pg_size-pc.parity_chunks)) + { + printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id); + continue; + } + // PG Count + pc.pg_count = pool_item.second["pg_count"].uint64_value(); + if (pc.pg_count < 1) + { + printf("Pool %u has invalid pg_count, skipping pool\n", pool_id); + continue; + } + // Max OSD Combinations + pc.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value(); + if (!pc.max_osd_combinations) + pc.max_osd_combinations = 10000; + if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100) { printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); continue; } + // PG Stripe Size + pc.pg_stripe_size = pool_item.second["pg_stripe_size"].uint64_value(); + uint64_t min_stripe_size = bs_block_size * (pc.scheme == POOL_SCHEME_REPLICATED ? 1 : (pc.pg_size-pc.parity_chunks)); + if (pc.pg_stripe_size < min_stripe_size) + pc.pg_stripe_size = min_stripe_size; + // Save + std::swap(pc.pg_config, this->pool_config[pool_id].pg_config); + std::swap(this->pool_config[pool_id], pc); auto & parsed_cfg = this->pool_config[pool_id]; parsed_cfg.exists = true; - parsed_cfg.id = pool_id; - parsed_cfg.name = pool_item.second["name"].string_value(); - parsed_cfg.scheme = pool_item.second["scheme"] == "replicated" ? POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR; - parsed_cfg.pg_size = pool_item.second["pg_size"].uint64_value(); - parsed_cfg.pg_minsize = pool_item.second["pg_minsize"].uint64_value(); - parsed_cfg.pg_count = pool_item.second["pg_count"].uint64_value(); - parsed_cfg.failure_domain = pool_item.second["failure_domain"].string_value(); - parsed_cfg.pg_stripe_size = pool_item.second["pg_stripe_size"].uint64_value(); - uint64_t min_stripe_size = bs_block_size * - (parsed_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : parsed_cfg.pg_minsize); - if (parsed_cfg.pg_stripe_size < min_stripe_size) - { - parsed_cfg.pg_stripe_size = min_stripe_size; - } - parsed_cfg.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value(); - if (!parsed_cfg.max_osd_combinations) - { - parsed_cfg.max_osd_combinations = 10000; - } for (auto & pg_item: parsed_cfg.pg_config) { if (pg_item.second.target_set.size() != parsed_cfg.pg_size) diff --git a/etcd_state_client.h b/etcd_state_client.h index 9db47eed..a55e80f7 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -43,7 +43,7 @@ struct pool_config_t pool_id_t id; std::string name; uint64_t scheme; - uint64_t pg_size, pg_minsize; + uint64_t pg_size, pg_minsize, parity_chunks; uint64_t pg_count; uint64_t real_pg_count; std::string failure_domain; diff --git a/mon/mon.js b/mon/mon.js index 26e5734f..0d676e5f 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -91,9 +91,12 @@ const etcd_tree = { /* pools: { : { name: 'testpool', - scheme: 'xor', + // jerasure uses Reed-Solomon-Vandermonde codes + scheme: 'replicated' | 'xor' | 'jerasure', pg_size: 3, pg_minsize: 2, + // number of parity chunks, required for jerasure + parity_chunks?: 1, pg_count: 100, failure_domain: 'host', max_osd_combinations: 10000, @@ -636,6 +639,7 @@ class Mon { pool_cfg.pg_size = Math.floor(pool_cfg.pg_size); pool_cfg.pg_minsize = Math.floor(pool_cfg.pg_minsize); + pool_cfg.parity_chunks = Math.floor(pool_cfg.parity_chunks) || undefined; pool_cfg.pg_count = Math.floor(pool_cfg.pg_count); pool_cfg.failure_domain = pool_cfg.failure_domain || 'host'; pool_cfg.max_osd_combinations = Math.floor(pool_cfg.max_osd_combinations) || 10000; @@ -645,8 +649,14 @@ class Mon console.log('Pool ID '+pool_id+' is invalid'); return false; } - if (!pool_cfg.pg_size || pool_cfg.pg_size < 1 || - pool_cfg.scheme === 'xor' && pool_cfg.pg_size < 3) + if (pool_cfg.scheme !== 'xor' && pool_cfg.scheme !== 'replicated' && pool_cfg.scheme !== 'jerasure') + { + if (warn) + console.log('Pool '+pool_id+' has invalid coding scheme (one of "xor", "replicated" and "jerasure" required)'); + return false; + } + if (!pool_cfg.pg_size || pool_cfg.pg_size < 1 || pool_cfg.pg_size > 256 || + (pool_cfg.scheme === 'xor' || pool_cfg.scheme == 'jerasure') && pool_cfg.pg_size < 3) { if (warn) console.log('Pool '+pool_id+' has invalid pg_size'); @@ -659,6 +669,18 @@ class Mon console.log('Pool '+pool_id+' has invalid pg_minsize'); return false; } + if (pool_cfg.scheme === 'xor' && pool_cfg.parity_chunks != 0 && pool_cfg.parity_chunks != 1) + { + if (warn) + console.log('Pool '+pool_id+' has invalid parity_chunks (must be 1)'); + return false; + } + if (pool_cfg.scheme === 'jerasure' && (pool_cfg.parity_chunks < 1 || pool_cfg.parity_chunks > pool_cfg.pg_size-2)) + { + if (warn) + console.log('Pool '+pool_id+' has invalid parity_chunks (must be between 1 and pg_size-2)'); + return false; + } if (!pool_cfg.pg_count || pool_cfg.pg_count < 1) { if (warn) @@ -671,12 +693,6 @@ class Mon console.log('Pool '+pool_id+' has empty name'); return false; } - if (pool_cfg.scheme !== 'xor' && pool_cfg.scheme !== 'replicated') - { - if (warn) - console.log('Pool '+pool_id+' has invalid coding scheme (only "xor" and "replicated" are allowed)'); - return false; - } if (pool_cfg.max_osd_combinations < 100) { if (warn) diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 91346528..dd7164a0 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -4,6 +4,7 @@ #include "osd.h" #include "base64.h" #include "etcd_state_client.h" +#include "osd_rmw.h" // Startup sequence: // Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state @@ -32,12 +33,26 @@ void osd_t::init_cluster() } pgs[{ 1, 1 }] = (pg_t){ .state = PG_PEERING, + .scheme = POOL_SCHEME_XOR, .pg_cursize = 0, + .pg_size = 3, + .pg_minsize = 2, + .parity_chunks = 1, .pool_id = 1, .pg_num = 1, .target_set = { 1, 2, 3 }, .cur_set = { 0, 0, 0 }, }; + st_cli.pool_config[1] = (pool_config_t){ + .exists = true, + .id = 1, + .name = "testpool", + .scheme = POOL_SCHEME_XOR, + .pg_size = 3, + .pg_minsize = 2, + .pg_count = 1, + .real_pg_count = 1, + }; report_pg_state(pgs[{ 1, 1 }]); pg_counts[1] = 1; } @@ -583,6 +598,7 @@ void osd_t::apply_pg_config() .pg_cursize = 0, .pg_size = pool_item.second.pg_size, .pg_minsize = pool_item.second.pg_minsize, + .parity_chunks = pool_item.second.parity_chunks, .pool_id = pool_id, .pg_num = pg_num, .reported_epoch = pg_cfg.epoch, @@ -590,6 +606,10 @@ void osd_t::apply_pg_config() .all_peers = std::vector(all_peers.begin(), all_peers.end()), .target_set = pg_cfg.target_set, }; + if (pg.scheme == POOL_SCHEME_JERASURE) + { + use_jerasure(pg.pg_size, pg.pg_size-pg.parity_chunks, true); + } this->pg_state_dirty.insert({ .pool_id = pool_id, .pg_num = pg_num }); pg.print_state(); if (pg_cfg.cur_primary == this->osd_num) @@ -778,6 +798,10 @@ void osd_t::report_pg_states() { // Remove offline PGs after reporting their state this->pgs.erase(pg_it); + if (pg_it->second.scheme == POOL_SCHEME_JERASURE) + { + use_jerasure(pg_it->second.pg_size, pg_it->second.pg_size-pg_it->second.parity_chunks, false); + } } } } diff --git a/osd_id.h b/osd_id.h index ed359601..33dfb666 100644 --- a/osd_id.h +++ b/osd_id.h @@ -5,6 +5,7 @@ #define POOL_SCHEME_REPLICATED 1 #define POOL_SCHEME_XOR 2 +#define POOL_SCHEME_JERASURE 3 #define POOL_ID_MAX 0x10000 #define POOL_ID_BITS 16 #define INODE_POOL(inode) (pool_id_t)((inode) >> (64 - POOL_ID_BITS)) diff --git a/osd_peering_pg.h b/osd_peering_pg.h index c2ba4b92..f1829d5f 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -75,7 +75,7 @@ struct pg_t { int state = 0; uint64_t scheme = 0; - uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0; + uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0, parity_chunks = 0; pool_id_t pool_id = 0; pg_num_t pg_num = 0; uint64_t clean_count = 0, total_count = 0; diff --git a/osd_primary.cpp b/osd_primary.cpp index b4280b2f..bdfb1f99 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -16,8 +16,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) { // PG number is calculated from the offset // Our EC scheme stores data in fixed chunks equal to (K*block size) - // K = pg_minsize in case of EC/XOR, or 1 for replicated pools + // K = (pg_size-parity_chunks) in case of EC/XOR, or 1 for replicated pools pool_id_t pool_id = INODE_POOL(cur_op->req.rw.inode); + // FIXME: We have to access pool config here, so make sure that it doesn't change while its PGs are active... auto pool_cfg_it = st_cli.pool_config.find(pool_id); if (pool_cfg_it == st_cli.pool_config.end()) { @@ -26,7 +27,8 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return false; } auto & pool_cfg = pool_cfg_it->second; - uint64_t pg_block_size = bs_block_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_minsize); + uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + uint64_t pg_block_size = bs_block_size * pg_data_size; object_id oid = { .inode = cur_op->req.rw.inode, // oid.stripe = starting offset of the parity stripe @@ -37,6 +39,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE)) { // This OSD is not primary for this PG or the PG is inactive + // FIXME: Allow reads from PGs degraded under pg_minsize, but don't allow writes finish_op(cur_op, -EPIPE); return false; } @@ -54,9 +57,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) op_data->oid = oid; op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1)); op_data->scheme = pool_cfg.scheme; + op_data->pg_data_size = pg_data_size; cur_op->op_data = op_data; - split_stripes((pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_minsize), - bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); + split_stripes(pg_data_size, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); pg_it->second.inflight++; return true; } @@ -101,7 +104,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) else if (op_data->st == 2) goto resume_2; { auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }]; - for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_minsize); role++) + for (int role = 0; role < op_data->pg_data_size; role++) { op_data->stripes[role].read_start = op_data->stripes[role].req_start; op_data->stripes[role].read_end = op_data->stripes[role].req_end; @@ -112,24 +115,23 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED) { // Fast happy-path - cur_op->buf = alloc_read_buffer(op_data->stripes, - (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_minsize), 0); + cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0); submit_primary_subops(SUBMIT_READ, op_data->target_ver, - (op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : pg.pg_minsize), pg.cur_set.data(), cur_op); + (op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size), pg.cur_set.data(), cur_op); op_data->st = 1; } else { // PG may be degraded or have misplaced objects uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); - if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) + if (extend_missing_stripes(op_data->stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0) { finish_op(cur_op, -EIO); return; } // Submit reads - op_data->pg_minsize = pg.pg_minsize; op_data->pg_size = pg.pg_size; + op_data->scheme = pg.scheme; op_data->degraded = 1; cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op); @@ -147,14 +149,17 @@ resume_2: if (op_data->degraded) { // Reconstruct missing stripes - // FIXME: Always EC(k+1) by now. Add different coding schemes osd_rmw_stripe_t *stripes = op_data->stripes; - for (int role = 0; role < op_data->pg_minsize; role++) + if (op_data->scheme == POOL_SCHEME_XOR) + { + reconstruct_stripes_xor(stripes, op_data->pg_size); + } + else if (op_data->scheme == POOL_SCHEME_JERASURE) + { + reconstruct_stripes_jerasure(stripes, op_data->pg_size, op_data->pg_data_size); + } + for (int role = 0; role < op_data->pg_size; role++) { - if (stripes[role].read_end != 0 && stripes[role].missing) - { - reconstruct_stripe_xor(stripes, op_data->pg_size, role); - } if (stripes[role].req_end != 0) { // Send buffer in parts to avoid copying @@ -245,7 +250,7 @@ resume_1: else { cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set, - pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size); + pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size); if (!cur_op->rmw_buf) { // Refuse partial overwrite of an incomplete object @@ -285,7 +290,14 @@ resume_3: else { // Recover missing stripes, calculate parity - calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); + if (pg.scheme == POOL_SCHEME_XOR) + { + calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); + } + else if (pg.scheme == POOL_SCHEME_JERASURE) + { + calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size); + } } // Send writes if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch) diff --git a/osd_primary.h b/osd_primary.h index 38e3a039..cda79bdb 100644 --- a/osd_primary.h +++ b/osd_primary.h @@ -25,7 +25,7 @@ struct osd_primary_op_data_t uint64_t fact_ver = 0; uint64_t scheme = 0; int n_subops = 0, done = 0, errors = 0, epipe = 0; - int degraded = 0, pg_size, pg_minsize; + int degraded = 0, pg_size, pg_data_size; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; uint64_t *prev_set = NULL; diff --git a/osd_rmw.cpp b/osd_rmw.cpp index 818e2785..d77aa51f 100644 --- a/osd_rmw.cpp +++ b/osd_rmw.cpp @@ -3,6 +3,9 @@ #include #include +#include +#include +#include #include "xor.h" #include "osd_rmw.h" #include "malloc_or_die.h" @@ -75,44 +78,151 @@ void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, } } -void reconstruct_stripe_xor(osd_rmw_stripe_t *stripes, int pg_size, int role) +void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size) { - int prev = -2; - for (int other = 0; other < pg_size; other++) + for (int role = 0; role < pg_size; role++) { - if (other != role) + if (stripes[role].read_end != 0 && stripes[role].missing) { - if (prev == -2) + // Reconstruct missing stripe (XOR k+1) + int prev = -2; + for (int other = 0; other < pg_size; other++) { - prev = other; - } - else if (prev >= 0) - { - assert(stripes[role].read_start >= stripes[prev].read_start && - stripes[role].read_start >= stripes[other].read_start); - memxor( - stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start), - stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start), - stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start - ); - prev = -1; - } - else - { - assert(stripes[role].read_start >= stripes[other].read_start); - memxor( - stripes[role].read_buf, - stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start), - stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start - ); + if (other != role) + { + if (prev == -2) + { + prev = other; + } + else if (prev >= 0) + { + assert(stripes[role].read_start >= stripes[prev].read_start && + stripes[role].read_start >= stripes[other].read_start); + memxor( + stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start), + stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start), + stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start + ); + prev = -1; + } + else + { + assert(stripes[role].read_start >= stripes[other].read_start); + memxor( + stripes[role].read_buf, + stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start), + stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start + ); + } + } } } } } -int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size) +struct reed_sol_matrix_t { - for (int role = 0; role < minsize; role++) + int *data; + int refs = 0; +}; + +std::map matrices; + +void use_jerasure(int pg_size, int pg_minsize, bool use) +{ + uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32; + auto rs_it = matrices.find(key); + if (rs_it == matrices.end()) + { + if (!use) + { + return; + } + int *matrix = reed_sol_vandermonde_coding_matrix(pg_minsize, pg_size-pg_minsize, 32); + matrices[key] = (reed_sol_matrix_t){ + .data = matrix, + .refs = 0, + }; + rs_it = matrices.find(key); + } + rs_it->second.refs += (!use ? -1 : 1); + if (rs_it->second.refs <= 0) + { + free(rs_it->second.data); + matrices.erase(rs_it); + } +} + +int* get_jerasure_matrix(int pg_size, int pg_minsize) +{ + uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32; + auto rs_it = matrices.find(key); + if (rs_it == matrices.end()) + { + throw std::runtime_error("jerasure matrix not initialized"); + } + return rs_it->second.data; +} + +void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize) +{ + int *matrix = get_jerasure_matrix(pg_size, pg_minsize); + int erasures[pg_size]; + char *data_ptrs[pg_size] = { 0 }; + int erasure_count = 0; + int res = 0; + for (int role = 0; role < pg_minsize; role++) + { + if (stripes[role].read_end != 0 && stripes[role].missing) + { + erasures[erasure_count++] = role; + } + } + if (erasure_count > 0) + { + for (int role = erasure_count; role < pg_size; role++) + { + erasures[role] = -1; + } + for (int role = 0; role < pg_minsize; role++) + { + if (stripes[role].read_end != 0 && stripes[role].missing) + { + for (int other = 0; other < role; other++) + { + if (stripes[other].missing && + stripes[role].read_start == stripes[other].read_start && + stripes[role].read_end == stripes[other].read_end) + { + // We reconstruct multiple ranges + // Skip if the same range was already reconstructed + goto next_missing; + } + } + for (int other = 0; other < pg_size; other++) + { + data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start)); + } + // FIXME jerasure has slightly dumb API and performs extra allocations internally + // also it creates a decoding matrix on every call which could be cached + // sooo :-) we have some room for improvements here :-) + res = jerasure_matrix_decode( + pg_minsize, pg_size-pg_minsize, 32, matrix, 1, erasures, + data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start + ); + if (res < 0) + { + throw std::runtime_error("jerasure_matrix_decode() failed"); + } + } + next_missing:; + } + } +} + +int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int pg_minsize, int pg_size) +{ + for (int role = 0; role < pg_minsize; role++) { if (stripes[role].read_end != 0 && osd_set[role] == 0) { @@ -121,21 +231,21 @@ int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int mi // We need at least pg_minsize stripes to recover the lost part. // FIXME: LRC EC and similar don't require to read all other stripes. int exist = 0; - for (int j = 0; j < size; j++) + for (int j = 0; j < pg_size; j++) { if (osd_set[j] != 0) { extend_read(stripes[role].read_start, stripes[role].read_end, stripes[j]); exist++; - if (exist >= minsize) + if (exist >= pg_minsize) { break; } } } - if (exist < minsize) + if (exist < pg_minsize) { - // Less than minsize stripes are available for this object + // Less than pg_minsize stripes are available for this object return -1; } } @@ -369,19 +479,9 @@ static void xor_multiple_buffers(buf_len_t *xor1, int n1, buf_len_t *xor2, int n } } -void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size) +static void calc_rmw_parity_copy_mod(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, + uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t &start, uint32_t &end) { - int pg_minsize = pg_size-1; - for (int role = 0; role < pg_size; role++) - { - if (stripes[role].read_end != 0 && stripes[role].missing) - { - // Reconstruct missing stripe (XOR k+1) - reconstruct_stripe_xor(stripes, pg_size, role); - break; - } - } - uint32_t start = 0, end = 0; if (write_osd_set[pg_minsize] != 0 || write_osd_set != read_osd_set) { // Required for the next two if()s @@ -421,6 +521,53 @@ void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_ } } } +} + +static void calc_rmw_parity_copy_parity(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, + uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t start, uint32_t end) +{ + if (write_osd_set != read_osd_set) + { + for (int role = pg_minsize; role < pg_size; role++) + { + if (write_osd_set[role] != read_osd_set[role] && (start != 0 || end != chunk_size)) + { + // Copy new parity into the read buffer to write it back + memcpy( + stripes[role].read_buf + start, + stripes[role].write_buf, + end - start + ); + stripes[role].write_buf = stripes[role].read_buf; + stripes[role].write_start = 0; + stripes[role].write_end = chunk_size; + } + } + } +#ifdef RMW_DEBUG + printf("calc_rmw_parity:\n"); + for (int role = 0; role < pg_size; role++) + { + auto & s = stripes[role]; + printf( + "Tr=%lu Tw=%lu Q=%x-%x R=%x-%x W=%x-%x Rb=%lx Wb=%lx\n", + read_osd_set[role], write_osd_set[role], + s.req_start, s.req_end, + s.read_start, s.read_end, + s.write_start, s.write_end, + (uint64_t)s.read_buf, + (uint64_t)s.write_buf + ); + } +#endif +} + +void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size) +{ + int pg_minsize = pg_size-1; + reconstruct_stripes_xor(stripes, pg_size); + uint32_t start = 0, end = 0; + calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end); if (write_osd_set[pg_minsize] != 0 && end != 0) { // Calculate new parity (XOR k+1) @@ -449,38 +596,67 @@ void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_ } } } - if (write_osd_set != read_osd_set) + calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end); +} + +void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, + uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size) +{ + int *matrix = get_jerasure_matrix(pg_size, pg_minsize); + reconstruct_stripes_jerasure(stripes, pg_size, pg_minsize); + uint32_t start = 0, end = 0; + calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end); + if (end != 0) { - for (int role = pg_minsize; role < pg_size; role++) + int i; + for (i = pg_minsize; i < pg_size; i++) { - if (write_osd_set[role] != read_osd_set[role] && (start != 0 || end != chunk_size)) + if (write_osd_set[i] != 0) + break; + } + if (i < pg_size) + { + // Calculate new coding chunks + buf_len_t bufs[pg_size][3]; + int nbuf[pg_size] = { 0 }, curbuf[pg_size] = { 0 }; + uint32_t positions[pg_size]; + void *data_ptrs[pg_size] = { 0 }; + for (int i = 0; i < pg_minsize; i++) { - // Copy new parity into the read buffer to write it back - memcpy( - stripes[role].read_buf + start, - stripes[role].write_buf, - end - start - ); - stripes[role].write_buf = stripes[role].read_buf; - stripes[role].write_start = 0; - stripes[role].write_end = chunk_size; + get_old_new_buffers(stripes[i], start, end, bufs[i], nbuf[i]); + positions[i] = start; + } + for (int i = pg_minsize; i < pg_size; i++) + { + bufs[i][nbuf[i]++] = { .buf = stripes[i].write_buf, .len = end-start }; + positions[i] = start; + } + uint32_t pos = start; + while (pos < end) + { + uint32_t next_end = end; + for (int i = 0; i < pg_size; i++) + { + assert(curbuf[i] < nbuf[i]); + data_ptrs[i] = bufs[i][curbuf[i]].buf + pos-positions[i]; + uint32_t this_end = bufs[i][curbuf[i]].len + positions[i]; + if (next_end > this_end) + next_end = this_end; + } + assert(next_end > pos); + for (int i = 0; i < pg_size; i++) + { + uint32_t this_end = bufs[i][curbuf[i]].len + positions[i]; + if (next_end >= this_end) + { + positions[i] += bufs[i][curbuf[i]].len; + curbuf[i]++; + } + } + jerasure_matrix_encode(pg_minsize, pg_size-pg_minsize, 32, matrix, (char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos); + pos = next_end; } } } -#ifdef RMW_DEBUG - printf("calc_rmw_xor:\n"); - for (int role = 0; role < pg_size; role++) - { - auto & s = stripes[role]; - printf( - "Tr=%lu Tw=%lu Q=%x-%x R=%x-%x W=%x-%x Rb=%lx Wb=%lx\n", - read_osd_set[role], write_osd_set[role], - s.req_start, s.req_end, - s.read_start, s.read_end, - s.write_start, s.write_end, - (uint64_t)s.read_buf, - (uint64_t)s.write_buf - ); - } -#endif + calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end); } diff --git a/osd_rmw.h b/osd_rmw.h index b0b1a97c..89fa2d74 100644 --- a/osd_rmw.h +++ b/osd_rmw.h @@ -26,11 +26,13 @@ struct osd_rmw_stripe_t bool missing; }; +// Here pg_minsize is the number of data chunks, not the minimum number of alive OSDs for the PG to operate + void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t len, osd_rmw_stripe_t *stripes); -void reconstruct_stripe_xor(osd_rmw_stripe_t *stripes, int pg_size, int role); +void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size); -int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size); +int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int pg_minsize, int pg_size); void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t add_size); @@ -38,3 +40,10 @@ void* calc_rmw(void *request_buf, osd_rmw_stripe_t *stripes, uint64_t *read_osd_ uint64_t pg_size, uint64_t pg_minsize, uint64_t pg_cursize, uint64_t *write_osd_set, uint64_t chunk_size); void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size); + +void use_jerasure(int pg_size, int pg_minsize, bool use); + +void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize); + +void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, + uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size); diff --git a/osd_rmw_test.cpp b/osd_rmw_test.cpp index 5e70a4d1..54c15148 100644 --- a/osd_rmw_test.cpp +++ b/osd_rmw_test.cpp @@ -18,107 +18,7 @@ void test9(); void test10(); void test11(); void test12(); - -/*** - -Cases: - -1. split(offset=128K-4K, len=8K) - = [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 0 ] ] - -2. read(offset=128K-4K, len=8K, osd_set=[1,0,3]) - = { read: [ [ 0, 128K ], [ 0, 4K ], [ 0, 4K ] ] } - -3. cover_read(0, 128K, { req: [ 128K-4K, 4K ] }) - = { read: [ 0, 128K-4K ] } - -4. write(offset=128K-4K, len=8K, osd_set=[1,0,3]) - = { - read: [ [ 0, 128K ], [ 4K, 128K ], [ 4K, 128K ] ], - write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2, read0, read1, read2 ], - } - + check write2 buffer - -5. write(offset=0, len=128K+64K, osd_set=[1,0,3]) - = { - req: [ [ 0, 128K ], [ 0, 64K ], [ 0, 0 ] ], - read: [ [ 64K, 128K ], [ 64K, 128K ], [ 64K, 128K ] ], - write: [ [ 0, 128K ], [ 0, 64K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2, read0, read1, read2 ], - } - -6. write(offset=0, len=128K+64K, osd_set=[1,2,3]) - = { - req: [ [ 0, 128K ], [ 0, 64K ], [ 0, 0 ] ], - read: [ [ 0, 0 ], [ 64K, 128K ], [ 0, 0 ] ], - write: [ [ 0, 128K ], [ 0, 64K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2, read1 ], - } - -7. calc_rmw(offset=128K-4K, len=8K, osd_set=[1,0,3], write_set=[1,2,3]) - = { - read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], - write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2, read0, read1, read2 ], - } - then, after calc_rmw_parity_xor(): { - write: [ [ 128K-4K, 128K ], [ 0, 128K ], [ 0, 128K ] ], - write1==read1, - } - + check write1 buffer - + check write2 buffer - -8. calc_rmw(offset=0, len=128K+4K, osd_set=[0,2,3], write_set=[1,2,3]) - = { - read: [ [ 0, 0 ], [ 4K, 128K ], [ 0, 0 ] ], - write: [ [ 0, 128K ], [ 0, 4K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2, read1 ], - } - + check write2 buffer - -9. object recovery case: - calc_rmw(offset=0, len=0, read_osd_set=[0,2,3], write_osd_set=[1,2,3]) - = { - read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], - write: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ] ], - input buffer: NULL, - rmw buffer: [ read0, read1, read2 ], - } - then, after calc_rmw_parity_xor(): { - write: [ [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], - write0==read0, - } - + check write0 buffer - -10. full overwrite/recovery case: - calc_rmw(offset=0, len=256K, read_osd_set=[1,0,0], write_osd_set=[1,2,3]) - = { - read: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ] ], - write: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], - input buffer: [ write0, write1 ], - rmw buffer: [ write2 ], - } - then, after calc_rmw_parity_xor(): all the same - + check write2 buffer - -10. partial recovery case: - calc_rmw(offset=128K, len=128K, read_osd_set=[1,0,0], write_osd_set=[1,2,3]) - = { - read: [ [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], - write: [ [ 0, 0 ], [ 0, 128K ], [ 0, 128K ] ], - input buffer: [ write1 ], - rmw buffer: [ write2, read0 ], - } - then, after calc_rmw_parity_xor(): all the same - + check write2 buffer - -***/ +void test13(); int main(int narg, char *args[]) { @@ -142,6 +42,8 @@ int main(int narg, char *args[]) test11(); // Test 12 test12(); + // Test 13 + test13(); // End printf("all ok\n"); return 0; @@ -169,6 +71,19 @@ void dump_stripes(osd_rmw_stripe_t *stripes, int pg_size) printf("\n"); } +/*** + +1. split(offset=128K-4K, len=8K) + = [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 0 ] ] + + read(offset=128K-4K, len=8K, osd_set=[1,0,3]) + = { read: [ [ 0, 128K ], [ 0, 4K ], [ 0, 4K ] ] } + + cover_read(0, 128K, { req: [ 128K-4K, 4K ] }) + = { read: [ 0, 128K-4K ] } + +***/ + void test1() { osd_num_t osd_set[3] = { 1, 0, 3 }; @@ -193,6 +108,19 @@ void test1() assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024-4096); } +/*** + +4. write(offset=128K-4K, len=8K, osd_set=[1,0,3]) + = { + read: [ [ 0, 128K ], [ 4K, 128K ], [ 4K, 128K ] ], + write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read0, read1, read2 ], + } + + check write2 buffer + +***/ + void test4() { osd_num_t osd_set[3] = { 1, 0, 3 }; @@ -226,6 +154,19 @@ void test4() free(write_buf); } +/*** + +5. write(offset=0, len=128K+64K, osd_set=[1,0,3]) + = { + req: [ [ 0, 128K ], [ 0, 64K ], [ 0, 0 ] ], + read: [ [ 64K, 128K ], [ 64K, 128K ], [ 64K, 128K ] ], + write: [ [ 0, 128K ], [ 0, 64K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read0, read1, read2 ], + } + +***/ + void test5() { osd_num_t osd_set[3] = { 1, 0, 3 }; @@ -254,6 +195,19 @@ void test5() free(write_buf); } +/*** + +6. write(offset=0, len=128K+64K, osd_set=[1,2,3]) + = { + req: [ [ 0, 128K ], [ 0, 64K ], [ 0, 0 ] ], + read: [ [ 0, 0 ], [ 64K, 128K ], [ 0, 0 ] ], + write: [ [ 0, 128K ], [ 0, 64K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read1 ], + } + +***/ + void test6() { osd_num_t osd_set[3] = { 1, 2, 3 }; @@ -278,6 +232,24 @@ void test6() free(write_buf); } +/*** + +7. calc_rmw(offset=128K-4K, len=8K, osd_set=[1,0,3], write_set=[1,2,3]) + = { + read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], + write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read0, read1, read2 ], + } + then, after calc_rmw_parity_xor(): { + write: [ [ 128K-4K, 128K ], [ 0, 128K ], [ 0, 128K ] ], + write1==read1, + } + + check write1 buffer + + check write2 buffer + +***/ + void test7() { osd_num_t osd_set[3] = { 1, 0, 3 }; @@ -318,6 +290,19 @@ void test7() free(write_buf); } +/*** + +8. calc_rmw(offset=0, len=128K+4K, osd_set=[0,2,3], write_set=[1,2,3]) + = { + read: [ [ 0, 0 ], [ 4K, 128K ], [ 0, 0 ] ], + write: [ [ 0, 128K ], [ 0, 4K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read1 ], + } + + check write2 buffer + +***/ + void test8() { osd_num_t osd_set[3] = { 0, 2, 3 }; @@ -355,6 +340,24 @@ void test8() free(write_buf); } +/*** + +9. object recovery case: + calc_rmw(offset=0, len=0, read_osd_set=[0,2,3], write_osd_set=[1,2,3]) + = { + read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], + write: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ] ], + input buffer: NULL, + rmw buffer: [ read0, read1, read2 ], + } + then, after calc_rmw_parity_xor(): { + write: [ [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], + write0==read0, + } + + check write0 buffer + +***/ + void test9() { osd_num_t osd_set[3] = { 0, 2, 3 }; @@ -395,6 +398,21 @@ void test9() free(rmw_buf); } +/*** + +10. full overwrite/recovery case: + calc_rmw(offset=0, len=256K, read_osd_set=[1,0,0], write_osd_set=[1,2,3]) + = { + read: [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ] ], + write: [ [ 0, 128K ], [ 0, 128K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2 ], + } + then, after calc_rmw_parity_xor(): all the same + + check write2 buffer + +***/ + void test10() { osd_num_t osd_set[3] = { 1, 0, 0 }; @@ -436,6 +454,21 @@ void test10() free(write_buf); } +/*** + +11. partial recovery case: + calc_rmw(offset=128K, len=128K, read_osd_set=[1,0,0], write_osd_set=[1,2,3]) + = { + read: [ [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], + write: [ [ 0, 0 ], [ 0, 128K ], [ 0, 128K ] ], + input buffer: [ write1 ], + rmw buffer: [ write2, read0 ], + } + then, after calc_rmw_parity_xor(): all the same + + check write2 buffer + +***/ + void test11() { osd_num_t osd_set[3] = { 1, 0, 0 }; @@ -477,17 +510,32 @@ void test11() free(write_buf); } +/*** + +12. parity recovery case: + calc_rmw(offset=0, len=0, read_osd_set=[1,2,0], write_osd_set=[1,2,3]) + = { + read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ] ], + write: [ [ 0, 0 ], [ 0, 0 ], [ 0, 128K ] ], + input buffer: [], + rmw buffer: [ write2, read0, read1 ], + } + then, after calc_rmw_parity_xor(): all the same + + check write2 buffer + +***/ + void test12() { osd_num_t osd_set[3] = { 1, 2, 0 }; osd_num_t write_osd_set[3] = { 1, 2, 3 }; osd_rmw_stripe_t stripes[3] = { 0 }; - // Test 11.0 + // Test 12.0 split_stripes(2, 128*1024, 0, 0, stripes); assert(stripes[0].req_start == 0 && stripes[0].req_end == 0); assert(stripes[1].req_start == 0 && stripes[1].req_end == 0); assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); - // Test 11.1 + // Test 12.1 void *rmw_buf = calc_rmw(NULL, stripes, osd_set, 3, 2, 3, write_osd_set, 128*1024); assert(rmw_buf); assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); @@ -502,7 +550,7 @@ void test12() assert(stripes[0].write_buf == NULL); assert(stripes[1].write_buf == NULL); assert(stripes[2].write_buf == rmw_buf); - // Test 11.2 + // Test 12.2 set_pattern(stripes[0].read_buf, 128*1024, PATTERN1); set_pattern(stripes[1].read_buf, 128*1024, PATTERN2); calc_rmw_parity_xor(stripes, 3, osd_set, write_osd_set, 128*1024); @@ -515,3 +563,101 @@ void test12() check_pattern(stripes[2].write_buf, 128*1024, PATTERN1^PATTERN2); free(rmw_buf); } + +/*** + +13. basic jerasure test + calc_rmw(offset=128K-4K, len=8K, osd_set=[1,2,0,0], write_set=[1,2,3,4]) + = { + read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], + write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, write3, read0, read1 ], + } + then, after calc_rmw_parity_jerasure(): all the same + then simulate read with read_osd_set=[0,0,3,4] and check read0,read1 buffers + +***/ + +void test13() +{ + use_jerasure(4, 2, true); + osd_num_t osd_set[4] = { 1, 2, 0, 0 }; + osd_num_t write_osd_set[4] = { 1, 2, 3, 4 }; + osd_rmw_stripe_t stripes[4] = { 0 }; + // Test 13.0 + void *write_buf = malloc_or_die(8192); + split_stripes(2, 128*1024, 128*1024-4096, 8192, stripes); + assert(stripes[0].req_start == 128*1024-4096 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 4096); + assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); + assert(stripes[3].req_start == 0 && stripes[3].req_end == 0); + // Test 13.1 + void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 4, 2, 4, write_osd_set, 128*1024); + assert(rmw_buf); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024-4096); + assert(stripes[1].read_start == 4096 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 0); + assert(stripes[3].read_start == 0 && stripes[3].read_end == 0); + assert(stripes[0].write_start == 128*1024-4096 && stripes[0].write_end == 128*1024); + assert(stripes[1].write_start == 0 && stripes[1].write_end == 4096); + assert(stripes[2].write_start == 0 && stripes[2].write_end == 128*1024); + assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024); + assert(stripes[0].read_buf == rmw_buf+2*128*1024); + assert(stripes[1].read_buf == rmw_buf+3*128*1024-4096); + assert(stripes[2].read_buf == NULL); + assert(stripes[3].read_buf == NULL); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+4096); + assert(stripes[2].write_buf == rmw_buf); + assert(stripes[3].write_buf == rmw_buf+128*1024); + // Test 13.2 - encode + set_pattern(write_buf, 8192, PATTERN3); + set_pattern(stripes[0].read_buf, 128*1024-4096, PATTERN1); + set_pattern(stripes[1].read_buf, 128*1024-4096, PATTERN2); + calc_rmw_parity_jerasure(stripes, 4, 2, osd_set, write_osd_set, 128*1024); + assert(stripes[0].write_start == 128*1024-4096 && stripes[0].write_end == 128*1024); + assert(stripes[1].write_start == 0 && stripes[1].write_end == 4096); + assert(stripes[2].write_start == 0 && stripes[2].write_end == 128*1024); + assert(stripes[3].write_start == 0 && stripes[3].write_end == 128*1024); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+4096); + assert(stripes[2].write_buf == rmw_buf); + assert(stripes[3].write_buf == rmw_buf+128*1024); + // Test 13.3 - decode and verify + osd_num_t read_osd_set[4] = { 0, 0, 3, 4 }; + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 0, 256*1024, stripes); + assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 128*1024); + assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); + assert(stripes[3].req_start == 0 && stripes[3].req_end == 0); + for (int role = 0; role < 2; role++) + { + stripes[role].read_start = stripes[role].req_start; + stripes[role].read_end = stripes[role].req_end; + } + assert(extend_missing_stripes(stripes, read_osd_set, 2, 4) == 0); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); + assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 128*1024); + assert(stripes[3].read_start == 0 && stripes[3].read_end == 128*1024); + void *read_buf = alloc_read_buffer(stripes, 4, 0); + assert(read_buf); + assert(stripes[0].read_buf == read_buf); + assert(stripes[1].read_buf == read_buf+128*1024); + assert(stripes[2].read_buf == read_buf+2*128*1024); + assert(stripes[3].read_buf == read_buf+3*128*1024); + memcpy(read_buf+2*128*1024, rmw_buf, 128*1024); + memcpy(read_buf+3*128*1024, rmw_buf+128*1024, 128*1024); + reconstruct_stripes_jerasure(stripes, 4, 2); + check_pattern(stripes[0].read_buf, 128*1024-4096, PATTERN1); + check_pattern(stripes[0].read_buf+128*1024-4096, 4096, PATTERN3); + check_pattern(stripes[1].read_buf, 4096, PATTERN3); + check_pattern(stripes[1].read_buf+4096, 128*1024-4096, PATTERN2); + // Huh done + free(read_buf); + free(rmw_buf); + free(write_buf); + use_jerasure(4, 2, false); +} diff --git a/test_pattern.h b/test_pattern.h index 537602b9..7a9c5313 100644 --- a/test_pattern.h +++ b/test_pattern.h @@ -12,4 +12,4 @@ #define PATTERN3 0x426bd7854eb08509 #define set_pattern(buf, len, pattern) for (uint64_t i = 0; i < len; i += 8) { *(uint64_t*)((void*)buf + i) = pattern; } -#define check_pattern(buf, len, pattern) for (uint64_t i = 0; i < len; i += 8) { assert(*(uint64_t*)(buf + i) == pattern); } +#define check_pattern(buf, len, pattern) { uint64_t bad = UINT64_MAX; for (uint64_t i = 0; i < len; i += 8) { if ((*(uint64_t*)(buf + i)) != (pattern)) { bad = i; break; } } if (bad != UINT64_MAX) { printf("mismatch at %lx\n", bad); } assert(bad == UINT64_MAX); }