diff --git a/osd_rmw.cpp b/osd_rmw.cpp index d77aa51f..cb0bac98 100644 --- a/osd_rmw.cpp +++ b/osd_rmw.cpp @@ -120,10 +120,29 @@ void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size) } } -struct reed_sol_matrix_t +struct reed_sol_erased_t { int *data; + int size; +}; + +inline bool operator < (const reed_sol_erased_t &a, const reed_sol_erased_t &b) +{ + for (int i = 0; i < a.size && i < b.size; i++) + { + if (a.data[i] < b.data[i]) + return -1; + else if (a.data[i] > b.data[i]) + return 1; + } + return 0; +} + +struct reed_sol_matrix_t +{ int refs = 0; + int *data; + std::map decodings; }; std::map matrices; @@ -140,8 +159,8 @@ void use_jerasure(int pg_size, int pg_minsize, bool use) } int *matrix = reed_sol_vandermonde_coding_matrix(pg_minsize, pg_size-pg_minsize, 32); matrices[key] = (reed_sol_matrix_t){ - .data = matrix, .refs = 0, + .data = matrix, }; rs_it = matrices.find(key); } @@ -149,11 +168,17 @@ void use_jerasure(int pg_size, int pg_minsize, bool use) if (rs_it->second.refs <= 0) { free(rs_it->second.data); + for (auto dec_it = rs_it->second.decodings.begin(); dec_it != rs_it->second.decodings.end();) + { + int *data = dec_it->second; + rs_it->second.decodings.erase(dec_it++); + free(data); + } matrices.erase(rs_it); } } -int* get_jerasure_matrix(int pg_size, int pg_minsize) +reed_sol_matrix_t* get_jerasure_matrix(int pg_size, int pg_minsize) { uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32; auto rs_it = matrices.find(key); @@ -161,61 +186,74 @@ int* get_jerasure_matrix(int pg_size, int pg_minsize) { throw std::runtime_error("jerasure matrix not initialized"); } - return rs_it->second.data; + return &rs_it->second; +} + +// jerasure_matrix_decode() decodes all chunks at once and tries to reencode all missing coding chunks. +// we don't need it. also it makes an extra allocation of int *erased on every call and doesn't cache +// the decoding matrix. +// all these flaws are fixed in this function: +int* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize) +{ + int edd = 0; + int erased[pg_size] = { 0 }; + for (int i = 0; i < pg_size; i++) + if (stripes[i].read_end == 0 || stripes[i].missing) + erased[i] = 1; + for (int i = 0; i < pg_minsize; i++) + if (stripes[i].read_end != 0 && stripes[i].missing) + edd++; + if (edd == 0) + return NULL; + reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize); + auto dec_it = matrix->decodings.find((reed_sol_erased_t){ .data = erased, .size = pg_size }); + if (dec_it == matrix->decodings.end()) + { + int *dm_ids = (int*)malloc(sizeof(int)*(pg_minsize + pg_minsize*pg_minsize + pg_size)); + int *decoding_matrix = dm_ids + pg_minsize; + if (!dm_ids) + throw std::bad_alloc(); + // we always use row_k_ones=1 and w=32 + if (jerasure_make_decoding_matrix(pg_minsize, pg_size-pg_minsize, 32, matrix->data, erased, decoding_matrix, dm_ids) < 0) + { + free(dm_ids); + throw std::runtime_error("jerasure_make_decoding_matrix() failed"); + } + int *erased_copy = dm_ids + pg_minsize + pg_minsize*pg_minsize; + memcpy(erased_copy, erased, pg_size*sizeof(int)); + matrix->decodings.emplace((reed_sol_erased_t){ .data = erased_copy, .size = pg_size }, dm_ids); + return dm_ids; + } + return dec_it->second; } void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize) { - int *matrix = get_jerasure_matrix(pg_size, pg_minsize); - int erasures[pg_size]; + int *dm_ids = get_jerasure_decoding_matrix(stripes, pg_size, pg_minsize); + if (!dm_ids) + { + return; + } + int *decoding_matrix = dm_ids + pg_minsize; char *data_ptrs[pg_size] = { 0 }; - int erasure_count = 0; - int res = 0; for (int role = 0; role < pg_minsize; role++) { if (stripes[role].read_end != 0 && stripes[role].missing) { - erasures[erasure_count++] = role; - } - } - if (erasure_count > 0) - { - for (int role = erasure_count; role < pg_size; role++) - { - erasures[role] = -1; - } - for (int role = 0; role < pg_minsize; role++) - { - if (stripes[role].read_end != 0 && stripes[role].missing) + for (int other = 0; other < pg_size; other++) { - for (int other = 0; other < role; other++) - { - if (stripes[other].missing && - stripes[role].read_start == stripes[other].read_start && - stripes[role].read_end == stripes[other].read_end) - { - // We reconstruct multiple ranges - // Skip if the same range was already reconstructed - goto next_missing; - } - } - for (int other = 0; other < pg_size; other++) + if (stripes[other].read_end != 0 && !stripes[other].missing) { + assert(stripes[other].read_start <= stripes[role].read_start); + assert(stripes[other].read_end >= stripes[role].read_end); data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start)); } - // FIXME jerasure has slightly dumb API and performs extra allocations internally - // also it creates a decoding matrix on every call which could be cached - // sooo :-) we have some room for improvements here :-) - res = jerasure_matrix_decode( - pg_minsize, pg_size-pg_minsize, 32, matrix, 1, erasures, - data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start - ); - if (res < 0) - { - throw std::runtime_error("jerasure_matrix_decode() failed"); - } } - next_missing:; + data_ptrs[role] = (char*)stripes[role].read_buf; + jerasure_matrix_dotprod( + pg_minsize, 32, decoding_matrix+(role*pg_minsize), dm_ids, role, + data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start + ); } } } @@ -602,7 +640,7 @@ void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_ void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size) { - int *matrix = get_jerasure_matrix(pg_size, pg_minsize); + reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize); reconstruct_stripes_jerasure(stripes, pg_size, pg_minsize); uint32_t start = 0, end = 0; calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end); @@ -638,6 +676,7 @@ void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_min for (int i = 0; i < pg_size; i++) { assert(curbuf[i] < nbuf[i]); + assert(bufs[i][curbuf[i]].buf); data_ptrs[i] = bufs[i][curbuf[i]].buf + pos-positions[i]; uint32_t this_end = bufs[i][curbuf[i]].len + positions[i]; if (next_end > this_end) @@ -653,7 +692,10 @@ void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_min curbuf[i]++; } } - jerasure_matrix_encode(pg_minsize, pg_size-pg_minsize, 32, matrix, (char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos); + jerasure_matrix_encode( + pg_minsize, pg_size-pg_minsize, 32, matrix->data, + (char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos + ); pos = next_end; } } diff --git a/osd_rmw_test.cpp b/osd_rmw_test.cpp index 54c15148..426ec8ac 100644 --- a/osd_rmw_test.cpp +++ b/osd_rmw_test.cpp @@ -19,6 +19,7 @@ void test10(); void test11(); void test12(); void test13(); +void test14(); int main(int narg, char *args[]) { @@ -44,6 +45,8 @@ int main(int narg, char *args[]) test12(); // Test 13 test13(); + // Test 14 + test14(); // End printf("all ok\n"); return 0; @@ -103,7 +106,7 @@ void test1() assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); assert(stripes[2].read_start == 0 && stripes[2].read_end == 4096); // Test 1.3 - stripes[0] = { .req_start = 128*1024-4096, .req_end = 128*1024 }; + stripes[0] = (osd_rmw_stripe_t){ .req_start = 128*1024-4096, .req_end = 128*1024 }; cover_read(0, 128*1024, stripes[0]); assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024-4096); } @@ -566,7 +569,7 @@ void test12() /*** -13. basic jerasure test +13. basic jerasure 2+2 test calc_rmw(offset=128K-4K, len=8K, osd_set=[1,2,0,0], write_set=[1,2,3,4]) = { read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ], [ 0, 0 ] ], @@ -624,7 +627,7 @@ void test13() assert(stripes[1].write_buf == write_buf+4096); assert(stripes[2].write_buf == rmw_buf); assert(stripes[3].write_buf == rmw_buf+128*1024); - // Test 13.3 - decode and verify + // Test 13.3 - full decode and verify osd_num_t read_osd_set[4] = { 0, 0, 3, 4 }; memset(stripes, 0, sizeof(stripes)); split_stripes(2, 128*1024, 0, 256*1024, stripes); @@ -632,7 +635,7 @@ void test13() assert(stripes[1].req_start == 0 && stripes[1].req_end == 128*1024); assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); assert(stripes[3].req_start == 0 && stripes[3].req_end == 0); - for (int role = 0; role < 2; role++) + for (int role = 0; role < 4; role++) { stripes[role].read_start = stripes[role].req_start; stripes[role].read_end = stripes[role].req_end; @@ -655,9 +658,125 @@ void test13() check_pattern(stripes[0].read_buf+128*1024-4096, 4096, PATTERN3); check_pattern(stripes[1].read_buf, 4096, PATTERN3); check_pattern(stripes[1].read_buf+4096, 128*1024-4096, PATTERN2); - // Huh done free(read_buf); + // Test 13.4 - partial decode (only 1st chunk) and verify + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 0, 128*1024, stripes); + assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 0); + assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); + assert(stripes[3].req_start == 0 && stripes[3].req_end == 0); + for (int role = 0; role < 4; role++) + { + stripes[role].read_start = stripes[role].req_start; + stripes[role].read_end = stripes[role].req_end; + } + assert(extend_missing_stripes(stripes, read_osd_set, 2, 4) == 0); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); + assert(stripes[1].read_start == 0 && stripes[1].read_end == 0); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 128*1024); + assert(stripes[3].read_start == 0 && stripes[3].read_end == 128*1024); + read_buf = alloc_read_buffer(stripes, 4, 0); + assert(read_buf); + assert(stripes[0].read_buf == read_buf); + assert(stripes[1].read_buf == NULL); + assert(stripes[2].read_buf == read_buf+128*1024); + assert(stripes[3].read_buf == read_buf+2*128*1024); + memcpy(read_buf+128*1024, rmw_buf, 128*1024); + memcpy(read_buf+2*128*1024, rmw_buf+128*1024, 128*1024); + reconstruct_stripes_jerasure(stripes, 4, 2); + check_pattern(stripes[0].read_buf, 128*1024-4096, PATTERN1); + check_pattern(stripes[0].read_buf+128*1024-4096, 4096, PATTERN3); + free(read_buf); + // Huh done free(rmw_buf); free(write_buf); use_jerasure(4, 2, false); } + +/*** + +13. basic jerasure 2+1 test + calc_rmw(offset=128K-4K, len=8K, osd_set=[1,2,0], write_set=[1,2,3]) + = { + read: [ [ 0, 128K ], [ 0, 128K ], [ 0, 0 ] ], + write: [ [ 128K-4K, 128K ], [ 0, 4K ], [ 0, 128K ] ], + input buffer: [ write0, write1 ], + rmw buffer: [ write2, read0, read1 ], + } + then, after calc_rmw_parity_jerasure(): all the same + then simulate read with read_osd_set=[0,2,3] and check read0 buffer + +***/ + +void test14() +{ + use_jerasure(3, 2, true); + osd_num_t osd_set[3] = { 1, 2, 0 }; + osd_num_t write_osd_set[3] = { 1, 2, 3 }; + osd_rmw_stripe_t stripes[3] = { 0 }; + // Test 13.0 + void *write_buf = malloc_or_die(8192); + split_stripes(2, 128*1024, 128*1024-4096, 8192, stripes); + assert(stripes[0].req_start == 128*1024-4096 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 4096); + assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); + // Test 13.1 + void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 3, 2, 3, write_osd_set, 128*1024); + assert(rmw_buf); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024-4096); + assert(stripes[1].read_start == 4096 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 0); + assert(stripes[0].write_start == 128*1024-4096 && stripes[0].write_end == 128*1024); + assert(stripes[1].write_start == 0 && stripes[1].write_end == 4096); + assert(stripes[2].write_start == 0 && stripes[2].write_end == 128*1024); + assert(stripes[0].read_buf == rmw_buf+128*1024); + assert(stripes[1].read_buf == rmw_buf+2*128*1024-4096); + assert(stripes[2].read_buf == NULL); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+4096); + assert(stripes[2].write_buf == rmw_buf); + // Test 13.2 - encode + set_pattern(write_buf, 8192, PATTERN3); + set_pattern(stripes[0].read_buf, 128*1024-4096, PATTERN1); + set_pattern(stripes[1].read_buf, 128*1024-4096, PATTERN2); + calc_rmw_parity_jerasure(stripes, 3, 2, osd_set, write_osd_set, 128*1024); + assert(stripes[0].write_start == 128*1024-4096 && stripes[0].write_end == 128*1024); + assert(stripes[1].write_start == 0 && stripes[1].write_end == 4096); + assert(stripes[2].write_start == 0 && stripes[2].write_end == 128*1024); + assert(stripes[0].write_buf == write_buf); + assert(stripes[1].write_buf == write_buf+4096); + assert(stripes[2].write_buf == rmw_buf); + // Test 13.3 - decode and verify + osd_num_t read_osd_set[4] = { 0, 2, 3 }; + memset(stripes, 0, sizeof(stripes)); + split_stripes(2, 128*1024, 0, 128*1024, stripes); + assert(stripes[0].req_start == 0 && stripes[0].req_end == 128*1024); + assert(stripes[1].req_start == 0 && stripes[1].req_end == 0); + assert(stripes[2].req_start == 0 && stripes[2].req_end == 0); + for (int role = 0; role < 3; role++) + { + stripes[role].read_start = stripes[role].req_start; + stripes[role].read_end = stripes[role].req_end; + } + assert(extend_missing_stripes(stripes, read_osd_set, 2, 3) == 0); + assert(stripes[0].read_start == 0 && stripes[0].read_end == 128*1024); + assert(stripes[1].read_start == 0 && stripes[1].read_end == 128*1024); + assert(stripes[2].read_start == 0 && stripes[2].read_end == 128*1024); + void *read_buf = alloc_read_buffer(stripes, 3, 0); + assert(read_buf); + assert(stripes[0].read_buf == read_buf); + assert(stripes[1].read_buf == read_buf+128*1024); + assert(stripes[2].read_buf == read_buf+2*128*1024); + set_pattern(stripes[1].read_buf, 4096, PATTERN3); + set_pattern(stripes[1].read_buf+4096, 128*1024-4096, PATTERN2); + memcpy(stripes[2].read_buf, rmw_buf, 128*1024); + reconstruct_stripes_jerasure(stripes, 3, 2); + check_pattern(stripes[0].read_buf, 128*1024-4096, PATTERN1); + check_pattern(stripes[0].read_buf+128*1024-4096, 4096, PATTERN3); + free(read_buf); + // Huh done + free(rmw_buf); + free(write_buf); + use_jerasure(3, 2, false); +}